CountDownLatch

背景:
在項(xiàng)目中使用多線程抓取第三方數(shù)據(jù)執(zhí)行數(shù)據(jù)入庫(kù)時(shí),如果某個(gè)子線程執(zhí)行異常,其他子線事務(wù)全部回滾,spring對(duì)多線程無(wú)法進(jìn)行事務(wù)控制,是因?yàn)槎嗑€程底層連接數(shù)據(jù)庫(kù)的時(shí)候,是使用的線程變量(TheadLocal),線程之間事務(wù)隔離,每個(gè)線程有自己的連接,事務(wù)肯定不是同一個(gè)了。

解決辦法
思想就是使用兩個(gè)CountDownLatch實(shí)現(xiàn)子線程的二段提交
步驟:
1、主線程將任務(wù)分發(fā)給子線程,然后使用childMonitor.await();阻塞主線程,等待所有子線程處理向數(shù)據(jù)庫(kù)中插入的業(yè)務(wù),并使用BlockingDeque存儲(chǔ)線程的返回結(jié)果。
2、使用childMonitor.countDown()釋放子線程鎖定,同時(shí)使用mainMonitor.await();阻塞子線程,將程序的控制權(quán)交還給主線程。
3、主線程檢查子線程執(zhí)行任務(wù)的結(jié)果,若有失敗結(jié)果出現(xiàn),主線程標(biāo)記狀態(tài)告知子線程回滾,然后使用mainMonitor.countDown();將程序控制權(quán)再次交給子線程,子線程檢測(cè)回滾標(biāo)志,判斷是否回滾。

線程池工具類

package com.yunshidi.freight.wf.operation;

import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.logging.Handler;

public class ThreadPoolTool {

    /**    * 多線程任務(wù)

     * @param transactionManager

     * @param data

     * @param threadCount

     * @param params

     * @param clazz

     */
    public void excuteTask(DataSourceTransactionManager transactionManager, List data,int threadCount, Map params, Class clazz) {
        if(data ==null|| data.size() == 0) {
            return;
        }
        int batch = 0;
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        //監(jiān)控子線程的任務(wù)執(zhí)行
        CountDownLatch childMonitor = new CountDownLatch(threadCount);
        //監(jiān)控主線程,是否需要回滾
        CountDownLatch mainMonitor = new CountDownLatch(1);
        //存儲(chǔ)任務(wù)的返回結(jié)果,返回true表示不需要回滾,反之,則回滾(由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列)
        BlockingDeque results = new LinkedBlockingDeque(threadCount);
        RollBack rollback = new RollBack(false);
        try {

            LinkedBlockingQueue queue = splitQueue(data, threadCount);//由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列
            while(true) {
                List list = queue.poll();
                if(list ==null) {
                    break;
                }
                batch++;
                params.put("batch", batch);
                Constructor constructor = clazz.getConstructor(new Class[]{
                        CountDownLatch.class, CountDownLatch.class, BlockingDeque.class, RollBack.class, DataSourceTransactionManager.class, Object.class, Map.class});
                ThreadTask task = (ThreadTask) constructor.newInstance(childMonitor, mainMonitor, results, rollback, transactionManager, list, params);
                executor.execute(task);
            }

            //  1、主線程將任務(wù)分發(fā)給子線程,然后使用childMonitor.await();阻塞主線程,等待所有子線程處理向數(shù)據(jù)庫(kù)中插入的業(yè)務(wù)。
            childMonitor.await();
            System.out.println("主線程開始執(zhí)行任務(wù)");
            //根據(jù)返回結(jié)果來(lái)確定是否回滾
            for(int i = 0; i < threadCount; i++) {
                Boolean result = results.take();
                if(!result) {
                    //有線程執(zhí)行異常,需要回滾子線程
                    rollback.setNeedRoolBack(true);
                }
            }
            //  3、主線程檢查子線程執(zhí)行任務(wù)的結(jié)果,若有失敗結(jié)果出現(xiàn),主線程標(biāo)記狀態(tài)告知子線程回滾,然后使用mainMonitor.countDown();將程序控制權(quán)再次交給子線程,子線程檢測(cè)回滾標(biāo)志,判斷是否回滾。
            mainMonitor.countDown();
        } catch (Exception e) {
            log.error(e.getMessage());
        } finally {
            //關(guān)閉線程池,釋放資源
            executor.shutdown();
        }

    }

    /**    * 隊(duì)列拆分

     *

     * @param data 需要執(zhí)行的數(shù)據(jù)集合

     * @param threadCount 核心線程數(shù)

     * @return*/
    private LinkedBlockingQueue splitQueue(List data, int threadCount) {

        LinkedBlockingQueue queueBatch =new LinkedBlockingQueue();
        int total = data.size();
        int oneSize = total / threadCount;
        int start;
        int end;
        for(int i = 0; i < threadCount; i++) {
            start = i * oneSize;
            end = (i + 1) * oneSize;
            if(i < threadCount - 1) {
                queueBatch.add(data.subList(start, end));
            } else {
                queueBatch.add(data.subList(start, data.size()));
            }
        }
        return queueBatch;
    }

}


任務(wù)執(zhí)行類

