由Handler、MessageQueue、Looper構(gòu)成的線程消息通信機(jī)制在Android開發(fā)中非常常用,不過(guò)大部分人都只粗淺地看了Java層的實(shí)現(xiàn),對(duì)其中的細(xì)節(jié)不甚了了,這篇博文將研究Android消息機(jī)制從Java層到Native層的實(shí)現(xiàn)。
消息機(jī)制由于更貼近抽象設(shè)計(jì),所以整個(gè)結(jié)構(gòu)更簡(jiǎn)單,只包含了消息的產(chǎn)生、分發(fā),不像Input子系統(tǒng)那樣還有歸類、過(guò)濾等環(huán)節(jié)。整體的結(jié)構(gòu)如下圖:

Android Java層消息機(jī)制
消息的產(chǎn)生
在Java層中消息的產(chǎn)生都來(lái)源于用戶創(chuàng)建的Message對(duì)象,經(jīng)過(guò)封裝的Runnable對(duì)象,或調(diào)用obtainMessage從Message Pool中獲得,Message Pool指的是Message類內(nèi)的Message循環(huán)隊(duì)列,隊(duì)頭是靜態(tài)的Message對(duì)象sPool,該隊(duì)列最大容納MAX_POOL_SIZE(50)個(gè)Message:

MessagePool對(duì)Message的復(fù)用節(jié)省了不斷創(chuàng)建Message帶來(lái)的開銷,如果當(dāng)前50個(gè)Message都已經(jīng)被用過(guò),由于MessagePool是循環(huán)隊(duì)列,則會(huì)回到隊(duì)頭并請(qǐng)空該Message,向下復(fù)用。
BlockingRunnable
看Java層Handler的源碼的時(shí)候發(fā)現(xiàn)了一個(gè)奇怪的東西:BlockingRunnable,基本上沒有用過(guò)的東西,也沒看別人講過(guò),于是我就來(lái)鉆研一下吧:
private static final class BlockingRunnable implements Runnable {
private final Runnable mTask;
private boolean mDone;
public BlockingRunnable(Runnable task) {
mTask = task;
}
@Override
public void run() {
try {
mTask.run();
} finally {
synchronized (this) {
mDone = true;
notifyAll();
}
}
}
public boolean postAndWait(Handler handler, long timeout) {
if (!handler.post(this)) {
return false;
}
synchronized (this) {
if (timeout > 0) {
final long expirationTime = SystemClock.uptimeMillis() + timeout;
while (!mDone) {
long delay = expirationTime - SystemClock.uptimeMillis();
if (delay <= 0) {
return false; // timeout
}
try {
wait(delay);
} catch (InterruptedException ex) {
}
}
} else {
while (!mDone) {
try {
wait();
} catch (InterruptedException ex) {
}
}
}
}
return true;
}
}
我們可以看到,BlockingRunnable是一個(gè)“包裹”構(gòu)造方法中傳入的Runnable的Runnable,調(diào)用BlockingRunnable的postAndWait會(huì)做以下事情:
- 如果投遞BlockingRunnable失敗,返回false
- 鎖住投遞BlockingRunnable的線程
- 如果timeout大于0,計(jì)算參數(shù)Runnable的到期時(shí)間,只要參數(shù)Runnable還沒處理完,則一直輪詢還剩多少時(shí)間,并調(diào)用wait(delay)讓投遞BlockingRunnable的線程繼續(xù)等待,直參數(shù)Runnable處理完(mDone為true)這個(gè)過(guò)程才結(jié)束
- 如果timeout小于等于0,而且參數(shù)Runnable還沒處理完,則一直等待直到參數(shù)Runnable處理完(mDone為true)
這個(gè)東西的說(shuō)明書和使用風(fēng)險(xiǎn)可以在runWithScissors方法的注釋里看到,我在這里就不當(dāng)翻譯工了。
消息的投遞和處理
得到Message后,就會(huì)通過(guò)Handler的sendMessageAtTime調(diào)用MessageQueue的enqueueMessage將Message投遞到MessageQueue中,在往下學(xué)習(xí)之前必須先了解Handler的創(chuàng)建,因?yàn)楹竺娴闹R(shí)和它有關(guān)聯(lián)。
Handler的創(chuàng)建和初始化
其實(shí)Handler的初始化沒什么好看的,就是保存Callback、mLooper的MessageQueue的引用,以及聲明Handler是否異步投遞所有Message。但是里面有一個(gè)內(nèi)存泄露的檢查,可以學(xué)習(xí)一下,就是如果打開了FIND_POTENTIAL_LEAKS,就會(huì)進(jìn)行內(nèi)存泄露的檢查,它會(huì)做以下事情:
- 獲取當(dāng)前Handler類
- 如果Handler是匿名內(nèi)部類,或成員類,或局部類,且Handler的修飾符不是static
- 那么就會(huì)打出log提示可能會(huì)發(fā)生內(nèi)存泄露
public Handler(Callback callback, boolean async) {
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
既然Handler的創(chuàng)建這么簡(jiǎn)單,為什么說(shuō)后面要學(xué)習(xí)的內(nèi)容和它相關(guān)呢?原因就出在Looper中,我們可以看到Looper是通過(guò)sThreadLocal返回的,這個(gè)ThreadLocal是什么呢?
ThreadLocal - 維持線程內(nèi)對(duì)象的唯一性
ThreadLocal是一個(gè)關(guān)于創(chuàng)建線程局部變量的類。
通常情況下,我們創(chuàng)建的變量是可以被任何一個(gè)線程訪問(wèn)并修改的。而使用ThreadLocal創(chuàng)建的變量只能被當(dāng)前線程訪問(wèn),其他線程則無(wú)法訪問(wèn)和修改。它的實(shí)現(xiàn)原理如下:

如圖所示,ThreadLocalRef其實(shí)是同一個(gè)ThreadLocal對(duì)象的引用,為了不讓線看起來(lái)很亂我分別用了兩個(gè)方塊表示ThreadLocal對(duì)象,但其實(shí)是同一個(gè)對(duì)象。ThreadLocal同時(shí)是ThreadA、ThreadB甚至ThreadN內(nèi)ThreadLocalMap的Key,但取出來(lái)的對(duì)象時(shí)不一樣的,因?yàn)镸ap不一樣對(duì)應(yīng)的鍵值對(duì)也不一樣嘛。
ThreadLocalMap
ThreadLocalMap是僅用于維護(hù)ThreadLocal值的自定義HashMap,只在Thread類內(nèi)使用。為了避免ThreadLocalMap的Key->ThreadLocal在GC時(shí)無(wú)法被回收,里邊的元素都是用WeakReference封裝的。ThreadLocalMap除了這點(diǎn)以外,沒有什么特別的,就不細(xì)講了。
需要注意的一點(diǎn)是:ThreadLocalMap是可能帶來(lái)內(nèi)存泄露的,但root cause不是ThreadLocalMap本身,而是代碼質(zhì)量不夠高。首先,由于作為Map的Key的ThreadLocal是弱引用,那么GC時(shí)ThreadLocal會(huì)被回收,此時(shí)Map內(nèi)存在一對(duì)Key為null的鍵值對(duì),而Value仍然被線程強(qiáng)引用著,那么如果用完ThreadLocal后不主動(dòng)移除,就會(huì)內(nèi)存泄露了。但事實(shí)上,ThreadLocal用完后主動(dòng)調(diào)remove就能規(guī)避這個(gè)問(wèn)題,本來(lái)也該這樣做。
Entry
Entry作為ThreadLocalMap的元素,表示的是一對(duì)鍵值對(duì):ThreadLocal的弱引用為鍵,將要用ThreadLocal存儲(chǔ)的對(duì)象為值。
static class Entry extends WeakReference<ThreadLocal> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal k, Object v) {
super(k);
value = v;
}
}
ThreadLocal總結(jié)
換句話說(shuō),所謂的不可被其他線程修改的局部變量,表示的是:每個(gè)線程中都會(huì)維護(hù)一個(gè)ThreadLocalMap,里邊以ThreadLocal為鍵,對(duì)應(yīng)的局部變量為值,通過(guò)鍵值對(duì)來(lái)控制訪問(wèn)和數(shù)據(jù)的一致性,而不是通過(guò)鎖來(lái)控制。
Looper
既然一個(gè)線程只有一個(gè)Looper,那么Looper里面有什么呢?從源碼可以看到,Looper的構(gòu)造方法是私有的,也就意味著獲得Looper對(duì)象基本都是單例,這一點(diǎn)和線程<->Looper的一對(duì)一映射關(guān)系切合。
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
從Looper的成員變量我們可以知道Looper包含了以下東西:
- sMainLooper:應(yīng)用主線程的Looper,創(chuàng)建其他線程的Looper時(shí)為null
- mQueue:Looper關(guān)聯(lián)的MessageQueue
- mThread:Looper關(guān)聯(lián)的線程
- sThreadLocal:線程局部變量的Key
從這可以知道,一個(gè)線程對(duì)應(yīng)一個(gè)Looper,一個(gè)Looper對(duì)應(yīng)一個(gè)MessageQueue
----------------分割線,接下來(lái)回到消息的投遞結(jié)束的地方----------------
得到Message后,就會(huì)通過(guò)Handler的sendMessageAtTime調(diào)用MessageQueue的enqueueMessage將Message投遞到MessageQueue中,在往下學(xué)習(xí)之前必須先了解Handler的創(chuàng)建,因?yàn)楹竺娴闹R(shí)和它有關(guān)聯(lián)。
現(xiàn)在我們知道Message將要投遞到哪里的MessageQueue里了,那么投遞過(guò)去之后,消息是怎么被處理的呢?這代碼很長(zhǎng),而且就是個(gè)進(jìn)入隊(duì)列的過(guò)程,我就不貼了,做了以下事情:
- 合法性檢查
- 標(biāo)記Message正在使用
- 入列
- 喚醒native的MessageQueue
在這里有個(gè)有意思的概念必須提一下,就是Barrier Message,它表示的是一種柵欄的概念,將它加入MessageQueue可以攔住所有執(zhí)行時(shí)間在它之后的同步Message,異步Message則不受影響,遍歷到就會(huì)處理,這種狀況會(huì)持續(xù)到把Barrier Message移除。

