在處理后臺(tái)程序時(shí)如果執(zhí)行比較久,而不需要用戶等待的話,可以考慮使用多線程,線程異步或者redis隊(duì)的方法來(lái)實(shí)現(xiàn)
Spring通過(guò)任務(wù)執(zhí)行器(TaskExecutor)來(lái)實(shí)現(xiàn)多線程和并發(fā)編程。使用TheadPoolTaskExecutor可實(shí)現(xiàn)一個(gè)基于線程池的TaskExecutor。而實(shí)際開(kāi)發(fā)任務(wù)一般是非阻礙的,即異步的,所以要開(kāi)啟異步任務(wù)的支持(@EnableAsync),并通過(guò)實(shí)際的執(zhí)行bean中的方法使用@Async注釋來(lái)生命其是一個(gè)異步任務(wù)
代碼示例
線程數(shù)的配置類
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/**
* @description: 多線程 配置類
* @author: Shenshuaihu
* @version: 1.0
* @data: 2019-05-25 11:41
*/
@Configuration
@ComponentScan("com.ch3.taskexecutor")
@EnableAsync // 開(kāi)啟異步任務(wù)支持
public class TaskExecutorConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
/**
* 創(chuàng)建線程池
* 核心線程數(shù)
* 最大線程數(shù)
* 隊(duì)列最大長(zhǎng)度
*/
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setQueueCapacity(25);
taskExecutor.initialize();
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
需要開(kāi)啟的異步方法
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
/**
* @description: 任務(wù)執(zhí)行類
* @author: Shenshuaihu
* @version: 1.0
* @data: 2019-05-25 10:39
*/
@Service
public class AsyncTaskService {
/**
* Async 異步方法
* @param i
*/
@Async
public void executeAsyncTask(Integer i) {
int a = (int)(1+Math.random()*(800-1+1));
try {
Thread.sleep(a);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("執(zhí)行異步任務(wù):" + i);
}
@Async
public void executeAsyncTaskPlus(Integer i) {
System.out.println("執(zhí)行異步任務(wù)+1:" + i);
}
}
程序入口類
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
/**
* @description: 線程調(diào)用入口
* @author: Shenshuaihu
* @version: 1.0
* @data: 2019-05-25 13:13
*/
public class TaskMain {
public static void main(String[] args) {
AnnotationConfigApplicationContext context =
new AnnotationConfigApplicationContext(TaskExecutorConfig.class);
AsyncTaskService taskService = context.getBean(AsyncTaskService.class);
System.out.println(taskService);
for (int i = 0; i < 100; i++) {
taskService.executeAsyncTask(i);
taskService.executeAsyncTaskPlus(i);
}
context.close();
}
}
簡(jiǎn)單代碼說(shuō)明
如果此時(shí)線程池中的數(shù)量小于corePoolSize,即使線程池中的線程都處于空閑狀態(tài),也要創(chuàng)建新的線程來(lái)處理被添加的任務(wù)。
如果此時(shí)線程池中的數(shù)量等于 corePoolSize,但是緩沖隊(duì)列 workQueue未滿,那么任務(wù)被放入緩沖隊(duì)列。
如果此時(shí)線程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿,并且線程池中的數(shù)量小于maxPoolSize,建新的線程來(lái)處理被添加的任務(wù)。
如果此時(shí)線程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿,并且線程池中的數(shù)量等于maxPoolSize,那么通過(guò)handler所指定的策略來(lái)處理此任務(wù)。也就是:處理任務(wù)的優(yōu)先級(jí)為:核心線程corePoolSize、任務(wù)隊(duì)列workQueue、最大線程 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務(wù)。
當(dāng)線程池中的線程數(shù)量大于corePoolSize時(shí),如果某線程空閑時(shí)間超過(guò)keepAliveTime,線程將被終止。這樣,線程池可以動(dòng)態(tài)的調(diào)整池中的線程數(shù)。
知識(shí)拓展
如果不想使用線程池的話,用redis隊(duì)也是不錯(cuò)的選擇。redis隊(duì)先進(jìn)先出也可以滿足,也是需要用線程來(lái)開(kāi)啟出發(fā)方法
入隊(duì)方法,即需要將執(zhí)行的內(nèi)容push進(jìn)來(lái)
public void pushTaskQueue(Long resultDataId, String[] command){
// 將參數(shù)放進(jìn)redis隊(duì)列中 resultDataId 與 cmd
String data = String.valueOf(resultDataId) + "&&" + Arrays.toString(command);
redisTemplate.opsForList().leftPush("task-queue",data);
}
執(zhí)行等待時(shí)間比較久的任務(wù),出隊(duì)
import jodd.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.Arrays;
/**
* @Date: 2018/12/15 17:28
* @Description: 任務(wù)隊(duì)列消費(fèi)者 執(zhí)行Python 更新數(shù)據(jù)庫(kù)
*/
@Component
@Slf4j
public class TaskConsumer implements Runnable {
@Autowired
private IResultDataService service;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Override
public void run() {
try {
String data = redisTemplate.opsForList().leftPop("task-queue").toString();
String resultData[] = data.split("&&");
dataId = resultData[0];
String cmdData = resultData[1].replace(",","#").replace("csv\"#","csv\",");
cmdData = cmdData.substring(1,cmdData.length()-1).replace(" ","");
String[] command = cmdData.split("#");
log.info(" 隊(duì)列中數(shù)據(jù){} " , data );
// 執(zhí)行py文件 等待時(shí)間比較久,需要異步操作
Process proc = Runtime.getRuntime().exec(command);
in.close();
proc.waitFor();
}catch (Exception e){
}
}
}
2019/05/27晚于成都