Android應(yīng)用程序消息處理機(jī)制

    1. 簡介
    1. 工作原理
    • 2.1 創(chuàng)建線程消息隊列
    • 2.2 線程消息循環(huán)過程
    • 2.3 線程消息發(fā)送過程
    • 2.4 線程消息處理過程
    1. 總結(jié)

#1. 簡介

Android應(yīng)用程序通過消息驅(qū)動,Android應(yīng)用程序的每一個線程在啟動時,都可以首先在其內(nèi)部創(chuàng)建一個消息隊列,然后進(jìn)入無限循環(huán)不斷檢查消息隊列是否有新的消息需要處理。如果有新的消息,則線程將會從它的消息隊列中取出來進(jìn)行處理;否則,線程就會進(jìn)入睡眠狀態(tài),直到有新的消息需要處理為止。

Android應(yīng)用程序的消息處理機(jī)制是圍繞消息隊列實現(xiàn)的,一個線程擁有一個消息隊列之后進(jìn)入到消息循環(huán)中,同時本身和其他線程可以向該隊列中發(fā)送消息。

Android消息隊列主要由MessageQueue,Looper以及Handler構(gòu)成。其中MessageQueue是存儲消息Message的數(shù)據(jù)結(jié)構(gòu),其內(nèi)部實現(xiàn)為單鏈表;

Looper負(fù)責(zé)創(chuàng)建,輪詢MessageQueue;Handler負(fù)責(zé)往MessageQueue中發(fā)送以及處理消息。


#2. 工作原理

#2.1 創(chuàng)建線程消息隊列

Android應(yīng)用程序消息隊列使用MessageQueue對象來描述,它可以同過Looper的靜態(tài)成員函數(shù)prepareMainLooper或者prepare來創(chuàng)建,其中前者為主線程創(chuàng)建消息隊列;后者用來為其他子線程創(chuàng)建消息隊列。

Java層中的每一個Looper對象內(nèi)部都有一個類型為MessageQueue的成員變量mQueue,它指向一個MessageQueue對象;而C++層中,每一個NativeMessageQueue對象內(nèi)部都有一個類型為Looper的成員變量mLooper,它指向一個C++Looper對象。

Java層中的每一個MessageQueue對象都有一個類型為int的成員變量mPtr,它保存了C++層中的一個NativeMessageQueue對象的地址值。

Java層中MessageQueue對象的成員變量mMessages描述了一個消息隊列。我們可以通過MessageQueue提供的成員函數(shù)enqueueMessage向?qū)ο笾胁迦胂ⅰ?/p>

以Android應(yīng)用程序啟動為例,JVM執(zhí)行ActivityThread中的main(),在main()中,會調(diào)用Looper.prepareMainLooper();

ActivityThread-main
    public static void main(String[] args) {
        ......
        Looper.prepareMainLooper();//初始化Looper對象
        ActivityThread thread = new ActivityThread();
        thread.attach(false);
        if (sMainThreadHandler == null) {
            sMainThreadHandler = thread.getHandler();
        }
        if (false) {
            Looper.myLooper().setMessageLogging(new
                    LogPrinter(Log.DEBUG, "ActivityThread"));
        }
        // End of event ActivityThreadMain.
        Trace.traceEnd(Trace.TRACE_TAG_ACTIVITY_MANAGER);
        Looper.loop();//輪詢消息隊列
        throw new RuntimeException("Main thread loop unexpectedly exited");
    }

Looper.prepareMainLooper()所執(zhí)行的操作是,函數(shù)內(nèi)部首先調(diào)用 prepare(false),在prepare函數(shù)中會去校驗Looper對象是否存在,通過sThreadLocal.get()! 來進(jìn)行判斷,如果存在拋出異常,否則創(chuàng)建新的Looper對象,并將其存放在 sThreadLocal實例中。TheadLocal為線程數(shù)據(jù)存儲局部變量,其實現(xiàn)類似于HashMap,作用是可以為每一個線程提供單獨的數(shù)據(jù)副本。在Looper構(gòu)造函數(shù)中,初始化消息隊列的實例對象MessageQueue,并將它保存在成員變量mQueue中。

