AQS實現(xiàn)一個簡單的FutureTask FutureTask的get方法實現(xiàn)

FutureTask的get方法實現(xiàn):

1、允許多個線程get這個結(jié)果

2、多個線程get這個結(jié)果時,可能任務(wù)還沒運行完。

3、任務(wù)運行完成后才能拿到結(jié)果,而且這個時候要讓get結(jié)果的多個線程都可以拿到結(jié)果

/**

* FutureTask的get方法實現(xiàn):

* 1、允許多個線程get這個結(jié)果

* 2、多個線程get這個結(jié)果時,可能任務(wù)還沒運行完。

* 3、任務(wù)運行完成后才能拿到結(jié)果,而且這個時候要讓get結(jié)果的多個線程都可以拿到結(jié)果

*/

public class MyFutureTask<V> implements Runnable, Future<V> {

? ? private final Sync sync;

? ? public MyFutureTask(Callable<V> callable) {

? ? ? ? if (callable == null) {

? ? ? ? ? ? throw new NullPointerException();

? ? ? ? }

? ? ? ? sync = new Sync(callable);

? ? }

? ? private final class Sync extends AbstractQueuedSynchronizer {

? ? ? ? /**

? ? ? ? * 表示任務(wù)正在執(zhí)行

? ? ? ? */

? ? ? ? private static final int RUNNING = 1;

? ? ? ? /**

? ? ? ? * 表示任務(wù)已經(jīng)運行完畢

? ? ? ? */

? ? ? ? private static final int RAN = 2;

? ? ? ? /**

? ? ? ? * 執(zhí)行結(jié)果

? ? ? ? */

? ? ? ? private V result;

? ? ? ? private Callable<V> callable;

? ? ? ? public Sync(Callable<V> callable) {

? ? ? ? ? ? super();

? ? ? ? ? ? this.callable = callable;

? ? ? ? }

? ? ? ? /*任務(wù)沒完成,讓get結(jié)果的線程全部進(jìn)入同步隊列

? ? ? ? * acquireShared方法返回了,說明可以拿結(jié)果了,直接返回結(jié)果*/

? ? ? ? V innerGet() throws InterruptedException, ExecutionException {

? ? ? ? ? ? acquireShared(0);

? ? ? ? ? ? return result; // 成功執(zhí)行完成,返回執(zhí)行結(jié)果。

? ? ? ? }

? ? ? ? /*對任務(wù)的狀態(tài)進(jìn)行變化,設(shè)置執(zhí)行結(jié)果,并喚醒所有等待結(jié)果的線程*/

? ? ? ? void innerSet(V v) {

? ? ? ? ? ? for (; ; ) {

? ? ? ? ? ? ? ? int s = getState(); // 獲取任務(wù)執(zhí)行狀態(tài)。

? ? ? ? ? ? ? ? if (s == RAN) {

? ? ? ? ? ? ? ? ? ? return; // 如果任務(wù)已經(jīng)執(zhí)行完畢,退出。

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? // 嘗試將任務(wù)狀態(tài)設(shè)置為執(zhí)行完成。

? ? ? ? ? ? ? ? if (compareAndSetState(s, RAN)) {

? ? ? ? ? ? ? ? ? ? result = v; // 設(shè)置執(zhí)行結(jié)果。

? ? ? ? ? ? ? ? ? ? releaseShared(0); // 釋放控制權(quán)。

? ? ? ? ? ? ? ? ? ? return;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? ? ? @Override

? ? ? ? protected boolean tryReleaseShared(int releases) {

? ? ? ? ? ? return true;

? ? ? ? }

? ? ? ? /*任務(wù)沒完成,返回-1,讓get結(jié)果的線程全部進(jìn)入同步隊列

? ? ? ? * 返回1,可以讓所有在同步隊列上等待的線程一一去拿結(jié)果*/

? ? ? ? @Override

? ? ? ? protected int tryAcquireShared(int acquires) {

? ? ? ? ? ? return this.getState() == RAN ? 1 : -1;

? ? ? ? }

? ? ? ? void innerRun() {

? ? ? ? ? ? if (this.compareAndSetState(0, RUNNING)) {

? ? ? ? ? ? ? ? if (this.getState() == RUNNING) {//再檢查一次,雙重保障

? ? ? ? ? ? ? ? ? ? try {

? ? ? ? ? ? ? ? ? ? ? ? /*將call()方法的執(zhí)行結(jié)果賦值給Sync中的result*/

? ? ? ? ? ? ? ? ? ? ? ? this.innerSet(this.callable.call());

? ? ? ? ? ? ? ? ? ? } catch (Exception e) {

? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();

? ? ? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? /*如果不等于RUNNING,表示被取消或者是拋出了異常。這時候喚醒調(diào)用get的線程。*/

? ? ? ? ? ? ? ? ? ? this.releaseShared(0);

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? }

? ? }

? ? @Override

? ? public void run() {

? ? ? ? this.sync.innerRun();

? ? }

? ? @Override

? ? public boolean cancel(boolean mayInterruptIfRunning) {

? ? ? ? throw new UnsupportedOperationException();

? ? }

? ? @Override

? ? public boolean isCancelled() {

? ? ? ? throw new UnsupportedOperationException();

? ? }

? ? @Override

? ? public boolean isDone() {

? ? ? ? throw new UnsupportedOperationException();

? ? }

? ? @Override

? ? public V get() throws InterruptedException, ExecutionException {

? ? ? ? return this.sync.innerGet();

? ? }

? ? @Override

? ? public V get(long timeout, TimeUnit unit)

? ? ? ? ? ? throws InterruptedException, ExecutionException,

? ? ? ? ? ? TimeoutException {

? ? ? ? throw new UnsupportedOperationException();

? ? }

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容