提示:圖里綠色代表Message可以被取出執(zhí)行,紅色表示無(wú)法被取出執(zhí)行
它和Message的根本差別是,他沒有target,即:沒有處理該Message的Handler,但我們自己將Message的Handler設(shè)為null是沒法加入MessageQueue的,必須調(diào)用postSyncBarrier方法:
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
……
}
private int postSyncBarrier(long when) {
// Enqueue a new sync barrier token.
// We don't need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}
消息的分發(fā)
前面已經(jīng)知道Message投遞后就會(huì)到達(dá)MessageQueue,接下來(lái)就看消息是怎么被遍歷處理的。首先要知道的一點(diǎn)是,Looper在調(diào)用prepare創(chuàng)建后,是必須調(diào)loop()方法的,很多人會(huì)問(wèn),我平常用的時(shí)候沒用loop()方法也沒問(wèn)題啊。那是因?yàn)槟闶窃谥骶€程用的,主線程在創(chuàng)建Looper的時(shí)候已經(jīng)調(diào)用過(guò)loop()方法了。
我們創(chuàng)建了其他線程的Looper后,調(diào)loop()方法會(huì)做以下事情:
- 循環(huán)獲取MessageQueue中的Message
- 將Message通過(guò)Handler的dispatchMessage方法分發(fā)到對(duì)應(yīng)的Handler中
- 將Message的信息清空,回收到Message Pool中等待下一次使用
public static void loop() {
……
for (;;) {
Message msg = queue.next(); // might block
……
try {
msg.target.dispatchMessage(msg);
} finally {
……
}
……
msg.recycleUnchecked();
}
}
在Handler的dispatchMessage中,對(duì)Message的處理其實(shí)是有優(yōu)先順序這個(gè)說(shuō)法的:
- 如果Message設(shè)置了callback,則將Message交給Message的callback處理
- 如果Handler設(shè)置了callback,則將Message先交給Handler的callback處理
- 否則的話,將Message交給Handler的handleMessage處理
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
對(duì)于MessageQueue,它實(shí)際表示了Java層和Native層的MessageQueue,Java層的MessageQueue就是mMessages表示的循環(huán)隊(duì)列,Native層的MessageQueue就是mPtr。它的next()方法里做的事情如下:
- 調(diào)用nativePollOnce讓native層的MessageQueue先處理Native層的Message,再處理Java層的Message,這個(gè)過(guò)程可能阻塞
- 如果在按時(shí)序遍歷MessageQueue的過(guò)程中發(fā)現(xiàn)了Barrier Message,即handler為空的Message,則跳過(guò)它后面的所有同步Message,只處理異步Message
- 如果消息是延時(shí)消息,計(jì)算當(dāng)前時(shí)間和目標(biāo)時(shí)間的差值,休眠這個(gè)時(shí)間差后再去取這個(gè)Message
- 如果消息不是延時(shí)消息,在Message Pool里標(biāo)記該Message正在使用,并返回它
Java層Android消息機(jī)制的整個(gè)過(guò)程可以用下圖概括:

有鉆研過(guò)Java層代碼的朋友肯定知道,Handler里面還有個(gè)用于跨進(jìn)程Message通信的MessengerImpl,這個(gè)東西我就不在這里說(shuō)了,因?yàn)樗褪莻€(gè)簡(jiǎn)單的跨進(jìn)程通信,和整個(gè)Handler、Looper、MessageQueue其實(shí)關(guān)系不大。
Android Native層消息機(jī)制
Android消息機(jī)制在Native層其實(shí)和Java層很相似,保留了Handler、Looper、MessageQueue的結(jié)構(gòu)。但是Native層Message、Handler、MessageQueue的概念被弱化得很厲害,基本上只是個(gè)“空殼”,核心邏輯都在Looper里邊了。
其他區(qū)別都不大了,只是在實(shí)現(xiàn)上有一點(diǎn)不一樣,具體的差別就在源碼中找答案吧。整體結(jié)構(gòu)圖如下:

消息的產(chǎn)生
在Native層中,消息由MessageEnvelope和封裝fd(Java層Handler可以添加fd的監(jiān)聽、Native當(dāng)然也可以)相關(guān)信息后得到的epoll_event組成。
fd
對(duì)于要被監(jiān)聽的fd的消息,Looper做了以下事情:
- 合法性檢查
- 將相關(guān)信息封裝到Request中,并初始化為epoll_event
- 將該fd以及要監(jiān)聽的epoll_event事件(步驟2轉(zhuǎn)換Request得到)注冊(cè)到當(dāng)前Looper的epollFd中
- 如果出錯(cuò),進(jìn)行出錯(cuò)處理
- 更新mRequests
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
……
{ // acquire lock
AutoMutex _l(mLock);
……
struct epoll_event eventItem;
request.initEventItem(&eventItem);
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
……
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
……
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}
MessageEnvelope
MessageEnvelope相對(duì)于fd就簡(jiǎn)單多了,在調(diào)用Native層Looper的sendMessage相關(guān)函數(shù)時(shí)會(huì)將uptime、MessageHandler、Native層Message封裝到MessageEnvelope中,然后插入mMessageEnvelopes中。
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, const Message& message) {
……
size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
……
}
……
}
消息的投遞和處理
前面已經(jīng)提到了,Java層的MessageQueue處理消息時(shí),會(huì)先調(diào)用Native層MessageQueue的nativePollOnce(),它實(shí)際調(diào)用的是native層MessageQueue的pollOnce(),而native的pollOnce調(diào)用的是Native層的Looper的pollOnce:
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
……
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
……
mLooper->pollOnce(timeoutMillis);
……
}
在看Native層Looper的pollOnce方法之前,先看看Native層的Looper和Java層的Looper會(huì)不會(huì)有一些不一樣吧。
Looper Native
和Java層Looper的使用一樣,Native層Looper也需要prepare,也是一個(gè)通過(guò)線程局部變量存儲(chǔ)的對(duì)象,一個(gè)線程只有一個(gè)。那么在Native層是怎么實(shí)現(xiàn)線程局部變量的呢?
Linux TSD(Thread-specific Data)池
Native層線程局部變量的思想和Java層很類似,Native層會(huì)維護(hù)一個(gè)全局的pthread_keys數(shù)組,用于存放線程局部變量的鍵。其中seq用于標(biāo)記是否"in_use",destr則是一個(gè)函數(shù)指針,可用作析構(gòu)函數(shù),在線程退出時(shí)釋放該鍵對(duì)應(yīng)于線程中的線程局部變量。
static struct pthread_key_struct pthread_keys[PTHREAD_KEYS_MAX] ={{0,NULL}};
int pthread_key_create(pthread_key_t *key, void (*destr_function) (void*));
struct pthread_key_struct
{
/* Sequence numbers. Even numbers indicated vacant entries. Note
that zero is even. We use uintptr_t to not require padding on
32- and 64-bit machines. On 64-bit machines it helps to avoid
wrapping, too. */
uintptr_t seq;
/* Destructor for the data. */
void (*destr) (void *);
};
pthread在創(chuàng)建線程時(shí)會(huì)維護(hù)一個(gè)指針數(shù)組,數(shù)組元素指向線程局部變量的數(shù)據(jù)塊。整體解構(gòu)如下圖:

