java多線程的常用實踐方式

筆者所有文章第一時間發(fā)布于:
hhbbz的個人博客

場景

在我們實際開發(fā)過程中,往往會遇到執(zhí)行接口邏輯以及批任務(wù)處理的的執(zhí)行效率問題,在這些場景中,都可以通過使用多線程的方式,把占據(jù)長時間的程序中的任務(wù)放到后臺去處理,更好的發(fā)揮計算機(jī)的多核cpu的優(yōu)勢。

概述

這篇文章只介紹開發(fā)過程中實用的多線程代碼的三種編寫方法和實踐過程,參數(shù)說明、線程安全、多線程之間的調(diào)度策略和狀態(tài)同步這里就不多介紹了,會在后面的文章中加以詳細(xì)說明。

多線程三板斧

  • CompletableFuture 配合 TaskExecutor 異步執(zhí)行
  • ThreadFactory 、 TaskExecutor 配合 service和handler的消費者模式 異步執(zhí)行
  • ForkJoin將任務(wù)切割成子任務(wù),并行執(zhí)行

CompletableFuture 配合 TaskExecutor 異步執(zhí)行

CompletableFuture是java8新增加的類,提供了非常強(qiáng)大的Future的擴(kuò)展功能,可以幫助我們簡化異步編程的復(fù)雜性,提供了函數(shù)式編程的能力。

下面是簡單使用CompletableFuture進(jìn)行異步任務(wù)的執(zhí)行。

  1. 封裝一個TaskExecutor
@Configuration
public class AsyncConfiguration {

    /**異步執(zhí)行的線程池 */
    @Bean
    public TaskExecutor dataAsyncTaskExecutor(DataImportProperties importProperties){
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(importProperties.getThreadNumber());
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setThreadGroupName("data-async-importer");
        taskExecutor.setThreadNamePrefix("data-async");
        taskExecutor.initialize();
        return taskExecutor;
    }
}
  1. 在上面封裝的線程池中使用CompletableFuture
    @Resource(name = "dataAsyncTaskExecutor")
    private TaskExecutor taskExecutor;
    //異步任務(wù)
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->1,taskExecutor);
    //阻塞異步任務(wù)獲取結(jié)果
    future.get();
如果使用CompletableFuture過程中不傳入自己封裝的線程池,CompletableFuture會使用ForkJoinPool.commonPool(),它是一個會被很多任務(wù) 共享 的線程池,比如同一 JVM 上的所有 CompletableFuture、并行 Stream 都將共享 commonPool,除此之外,應(yīng)用代碼也能使用它。

ThreadFactory 、 TaskExecutor 配合 service和handler的消費者模式 異步執(zhí)行

這種是大家比較常用的異步執(zhí)行任務(wù)的做法。代碼比較直觀,更容易調(diào)試。下面展示一個多線程異步批次消費隊列的實踐代碼。

  1. 定義隊列數(shù)據(jù)封裝
@Getter
@Setter
public class QueueData {

    /**隊列數(shù)據(jù) */
    private Map<String,Object> data;

    /**數(shù)據(jù)的數(shù)據(jù)標(biāo)識 */
    private String dbTableCode;
}
  1. 定義隊列任務(wù)生產(chǎn)端
import java.util.Map;
import java.util.Queue;
/**
 * 采用內(nèi)存隊列作為消息處理服務(wù)緩存
 */
public class MemoryQueueService {
    public Queue<QueueData> queue;

    public MemoryQueueService(Queue<QueueData> queue){
        this.queue = queue;
    }
    //推入隊列
    public Integer publish(Map<String, Object> eventData) {
        QueueData queueData = new QueueData();
        queueData.setData(eventData);
        queue.offer(queueData);
        return queue.size();
    }
}
  1. 定義隊列任務(wù)批次消費端handler
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * 內(nèi)存隊列的消費端
 */
@Slf4j
public class MemoryQueueDataHandler implements Runnable {

    private int batchSize;

    private List<QueueData> eventCache;

    private Queue<QueueData> queue;
    /**
     * 是否運(yùn)行
     */
    private boolean running;

    public MemoryQueueDataHandler(Queue<QueueData> queue, int batchSize) {
        this.queue = queue;
        this.batchSize = batchSize;
        eventCache = new ArrayList<>(batchSize);
        running = true;
    }

