Springboot線程池多任務(wù)阻塞等待結(jié)果(兩種實(shí)現(xiàn)方式)

場(chǎng)景##

公司一站通翻譯同步系統(tǒng) , 需要將客戶(hù)保存在美國(guó)站點(diǎn)的文章翻譯并保存到其余22個(gè)站點(diǎn)的數(shù)據(jù)庫(kù)。由于翻譯需要耗費(fèi)較長(zhǎng)的時(shí)間,故而使用隊(duì)列將任務(wù)投遞到線程池中處理,

  1. Springboot 配置產(chǎn)品同步核心線程池
package cn.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;

@Configuration
@EnableAsync // 開(kāi)啟線程池
public class AsyncThreadPoolConfiguration
{
    /**
     * 配置默認(rèn)線程池,用于處理一些公共異步任務(wù)
     */
    @Bean("defaultThread")
    public Executor defaultThread(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(20);// 核心線程數(shù),
        executor.setMaxPoolSize(40);// 并發(fā)線程的數(shù)量限制為2
        executor.setQueueCapacity(200); // 線程隊(duì)列
        executor.setThreadNamePrefix("defaultThread@");
        executor.initialize();
        return executor;
    }

    /**
     * 同步產(chǎn)品使用的線程池
     * @return
     */
    @Bean("syncProduct")
    public Executor syncProduct() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(22);// 核心線程數(shù),
        executor.setMaxPoolSize(22);// 并發(fā)線程的數(shù)量限制為2
        executor.setQueueCapacity(100); // 線程隊(duì)列
        executor.setThreadNamePrefix("syncProduct@");
        executor.initialize();
        return executor;
    }

    /**
     * todo 配置其他功能的線程池
     */
}

2.1 使用方式一 注入線程池對(duì)象 ,通過(guò) lomda表達(dá)式結(jié)合Future實(shí)現(xiàn)

package cn.services;

import cn.utils.SystemUtils;
import cn.utils.TimeUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;

@Service
public class IndexServices
{
    // 注入線程池對(duì)象
    @Resource(name = "syncProduct")
    private ThreadPoolTaskExecutor syncProduct;

    public void ansyc() throws InterruptedException {
        // 初始化任務(wù)結(jié)果集
        Map<String,Future<String>> futures = new HashMap<>();
        // 模擬需要同步的 22個(gè)子站點(diǎn)
        List<String> langs = new ArrayList<>();
        for (int i=1;i<=22;i++){
            langs.add("lang"+i);
        }
        // 多線程處理任務(wù)
        for (String lang : langs) {
            Future<String> future = syncProduct.submit(() -> {
                // 真正的業(yè)務(wù)處理
                if(lang.equals("lang10")){
                    Thread.sleep(8000);// 模擬阻塞時(shí)間
                }else{
                    Thread.sleep(3000); // 模擬阻塞時(shí)間
                }

                System.out.println("打印結(jié)果 : " + lang + "當(dāng)前線程名稱(chēng)是:"+SystemUtils.getThreadID());
                return lang + "...";
            });
            futures.put(lang,future);
        }

        // 阻塞等待結(jié)果,缺點(diǎn)該種方式無(wú)法設(shè)置阻塞等待有效時(shí)間,如果有一個(gè)線程阻塞,會(huì)導(dǎo)致整個(gè)線程池一直等待
        for (Map.Entry<String, Future<String>> entry : futures.entrySet()) {
           while (true){
               Future<String> future = entry.getValue();
               String lang = entry.getKey();
               if(future.isDone() && !future.isCancelled()){
                   String result = null;
                   try{
                       result = future.get();
                        //result = future.get(6L,TimeUnit.SECONDS); 設(shè)置阻塞等待時(shí)間無(wú)效
                   }catch(ExecutionException e){
                       e.printStackTrace();
                   }
                   System.out.println(lang+" 站點(diǎn)任務(wù)結(jié)果result=" + result + "獲取完成!" + TimeUtils.getCurretDate());
                   break;
               }else{
                   Thread.sleep(1);//每次輪詢(xún)休息1毫秒(CPU納秒級(jí)),避免CPU高速輪循耗空CPU ,這個(gè)至關(guān)重要
               }
           }
        }
        /*
       // 設(shè)置阻塞時(shí)間有效版本
        for (Map.Entry<String, Future<String>> entry : futures.entrySet()) {
           while (true){
               Future<String> future = entry.getValue();
               String lang = entry.getKey();
               String result = null;
               try{
                   result = future.get(3L,TimeUnit.SECONDS);
               }catch(ExecutionException e){
                   e.printStackTrace();
               }catch (TimeoutException e){
                   System.out.println("存在超時(shí)的任務(wù),無(wú)法獲取結(jié)果");
               }
               System.out.println(lang+" 站點(diǎn)任務(wù)結(jié)果result=" + result + "獲取完成!" + TimeUtils.getCurretDate());
               break;
           }

        }
        */
    }
}

2.2 上線方式二,通過(guò)Springboot的Ansyc注解實(shí)現(xiàn)任務(wù)并發(fā)
實(shí)現(xiàn)思路是 寫(xiě)一個(gè) @Ansyc 注解標(biāo)記的業(yè)務(wù)方法,在外層循環(huán)調(diào)用,通過(guò)參數(shù)控制實(shí)現(xiàn)不同的邏輯

/**
     * 翻譯英文站點(diǎn)文章 并同步到22個(gè)子站點(diǎn)
     */
    // 業(yè)務(wù)核心
    @Async("syncProduceToSite")
    public CompletableFuture<String> syncProduceToSite(String lang) throws InterruptedException {
        // 通過(guò) 參數(shù) lang的不同,實(shí)現(xiàn)翻譯成不同的語(yǔ)言,并推送到不同站點(diǎn)的數(shù)據(jù)庫(kù)
        Thread.sleep(8000L);

        String results = lang + " success";
        String name =  SystemUtils.getCurrentThreadName();
        return CompletableFuture.completedFuture("站點(diǎn)"+lang + "處理的結(jié)果是:"+results+";線程名稱(chēng)是:"+name);
    }

在外層調(diào)用過(guò)程
 // 模擬準(zhǔn)備 22個(gè)站點(diǎn)的語(yǔ)言標(biāo)識(shí)符參數(shù)
        List<String> langs = new ArrayList<>();
        for (int i=1;i<=22;++i){
            langs.add("lang"+i);
        }
        // 初始化結(jié)果集
        Map<String,CompletableFuture<String>> futureMap = new HashMap<>();
        for (String lang: langs) {
            // 業(yè)務(wù)執(zhí)行
            CompletableFuture<String> future = authServices.syncProduceToSite(lang);
            futureMap.put(lang,future); // 將結(jié)果集放入 map

        }
        // 獲取結(jié)果集
        for (Map.Entry<String, CompletableFuture<String>> entry : futureMap.entrySet()){
            String lang = entry.getKey();
            CompletableFuture<String> future = entry.getValue();

            try {
//                CompletableFuture.anyOf(future).join(); // 阻塞等待結(jié)果的返回,加上這一句,那么下面的阻塞時(shí)間就無(wú)效
                String result= future.get(2L, TimeUnit.SECONDS);
                System.out.println("站點(diǎn):"+lang+" 得到結(jié)果啦 "+result);
            } catch (TimeoutException e) {
//                e.printStackTrace();
                System.out.println("站點(diǎn):"+lang+" 超時(shí),無(wú)法獲取結(jié)果");
            }
        }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容