創(chuàng)建Looper
創(chuàng)建Looper時(shí),會(huì)做以下事情:
- 通過(guò)eventfd創(chuàng)建mWakeEventFd用于線程間通信去喚醒Looper的,當(dāng)需要喚醒Looper時(shí),就往里面寫1
- 創(chuàng)建用于監(jiān)聽epoll_event的mEpollFd,并初始化mEpollFd要監(jiān)聽的epoll_event類型
- 通過(guò)epoll_ctl將mWakeEventFd注冊(cè)到mEpollFd中,當(dāng)mWakeEventFd有事件可讀則喚醒Looper
- 如果mRequests不為空的話,說(shuō)明前面注冊(cè)了有要監(jiān)聽的fd,則遍歷mRequests中的Request,將它初始化為epoll_event并通過(guò)epoll_ctl注冊(cè)到mEpollFd中,當(dāng)有可讀事件同樣喚醒Looper
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
……
rebuildEpollLocked();
}
void Looper::rebuildEpollLocked() {
……
// Allocate the new epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
……
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd;
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
……
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
……
}
}
pollOnce
對(duì)于Native層Looper的pollOnce,找它函數(shù)定義稍微有點(diǎn)隱秘,它在Looper.h中聲明,inline到pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData)函數(shù)里了,它做了以下事情:
- 優(yōu)先處理mResponses里的Response,即來(lái)自fd的事件
- 如果沒有需處理的Response,再調(diào)用pollInner
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, NULL, NULL, NULL);
}
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
……
return ident;
}
}
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
……
return result;
}
result = pollInner(timeoutMillis);
}
}
pollInner這個(gè)函數(shù)比較長(zhǎng),它做了以下事情:
- 基于下一個(gè)Message調(diào)整獲取Message的時(shí)間間隔timeoutMillis
- 清空mResponses
- 獲取epoll事件,即將要處理的Message
- 更新mPolling,防止進(jìn)入idle
- 執(zhí)行合法性檢查
- 如果epoll_event的fd為mWakeFd,說(shuō)明是Looper的喚醒事件,則喚醒Looper
- 否則先將epoll_event封裝為Request,更新epoll_event的事件類型,再封裝為Response裝入mResponses
- 循環(huán)取出mMessageEnvelopes隊(duì)頭的MessageEnvelope,并將MessageEnvelope中的Message交給對(duì)應(yīng)的Native層的Handler處理
- 循環(huán)調(diào)用mResponses中所有Response的callback
至此對(duì)Android消息機(jī)制的學(xué)習(xí)就結(jié)束啦。