用過微信網(wǎng)頁版的人應(yīng)該都清楚網(wǎng)頁登陸的流程,大致描述一下這個過程:
- 打開網(wǎng)頁版登陸鏈接
- 頁面會顯示一個二維碼
- 用微信客戶端掃描二維碼,讓用戶確認(rèn)登陸網(wǎng)頁版
- 如果確認(rèn)登陸,網(wǎng)頁版會自動進入聊天界面。
這個過程的交互方式和一般的WEB應(yīng)用不太一樣,步驟4網(wǎng)頁自動跳轉(zhuǎn),明顯是由服務(wù)端主動推送了內(nèi)容給網(wǎng)頁端,網(wǎng)頁端收到跳轉(zhuǎn)確認(rèn)后才觸發(fā)的,這里就引出了今天要討論的問題:服務(wù)端推送技術(shù)。服務(wù)端推送又稱為Comet,服務(wù)端異步處理等。很早以前就出現(xiàn)了,但一直沒有一個統(tǒng)一的標(biāo)準(zhǔn),存在著不少Comet技術(shù)框架,各個Web容器也各自實現(xiàn)了自己的Comet支持。最近公司的產(chǎn)品也出現(xiàn)了和微信網(wǎng)頁版登陸類似的場景,需要用到Comet技術(shù),我簡單的研究了下,寫下來記錄一下。
針對Comet技術(shù)的選擇性蠻多,我匆匆看了一下,就有這么3個方案:
Tomcat 內(nèi)置支持,需要實現(xiàn)CometProcessor接口。但是應(yīng)用就依賴Tomcat容器了。
Servlet3 天然支持,Servlet3提供一套完整的異步處理API,包括AsyncContext,AsyncLiseter,AsyncEvent. 要求Tomcat7.0++。
SpringMVC3.2 在Servlet3的基礎(chǔ)上做了進一步的封裝,編碼更為簡單,提供Callable,WebAsyncTask,DeferredResult三種方式進行異步編程支持,非常方便。
基于Tomcat的CometProcessor依賴性過大,我基本上不予考慮了。因為時間還算充裕,所以我分別針對Servlet3 和SpringMVC3.2 都做了嘗試,其實過程都比較簡單,關(guān)鍵是要理解場景。我來介紹下我們產(chǎn)品的實際場景吧,我們要實現(xiàn)的一個功能是掃描動態(tài)二維碼關(guān)注微信公眾賬號?;玖鞒淌沁@樣的:
- 客戶端調(diào)用服務(wù)端接口獲取動態(tài)二維碼以及二維碼內(nèi)容中內(nèi)置的ID。(這個時候在客戶端能看到一個二維碼了,等待用戶掃描)
- 客戶端馬上調(diào)用服務(wù)端的一個長連接接口,與服務(wù)端建立長連接,等待服務(wù)端通知。(這個過程是在后臺發(fā)生的,用戶無法感知)
- 用戶拿出微信掃描二維碼,就會有一個掃描事件通知到服務(wù)端的掃描接口。(這個時候服務(wù)端接收到掃描動作,完成自己的業(yè)務(wù)操作以后,通知長連接接口,用戶已經(jīng)掃描了,可以返回了)。
這個流程里面有這么幾個地方是需要能解決的:
- 步驟2里面要求客戶端--服務(wù)端建立長連接,不會立即返回,客戶端一直在等待狀態(tài)。(Servlet3 的API可以支持,需要把Timeout時間設(shè)置長一點,一般是60S夠了)
- 步驟3中 掃描接口要通知長連接接口,如何做到? 必須存在一個公共的容器,容器里面存著上下文信息,掃描接口把執(zhí)行完畢的上下文告知長連接接口就可以了。
所以,實現(xiàn)代碼如下:
配置部分
web.xml 啟用Servlet3 的命名空間
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://java.sun.com/xml/ns/javaee"
xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
id="WebApp_ID" version="3.0">
</web-app>
長連接Servlet要開啟異步支持:
@WebServlet(value = "/scan/*",asyncSupported = true)
Tomcat server.xml要開啟NIO模式
<Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol"
connectionTimeout="20000" asyncTimeout="150000" URIEncoding="utf-8" redirectPort="8443" />
長連接Servlet實現(xiàn)
@WebServlet(value = "/scan/*",asyncSupported = true)
public class ScanServlet extends HttpServlet {
// private ScanRetain retain;
private Logger logger = Logger.getLogger(getClass());
@Override
public void init() throws ServletException {
}
@Override
public void destroy() {
ScanRetain.MAP.clear();
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
doPost(req, resp);
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
logger.debug(">>>>>>>>>>>>>>>>>開始訪問長連接Servlet.....");
String pathInfo = req.getPathInfo();
String key = null;
if (pathInfo != null) {
int i = pathInfo.lastIndexOf('/');
if (i >= 0) {
key = pathInfo.substring(i + 1);
}
}
if (key == null) {
PrintWriter writer = resp.getWriter();
writer.write("error:not found scan key");
writer.flush();
return;
}
req.startAsync(req, resp);
if (req.isAsyncStarted()) {
final AsyncContext asyncContext = req.getAsyncContext();
final String theKey = key;
asyncContext.setTimeout(60 * 1000L);
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
ScanRetain.MAP.remove(theKey);
}
@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException {
ScanRetain.MAP.remove(theKey);
}
@Override
public void onError(AsyncEvent asyncEvent) throws IOException {
ScanRetain.MAP.remove(theKey);
}
@Override
public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
}
});
logger.debug(">>>>>>>>>>>>>>>>>將長連接上下文對象加入隊列等待處理.........");
ScanRetain.MAP.put(theKey, asyncContext);
}
}
}
公共Context容器存放類以及提供給掃描后對長連接響應(yīng)處理的邏輯
public class ScanRetain {
// 公共上下文容器
public static final ConcurrentHashMap<String, AsyncContext> MAP = new ConcurrentHashMap<String, AsyncContext>();
private Logger logger = Logger.getLogger(getClass());
public void doReturn(String key){
logger.debug(">>>>>>>>>>>>>>>>>長連接正在響應(yīng).....");
AsyncContext asyncContext = MAP.get(key);
if (asyncContext == null) {
return;
}
HttpServletResponse res = (HttpServletResponse) asyncContext.getResponse();
DBObject data = new BasicDBObject("result",1)
.append("info","ok")
.append("now",System.currentTimeMillis());
String str = JSON.serialize(data);
OutputStream os = null;
try {
os = res.getOutputStream();
os.write(str.getBytes("utf-8"));
logger.debug(">>>>>>>>>>>>>>>>>長連接響應(yīng)完畢.....");
os.flush();
asyncContext.setTimeout(100L);// 一定要加這一句才會及時返回
} catch (IOException e) {
e.printStackTrace();
}
}
}
掃描事件觸發(fā)長連接響應(yīng)的邏輯
Long senceId = 0L;
if (qrSenceId != null) {
senceId = Long.parseLong(qrSenceId);
}
scanRetain.doReturn(senceId + "");
SpringMVC3.2 的實現(xiàn)我也嘗試了一下:
長連接接口:
// 上下文容器
public static final ConcurrentHashMap<String, DeferredResult<String>> MAP = new ConcurrentHashMap<String, DeferredResult<String>>();
@RequestMapping("doScan/{key}")
@ResponseBody
public DeferredResult<String> doScan(@PathVariable("key") String key) {
DeferredResult<String> result = new DeferredResult<String>();
MAP.put(key, result);
return result;
}
通知長連接響應(yīng)客戶端的測試代碼:
@RequestMapping(value="/newScan/{key}",produces = "text/plain;charset=utf-8;")
@ResponseBody
public String newScan(@PathVariable("key") String key,
HttpServletRequest req, HttpServletResponse res) {
DeferredResult<String> data = Scans.MAP.get(key);
if(data!=null){
data.setResult("this is result:"+System.currentTimeMillis());
Scans.MAP.remove(key);
}
return "new scan test finished :"+key+"now is :"+System.currentTimeMillis();
}
Spring的代碼實現(xiàn)簡單很多,但是也不那么直觀,不利于理解。
同時,它還提供另外兩種異步處理的方式,只是不適于這個場景,這里也羅列一下。
Callable:
@ResponseBody
@RequestMapping("call")
public Callable<String> call(HttpServletRequest req, HttpServletResponse res) throws Exception {
return new Callable<String>() {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(5);
return "hello,callable";
}
};
}
WebAsyncTask:
@ResponseBody
@RequestMapping("async")
public WebAsyncTask<String> async(HttpServletRequest req, HttpServletResponse res) throws Exception {
Callable<String> callable = new Callable<String>() {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(5);
return "hello,WebAsyncTask";
}
};
return new WebAsyncTask<String>(1000*60L,callable);
}
上面兩種方式也是用于異步操作的,它們比較適用于一些比較耗時的操作(如大數(shù)據(jù)計算,文件處理),它們的響應(yīng)一般不存在其他的觸發(fā)點,就是取決于Callable內(nèi)部代碼塊的執(zhí)行結(jié)束。
綜上,我們大致可以總結(jié)出異步處理的兩種應(yīng)用場景:
1. 多點操作,單點的響應(yīng)往往依賴于其他點的觸發(fā),最典型的就是微信掃描登錄了。這個基本的編碼思路應(yīng)該是這樣的:
定義一個上下文存儲容器,容器要支持并發(fā),最好選用Concurrent類型。
開發(fā)長連接接口,客戶端請求連接后,將上下文加入存儲容器。
開發(fā)響應(yīng)的觸發(fā)邏輯代碼段。
觸發(fā)業(yè)務(wù)完成以后,調(diào)用響應(yīng)觸發(fā)邏輯。