    @Override
    public void run() {
        log.info("內(nèi)存隊列數(shù)據(jù)監(jiān)聽handler啟動.");
        QueueData eventData=null;
        while (running || eventData!=null) {
            //消費
            eventData = queue.poll();
            if (eventData != null) {
                //事件消息不為空
                eventCache.add(eventData);
                //批量寫入
                if (eventCache.size() >= batchSize) {
                    flushCacheToDb();
                }
            } else if(!eventCache.isEmpty()){
                //緩存不為空
                flushCacheToDb();
            } else {
                //如果隊列為空,緩存也為空則等待
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
            }
        }
        //刷新緩存
        flushCacheToDb();
    }
    private void flushCacheToDb(){
        Map<String,List<Map<String,Object>>> wData = new HashMap<>(batchSize);
        //構(gòu)造批次
        for (QueueData queueData : eventCache) {
            List<Map<String, Object>> cacheQueue = wData.computeIfAbsent(queueData.getDbTableCode(), k -> new ArrayList<>(batchSize));
            cacheQueue.add(queueData.getData());
            wData.put(queueData.getDbTableCode(),cacheQueue);
        }
        //批量寫入
        for (Map.Entry<String, List<Map<String, Object>>> entry : wData.entrySet()) {
            //TODO 寫入邏輯
        }
        eventCache.clear();
    }
    public void setRunning(boolean running) {
        this.running = running;
    }
}
  1. 簡單定義ThreadFactory、消費端ExecutorService、生產(chǎn)端ConcurrentLinkedQueue,
    并新增任務(wù)。
            private ExecutorService executorService;
            private MemoryQueueService queueService;
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("event-consume-%d").build();
            //后面把異步任務(wù)委托給ExecutorService
          executorService = new ThreadPoolExecutor(
                    3, //核心線程
                    5, //最大線程
                    300,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(),
                    threadFactory
            );
            //存放數(shù)據(jù)的隊列
            ConcurrentLinkedQueue<QueueData> queue = new ConcurrentLinkedQueue<>();
            //生產(chǎn)端
            queueService = new MemoryQueueService(queue);
            //啟用5個線程進(jìn)行消費
            for (int i = 0; i < 5; i++) {
                //消費端
                executorService.submit(new MemoryQueueDataHandler(dataEngine,queue,dataProperties.getBatchSize()));
            }

        //往隊列中緩存數(shù)據(jù)
        HashMap<String,Object> map = new HashMap();
        queueService.publish(map)

ForkJoin將大任務(wù)切割成小任務(wù),并行執(zhí)行

支和并框架的目的是以遞歸的方式將可以并行的任務(wù)拆分成更小的任務(wù),然后將每個子任務(wù)的結(jié)果合并起來生成整體的結(jié)果,它是ExecutorService的一個實現(xiàn),它把子任務(wù)分配給線程池(ForkJoinPool)中的工作線程。

  1. 基于ForkJoin封裝拆分任務(wù),執(zhí)行邏輯的抽象類
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RecursiveTask;
import java.util.function.Function;

@Slf4j
public class ListForkJoinExecution<V, R> extends RecursiveTask<R> {

    /**
     * 待處理數(shù)據(jù)
     */
    private transient List<V> values;

    /**
     * 單元邏輯執(zhí)行函數(shù)
     */
    private transient Function<V, R> function;

    /**
     * 結(jié)果隊列
     */
    private transient ConcurrentLinkedQueue<ListForkJoinExecution<V, R>> resultQueue;

    public ListForkJoinExecution(List<V> values, Function<V, R> function){
        this.values = values;
        this.function = function;
    }

    public void setResult(ConcurrentLinkedQueue<ListForkJoinExecution<V, R>> resultQueue) {
        this.resultQueue = resultQueue;
    }

