生產(chǎn)者與消費(fèi)者
端午節(jié)將至,大家可能已經(jīng)安排好自己的行程,不久就將出發(fā),有做飛機(jī)前往目的地,也有做輪渡在近海游玩。設(shè)想,我們做飛機(jī)出游,只需按時抵達(dá)機(jī)場,在等候一段時間,自然有相應(yīng)的飛機(jī)帶我們前往心怡許久的地方。

仔細(xì)想想,你不需要關(guān)心是哪一趟航班將承擔(dān)此次的出行任務(wù),另一方面,出行的航班也不關(guān)心會有哪些旅客將要登記?;ハ嗖恢兰?xì)節(jié),卻能彼此很好的協(xié)作,這就是 生產(chǎn)者-消費(fèi)者 帶來的好處。
生產(chǎn)者-消費(fèi)者模式,在實(shí)際開發(fā)中極為常見,源于其主要的兩個好處。
- 解耦。這一點(diǎn)是核心好處,生產(chǎn)者和消費(fèi)者完全不用知道彼此的實(shí)現(xiàn)細(xì)節(jié),未嘗有利于獨(dú)立模塊的實(shí)現(xiàn)。
- 并發(fā)支持。這一點(diǎn)在處理耗時任務(wù)時,也經(jīng)常被用到。生產(chǎn)者和消費(fèi)者可以保持不同的頻率,可以單獨(dú)調(diào)整,以滿足實(shí)際的需要。
緩沖區(qū)
如果生產(chǎn)者在完成任務(wù)后,立即交給消費(fèi)者,那么兩者之間勢必是耦合的,這和普通的函數(shù)調(diào)用沒有什么區(qū)別。實(shí)現(xiàn)解耦,就得引入第三方,生產(chǎn)者和消費(fèi)者都和這個緩沖區(qū)打交道,而彼此互不知道,就類似于房產(chǎn)中介。
這個緩沖區(qū)才如何實(shí)現(xiàn)呢?由于涉及到并發(fā)的問題,這個緩沖區(qū)必須是線程安全的,生產(chǎn)者和消費(fèi)者都需要同時訪問,生產(chǎn)者往這個區(qū)域?qū)懭雰?nèi)容,而消費(fèi)者從這個區(qū)域里讀取內(nèi)容??梢詫C(jī)場理解為緩沖區(qū),乘客涌入機(jī)場,而飛機(jī)將乘客從機(jī)場帶入目的地。
對于這個緩沖區(qū),并沒有什么特別的要求,只需要實(shí)現(xiàn)put(T item) 和 T get() 兩個接口即可。java中常用 BlockingQueue 作為緩沖區(qū)。
Handler、Looper 和 MessageQueue機(jī)制講解
在起初接觸Android的時候,第一次用于做異步通信的方式,很可能就是 Hnadler 機(jī)制,其實(shí)從某種意義上而言,這種機(jī)制也是基于 生產(chǎn)者-消費(fèi)者 模式展開的。例如UI線程就是消費(fèi)者,在其他線程(生產(chǎn)者)上通過 Handler 將要執(zhí)行的 Callback ,遷移到 UI 線程上執(zhí)行。
MessageQueue 就是前文中提及的緩沖區(qū),這里是Android Framework對其的特殊實(shí)現(xiàn)。而生產(chǎn)者需要將任務(wù)提交給緩沖區(qū),而這個提交工作是由 post(Runnable runable) 或者 [postDelayed](https://developer.android.com/reference/android/os/Handler.html#postDelayed(java.lang.Runnable, long))(Runnable r, long delayMillis) 等post方法來執(zhí)行。而消費(fèi)者(主要是UI thread)通過 looper 不斷地從 MessageQueue 中取出任務(wù)再執(zhí)行。
簡要的示意圖如下:

Message 的定義
既然是使用的生產(chǎn)者-消費(fèi)者模式,那么生產(chǎn)和消費(fèi)的內(nèi)容是什么了?答案就是 Message?,F(xiàn)在看看 Message 中幾個常用的變量。
/**
* User-defined message code so that the recipient can identify
* what this message is about. Each {@link Handler} has its own name-space
* for message codes, so you do not need to worry about yours conflicting
* with other handlers.
*/
public int what;
/**
* arg1 and arg2 are lower-cost alternatives to using
* {@link #setData(Bundle) setData()} if you only need to store a
* few integer values.
*/
public int arg1;
/**
* arg1 and arg2 are lower-cost alternatives to using
* {@link #setData(Bundle) setData()} if you only need to store a
* few integer values.
*/
public int arg2;
/*package*/ Bundle data;
/*package*/ Handler target;
/*package*/ Runnable callback;
這里 what 類似于標(biāo)明 Message 的類型 Id,調(diào)用者可以通過這個 what 做出相應(yīng)的邏輯調(diào)整。arg1 arg2 以及后面的 object 是用作額外數(shù)據(jù)傳輸?shù)摹?而 target 則定義了是哪一個消費(fèi)者來處理哪一個 callback。為什么要使用一個 Target 變量來標(biāo)明是哪一個消費(fèi)者了?因?yàn)橐粋€ LooperThread 是允許存在多個 Handler 的,也就是多個消費(fèi)者,而這些消息都被放置到一個 MessageQueue 隊(duì)列中,target 就起到了區(qū)別它們的目的。callback 即實(shí)際要執(zhí)行的東西。
Message 同時提供了 obtain() 方法,不推薦使用 new Message() 的方法,而是重復(fù)回收利用 Message,和 ThreadPool 的原理類似。
MessageQueue
Android中的 MessageQueue 就是前文中提及的緩沖區(qū),Android Framework 對其做了一些 JNI 的調(diào)用,來進(jìn)行一些保護(hù)。這里的具體實(shí)現(xiàn)就不提及了,只需要知道線程安全,并提供了 Message next() 和 boolean enqueueMessage(Message msg, long when) 接口即可。