NT: Looper類的靜態(tài)成員函數(shù)prepareMainLooper()只能在主線程中調(diào)用。將主線程的Looper對象另外保存在一個獨立的靜態(tài)成員變量(sMainLooper)中,是為了讓其他線程可以通過Looper類的靜態(tài)成員函數(shù)getMainLooper()來訪問,從而可以向它的消息隊列中發(fā)送一些與UI操作相關(guān)的信息。

Looper
public final class Looper {
// sThreadLocal.get() will return null unless you've called prepare().
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
private static Looper sMainLooper;  // guarded by Looper.class

final MessageQueue mQueue;
final Thread mThread;

 /** Initialize the current thread as a looper.
  * This gives you a chance to create handlers that then reference
  * this looper, before actually starting the loop. Be sure to call
  * {@link #loop()} after calling this method, and end it by calling
  * {@link #quit()}.
  */
public static void prepare() {
    prepare(true);
}

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

/**
 * Initialize the current thread as a looper, marking it as an
 * application's main looper. The main looper for your application
 * is created by the Android environment, so you should never need
 * to call this function yourself.  See also: {@link #prepare()}
 */
public static void prepareMainLooper() {
    prepare(false);
    synchronized (Looper.class) {
        if (sMainLooper != null) {
            throw new IllegalStateException("The main Looper has already been prepared.");
        }
        sMainLooper = myLooper();
    }
}

/**
 * Returns the application's main looper, which lives in the main thread of the application.
 */
public static Looper getMainLooper() {
    synchronized (Looper.class) {
        return sMainLooper;
    }
}

/**
 * Return the Looper object associated with the current thread.  Returns
 * null if the calling thread is not associated with a Looper.
 */
public static @Nullable Looper myLooper() {
    return sThreadLocal.get();
}

private Looper(boolean quitAllowed) {
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}

}

MessageQueue對象在構(gòu)造的過程中,會通過JNI調(diào)用native層方法,在C++層初始化一個NativeMessageQueue對象,并將對象指針保存在Java層MessageQueue對象的成員變量mPtr中。

MessageQueue
    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        mPtr = nativeInit();
    }

創(chuàng)建C++層的NativeMessageQueue對象,并將對象指針返回給調(diào)用者。

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}

一個NativeMessageQueue對象在創(chuàng)建的過程中,又會在內(nèi)部創(chuàng)建一個C++層的Looper對象。

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}

一個C++層的Looper對象在創(chuàng)建的過程中,又會在內(nèi)部創(chuàng)建一個管道。調(diào)用pipe(int[])函數(shù)來創(chuàng)建一個管道,并將read端文件描述符保存在Looper對象的成員變量mWakeReadPipeFd中,將write端文件描述符保存在mWakeWritePipeFd中。管道在一個線程的消息循環(huán)過程中起到的作用非常大。當(dāng)一個線程沒有新的消息處理時,它就會睡眠在這個管道的讀端文件描述符上,直到有新的消息需要處理為止;當(dāng)其他線程向這個線程的消息隊列發(fā)送一個消息后,其他線程會通過這個管道的寫端文件描述符往這個管道寫入一個數(shù)據(jù),從而將這個線程喚醒,以便它可以對剛才發(fā)送到消息隊列中的消息進(jìn)行處理。

