Java中Runnable、Callable、Future的介紹

本文主要是為了介紹多線程中使用的幾種任務(wù):Runnable、Callable、FutureTask等,是對(duì)前面多線程系列的最后一個(gè)補(bǔ)充了,接下來兩篇就是相當(dāng)于實(shí)戰(zhàn)練習(xí)了。

Runnable 和 Callable的區(qū)別

Runnable和Callable都是定義了接口,可以用在線程池中異步執(zhí)行,區(qū)別是:

  • Runnable可以直接被Thread執(zhí)行,但是沒有返回值
  • Callable執(zhí)行之后有返回值,但是只能提交給線程池執(zhí)行。

Future 和 FutureTask

Future是一個(gè)接口,主要是線程池中任務(wù)執(zhí)行之后用于返回結(jié)果的獲取,定義了

  • boolean cancel(boolean mayInterruptIfRunning); 取消任務(wù)
  • boolean isCancelled(); 任務(wù)是否取消
  • boolean isDone(); 任務(wù)是否執(zhí)行完畢
  • V get(); 獲取任務(wù)執(zhí)行的結(jié)果,注意這個(gè)方法阻塞線程
  • V get(long timeout, TimeUnit unit); 同上,只是增加了一個(gè)超時(shí)時(shí)間

Future有一個(gè)直接繼承接口RunnableFuture,RunnableFuture有一個(gè)實(shí)現(xiàn)的子類FutureTask,RunnableFuture這個(gè)接口同時(shí)還繼承了Runnable接口,這意味著FutureTask可以作為Future或者Runnable使用。

再來看一下FutureTask的實(shí)現(xiàn),最終內(nèi)部保存了一個(gè)Callable對(duì)象,也就是提交的任務(wù)

先看構(gòu)造函數(shù)

public FutureTask(Callable<V> callable) {                    
    if (callable == null)                                    
        throw new NullPointerException();                    
    this.callable = callable;                                
    this.state = NEW;       // ensure visibility of callable 
}                                                            

...

/**                                                                    
 * Creates a {@code FutureTask} that will, upon running, execute the   
 * given {@code Runnable}, and arrange that {@code get} will return the
 * given result on successful completion.                              
 *                                                                     
 * @param runnable the runnable task                                   
 * @param result the result to return on successful completion. If     
 * you don't need a particular result, consider using                  
 * constructions of the form:                                          
 * {@code Future<?> f = new FutureTask<Void>(runnable, null)}          
 * @throws NullPointerException if the runnable is null                
 */                                                                    
public FutureTask(Runnable runnable, V result) {                       
    this.callable = Executors.callable(runnable, result);              
    this.state = NEW;       // ensure visibility of callable           
}                                                                      

一共2個(gè)構(gòu)造函數(shù),一個(gè)是接受Callable,一個(gè)是接受Runnable和默認(rèn)返回值。

詳細(xì)看一下第二個(gè)構(gòu)造參數(shù),注釋很清楚的說明,當(dāng)你需要runnable可取消同時(shí)不關(guān)心返回值時(shí),可以這樣構(gòu)建

Future<?> f = new FutureTask<Void>(runnable, null);

同時(shí)構(gòu)造函數(shù)里,Runnable被適配成了一個(gè)Callable,看一下里面的實(shí)現(xiàn):

/**                                                                
 * Returns a {@link Callable} object that, when                    
 * called, runs the given task and returns the given result.  This 
 * can be useful when applying methods requiring a                 
 * {@code Callable} to an otherwise resultless action.             
 * @param task the task to run                                     
 * @param result the result to return                              
 * @param <T> the type of the result                               
 * @return a callable object                                       
 * @throws NullPointerException if task null                       
 */                                                                
public static <T> Callable<T> callable(Runnable task, T result) {  
    if (task == null)                                              
        throw new NullPointerException();                          
    return new RunnableAdapter<T>(task, result);                   
}     