Looper 是什么?
前文提及的是,Looper 主要負(fù)責(zé)的工作是從 MessageQueue 中取出要執(zhí)行的任務(wù),也就是維護(hù)一個消息循環(huán),現(xiàn)在看看 Looper 具體是怎么運(yùn)作的。
class LooperThread extends Thread {
public Handler mHandler;
public void run() {
Looper.prepare();
mHandler = new Handler() {
public void handleMessage(Message msg) {
// process incoming messages here
}
};
Looper.loop();
}
}
這是常用的Looper 示例,通過Looper.prepare()進(jìn)行相應(yīng)的初始化工作,而Looper.loop()則正式開啟消息循環(huán)。簡單來說,Looper 使得一個普通的線程具備了消息循環(huán)的能力,也就是獲取信息并消費(fèi)的能力,現(xiàn)在從源碼中簡單分析下幾個重要的方法。
// 檢查Looper是否創(chuàng)建,并保證其全局唯一性
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
// 通過ThreadLocal 關(guān)鍵字保證每一個線程只存在一份
sThreadLocal.set(new Looper(quitAllowed));
}
private Looper(boolean quitAllowed) {
// 私有構(gòu)造函數(shù),初始化 MessageQueue.
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
loop方法在實(shí)現(xiàn)上也很簡單,首先檢查Looper創(chuàng)建,如果沒有就拋出異常。這里的Binder.clearCallingIdentity()是移除舊有的 Binder Identity,并在每次循環(huán)中做檢驗(yàn),為什么要調(diào)用這個方法,可以參考這篇博文,也推薦大家看我之前寫的 Binder 完全解析 。之后,進(jìn)入消息循環(huán),不斷地從MessageQueue中獲取要處理的消息,并通過 msg.target.dispatchMessage(msg) 方法進(jìn)行消息派發(fā)。
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
for (;;) {
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
msg.target.dispatchMessage(msg);
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
// Make sure that during the course of dispatching the
// identity of the thread wasn't corrupted.
final long newIdent = Binder.clearCallingIdentity();
msg.recycleUnchecked();
}
}
Handler
Handler 在系統(tǒng)中承擔(dān)的角色較為復(fù)雜,可是當(dāng)做是全局的操作者,接下來簡要地進(jìn)行下分析。
public Handler(Callback callback, boolean async) {
...
mLooper = Looper.myLooper();
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}
Handler 必須依附于相應(yīng)的Looper線程,如果線程沒有Looper 或者 Looper 沒有調(diào)用 prepare 方法,會拋出new RuntimeException("Can't create handler inside thread that has not called Looper.prepare()")的異常。道理也很簡單,不開啟相應(yīng)的Looper,Handler 發(fā)送的消息往什么地方傳遞了? 在這個構(gòu)造函數(shù)里,賦值相應(yīng)的 MessageQueue 和 callback。callback的定義如下,即在 Looper Thread 要執(zhí)行的任務(wù),一般情況可以是在其他線程耗時操作執(zhí)行完成后,回到Looper Thread 上要執(zhí)行的UI 更新操作。
/**
* Callback interface you can use when instantiating a Handler to avoid
* having to implement your own subclass of Handler.
*
* @param msg A {@link android.os.Message Message} object
* @return True if no further handling is desired
*/
public interface Callback {
public boolean handleMessage(Message msg);
}
Handler 通過 post postDelayed 等等方法,來將相應(yīng)的 Message 發(fā)送到消息隊(duì)列中去,最后通過 sendMessageAtTime() 來進(jìn)行發(fā)送,進(jìn)行的工作特別簡單,將 Message.target 指定為自己,同時將自己加入到隊(duì)列中。
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
機(jī)制總結(jié)
Handler 消息處理者
它主要有兩大作用:① 處理Message。② 發(fā)送Message,并將某個Message壓入到MessageQueue中。Looper 輪詢器
在 Looper里面的 loop()函數(shù)中有個死循環(huán),它不斷地從 MessageQueue 中取出一個Message,然后傳給Handler進(jìn)行處理,如此循環(huán)往復(fù)。假如隊(duì)列為空,那么它會進(jìn)入休眠。MessageQueue 消息隊(duì)列
消息隊(duì)列中含有多個Message,每個Message中包含了具體的調(diào)用信息。
Android 使用Handler實(shí)例
在每一個Application啟動的時候,會給這個Application分配一個 ActivityThread ,就是我們所說的 UI 線程,一個類的入口方法是 main 函數(shù),這里看下源碼。
public static void main(String[] args) {
SamplingProfilerIntegration.start();
// CloseGuard defaults to true and can be quite spammy. We
// disable it here, but selectively enable it later (via
// StrictMode) on debug builds, but using DropBox, not logs.
CloseGuard.setEnabled(false);
Environment.initForCurrentUser();
// Set the reporter for event logging in libcore
EventLogger.setReporter(new EventLoggingReporter());
Security.addProvider(new AndroidKeyStoreProvider());
// Make sure TrustedCertificateStore looks in the right place for CA certificates
final File configDir = Environment.getUserConfigDirectory(UserHandle.myUserId());
TrustedCertificateStore.setDefaultUserDirectory(configDir);
Process.setArgV0("<pre-initialized>");
Looper.prepareMainLooper();
ActivityThread thread = new ActivityThread();
thread.attach(false);
if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}
if (false) {
Looper.myLooper().setMessageLogging(new
LogPrinter(Log.DEBUG, "ActivityThread"));
}
Looper.loop();
throw new RuntimeException("Main thread loop unexpectedly exited");
}
這里使用的 MainLooper 中的 Handler 被稱為 H,這里的 H 定義了一些列的消息,如下所示,也就是說 Activity 相關(guān)的線程間通信,就是依賴于 Handler 機(jī)制的。
...
public static final int LAUNCH_ACTIVITY = 100;
public static final int PAUSE_ACTIVITY = 101;
public static final int PAUSE_ACTIVITY_FINISHING= 102;
public static final int STOP_ACTIVITY_SHOW = 103;
public static final int STOP_ACTIVITY_HIDE = 104;
public static final int SHOW_WINDOW = 105;
public static final int HIDE_WINDOW = 106;
public static final int RESUME_ACTIVITY = 107;
public static final int SEND_RESULT = 108;
public static final int DESTROY_ACTIVITY = 109;
...