mWakeReadPipeFd
Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    int wakeFds[2];
    int result = pipe(wakeFds);//創(chuàng)建管道,wakeFds[0]為read()端文件描述符,wakeFds[1]為write端文件描述符
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);
    mWakeReadPipeFd = wakeFds[0];
    mWakeWritePipeFd = wakeFds[1];
    result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",
            errno);
    result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
            errno);
    // Allocate the epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeReadPipeFd;
    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
            errno);
}
  • Looper內(nèi)部通過epoll_create來創(chuàng)建一個epoll實例,并且將它的文件描述符保存在C++層的Looper類的成員變量mEpollFd中。然后將之前創(chuàng)建的管道讀端文件描述符添加到這個epoll實例中,以便可以對它所描述的管道的寫操作進(jìn)行監(jiān)聽。

  • Linux系統(tǒng)的epoll機(jī)制是為了同時監(jiān)聽多個文件描述符的IO讀寫事件而設(shè)計的,它是一個多路復(fù)用IO接口,類似于Linux系統(tǒng)的select機(jī)制,但是它是select機(jī)制的增強(qiáng)版。如果一個epoll實例監(jiān)聽了大量的文件描述符的IO讀寫事件,但是只有少量的文件描述符是活躍的,即只有少量的文件描述符會發(fā)生IO讀寫事件,那么這個epoll實例會顯著地減少CPU使用率,從而提供系統(tǒng)的并發(fā)能力。

#2.2 線程消息循環(huán)過程

一個Android應(yīng)用程序的消息隊列創(chuàng)建完成之后,就可以調(diào)用Looper類的靜態(tài)成員函數(shù)loop使它進(jìn)入一個消息循環(huán)隊列。

graph TD
    A(Start) --> B[Looper.loop]
    B-->C[MessageQueue.next] 
    C-->D[MessageQueue.nativePollOnce]
    D-->E[NativeMessageQueue.pollOnce]
    E-->F[C++ Looper.pollOnce]
    F-->G[C++ Looper.pollInner]
    G-->H[C++ Looper.awken]
    H-->I(End)
#2.2.1 Looper.loop

首先通過myLooper()獲取到當(dāng)前線程的Looper對象(me),通過Looper的成員變量mQueue獲取到創(chuàng)建MessageQueue的時候保存在其中的對象(me.mQueue)。接著循環(huán)遍歷消息隊列中的消息,如果有新的消息需要處理即(msg!=null),則把消息交給Handler處理(msg.target.dispatchMessage)。否則,當(dāng)前當(dāng)前線程就會在執(zhí)行MessageQueue的next()時阻塞住(queue.next),直到有新的消息需要處理為止。如果MessageQueue的next()返回的msg為空(msg==null),意味著消息隊列退出,Looper的loop操作停止。

public static void loop() {
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        final MessageQueue queue = me.mQueue;

        // Make sure the identity of this thread is that of the local process,
        // and keep track of what that identity token actually is.
        Binder.clearCallingIdentity();
        final long ident = Binder.clearCallingIdentity();

        for (;;) {
            Message msg = queue.next(); // might block
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }

            // This must be in a local variable, in case a UI event sets the logger
            final Printer logging = me.mLogging;
            if (logging != null) {
                logging.println(">>>>> Dispatching to " + msg.target + " " +
                        msg.callback + ": " + msg.what);
            }

            final long traceTag = me.mTraceTag;
            if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
                Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
            }
            try {
                msg.target.dispatchMessage(msg);
            } finally {
                if (traceTag != 0) {
                    Trace.traceEnd(traceTag);
                }
            }

            if (logging != null) {
                logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
            }

            // Make sure that during the course of dispatching the
            // identity of the thread wasn't corrupted.
            final long newIdent = Binder.clearCallingIdentity();
            if (ident != newIdent) {
                Log.wtf(TAG, "Thread identity changed from 0x"
                        + Long.toHexString(ident) + " to 0x"
                        + Long.toHexString(newIdent) + " while dispatching to "
                        + msg.target.getClass().getName() + " "
                        + msg.callback + " what=" + msg.what);
            }

            msg.recycleUnchecked();
        }
    }

接下來具體分析MessageQueue.next()所執(zhí)行的操作。