...

/**                                                            
 * A callable that runs given task and returns given result    
 */                                                            
static final class RunnableAdapter<T> implements Callable<T> { 
    final Runnable task;                                       
    final T result;                                            
    RunnableAdapter(Runnable task, T result) {                 
        this.task = task;                                      
        this.result = result;                                  
    }                                                          
    public T call() {                                          
        task.run();                                            
        return result;                                         
    }                                                          
}                                                              
                                                             

上面兩個(gè)函數(shù)將一個(gè)Runnable適配成了一個(gè)Callable,是Executors中提供的靜態(tài)方法。

再看一下FutureTask對(duì)Runnable的實(shí)現(xiàn)

public void run() {                                                    
    if (state != NEW ||                                                
        !UNSAFE.compareAndSwapObject(this, runnerOffset,               
                                     null, Thread.currentThread()))    
        return;                                                        
    try {                                                              
        Callable<V> c = callable;                                      
        if (c != null && state == NEW) {                               
            V result;                                                  
            boolean ran;                                               
            try {                                                      
                result = c.call();                                     
                ran = true;                                            
            } catch (Throwable ex) {                                   
                result = null;                                         
                ran = false;                                           
                setException(ex);                                      
            }                                                          
            if (ran)                                                   
                set(result);                                           
        }                                                              
    } finally {                                                        
        // runner must be non-null until state is settled to           
        // prevent concurrent calls to run()                           
        runner = null;                                                 
        // state must be re-read after nulling runner to prevent       
        // leaked interrupts                                           
        int s = state;                                                 
        if (s >= INTERRUPTING)                                         
            handlePossibleCancellationInterrupt(s);                    
    }                                                                  
}                                                                      

拋開其他的判斷條件,其實(shí)就是對(duì)內(nèi)部保存的Callable調(diào)用了call方法,進(jìn)行執(zhí)行并保存結(jié)果。這就是FutureTask主要的幾個(gè)方法,下面有用。

ExecutorService中Future的應(yīng)用

上面2點(diǎn)主要是為了給這點(diǎn)做伏筆,現(xiàn)在我們來看為什么ExecutorService中的submit()既可以提交Runnable又可以提交Callable并返回結(jié)果,同時(shí)看看直接execute() Runnable會(huì)有什么不同。

Future submit(Runnable task)

先來看一下這個(gè)方法的實(shí)現(xiàn)

public Future<?> submit(Runnable task) {                       
    if (task == null) throw new NullPointerException();        
    RunnableFuture<Void> ftask = newTaskFor(task, null);       
    execute(ftask);                                            
    return ftask;                                              
}                                                              

代碼上可以很直觀的看到,提交的Runnable被newTaskFor()適配成了RunnableFuture。來看一下newTaskFor()這個(gè)方法的實(shí)現(xiàn)。

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 
    return new FutureTask<T>(runnable, value);                           
}                                                                        

直接是new了一個(gè)FutureTask對(duì)象,上面我們分析過這種情況,runnable其實(shí)是會(huì)被適配成一個(gè)Callable的。

Future<T> submit(Callable<T> task)

再來看一下這個(gè)方法

public <T> Future<T> submit(Callable<T> task) {         
    if (task == null) throw new NullPointerException(); 
    RunnableFuture<T> ftask = newTaskFor(task);         
    execute(ftask);                                     
    return ftask;                                       
}                                                       

跟上面的代碼簡直一摸一樣,都是適配成了RunnableFuture。

看到這里可以明白,提交Runnable時(shí)是將Runnable適配成了Callable,也就是submit方法最終都會(huì)調(diào)用的的是Callable對(duì)象。

上面我們說過RunnableFuture實(shí)現(xiàn)了Runnable接口,當(dāng)他被execute時(shí),肯定是被當(dāng)作Runnable使用的,看一下兩個(gè)submit方法最終都是通過execute來執(zhí)行的。