package com.yunshidi.freight.wf.operation;

import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;

/**
 * 任務(wù)執(zhí)行類
 */
public abstract class ThreadTask implements Runnable {

    /**
     * 監(jiān)控子任務(wù)的執(zhí)行
     */
    private CountDownLatch childMonitor;

    /**
     * 監(jiān)控主線程
     */
    private CountDownLatch mainMonitor;

    /**
     * 存儲(chǔ)線程的返回結(jié)果
     */
    private BlockingDeque resultList;

    /**
     * 回滾類
     */
    private RollBack rollback;

    private Map params;

    protected Object obj;

    protected DataSourceTransactionManager transactionManager;

    protected TransactionStatus status;

    public ThreadTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj, Map params) {
        this.childMonitor = childCountDown;
        this.mainMonitor = mainCountDown;
        this.resultList = result;
        this.rollback = rollback;
        this.transactionManager = transactionManager;
        this.obj = obj;
        this.params = params;
        initParam();

    }

    /**
     * 事務(wù)回滾
     */
    private void rollBack() {
        System.out.println(Thread.currentThread().getName() + "開始回滾");
        transactionManager.rollback(status);

    }

    /**
     * 事務(wù)提交
     */
    private void submit() {
        System.out.println(Thread.currentThread().getName() + "提交事務(wù)");
        transactionManager.commit(status);

    }

    protected Object getParam(String key) {
        return params.get(key);
    }

    public abstract void initParam();

    /**
     * 執(zhí)行任務(wù),返回false表示任務(wù)執(zhí)行錯(cuò)誤,需要回滾
     *
     * @return
     */
    public abstract boolean processTask();

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "子線程開始執(zhí)行任務(wù)");
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        status = transactionManager.getTransaction(def);
        Boolean result = processTask();
        //向隊(duì)列中添加處理結(jié)果
        resultList.add(result);
        //2、使用childMonitor.countDown()釋放子線程鎖定,同時(shí)使用mainMonitor.await();阻塞子線程,將程序的控制權(quán)交還給主線程。
        childMonitor.countDown();
        try {
            //等待主線程的判斷邏輯執(zhí)行完,執(zhí)行下面的是否回滾邏輯
            mainMonitor.await();
        } catch (Exception e) {
            log.error(e.getMessage());
        }
        System.out.println(Thread.currentThread().getName() + "子線程執(zhí)行剩下的任務(wù)");
        //3、主線程檢查子線程執(zhí)行任務(wù)的結(jié)果,若有失敗結(jié)果出現(xiàn),主線程標(biāo)記狀態(tài)告知子線程回滾,然后使用mainMonitor.countDown();將程序控制權(quán)再次交給子線程,子線程檢測(cè)回滾標(biāo)志,判斷是否回滾。
        if (rollback.isNeedRoolBack()) {
            rollBack();
        } else {
            //事務(wù)提交
            submit();
        }

    }

}

事務(wù)回滾類

package com.yunshidi.freight.wf.operation;

import lombok.Data;

@Data
public class RollBack {

    public RollBack(boolean needRoolBack) {
        this.needRoolBack = needRoolBack;
    }
    private boolean needRoolBack;

}

使用線程池工具:

1,首先建立自己的任務(wù)執(zhí)行類 并且 extends ThreadTask ,實(shí)現(xiàn)initParam()和processTask()方法

package com.yunshidi.freight.wf.operation;

import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;

public class TestTask extends ThreadTask {

        /**
         * 分批處理的數(shù)據(jù)
         */
        private List objectList;
        /**
        * 可能需要注入的某些服務(wù)
        */
        private TestService testService;
        public TestTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj, Map params) {
        super(childCountDown, mainCountDown, result, rollback, transactionManager, obj, params);
        }

     @Override
    public void initParam() {
        this.objectList = (List) getParam("objectList");
        this.testService = (TestService) getParam("testService");
     }

    /**
     * 執(zhí)行任務(wù),返回false表示任務(wù)執(zhí)行錯(cuò)誤,需要回滾

     * @return
     * */
    @Override
    public boolean processTask() {
          try {
            for (Object o : objectList) {
                testService.list();
                System.out.println(o.toString()+"執(zhí)行自己的多線程任務(wù)邏輯");
            }
            return true;
          } catch (Exception e) {
            return false;
          }

    }

   }

2,編寫主任務(wù)執(zhí)行方法

package com.yunshidi.freight.wf.operation;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;

import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class test {

    @Autowired
    private ThreadPoolTool threadPoolTool;
    
    @Autowired
    private TestService testService;

    @Autowired
    private DataSourceTransactionManager transactionManager;
    
    /**
     * 執(zhí)行多線程任務(wù)方法
     */
    public void testThreadTask() {
        try {
            int threadCount = 5;
            //需要分批處理的數(shù)據(jù)
            List objectList = new ArrayList<>();
            Map params =new HashMap<>();
            params.put("objectList",objectList);
            params.put("testService",testService);
            //調(diào)用多線程工具方法
            threadPoolTool.excuteTask(transactionManager,objectList,threadCount,params, TestTask.class);
        }catch (Exception e){
            throw new RuntimeException(e.getMessage());
        }

    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容