#2.2.2 MessageQueue.next
  • next()方法首先會獲取到mPtr保存的NativeMessageQueue的對象指針,如果為空,next()返回null,Loop.loop()在檢驗到msg==null時會退出loop輪詢。

  • next()中的局部變量pendingIdleHandlerCount [n1] 記錄的是注冊到消息隊列中的空閑消息處理器IdleHandler的個數(shù),當(dāng)線程發(fā)現(xiàn)其消息隊列沒有新的消息需要處理時,不是馬上進(jìn)入睡眠狀態(tài),而是先調(diào)用注冊到它的消息隊列中的IdleHandler對象的成員函數(shù)queueIdle,以便它們有機(jī)會在線程空閑的時候進(jìn)行一些操作。

  • 局部變量nextPollTimeoutMillis [n2] ,用來描述消息隊列中有沒有新的消息需要處理,當(dāng)前線程進(jìn)入阻塞狀態(tài)所需要的時間。如果變量nextPollTimeoutMillis的==0,即表示即使當(dāng)前消息隊列中沒有消息處理,當(dāng)前線程也不要進(jìn)入阻塞狀態(tài)。如果nextPollTimeoutMillis==-1,表示當(dāng)前消息隊列沒有消息處理時,當(dāng)前線程需要無限地處于休眠狀態(tài),直到被其他線程喚醒。for循環(huán)不斷地調(diào)用nativePollOnce [n3] 來檢查當(dāng)前的消息隊列中是否有新的消息需要處理。

  • 在調(diào)用nativePollOnce(ptr,nextPollTimeoutMillis)時當(dāng)前線程可能進(jìn)入睡眠等待狀態(tài),等待的時間由nextPollTimeoutMillis決定。

  • MessageQueue內(nèi)部的成員變量mMessages記錄的是當(dāng)前需要處理的消息隊列,當(dāng)線程從阻塞狀態(tài)恢復(fù)之后,即nativePollOnce(ptr,nextPollTimeoutMillis)執(zhí)行完畢,如果有新的消息需要處理,則執(zhí)行 [n4] - [n5] 步驟對消息進(jìn)行處理,否則 [n6] 初將nextPollTimeoutMillis設(shè)置為-1,表示下次調(diào)用nativePollOnce時,如果沒有新的消息要處理,那么就要無限的處于睡眠等待狀態(tài)。

  • [n4] - [n5] 操作首先通過do-whle循環(huán)找到消息隊列中的下一個異步消息并把它保存在msg變量中,如果msg!=null,此時判斷系統(tǒng)當(dāng)前的時間是否小于消息應(yīng)該被處理的時間,如果小于則計算出下次線程需要休眠等待的時間[msg.when-now],否則說明消息需要馬上處理,具體為如果當(dāng)前prevMsg!=null,則把msg.next掛載到prevMsg.next ==[prev|current|next,如果返回current,則需要把next指針指向prev指針(鏈表的基本操作)]== ,如果preMsg==null,說明當(dāng)前的msg就處于頭的位置,則把next設(shè)置為鏈頭[mMessages = msg.next]。

Message next() {
        // Return here if the message loop has already quit and been disposed.
        // This can happen if the application tries to restart a looper after quit
        // which is not supported.
        
        //mPtr保存的是NativeMessageQueue的對象指針
        final long ptr = mPtr;
        //判斷NativeMessageQueue有沒有創(chuàng)建成功,如果沒有,退出loop循環(huán)
        if (ptr == 0) {
            return null;
        }
        //[n1]
        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        //[n2]
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }
            //[n3]
            nativePollOnce(ptr, nextPollTimeoutMillis);
            
            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                //[n4]
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.     //prev|current|next
                        mBlocked = false;
                        if (prevMsg != null) {      
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }//[n5]
                } else {
                    // No more messages.[n6]
                    nextPollTimeoutMillis = -1;
                }

                // Process the quit message now that all pending messages have been handled.
                if (mQuitting) {
                    dispose();
                    return null;
                }

                // If first time idle, then get the number of idlers to run.
                // Idle handles only run if the queue is empty or if the first message
                // in the queue (possibly a barrier) is due to be handled in the future.
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
                    // No idle handlers to run.  Loop and wait some more.
                    mBlocked = true;
                    continue;
                }

                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }

            // Run the idle handlers.
            // We only ever reach this code block during the first iteration.
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }

                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }

            // Reset the idle handler count to 0 so we do not run them again.
            pendingIdleHandlerCount = 0;

            // While calling an idle handler, a new message could have been delivered
            // so go back and look again for a pending message without waiting.
            nextPollTimeoutMillis = 0;
        }
    }