上面介紹FutureTask時(shí)我們知道,對(duì)Runnable的實(shí)現(xiàn)FutureTask最后調(diào)用的是Callable的call方法。

到這里可以知道了,

  1. 當(dāng)我們提交一個(gè)Runnable的任務(wù)時(shí),首先通過FutureTask的構(gòu)造函數(shù)被適配成了一個(gè)Callable對(duì)象被保存FutureTask中。
  2. 當(dāng)任務(wù)被執(zhí)行時(shí),F(xiàn)utureTask又被當(dāng)作一個(gè)Runnable使用,調(diào)用了保存在內(nèi)部的Callable的call方法,任務(wù)被執(zhí)行并返回了結(jié)果。
  3. Runnable被適配成Callable時(shí)最終調(diào)用的還是自己的run方法。

ExecutorService中execute()

public void execute(Runnable command) {                             
    if (command == null)                                            
        throw new NullPointerException();                           
    /*                                                              
     * Proceed in 3 steps:                                          
     *                                                              
     * 1. If fewer than corePoolSize threads are running, try to    
     * start a new thread with the given command as its first       
     * task.  The call to addWorker atomically checks runState and  
     * workerCount, and so prevents false alarms that would add     
     * threads when it shouldn't, by returning false.               
     *                                                              
     * 2. If a task can be successfully queued, then we still need  
     * to double-check whether we should have added a thread        
     * (because existing ones died since last checking) or that     
     * the pool shut down since entry into this method. So we       
     * recheck state and if necessary roll back the enqueuing if    
     * stopped, or start a new thread if there are none.            
     *                                                              
     * 3. If we cannot queue task, then we try to add a new         
     * thread.  If it fails, we know we are shut down or saturated  
     * and so reject the task.                                      
     */                                                             
    int c = ctl.get();                                              
    if (workerCountOf(c) < corePoolSize) {                          
        if (addWorker(command, true))                               
            return;                                                 
        c = ctl.get();                                              
    }                                                               
    if (isRunning(c) && workQueue.offer(command)) {                 
        int recheck = ctl.get();                                    
        if (! isRunning(recheck) && remove(command))                
            reject(command);                                        
        else if (workerCountOf(recheck) == 0)                       
            addWorker(null, false);                                 
    }                                                               
    else if (!addWorker(command, false))                            
        reject(command);                                            
}                                                                   

上面注釋的意思是:

  1. 當(dāng)前核心線程數(shù)少于corePoolSize是,嘗試直接新建Thread用來執(zhí)行任務(wù)。同時(shí)校驗(yàn)添加的過程,防止出錯(cuò)。
  2. 任務(wù)入隊(duì)時(shí)二次校驗(yàn)是否需要新建線程,判斷是否需要回滾等。
  3. 如果任務(wù)不能入隊(duì)則新建非核心線程處理,如果失敗那么就拒絕任務(wù)。

這個(gè)就是任務(wù)具體執(zhí)行的過程,同時(shí)也可以知道為什么上一篇博客中通過反射獲取workers的size就能知道當(dāng)前線程的數(shù)量。

總結(jié)

又到總結(jié)時(shí)間,博文主要講了幾個(gè)概念Runnable、Callable、Future以及相關(guān)的子類,總結(jié)如下:

  • Runnable可以直接被Thread執(zhí)行,但是沒有返回值
  • Callable執(zhí)行之后有返回值,但是只能提交給線程池執(zhí)行。
  • Future定義了一系列關(guān)于任務(wù)取消的接口方法
  • FutureTask是Future唯一實(shí)現(xiàn)類,它也實(shí)現(xiàn)了Runnable接口
  • 線程池submit Callable和Runnable時(shí)最終都會(huì)轉(zhuǎn)換成FutureTask
  • FutureTask被執(zhí)行時(shí)是被當(dāng)成Runnable使用的,執(zhí)行了內(nèi)部保存的Callable的call方法
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容