Handler機制與生產(chǎn)者消費者模式

Handler機制

Handler機制在Android中通常用來更新UI。子線程執(zhí)行任務(wù),任務(wù)執(zhí)行完畢后發(fā)送消息:Handler.sendMessage(),然后在UI線程Handler.handleMessage()就會調(diào)用,執(zhí)行相應(yīng)處理。
Handler機制有幾個非常重要的類:

  • Handler:用來發(fā)送消息:sendMessage等多個方法,并實現(xiàn)handleMessage()方法處理回調(diào)(還可以使用Message或Handler的Callback進(jìn)行回調(diào)處理,具體可以看看源碼)。
  • Message:消息實體,發(fā)送的消息即為Message類型。
  • MessageQueue:消息隊列,用于存儲消息。發(fā)送消息時,消息入隊列,然后Looper會從這個MessageQueue取出消息進(jìn)行處理。
  • Looper:與線程綁定,不僅僅局限于主線程,綁定的線程用來處理消息。loop()方法是一個死循環(huán),一直從MessageQueue里取出消息進(jìn)行處理。

這幾個類的作用還可以用下圖解釋:

Handler機制

Looper.loop()是一個死循環(huán),為什么不會卡死主線程呢?簡單的說,就是當(dāng)MessageQueue為empty時,線程會掛起,一有消息,就會喚醒線程處理消息。關(guān)于這個問題可以看看這個回答:Android中為什么主線程不會因為Looper.loop()里的死循環(huán)卡死? 。
Handler不僅僅可以在主線程處理消息,還可以在子線程,前提是子線程要關(guān)聯(lián)Looper,標(biāo)準(zhǔn)寫法為:

class LooperThread extends Thread {
    public Handler mHandler;
    public void run() {
        Looper.prepare();
        mHandler = new Handler() {
            public void handleMessage(Message msg) {
                // process incoming messages here
            }
        };
        Looper.loop();
    }
}

一定要調(diào)用Looper.prepare()和Looper.loop()方法。Looper.prepare()使用ThreadLocal將當(dāng)前線程與new出來的Looper關(guān)聯(lián),Looper.loop()開啟循環(huán)處理消息。這樣子,mHandler發(fā)送的消息就可以在LooperThread進(jìn)行處理了。

生產(chǎn)者消費者模式

在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。

在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。為了解決這種生產(chǎn)消費能力不均衡的問題,所以便有了生產(chǎn)者和消費者模式。具體介紹可參考:聊聊并發(fā)(十)生產(chǎn)者消費者模式

生產(chǎn)者消費者模式

Handler機制與生產(chǎn)者消費者模式

那么Handler機制和生產(chǎn)者消費者模式有什么關(guān)系呢?
Handler機制就是一個生產(chǎn)者消費者模式。可以這么理解,Handler發(fā)送消息,它就是生產(chǎn)者,生產(chǎn)的是一個個Message。Looper可以理解為消費者,在其loop()方法中,死循環(huán)從MessageQueue取出Message進(jìn)行處理。而MessageQueue就是緩沖區(qū)了,Handler產(chǎn)生的Message放到MessageQueue中,Looper從MessageQueue取出消息。
既然Handler機制本質(zhì)上是一個生產(chǎn)者消費者模式,那么我們就可以脫離Android來實現(xiàn)一個Handler機制。

Handler

用來發(fā)送和處理消息,因此主要方法就是sendMessage()和handleMessage():