    @Override
    protected R compute() {
        int len = values.size();

        try {
            if(len >= 3){
                int min = len / 2;

                // 拆分前一半
                List<V> headValues = values.subList(0 , min);
                ListForkJoinExecution<V,R> a = new ListForkJoinExecution(headValues, function);
                a.setResult(resultQueue);
                a.fork();
                resultQueue.offer(a);

                // 拆分后一半
                List<V> endValues = values.subList(min + 1 , len);
                ListForkJoinExecution<V,R> b = new ListForkJoinExecution(endValues, function);
                b.setResult(resultQueue);
                b.fork();
                resultQueue.offer(b);

                // 本次任務(wù)處理一個
                R r = function.apply(values.get(min));
                if (r != null) {
                    return r;
                }
            } else if (len == 2){

                List<V> headValues = values.subList(0 , 1);
                ListForkJoinExecution<V,R> a = new ListForkJoinExecution(headValues, function);
                a.setResult(resultQueue);
                a.fork();
                resultQueue.offer(a);

                // 拆分后一半
                List<V> endValues = values.subList(1 , 2);
                ListForkJoinExecution<V,R> b = new ListForkJoinExecution(endValues, function);
                b.setResult(resultQueue);
                b.fork();
                resultQueue.offer(b);

            } else if(len == 1){

                R r = function.apply(values.get(0));
                if (r != null) {
                    return r;
                }
            }
        }catch (Exception e){
            log.error(e.getMessage(), e);
        }

        return null;
    }
}
  1. 執(zhí)行forkjoin任務(wù)
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;

@Slf4j
public class ForkJoinPoolRun {

    /**
     * 并行處理列表方法
     * @param task  任務(wù)
     * @param <V>   參數(shù)對象類型
     * @param <R>   返回對象類型
     * @return
     */
    public static <V, R> List<R> run(ListForkJoinExecution<V, R> task){
        return run(8, task);
    }

    public static <V, R> List<R> run(int poolSize, ListForkJoinExecution<V, R> task){
        ForkJoinPool pool = new ForkJoinPool(poolSize);

        List<R> result = Lists.newArrayList();
        ConcurrentLinkedQueue<ListForkJoinExecution<V, R>> resultQueue = new ConcurrentLinkedQueue<>();
        try {

            task.setResult(resultQueue);
            // 執(zhí)行
            R r = pool.submit(task).get();
            // 沒有結(jié)算結(jié)果的不追加到結(jié)果集中
            if (r != null) {
                result.add(r);
            }

            while (resultQueue.iterator().hasNext()) {
                ListForkJoinExecution<V, R> poll = resultQueue.poll();
                if (poll != null) {
                    R join = poll.join();
                    // 沒有結(jié)算結(jié)果的不追加到結(jié)果集中
                    if (join != null) {
                        result.add(join);
                    }
                }

            }

            pool.shutdown();

            return result;
        } catch (Exception e) {
            log.error("遍歷處理任務(wù)異常!", e);
        }

        return result;
    }

    /**
     * 并執(zhí)行無返回方法
     * @param task  任務(wù)
     * @param <R>   返回對象類型
     * @return
     */
    public static <R> void run(VoidForkJoinExecution<R> task){
        run(8, task);
    }

    public static <R> void run(int poolSize, VoidForkJoinExecution<R> task){
        ForkJoinPool pool = new ForkJoinPool(poolSize);

        try {
            // 執(zhí)行
            pool.submit(task);

            while (!pool.isTerminated()){
                pool.shutdown();
            }
        } catch (Exception e) {
            log.error("遍歷處理任務(wù)異常!", e);
        }
    }
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

  • 前段時間遇到這樣一個問題,有人問微信朋友圈的上傳圖片的功能怎么做才能讓用戶的等待時間較短,比如說一下上傳9張圖片,...
    加油碼農(nóng)閱讀 1,285評論 0 2
  • 文章轉(zhuǎn)自http://www.itdecent.cn/p/87bff5cc8d8c 前言 線程是稀缺資源,如果被...
    斯文遮陽閱讀 474評論 0 1
  • 1.為什么要使用多線程?多線程的優(yōu)點和缺點是什么? 首先說下多線程出現(xiàn)的原因: 為了解決負(fù)載均衡問題,充分利用CP...
    一個小安卓閱讀 516評論 0 0
  • Linux軟件安裝 【重點】 了解Linux應(yīng)用程序的組成部分 掌握應(yīng)用程序安裝的方法 在Linux中安裝JDK ...
    lucky珂閱讀 224評論 0 1
  • 看見――承認(rèn)――改變,這是三部曲,前兩部做了,第三部是自然而然發(fā)生的。 過去我們有很多錯誤認(rèn)知,比如:學(xué)海無涯苦作...
    綠水情緒療愈閱讀 117評論 0 2

友情鏈接更多精彩內(nèi)容