Spring 多線程、異步和redis隊(duì)來(lái)解決非等待性方法

在處理后臺(tái)程序時(shí)如果執(zhí)行比較久,而不需要用戶等待的話,可以考慮使用多線程,線程異步或者redis隊(duì)的方法來(lái)實(shí)現(xiàn)

Spring通過(guò)任務(wù)執(zhí)行器(TaskExecutor)來(lái)實(shí)現(xiàn)多線程和并發(fā)編程。使用TheadPoolTaskExecutor可實(shí)現(xiàn)一個(gè)基于線程池的TaskExecutor。而實(shí)際開(kāi)發(fā)任務(wù)一般是非阻礙的,即異步的,所以要開(kāi)啟異步任務(wù)的支持(@EnableAsync),并通過(guò)實(shí)際的執(zhí)行bean中的方法使用@Async注釋來(lái)生命其是一個(gè)異步任務(wù)

代碼示例

線程數(shù)的配置類

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

/**
 * @description: 多線程 配置類
 * @author: Shenshuaihu
 * @version: 1.0
 * @data: 2019-05-25 11:41
 */
@Configuration
@ComponentScan("com.ch3.taskexecutor")
@EnableAsync    // 開(kāi)啟異步任務(wù)支持
public class TaskExecutorConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        /**
         *  創(chuàng)建線程池
         *      核心線程數(shù)
         *      最大線程數(shù)
         *      隊(duì)列最大長(zhǎng)度
         */
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(25);
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}

需要開(kāi)啟的異步方法

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * @description: 任務(wù)執(zhí)行類
 * @author: Shenshuaihu
 * @version: 1.0
 * @data: 2019-05-25 10:39
 */
@Service
public class AsyncTaskService {

    /**
     *  Async 異步方法
     * @param i
     */
    @Async
    public void executeAsyncTask(Integer i) {
        int a = (int)(1+Math.random()*(800-1+1));

        try {
            Thread.sleep(a);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("執(zhí)行異步任務(wù):" + i);
    }

    @Async
    public void executeAsyncTaskPlus(Integer i) {
        System.out.println("執(zhí)行異步任務(wù)+1:" + i);
    }
}

程序入口類

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

/**
 * @description: 線程調(diào)用入口
 * @author: Shenshuaihu
 * @version: 1.0
 * @data: 2019-05-25 13:13
 */
public class TaskMain {
    public static void main(String[] args) {
        AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(TaskExecutorConfig.class);
        AsyncTaskService taskService = context.getBean(AsyncTaskService.class);
        System.out.println(taskService);
        for (int i = 0; i < 100; i++) {
            taskService.executeAsyncTask(i);
            taskService.executeAsyncTaskPlus(i);
        }
        context.close();
    }
}

簡(jiǎn)單代碼說(shuō)明

如果此時(shí)線程池中的數(shù)量小于corePoolSize,即使線程池中的線程都處于空閑狀態(tài),也要創(chuàng)建新的線程來(lái)處理被添加的任務(wù)。

如果此時(shí)線程池中的數(shù)量等于 corePoolSize,但是緩沖隊(duì)列 workQueue未滿,那么任務(wù)被放入緩沖隊(duì)列。

如果此時(shí)線程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿,并且線程池中的數(shù)量小于maxPoolSize,建新的線程來(lái)處理被添加的任務(wù)。

如果此時(shí)線程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿,并且線程池中的數(shù)量等于maxPoolSize,那么通過(guò)handler所指定的策略來(lái)處理此任務(wù)。也就是:處理任務(wù)的優(yōu)先級(jí)為:核心線程corePoolSize、任務(wù)隊(duì)列workQueue、最大線程 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務(wù)。

當(dāng)線程池中的線程數(shù)量大于corePoolSize時(shí),如果某線程空閑時(shí)間超過(guò)keepAliveTime,線程將被終止。這樣,線程池可以動(dòng)態(tài)的調(diào)整池中的線程數(shù)。

知識(shí)拓展

如果不想使用線程池的話,用redis隊(duì)也是不錯(cuò)的選擇。redis隊(duì)先進(jìn)先出也可以滿足,也是需要用線程來(lái)開(kāi)啟出發(fā)方法

入隊(duì)方法,即需要將執(zhí)行的內(nèi)容push進(jìn)來(lái)

public void pushTaskQueue(Long resultDataId,  String[] command){

    // 將參數(shù)放進(jìn)redis隊(duì)列中   resultDataId 與 cmd

    String data = String.valueOf(resultDataId) + "&&" + Arrays.toString(command);

    redisTemplate.opsForList().leftPush("task-queue",data);

}

執(zhí)行等待時(shí)間比較久的任務(wù),出隊(duì)

import jodd.util.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.Arrays;

/**
 * @Date: 2018/12/15 17:28
 * @Description: 任務(wù)隊(duì)列消費(fèi)者   執(zhí)行Python 更新數(shù)據(jù)庫(kù)
 */

@Component
@Slf4j
public class TaskConsumer implements Runnable {

    @Autowired
    private IResultDataService service;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void run() {

        try {
            String data = redisTemplate.opsForList().leftPop("task-queue").toString();
            String resultData[] = data.split("&&");
            dataId = resultData[0];
            String cmdData  = resultData[1].replace(",","#").replace("csv\"#","csv\",");
            cmdData  = cmdData.substring(1,cmdData.length()-1).replace(" ","");
            String[] command  = cmdData.split("#");
            log.info(" 隊(duì)列中數(shù)據(jù){} " , data );
           
            // 執(zhí)行py文件 等待時(shí)間比較久,需要異步操作
                Process proc = Runtime.getRuntime().exec(command);
                in.close();
                proc.waitFor();
        }catch (Exception e){
        }
    }
 }

2019/05/27晚于成都

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容