在處理大數(shù)據(jù)或?qū)崟r(shí)數(shù)據(jù)時(shí),如果在主線程頻繁創(chuàng)建大量對(duì)象,這些對(duì)象使用完后成為游離對(duì)象,不會(huì)立即被GC。當(dāng)創(chuàng)建速度大于銷毀速度時(shí),可能導(dǎo)致內(nèi)存持續(xù)上漲,最后內(nèi)存溢出。
可以開啟多線程來處理,線程內(nèi)的對(duì)象會(huì)在執(zhí)行結(jié)束后盡快的銷毀,均分內(nèi)存累加的負(fù)擔(dān),保證內(nèi)存占用的穩(wěn)定性。
springboot的多線程使用
- 配置@EnableAsync
package com.cdgs.data.config;
import java.util.concurrent.Executor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableAsync(proxyTargetClass=true)//利用@EnableAsync注解開啟異步任務(wù)支持
@ComponentScan("com.cdgs.data.service") //必須加此注解掃描包
public class CustomMultiThreadingConfig implements AsyncConfigurer{
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(20);
taskExecutor.setQueueCapacity(500);
//當(dāng)提交的任務(wù)個(gè)數(shù)大于QueueCapacity,就需要設(shè)置該參數(shù),但spring提供的都不太滿足業(yè)務(wù)場(chǎng)景,可以自定義一個(gè),也可以注意不要超過QueueCapacity即可
//taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(10);
taskExecutor.setThreadNamePrefix("ES-IMOPRT-");
taskExecutor.initialize();
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler();
}
}
- 配置@Async,修飾類時(shí)表示類里所有方法都是多線程異步執(zhí)行
@Async
public Future<Integer> jdbcToElasticsearch(Pageable pageable) {
//通過實(shí)現(xiàn)ApplicationContextAware得到applicationContext,進(jìn)而獲取spring管理的bean
BaseinfoRepository repository = (BaseinfoRepository)SpringBeanUtil.getBean(BaseinfoRepository.class);
List<ZrHisEnterpriseBaseinfo> list = new ArrayList<>() ;
//實(shí)際查詢數(shù)據(jù)庫等業(yè)務(wù)代碼...
elasticsearchOperation.bulkSave(list);
return new AsyncResult<>(list.size());
}
- 調(diào)用,注意:調(diào)用方法不能和異步方法在同一類里
//oralce批量導(dǎo)入es
@GetMapping("/baseinfo")
public String importBaseinfo(@PageableDefault(size=1000)Pageable pageable) {
long count = autoImportService.count();
int size = pageable.getPageSize();
long loops = count%size>0?count/size+1:count/size;
ArrayList<Future<Integer>> futureList = new ArrayList<>();
for (int i = 0; i < loops; i++) {
//異步執(zhí)行任務(wù),返回參數(shù)使用Future封裝接收
Future<Integer> future = autoImportService.jdbcToElasticsearch(new PageRequest(pageable.getPageNumber() + i, size));
futureList.add(future);
}
importCount = checkTaskDone(futureList);
System.out.println(importCount);
}
public static long checkTaskDone(ArrayList<Future<Integer>> futureList) {
//判斷異步調(diào)用的方法是否全都執(zhí)行完了
while(true) {
int doneSize = 0;
for ( Future<Integer> future : futureList) {
//該異步方法是否執(zhí)行完成
if(future.isDone()) {
doneSize++;
}
}
//如果異步方法全部執(zhí)行完,跳出循環(huán)
if(doneSize == futureList.size()) {
break;
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}//每隔2秒判斷一次
}
long importCount = 0;
for ( Future<Integer> future : futureList) {
try {
importCount += future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
return importCount;
}
調(diào)用該方法,觀察內(nèi)存能夠穩(wěn)定在一定范圍

QQ圖片20190304090025.png