MessageQueue, Looper源碼分析(Native層)

本源碼分析基于Android8.0

源碼目錄

Java層
framework/base/core/java/andorid/os/MessageQueue.java
framework/base/core/java/andorid/os/Looper.java

Native層
system/core/libutils/include/utils/RefBase.h
system/core/libutils/RefBase.cpp

framework/base/core/jni/android_os_MessageQueue.h
framework/base/core/jni/android_os_MessageQueue.cpp

system/core/libutils/include/utils/Looper.h
system/core/libutils/Looper.cpp 

framework/native/include/android/looper.h
framework/base/native/android/looper.cpp 

回顧

??在上一篇文章中,我們講解了Handler,Looper,MessageQueue的關(guān)系,其中在MessageQueue的next方法中有這樣一段代碼

Message next() {
        ....
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }

            nativePollOnce(ptr, nextPollTimeoutMillis);
        }
        ...
}

而添加消息入隊的時候,有這樣一段代碼

boolean enqueueMessage(Message msg, long when) {
        ...
        synchronized (this) {
                ...
                if (p == null || when == 0 || when < p.when) {
                        // New head, wake up the event queue if blocked.
                        msg.next = p;
                        mMessages = msg;
                        needWake = mBlocked;
                 }
                ...
                // We can assume mPtr != 0 because mQuitting is false.
                if (needWake) {
                        nativeWake(mPtr);
                }        
        }
}

同樣Looper里面

public static void loop() {
        ...
        for (;;) {
            Message msg = queue.next(); // might block
        }
        ...
}

??通過以上三段代碼和注釋可以看出,添加消息的時候有可能在阻塞狀態(tài):即之前消息隊列為空,取消息的時候也可能在阻塞狀態(tài),為什么會這樣呢,阻塞不會導(dǎo)致ANR嗎?其實關(guān)鍵就在于兩個native方法身上nativePollOnce和nativeWake****
??** 它的本質(zhì)就是Linux的管道。管道,其本質(zhì)是也是文件,但又和普通的文件會有所不同:管道緩沖區(qū)大小一般為1頁,即4K字節(jié)。管道分為讀端和寫端,讀端負(fù)責(zé)從管道拿數(shù)據(jù),當(dāng)數(shù)據(jù)為空時則阻塞;寫端向管道寫數(shù)據(jù),當(dāng)管道緩存區(qū)滿時則阻塞。**

接下來我們進(jìn)入Native層

UML圖

uml.png

首先查看MessageQueue.java里面的native方法

    MessageQueue.java

    private native static long nativeInit();
    private native static void nativeDestroy(long ptr);
    private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
    private native static void nativeWake(long ptr);
    private native static boolean nativeIsPolling(long ptr);
    private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);

    //構(gòu)造函數(shù)
    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        mPtr = nativeInit();
    }

我們發(fā)現(xiàn)調(diào)用了nativeInit()方法,我們進(jìn)入native層看它做了什么

android_os_MessageQueue.cpp

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}

它就是生成一個NativeMessageQueue()對象,那我們?nèi)タ礃?gòu)造函數(shù)做了什么

NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}

發(fā)現(xiàn)它生成了Looper對象,這個是native層的,跟java層的Looper不一樣,它幾乎重寫了java層的Looper邏輯。

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);//1
    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
                        strerror(errno));

    AutoMutex _l(mLock);//2
    rebuildEpollLocked();//3
}

1,eventfd(),使用這個函數(shù)來創(chuàng)建一個事件對象,該函數(shù)返回一個文件描述符來代表這個事件對象,之后我們就用這個來調(diào)用對象;
2,AutoMutex _l(),給mLock對象加鎖;執(zhí)行完后自動釋放鎖,它的原理是利用了c++的構(gòu)造和析構(gòu)函數(shù)完成自動加鎖和放鎖。
3,rebuildEpollLocked(),重建epoll事件。

接下來看rebuildEpollLocked

void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.
    if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
        ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
//關(guān)閉舊的epoll
        close(mEpollFd);
    }

    // Allocate the new epoll instance and register the wake pipe.
   //創(chuàng)建新的epoll并注冊管道,參數(shù)表示監(jiān)聽的文件描述符數(shù)目,它向內(nèi)核申請了一段內(nèi)存空間
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));

    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;//把之前創(chuàng)建的mWakeEventFd賦給item
   //把之前生成的mWakeEventFd加入到 epoll,eventItem也加入epoll,這樣就能控制我們的mWakeEventFd所表示的對象了
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
                        strerror(errno));

    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
        if (epollResult < 0) {
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
                  request.fd, strerror(errno));
        }
    }
}

注意int epoll_ctl(int epfd, intop, int fd, struct epoll_event* event);
他是epoll的事件注冊函數(shù):
??第一個參數(shù)是epoll_create()的返回值,
??第二個參數(shù)表示動作,用三個宏來表示:
??EPOLL_CTL_ADD: 注冊新的fd到epfd中;
??EPOLL_CTL_MOD: 修改已經(jīng)注冊的fd的監(jiān)聽事件;
??EPOLL_CTL_DEL: 從epfd中刪除一個fd;
??第三個參數(shù)是需要監(jiān)聽的fd,
??第四個參數(shù)是告訴內(nèi)核需要監(jiān)聽什么事件

