Spring異步請求與異步調(diào)用

異步請求

在Servlet 3.0之前,Servlet采用Thread-Per-Request的方式處理請求,即每一次Http請求都由某一個線程從頭到尾負責處理。如果一個請求需要進行IO操作,比如訪問數(shù)據(jù)庫、調(diào)用第三方服務(wù)接口等,那么其所對應(yīng)的線程將同步地等待****IO操作完成, 而IO操作是非常慢的,所以此時的線程并不能及時地釋放回線程池以供后續(xù)使用,在并發(fā)量越來越大的情況下,這將帶來嚴重的性能問題。其請求流程大致為:


image.png

而在Servlet3.0發(fā)布后,提供了一個新特性:異步處理請求??梢韵柔尫湃萜鞣峙浣o請求的線程與相關(guān)資源,減輕系統(tǒng)負擔,釋放了容器所分配線程的請求,其響應(yīng)將被延后,可以在耗時處理完成(例如長時間的運算)時再對客戶端進行響應(yīng)。其請求流程為:


image.png

在Servlet 3.0后,我們可以從HttpServletRequest對象中獲得一個AsyncContext對象,該對象構(gòu)成了異步處理的上下文,Request和Response對象都可從中獲取。AsyncContext可以從當前線程傳給另外的線程,并在新的線程中完成對請求的處理并返回結(jié)果給客戶端,初始線程便可以還回給容器線程池以處理更多的請求。如此,通過將請求從一個線程傳給另一個線程處理的過程便構(gòu)成了Servlet 3.0中的異步處理。

隨著Spring5發(fā)布,提供了一個響應(yīng)式Web框架:Spring WebFlux。之后可能就不需要Servlet容器的支持了。以下是其先后對比圖:


image.png

左側(cè)是傳統(tǒng)的基于Servlet的Spring Web MVC框架,右側(cè)是5.0版本新引入的基于Reactive Streams的Spring WebFlux框架,從上到下依次是Router Functions,WebFlux,Reactive Streams三個新組件。

原生異步請求API說明
在編寫實際代碼之前,我們來了解下一些關(guān)于異步請求的api的調(diào)用說明。

獲取AsyncContext:根據(jù)HttpServletRequest對象獲取。
AsyncContext asyncContext = request.startAsync();

設(shè)置監(jiān)聽器:可設(shè)置其開始、完成、異常、超時等事件的回調(diào)處理
其監(jiān)聽器的接口代碼:

public interface AsyncListener extends EventListener {
    void onComplete(AsyncEvent event) throws IOException;
    void onTimeout(AsyncEvent event) throws IOException;
    void onError(AsyncEvent event) throws IOException;
    void onStartAsync(AsyncEvent event) throws IOException;
}

說明:

  • onStartAsync:異步線程開始時調(diào)用
  • onError:異步線程出錯時調(diào)用
  • onTimeout:異步線程執(zhí)行超時調(diào)用
  • onComplete:異步執(zhí)行完畢時調(diào)用

一般上,我們在超時或者異常時,會返回給前端相應(yīng)的提示,比如說超時了,請再次請求等等,根據(jù)各業(yè)務(wù)進行自定義返回。同時,在異步調(diào)用完成時,一般需要執(zhí)行一些清理工作或者其他相關(guān)操作。

需要注意的是只有在調(diào)用request.startAsync前將監(jiān)聽器添加到AsyncContext,監(jiān)聽器的onStartAsync方法才會起作用,而調(diào)用startAsync前AsyncContext還不存在,所以第一次調(diào)用startAsync是不會被監(jiān)聽器中的onStartAsync方法捕獲的,只有在超時后又重新開始的情況下onStartAsync方法才會起作用。

設(shè)置超時:通過setTimeout方法設(shè)置,單位:毫秒。
一定要設(shè)置超時時間,不能無限等待下去,不然和正常的請求就一樣了。。

Servlet方式實現(xiàn)異步請求

前面已經(jīng)提到,可通過HttpServletRequest對象中獲得一個AsyncContext對象,該對象構(gòu)成了異步處理的上下文。所以,我們來實際操作下。

1、編寫一個簡單控制層

/**
 * 使用servlet方式進行異步請求
 *
 */
@Slf4j
@RestController
public class ServletController {
    