NT: for循環(huán)中,每一次調(diào)用nativePollOnce函數(shù)之前,都會判斷nextPollTimeoutMillis變量是否等于0,如果不等于0,那么當(dāng)前線程會在nativePollOnce函數(shù)中進(jìn)入睡眠等待狀態(tài)。這時候調(diào)用Binder.flushPendingCommands()來處理那些正在等待處理的Binder進(jìn)程間通信請求,避免它們長時間得不到處理。

#2.2.3 MessageQueue.nativePollOnce

MessageQueue的成員變量mPtr保存的是C++層的NativeMessageQueue對象指針,因此可以將其轉(zhuǎn)換為一個NativeMessageQueue對象指針。然后調(diào)用NativeMessageQueue的成員函數(shù)pollOnce。

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
#2.2.4 NativeMessageQueue.pollOnce

NativeMessageQueue.pollOnce內(nèi)部調(diào)用了C++層Looper的成員函數(shù)pollOnce。

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}

#2.2.5 Looper.pollOnce

Looper.pollOnce內(nèi)部通過for循環(huán)不斷的調(diào)用pollInner函數(shù)來判斷當(dāng)前線程是否有新的消息需要處理。如果有,result返回不為0。然后跳出for循環(huán),以便當(dāng)前線程對新的消息進(jìn)行處理。

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }
        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }
        result = pollInner(timeoutMillis);
    }
}
#2.2.6 Looper.pollInner

在C++層Looper創(chuàng)建的的時候,會創(chuàng)建一個epoll實例,并且將它的文件描述符保存在Looper類的成員變量mEpollFd中,同時還將一個管道的讀端文件描述符注冊在它里面,以便可以監(jiān)聽這個管道的IO寫事件,pollInner內(nèi)部調(diào)用epoll_wait來監(jiān)聽注冊在前面所創(chuàng)建的epoll實例的文件描述符的IO讀寫事件。如果這些文件描述符都沒有發(fā)生IO讀寫事件,那么當(dāng)前線程就會在函數(shù)epoll_wait中進(jìn)入睡眠等待狀態(tài),等待的時間由timeoutMillis決定。

從函數(shù)epoll_wait返回來之后,通過for循環(huán)檢測哪個文件描述符發(fā)生了IO讀寫事件。如果發(fā)生IO讀寫事件的文件描述符是當(dāng)前線程所關(guān)聯(lián)的一個管道的讀端文件描述符mWakeReadPipeFd,并且它所發(fā)生的IO讀寫事件類型為EPOLLIN,那么說明其他線程向與當(dāng)前線程關(guān)聯(lián)的管道中寫入了一個新的數(shù)據(jù)。

int Looper::pollInner(int timeoutMillis) {
    .......
    // Poll.
    int result = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    ......
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
    ......
    return result;
}
#2.2.7 :Looper.awoken

awoken()中調(diào)用read()函數(shù)將與當(dāng)前線程所關(guān)聯(lián)的一個管道的數(shù)據(jù)讀出來。線程并不關(guān)心管道中具體的數(shù)據(jù)到底是什么,它只是簡單的將數(shù)據(jù)讀取出來,以便可以清理這個管道中的舊數(shù)據(jù)。這樣當(dāng)前線程在下一次消息循環(huán)時,如果沒有新的消息處理,那么它就可以通過監(jiān)聽這個管道的IO寫事件進(jìn)入睡眠等待狀態(tài),直到其他線程向它的消息隊列發(fā)送了一個新的消息為止。

