AsyncTask 源碼淺析
作用
是圍繞Handler和Thread開發(fā)的幫助類,方便開發(fā)者在子線程執(zhí)行幾秒鐘以內(nèi)的耗時(shí)任務(wù),在主線程返回任務(wù)執(zhí)行的結(jié)果。簡(jiǎn)而言之,是對(duì)異步操作的封裝
四個(gè)重要回調(diào)方法
- onPreExecute()
- doInBackground()
- onProgressUpdate()
- onPostExecute()
只有doInBackground()在線程池執(zhí)行,其余方法在主線程執(zhí)行
執(zhí)行過(guò)程
1. 編譯期創(chuàng)建線程池
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BlockingQueue<Runnable>
sPoolWorkQueue =new LinkedBlockingQueue<Runnable>(128);
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE,
KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
//超過(guò)閑置等待時(shí)間后允許回收所有線程,包括核心線程
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
//默認(rèn)的Executor
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
THREAD_POOL_EXECUTOR即AsyncTask的線程池之一,根據(jù)Math.max(2, Math.min(CPU_COUNT - 1, 4))設(shè)置線程池的核心線程數(shù)最小為2,最大為4,最大線程數(shù)為CPU_COUNT * 2 + 1,線程閑置等待時(shí)間為30秒,任務(wù)隊(duì)列LinkedBlockingQueue最大長(zhǎng)度為128,線程工廠設(shè)置線程的name為AsyncTask # i++
THREAD_POOL_EXECUTOR和SERIAL_EXECUTOR會(huì)被所有AsyncTask實(shí)例共用
2. 構(gòu)造方法
只做了兩件事,初始化mWorker(也就是Callable)和mFuture(也就是FutureTask) 屬于Java并發(fā)包下的兩個(gè)類,下面對(duì)他們進(jìn)行簡(jiǎn)單的介紹
-
Callable簡(jiǎn)單來(lái)說(shuō)就是可以拋出異常的,有返回值的Runnable -
FutureTask擴(kuò)展了RunnableFuture,而RunnableFuture又?jǐn)U展了Runnable和Future。將會(huì)執(zhí)行構(gòu)造方法傳入的Callable,并取得返回值
雖然這兩個(gè)類里的方法還沒(méi)有被調(diào)用
但是我們卻可以很清晰的看見整個(gè)AsyncTask的工作流程了
public AsyncTask() {
//實(shí)現(xiàn)了Callable
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
//標(biāo)識(shí)有任務(wù)被調(diào)用了
mTaskInvoked.set(true);
Result result = null;
try {
//標(biāo)準(zhǔn)的后臺(tái)線程優(yōu)先級(jí),優(yōu)先級(jí)不是太高
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//調(diào)用我們的四個(gè)重要抽象方法之二
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
//Handler調(diào)用,暫且按下不表
postResult(result);
}
return result;
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
//處理未能順利完成調(diào)用的情況
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
3. executeOnExecutor
所有的執(zhí)行命令最終都會(huì)調(diào)用這個(gè)方法
所以是我們開始使用AsyncTask的第一站
這個(gè)方法只能在主線程調(diào)用
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
//設(shè)置狀態(tài)為RUNNING
mStatus = Status.RUNNING;
//調(diào)用我們四個(gè)重要抽象方法之一
onPreExecute();
//把Params傳給我們實(shí)現(xiàn)Caller的Worker
mWorker.mParams = params;
//調(diào)用Executor執(zhí)行,同時(shí)把我們的FutureTask傳進(jìn)去
//默認(rèn)的Executor是在編譯器就初始化好的SERIAL_EXECUTOR,串行執(zhí)行
exec.execute(mFuture);
return this;
}
接下來(lái)會(huì)調(diào)用Executor.execute()
下面看一下我們默認(rèn)的Executor,也就是SerialExecutor
private static class SerialExecutor implements Executor {
//線性雙端隊(duì)列
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
//提交一個(gè)Runnable到雙端隊(duì)列
mTasks.offer(new Runnable() {
public void run() {
try {
//運(yùn)行我們的mFuture,忘記了這是啥?回去看第二部分
r.run();
} finally {
//調(diào)度下一個(gè)
scheduleNext();
}
}
});
//如果沒(méi)有正在活動(dòng)的任務(wù),就調(diào)度下一個(gè)
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
//終于提交到線程池執(zhí)行了,THREAD_POOL_EXECUTOR是啥?忘了的話去看第一部分
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
4. THREAD_POOL_EXECUTOR中的調(diào)用
這一部分其實(shí)已經(jīng)超出了我們分析的范圍了,所以 我們不會(huì)去關(guān)心線程池的實(shí)現(xiàn),而是理一理調(diào)用流程就好
記住了,我們execute到線程池的Runnable,是poll到ArrayDeque的Runnable,這個(gè)Runnable中調(diào)用了mFuture.run(),
我們關(guān)心的是傳到線程池的Runnable
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
這個(gè)方法沒(méi)啥說(shuō)的,就是做一下檢查,然后調(diào)用addWorker(),這個(gè)方法里面,會(huì)找到一個(gè)由ThreadFactory創(chuàng)建的線程,這個(gè)第一部分我們有交代,然后在這個(gè)線程中調(diào)用Runnable.run(),最終調(diào)用到mFurture.run( )
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
//構(gòu)造方法傳進(jìn)來(lái)的callable,對(duì)應(yīng)于AsyncTask中的mWorker,我們第二部分有講
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//調(diào)用mWorker.call(),并且取得返回值。我們第二部分有講
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
這里面都是在子線程操作了哈,調(diào)用了mWorker.call(),回調(diào)了四個(gè)重要抽象方法的doInBackground()哈,然后這個(gè)方法的返回值作為result,這個(gè)result交給Handler了咯,完了后調(diào)用mFuture.done()
5. Handler中的調(diào)用
先看看mWorker.call()中的postResult(result);
這里面都是在子線程執(zhí)行的,所以用Handler返回主線程
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
//result是我們?cè)赿oInBackground()返回的值
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
private static Handler getHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler();
}
return sHandler;
}
}
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper());
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
case MESSAGE_POST_RESULT: 調(diào)用了finish()
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
最后調(diào)用onPostExecute(result),整個(gè)流程結(jié)束
publishProgress()還是借助的AsyncTaskResult<DATA>,沒(méi)什么好分析的,但是代碼還是貼出來(lái)
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
總結(jié)
AsyncTask利用Java并發(fā)包下的FutureTask來(lái)封裝了一次請(qǐng)求的信息,SerialExecutor來(lái)保證串行執(zhí)行,利用THREAD_POOL_EXECUTOR線程池在子線程執(zhí)行任務(wù),用Handler返回主線程
線程池中子線程執(zhí)行的是在SerialExecutor中添加到ArrayDeque的Runnable,這里面會(huì)串行的調(diào)用Runnable中的mFuture.run(),mFuture.run()會(huì)調(diào)用mWorker.call(),mWorker.call()會(huì)回調(diào)doInBackground()并且取得返回值
優(yōu)點(diǎn)
- 足夠的輕巧,全局維護(hù)僅一個(gè)線性的隊(duì)列,一個(gè)線程池
- 相比
Handler來(lái)說(shuō),足夠的方便 - 編寫好一個(gè)AsyncTask后,有一定的復(fù)用性
缺點(diǎn)
- 需要繼承
AsyncTask,如果它作為Activity的內(nèi)部類,要小心內(nèi)存泄漏 - 雖然提供了一個(gè)API來(lái)取消一次請(qǐng)求,但是不一定能夠取消掉
- 一次事件的起因和結(jié)果是耦合在一起的
- 除了足夠輕巧,都不如
RxJava,體現(xiàn)在你無(wú)法合理的控制流程,怎么合并多個(gè)任務(wù)?怎么串聯(lián)多個(gè)任務(wù)?怎么拆分多個(gè)任務(wù)?流程中出現(xiàn)錯(cuò)誤怎么辦?
終上所述,AsyncTask的使用價(jià)值以及不足夠明顯,我們不需要再使用它了