    @RequestMapping("/servlet/orig")
    public void todo(HttpServletRequest request,HttpServletResponse response) throws Exception {
        //這里來個休眠
        Thread.sleep(100);
        response.getWriter().println("這是【正?!康恼埱蠓祷?);
    }
    
    @RequestMapping("/servlet/async")
    public void todoAsync(HttpServletRequest request,HttpServletResponse response) {
        AsyncContext asyncContext = request.startAsync();
        asyncContext.addListener(new AsyncListener() {
            
            @Override
            public void onTimeout(AsyncEvent event) throws IOException {
                log.info("超時了:");
                //做一些超時后的相關(guān)操作
            }
            
            @Override
            public void onStartAsync(AsyncEvent event) throws IOException {
                // TODO Auto-generated method stub
                log.info("線程開始");
            }
            
            @Override
            public void onError(AsyncEvent event) throws IOException {
                log.info("發(fā)生錯誤:",event.getThrowable());
            }
            
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                log.info("執(zhí)行完成");
                //這里可以做一些清理資源的操作
                
            }
        });
        //設(shè)置超時時間
        asyncContext.setTimeout(200);
        //也可以不使用start 進行異步調(diào)用
//        new Thread(new Runnable() {
//            @Override
//            public void run() {
//                編寫業(yè)務(wù)邏輯
//                
//            }
//        }).start();
        
        asyncContext.start(new Runnable() {            
            @Override
            public void run() {
                try {
                    Thread.sleep(100);
                    log.info("內(nèi)部線程:" + Thread.currentThread().getName());
                    asyncContext.getResponse().setCharacterEncoding("utf-8");
                    asyncContext.getResponse().setContentType("text/html;charset=UTF-8");
                    asyncContext.getResponse().getWriter().println("這是【異步】的請求返回");
                } catch (Exception e) {
                    log.error("異常:",e);
                }
                //異步請求完成通知
                //此時整個請求才完成
                //其實可以利用此特性 進行多條消息的推送 把連接掛起。。
                asyncContext.complete();
            }
        });
        //此時之類 request的線程連接已經(jīng)釋放了
        log.info("線程:" + Thread.currentThread().getName());
    }
}

注意:異步請求時,可以利用ThreadPoolExecutor自定義個線程池。

1.啟動下應(yīng)用,查看控制臺輸出就可以獲悉是否在同一個線程里面了。同時,可設(shè)置下等待時間,之后就會調(diào)用超時回調(diào)方法了。

使用過濾器時,需要加入asyncSupported為true配置,開啟異步請求支持。

@WebServlet(urlPatterns = "/okong", asyncSupported = true )  
public  class AsyncServlet extends HttpServlet ...

題外話:其實我們可以利用在未執(zhí)行asyncContext.complete()方法時請求未結(jié)束這特性,可以做個簡單的文件上傳進度條之類的功能。但注意請求是會超時的,需要設(shè)置超時的時間下。

Spring方式實現(xiàn)異步請求

在Spring中,有多種方式實現(xiàn)異步請求,比如callable、DeferredResult或者WebAsyncTask。每個的用法略有不同,可根據(jù)不同的業(yè)務(wù)場景選擇不同的方式。以下主要介紹一些常用的用法

Callable

使用很簡單,直接返回的參數(shù)包裹一層callable即可。

用法

    @RequestMapping("/callable")
    public Callable<String> callable() {
        log.info("外部線程:" + Thread.currentThread().getName());
        return new Callable<String>() {
 
            @Override
            public String call() throws Exception {
                log.info("內(nèi)部線程:" + Thread.currentThread().getName());
                return "callable!";
            }
        };
    }

超時、自定義線程設(shè)置

從控制臺可以看見,異步響應(yīng)的線程使用的是名為:MvcAsync1的線程。第一次再訪問時,就是MvcAsync2了。若采用默認設(shè)置,會無限的創(chuàng)建新線程去處理異步請求,所以正常都需要配置一個線程池及超時時間。

編寫一個配置類

@Configuration
public class JavaConfig {
    /**
     * 配置線程池
     * @return
     */
    @Bean(name = "asyncPoolTaskExecutor")
    public ThreadPoolTaskExecutor getAsyncThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //此方法返回可用處理器的虛擬機的最大數(shù)量; 不小于1
        int core = Runtime.getRuntime().availableProcessors();
        taskExecutor.setCorePoolSize(core);
        taskExecutor.setMaxPoolSize(core*2 + 1);
        taskExecutor.setQueueCapacity(25);
        taskExecutor.setKeepAliveSeconds(200);
        taskExecutor.setThreadNamePrefix("callable-");//線程名稱前綴
        // 線程池對拒絕任務(wù)(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認為后者
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }
}

DeferredResult

相比于callable,DeferredResult可以處理一些相對復雜一些的業(yè)務(wù)邏輯,最主要還是可以在另一個線程里面進行業(yè)務(wù)處理及返回,即可在兩個完全不相干的線程間的通信。

/**
 * 線程池
 */
