筆者所有文章第一時間發(fā)布于:
hhbbz的個人博客
場景
在我們實際開發(fā)過程中,往往會遇到執(zhí)行接口邏輯以及批任務(wù)處理的的執(zhí)行效率問題,在這些場景中,都可以通過使用多線程的方式,把占據(jù)長時間的程序中的任務(wù)放到后臺去處理,更好的發(fā)揮計算機(jī)的多核cpu的優(yōu)勢。
概述
這篇文章只介紹開發(fā)過程中實用的多線程代碼的三種編寫方法和實踐過程,參數(shù)說明、線程安全、多線程之間的調(diào)度策略和狀態(tài)同步這里就不多介紹了,會在后面的文章中加以詳細(xì)說明。
多線程三板斧
- CompletableFuture 配合 TaskExecutor 異步執(zhí)行
- ThreadFactory 、 TaskExecutor 配合 service和handler的消費者模式 異步執(zhí)行
- ForkJoin將任務(wù)切割成子任務(wù),并行執(zhí)行
CompletableFuture 配合 TaskExecutor 異步執(zhí)行
CompletableFuture是java8新增加的類,提供了非常強(qiáng)大的Future的擴(kuò)展功能,可以幫助我們簡化異步編程的復(fù)雜性,提供了函數(shù)式編程的能力。
下面是簡單使用CompletableFuture進(jìn)行異步任務(wù)的執(zhí)行。
- 封裝一個TaskExecutor
@Configuration
public class AsyncConfiguration {
/**異步執(zhí)行的線程池 */
@Bean
public TaskExecutor dataAsyncTaskExecutor(DataImportProperties importProperties){
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(importProperties.getThreadNumber());
taskExecutor.setMaxPoolSize(50);
taskExecutor.setThreadGroupName("data-async-importer");
taskExecutor.setThreadNamePrefix("data-async");
taskExecutor.initialize();
return taskExecutor;
}
}
- 在上面封裝的線程池中使用CompletableFuture
@Resource(name = "dataAsyncTaskExecutor")
private TaskExecutor taskExecutor;
//異步任務(wù)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->1,taskExecutor);
//阻塞異步任務(wù)獲取結(jié)果
future.get();
如果使用CompletableFuture過程中不傳入自己封裝的線程池,CompletableFuture會使用ForkJoinPool.commonPool(),它是一個會被很多任務(wù) 共享 的線程池,比如同一 JVM 上的所有 CompletableFuture、并行 Stream 都將共享 commonPool,除此之外,應(yīng)用代碼也能使用它。
ThreadFactory 、 TaskExecutor 配合 service和handler的消費者模式 異步執(zhí)行
這種是大家比較常用的異步執(zhí)行任務(wù)的做法。代碼比較直觀,更容易調(diào)試。下面展示一個多線程異步批次消費隊列的實踐代碼。
- 定義隊列數(shù)據(jù)封裝
@Getter
@Setter
public class QueueData {
/**隊列數(shù)據(jù) */
private Map<String,Object> data;
/**數(shù)據(jù)的數(shù)據(jù)標(biāo)識 */
private String dbTableCode;
}
- 定義隊列任務(wù)生產(chǎn)端
import java.util.Map;
import java.util.Queue;
/**
* 采用內(nèi)存隊列作為消息處理服務(wù)緩存
*/
public class MemoryQueueService {
public Queue<QueueData> queue;
public MemoryQueueService(Queue<QueueData> queue){
this.queue = queue;
}
//推入隊列
public Integer publish(Map<String, Object> eventData) {
QueueData queueData = new QueueData();
queueData.setData(eventData);
queue.offer(queueData);
return queue.size();
}
}
- 定義隊列任務(wù)批次消費端handler
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* 內(nèi)存隊列的消費端
*/
@Slf4j
public class MemoryQueueDataHandler implements Runnable {
private int batchSize;
private List<QueueData> eventCache;
private Queue<QueueData> queue;
/**
* 是否運(yùn)行
*/
private boolean running;
public MemoryQueueDataHandler(Queue<QueueData> queue, int batchSize) {
this.queue = queue;
this.batchSize = batchSize;
eventCache = new ArrayList<>(batchSize);
running = true;
}
@Override
public void run() {
log.info("內(nèi)存隊列數(shù)據(jù)監(jiān)聽handler啟動.");
QueueData eventData=null;
while (running || eventData!=null) {
//消費
eventData = queue.poll();
if (eventData != null) {
//事件消息不為空
eventCache.add(eventData);
//批量寫入
if (eventCache.size() >= batchSize) {
flushCacheToDb();
}
} else if(!eventCache.isEmpty()){
//緩存不為空
flushCacheToDb();
} else {
//如果隊列為空,緩存也為空則等待
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
}
}
//刷新緩存
flushCacheToDb();
}
private void flushCacheToDb(){
Map<String,List<Map<String,Object>>> wData = new HashMap<>(batchSize);
//構(gòu)造批次
for (QueueData queueData : eventCache) {
List<Map<String, Object>> cacheQueue = wData.computeIfAbsent(queueData.getDbTableCode(), k -> new ArrayList<>(batchSize));
cacheQueue.add(queueData.getData());
wData.put(queueData.getDbTableCode(),cacheQueue);
}
//批量寫入
for (Map.Entry<String, List<Map<String, Object>>> entry : wData.entrySet()) {
//TODO 寫入邏輯
}
eventCache.clear();
}
public void setRunning(boolean running) {
this.running = running;
}
}
- 簡單定義ThreadFactory、消費端ExecutorService、生產(chǎn)端ConcurrentLinkedQueue,
并新增任務(wù)。
private ExecutorService executorService;
private MemoryQueueService queueService;
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("event-consume-%d").build();
//后面把異步任務(wù)委托給ExecutorService
executorService = new ThreadPoolExecutor(
3, //核心線程
5, //最大線程
300,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
threadFactory
);
//存放數(shù)據(jù)的隊列
ConcurrentLinkedQueue<QueueData> queue = new ConcurrentLinkedQueue<>();
//生產(chǎn)端
queueService = new MemoryQueueService(queue);
//啟用5個線程進(jìn)行消費
for (int i = 0; i < 5; i++) {
//消費端
executorService.submit(new MemoryQueueDataHandler(dataEngine,queue,dataProperties.getBatchSize()));
}
//往隊列中緩存數(shù)據(jù)
HashMap<String,Object> map = new HashMap();
queueService.publish(map)
ForkJoin將大任務(wù)切割成小任務(wù),并行執(zhí)行
支和并框架的目的是以遞歸的方式將可以并行的任務(wù)拆分成更小的任務(wù),然后將每個子任務(wù)的結(jié)果合并起來生成整體的結(jié)果,它是ExecutorService的一個實現(xiàn),它把子任務(wù)分配給線程池(ForkJoinPool)中的工作線程。
- 基于ForkJoin封裝拆分任務(wù),執(zhí)行邏輯的抽象類
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RecursiveTask;
import java.util.function.Function;
@Slf4j
public class ListForkJoinExecution<V, R> extends RecursiveTask<R> {
/**
* 待處理數(shù)據(jù)
*/
private transient List<V> values;
/**
* 單元邏輯執(zhí)行函數(shù)
*/
private transient Function<V, R> function;
/**
* 結(jié)果隊列
*/
private transient ConcurrentLinkedQueue<ListForkJoinExecution<V, R>> resultQueue;
public ListForkJoinExecution(List<V> values, Function<V, R> function){
this.values = values;
this.function = function;
}
public void setResult(ConcurrentLinkedQueue<ListForkJoinExecution<V, R>> resultQueue) {
this.resultQueue = resultQueue;
}
@Override
protected R compute() {
int len = values.size();
try {
if(len >= 3){
int min = len / 2;
// 拆分前一半
List<V> headValues = values.subList(0 , min);
ListForkJoinExecution<V,R> a = new ListForkJoinExecution(headValues, function);
a.setResult(resultQueue);
a.fork();
resultQueue.offer(a);
// 拆分后一半
List<V> endValues = values.subList(min + 1 , len);
ListForkJoinExecution<V,R> b = new ListForkJoinExecution(endValues, function);
b.setResult(resultQueue);
b.fork();
resultQueue.offer(b);
// 本次任務(wù)處理一個
R r = function.apply(values.get(min));
if (r != null) {
return r;
}
} else if (len == 2){
List<V> headValues = values.subList(0 , 1);
ListForkJoinExecution<V,R> a = new ListForkJoinExecution(headValues, function);
a.setResult(resultQueue);
a.fork();
resultQueue.offer(a);
// 拆分后一半
List<V> endValues = values.subList(1 , 2);
ListForkJoinExecution<V,R> b = new ListForkJoinExecution(endValues, function);
b.setResult(resultQueue);
b.fork();
resultQueue.offer(b);
} else if(len == 1){
R r = function.apply(values.get(0));
if (r != null) {
return r;
}
}
}catch (Exception e){
log.error(e.getMessage(), e);
}
return null;
}
}
- 執(zhí)行forkjoin任務(wù)
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ForkJoinPool;
@Slf4j
public class ForkJoinPoolRun {
/**
* 并行處理列表方法
* @param task 任務(wù)
* @param <V> 參數(shù)對象類型
* @param <R> 返回對象類型
* @return
*/
public static <V, R> List<R> run(ListForkJoinExecution<V, R> task){
return run(8, task);
}
public static <V, R> List<R> run(int poolSize, ListForkJoinExecution<V, R> task){
ForkJoinPool pool = new ForkJoinPool(poolSize);
List<R> result = Lists.newArrayList();
ConcurrentLinkedQueue<ListForkJoinExecution<V, R>> resultQueue = new ConcurrentLinkedQueue<>();
try {
task.setResult(resultQueue);
// 執(zhí)行
R r = pool.submit(task).get();
// 沒有結(jié)算結(jié)果的不追加到結(jié)果集中
if (r != null) {
result.add(r);
}
while (resultQueue.iterator().hasNext()) {
ListForkJoinExecution<V, R> poll = resultQueue.poll();
if (poll != null) {
R join = poll.join();
// 沒有結(jié)算結(jié)果的不追加到結(jié)果集中
if (join != null) {
result.add(join);
}
}
}
pool.shutdown();
return result;
} catch (Exception e) {
log.error("遍歷處理任務(wù)異常!", e);
}
return result;
}
/**
* 并執(zhí)行無返回方法
* @param task 任務(wù)
* @param <R> 返回對象類型
* @return
*/
public static <R> void run(VoidForkJoinExecution<R> task){
run(8, task);
}
public static <R> void run(int poolSize, VoidForkJoinExecution<R> task){
ForkJoinPool pool = new ForkJoinPool(poolSize);
try {
// 執(zhí)行
pool.submit(task);
while (!pool.isTerminated()){
pool.shutdown();
}
} catch (Exception e) {
log.error("遍歷處理任務(wù)異常!", e);
}
}
}