(nacos源碼系列)springBoot下實現(xiàn)http請求的異步長輪詢—AsyncContext

  1. AsyncContext介紹
    1.1 概念
    1.2 疑問
  2. 項目實戰(zhàn)
    2.1 API方法
    2.2 項目實戰(zhàn)—實現(xiàn)配置更新
  3. nacos實現(xiàn)長輪詢

1. AsyncContext介紹

有這么一個場景:客戶端輪詢的去服務(wù)器讀取更新的配置信息,但讀取的頻率高就會影響客戶端和服務(wù)器的性能。那么如何優(yōu)化這種場景呢?

1.1 概念

SpringBoot集成了servlet一系列的操作,故servlet提供的新特性,在SpringBoot環(huán)境中可以正常使用。

而servlet3.0提供了異步處理的特性【AsyncContext】:使Servlet線程不再一直阻塞,直到業(yè)務(wù)處理完成才能輸出響應(yīng),最后結(jié)束Servlet線程。而是在接受到請求之后,Servlet線程可以將耗時的操作委派給一個異步線程執(zhí)行,而Servlet線程在不生成響應(yīng)的情況下返回容器。這種方案可以大大減少服務(wù)器資源占用,提高并發(fā)處理速度。

1.2 疑問

servlet線程返回容器,異步線程處理完耗時的操作后,還可以設(shè)置響應(yīng)對象進(jìn)行返回嗎?

可以!服務(wù)端的servlet線程返回容器,客戶端依舊和服務(wù)端保持著http連接,通過AysncContext依舊可以返回響應(yīng)數(shù)據(jù)

image.png

2. 項目實戰(zhàn)

2.1 API方法

在Servlet3.0中,ServletRequest提供了startAysnc()方法。

   /**
     * @see AsyncContext#dispatch()
     * @since Servlet 3.0
     */
    public AsyncContext startAsync() throws IllegalStateException;
 
    /**
     *
     * @since Servlet 3.0
     */
    public AsyncContext startAsync(ServletRequest servletRequest,
                                   ServletResponse servletResponse)
            throws IllegalStateException;

2.2 項目實戰(zhàn)—實現(xiàn)配置更新

http層面實現(xiàn)的推拉結(jié)合的配置更新。

在controller層獲取到HttpServletRequest和HttpServletResponse對象。

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;

@RestController
@RequestMapping
public class LongPollingController {
    //定時任務(wù),阻塞的最大超時時間。
    private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);

    private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d")
            .build();
    private static final Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create());


    @GetMapping(path = "getConf")
    @ResponseBody
    public String getThreadPoolConf(HttpServletRequest request, HttpServletResponse response)
        String serviceName = request.getParameter("serviceName");
        String timeOut =request.getParameter("timeOut");
        if (timeOut == null) {
            //如果沒設(shè)置過期時間則默認(rèn) 29秒超時
            timeOut = "29";
        }
        // 開啟異步
        AsyncContext asyncContext = request.startAsync(request, response);
        AsyncTask asyncTask = new AsyncTask(asyncContext, true);

        // 維護(hù) serviceName 和異步請求上下文的關(guān)聯(lián)
        dataIdContext.put(serviceName, asyncTask);

        // 啟動定時器,30s 后寫入 304 響應(yīng)
        timeoutChecker.schedule(() -> {
            //觸發(fā)定時后,判斷任務(wù)是否被執(zhí)行,即isTimeout為true(沒有被執(zhí)行)
           //則返回客戶端304的狀態(tài)碼-即無修改。
            if (asyncTask.isTimeout()) {
                //清除緩存中的任務(wù)
                if (dataIdContext.remove(serviceName, asyncTask)) {
                    response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
                    asyncTask.getAsyncContext().complete();
                }
            }
        }, Integer.parseInt(timeOut), TimeUnit.SECONDS);
    }


    //當(dāng)配置被修改,觸發(fā)事件后,會調(diào)用該方法。
    public void publishConfig(String serviceName,  List<ThreadPoolProperties> result) {
        if (StringUtil.isEmpty(serviceName) && CollectionUtils.isEmpty(result)) {
            log.error("publishConfig:serviceName:result all is null");
            return;
        }
        if (StringUtil.isNotEmpty(serviceName) && CollectionUtils.isEmpty(result)) {
            log.error("publishConfig result is null");
            //去持久化的存儲源(es/mysql...)讀取配置信息
            result = threadPoolService.getConfigByServiceName(serviceName);
            if (result == null) {
                log.error("publishConfig:serviceName:result all is null but not find threadPoolProperties serviceName:{}", serviceName);
                return;
            }
        }
        //讀取到的配置信息,json化
        String configInfo = JSONArray.toJSONString(result);
        //移除AsyncTask的緩存
        Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(serviceName);
        if (CollectionUtils.isEmpty(asyncTasks)) return;
        //為每一個AsyncContext設(shè)置200的狀態(tài)碼以及響應(yīng)數(shù)據(jù)。
        for (AsyncTask asyncTask : asyncTasks) {
            //表明未超時,已經(jīng)進(jìn)行處理了。
            asyncTask.setTimeout(false);
            HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
            response.setStatus(HttpServletResponse.SC_OK);
            response.getWriter().println(configInfo);
            asyncTask.getAsyncContext().complete();
        }
    }

    //自定義任務(wù)對象
    @Data
    private static class AsyncTask {
        // 長輪詢請求的上下文,包含請求和響應(yīng)體
        private AsyncContext asyncContext;
        // 超時標(biāo)記
        private boolean timeout;

        public AsyncTask(AsyncContext asyncContext, boolean timeout) {
            this.asyncContext = asyncContext;
            this.timeout = timeout;
        }
    }
}

