(十一) J.U.C-FutureTask

FutureTask

FutureTask是J.U.C中的類,是一個可刪除的異步計算類。這個類提供了Future接口的的基本實現(xiàn),使用相關方法啟動和取消計算,查詢計算是否完成,并檢索計算結(jié)果。只有在計算完成時才能使用get方法檢索結(jié)果;如果計算尚未完成,get方法將會阻塞。一旦計算完成,計算就不能重新啟動或取消(除非使用runAndReset方法調(diào)用計算)。

Runnable與Callable對比

通常實現(xiàn)一個線程我們會使用繼承Thread的方式或者實現(xiàn)Runnable接口,這兩種方式有一個共同的缺陷就是在執(zhí)行完任務之后無法獲取執(zhí)行結(jié)果。從Java1.5之后就提供了Callable與Future,這兩個接口就可以實現(xiàn)獲取任務執(zhí)行結(jié)果。

  • Runnable接口:代碼非常簡單,只有一個方法run
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
  • Callable泛型接口:有泛型參數(shù),提供了一個call方法,執(zhí)行后可返回傳入的泛型參數(shù)類型的結(jié)果。
public interface Callable<V> {
    V call() throws Exception;
}

Future接口

Future接口提供了一系列方法用于控制線程執(zhí)行計算,如下:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);//取消任務
    boolean isCancelled();//是否被取消
    boolean isDone();//計算是否完成
    V get() throws InterruptedException, ExecutionException;//獲取計算結(jié)果,在執(zhí)行過程中任務被阻塞
    V get(long timeout, TimeUnit unit)//timeout等待時間、unit時間單位
        throws InterruptedException, ExecutionException, TimeoutException;
}

使用方法:

public class FutureExample {

    static class MyCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            log.info("do something in callable");
            Thread.sleep(5000);
            return "Done";
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(new MyCallable());//線程池提交任務
        log.info("do something in main");
        Thread.sleep(1000);
        String result = future.get();//獲取不到一直阻塞
        log.info("result:{}", result);
    }
}

運行結(jié)果:阻塞效果


FutureTask

Future實現(xiàn)了RunnableFuture接口,而RunnableFuture接口繼承了Runnable與Future接口,所以它既可以作為Runnable被線程中執(zhí)行,又可以作為callable獲得返回值。

public class FutureTask<V> implements RunnableFuture<V> {
    ...
}

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

FutureTask支持兩種參數(shù)類型,Callable和Runnable,在使用Runnable 時,還可以多指定一個返回結(jié)果類型。

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

1. FutureTask執(zhí)行多任務計算的使用場景

利用FutureTask和ExecutorService,可以用多線程的方式提交計算任務,主線程繼續(xù)執(zhí)行其他任務,當主線程需要子線程的計算結(jié)果時,在異步獲取子線程的執(zhí)行結(jié)果。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
 
public class FutureTaskForMultiCompute {
    
    public static void main(String[] args) {
        
        FutureTaskForMultiCompute inst=new FutureTaskForMultiCompute();
        // 創(chuàng)建任務集合
        List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();
        // 創(chuàng)建線程池
        ExecutorService exec = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
            // 傳入Callable對象創(chuàng)建FutureTask對象
            FutureTask<Integer> ft = new FutureTask<Integer>(inst.new ComputeTask(i, ""+i));
            taskList.add(ft);
            // 提交給線程池執(zhí)行任務,也可以通過exec.invokeAll(taskList)一次性提交所有任務;
            exec.submit(ft);
        }
        
        System.out.println("所有計算任務提交完畢, 主線程接著干其他事情!");
 
        // 開始統(tǒng)計各計算線程計算結(jié)果
        Integer totalResult = 0;
        for (FutureTask<Integer> ft : taskList) {
            try {
                //FutureTask的get方法會自動阻塞,直到獲取計算結(jié)果為止
                totalResult = totalResult + ft.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
 
        // 關閉線程池
        exec.shutdown();
        System.out.println("多任務計算后的總結(jié)果是:" + totalResult);
 
    }
 
    private class ComputeTask implements Callable<Integer> {
 
        private Integer result = 0;
        private String taskName = "";
        
        public ComputeTask(Integer iniResult, String taskName){
            result = iniResult;
            this.taskName = taskName;
            System.out.println("生成子線程計算任務: "+taskName);
        }
        
        public String getTaskName(){
            return this.taskName;
        }
        
        @Override
        public Integer call() throws Exception {
            // TODO Auto-generated method stub
 
            for (int i = 0; i < 100; i++) {
                result =+ i;
            }
            // 休眠5秒鐘,觀察主線程行為,預期的結(jié)果是主線程會繼續(xù)執(zhí)行,到要取得FutureTask的結(jié)果是等待直至完成。
            Thread.sleep(5000);
            System.out.println("子線程計算任務: "+taskName+" 執(zhí)行完成!");
            return result;
        }
    }
}

2. FutureTask在高并發(fā)環(huán)境下確保任務只執(zhí)行一次
在很多高并發(fā)的環(huán)境下,往往我們只需要某些任務只執(zhí)行一次。這種使用情景FutureTask的特性恰能勝任。舉一個例子,假設有一個帶key的連接池,當key存在時,即直接返回key對應的對象;當key不存在時,則創(chuàng)建連接。對于這樣的應用場景,通常采用的方法為使用一個Map對象來存儲key和連接池對應的對應關系,典型的代碼如下面所示:

private Map<String, Connection> connectionPool = new HashMap<String, Connection>();
    private ReentrantLock lock = new ReentrantLock();
    
    public Connection getConnection(String key){
        try{
            lock.lock();
            if(connectionPool.containsKey(key)){
                return connectionPool.get(key);
            }
            else{
                //創(chuàng)建 Connection
                Connection conn = createConnection();
                connectionPool.put(key, conn);
                return conn;
            }
        }
        finally{
            lock.unlock();
        }
    }
    
    //創(chuàng)建Connection
    private Connection createConnection(){
        return null;
    }

在上面的例子中,我們通過加鎖確保高并發(fā)環(huán)境下的線程安全,也確保了connection只創(chuàng)建一次,然而確犧牲了性能。改用ConcurrentHash的情況下,幾乎可以避免加鎖的操作,性能大大提高,但是在高并發(fā)的情況下有可能出現(xiàn)Connection被創(chuàng)建多次的現(xiàn)象。這時最需要解決的問題就是當key不存在時,創(chuàng)建Connection的動作能放在connectionPool之后執(zhí)行,這正是FutureTask發(fā)揮作用的時機,基于ConcurrentHashMap和FutureTask的改造代碼如下:

private ConcurrentHashMap<String,FutureTask<Connection>>connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();
    
    public Connection getConnection(String key) throws Exception{
        FutureTask<Connection>connectionTask=connectionPool.get(key);
        if(connectionTask!=null){
            return connectionTask.get();
        }
        else{
            Callable<Connection> callable = new Callable<Connection>(){
                @Override
                public Connection call() throws Exception {
                    // TODO Auto-generated method stub
                    return createConnection();
                }
            };
            FutureTask<Connection>newTask = new FutureTask<Connection>(callable);
            connectionTask = connectionPool.putIfAbsent(key, newTask);
            if(connectionTask==null){
                connectionTask = newTask;
                connectionTask.run();
            }
            return connectionTask.get();
        }
    }
    
    //創(chuàng)建Connection
    private Connection createConnection(){
        return null;
    }

經(jīng)過這樣的改造,可以避免由于并發(fā)帶來的多次創(chuàng)建連接及鎖的出現(xiàn)。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容