public abstract class Handler {
    private IMessageQueue messageQueue;
    public Handler(Looper looper) {
        messageQueue = looper.messageQueue;
    }
    public Handler() {
        Looper.myLooper();
    }
    public void sendMessage(Message message) {
        // 指定發(fā)送Message的Handler,方便回調(diào)
        message.target = this;
        try {
            messageQueue.enqueueMessage(message);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public abstract void handleMessage(Message msg);
}
Message

相當(dāng)簡單,充當(dāng)一個使者的角色,重要的是要與對應(yīng)的Handler綁定:target變量。

public class Message {
    private int code;
    private String msg;
    Handler target;

    public Message() { }
    public Message(int code, String msg) {
        this.code = code;
        this.msg = msg;
    }
    public int getCode() {
        return code;
    }
    public void setCode(int code) {
        this.code = code;
    }
    public String getMsg() {
        return msg;
    }
    public void setMsg(String msg) {
        this.msg = msg;
    }
}
IMessageQueue

定義的一個接口,由于MessageQueue可以有不同的實現(xiàn),因此抽象一個接口出來。

public interface IMessageQueue {
    Message next() throws InterruptedException;
    void enqueueMessage(Message message) throws InterruptedException;
}
Looper

loop()方法是一個死循環(huán),在這里處理消息,沒有消息時,綁定線程會處于wait狀態(tài)。使用ThreadLocal來與線程進(jìn)行綁定。

public class Looper {
    static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
    IMessageQueue messageQueue;
    private static Looper sMainLooper;
    public Looper() {
        messageQueue = new MessageQueue(2);
        // messageQueue = new MessageQueue1(2);
        // messageQueue = new MessageQueue2(2);
    }
    public static void prepare() {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper());
    }
    public static void prepareMainLooper() {
        prepare();
        synchronized (Looper.class) {
            if (sMainLooper != null) {
                throw new IllegalStateException("The main Looper has already been prepared.");
            }
            sMainLooper = myLooper();
        }
    }
    public static Looper getMainLooper() {
        return sMainLooper;
    }
    public static Looper myLooper() {
        return sThreadLocal.get();
    }
    public static void loop() {
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        for (;;) {
            // 消費Message,如果MessageQueue為null,則等待
            Message message = null;
            try {
                message = me.messageQueue.next();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (message != null) {
                message.target.handleMessage(message);
            }
        }
    }
}
LinkedBlockingQueue實現(xiàn)的MessageQueue

MessageQueue是緩沖區(qū),它需要滿足以下功能:

  • 當(dāng)緩沖區(qū)滿時,掛起執(zhí)行enqueueMessage的線程。
  • 當(dāng)緩沖區(qū)空時,掛起執(zhí)行next的線程。
  • 當(dāng)緩沖區(qū)非空時,喚醒被掛起在next的線程。
  • 當(dāng)緩沖區(qū)不滿時,喚醒被掛起在enqueueMessage的線程。

所以MessageQueue最簡單的實現(xiàn)莫過于使用LinkedBlockingQueue了。(源碼|并發(fā)一枝花之BlockingQueue

public class MessageQueue implements IMessageQueue {
    private final BlockingQueue<Message> queue;
    public MessageQueue(int cap) {
        this.queue = new LinkedBlockingQueue<>(cap);
    }
    public Message next() throws InterruptedException {
        return queue.take();
    }
    public void enqueueMessage(Message message) {
        try {
            queue.put(message);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

像這樣,一個簡易的Handler機制就實現(xiàn)了。Android系統(tǒng)的Handler機制肯定不是那么簡單,做了很多魯棒性相關(guān)的處理,還有和底層的交互等等。另外,Android系統(tǒng)MessageQueue是結(jié)合Message實現(xiàn)的一個無界隊列,意味著發(fā)送消息的隊列不會阻塞。
Handler機制搭建好了,那么來測試一下吧:

public class Main {
    public static void main(String[] args) {
        MainThread mainThread = new MainThread();
        mainThread.start();
        // 確保mainLooper構(gòu)建完成
        while (Looper.getMainLooper() == null) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        Handler handler = new Handler(Looper.getMainLooper()) {
            @Override
            public void handleMessage(Message msg) {
                System.out.println("execute in : " + Thread.currentThread().getName());
                switch (msg.getCode()) {
                    case 0 :
                        System.out.println("code 0 : " + msg.getMsg());
                        break;
                    case 1 :
                        System.out.println("code 1 : " + msg.getMsg());
                        break;
                    default :
                        System.out.println("other code : " + msg.getMsg());
                }
            }
        };

        Message message1 = new Message(0, "I am the first message!");
        WorkThread workThread1 = new WorkThread(handler, message1);

        Message message2 = new Message(1, "I am the second message!");
        WorkThread workThread2 = new WorkThread(handler, message2);

        Message message3 = new Message(34, "I am a message!");
        WorkThread workThread3 = new WorkThread(handler, message3);

        workThread1.start();
        workThread2.start();
        workThread3.start();
    }
    /**模擬工作線程**/
    public static class WorkThread extends Thread {
        private Handler handler;
        private Message message;

        public WorkThread(Handler handler, Message message) {
            setName("WorkThread");
            this.handler = handler;
            this.message = message;
        }

        @Override
        public void run() {
            super.run();
            // 模擬耗時操作
            Random random = new Random();
            try {
                Thread.sleep(random.nextInt(10) * 300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 任務(wù)執(zhí)行完,sendMessage
            handler.sendMessage(message);
        }
    }
    /**模擬主線程*/
    public static class MainThread extends Thread {
        public MainThread() {
            setName("MainThread");
        }
        @Override
        public void run() {
            super.run();
            // 這里是不是與系統(tǒng)的調(diào)用一樣,哈哈
            Looper.prepareMainLooper();
            System.out.println(getName() + " the looper is prepared");
            Looper.loop();
        }
    }
}

這里模擬了子線程執(zhí)行任務(wù),發(fā)送消息到主線程處理的場景。那么執(zhí)行結(jié)果就是:

MainThread the looper is prepared
execute in : MainThread
other code : I am a message!

execute in : MainThread
code 0 : I am the first message!

execute in : MainThread
code 1 : I am the second message!

可以看出handleMessage的執(zhí)行都是在主線程的,簡易Handler機制搞定!??!

本文章到這就應(yīng)該結(jié)束了,但是MessageQueue的實現(xiàn)方式是可以有多種的,因此來看看其不同的實現(xiàn)。

wait/notify實現(xiàn)
public class MessageQueue1 implements IMessageQueue {

    private Queue<Message> queue;
    private final AtomicInteger integer = new AtomicInteger(0);
    private volatile int count;
    private final Object BUFFER_LOCK = new Object();
    public MessageQueue1(int cap) {
        this.count = cap;
        queue = new LinkedList<>();
    }

    @Override
    public Message next() throws InterruptedException {
        synchronized (BUFFER_LOCK) {
            while (queue.size() == 0) {
                BUFFER_LOCK.wait();
            }
            Message message = queue.poll();
            BUFFER_LOCK.notifyAll();
            return message;
        }
    }

    @Override
    public void enqueueMessage(Message message) throws InterruptedException {
        synchronized (BUFFER_LOCK) {
            while (queue.size() == count) {
                BUFFER_LOCK.wait();
            }
            queue.offer(message);
            BUFFER_LOCK.notifyAll();
        }
    }
}

BUFFER_LOCK 用來處理與鎖相關(guān)的邏輯。

next()方法中,如果queue的size 為0,說明現(xiàn)在沒有消息待處理,因此執(zhí)行BUFFER_LOCK.wait()掛起線程,當(dāng)queue.poll();時,queue里就有了消息,需要喚醒因為沒有消息而掛起的線程,所以執(zhí)行BUFFER_LOCK.notifyAll();。

enqueueMessage()方法中,如果queue.size() == count說明消息滿了,如果Handler繼續(xù)sendMessage,queue無法繼續(xù)裝下,因此該線程需要掛起: BUFFER_LOCK.wait();。當(dāng)執(zhí)行queue.offer(message);時,queue里頭保存的Message就少了一個,可以插入新的Message,因此BUFFER_LOCK.notifyAll();喚醒因為queue滿了而掛起的線程。

Lock實現(xiàn)

既然wait/notify可以實現(xiàn)MessageQueue,那么ReentrantLock肯定也能實現(xiàn),下面就是使用ReentrantLock實現(xiàn)的例子,原理上來講是一致的。

public class MessageQueue2 implements IMessageQueue {
    private final Queue<Message> queue;
    private int cap = 0;
    private final Lock lock = new ReentrantLock();
    private final Condition BUFFER_CONDITION = lock.newCondition();
    public MessageQueue2(int cap) {
        this.cap = cap;
        queue = new LinkedList<>();
    }

    @Override
    public Message next() throws InterruptedException {
        try {
            lock.lock();
            while (queue.size() == 0) {
                BUFFER_CONDITION.await();
            }
            Message message = queue.poll();
            BUFFER_CONDITION.signalAll();
            return message;
        } finally {
            lock.unlock();
        }
    }
    @Override
    public void enqueueMessage(Message message) throws InterruptedException {
        try {
            lock.lock();
            while (queue.size() == cap) {
                BUFFER_CONDITION.await();
            }
            queue.offer(message);
            BUFFER_CONDITION.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
Lock實現(xiàn)的優(yōu)化

上面的實現(xiàn)過程中,入隊出隊是基于同一個鎖的,意味著,如果有一個線程正在入隊,其他線程就不能出隊;有一個線程出隊,其它不能入隊。一個時刻只能有一個線程操作緩沖區(qū),這顯然在多線程環(huán)境下會影響性能。所以對其進(jìn)行優(yōu)化。
優(yōu)化想法是,入隊和出隊互相不干擾。所以入隊和出隊分別需要一個鎖,入隊在隊列的尾部進(jìn)行,出隊在頭部進(jìn)行。代碼如下:

public class MessageQueue3 implements IMessageQueue {
    private final Lock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();
    private final Lock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private Node head; // 隊頭
    private Node last; // 隊尾
    private AtomicInteger count = new AtomicInteger(0); // 記錄大小
    private int cap = 10; // 容量,默認(rèn)為10

    public MessageQueue3(int cap) {
        this.cap = cap;
    }

    @Override
    public Message next() throws InterruptedException {
        Node node;
        int c = -1;
        takeLock.lock();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            node = head;
            head = head.next;
            c = count.getAndDecrement();
            if (c > 0) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (count.get() < cap) {
            signalNotFull();
        }
        return node.data;
    }

    @Override
    public void enqueueMessage(Message message) throws InterruptedException {
        Node node = new Node(message);
        int c = -1;
        putLock.lock();
        try {
            while (count.get() == cap) {
                notFull.await();
            }
            // 初始狀態(tài)
            if (head == null && last == null) {
                head = last = node;
            } else {
                last.next = node;
                last = last.next;
            }

            c = count.getAndIncrement();
            if (c < cap) {
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c > 0) {
            signalNotEmpty();
        }
    }

    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
    
    static class Node {
        Message data;
        Node next;
        public Node(Message data) {
            this.data = data;
        }
    }
}

全文完~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容