上面代碼實現(xiàn)的邏輯:客戶端拉取服務(wù)端的配置,服務(wù)端開啟長輪詢,使用AysncContext進(jìn)行異步阻塞。當(dāng)服務(wù)端有配置被改變時,(例如:由redis的訂閱發(fā)布模式)來進(jìn)行通知,最終執(zhí)行publishConfig方法,將服務(wù)器的配置推送給客戶端,其本質(zhì)實現(xiàn)的是http推拉結(jié)合的配置通知。


  • startAsync()會直接利用原有的請求與響應(yīng)對象來創(chuàng)建AsyncContext
  • startAsync(ServletRequest request,ServletResponse response)可以傳入自行創(chuàng)建的請求、響應(yīng)封裝對象;

注意事項:

  1. 可以通過AsyncContext的getRequest()、getResponse()方法取得請求、響應(yīng)對象,此次對客戶端的響應(yīng)將暫緩至調(diào)用AsyncContext的complete()或dispatch()方法為止,前者表示響應(yīng)完成,后者表示將調(diào)派指定的URL進(jìn)行響應(yīng)。
  2. AysncContext.setTimeout()的超時時間不準(zhǔn),所以需要自己控制。
  3. 客戶端的http并未斷開連接。
  4. 第一次客戶端請求,可以將全量的數(shù)據(jù)直接發(fā)生(普通連接),后續(xù)的請求,可以使用AysncContext來實現(xiàn)長輪詢。

3. nacos實現(xiàn)長輪詢

項目源碼:nacos也是借助AysncContext來實現(xiàn)配置的更新。

@Service
public class LongPollingService extends AbstractEventListener {

    private static final int FIXED_POLLING_INTERVAL_MS = 10000;

    private static final int SAMPLE_PERIOD = 100;

    private static final int SAMPLE_TIMES = 3;

    private static final String TRUE_STR = "true";

    private Map<String, Long> retainIps = new ConcurrentHashMap<String, Long>();

    private static boolean isFixedPolling() {
        return SwitchService.getSwitchBoolean(SwitchService.FIXED_POLLING, false);
    }

    //......

    static public boolean isSupportLongPolling(HttpServletRequest req) {
        return null != req.getHeader(LONG_POLLING_HEADER);
    }

    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
                                     int probeRequestSize) {

        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        /**
         * 提前500ms返回響應(yīng),為避免客戶端超時 @qiaoyi.dingqy 2013.10.22改動  add delay time for LoadBalance
         */
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
            // do nothing but set fix polling timeout
        } else {
            long start = System.currentTimeMillis();
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                generateResponse(req, rsp, changedGroups);
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                    System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
                    clientMd5Map.size(), probeRequestSize, changedGroups.size());
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                    RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                    changedGroups.size());
                return;
            }
        }
        String ip = RequestUtil.getRemoteIp(req);
        // 一定要由HTTP線程調(diào)用,否則離開后容器會立即發(fā)送響應(yīng)
        final AsyncContext asyncContext = req.startAsync();
        // AsyncContext.setTimeout()的超時時間不準(zhǔn),所以只能自己控制
        asyncContext.setTimeout(0L);

        scheduler.execute(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }

    //......
}

分析:LongPollingService的isSupportLongPolling是通過判斷request是否有LONG_POLLING_HEADER的header來實現(xiàn)的;addLongPollingClient方法主要是創(chuàng)建ClientLongPolling,然后提交到scheduler定時線程池執(zhí)行。

4. 文章參考

AsyncContext異步請求的用法

聊聊nacos config的doPollingConfig

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容