Handler機制
Handler機制在Android中通常用來更新UI。子線程執(zhí)行任務(wù),任務(wù)執(zhí)行完畢后發(fā)送消息:Handler.sendMessage(),然后在UI線程Handler.handleMessage()就會調(diào)用,執(zhí)行相應(yīng)處理。
Handler機制有幾個非常重要的類:
- Handler:用來發(fā)送消息:sendMessage等多個方法,并實現(xiàn)handleMessage()方法處理回調(diào)(還可以使用Message或Handler的Callback進(jìn)行回調(diào)處理,具體可以看看源碼)。
- Message:消息實體,發(fā)送的消息即為Message類型。
- MessageQueue:消息隊列,用于存儲消息。發(fā)送消息時,消息入隊列,然后Looper會從這個MessageQueue取出消息進(jìn)行處理。
- Looper:與線程綁定,不僅僅局限于主線程,綁定的線程用來處理消息。loop()方法是一個死循環(huán),一直從MessageQueue里取出消息進(jìn)行處理。
這幾個類的作用還可以用下圖解釋:

Looper.loop()是一個死循環(huán),為什么不會卡死主線程呢?簡單的說,就是當(dāng)MessageQueue為empty時,線程會掛起,一有消息,就會喚醒線程處理消息。關(guān)于這個問題可以看看這個回答:Android中為什么主線程不會因為Looper.loop()里的死循環(huán)卡死? 。
Handler不僅僅可以在主線程處理消息,還可以在子線程,前提是子線程要關(guān)聯(lián)Looper,標(biāo)準(zhǔn)寫法為:
class LooperThread extends Thread {
public Handler mHandler;
public void run() {
Looper.prepare();
mHandler = new Handler() {
public void handleMessage(Message msg) {
// process incoming messages here
}
};
Looper.loop();
}
}
一定要調(diào)用Looper.prepare()和Looper.loop()方法。Looper.prepare()使用ThreadLocal將當(dāng)前線程與new出來的Looper關(guān)聯(lián),Looper.loop()開啟循環(huán)處理消息。這樣子,mHandler發(fā)送的消息就可以在LooperThread進(jìn)行處理了。
生產(chǎn)者消費者模式
在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。為了解決這種生產(chǎn)消費能力不均衡的問題,所以便有了生產(chǎn)者和消費者模式。具體介紹可參考:聊聊并發(fā)(十)生產(chǎn)者消費者模式