我們回到NativeMessageQueue::NativeMessageQueue()
它不是每次都生成新的Looper,而是保存到TSL中

void Looper::setForThread(const sp<Looper>& looper) {
    sp<Looper> old = getForThread(); // also has side-effect of initializing TLS

    if (looper != NULL) {
        looper->incStrong((void*)threadDestructor);
    }

    pthread_setspecific(gTLSKey, looper.get());

    if (old != NULL) {
        old->decStrong((void*)threadDestructor);
    }
}

??sp就類似于java的強(qiáng)引用,native層還有一個wp類似于java的弱引用,因為Android封裝了c++的對象回收機(jī)制,具體的可閱讀深入理解Android卷I相關(guān)源碼。
??TLS,即線程本地存儲(Thread Local Storage),可以對比理解為Java層的ThreadLocal,在單線程模式下,所有整個程序生命周期的變量都是只有一份,那是因為只是一個執(zhí)行單元;而在多線程模式下,有些變量需要支持每個線程獨(dú)享一份的功能。這種每個線程獨(dú)享的變量放到每個線程專有的存儲區(qū)域,所以稱為線程本地存儲(Thread Local Storage)或者線程私有數(shù)據(jù)(Thread Specific Data)。
??那么到這里初始化就完成了,即創(chuàng)建NativeMessageQueue,創(chuàng)建Looper并保存到TLS中,Looper里面創(chuàng)建了epoll,注冊了事件,之后我們就能收到回調(diào),這里可以對比理解為setOnclickListener。最后返回生成的NativeMessageQueue指針(jlong類型)給Java層,注意reinterpret_cast是c++的強(qiáng)轉(zhuǎn),通常將一個類型指針轉(zhuǎn)換為另一個類型指針 。

nativePollOnce()

在Looper的loop()死循環(huán)里面,會調(diào)用MessageQueue的next(),next()會調(diào)用nativePollOnce(),進(jìn)入native層:

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

這里傳入了一個參數(shù),就是剛剛調(diào)用nativeInit()得到的NativemessageQueue的jlong指針,再強(qiáng)轉(zhuǎn)回來,然后調(diào)用pollOnce方法

MessageQueue.cpp

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}

Looper.h

inline int pollOnce(int timeoutMillis) {
        return pollOnce(timeoutMillis, NULL, NULL, NULL);
}

Looper.cpp

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}

pollOnce的timeoutMillis就是我們java層設(shè)置的超時參數(shù),接下來調(diào)用pollInner


int Looper::pollInner(int timeoutMillis) {
    ...
    // Poll.
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // We are about to idle.
    mPolling = true;

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    //關(guān)鍵方法
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // No longer idling.
    mPolling = false;

    // Acquire lock.
    mLock.lock();

    // Rebuild epoll set if needed.
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }

    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
        result = POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = POLL_TIMEOUT;
        goto Done;
    }

    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        //找到我們注冊事件的文件描述符
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                //從epoll_wait()里喚醒了,讀取管道內(nèi)容
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;

    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                        this, handler.get(), message.what);
#endif
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Invoke all response callbacks.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            // Invoke the callback.  Note that the file descriptor may be closed by
            // the callback (and potentially even reused) before the function returns so
            // we need to be a little careful when removing the file descriptor afterwards.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }

            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}

最關(guān)鍵的方法就在epoll_wait()身上,這個方法會等待事件發(fā)生或者超時,在nativeWake()方法,向管道寫端寫入字符時,則該方法會返回,否則一直阻塞;注意result返回值有以下幾種類型:

??POLL_WAKE,初始化狀態(tài),它表示由管道寫入端觸發(fā),pipe write;
??POLL_ERROR阻塞等待期間發(fā)生錯誤,發(fā)生錯誤goto到Done處;
??POLL_TIMEOUT 發(fā)生超時;
??POLL_CALLBACK: 表示某個被監(jiān)聽的文件描述符被觸發(fā),比如我們nativeInit創(chuàng)建的mWakeEventFd;

當(dāng)喚醒后,要不斷的去從管道中讀取數(shù)據(jù),這時調(diào)用了awoken()方法

void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ awoken", this);
#endif

    uint64_t counter;
    TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}

很簡單,就是從管道里讀取內(nèi)容,這是我們已經(jīng)拿到Native層的Message了,在Done里面,我們會處理Message,并回調(diào)handler->handleMessage()方法,注意此handler非java層的handler,它是一個MessageHandler,Message等類在Looper.h中。

nativeWake()

接下來我們看是怎么喚醒的

android_os_MessageQueue.cpp

 static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}

void NativeMessageQueue::wake() {
    mLooper->wake();
}

Looper.cpp

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    uint64_t inc = 1;
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
                    mWakeEventFd, strerror(errno));
        }
    }
}

就是調(diào)用write()向管道寫入一個整數(shù)1,TEMP_FAILURE_RETRY就是失敗不斷的重試,知道成功喚醒為止,成功寫入后,管道的另一端就會接收到,并從阻塞狀態(tài)結(jié)束,即從epoll_wait()返回,執(zhí)行它后面的代碼。

流程圖

liu.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容