線程池及線程調(diào)度

背景

文章通過接口層表象來實(shí)現(xiàn)一個(gè)簡(jiǎn)版且穩(wěn)定的線程調(diào)度庫,給予一個(gè)臺(tái)階,當(dāng)你讀完文章的末尾,希望你有一探RxJava欲望與信心。

目標(biāo)

  1. TaskScheduler.executeMain(...); //主線程, 執(zhí)行任務(wù)
  2. TaskScheduler.executeTask(...); //子線程, 線程池執(zhí)行任務(wù)
  3. TaskScheduler.executeSingle(...); //子線程, 單線程執(zhí)行任務(wù)
  4. TaskScheduler.create(...); //任務(wù)調(diào)度

項(xiàng)目

設(shè)計(jì)

  1. .func(...).func(...).func(...)...順序流執(zhí)行
  2. .observeOn(...)線程切換

效果圖

        TaskScheduler.create(new Task<List<String>>() {
            @Override
            public List<String> run() {
                ...do something in io thread
                return new ArrayList<>();
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .map(new Function<List<String>, String>() {
                    @Override
                    public String apply(@NonNull List<String> strings) throws Exception {
                        ...do something in new thread, such as time-consuming, map conversion, etc.
                        return "";
                    }
                })
                .observeOn(Schedulers.io())
                .map(new Function<String, Boolean>() {
                    @Override
                    public Boolean apply(@NonNull String s) throws Exception {
                        ...do something in io thread, such as time-consuming, map conversion, etc.
                        return true;
                    }
                })
                ...
                .observeOn(Schedulers.mainThread())
                .subscribe(new Observer<Boolean>() {
                    @Override
                    public void onNext(@NonNull Boolean result) {
                        ...do something in main thread
                    }

                    @Override
                    public void onError(Throwable e) {
                        ...do something in main thread
                    }
                });

分析

  1. 線程
  2. 線程切換
  3. 任務(wù)調(diào)度

1. 線程

public class TaskManager {
    private static TaskManager ins;

    private Handler mainHandler;
    private ExecutorService cachedThreadPool;
    private ExecutorService singleThreadExecutor;

    private TaskManager() {
        mainHandler = new Handler(Looper.getMainLooper());
        cachedThreadPool = Executors.newCachedThreadPool();
        singleThreadExecutor = Executors.newSingleThreadExecutor();
    }

    static TaskManager getIns() {
        if (ins == null) {
            synchronized (TaskManager.class) {
                if (ins == null) {
                    ins = new TaskManager();
                }
            }
        }
        return ins;
    }

    /**
     * Execute sync task in main thread
     */
    void executeMain(Runnable runnable) { mainHandler.post(runnable); }

    /**
     * Execute async task in cached thread pool
     */
    void executeTask(Runnable runnable) { cachedThreadPool.execute(runnable); }

    /**
     * Execute async task in single thread pool
     */
    void executeSingle(Runnable runnable) { singleThreadExecutor.execute(runnable); }

    /**
     * Execute async task in a new thread
     */
    void executeNew(Runnable runnable) { new Thread(runnable).start(); }
}

線程切換的方法:拋runnable到相應(yīng)線程,由線程來調(diào)度執(zhí)行runnable,runnable中的方法即在相應(yīng)線程中執(zhí)行。
如無這樣的顯式切換線程,代碼流(無論多少次方法遞歸調(diào)用)將在當(dāng)前線程一直執(zhí)行下去。同一線程,代碼總是順序的執(zhí)行。

Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());

通過這行代碼可以打印出當(dāng)前在那一個(gè)線程。主線程的getName是main。

        new Thread(() -> {
                // Code block 1
                Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
                ...
                new Handler(Looper.getMainLooper()).post(new Runnable() {
                    @Override
                    public void run() {
                        // Code block 2
                        Log.d("Current Thread", Thread.currentThread().getId() + "--NAME--" + Thread.currentThread().getName());
                        ...
                    }
                });
        }).start();

這是一個(gè)通常的代碼形式
Code block 1處在一個(gè)子線程中執(zhí)行代碼,通過new Handler(Looper.getMainLooper()).post(...)向主線程拋入一個(gè)runnable,runnable進(jìn)入主線程消息隊(duì)列,然后等主線程消息隊(duì)列取出該runnable執(zhí)行時(shí),Code line 2處代碼即在主線程中執(zhí)行。
Code block 1與Code block 2在時(shí)間上并行執(zhí)行。線程池同理。

public class TaskScheduler<T> {
    public static void executeMain(Runnable runnable) { TaskManager.getIns().executeMain(runnable); }

    public static void executeTask(Runnable runnable) { TaskManager.getIns().executeTask(runnable); }

    public static void executeSingle(Runnable runnable) { TaskManager.getIns().executeSingle(runnable); }

    ...
}

通過單例簡(jiǎn)單包裝,實(shí)現(xiàn)目標(biāo)1、2、3

2. 線程切換

    /**
     * Switch thread
     * scheduler 線程枚舉,int類型: defaultThread、newThread、io、mainThread
     */
    public static void switchThread(@Scheduler int scheduler, final Runnable runnable) {
        if (scheduler == NEW_THREAD) {
            new Thread(() -> {
                    if (runnable != null) {
                        runnable.run();
                    }
            }).start();
            return;
        } else if (scheduler == IO) {
            TaskScheduler.executeTask(() -> {
                    if (runnable != null) {
                        runnable.run();
                    }
            });
            return;
        } else if (scheduler == MAIN_THREAD) {
            if (!isMainThread()) {
                TaskScheduler.executeMain(() -> {
                        if (runnable != null) {
                            runnable.run();
                        }
                });
                return;
            }
        }
        if (runnable != null) {
            runnable.run();
        }
    }

    public static boolean isMainThread() {
        return Looper.getMainLooper().getThread() == Thread.currentThread();
    }

3. 任務(wù)調(diào)度

3.1 開始前的準(zhǔn)備

我們先來定義3個(gè)接口

interface.png

然后是2個(gè)對(duì)應(yīng)的包裝類,后面會(huì)用到

Task -> TaskEmitter
Function -> FunctionEmitter

public class Emitter {
    public int scheduler;
}
public class TaskEmitter<T> extends Emitter {
    public Task<T> task;

    public TaskEmitter(Task<T> task, @Schedulers.Scheduler int scheduler) {
        this.task = task;
        this.scheduler = scheduler;
    }
}
public class FunctionEmitter<T, R> extends Emitter {
    public Function<? super T, ? extends R> function;

    public FunctionEmitter(Function<? super T, ? extends R> function, @Schedulers.Scheduler int scheduler) {
        this.function = function;
        this.scheduler = scheduler;
    }
}

3.2 Create

開始前,我們知道一些開源庫如Glide,慣用.with(...)形式,這種方式實(shí)質(zhì):靜態(tài)方法 + return new Instance(),
這里我們也用這種模式來開始create(...)。

實(shí)現(xiàn)分三步走

Step 1: Create

    public static <T> TaskScheduler<T> create(final Task<T> task) {
        TaskScheduler<T> schedulers = new TaskScheduler<T>();
        schedulers.task = task;
        return schedulers;
    }

創(chuàng)建TaskScheduler實(shí)例,持有 源任務(wù)task

    public TaskObserve<T> subscribeOn(@Schedulers.Scheduler int scheduler) {
        this.subscribeScheduler = scheduler;
        return new TaskObserve<T>(new TaskEmitter<T>(task, subscribeScheduler));
    }

指定 源任務(wù)task 執(zhí)行所在線程,丟棄當(dāng)前TaskScheduler實(shí)例。
源任務(wù)task、 線程枚舉 注入TaskEmitter后,返回新的實(shí)例TaskObserve,后續(xù)邏輯全由TaskObserve處理

Step 2: TaskObserve中間件

public static class TaskObserve<T> {
        private TaskEmitter taskEmitter;
        private List<FunctionEmitter> emitters;
        private int observeOnScheduler = Schedulers.defaultThread();

        TaskObserve(TaskEmitter<T> taskEmitter) {
            this.taskEmitter = taskEmitter;
            this.emitters = new ArrayList<>();
        }

        ...
}

TaskObserve: 中間件,初始和map轉(zhuǎn)換時(shí)生成,包含以下成員
taskEmitter: 源任務(wù)
emitters: 轉(zhuǎn)換隊(duì)列,map轉(zhuǎn)換時(shí)遞增
observeOnScheduler: 線程枚舉,observeOn觀察者所在線程,可重復(fù)調(diào)用,當(dāng)然只保留最后一次指定的線程

        TaskObserve(TaskObserve middle) {
            this.taskEmitter = middle.taskEmitter;
            this.observeOnScheduler = middle.observeOnScheduler;
            this.emitters = middle.emitters;
        }

        public <TR> TaskObserve<TR> map(Function<? super T, ? extends TR> f) {
            this.emitters.add(new FunctionEmitter<T, TR>(f, observeOnScheduler));
            return new TaskObserve<TR>(this);
        }

map轉(zhuǎn)換時(shí),將 轉(zhuǎn)換體Function 、當(dāng)前 線程枚舉 observeOnScheduler注入 FunctionEmitter ,添加到 轉(zhuǎn)換隊(duì)列
返回新的實(shí)例TaskObserve,丟棄當(dāng)前TaskObserve實(shí)例,新實(shí)例線程枚舉observeOnScheduler默認(rèn)為默認(rèn)線程

Step 3: Subscribe,才是開始!??!

核心思想

  1. 先執(zhí)行 源任務(wù) ,返回值
  2. 遞歸從 轉(zhuǎn)換隊(duì)列 取出 FunctionEmitter (含有轉(zhuǎn)換體、線程枚舉),Schedulers.switchThread(...)指定線程執(zhí)行,轉(zhuǎn)換返回值
  3. 轉(zhuǎn)換隊(duì)列 執(zhí)行盡,提交任務(wù),任務(wù)結(jié)束
        public void subscribe(final Observer<T> callback) {
            // 指定源任務(wù)線程枚舉
            Schedulers.switchThread(taskEmitter.scheduler, () -> {
                    try {
                        // 執(zhí)行源任務(wù)
                        Object t = taskEmitter.task.run();
                        // 轉(zhuǎn)換隊(duì)列是否為空
                        if (assertInterrupt(t)) {
                            // 轉(zhuǎn)換隊(duì)列空,提交本次任務(wù),任務(wù)結(jié)束
                            submit(t, callback);
                            return;
                        }
                        // 轉(zhuǎn)換隊(duì)列不為空,繼續(xù)轉(zhuǎn)換
                        apply(t, emitters, callback);
                    } catch (Throwable e) {
                        // 任務(wù)流拋出異常,即時(shí)中斷,任務(wù)結(jié)束
                        error(e, callback);
                    }
            });
        }
        private boolean assertInterrupt(Object emitter) throws Exception {
            if (emitter == null) {
                // 轉(zhuǎn)換返回值,不能為Null?。?!
                throw new RuntimeException("Apply output must not be null!");
            }
            return emitters.size() <= 0;
        }

assertInterrupt判斷當(dāng)前轉(zhuǎn)換隊(duì)列,是否執(zhí)行盡了

Step 3 - 1: Apply轉(zhuǎn)換隊(duì)列轉(zhuǎn)換

        private <E, F> void apply(final E o, final List<FunctionEmitter> emitters, final Observer<F> callback) {
            // 依次從轉(zhuǎn)換隊(duì)列取出FunctionEmitter,然后移除
            final FunctionEmitter<E, F> f = emitters.get(0);
            emitters.remove(f);
            // 指定當(dāng)前轉(zhuǎn)換線程枚舉
            Schedulers.switchThread(f.scheduler, () -> {
                    try {
                        // 轉(zhuǎn)換,返回轉(zhuǎn)換值
                        Object emitter = f.function.apply(o);
                        // 轉(zhuǎn)換隊(duì)列是否為空
                        if (assertInterrupt(emitter)) {
                            // 轉(zhuǎn)換隊(duì)列空,提交本次任務(wù),任務(wù)結(jié)束
                            submit(emitter, callback);
                            return;
                        }
                        // 轉(zhuǎn)換隊(duì)列不為空,繼續(xù)轉(zhuǎn)換
                        apply(emitter, emitters, callback);
                    } catch (Throwable e) {
                        // 任務(wù)流拋出異常,即時(shí)中斷,任務(wù)結(jié)束
                        error(e, callback);
                    }
            });
        }

Step 3 - 2: Submit提交

        private <S> void submit(final Object result, final Observer<S> callback) {
            // 指定當(dāng)前轉(zhuǎn)換線程枚舉,即當(dāng)前中間件線程枚舉observeOnScheduler
            Schedulers.switchThread(observeOnScheduler, () -> {
                    try {
                        if (callback != null) {
                            // 成功,任務(wù)結(jié)束
                            callback.onNext((S) result);
                        }
                    } catch (Throwable e) {
                        error(e, callback);
                    }
            });
        }

        private <S> void error(final Throwable e, final Observer<S> callback) {
            // 指定當(dāng)前轉(zhuǎn)換線程枚舉,即當(dāng)前中間件線程枚舉observeOnScheduler
            Schedulers.switchThread(observeOnScheduler, () -> {
                    if (callback != null) {
                        // 出錯(cuò),任務(wù)結(jié)束
                        callback.onError(e);
                    }
            });
        }

小結(jié):

泛型: java泛型屬于類型擦除,無論T、F還是R...,最終都是Object。
設(shè)計(jì): 這里的 任務(wù)流 實(shí)現(xiàn)方式為遞歸嵌套調(diào)用。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,506評(píng)論 19 139
  • Android 自定義View的各種姿勢(shì)1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 178,789評(píng)論 25 709
  • 作為一本被眾人追捧,被眾多知識(shí)型大咖認(rèn)可的暢銷書,提起想要閱讀它的心,并不難, 但真正開始讀下去,卻也不是件容易的...
    小夭生活館閱讀 1,631評(píng)論 0 2
  • 歲末豈無雪,有意故來遲。 寒夜燈帳里,斟酌古人詩。 【2015年1月24日】 ?
    d03e056874dc閱讀 238評(píng)論 0 0
  • 劉 娜 焦點(diǎn)解決網(wǎng)絡(luò)初級(jí)九期 駐馬店 2018~05~30 堅(jiān)持分享第95天 今天在天中晚報(bào)公眾微信號(hào)上看...
    洋帆起航閱讀 132評(píng)論 0 0

友情鏈接更多精彩內(nèi)容