void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ awoken", this);
#endif
    char buffer[16];
    ssize_t nRead;
    do {
        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
#2.3 線程消息發(fā)送過程

Android系統(tǒng)提供了Handler類,用來向一個線程的消息隊列發(fā)送消息,Handler內(nèi)部有mLooper和mQueue兩個成員變量。它們分別指向一個Looper對象和一個MessageQueue對象。Handler類還有sendMessage和handleMessage兩個成員函數(shù),其中,sendMessage()用來向成員變量mQueue所描述的消息隊列中發(fā)送消息,handleMessage()用來處理這個消息,并且它是在與成員變量mLooper所關(guān)聯(lián)的線程中被調(diào)用。

graph TD
A(Start)-->B[Handler.sendMessage]
B-->C[MessageQueue.enqueueMessage]
C-->D[MessageQueue.nativeWake]
D-->E[NativeMessageQueue.wake]
E-->F[Looper.wake]
F-->H(End)
Handler
public class Handler {
    /*
     * Set this flag to true to detect anonymous, local or member classes
     * that extend this Handler class and that are not static. These kind
     * of classes can potentially create leaks.
     */
    private static final boolean FIND_POTENTIAL_LEAKS = false;
    private static final String TAG = "Handler";

    /**
     * Callback interface you can use when instantiating a Handler to avoid
     * having to implement your own subclass of Handler.
     *
     * @param msg A {@link android.os.Message Message} object
     * @return True if no further handling is desired
     */
    public interface Callback {
        public boolean handleMessage(Message msg);
    }
    
    /**
     * Subclasses must implement this to receive messages.
     */
    public void handleMessage(Message msg) {
    }
    
    /**
     * Handle system messages here.
     */
    public void dispatchMessage(Message msg) {
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }


    /**
     * Use the provided {@link Looper} instead of the default one and take a callback
     * interface in which to handle messages.  Also set whether the handler
     * should be asynchronous.
     *
     * Handlers are synchronous by default unless this constructor is used to make
     * one that is strictly asynchronous.
     *
     * Asynchronous messages represent interrupts or events that do not require global ordering
     * with respect to synchronous messages.  Asynchronous messages are not subject to
     * the synchronization barriers introduced by {@link MessageQueue#enqueueSyncBarrier(long)}.
     *
     * @param looper The looper, must not be null.
     * @param callback The callback interface in which to handle messages, or null.
     * @param async If true, the handler calls {@link Message#setAsynchronous(boolean)} for
     * each {@link Message} that is sent to it or {@link Runnable} that is posted to it.
     *
     * @hide
     */
    public Handler(Looper looper, Callback callback, boolean async) {
        mLooper = looper;
        mQueue = looper.mQueue;
        mCallback = callback;
        mAsynchronous = async;
    }
    
    ......
    
    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        msg.target = this;
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }

    ......
    
    final Looper mLooper;
    final MessageQueue mQueue;
    final Callback mCallback;
}
#2.3.1 Handler.sendMessage
   /**
     * Pushes a message onto the end of the message queue after all pending messages
     * before the current time. It will be received in {@link #handleMessage},
     * in the thread attached to this handler.
     *
     * @return Returns true if the message was successfully placed in to the
     * message queue.  Returns false on failure, usually because the
     * looper processing the message queue is exiting.
     */
    public final boolean sendMessage(Message msg) {
        return sendMessageDelayed(msg, 0);
    }
    
        /**
     * Enqueue a message into the message queue after all pending messages
     * before (current time + delayMillis). You will receive it in
     * {@link #handleMessage}, in the thread attached to this handler.
     *
     * @return Returns true if the message was successfully placed in to the
     * message queue.  Returns false on failure, usually because the
     * looper processing the message queue is exiting.  Note that a
     * result of true does not mean the message will be processed -- if
     * the looper is quit before the delivery time of the message
     * occurs then the message will be dropped.
     */
    public final boolean sendMessageDelayed(Message msg, long delayMillis) {
        if (delayMillis < 0) {
            delayMillis = 0;
        }
        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
    }
    
        /**
     * Enqueue a message into the message queue after all pending messages
     * before the absolute time (in milliseconds) <var>uptimeMillis</var>.
     * <b>The time-base is {@link android.os.SystemClock#uptimeMillis}.</b>
     * Time spent in deep sleep will add an additional delay to execution.
     * You will receive it in {@link #handleMessage}, in the thread attached
     * to this handler.
     *
     * @param uptimeMillis The absolute time at which the message should be
     *                     delivered, using the
     *                     {@link android.os.SystemClock#uptimeMillis} time-base.
     * @return Returns true if the message was successfully placed in to the
     * message queue.  Returns false on failure, usually because the
     * looper processing the message queue is exiting.  Note that a
     * result of true does not mean the message will be processed -- if
     * the looper is quit before the delivery time of the message
     * occurs then the message will be dropped.
     */
    public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        if (queue == null) {
            RuntimeException e = new RuntimeException(
                    this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }
    
    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis){
        msg.target = this;
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }

