springboot @EnableAsync 多線程

在處理大數(shù)據(jù)或?qū)崟r(shí)數(shù)據(jù)時(shí),如果在主線程頻繁創(chuàng)建大量對(duì)象,這些對(duì)象使用完后成為游離對(duì)象,不會(huì)立即被GC。當(dāng)創(chuàng)建速度大于銷毀速度時(shí),可能導(dǎo)致內(nèi)存持續(xù)上漲,最后內(nèi)存溢出。
可以開啟多線程來處理,線程內(nèi)的對(duì)象會(huì)在執(zhí)行結(jié)束后盡快的銷毀,均分內(nèi)存累加的負(fù)擔(dān),保證內(nèi)存占用的穩(wěn)定性。

springboot的多線程使用

  1. 配置@EnableAsync
package com.cdgs.data.config;
import java.util.concurrent.Executor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync(proxyTargetClass=true)//利用@EnableAsync注解開啟異步任務(wù)支持
@ComponentScan("com.cdgs.data.service") //必須加此注解掃描包
public class CustomMultiThreadingConfig implements AsyncConfigurer{

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.setQueueCapacity(500);
      //當(dāng)提交的任務(wù)個(gè)數(shù)大于QueueCapacity,就需要設(shè)置該參數(shù),但spring提供的都不太滿足業(yè)務(wù)場(chǎng)景,可以自定義一個(gè),也可以注意不要超過QueueCapacity即可
      //taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());        
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        taskExecutor.setAwaitTerminationSeconds(10);
        taskExecutor.setThreadNamePrefix("ES-IMOPRT-");
        taskExecutor.initialize();  
        return taskExecutor;
    }
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}
  1. 配置@Async,修飾類時(shí)表示類里所有方法都是多線程異步執(zhí)行
    @Async
    public Future<Integer> jdbcToElasticsearch(Pageable pageable) {
        //通過實(shí)現(xiàn)ApplicationContextAware得到applicationContext,進(jìn)而獲取spring管理的bean
        BaseinfoRepository repository = (BaseinfoRepository)SpringBeanUtil.getBean(BaseinfoRepository.class);
        List<ZrHisEnterpriseBaseinfo> list = new ArrayList<>() ;

        //實(shí)際查詢數(shù)據(jù)庫等業(yè)務(wù)代碼...

        elasticsearchOperation.bulkSave(list);
        return new AsyncResult<>(list.size());
    }
  1. 調(diào)用,注意:調(diào)用方法不能和異步方法在同一類里
    //oralce批量導(dǎo)入es
    @GetMapping("/baseinfo")
    public String importBaseinfo(@PageableDefault(size=1000)Pageable pageable) {
        long count = autoImportService.count();
        int size = pageable.getPageSize();
        long loops = count%size>0?count/size+1:count/size;
        ArrayList<Future<Integer>> futureList = new ArrayList<>();
        for (int i = 0; i < loops; i++) {
            //異步執(zhí)行任務(wù),返回參數(shù)使用Future封裝接收
            Future<Integer> future = autoImportService.jdbcToElasticsearch(new PageRequest(pageable.getPageNumber() + i, size));
            futureList.add(future);
        }
        importCount = checkTaskDone(futureList);
        System.out.println(importCount);
    }

    public static long checkTaskDone(ArrayList<Future<Integer>> futureList) {
        //判斷異步調(diào)用的方法是否全都執(zhí)行完了
        while(true) {
            int doneSize = 0;
            for ( Future<Integer> future  : futureList) {
                //該異步方法是否執(zhí)行完成
                if(future.isDone()) {
                    doneSize++;
                }
            }
            //如果異步方法全部執(zhí)行完,跳出循環(huán)
            if(doneSize == futureList.size()) {
                break;
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }//每隔2秒判斷一次
        }
        
        long importCount = 0;
        for ( Future<Integer> future  : futureList) {
            try {
                importCount += future.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        return importCount;
    }

調(diào)用該方法,觀察內(nèi)存能夠穩(wěn)定在一定范圍


QQ圖片20190304090025.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對(duì)...
    cosWriter閱讀 11,674評(píng)論 1 32
  • 在一個(gè)方法內(nèi)部定義的變量都存儲(chǔ)在棧中,當(dāng)這個(gè)函數(shù)運(yùn)行結(jié)束后,其對(duì)應(yīng)的棧就會(huì)被回收,此時(shí),在其方法體中定義的變量將不...
    Y了個(gè)J閱讀 4,576評(píng)論 1 14
  • 不追星,不盲從是我經(jīng)常告誡自己的話,但對(duì)于追星族一直持理解狀態(tài),畢竟你喜歡的每一個(gè)愛豆都有他的魅力。就好像你也一樣...
    徒久旅人閱讀 251評(píng)論 0 2
  • 轉(zhuǎn)載自他人:https://blog.csdn.net/chenshun123/article/details/5...
    不一落葉閱讀 3,007評(píng)論 0 0
  • 引用方法 Referring to methods ?Constructor?.prototype.?method...
    饑人谷_Vomx閱讀 123評(píng)論 0 0

友情鏈接更多精彩內(nèi)容