- AsyncContext介紹
1.1 概念
1.2 疑問 - 項目實戰(zhàn)
2.1 API方法
2.2 項目實戰(zhàn)—實現(xiàn)配置更新 - nacos實現(xiàn)長輪詢
1. AsyncContext介紹
有這么一個場景:客戶端輪詢的去服務(wù)器讀取更新的配置信息,但讀取的頻率高就會影響客戶端和服務(wù)器的性能。那么如何優(yōu)化這種場景呢?
1.1 概念
SpringBoot集成了servlet一系列的操作,故servlet提供的新特性,在SpringBoot環(huán)境中可以正常使用。
而servlet3.0提供了異步處理的特性【AsyncContext】:使Servlet線程不再一直阻塞,直到業(yè)務(wù)處理完成才能輸出響應(yīng),最后結(jié)束Servlet線程。而是在接受到請求之后,Servlet線程可以將耗時的操作委派給一個異步線程執(zhí)行,而Servlet線程在不生成響應(yīng)的情況下返回容器。這種方案可以大大減少服務(wù)器資源占用,提高并發(fā)處理速度。
1.2 疑問
servlet線程返回容器,異步線程處理完耗時的操作后,還可以設(shè)置響應(yīng)對象進(jìn)行返回嗎?
可以!服務(wù)端的servlet線程返回容器,客戶端依舊和服務(wù)端保持著http連接,通過AysncContext依舊可以返回響應(yīng)數(shù)據(jù)

