背景:
在項(xiàng)目中使用多線程抓取第三方數(shù)據(jù)執(zhí)行數(shù)據(jù)入庫(kù)時(shí),如果某個(gè)子線程執(zhí)行異常,其他子線事務(wù)全部回滾,spring對(duì)多線程無(wú)法進(jìn)行事務(wù)控制,是因?yàn)槎嗑€程底層連接數(shù)據(jù)庫(kù)的時(shí)候,是使用的線程變量(TheadLocal),線程之間事務(wù)隔離,每個(gè)線程有自己的連接,事務(wù)肯定不是同一個(gè)了。
解決辦法
思想就是使用兩個(gè)CountDownLatch實(shí)現(xiàn)子線程的二段提交
步驟:
1、主線程將任務(wù)分發(fā)給子線程,然后使用childMonitor.await();阻塞主線程,等待所有子線程處理向數(shù)據(jù)庫(kù)中插入的業(yè)務(wù),并使用BlockingDeque存儲(chǔ)線程的返回結(jié)果。
2、使用childMonitor.countDown()釋放子線程鎖定,同時(shí)使用mainMonitor.await();阻塞子線程,將程序的控制權(quán)交還給主線程。
3、主線程檢查子線程執(zhí)行任務(wù)的結(jié)果,若有失敗結(jié)果出現(xiàn),主線程標(biāo)記狀態(tài)告知子線程回滾,然后使用mainMonitor.countDown();將程序控制權(quán)再次交給子線程,子線程檢測(cè)回滾標(biāo)志,判斷是否回滾。
線程池工具類
package com.yunshidi.freight.wf.operation;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.logging.Handler;
public class ThreadPoolTool {
/** * 多線程任務(wù)
* @param transactionManager
* @param data
* @param threadCount
* @param params
* @param clazz
*/
public void excuteTask(DataSourceTransactionManager transactionManager, List data,int threadCount, Map params, Class clazz) {
if(data ==null|| data.size() == 0) {
return;
}
int batch = 0;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
//監(jiān)控子線程的任務(wù)執(zhí)行
CountDownLatch childMonitor = new CountDownLatch(threadCount);
//監(jiān)控主線程,是否需要回滾
CountDownLatch mainMonitor = new CountDownLatch(1);
//存儲(chǔ)任務(wù)的返回結(jié)果,返回true表示不需要回滾,反之,則回滾(由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列)
BlockingDeque results = new LinkedBlockingDeque(threadCount);
RollBack rollback = new RollBack(false);
try {
LinkedBlockingQueue queue = splitQueue(data, threadCount);//由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列
while(true) {
List list = queue.poll();
if(list ==null) {
break;
}
batch++;
params.put("batch", batch);
Constructor constructor = clazz.getConstructor(new Class[]{
CountDownLatch.class, CountDownLatch.class, BlockingDeque.class, RollBack.class, DataSourceTransactionManager.class, Object.class, Map.class});
ThreadTask task = (ThreadTask) constructor.newInstance(childMonitor, mainMonitor, results, rollback, transactionManager, list, params);
executor.execute(task);
}
// 1、主線程將任務(wù)分發(fā)給子線程,然后使用childMonitor.await();阻塞主線程,等待所有子線程處理向數(shù)據(jù)庫(kù)中插入的業(yè)務(wù)。
childMonitor.await();
System.out.println("主線程開始執(zhí)行任務(wù)");
//根據(jù)返回結(jié)果來(lái)確定是否回滾
for(int i = 0; i < threadCount; i++) {
Boolean result = results.take();
if(!result) {
//有線程執(zhí)行異常,需要回滾子線程
rollback.setNeedRoolBack(true);
}
}
// 3、主線程檢查子線程執(zhí)行任務(wù)的結(jié)果,若有失敗結(jié)果出現(xiàn),主線程標(biāo)記狀態(tài)告知子線程回滾,然后使用mainMonitor.countDown();將程序控制權(quán)再次交給子線程,子線程檢測(cè)回滾標(biāo)志,判斷是否回滾。
mainMonitor.countDown();
} catch (Exception e) {
log.error(e.getMessage());
} finally {
//關(guān)閉線程池,釋放資源
executor.shutdown();
}
}
/** * 隊(duì)列拆分
*
* @param data 需要執(zhí)行的數(shù)據(jù)集合
* @param threadCount 核心線程數(shù)
* @return*/
private LinkedBlockingQueue splitQueue(List data, int threadCount) {
LinkedBlockingQueue queueBatch =new LinkedBlockingQueue();
int total = data.size();
int oneSize = total / threadCount;
int start;
int end;
for(int i = 0; i < threadCount; i++) {
start = i * oneSize;
end = (i + 1) * oneSize;
if(i < threadCount - 1) {
queueBatch.add(data.subList(start, end));
} else {
queueBatch.add(data.subList(start, data.size()));
}
}
return queueBatch;
}
}
任務(wù)執(zhí)行類
package com.yunshidi.freight.wf.operation;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
/**
* 任務(wù)執(zhí)行類
*/
public abstract class ThreadTask implements Runnable {
/**
* 監(jiān)控子任務(wù)的執(zhí)行
*/
private CountDownLatch childMonitor;
/**
* 監(jiān)控主線程
*/
private CountDownLatch mainMonitor;
/**
* 存儲(chǔ)線程的返回結(jié)果
*/
private BlockingDeque resultList;
/**
* 回滾類
*/
private RollBack rollback;
private Map params;
protected Object obj;
protected DataSourceTransactionManager transactionManager;
protected TransactionStatus status;
public ThreadTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj, Map params) {
this.childMonitor = childCountDown;
this.mainMonitor = mainCountDown;
this.resultList = result;
this.rollback = rollback;
this.transactionManager = transactionManager;
this.obj = obj;
this.params = params;
initParam();
}
/**
* 事務(wù)回滾
*/
private void rollBack() {
System.out.println(Thread.currentThread().getName() + "開始回滾");
transactionManager.rollback(status);
}
/**
* 事務(wù)提交
*/
private void submit() {
System.out.println(Thread.currentThread().getName() + "提交事務(wù)");
transactionManager.commit(status);
}
protected Object getParam(String key) {
return params.get(key);
}
public abstract void initParam();
/**
* 執(zhí)行任務(wù),返回false表示任務(wù)執(zhí)行錯(cuò)誤,需要回滾
*
* @return
*/
public abstract boolean processTask();
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "子線程開始執(zhí)行任務(wù)");
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
status = transactionManager.getTransaction(def);
Boolean result = processTask();
//向隊(duì)列中添加處理結(jié)果
resultList.add(result);
//2、使用childMonitor.countDown()釋放子線程鎖定,同時(shí)使用mainMonitor.await();阻塞子線程,將程序的控制權(quán)交還給主線程。
childMonitor.countDown();
try {
//等待主線程的判斷邏輯執(zhí)行完,執(zhí)行下面的是否回滾邏輯
mainMonitor.await();
} catch (Exception e) {
log.error(e.getMessage());
}
System.out.println(Thread.currentThread().getName() + "子線程執(zhí)行剩下的任務(wù)");
//3、主線程檢查子線程執(zhí)行任務(wù)的結(jié)果,若有失敗結(jié)果出現(xiàn),主線程標(biāo)記狀態(tài)告知子線程回滾,然后使用mainMonitor.countDown();將程序控制權(quán)再次交給子線程,子線程檢測(cè)回滾標(biāo)志,判斷是否回滾。
if (rollback.isNeedRoolBack()) {
rollBack();
} else {
//事務(wù)提交
submit();
}
}
}
事務(wù)回滾類
package com.yunshidi.freight.wf.operation;
import lombok.Data;
@Data
public class RollBack {
public RollBack(boolean needRoolBack) {
this.needRoolBack = needRoolBack;
}
private boolean needRoolBack;
}
使用線程池工具:
1,首先建立自己的任務(wù)執(zhí)行類 并且 extends ThreadTask ,實(shí)現(xiàn)initParam()和processTask()方法
package com.yunshidi.freight.wf.operation;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
public class TestTask extends ThreadTask {
/**
* 分批處理的數(shù)據(jù)
*/
private List objectList;
/**
* 可能需要注入的某些服務(wù)
*/
private TestService testService;
public TestTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj, Map params) {
super(childCountDown, mainCountDown, result, rollback, transactionManager, obj, params);
}
@Override
public void initParam() {
this.objectList = (List) getParam("objectList");
this.testService = (TestService) getParam("testService");
}
/**
* 執(zhí)行任務(wù),返回false表示任務(wù)執(zhí)行錯(cuò)誤,需要回滾
* @return
* */
@Override
public boolean processTask() {
try {
for (Object o : objectList) {
testService.list();
System.out.println(o.toString()+"執(zhí)行自己的多線程任務(wù)邏輯");
}
return true;
} catch (Exception e) {
return false;
}
}
}
2,編寫主任務(wù)執(zhí)行方法
package com.yunshidi.freight.wf.operation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class test {
@Autowired
private ThreadPoolTool threadPoolTool;
@Autowired
private TestService testService;
@Autowired
private DataSourceTransactionManager transactionManager;
/**
* 執(zhí)行多線程任務(wù)方法
*/
public void testThreadTask() {
try {
int threadCount = 5;
//需要分批處理的數(shù)據(jù)
List objectList = new ArrayList<>();
Map params =new HashMap<>();
params.put("objectList",objectList);
params.put("testService",testService);
//調(diào)用多線程工具方法
threadPoolTool.excuteTask(transactionManager,objectList,threadCount,params, TestTask.class);
}catch (Exception e){
throw new RuntimeException(e.getMessage());
}
}
}