在[Handler源码解析(一)](https://liqi.site/archives/h-a-n-d-l-e-r-yuan-ma-jie-xi--yi-)中,我们分析了一个Hander创建的整个过程,通过在java层和native分别创建Looper和MessageQueue,利用linux的epoll机制我们阻塞和唤醒消息队列,使Handler具体的消息发送的能力。这篇文章我们就通过分析消息发送和接受的具体过程,来看看Handler具体消息的传递是怎么进行的,以及看看前面介绍的epoll机制在Handler这里是怎么应用的。
# 消息发送
上一篇文章我们说了,消息通过调用Handler的post方法和sendMessage系列方法来发送,post方法最终也会通过sendMessage系列方法来发送,而sendMessage的一系列方法最终会调用到enqueueMessage方法,我们就从enqueueMessage方法开始看起:
```java
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) { // 如果Handler是异步的,那么消息也设置为异步的
msg.setAsynchronous(true);
}
// 继续调用queue的enqueueMessage方法放置消息
return queue.enqueueMessage(msg, uptimeMillis);
}
```
这个方法首先把传入的Message的target赋值当前的Hander,后面在处理消息的时候会根据每个消息的这个字段调用对应Handler的分发方法,这个在后面分析的时候会看到。然后根据当前这个Handler是处理同步消息还是异步消息,给Message的赋值,这个Message是同步和异步消息在后面读取消息的时候会有处理的区别,这里也不深入,到后面遇到的时候再说。最后就会调用MessageQueue的enqueueMessage方法。我们继续到MessageQueue那里去看:
```java
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) { // message没有设置Handler,报错
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) { // 这个message正在使用(即还在池中,或者在messageQueue中),报错
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) { // 如果queue正在退出的话,return
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle(); // 回收这个message
return false;
}
msg.markInUse(); // 标记正在使用
msg.when = when; // 时间
Message p = mMessages; // 执行队头
boolean needWake;
// 当前消息队列为空,或者加入的时间是0,或者加入的这个消息时间比队头还早,插入到队头
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p; // 插入的这个消息执行队头
mMessages = msg; // 队头设置为这个消息
needWake = mBlocked; // true表示当前是否阻塞了,由于插入了队头,所以如果阻塞了,后面就需要唤醒
} else { // 到这里就说插入的不是队头,并且队列中肯定有元素的
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
// 插入队伍中间,正常来说不需要唤醒。但是如果当前队头是个屏障,插入的消息是个异步,并且现在是阻塞的。
// 由于队头是个屏障,所以根据next方法的处理,可能这个屏障时间已经到了,但是后面没有异步消息,所以之前最多处理
// 了些idleHandler,所以现在还阻塞在那里。既然现在有个异步了,那么就唤醒,说不定就可以处理这个异步消息了
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
// 寻找msg插入的位置
for (;;) {
prev = p; // prev开始指向队头
p = p.next; // p执行prev下一个
// 如果下一个是空,或插入的msg事件早于下一个,那么就找到位置了,break
if (p == null || when < p.when) {
break;
}
// 如果needWake是true,说明队头是个屏障。p是屏障后面的一个异步消息
// 说明还没轮到当前存在的这个p执行,而既然现在执行到这里,根据前面一句代码已经说明的现在插入的msg
// 时间是晚于p的,所以不需要唤醒了
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
// 到这里,p要么是null,要么是第一个时间晚于插入msg的,所以msg的next指向p
msg.next = p; // invariant: p == prev.next
// prev是最后一个时间早于msg的,所以prev的next指向msg
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) { // 如果需要唤醒
nativeWake(mPtr); // 往epoll的awakefd写入数据,即唤醒等待这个fd的地方
}
}
return true;
}
```
方法首先判断传入的Message的target是否为空,如果为空说明他没有Handler,报错。然后如果这个Message已经在使用中了,那么也报错。之后进入同步代码块,如果当前MessageQueue正在退出,那么就没必要继续下去了,回收这个Message,退出。
通过上面的一系列检查后,开始正常的发送消息流程了。首先把这个Message置为使用中,之后把传入的时间设置给Message的when字段,接着就获取当前等待处理的消息队头,下面会根据当前这个Message的时间插入到消息队列中的合适位置,并判断是否需要唤醒epoll来处理消息,我们看下具体是怎么处理的。
首先如果消息队列为空或者消息的执行时间为0,或者插入的消息执行时间比队头的还早,那么就说明当前消息应该是消息队列中的第一个,那么就把当前消息插入队头,如果当前是阻塞状态那么就唤醒,否则已经是唤醒状态就不用做什么。
如果插入的消息不是插入在队头,这个时候正常来说是不需要唤醒的,因为队列前面已经有消息了。但是如果队头消息的target是空的话,说明这个队头是个同步屏障,所谓同步屏障的插入我们在后面会专门说,这里大致先介绍一下,记得前面我们分析创建Handler的时候,会传入一个是否是异步消息的参数吗,正常情况下我们平时常用的都不是异步消息,所以如果遇到一个同步屏障的话,排在他后面的同步消息都会被阻塞,就是不会继续分发执行了,而如果是异步消息的话就不受这个影响。
这个如果插入的是一个异步消息,并且插入的位置不是队头,而队头是个同步屏障,当前又是被阻塞的状态,那么说明当前很可能就是被同步屏障给阻塞了,虽然后面还有消息要么是同步消息,要么是未到执行时间的异步消息,所以现在插入了一个异步消息的话,需要唤醒一下,看是否这个异步消息时间到了能被执行。
之后就开始遍历消息队列了,如果找到了一个队列位置已经到最后还没找到,或者找到了一个指向时间比当前插入的消息晚的,那么就把消息插入到遍历的这个位置。
跟上面分析的,如果当前插入的消息是个异步,并且队头是同步屏障的话,可能需要唤醒epoll,也可能不需要,因为如果插入位置前面已经有个异步消息了,说明这个异步消息的时间还没到,如果遇到一个异步消息执行时间是在插入消息前面的,那么就说明不需要唤醒epoll,所以前面把needWake置为true的话,还需要重新置回false。
# 唤醒epoll
以上一切都完成后,如果不需要唤醒,说明当前已经处理唤醒状态,正在Looper那里处理消息,这个我们待会去Looper那里看。如果当前需要唤醒的话,会调用native方法nativeWake,传入的参数是native层的MessageQueue的地址,我们在到native层看这个nativeWake方法。
同样我们看注册JNI方法的地方找个这个方法的实现:
```c++
static const JNINativeMethod gMessageQueueMethods[] = {
/* name, signature, funcPtr */
{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
{ "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
{ "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
{ "nativeSetFileDescriptorEvents", "(JII)V",
(void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
};
```
可以看到这里nativeWake方法对应的是android_os_MessageQueue_nativeWake方法,我们继续跟进:
```c++
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
```
这里继续调用NativeMessageQueue的wake方法:
```c++
void NativeMessageQueue::wake() {
mLooper->wake();
}
```
这里会调用Looper的wake方法:
```c++
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
// 往唤醒的fd写入一个数据,这个随便写了个1,数据是什么不重要,目的是要触发pollonce,重新创建epoll
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
mWakeEventFd, strerror(errno));
}
}
}
```
这里可以看到调用write方法往mWakeEventFd中写入一个1,mWakeEventFd是一个Eventfd,关于Eventfd我们在[Handler源码解析(一)](https://liqi.site/archives/h-a-n-d-l-e-r-yuan-ma-jie-xi--yi-)中已经有过分析,不熟悉的同学可以先看下这里的介绍,epoll会监听这个Eventfd,如果有消息发送,会唤醒epoll来处理后面的执行。这里后面的流程应该是接着epoll被唤醒的地方,我们前面还没分析过epoll被阻塞,所以我们现在暂时不直接跳到epoll的被唤醒的地方,现在看下epoll是怎么阻塞的,知道了阻塞的地方,自然就知道唤醒后的流程了,我们先回到jave层。
# Looper的loop方法
我们知道在使用Handler的时候,必要的两个步骤是,开始调用Looper.prepare,创建本线程的Looper以及MessageQueue。之后调用Looper.loop方法监听着新消息的到来。Looper.prepare我们前面已经分析过了,现在我们来分析下Looper.loop方法:
```java
public static void loop() {
final Looper me = myLooper(); // 获取当前线程的looper
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue; // 获取这个looper的message queue
// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
for (;;) {
Message msg = queue.next(); // might block 获取数据
if (msg == null) { // next方法开头queue为空才会返回null,这里return
// No message indicates that the message queue is quitting.
return;
}
// This must be in a local variable, in case a UI event sets the logger
// 获取looper的Printer,如果不空,可以执行pringln方法。setMessageLogging方法设置Printer
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " " +
msg.callback + ": " + msg.what);
}
// 获取一个默认设置的处理时间,比如UiThread中会设置一个时间,后面会根据是否设置了
// 这个时间,如果实际运行时间超过了这个默认设置的时间,会打印log
final long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;
final long traceTag = me.mTraceTag;
if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
}
// 分发出来开始时间
final long start = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
final long end;
try {
msg.target.dispatchMessage(msg); // 把msg分发处理
// 分发处理结束时间
end = (slowDispatchThresholdMs == 0) ? 0 : SystemClock.uptimeMillis();
} finally {
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
if (slowDispatchThresholdMs > 0) {
final long time = end - start; // 计算分发处理总时间
// 如果实际处理时间比默认设置的时间大,打印一下
if (time > slowDispatchThresholdMs) {
Slog.w(TAG, "Dispatch took " + time + "ms on "
+ Thread.currentThread().getName() + ", h=" +
msg.target + " cb=" + msg.callback + " msg=" + msg.what);
}
}
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
// Make sure that during the course of dispatching the
// identity of the thread wasn't corrupted.
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
Log.wtf(TAG, "Thread identity changed from 0x"
+ Long.toHexString(ident) + " to 0x"
+ Long.toHexString(newIdent) + " while dispatching to "
+ msg.target.getClass().getName() + " "
+ msg.callback + " what=" + msg.what);
}
// 回收这个message
msg.recycleUnchecked();
}
}
```
loop方法首先会获取当前线程中的Looper,如果Looper为空的话就会报错。之后验证下调用者的权限后,开始了一个死循环,这里死循环里就是不断的获取消息然后处理的过程,有同学可能会有疑问,死循环程序不会卡死吗?你看前面我们创建Handler的时候说了那么多,一定有同学会想到,难道这里用的是epoll,所以不会卡死而是睡眠了?确实是这样啊,这里用的就是epoll的机制,所以不用担心卡死的问题啦,这里会调用MessageQueue的next方法获取一个消息,如果有消息的话这个方法就会返回一个消息,否则就睡眠在这里方法里面,记得我们上面分析消息入队时候说的唤醒epoll后的流程吗?唤醒后就在这个方法里面,我们稍微来具体看下这个方法,现在先把这个方法往后看。
如果获取了一个消息后,如果是一个消息是空,说明MessageQueue要退出了,所以就会return。之后会从Looper中获取一个Printer对象,我们如果有需要的话,可以通过setMessageLogging方法,创建一个Printer对象设置给Looper:
```java
public interface Printer {
void println(String x);
}
public void setMessageLogging(@Nullable Printer printer) {
mLogging = printer;
}
```
Printer是一个接口,我们可以实现这个接口,在looper中打印一些对开发有用的log。之后如果Looper的mSlowDispatchThresholdMs字段被赋值了,下面就会打印这个消息被分发处理的时间。mSlowDispatchThresholdMs是一个数字,这个数字表示一个消息正常处理大概会在多少时间,如果超过这个时间可能会有点多,所以会打印log以便于可能后续需要分析。这个值可以通过setSlowDispatchThresholdMs方法设置,比如在DisplayThread类中就有设置这个值,这个是UI线程的Looper中设置的参数,用于给UI线程设置默认显示时间的,所以如果超过这个默认显示时间就会打印log。我们有机会分析主线程显示流程的时候会来看看这个地方。
接着除了调用取出消息持有的Handler的dispatchMessage方法外,主要是就是打印一些和性能有关的log,在调用dispatchMessage方法前会设置开始时间,调用方法结束后会计算结束时间和开始时间的差值,如果大于默认给出的数值会打印log,最后全部处理完后,会调用Message的recycleUnchecked方法回收这个消息,这个我们之前的文章已经分析过了,就不多说了。
这里我们有两个方法需要看下,一个是MessageQueue的next方法,用于取出消息。另一个是Handler的dispatchMessage方法用于分发处理消息。我们先看下dispatchMessage方法,这个相对容易一些:
```java
// 优先级,1.message的runnable
// 2. Handler的callback
// 3. Handler的handleMessage方法
public void dispatchMessage(Message msg) {
// 如果message有设置runnable,就执行message的
if (msg.callback != null) {
handleCallback(msg);
} else {
// message没设置runnable,执行初始化Handler时候设置的Callback(如果初始化有设置的话)
if (mCallback != null) {
// 根据Callback返回值决定还要不要
if (mCallback.handleMessage(msg)) {
return;
}
}
// 执行Handler的handleMessage(不要和上面Calleback中的handleMessage搞混了)
handleMessage(msg);
}
}
```
这个方法可以看到主要就是把这个消息分发给具体处理的地方。之前我们分析过Handler发送消息说过,有两个发送消息的方式,其中post方法的参数是一个Runnable,这个Runnable会设置到一个Message的callback上,所以如果一个消息的callback有设置Runnable的话,最后执行的方法就会是这个消息的callback方法。
如果消息没有设置callback,那么Handler也有一个mCallback字段,这是是在创建callback时候传入的,他是一个接口:
```java
public interface Callback {
public boolean handleMessage(Message msg);
}
```
具体创建一个Handler的时候可以传入这个接口的实现,可以自己处理接收到Message后的处理逻辑。这个接口的handleMessage返回值是个布尔类型,如果返回true的话,那么执行完这个方法也就结束了,否则还会调用Handler的handleMessage方法,这个方法是一个空方法,参数也是Message,所以如果直接覆写这个方法也可以达到
和前面2个方法类似的效果。所以Handler总共提供的三种处理Message实现的方法,可以根据自己的需要来实现,优先级最高的是Message的callback,其次是Handler的callback,最后是Handler的handleMessage。好了,dispatchMessage还是比较简单的,就说这么多,下面我们去看下MessageQueue的next方法:
```java
Message next() {
final long ptr = mPtr; // 获得natvie层的messageQueue的地址
if (ptr == 0) { // 地址为空,return
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0; // 表示离开下次执行消息的时间有多久
for (;;) {
// 这里nextPollTimeoutMillis非0,表示离开下次执行还有一些时间,所以接下去可能会阻塞
// 所以这里会调用Binder.flushPendingCommands()命令,这个命令最终会
// 调用到c++层的IPCThreadStated的talkWithDriver(false)方法,这个方法
// 参数是false的时候,表示不希望binder驱动那里接受数据,默认是true,表示希望接受数据。
// 因为下面可能会要被阻塞了,这里调用这个方法的意图是,尽量能把现有的命令发出去,即写入命令的缓冲区尽量给与一定的空间
// 而读取数据的缓冲区设置为0,因为binder可能会往该线程的读取缓冲区写入数据,既然马上要阻塞了,写入也没什么用。
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
// 都c++层epoll那里等待唤醒
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis(); // 当前时间
Message prevMsg = null;
Message msg = mMessages; // msg指向队头
// 如果有消息,并且是个同步屏障
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg; // 获取屏障
msg = msg.next; // 获取屏障后一个异步消息(跳过同步和屏障)
} while (msg != null && !msg.isAsynchronous()); // 非空且非异步
}
// 到这里如果非空,说明有同步或者异步消息
if (msg != null) {
if (now < msg.when) { // 如果最近的消息时间比当前晚,说明执行时间没到
// Next message is not ready. Set a timeout to wake up when it is ready.
// 计算离开下次执行时间有多久
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else { // 走到这里说明有个消息现在可以执行了
// Got a message.
mBlocked = false; // 既然有消息可以执行了,设置当前状态不阻塞了
if (prevMsg != null) { // 非空,说明可能前面有同步屏障
prevMsg.next = msg.next; // prevMsg执行当前的前一个,把前一个指向当前的下一个
} else { // 前面一个是空,说明没有同步屏障
mMessages = msg.next; // 队头设置为当前这个可执行消息的下一个
}
msg.next = null; //当前这个消息马上要取出执行的所以next置空
if (DEBUG)
Log.v(TAG, "Returning message: " + msg);
msg.markInUse(); // 这个消息马上要执行了,所以设置为使用中
return msg;
}
} else { // 当前没有可执行的消息,说明队列是空,或者有同步屏障,当前队列中没有运行执行的消息
// No more messages.
nextPollTimeoutMillis = -1; // -1参数在epoll_wait中,表示会一直阻塞,直到主动唤醒
}
// Process the quit message now that all pending messages have been handled.
if (mQuitting) { // 当前正在退出
dispose();
return null;
}
```
next方法虽然也不算很长,但是说的东西不算少,我们分为两段来讲。首先会判断是否持有native层的MessageQueue的地址,如果为0的话,说明有问题,退出。接着pendingIdleHandlerCount是代表是否有IdleHandler消息要处理,默认值是-1,第一次执行的时候如果是一个小于0的是就会知道这是第一次执行,执行过一次后会是一个不小于0的数,具体IdleHandler消息的处理我们在第二段代码的时候讲。
之后nextPollTimeoutMillis这个表示距离下一个消息执行的时候是多少,默认会是0,表示下一次消息立刻就要执行了,所以会唤醒epoll。这个nextPollTimeoutMillis表示的是距离下一次消息执行还有多少时间,也可以理解为epoll距离下一次被唤醒有多少时间,正常情况都是不小于0的,那么只要时间一到epoll就自动唤醒了,如果nextPollTimeoutMillis的值是-1的话,那么epoll是不会自动醒的,只有主动唤醒才行。说到这里,让我们的思路回到前面分析的入队的时候,那个时候我们分析到了一个新消息入队后有可能会主动唤醒epoll,唤醒后在java层执行的开始处就是这里了。当前由于唤醒是在native层的,所以唤醒后最开始也是在native层的,这个我们下面会说,但是java层这边被阻塞或者唤醒就是从这里要开始了。
下面开始是一个for循环,如果nextPollTimeoutMillis非等于0的话,根据我们上面分析的,下面就会被阻塞,直到下一个唤醒时间到了或者主动被唤醒,那么就会执行Binder的flushPendingCommands方法,这个方法的主要作用是尽量把现有发往binder的命令发出去,而binder返回的命令让binder不要返回,这样说有点抽象,我们稍稍看下代码,因为这块主要涉及到binder通信,这里也不可能短短几句话说清楚,下面把这个核心的几行代码尽量解释下,具体的还请参考之前我们分析binder的几篇文章,那里讲解的比较详细。
```c++
static void android_os_Binder_flushPendingCommands(JNIEnv* env, jobject clazz)
{
IPCThreadState::self()->flushCommands();
}
```
native的flushPendingCommands方法最终会触发到android_util_Binder.cpp的android_os_Binder_flushPendingCommands方法,这里会调用IPCThreadState的flushCommands方法:
```c++
void IPCThreadState::flushCommands()
{
if (mProcess->mDriverFD <= 0)
return;
talkWithDriver(false);
}
```
这里继续调用talkWithDriver方法:
```c++
status_t IPCThreadState::talkWithDriver(bool doReceive)
{
LOG_ASSERT(mProcess->mDriverFD >= 0, "Binder driver is not opened");
binder_write_read bwr;
// Is the read buffer empty?
const bool needRead = mIn.dataPosition() >= mIn.dataSize(); // mIn是binder返回给进程的缓冲区。 当前读的位置>=数据长度了,说明读取完了.true表示binder返回的命令已经处理完毕。
// We don't want to write anything if we are still reading
// from data left in the input buffer and the caller
// has requested to read the next data.
// 如果调用者不希望接受数据(false),或者之前的数据已经处理完毕(true),那么写入缓冲区的大小还是和之前一样,写入的位置还是原来的数据大小位置
// 如果希望接受数据(true)并且之前的数据读取还有没读取完(false),那么不让写入数据,所以设置写入binder命令缓冲区大小为0
// 这里可以看到默认是只希望接受数据,不发送命令,直到读取的数据完毕了才会继续发送命令。
const size_t outAvail = (!doReceive || needRead) ? mOut.dataSize() : 0;
bwr.write_size = outAvail; // 设置输出缓冲区大小
bwr.write_buffer = (long unsigned int)mOut.data(); // 设置输出缓冲区地址
// This is what we'll read.
if (doReceive && needRead) { // 如果希望binder驱动返回数据,并且之前所有的返回数据已经处理完毕了
bwr.read_size = mIn.dataCapacity(); // 设置输入缓冲区大小
bwr.read_buffer = (long unsigned int)mIn.data(); // 设置输入缓冲区
} else {
bwr.read_size = 0; // 否则返回binder命令缓冲区大小设置为0
}
................
```
这个方法参数doReceive表示是否希望接受binder传回的数据,一般情况是希望的,由于我们现在分析到的Looper马上要阻塞了,所以不希望binder返回消息。这个方法的needRead表示当前读缓冲区是否还有数据需要读,true表示没有数据了,false表示还有数据(这里true和false的含义不要理解反了)。之后会设置写缓冲区和读缓冲区的大小,正常情况下,由于doReceive一般为true,所以写缓冲区要不要设置大于0的数取决于needRead是否为true,只有needRead是true的时候才会设置写缓冲区,即读缓冲区已经没有需要读取的数据了才会发出命令。但是这里由于传入的参数是false,所以直接会设置写缓冲区的大小,目的就是如果有命令要发出的话,尽快发出,因为马上要阻塞了。
同理后面设置读缓冲区的大小的时候,正常情况只有needRead为true,即已经没有要读取的数据了,才会去设置读缓冲区。而现在doReceive的值是false,所以直接就设置读缓冲区大小为0,表示binder驱动不要往我这里写入数据,我马上要阻塞了。
以上就是Looper进入阻塞前调用flushPendingCommands方法对Binder缓冲区做的设置,binder这里后面的代码就不进入分析了,可以参考之前binder的文章。我们回到Looper那里。
接着会调用native的nativePollOnce方法,这个方法就是和epoll的阻塞以及唤醒后的流程有关了,这里我们先不进入这个方法,等Looper这里的代码分析完再回来看。接下去的代码就是被唤醒后,获取消息的流程了,下面会遍历消息队列,从中取出一个合适的消息。
这里遍历消息队列的过程中,首先会判断,如果一个消息非空,但是他的target是空,那么说明这个消息是一个同步屏障,所谓同步屏障也是一个消息,这个消息的target为空,一旦消息队列中有一个同步屏障,那么在这个同步屏障后面的同步消息就会停止被分发,直到这个同步屏障被移除为止,所以在获取小时的如果如果遇到一个同步屏障,往后遍历的时候就会跳过所有的同步消息,如果遇到一个异步消息,就会取出。
取出消息后赋值给msg,如果msg是空的话,说明当前没有获取到Message,所以会设置nextPollTimeoutMillis为-1,表示要阻塞epoll。如果msg非空,说明已经获取了一个Message,接着就要判断这个Message是否是有效的。如果这个Message的时间大于当前时间,那么说明还没有到执行时间,所以会计算出距离执行时间还有多少,然后重新设置到epoll中。如果这个消息的执行时间已经到了,那么这个消息是符合条件的,就会去队列中取出这个消息返回。如果这个消息不是在队头的话,还需要做一些前后消息链接的处理,这个不多说了,上面代码有注释。
# IdelHandler
接着再往下执行,如果当前队列正在退出的话,那么需要调用dispose方法去销毁一个native层的MessageQueue,dispose方法在native层是调用android_os_MessageQueue_nativeDestroy:
```c++
static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->decStrong(env);
}
```
这里可以看到native层销毁一个MessageQueue就是减少一个强引用指针。这个就是next方法的上半段代码,这里主要遗留了一个native层的nativePollOnce方法,我们等下回头在看,我们先再看一下下半段代码,下半段代码主要是处理和IdleHandler相关的部分,在分析下半段代码之前,我们先大致介绍一下IdleHandler。
```java
public static interface IdleHandler {
// 空闲时候执行的方法
// 返回true,执行完后这个IdleHandler还会保留在queue中,false则会移除
boolean queueIdle();
}
```
以上是IdleHandler的定义,可以看到IdleHandler是一个接口,需要我们自己来实现queueIdle方法,这个方法的返回值是一个布尔型,如果返回true表示这个IdleHandler在执行完后还会保留在MessageQueue中,说明这个是一个反复会执行的任务,如果返回false,则在执行完后就移除了,说明是一个一次性的任务。
添加一个IdleHandler是通过MessageQueue的addIdleHandler的方法:
```java
private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
public void addIdleHandler(@NonNull IdleHandler handler) {
if (handler == null) {
throw new NullPointerException("Can't add a null IdleHandler");
}
synchronized (this) {
mIdleHandlers.add(handler);
}
}
```
这个方法可以看到最终会把IdleHandler添加到mIdleHandlers集合中,这个集合也就是我们下面分析MessageQueue的next下半段方法中将要看到的。有添加,也有删除,IdleHand的删除方法如下:
```java
public void removeIdleHandler(@NonNull IdleHandler handler) {
synchronized (this) {
mIdleHandlers.remove(handler);
}
}
```
删除方法也就是从mIdleHandlers集合中移除,也没什么好说的。基本上IdleHandler就这几个方法,还是比较简单的,我们可以看Android中运行IdleHandler的一个实际例子,如果熟悉Activity启动流程或者之前看过我们Activity分析文章的同学应该还记得在启动一个Activity后,生命周期会走到onResume,最终onResume会调用到ActivityThread的handleResumeActivity方法中,我们这里不会详细分析Activity相关的内容只不过这里举一个例子来说明IdleHandler的应用,在handleResumeActivity中我们可以看到下面代码:
```java
final void handleResumeActivity(IBinder token,
boolean clearHide, boolean isForward, boolean reallyResume, int seq, String reason) {
........
if (!r.onlyLocalRequest) {
r.nextIdle = mNewActivities;
mNewActivities = r;
if (localLOGV) Slog.v(
TAG, "Scheduling idle handler for " + r);
Looper.myQueue().addIdleHandler(new Idler());
}
...............
}
```
这里代码只截取了一小部分,其余大部分我们在分析Activity的文章中已经分析过了,这里不多说。这里在Activiy走到onResume生命周期的时候会添加一个Idler类的实例到IdleHandler集合中,我们看下Idler这个类:
```java
private class Idler implements MessageQueue.IdleHandler {
@Override
public final boolean queueIdle() {
ActivityClientRecord a = mNewActivities;
boolean stopProfiling = false;
if (mBoundApplication != null && mProfiler.profileFd != null
&& mProfiler.autoStopProfiler) {
stopProfiling = true;
}
if (a != null) {
mNewActivities = null;
IActivityManager am = ActivityManager.getService();
ActivityClientRecord prev;
do {
if (localLOGV) Slog.v(
TAG, "Reporting idle of " + a +
" finished=" +
(a.activity != null && a.activity.mFinished));
if (a.activity != null && !a.activity.mFinished) {
try {
// 调用AMSactivityIdle
am.activityIdle(a.token, a.createdConfig, stopProfiling);
a.createdConfig = null;
} catch (RemoteException ex) {
throw ex.rethrowFromSystemServer();
}
}
prev = a;
a = a.nextIdle;
prev.nextIdle = null;
} while (a != null);
}
if (stopProfiling) {
mProfiler.stopProfiling();
}
ensureJitEnabled();
return false;
}
}
```
这个类继承了IdleHandler,实现了queueIdle方法。在queueIdle方法中,会判断当前这个Activity如果非空,同时没有被finish,那么就会调用AMS的activityIdle方法,我们到AMS的activityIdle方法那里去看:
```java
@Override
public final void activityIdle(IBinder token, Configuration config, boolean stopProfiling) {
final long origId = Binder.clearCallingIdentity();
synchronized (this) {
ActivityStack stack = ActivityRecord.getStackLocked(token);
if (stack != null) {
ActivityRecord r =
mStackSupervisor.activityIdleInternalLocked(token, false /* fromTimeout */,
false /* processPausingActivities */, config);
if (stopProfiling) {
if ((mProfileProc == r.app) && (mProfileFd != null)) {
try {
mProfileFd.close();
} catch (IOException e) {
}
clearProfilerLocked();
}
}
}
}
Binder.restoreCallingIdentity(origId);
}
```
这个方法会继续调用ActivityStackSupervisor中的activityIdleInternalLocked方法,这个方法之前我们也在分析Activity中有过分析,之前和流程相关的已经分析过了,这里只分析下和IdleHandler有关的部分。
```java
final ActivityRecord activityIdleInternalLocked(final IBinder token, boolean fromTimeout,
boolean processPausingActivities, Configuration config) {
........
final ArrayList<ActivityRecord> stops = processStoppingActivitiesLocked(r,
true /* remove */, processPausingActivities);
NS = stops != null ? stops.size() : 0;
if ((NF = mFinishingActivities.size()) > 0) {
// 获取finish的Activity
finishes = new ArrayList<>(mFinishingActivities);
mFinishingActivities.clear();
}
........
for (int i = 0; i < NF; i++) {
r = finishes.get(i);
final ActivityStack stack = r.getStack();
if (stack != null) {
activityRemoved |= stack.destroyActivityLocked(r, true, "finish-idle");
}
}
.........
}
```
这里我们截取了两段代码,首先会获取mFinishingActivities这个集合,这个集合中是那些还没有被finish完成的Activity会保留在这个集合中,比如我们之前分析Activity的finish流程的时候会分析到调用finishCurrentActivityLocked方法,这个方法中会判断被删除的Activity在删除前的工作还没完成的话,是不会走onDestory生命周期的,所以会先把Activity加入mFinishingActivities集合中,所以这里会从mFinishingActivities集合中取出等待被删除的Activity。之后就会调用ActivityStack的destroyActivityLocked来销毁Activity。从这里我们就可以看出IdleHandler的作用,那些需要销毁的Activity并不是马上要做的事情,所以可以拖一拖,而这里显示一个Activity对用户来说是立刻能感觉到的,所以先显示一个Activity等显示完成后,顺便启动一个IdleHandler来做那些并不是非常紧急的事情,这就是IdleHandler的作用。
好了,IdleHandler我们就说到这里,我们回到MessageQueue的next方法,继续看后半段代码:
```java
for (;;) {
synchronized (this) {
.........
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
// 如果没有idle的消息,那么当前状态还是阻塞,continue
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
// 如果mPendingIdleHandlers数组是空,需要先new一个
if (mPendingIdleHandlers == null) {
// 到这里需要new一个idle的数组,数量最小是4个
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
// 把mIdleHandlers中的数据存在新的数组中
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
// 遍历mPendingIdleHandlers数组
for (int i = 0; i < pendingIdleHandlerCount; i++) {
// 取出每个IdleHandler
final IdleHandler idler = mPendingIdleHandlers[i];
// 把取出的这个元素置null
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false; // keep是还行每个idle后返回的值,表示是否要保留在queue中
try {
keep = idler.queueIdle(); // 执行这个idle的方法
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) { // 如果不要保留在queue中
synchronized (this) {
// 从mIdleHandlers集合中移除
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0; // 这一轮以上idle后执行完了,重置数量为0
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
// 能走到这里说明前面没有可执行的消息,而且idle也执行完了,所以nextPollTimeoutMillis重置0
nextPollTimeoutMillis = 0;
}
```
首先,根据前半段代码的分析,如果取得了一个有效的Message就会返回去,所以如果执行到后半段代码肯定是没有取得有效的Message,既然没有取得Message,那么如果有IdleHandler的话,会尝试执行IdleHandler,如之前所述,IdleHandler就是那些并不紧急的任务,所以会在这里尝试执行。首先会判断一下如果pendingIdleHandlerCount是小于0的,即是初始值,所以是第一次执行,同时当前队头如果是空或者队头的消息执行时间还没到,那么会计算mIdleHandlers集合的大小,这个集合之前分析了是保存所有IdleHandler的对象的,现在把他的大小赋值给pendingIdleHandlerCount。
接着判断如果pendingIdleHandlerCount小于等于0,会阻塞并退出。这里我们可以看到,这里只有pendingIdleHandlerCount大于0才会继续进行下去,而pendingIdleHandlerCount的初始值是-1,之后被赋值后是可能大于0的,而在第一轮的循环结束后会被赋值为0,所以说每个执行next方法,只有第一次才会去处理IdleHandler,之后就没有机会执行了。接着会创建一个IdleHandler数组,这个数组的大小根据pendingIdleHandlerCount的值来创建,但是最小也有4个元素,所以是个不小于4的数组。最后就把mIdleHandlers数组中复制给新创建的IdleHandler数组mPendingIdleHandlers,下面就开始遍历这个数组。
整个遍历的过程很简单,不断取出IdleHandler对象,执行queueIdle方法,如果方法返回的值是false,说明执行完一次后就移除,所以会从mIdleHandlers这个集合中把这个IdleHandler移除,否则就继续下一轮的遍历。
最后全部遍历完后就把pendingIdleHandlerCount置为0,只要还在next方法的循环中,下一次就不会再执行IdleHandler的方法了,之后把nextPollTimeoutMillis置为0,开始下一次的循环。下一次循环又会重新执行前面的nativePollOnce方法,所以又是同一个流程。
# 唤醒epoll后处理消息
前面我们没有深入的分析nativePollOnce方法,现在我们就去看看nativePollOnce方法。
```c++
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
```
nativePollOnce是个nativce方法,最终会调用到android_os_MessageQueue.cpp的android_os_MessageQueue_nativePollOnce方法,这里继续会调用NativeMessageQueue的pollOnce方法:
```c++
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj; // jave层的调用者
mLooper->pollOnce(timeoutMillis); // 调用Looper的poll
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
```
这里pollOnce的前2个参数分别是JNI调用环境以及调用者,这个方法主要会调用pollOnce方法继续执行,这里把java相关的参数赋值给mPollEnv和mPollObj后继续调用native层的Looper的pollOnce方法,调用完成后重置保存的java相关参数,最后如果有异常的话会抛给jave层,这些也是正常的逻辑,我们主要看下pollOnce方法:
```c++
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
// mResponses是有request加入的,进入这里while的都是自定义的fd
while (mResponseIndex < mResponses.size()) {
// 获取一个epoll消息
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
// 走到这里的是自定义fd,这里ident>=0,说明是没有设置回调
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
// 到这里要么是一个默认fd的消息,要么是自定义fd且有设置回调
// 所以会进入下面pollInner方法,先获取result,看是一个消息的通知,还是唤醒通知
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
// 处理有回调的fd情况
result = pollInner(timeoutMillis);
}
}
```
这个方法里面也是一个for循环,不断地获取消息,如果获取了消息,就会返回,否则会继续等待。这里首先会遍历mResponses这个容器,这个容器是一个Response对象,这里Response对象的意思是指那些自定义的被添加到epoll中的文件描述符,之前在初始化Looper的时候我们已经分析了在创建epoll的时候会创建一个Eventfd,这个就是默认的添加到epoll中的文件描述符,一般情况下我们通过Handler发送消息的时候epoll机制都是监听这个描述符来进行唤醒和阻塞的,但是这个默认的描述符只监听读取事件,如果你有特殊的需求,项监听比如写事件,可以添加一个自定义的文件描述符,自定义被添加的文件描述符最终会加入到一个mRequests的map中,一般epoll被唤醒,会从mRequests中获取数据加入到mResponses容器中,上面我们看到的mResponses容器就是表达某个自定义的fd被唤醒了,所以这个方法的开头先处理自定义的fd唤醒事件。这里为了便于对代码的理解,关于自定义fd做了一些说明,等这个获取消息的方法分析完后,我们会单独在分析了和自定义fd相关的代码,所以这里先理解自定义fd就行,具体的内容后面会说。
在自定义fd的时候会设置这些fd被唤醒后是否要添加回调方法,如果添加的话会设置Request对象的ident字段为一个小于0的值,这个后面我们也会说到,所以这里如果ident是大于0,说明没有回调方法,直接返回就行,后面的流程就到调用者那里,比如我们前面分析的java层的next方法后面。一般来说这里ident都是大于0的,因为这里只会处理没有回调的自定义fd事件,那些有回调的事件在添加到mResponses这个容器之后就已经被执行了,我们在下面马上要看到的pollInner方法中可以看到这一点。
之后就是处理默认fd相关的唤醒事件了。这里result是一个返回值,默认值是0,如果正确的获得了一个消息或者出错了,则会是一个非0值,不管怎么样总是算处理完了,所以就返回到调用者那里。如果是0的话,说明是第一次执行到这里,所以会继续调用下面的pollInner方法,这里传入的参数是距离下次被唤醒还有多少时间。
```c++
int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif
// Adjust the timeout based on when the next message is due.
// 更新下次消息的超时时间
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
#endif
}
// Poll.
int result = POLL_WAKE; // 进入到这里,当前是个阻塞状态,所以下面一个状态先设置唤醒
// 由于mResponses里面有2种消息ident>=0的,在前面Looper::pollOnce中会遍历返回,所以肯定是清空后才会走到这里
// 而POLL_CALLBACK类型的,在上一次执行到下面的时候会执行完才退出这个方法,所以这里mResponses清空应该没什么问题
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true; // 当前阻塞着
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// 参数一 epoll文件描述符
// 参数二 监听有消息后,触发的fd的结构体数组
// 参数三 最多监听几个文件描述符
// 参数四 超时时间
// 返回结果,有几个fd被触发
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false; // 现在有消息了不阻塞了
// Acquire lock.
mLock.lock();
// Rebuild epoll set if needed.
// epoll如果异常的话,重建
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
// Check for poll error.
// eventCount小于0,说明没有fd被触发,返回错误
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
result = POLL_ERROR;
goto Done;
}
// Check for poll timeout.
// eventCount为0,说明超时,返回错误
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = POLL_TIMEOUT;
goto Done;
}
// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
...............
}
```
pollInner方法比较长,我们也分两段来分析,以上是第一段。这段开始把传入的距离下一次唤醒epoll的时间和当前的下一次唤醒epoll的时间做比对,哪个更早唤醒就设置为哪个。之后会清空mResponses这个容器,这个是存放自定义fd事件的,下面马上要进入epoll的监听了,如果阻塞被唤醒后,有自定义fd事件的话会把自定义事件添加到mResponses这里,所以这里先清空。
接着就是之前在上一篇文章中我们谈到过的epoll_wait方法,这个方法就是epoll的监听方法。这里有四个参数,参数一是epoll的文件描述符,参数二是被唤醒fd对应的数据数组,类型是一个epoll_event结构体,这个结构体我们在前一篇分析Handler的文章中已经说过了,在把某个文件描述符添加到epoll的时候会指定epoll_event这个结构体,还记得是哪里被添加的吗?在Looper.cpp的rebuildEpollLocked方法中,这里稍稍提一下,怕不熟悉的同学忘记了,具体在第一篇分析Handler的文章中已经分析过了,这里就不分析了。这里只要epoll被唤醒了,这个数组中就会有和唤醒相关的结构体。
参数三是这个epoll一共可以监听多少个文件描述符,这里可以监听16个。参数四是下次被唤醒还有多少时间。到这里如果没有新的消息,该线程就在这里进入休眠了,直到被唤醒为止。所以之前我们分析过的唤醒的wake方法,一旦唤醒了epoll,真正唤醒后开始的地方就是这里的epoll_wait后面,我们继续往下看。
唤醒后显示加上锁,反正线程安全问题。epoll_wait方法被唤醒后,会返回一个值,如果值为0说明是超时了,如果小于0说明没有fd被唤醒,代表错误了,所以会先处理下异常情况。mEpollRebuildRequired这个变量代表了是epoll操作过程中出现了错误,这个在添加自定义fd的时候我们会看到,这里这个值为true了,说明epoll出错了,需要重新初始化,会调用rebuildEpollLocked方法初始化,rebuildEpollLocked方法前面介绍过了,这里不说了。之后跳转到后面Done的代码处,Done代码我们在后半段代码中分析。
如果epoll_wait返回0或者小于0,都说明有异常,超时或者报错,也会直接跳到最后的Done代码处,我们后面在看。如果这些都没问题,下面就开始处理被唤醒的fd消息了,我们看后半段代码。
```c++
int Looper::pollInner(int timeoutMillis) {
..............
// 遍历所有被触发的fd结构体。处理java层的消息。对于java层非自定义fd来说
// 这里唤醒后就做了一件事情,清空epoll的唤醒fd数据,具体做事情还是返回到java层next方法后面继续处理
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd; // 获取fd
uint32_t epollEvents = eventItems[i].events; // 获取fd的类型
if (fd == mWakeEventFd) { // 默认的eventfd
if (epollEvents & EPOLLIN) {
awoken(); // 如果是读取指令,主要就是情况掉epoll中唤醒eventfd的数据,没实际做什么事情
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else { // 自定义的fd
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
// 设置fd的类型
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 把自定义的fd请求添加到response集合中
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) { // 这段是处理C++层的默认fd的消息
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
// 获取第一个消息
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) { // 如果这个消息是到时候了
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
// 获取这个消息的Handler
sp<MessageHandler> handler = messageEnvelope.handler;
// 或者这个消息
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0); // 从消息集合中移除
mSendingMessage = true;
mLock.unlock();
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
handler->handleMessage(message); // 调用c++的Handler处理方法
} // release handler
mLock.lock(); // 解锁
mSendingMessage = false;
result = POLL_CALLBACK; // 触发fd变化的监听类型
} else { // 这个消息还没到时间,把他的时间设置给下一次时间mNextMessageUptime
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// Invoke all response callbacks.
// 这段是处理自定义fd消息的
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
// ident非等于POLL_CALLBACK的就是>=0,如果是这个的话会在pollonce开始时候处理,所以这里只考虑
// POLL_CALLBACK类型的,即fd监听收到消息
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd; // 自定义的fd
int events = response.events; // 监听类型
void* data = response.request.data; // 回调方法返回的值,即一个监听类型
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
// Invoke the callback. Note that the file descriptor may be closed by
// the callback (and potentially even reused) before the function returns so
// we need to be a little careful when removing the file descriptor afterwards.
//执行回调方法,返回的是一个监听类型
int callbackResult = response.request.callback->handleEvent(fd, events, data);
// 监听类型为0,说明监听的fd可能关闭或者失效了,从request集合中移除
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear(); // 清除回调方法,主要就是callback指向的地址变为0
result = POLL_CALLBACK; // 触发fd变化的事件
}
}
return result;
}
```
后半段代码开始部分是变量epoll唤醒后返回数组eventItems的过程。这个取出eventItems数组的每个元素,每个元素都包含一个对应的fd。这里对于默认fd和自定义的fd做了分开处理。如果是一个默认fd事件的话,那么会判断是不是读事件如果是的话,那么就和初始化默认fd是一样的,所以这里就调用awoken方法情况eventFd的值,如果不是的话,说明不是默认的fd,那就什么也不做。我们看下awoken方法:
```c++
void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ awoken", this);
#endif
uint64_t counter;
// 读出mWakeEventFd数据,情况这里的数据而已,不做什么实际的事情
TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}
```
这个方法其实很简单,读取默认fd的值保存到counter中。根据我们之前介绍过的Eventfd,只要读取一次就清0了,所以这里读取的意义在于把fd清0,这样等待下次的唤醒,具体读出来什么值不重要,好了,我们回到前面的方法。
如果被唤醒的fd不是默认的话,那肯定是自定义的。那么就从mRequests这个map中取出fd对应的Request对象,mRequests中的对象是通过MessageQueue的addOnFileDescriptorEventListener方法添加进去的,这个我们后面再说,这里的话就是遍历mRequests中所有的对象,前面我们说过自定义类型的fd监听的类型可以比默认的多,这里就取出fd的类型,这里可以看到有四种,读写,错误和挂起,这里所谓的挂起是指如果这个fd对应的文件关闭了,那么他就属于挂起状态。这个设置好了后,就调用pushResponse方法:
```c++
struct Response {
int events;
Request request;
};
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}
```
Response是个结构体,里面就2个字段,第一个events代表监听的类型,第二个就是一个request,Request这个结构体我们在后面分析添加自定义fd的时候再看。这里pushResponse方法很简单,创建一个Response对象,放入mResponses容器中。
好了,到这里位置,其实一般jave层默认的Handler发送消息也就处理完了,我们通常使用Handler发送消息后,大部分到这里就基本结束了。后面的部分主要是处理自定义fd以及native层的Handler发送的消息的。大家有没有发现,上面在处理jave层的消息的时候,其实主要的作用就是唤醒epoll,还记得我们前面分析java层MessageQueue的next方法的时候,其实是阻塞在那里的,只有这里被唤醒了,java层才会继续执行下去,而获取具体的一个消息以及分发消息的回调方法其实都是在java层处理,native这里仅仅是起到了阻塞和唤醒的作用,所以说通常我们开发一个app的时候,Handler主要事情都是在jave层就完了。但是我们我们自定义fd和native层的消息的分发处理就是在现在这个pollInner方法里执行的,这点就是和处理jave层默认fd不同的地方。我们先看下处理native层消息的情况。
这里有一个mMessageEnvelopes容器,这个容器就是保存native层具体消息,每个消息是一个MessageEnvelope结构体,我们从名字可以看出,这个结构体封装了一个具体的消息,我们看下这个结构体:
```c++
struct MessageEnvelope {
MessageEnvelope() : uptime(0) { }
MessageEnvelope(nsecs_t u, const sp<MessageHandler> h,
const Message& m) : uptime(u), handler(h), message(m) {
}
nsecs_t uptime;
sp<MessageHandler> handler;
Message message;
};
class MessageHandler : public virtual RefBase {
protected:
virtual ~MessageHandler();
public:
virtual void handleMessage(const Message& message) = 0;
};
struct Message {
Message() : what(0) { }
Message(int w) : what(w) { }
/* The message type. (interpretation is left up to the handler) */
int what;
};
```
MessageEnvelope结构体中有3个字段,uptime代表执行时间,handler就是native层的Handler,这里可以看到是一个类,里面有个handleMessage虚方法需要调用者实现,这个就是消息的回调方法。还有个Message的结构体,这个结构体我们看到里面有个what,这个就是和java层一模一样,代表具体是哪个消息。
我们回到pollInner方法,获取了一个MessageEnvelope对象后,如果这个MessageEnvelope执行时间还没到,那么就更新下次唤醒epoll的时间mNextMessageUptime。否则就调用MessageEnvelope中Handler的handleMessage方法,并且把message作为参数传入,这样native层的消息回调方法就在这里执行了,注意这里有个mSendingMessage变量,为true表示当前正在处理native层消息分发,此时如果有往native层添加消息,是不会处理的,等到这里处理完消息后,会把mSendingMessage重新置为false,native层的消息添加才能进行,这个后面我们分析到native层的消息添加时候会看到,最后这里返回的result赋值为POLL_CALLBACK,表示这个消息已经执行了回调方法。这个就是native层消息的处理,还是比较简单的,对于native层消息的添加,我们会在下一篇文章中讲。
接着我们再看下代码的最后一段,是处理自定义fd消息的。上面分析遍历epoll返回数组的时候,会根据fd来区别是默认的fd还是自定义的fd,如果是自定义的fd会把这些对应消息保存在mResponses容器中。同时我们前面在分析pollOnce方法的时候也提到了,自定义fd会有回调函数和没有回调函数,在pollOnce方法中根据Request不小于0来判断是没有回调函数的,那么如果有回调函数的处理就在现在的pollInner方法这里了。
这里我们看到会遍历mResponses容器,获取每个Response对象,同样也是根据Response中保存的Request的ident值来判断是否有回调方法,如果ident值等于POLL_CALLBACK说明是有回调方法的,那么会调用Request的callback,callback是一个LooperCallback类,我们可以看下:
```c++
struct Request {
int fd;
int ident;
int events;
int seq;
sp<LooperCallback> callback;
void* data;
void initEventItem(struct epoll_event* eventItem) const;
};
class LooperCallback : public virtual RefBase {
protected:
virtual ~LooperCallback();
public:
virtual int handleEvent(int fd, int events, void* data) = 0;
};
```
可以看到Request的callback是一个LooperCallback类,而LooperCallback类中主要就是一个handleEvent虚方法,是下篇文章我们会分析添加自定义fd时会看到实际给callback赋值的是一个SimpleLooperCallback类,SimpleLooperCallback继承了LooperCallback类:
```c++
class SimpleLooperCallback : public LooperCallback {
protected:
virtual ~SimpleLooperCallback();
public:
SimpleLooperCallback(Looper_callbackFunc callback);
virtual int handleEvent(int fd, int events, void* data);
private:
Looper_callbackFunc mCallback;
};
SimpleLooperCallback::SimpleLooperCallback(Looper_callbackFunc callback) :
mCallback(callback) {
}
int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
return mCallback(fd, events, data);
}
```
可以看到,这个类的初始化方法会传入一个Looper_callbackFunc方法,最后handleEvent就是调用这个Looper_callbackFunc方法的,关于这里的Looper_callbackFunc方法传入我们也放到下篇文章的自定义fd中说,这里只要知道这个方法是调用者传入的消息回调方法就可以了。
我们回到pollInner方法,这里调用了handleEvent后,那么自定义fd的回调方法也就执行完了,如果回调方法的返回结果是0的话,说明这个监听的fd可能出现异常了,比如被关闭了,那么需要从mRequests这个map中把这个自定义fd对应的元素移除,这里调用了removeFd方法,然后result也是设置为POLL_CALLBACK表示这个回调方法也执行过了,整个消息处理也就完成了。我们看下上面移除mRequests中自定义fd的方法removeFd:
```c++
int Looper::removeFd(int fd, int seq) {
.......
{ // acquire lock
AutoMutex _l(mLock);
ssize_t requestIndex = mRequests.indexOfKey(fd); // 获取这个在map中的下标
if (requestIndex < 0) { // 下标<0 return
return 0;
}
// Check the sequence number if one was given.
// 检查是否这个自定义fd添加的顺序数字在requests中与response中是否一样,不一样或者-1, return
if (seq != -1 && mRequests.valueAt(requestIndex).seq != seq) {
........
return 0;
}
mRequests.removeItemsAt(requestIndex); // 从mRequests中移除这个fd
// 从epoll删除这个fd
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);
if (epollResult < 0) { // 删除报错
if (seq != -1 && (errno == EBADF || errno == ENOENT)) { // 如果是坏的fd或者不存在
.......
scheduleEpollRebuildLocked(); // 重新创建epoll
} else { // 其他错误类型也要重新创建epoll
ALOGE("Error removing epoll events for fd %d: %s", fd, strerror(errno));
scheduleEpollRebuildLocked();
return -1;
}
}
} // release lock
return 1;
}
```
这个方法首先从mRequests根据fd为key获取下标,如果下标小于0的话,说明没有这个元素,退出。其次Request的seq自动表示这个自定义fd被更新过几次,这个我们下篇文章在分析添加自定义fd的时候会看到,然后把mRequests中fd对应的seq和需要移除的fd对应的seq做比较,如果不一样的话,那么也退出。一切都正常后,就可以从mRequests中把这个fd移除了。
上面移除的只是Looper中保存的fd,但是epoll中还有监听这个fd,所以还需要从epoll中移除,移除epoll中的监听fd调用的还是之前添加fd一样的方法epoll_ctl,只不过这里第二个参数之前添加是用的EPOLL_CTL_ADD,移除用的是EPOLL_CTL_DEL。移除如果正常的话就可以返回了,如果移除报错了,那么epoll_ctl返回值会是小于0,这里如果移除的错误是由于fd已经关闭了或者不存在的情况会打印相关的log,其他情况的错误也会打印其他的log,不管怎么样,这里只要是从epoll中删除fd报错了都需要重新初始化epoll,这里会调用scheduleEpollRebuildLocked方法:
```c++
void Looper::scheduleEpollRebuildLocked() {
if (!mEpollRebuildRequired) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ scheduleEpollRebuildLocked - scheduling epoll set rebuild", this);
#endif
mEpollRebuildRequired = true;
wake(); // 唤醒,主要就是往唤醒epollfd中写入一个数字,唤醒等待的地方
}
}
```
这里的mEpollRebuildRequired变量前面也看到过了,为true的话说明epoll出错了,需要重新初始化,默认肯定是false的,所以如果第一次进入这里后会把他置为true,之后调用wake方法,wake方法前面也介绍过了,会往默认的Eventfd中写入一个1的数字,目的只是唤醒epoll,这样epoll最终又会走到前面分析的pollInner方法那里,记得前面分析的,如果epoll被唤醒后会根据mEpollRebuildRequired的置是否为true,如果为true会重新初始化epoll,从而整个流程又回到了我们前面分析的地方了。
到这里,一个Handler发送消息到消息的处理流程就都完成了,这个流程从java层到native层里面涉及到的东西还是挺多的,我们上面分析过程中还有些遗留的部分没有分析过,一个是关于自定义fd的添加,另一个是natice层消息的发送,我们会在下一篇文章中继续分析,这篇文章就分析到这里了,我们下篇文章再见。
Handler源码解析(二)