并發(fā)編程實戰(zhàn)二之線程池和CompletionService

線程池

線程饑餓死鎖

任務(wù)依賴于其他任務(wù),線程池不夠大
單線程,一個任務(wù)將另一個任務(wù)提交到同一個Executor。

設(shè)置線程池的大小

int N_CPUS = Runtime.getRuntime().availableProcessors();
計算密集型  thread = N_CPUS+1
包含I/O或其他阻塞操作的任務(wù)  thread = N_CPUS*U_CPUS(1 + W/C)
    U_CPUS  ——  基準(zhǔn)負(fù)載
    W/C  ——  等待時間與計算時間的比值
內(nèi)存、文件句柄、套接字句柄、數(shù)據(jù)庫連接 —— 資源可用總量/每個任務(wù)的需求量

線程池的創(chuàng)建

public ThreadPoolExecutor(int corePoolSize,          //基本大小
                              int maximumPoolSize,        //最大
                              long keepAliveTime,        //線程存活時間
                              TimeUnit unit,            
                              BlockingQueue<Runnable> workQueue,    //線程隊列
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
Executors.newSingleThreadExecutor();

Executors.newFixedThreadPool(100)基本大小和最大大小設(shè)置為參數(shù)指定值
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Executors.newCachedThreadPool()線程池最大大小設(shè)置為Integer.MAX_VALUE,隊列為SynchronousQueue
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

串行遞歸轉(zhuǎn)并行遞歸

public class SeToParallel {
    public <T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){
        for(Node<T> n:nodes){
            results.add(n.compute());
            sequentialRecursive(n.getChildren(),results);
        }
    }

    public <T> void parallelRecursive(final Executor exec,List<Node<T>> nodes,final Collection<T> results){
        for(final Node<T> n : nodes){
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    results.add(n.compute());
                }
            });
            parallelRecursive(exec,n.getChildren(),results);
        }
    }

    public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        Queue<T> resultQueue = new ConcurrentLinkedQueue<>();
        parallelRecursive(exec,nodes,resultQueue);
        exec.shutdown();
        exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        return resultQueue;
    }

        class Node<T>{

            private T t;
            private List<Node<T>> children;

            public List<Node<T>> getChildren() {
                return children;
            }

            public T compute(){
                return t;
            }
        }
}

CompletionService:Executor與BlockingQueue

ExecutorCompletionService實現(xiàn)了CompletionService,用BlockingQueue保存計算完成的結(jié)果。提交任務(wù)是,任務(wù)被包裝成QueueingFuture.
頁面逐漸渲染:

public class Renderer {
    private final ExecutorService executorService;

    public Renderer(ExecutorService executorService) {
        this.executorService = executorService;
    }
    
    void renderPage(CharSequence source){
        List<ImageInfo> info = scanForImageInfo(source);
            CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executorService);
        for(final ImageInfo imageInfo:info){
            completionService.submit(new Callable<ImageData>() {
                @Override
                public ImageData call() throws Exception {
                    return imageInfo.downloadImage();
                }
            });
        }
        
        renderText(source);
        try {
            for(int t = 0,n = info.size();t < n;t++){
                Future<ImageData> f = completionService.take();
                ImageData imageData = f.get();
                renderImage(imageData);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容