#2.3.2 MessageQueue.enqueueMessage

由于一個消息隊列中的消息是按照它們處理的時間從小到大的順序來排列的,因此,當(dāng)我們將一個消息發(fā)送到消息隊列時,需要根據(jù)這個消息的處理時間找到它在目標(biāo)消息隊列的合適位置,然后再將它插入到目標(biāo)消息隊列中。

以下分4類情況來分析如何將一個消息插入到一個目標(biāo)消息隊列中:

  1. 目標(biāo)消息隊列為一個空隊列[p==null]。
  2. 插入消息的處理時間等于0[when==0]。
  3. 插入消息的處理時間小于保存在目標(biāo)消息隊列對頭的消息的處理時間[when<p.when]。
  4. 插入消息的處理時間大于等于保存在目標(biāo)消息隊列頭的消息的處理時間。

對于前面的三種情況需要將消息保存在消息隊列的頭部,對于最后一種情況需要將消息插入到消息隊列中合適的位置上,通過遍歷鏈接,逐個與結(jié)點中的時間進(jìn)行對比,直到找到最佳位置。

[NT] 如果被插入的消息的處理時間與目標(biāo)消息隊列中的某一個消息的處理時間相同,那么后來插入的消息會被保存在后面,這樣可以保證先發(fā)送的消息可以先獲得處理。

一個線程將一個消息插入到消息隊列之后,可能需要將目標(biāo)線程喚醒,這需要分兩種情況:

  1. 插入的消息在目標(biāo)消息中間。
  2. 插入的消息在目標(biāo)消息隊列頭部。

對于第一種情況,由于保存在消息隊列頭部的消息沒有任何變化,因此,當(dāng)前線程無論如何都不需要對目標(biāo)線程執(zhí)行喚醒操作。

對于第二種情況,由于保存在目標(biāo)消息隊列的頭部消息發(fā)生了變化,因此當(dāng)前線程需要喚醒目標(biāo)線程,以便它可以對保存在目標(biāo)消息隊列頭部的新消息進(jìn)行處理。但是如果這時目標(biāo)線程不是正處于睡眠等待狀態(tài),那么當(dāng)前線程就不需要對它執(zhí)行喚醒操作。當(dāng)前正在處理的MessageQueue對象成員mBlocked記錄了目標(biāo)線程是否正處于睡眠等待狀態(tài)。如果mBlocked==true,那么就表示目標(biāo)線程正處于休眠狀態(tài),這時候當(dāng)前線程需要將目標(biāo)線程喚醒。

boolean enqueueMessage(Message msg, long when) {
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");
    }
    if (msg.isInUse()) {
        throw new IllegalStateException(msg + " This message is already in use.");
    }

    synchronized (this) {
        if (mQuitting) {
            IllegalStateException e = new IllegalStateException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w(TAG, e.getMessage(), e);
            msg.recycle();
            return false;
        }

        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue.  Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }

        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}
#2.3.3 MessageQueue.nativeWake

將保存在MessageQueue中的mPtr傳給nativeWake(),方法內(nèi)部直接將ptr轉(zhuǎn)換為NativeMessageQueue對象指針。然后調(diào)用NativeMessageQueue的wake方法。

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}
#2.3.4 NativeMessageQueue.wake

NativeMessageQueue.wake()方法內(nèi)部調(diào)用了C++層Looper對象的wake()。