public static ExecutorService FIXED_THREAD_POOL = Executors.newFixedThreadPool(30);
 
@RequestMapping("/deferredresult")
public DeferredResult<String> deferredResult(){
    log.info("外部線程:" + Thread.currentThread().getName());
    //設(shè)置超時時間
    DeferredResult<String> result = new DeferredResult<String>(60*1000L);
    //處理超時事件 采用委托機制
    result.onTimeout(new Runnable() {
 
        @Override
        public void run() {
            log.error("DeferredResult超時");
            result.setResult("超時了!");
        }
    });
    result.onCompletion(new Runnable() {
 
        @Override
        public void run() {
            //完成后
            log.info("調(diào)用完成");
        }
    });
    FIXED_THREAD_POOL.execute(new Runnable() {
 
        @Override
        public void run() {
            //處理業(yè)務(wù)邏輯
            log.info("內(nèi)部線程:" + Thread.currentThread().getName());
            //返回結(jié)果
            result.setResult("DeferredResult!!");
        }
    });
    return result;
}

注意:返回結(jié)果時記得調(diào)用下setResult方法。

題外話:利用DeferredResult可實現(xiàn)一些長連接的功能,比如當某個操作是異步時,我們可以保存這個DeferredResult對象,當異步通知回來時,我們在找回這個DeferredResult對象,之后在setResult會結(jié)果即可。提高性能。

WebAsyncTask

使用方法都類似,只是WebAsyncTask是直接返回了。

@RequestMapping("/webAsyncTask")
    public WebAsyncTask<String> webAsyncTask() {
        log.info("外部線程:" + Thread.currentThread().getName());
        WebAsyncTask<String> result = new WebAsyncTask<String>(60*1000L, new Callable<String>() {
 
            @Override
            public String call() throws Exception {
                log.info("內(nèi)部線程:" + Thread.currentThread().getName());
                return "WebAsyncTask!!!";
            }
        });
        result.onTimeout(new Callable<String>() {
            
            @Override
            public String call() throws Exception {
                // TODO Auto-generated method stub
                return "WebAsyncTask超時!!!";
            }
        });
        result.onCompletion(new Runnable() {
            
            @Override
            public void run() {
                //超時后 也會執(zhí)行此方法
                log.info("WebAsyncTask執(zhí)行結(jié)束");
            }
        });
        return result;
    }

異步調(diào)用

開啟異步支持

@Configuration
@EnableAsync
public class SpringAsyncConfig {
}

@EnableAsync檢測Spring的@Async注釋和EJB 3.1 javax. EJB異步,還可用于檢測其他用戶定義注解。

自定義線程池:

@Slf4j
@Configuration
public class ThreadPoolConfiguration {

    @Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
    public ThreadPoolExecutor systemCheckPoolExecutorService() {
        return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10000),
                new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),
                (r, executor) -> log.error("system pool is full! "));
    }
}

在異步處理的方法上添加注解 @Async ,當對 execute 方法 調(diào)用時,通過自定義的線程池 defaultThreadPoolExecutor 異步化執(zhí)行 execute 方法

@Service
public class AsyncServiceImpl implements AsyncService {

    @Async("defaultThreadPoolExecutor")
    public Boolean execute(Integer num) {
        log.info("線程:" + Thread.currentThread().getName() + " , 任務(wù):" + num);
        return true;
    }
}

用 @Async 注解標記的方法,稱為異步方法。在SB應(yīng)用中使用 @Async 很簡單:

調(diào)用異步方法類上或啟動類加上注解 @EnableAsync
在需要被異步調(diào)用的方法外加上 @Async
所使用的 @Async 注解方法的類對象應(yīng)該是Spring容器管理的bean對象;
@Async使用

無返回值

@Async
@Slf4j
public void returnVoid() {
}

有返回值

@Async
@Slf4j
public Future<String> returnFuture() {
    try {
        Thread.sleep(1000);
        return new AsyncResult<String>("hello");
    } catch (InterruptedException e) {
    }
    return null;
}

執(zhí)行器

Spring默認使用SimpleAsyncTaskExecutor線程池去執(zhí)行這些異步方法,此執(zhí)行器沒有限制線程數(shù),實際上此線程池不是真正意義上的線程池,線程并沒有重用,每次調(diào)用都會創(chuàng)建一個新的線程。可從兩個層級進行覆蓋:

方法級別覆蓋

@Async("threadPoolTaskExecutor")
public void asyncMethodWithConfiguredExecutor() {
}

應(yīng)用級別覆蓋
自定義配置類實現(xiàn)AsyncConfigurer接口,重寫getAsyncExecutor()方法:

