參考
- 本文源碼版本:
Pie 9.0.0_r3 - 在線源碼地址:AsyncTask.java
1 AsyncTask簡單用法
// 三個(gè)泛型參數(shù)分別代表傳入的參數(shù)類型,任務(wù)執(zhí)行過程需要更新的數(shù)據(jù)類型,任務(wù)執(zhí)行結(jié)束返回的結(jié)果類型,如果無類型則可以用Void類
AsyncTask<Integer,Integer,Void> asyncTask = new AsyncTask<Integer, Integer, Void>() {
/**
* 得到結(jié)果,在主線程執(zhí)行
* @param aVoid
*/
@Override
protected void onPostExecute(Void aVoid) {
Log.d(TAG, "onPostExecute: >>>");
super.onPostExecute(aVoid);
}
/**
* 任務(wù)內(nèi)容,在工作線程執(zhí)行
* @param integers
* @return
*/
@Override
protected Void doInBackground(Integer... integers) {
Log.d(TAG, "doInBackground: >>>params: "+Arrays.toString(integers));
return null;
}
/**
* 任務(wù)執(zhí)行前,在主線程執(zhí)行
*/
@Override
protected void onPreExecute() {
Log.d(TAG, "onPreExecute: >>");
super.onPreExecute();
}
/**
* 任務(wù)已取消(帶結(jié)果),在主線程執(zhí)行
* @param aVoid
*/
@Override
protected void onCancelled(Void aVoid) {
super.onCancelled(aVoid);
Log.d(TAG, "onCancelled(有參): >>>");
}
/**
* 任務(wù)已取消,在主線程執(zhí)行
*/
@Override
protected void onCancelled() {
super.onCancelled();
Log.d(TAG, "onCancelled: >>>");
}
/**
* 指定過程更新的數(shù)據(jù),在主線程執(zhí)行
* @param values
*/
@Override
protected void onProgressUpdate(Integer... values) {
super.onProgressUpdate(values);
Log.d(TAG, "onProgressUpdate: "+ Arrays.toString(values));
}
};
asyncTask.execute(2,3,1);
執(zhí)行結(jié)果:
06-20 18:48:44.440 2761-2761/me.newtrekwang.customwidget D/TaskLibActivity: onPreExecute: >>
06-20 18:48:44.441 2761-2808/me.newtrekwang.customwidget D/TaskLibActivity: doInBackground: >>>params: [2, 3, 1]
06-20 18:48:44.454 2761-2761/me.newtrekwang.customwidget D/TaskLibActivity: onPostExecute: >>>
2 源碼解讀
2.1 線程池大小相關(guān)
/**
* cpu核心數(shù)
*/
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
/**
* 核心線程:至少兩個(gè)線程,最多4個(gè)線程。
*/
private static final int CORE_POOL_SIZE = Math.max(2,Math.min(CPU_COUNT-1,4));
/**
* 最多線程數(shù)
*/
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
/**
* 多余線程存活時(shí)間
*/
private static final int KEEP_ALIVE_SECONDS = 30;
怎么分配線程池合理?
一般來說,設(shè)N為CPU核數(shù)。
- 如果是CPU密集型應(yīng)用,則線程池大小設(shè)置為
N+1. - 如果是IO密集型應(yīng)用,則線程池大小設(shè)置為
N*2+1
IO密集型
I/O bound 指的是系統(tǒng)的CPU效能相對(duì)硬盤/內(nèi)存的效能要好很多,此時(shí),系統(tǒng)運(yùn)作,大部分的狀況是 CPU 在等 I/O (硬盤/內(nèi)存) 的讀/寫,此時(shí) CPU Loading 不高。
CPU密集型
CPU bound 指的是系統(tǒng)的 硬盤/內(nèi)存 效能 相對(duì) CPU 的效能 要好很多,此時(shí),系統(tǒng)運(yùn)作,大部分的狀況是 CPU Loading 100%,CPU 要讀/寫 I/O (硬盤/內(nèi)存),I/O在很短的時(shí)間就可以完成,而 CPU 還有許多運(yùn)算要處理,CPU Loading 很高。
Android 應(yīng)用的話應(yīng)該是屬于IO密集型應(yīng)用,所以數(shù)量一般設(shè)置為 2N+1,AsyncTask里執(zhí)行任務(wù)的線程池也是這樣設(shè)置的。
2.2 兩個(gè)線程池
在AsyncTask中有兩個(gè)線程池,一個(gè)線程池(SerialExecutor)用于處理任務(wù)列表,一個(gè)線程池(ThreadPoolExecutor)用于執(zhí)行任務(wù)。
構(gòu)建用于執(zhí)行任務(wù)的ThreadPoolExecutor
/**
* 處理任務(wù)的線程池
*/
public static final Executor THREAD_POOL_EXECUTOR;
/**
* 任務(wù)隊(duì)列
*/
private static final BlockingQueue<Runnable> sPoolWorkQueue = new LinkedBlockingDeque<>(128);
/**
* 線程工廠
*/
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
/**
* 線程安全的計(jì)數(shù)器
*/
private final AtomicInteger mCount = new AtomicInteger(1);
@Override
public Thread newThread(@NonNull Runnable runnable) {
return new Thread(runnable,"AsyncTask #"+mCount.getAndIncrement());
}
};
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE,MAXIMUM_POOL_SIZE,KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,sPoolWorkQueue,sThreadFactory
);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
構(gòu)建用于任務(wù)排隊(duì)的SerialExecutor
/**
* 串行任務(wù)執(zhí)行器
*/
private static final Executor SERIAL_EXECUTOR = new SerialExecutor();
/**
* 默認(rèn)任務(wù)執(zhí)行器
*/
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
/**
* @className WangAsyncTask
* @createDate 2019/6/20 17:52
* @author newtrekWang
* @email 408030208@qq.com
* @desc 串行任務(wù)執(zhí)行器類
*
*/
private static class SerialExecutor implements Executor{
final ArrayDeque<Runnable> mTasks = new ArrayDeque<>();
Runnable mActive;
@Override
public synchronized void execute(@NonNull final Runnable runnable) {
// 入隊(duì)一個(gè)runnable,對(duì)原始的runnable加了點(diǎn)修飾
mTasks.offer(new Runnable() {
@Override
public void run() {
try {
runnable.run();
} finally {
// 任務(wù)執(zhí)行后,要檢查是否有下一個(gè)runnable需要執(zhí)行
scheduleNext();
}
}
});
// 第一次的時(shí)候需要觸發(fā)下才能執(zhí)行
if (mActive == null){
scheduleNext();
}
}
/**
* 檢查是否有下一個(gè)runnable需要執(zhí)行,如果有,則交給另一個(gè)線程池執(zhí)行
*/
protected synchronized void scheduleNext(){
if ((mActive = mTasks.poll()) != null){
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
為什么用兩個(gè)線程池?
因?yàn)锳syncTask是設(shè)計(jì)為串行執(zhí)行任務(wù)的,所以另外最好需要一個(gè)線程池負(fù)責(zé)任務(wù)的排隊(duì)。
2.3 任務(wù)執(zhí)行流程
2.3.1 構(gòu)造AsyncTask
public AsyncTask(@Nullable Looper callbackLooper) {
mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
? getMainHandler()
: new Handler(callbackLooper);
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
AsynckTask有關(guān)鍵的三個(gè)成員:mHandler,mWorker,mFuture
mHandler
用于線程間通信,將工作線程的消息傳遞到主線程并做出處理,也就是實(shí)現(xiàn)AsyncTask過程數(shù)據(jù)回調(diào),結(jié)果回調(diào)在主線程的關(guān)鍵。
private static Handler getMainHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler(Looper.getMainLooper());
}
return sHandler;
}
}
然后再看InternalHandler類:
private static class InternalHandler extends Handler {
public InternalHandler(Looper looper) {
super(looper);
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
AsyncTaskResult里包含的AsynTask對(duì)象的引用。
Handler可以從收到的msg里得到統(tǒng)一的AsynctTaskResult數(shù)據(jù)對(duì)象,然后根據(jù)消息類型,進(jìn)行處理。
比如MESSAGE_POST_RESULT是工作線程傳的已完成任務(wù)標(biāo)志,然后此時(shí)AsyncTask的結(jié)束回調(diào)方法應(yīng)該被調(diào)用,通過result.mTask.finish(result.mData[0])既可以實(shí)現(xiàn)結(jié)束回調(diào),使AsyncTask使用者可以從回調(diào)種拿到結(jié)果。
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
另外MESSAGE_POST_PROGRESS就是過程中工作線程傳的更新消息。也一樣通過result.mTask.onProgressUpdate(result.mData)實(shí)現(xiàn)在Handler所在線程更新過程數(shù)據(jù)。
mWorker
mWorker就是一個(gè)Callable,即有返回結(jié)果的Runable,而它的實(shí)現(xiàn)類WorkerRunnable還帶上了任務(wù)參數(shù)Params。
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
通過上面代碼可以知道,用戶實(shí)現(xiàn)的doInBackground(mParams)方法就是在這里調(diào)用的,相當(dāng)于是把用戶的業(yè)務(wù)代碼包裝成一個(gè)Callable,然后再包裝成FutureTask,接著將FutureTask提交給線程池,這樣來實(shí)現(xiàn)在工作線程執(zhí)行用戶定義的代碼塊。
然后具體分析call()方法:首先mTaskInvoked是一個(gè)AtomicBoolean對(duì)象,它能保證線程安全地更新boolean值,mTaskInvoked.set(true)表示該AsyncTask對(duì)象已被調(diào)用。然后就是設(shè)置當(dāng)前線程的優(yōu)先級(jí)Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND)。然后就是調(diào)用用戶實(shí)現(xiàn)的doInBackground(mParams)方法。然后Binder.flushPendingCommands();是一個(gè)native方法,應(yīng)該是重新調(diào)整線程優(yōu)先級(jí)的。然后是捕獲異常,最后是通過postResult(result)提交結(jié)果到主線程。
// 提交結(jié)果到Handler所在線程
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
// 提交更新數(shù)據(jù)到Handler所在線程
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
mFuture
如果單單是提交callable到線程池執(zhí)行有callable就完事了,但是還需要支持任務(wù)的取消操作,那么這個(gè)功能就需要FutureTask了。
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
mFuture就是mWorker的一個(gè)包裝類,它也具體實(shí)現(xiàn)了Future的一些任務(wù)操作接口,比如取消任務(wù),mFuture重寫了done()方法,應(yīng)該是考慮到AsyncTask的get()和cancel()方法內(nèi)部出異常時(shí)對(duì)結(jié)果的處理。
至于FutureTask是怎樣實(shí)現(xiàn)取消和get()阻塞得到結(jié)果的,我會(huì)在另一篇文字做介紹。
2.3.2 execute():執(zhí)行AsyncTask
execute(Params... params)是AsyncTask為使用者提供的API,它的具體實(shí)現(xiàn)如下:
關(guān)鍵代碼:
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
可以看出executor方法實(shí)際及時(shí)調(diào)用executeOnExecutor方法,如果用AsyncTask默認(rèn)的串行執(zhí)行任務(wù)的線程池就用executor(),默認(rèn)線程池就是前面提到的sDefaultExecutor,如果是自定義線程池,就用executeOnExecutor()。
然后看executeOnExecutor方法:首先會(huì)判斷status,如果是RUNNING和FINISHED,會(huì)拋異常,也驗(yàn)證了一個(gè)AsyncTask只能被執(zhí)行一次。
然后更新狀態(tài)為RUNNING,調(diào)用onPreExecute(),因?yàn)閑xecute()是在主線程調(diào)用的,此時(shí)onPreExecute()就是在主線程執(zhí)行的。然后將用戶傳的mParams注入mWorker,最后再將future提交到線程池執(zhí)行。
通過這些代碼分析可以得出:一個(gè)任務(wù)對(duì)應(yīng)一個(gè)AsyncTask,一個(gè)AsyncTask對(duì)應(yīng)一個(gè)Worker,一個(gè)Worker對(duì)應(yīng)一個(gè)FutureTask,然后多個(gè)AsyncTask共用兩個(gè)默認(rèn)的線程池和InternalHandler。
大致類關(guān)系圖

結(jié)語
分析到這里,AsyncTask其實(shí)也并不復(fù)雜,它始終還是用的線程池+Handler機(jī)制來設(shè)計(jì)的,只要理解了其中的設(shè)計(jì)步驟,我們自己也可以定義一個(gè)AsyncTask。
平時(shí)在業(yè)務(wù)開發(fā)中根本就接觸不到并發(fā)編程知識(shí),只知道使用別人的框架,這對(duì)于個(gè)人技術(shù)提高并沒有什么進(jìn)步。
在AsyncTask中我就了解到了:
- 線程池的線程數(shù)該如何設(shè)置?怎樣定義線程池?
- 怎樣使用線程池?
- Handler機(jī)制
- FutureTask,RunnableFuture,Callable,Runnable,F(xiàn)uture之間的關(guān)系
- 怎樣實(shí)現(xiàn)任務(wù)的取消?
感覺這些知識(shí)點(diǎn)都是通用的,理解了這些問題后再去看其它框架源碼或設(shè)計(jì)一個(gè)框架,自己心里也有一個(gè)方案。