Handler機制與生產(chǎn)者消費者模式
那么Handler機制和生產(chǎn)者消費者模式有什么關(guān)系呢?
Handler機制就是一個生產(chǎn)者消費者模式。可以這么理解,Handler發(fā)送消息,它就是生產(chǎn)者,生產(chǎn)的是一個個Message。Looper可以理解為消費者,在其loop()方法中,死循環(huán)從MessageQueue取出Message進(jìn)行處理。而MessageQueue就是緩沖區(qū)了,Handler產(chǎn)生的Message放到MessageQueue中,Looper從MessageQueue取出消息。
既然Handler機制本質(zhì)上是一個生產(chǎn)者消費者模式,那么我們就可以脫離Android來實現(xiàn)一個Handler機制。
Handler
用來發(fā)送和處理消息,因此主要方法就是sendMessage()和handleMessage():
public abstract class Handler {
private IMessageQueue messageQueue;
public Handler(Looper looper) {
messageQueue = looper.messageQueue;
}
public Handler() {
Looper.myLooper();
}
public void sendMessage(Message message) {
// 指定發(fā)送Message的Handler,方便回調(diào)
message.target = this;
try {
messageQueue.enqueueMessage(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public abstract void handleMessage(Message msg);
}
Message
相當(dāng)簡單,充當(dāng)一個使者的角色,重要的是要與對應(yīng)的Handler綁定:target變量。
public class Message {
private int code;
private String msg;
Handler target;
public Message() { }
public Message(int code, String msg) {
this.code = code;
this.msg = msg;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
IMessageQueue
定義的一個接口,由于MessageQueue可以有不同的實現(xiàn),因此抽象一個接口出來。
public interface IMessageQueue {
Message next() throws InterruptedException;
void enqueueMessage(Message message) throws InterruptedException;
}
Looper
loop()方法是一個死循環(huán),在這里處理消息,沒有消息時,綁定線程會處于wait狀態(tài)。使用ThreadLocal來與線程進(jìn)行綁定。
public class Looper {
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
IMessageQueue messageQueue;
private static Looper sMainLooper;
public Looper() {
messageQueue = new MessageQueue(2);
// messageQueue = new MessageQueue1(2);
// messageQueue = new MessageQueue2(2);
}
public static void prepare() {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper());
}
public static void prepareMainLooper() {
prepare();
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
public static Looper getMainLooper() {
return sMainLooper;
}
public static Looper myLooper() {
return sThreadLocal.get();
}
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
for (;;) {
// 消費Message,如果MessageQueue為null,則等待
Message message = null;
try {
message = me.messageQueue.next();
} catch (InterruptedException e) {
e.printStackTrace();
}
if (message != null) {
message.target.handleMessage(message);
}
}
}
}
LinkedBlockingQueue實現(xiàn)的MessageQueue
MessageQueue是緩沖區(qū),它需要滿足以下功能:
- 當(dāng)緩沖區(qū)滿時,掛起執(zhí)行enqueueMessage的線程。
- 當(dāng)緩沖區(qū)空時,掛起執(zhí)行next的線程。
- 當(dāng)緩沖區(qū)非空時,喚醒被掛起在next的線程。
- 當(dāng)緩沖區(qū)不滿時,喚醒被掛起在enqueueMessage的線程。
所以MessageQueue最簡單的實現(xiàn)莫過于使用LinkedBlockingQueue了。(源碼|并發(fā)一枝花之BlockingQueue)
public class MessageQueue implements IMessageQueue {
private final BlockingQueue<Message> queue;
public MessageQueue(int cap) {
this.queue = new LinkedBlockingQueue<>(cap);
}
public Message next() throws InterruptedException {
return queue.take();
}
public void enqueueMessage(Message message) {
try {
queue.put(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
像這樣,一個簡易的Handler機制就實現(xiàn)了。Android系統(tǒng)的Handler機制肯定不是那么簡單,做了很多魯棒性相關(guān)的處理,還有和底層的交互等等。另外,Android系統(tǒng)MessageQueue是結(jié)合Message實現(xiàn)的一個無界隊列,意味著發(fā)送消息的隊列不會阻塞。
Handler機制搭建好了,那么來測試一下吧:
public class Main {
public static void main(String[] args) {
MainThread mainThread = new MainThread();
mainThread.start();
// 確保mainLooper構(gòu)建完成
while (Looper.getMainLooper() == null) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Handler handler = new Handler(Looper.getMainLooper()) {
@Override
public void handleMessage(Message msg) {
System.out.println("execute in : " + Thread.currentThread().getName());
switch (msg.getCode()) {
case 0 :
System.out.println("code 0 : " + msg.getMsg());
break;
case 1 :
System.out.println("code 1 : " + msg.getMsg());
break;
default :
System.out.println("other code : " + msg.getMsg());
}
}
};
Message message1 = new Message(0, "I am the first message!");
WorkThread workThread1 = new WorkThread(handler, message1);
Message message2 = new Message(1, "I am the second message!");
WorkThread workThread2 = new WorkThread(handler, message2);
Message message3 = new Message(34, "I am a message!");
WorkThread workThread3 = new WorkThread(handler, message3);
workThread1.start();
workThread2.start();
workThread3.start();
}
/**模擬工作線程**/
public static class WorkThread extends Thread {
private Handler handler;
private Message message;
public WorkThread(Handler handler, Message message) {
setName("WorkThread");
this.handler = handler;
this.message = message;
}
@Override
public void run() {
super.run();
// 模擬耗時操作
Random random = new Random();
try {
Thread.sleep(random.nextInt(10) * 300);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 任務(wù)執(zhí)行完,sendMessage
handler.sendMessage(message);
}
}
/**模擬主線程*/
public static class MainThread extends Thread {
public MainThread() {
setName("MainThread");
}
@Override
public void run() {
super.run();
// 這里是不是與系統(tǒng)的調(diào)用一樣,哈哈
Looper.prepareMainLooper();
System.out.println(getName() + " the looper is prepared");
Looper.loop();
}
}
}
這里模擬了子線程執(zhí)行任務(wù),發(fā)送消息到主線程處理的場景。那么執(zhí)行結(jié)果就是:
MainThread the looper is prepared
execute in : MainThread
other code : I am a message!
execute in : MainThread
code 0 : I am the first message!
execute in : MainThread
code 1 : I am the second message!
可以看出handleMessage的執(zhí)行都是在主線程的,簡易Handler機制搞定!??!
本文章到這就應(yīng)該結(jié)束了,但是MessageQueue的實現(xiàn)方式是可以有多種的,因此來看看其不同的實現(xiàn)。
wait/notify實現(xiàn)
public class MessageQueue1 implements IMessageQueue {
private Queue<Message> queue;
private final AtomicInteger integer = new AtomicInteger(0);
private volatile int count;
private final Object BUFFER_LOCK = new Object();
public MessageQueue1(int cap) {
this.count = cap;
queue = new LinkedList<>();
}
@Override
public Message next() throws InterruptedException {
synchronized (BUFFER_LOCK) {
while (queue.size() == 0) {
BUFFER_LOCK.wait();
}
Message message = queue.poll();
BUFFER_LOCK.notifyAll();
return message;
}
}
@Override
public void enqueueMessage(Message message) throws InterruptedException {
synchronized (BUFFER_LOCK) {
while (queue.size() == count) {
BUFFER_LOCK.wait();
}
queue.offer(message);
BUFFER_LOCK.notifyAll();
}
}
}
BUFFER_LOCK 用來處理與鎖相關(guān)的邏輯。
next()方法中,如果queue的size 為0,說明現(xiàn)在沒有消息待處理,因此執(zhí)行BUFFER_LOCK.wait()掛起線程,當(dāng)queue.poll();時,queue里就有了消息,需要喚醒因為沒有消息而掛起的線程,所以執(zhí)行BUFFER_LOCK.notifyAll();。
enqueueMessage()方法中,如果queue.size() == count說明消息滿了,如果Handler繼續(xù)sendMessage,queue無法繼續(xù)裝下,因此該線程需要掛起: BUFFER_LOCK.wait();。當(dāng)執(zhí)行queue.offer(message);時,queue里頭保存的Message就少了一個,可以插入新的Message,因此BUFFER_LOCK.notifyAll();喚醒因為queue滿了而掛起的線程。
Lock實現(xiàn)
既然wait/notify可以實現(xiàn)MessageQueue,那么ReentrantLock肯定也能實現(xiàn),下面就是使用ReentrantLock實現(xiàn)的例子,原理上來講是一致的。
public class MessageQueue2 implements IMessageQueue {
private final Queue<Message> queue;
private int cap = 0;
private final Lock lock = new ReentrantLock();
private final Condition BUFFER_CONDITION = lock.newCondition();
public MessageQueue2(int cap) {
this.cap = cap;
queue = new LinkedList<>();
}
@Override
public Message next() throws InterruptedException {
try {
lock.lock();
while (queue.size() == 0) {
BUFFER_CONDITION.await();
}
Message message = queue.poll();
BUFFER_CONDITION.signalAll();
return message;
} finally {
lock.unlock();
}
}
@Override
public void enqueueMessage(Message message) throws InterruptedException {
try {
lock.lock();
while (queue.size() == cap) {
BUFFER_CONDITION.await();
}
queue.offer(message);
BUFFER_CONDITION.signalAll();
} finally {
lock.unlock();
}
}
}
Lock實現(xiàn)的優(yōu)化
上面的實現(xiàn)過程中,入隊出隊是基于同一個鎖的,意味著,如果有一個線程正在入隊,其他線程就不能出隊;有一個線程出隊,其它不能入隊。一個時刻只能有一個線程操作緩沖區(qū),這顯然在多線程環(huán)境下會影響性能。所以對其進(jìn)行優(yōu)化。
優(yōu)化想法是,入隊和出隊互相不干擾。所以入隊和出隊分別需要一個鎖,入隊在隊列的尾部進(jìn)行,出隊在頭部進(jìn)行。代碼如下:
public class MessageQueue3 implements IMessageQueue {
private final Lock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
private final Lock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private Node head; // 隊頭
private Node last; // 隊尾
private AtomicInteger count = new AtomicInteger(0); // 記錄大小
private int cap = 10; // 容量,默認(rèn)為10
public MessageQueue3(int cap) {
this.cap = cap;
}
@Override
public Message next() throws InterruptedException {
Node node;
int c = -1;
takeLock.lock();
try {
while (count.get() == 0) {
notEmpty.await();
}
node = head;
head = head.next;
c = count.getAndDecrement();
if (c > 0) {
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (count.get() < cap) {
signalNotFull();
}
return node.data;
}
@Override
public void enqueueMessage(Message message) throws InterruptedException {
Node node = new Node(message);
int c = -1;
putLock.lock();
try {
while (count.get() == cap) {
notFull.await();
}
// 初始狀態(tài)
if (head == null && last == null) {
head = last = node;
} else {
last.next = node;
last = last.next;
}
c = count.getAndIncrement();
if (c < cap) {
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c > 0) {
signalNotEmpty();
}
}
private void signalNotEmpty() {
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
private void signalNotFull() {
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
static class Node {
Message data;
Node next;
public Node(Message data) {
this.data = data;
}
}
}
全文完~