2. 項目實戰(zhàn)
2.1 API方法
在Servlet3.0中,ServletRequest提供了startAysnc()方法。
/**
* @see AsyncContext#dispatch()
* @since Servlet 3.0
*/
public AsyncContext startAsync() throws IllegalStateException;
/**
*
* @since Servlet 3.0
*/
public AsyncContext startAsync(ServletRequest servletRequest,
ServletResponse servletResponse)
throws IllegalStateException;
2.2 項目實戰(zhàn)—實現(xiàn)配置更新
http層面實現(xiàn)的推拉結(jié)合的配置更新。
在controller層獲取到HttpServletRequest和HttpServletResponse對象。
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
@RestController
@RequestMapping
public class LongPollingController {
//定時任務(wù),阻塞的最大超時時間。
private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory);
private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d")
.build();
private static final Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create());
@GetMapping(path = "getConf")
@ResponseBody
public String getThreadPoolConf(HttpServletRequest request, HttpServletResponse response)
String serviceName = request.getParameter("serviceName");
String timeOut =request.getParameter("timeOut");
if (timeOut == null) {
//如果沒設(shè)置過期時間則默認(rèn) 29秒超時
timeOut = "29";
}
// 開啟異步
AsyncContext asyncContext = request.startAsync(request, response);
AsyncTask asyncTask = new AsyncTask(asyncContext, true);
// 維護(hù) serviceName 和異步請求上下文的關(guān)聯(lián)
dataIdContext.put(serviceName, asyncTask);
// 啟動定時器,30s 后寫入 304 響應(yīng)
timeoutChecker.schedule(() -> {
//觸發(fā)定時后,判斷任務(wù)是否被執(zhí)行,即isTimeout為true(沒有被執(zhí)行)
//則返回客戶端304的狀態(tài)碼-即無修改。
if (asyncTask.isTimeout()) {
//清除緩存中的任務(wù)
if (dataIdContext.remove(serviceName, asyncTask)) {
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
asyncTask.getAsyncContext().complete();
}
}
}, Integer.parseInt(timeOut), TimeUnit.SECONDS);
}
//當(dāng)配置被修改,觸發(fā)事件后,會調(diào)用該方法。
public void publishConfig(String serviceName, List<ThreadPoolProperties> result) {
if (StringUtil.isEmpty(serviceName) && CollectionUtils.isEmpty(result)) {
log.error("publishConfig:serviceName:result all is null");
return;
}
if (StringUtil.isNotEmpty(serviceName) && CollectionUtils.isEmpty(result)) {
log.error("publishConfig result is null");
//去持久化的存儲源(es/mysql...)讀取配置信息
result = threadPoolService.getConfigByServiceName(serviceName);
if (result == null) {
log.error("publishConfig:serviceName:result all is null but not find threadPoolProperties serviceName:{}", serviceName);
return;
}
}
//讀取到的配置信息,json化
String configInfo = JSONArray.toJSONString(result);
//移除AsyncTask的緩存
Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(serviceName);
if (CollectionUtils.isEmpty(asyncTasks)) return;
//為每一個AsyncContext設(shè)置200的狀態(tài)碼以及響應(yīng)數(shù)據(jù)。
for (AsyncTask asyncTask : asyncTasks) {
//表明未超時,已經(jīng)進(jìn)行處理了。
asyncTask.setTimeout(false);
HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse();
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println(configInfo);
asyncTask.getAsyncContext().complete();
}
}
//自定義任務(wù)對象
@Data
private static class AsyncTask {
// 長輪詢請求的上下文,包含請求和響應(yīng)體
private AsyncContext asyncContext;
// 超時標(biāo)記
private boolean timeout;
public AsyncTask(AsyncContext asyncContext, boolean timeout) {
this.asyncContext = asyncContext;
this.timeout = timeout;
}
}
}
上面代碼實現(xiàn)的邏輯:客戶端拉取服務(wù)端的配置,服務(wù)端開啟長輪詢,使用AysncContext進(jìn)行異步阻塞。當(dāng)服務(wù)端有配置被改變時,(例如:由redis的訂閱發(fā)布模式)來進(jìn)行通知,最終執(zhí)行publishConfig方法,將服務(wù)器的配置推送給客戶端,其本質(zhì)實現(xiàn)的是http推拉結(jié)合的配置通知。
- startAsync()會直接利用原有的請求與響應(yīng)對象來創(chuàng)建AsyncContext
- startAsync(ServletRequest request,ServletResponse response)可以傳入自行創(chuàng)建的請求、響應(yīng)封裝對象;
注意事項:
- 可以通過AsyncContext的getRequest()、getResponse()方法取得請求、響應(yīng)對象,此次對客戶端的響應(yīng)將暫緩至調(diào)用AsyncContext的complete()或dispatch()方法為止,前者表示響應(yīng)完成,后者表示將調(diào)派指定的URL進(jìn)行響應(yīng)。
- AysncContext.setTimeout()的超時時間不準(zhǔn),所以需要自己控制。
- 客戶端的http并未斷開連接。
- 第一次客戶端請求,可以將全量的數(shù)據(jù)直接發(fā)生(普通連接),后續(xù)的請求,可以使用AysncContext來實現(xiàn)長輪詢。
3. nacos實現(xiàn)長輪詢
項目源碼:nacos也是借助AysncContext來實現(xiàn)配置的更新。
@Service
public class LongPollingService extends AbstractEventListener {
private static final int FIXED_POLLING_INTERVAL_MS = 10000;
private static final int SAMPLE_PERIOD = 100;
private static final int SAMPLE_TIMES = 3;
private static final String TRUE_STR = "true";
private Map<String, Long> retainIps = new ConcurrentHashMap<String, Long>();
private static boolean isFixedPolling() {
return SwitchService.getSwitchBoolean(SwitchService.FIXED_POLLING, false);
}
//......
static public boolean isSupportLongPolling(HttpServletRequest req) {
return null != req.getHeader(LONG_POLLING_HEADER);
}
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
/**
* 提前500ms返回響應(yīng),為避免客戶端超時 @qiaoyi.dingqy 2013.10.22改動 add delay time for LoadBalance
*/
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// do nothing but set fix polling timeout
} else {
long start = System.currentTimeMillis();
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
clientMd5Map.size(), probeRequestSize, changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
// 一定要由HTTP線程調(diào)用,否則離開后容器會立即發(fā)送響應(yīng)
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout()的超時時間不準(zhǔn),所以只能自己控制
asyncContext.setTimeout(0L);
scheduler.execute(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
//......
}
分析:LongPollingService的isSupportLongPolling是通過判斷request是否有LONG_POLLING_HEADER的header來實現(xiàn)的;addLongPollingClient方法主要是創(chuàng)建ClientLongPolling,然后提交到scheduler定時線程池執(zhí)行。