@Configuration
@EnableAsync
public class SpringAsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.initialize();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        return executor;
    }
}

異常處理

當方法返回值是Future時,異常捕獲是沒問題的,F(xiàn)uture.get()方法會拋出異常。但如果返回類型是Void,異常在當前線程就捕獲不到,需要添加額外的配置來處理異常。

實現(xiàn)AsyncUncaughtExceptionHandler接口來自定義異常處理類,重寫handleUncaughtException()方法,存在任何未捕獲的異步異常時調(diào)用:

@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
    @Override
    public void handleUncaughtException (Throwable throwable, Method method, Object... obj) {
        log.info("Exception message - " + throwable.getMessage() + "Method name - " + method.getName());
        for (Object param : obj) {
            log.info("Parameter value - " + param);
        }
    }
}

由configuration類實現(xiàn)的AsyncConfigurer接口。作為其中的一部分,還需要覆蓋getAsyncUncaughtExceptionHandler()方法來返回自定義的異步異常處理程序:

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return new CustomAsyncExceptionHandler();
}

失效

調(diào)用的異步方法,不能為同一個類的方法(包括同一個類的內(nèi)部類),簡單來說,因為Spring在啟動掃描時會為其創(chuàng)建一個代理類,而同類調(diào)用時,還是調(diào)用本身的代理類的,所以和平常調(diào)用是一樣的。其他注解如@Cache等也是如此,由于Spring的代理機制。在開發(fā)中最好把異步服務(wù)單獨抽出一個類來管理。

導致@Async異步方法失效的幾種情況:

調(diào)用同一個類下注有@Async異步方法:在Spring中像@Async,@Transactional,@Cache等注解本質(zhì)使用的是動態(tài)代理,Spring容器在初始化時,會將含有AOP注解的類對象替換為代理對象。注解失效的原因,就是因為調(diào)用方法的是對象本身而不是代理對象,因為沒有經(jīng)過Spring容器,解決方法也會沿著這個思路來解決。
調(diào)用static方法
調(diào)用private方法
解決方法
上面的情況2,3很好解決,僅考慮情況1。

將要異步執(zhí)行的方法單獨抽取成一個類,原理就是當你把執(zhí)行異步的方法單獨抽取成一個類的時候,這個類肯定是被Spring管理的,其他Spring組件需要調(diào)用時肯定會注入進去,這時候?qū)嶋H上注入進去的就是代理類。
其實注入對象都是從Spring容器中給當前Spring組件進行成員變量的賦值,由于某些類使用AOP注解,那么實際上在Spring容器中實際存在的是它的代理對象。那么就可以通過上下文獲取自己的代理對象調(diào)用異步方法。

@Controller
public class EmailController {
    @Autowired
    private ApplicationContext applicationContext;

    @RequestMapping(value = "/asyncCall", method = GET)
    @ResponseBody
    public void asyncCall () {
        try {
            // 調(diào)用同類下的異步方法是不起作用的
            // this.testAsyncTask();
            // 通過上下文獲取自己的代理對象調(diào)用異步方法
            EmailController controller = (EmailController)applicationContext.getBean(EmailController.class);
            controller.testAsyncTask();
        } catch (Exception e) {
        }
    }

    @Async
    public void testAsyncTask() throws InterruptedException {
        Thread.sleep(10000);
        log.info("異步任務(wù)執(zhí)行完成!");
    }
}

開啟cglib代理,手動獲取Spring代理類,從而調(diào)用同類下的異步方法。在啟動類上加上@EnableAspectJAutoProxy(exposeProxy = true)注解:

@Service
@Transactional(value = "transactionManager", readOnly = false, propagation = Propagation.REQUIRED, rollbackFor = Throwable.class)
public class EmailService {
    @Autowired
    private ApplicationContext applicationContext;

    @Async
    public void testSyncTask() throws InterruptedException {
        Thread.sleep(10000);
        log.info("異步任務(wù)執(zhí)行完成!");
    }

    public void asyncCallTwo() throws InterruptedException {
        //this.testSyncTask();
//        EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class);
//        emailService.testSyncTask();
        boolean isAop = AopUtils.isAopProxy(EmailController.class);//是否是代理對象;
        boolean isCglib = AopUtils.isCglibProxy(EmailController.class);  //是否是CGLIB方式的代理對象;
        boolean isJdk = AopUtils.isJdkDynamicProxy(EmailController.class);  //是否是JDK動態(tài)代理方式的代理對象;
        EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class);
        EmailService proxy = (EmailService) AopContext.currentProxy();
        log.info(emailService == proxy ? true : false);
        proxy.testSyncTask();
    }
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容