void NativeMessageQueue::wake() {
    mLooper->wake();
}
#2.3.5 Loper.wake

Looper.wake通過調(diào)用write(mWakeWritePipeFd, "W",1)向管道寫入一個數(shù)據(jù)。mWakeWritePipeFd代表的是管道的寫端文件描述符。

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif
    ssize_t nWrite;
    do {
        nWrite = write(mWakeWritePipeFd, "W", 1);
    } while (nWrite == -1 && errno == EINTR);
    if (nWrite != 1) {
        if (errno != EAGAIN) {
            ALOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}
#2.4 線程消息處理過程

當(dāng)一個線程沒有新的消息需要處理時,它就會在C++層的Looper類的成員函數(shù)pollInner中進(jìn)入睡眠等待狀態(tài),因此,當(dāng)這個線程有新的消息需要處理時,它首先會在C++層的Looper類的成員函數(shù)pollInner中喚醒,然后沿著之前的調(diào)用路徑一直返回到Java層的Looper類的靜態(tài)函數(shù)loop中,最后對消息進(jìn)行處理。

graph TD
A(Start)-->B[Looper.loop]
B-->C[Handler.dispatchMessage]
C-->D[End]
#2.4.1 Looper.loop
    public static void loop() {
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        final MessageQueue queue = me.mQueue;

        // Make sure the identity of this thread is that of the local process,
        // and keep track of what that identity token actually is.
        Binder.clearCallingIdentity();
        final long ident = Binder.clearCallingIdentity();

        for (;;) {
            Message msg = queue.next(); // might block
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }
        
            ......
            try {
                msg.target.dispatchMessage(msg);
            } finally {
                if (traceTag != 0) {
                    Trace.traceEnd(traceTag);
                }
            }
            ......

            msg.recycleUnchecked();
        }
    }

#2.4.2 Handler.dispatchMessage

Handler類的成員函數(shù)dispatchMessage按照以下順序分發(fā)消息:

  1. 如果要處理的消息指定了回調(diào)接口,即msg.callback!=null為true,就會調(diào)用Handler類的成員函數(shù)來處理消息,Message中的成員變量callback是一個runnable對象。
  2. 如果條件1不滿足,并且負(fù)責(zé)分發(fā)消息的Handler的成員變量mCallback指向了一個回調(diào)接口,即mCallback!=null為true,就會回調(diào)接口的成員函數(shù)handleMessage來處理此消息。
  3. 如果條件2不滿足,或者負(fù)責(zé)分發(fā)消息的Handler的成員變量mCallback所指向的回調(diào)接口不處理消息,則消息將最后交給Handler的成員函數(shù)handleMessage來處理消息。handleMessage為空實現(xiàn),則實際上消息將交給Handler的子類去處理。
/**
 * Handle system messages here.
 */
public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg);
    }
}

private static void handleCallback(Message message) {
    message.callback.run();
}

/**
 * Subclasses must implement this to receive messages.
 */
public void handleMessage(Message msg) {
}

    /**
 * Callback interface you can use when instantiating a Handler to avoid
 * having to implement your own subclass of Handler.
 *
 * @param msg A {@link android.os.Message Message} object
 * @return True if no further handling is desired
 */
public interface Callback {
    public boolean handleMessage(Message msg);
}

#3. 總結(jié)

  1. 談?wù)勏C(jī)制Handler?作用?有哪些要素?流程是怎樣的?
  2. 一個Thread可以有幾個Looper?幾個Handler?
  3. 如何將一個Thread線程變成Looper線程?Looper線程有哪些特點?
  4. 可以在子線程直接new一個Handler嗎?那該怎么做?
  5. Message可以如何創(chuàng)建?哪種效果更好,為什么?
  6. 這里的ThreadLocal有什么作用?
  7. 主線程中Looper的輪詢死循環(huán)為何沒有阻塞主線程?
  8. 使用Hanlder的postDealy()后消息隊列會發(fā)生什么變化?

學(xué)習(xí)參考鏈接

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容