WebSocket 在開發(fā)中遇到的情況很少,導(dǎo)致在使用的時(shí)候可能遇到很多的問題,比如它的重連機(jī)制、發(fā)送數(shù)據(jù)的統(tǒng)一、結(jié)合Service使用;下面的文章將使用OkHttp中的WebSocket以及Rxjava2結(jié)合Service實(shí)現(xiàn)斷開重連,發(fā)送和接收服務(wù)器的數(shù)據(jù)。
WebSocket
我們使用WebSocket的庫有很多,類似 AndroidAsync,作者對它的介紹是一個(gè)更加底層的異步網(wǎng)絡(luò)庫,使用這個(gè)庫也非常簡單,傳一個(gè)地址和協(xié)議再加一個(gè)回調(diào)接口,就能簡單的使用:
AsyncHttpClient.getDefaultInstance()
.websocket(get, "my-protocol", new WebSocketConnectCallback() {
@Override
public void onCompleted(Exception ex, WebSocket webSocket) {
if (ex != null) {
ex.printStackTrace();
return;
}
webSocket.send("a string");
webSocket.send(new byte[10]);
webSocket.setStringCallback(new StringCallback() {
public void onStringAvailable(String s) {
System.out.println("I got a string: " + s);
}
});
webSocket.setDataCallback(new DataCallback() {
public void onDataAvailable(DataEmitter emitter,
ByteBufferList byteBufferList) {
System.out.println("I got some bytes!");
// note that this data has been read
byteBufferList.recycle();
}
});
}
});
但是,我們看到這個(gè)庫卻是有非常多的類,通常的,我們的網(wǎng)絡(luò)請求庫都只有一個(gè),現(xiàn)在大部分Android開發(fā)都會(huì)使用類似OkHttp或者Volley來請求網(wǎng)絡(luò),如果再加一個(gè)庫的話,無疑增加了應(yīng)用的代碼量,這里我們使用OkHttp中內(nèi)置的WebSocket,基于它的一個(gè)封裝,我們也能寫一個(gè)輕量級的WebSocket應(yīng)用; 而OkHttp中的WebSocket使用也是封裝方便:
// 創(chuàng)建一個(gè)Request
Request request = new Request.Builder()
.url(socketUrl)
.build();
OkHttpClient client = new OkHttpClient();
// 使用OkHttpClient 來創(chuàng)建一個(gè)WebSocket
client.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
super.onClosing(webSocket, code, reason);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
}
@Override
public void onFailure(WebSocket webSocket, Throwable t,
@javax.annotation.Nullable Response response) {
super.onFailure(webSocket, t, response);
}
});
client.dispatcher().executorService().shutdown();
可以看到,先創(chuàng)建一個(gè)Request,我們在調(diào)用接口的時(shí)候,也是創(chuàng)建一個(gè)Request對象,然后使用OkHttpClient來創(chuàng)建一個(gè)WebSocket,回調(diào)到WebSocketListener監(jiān)聽接口之后,再做邏輯業(yè)務(wù)處理。(接口方法意思都很簡單,使用的時(shí)候再說明)
Rxjava2 使用
前面的博客都簡單的介紹了Rxjava2的使用,最近在項(xiàng)目中使用的也比較頻繁,后面再寫一些項(xiàng)目中Rxjava2的實(shí)戰(zhàn):
Rxjava2 學(xué)習(xí)創(chuàng)建型操作符
Rxjava2 學(xué)習(xí)變換操作符
Rxjava2 學(xué)習(xí)過濾操作符
項(xiàng)目開發(fā):
既然是使用到邏輯業(yè)務(wù)上的操作,和界面無關(guān),自然會(huì)想到Service; 將其直接封裝成Service,在使用的時(shí)候直接bindService或者startService會(huì)方便很多,不知不覺中將邏輯業(yè)務(wù)和頁面分隔開; 如果我們直接在頁面中使用也未嘗不可,但是這樣就不便復(fù)用了。
所以開始我們先創(chuàng)建一個(gè)Service:
public WebSocketService extends Service {
public static final String LOG_TAG = "WebSocketTest";
@Nullable
@Override
public IBinder onBind(Intent intent) {
Log.v(LOG_TAG, "----- onBind -----");
return new ServiceBinder();
}
public class ServiceBinder extends Binder {
public WebSocketService getService() {
return WebSocketService.this;
}
}
@Override
public void onCreate() {
super.onCreate();
Log.i(LOG_TAG, "----- onCreate -----");
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
//Use this to force restart service
return START_STICKY;
}
@Override
public void onDestroy() {
super.onDestroy();
Log.i(LOG_TAG, "----- onDestroy -----");
}
}
Service常用的步驟,使用其中的方法,這里我們使用bindService回調(diào)一個(gè)ServiceConnect接口,因?yàn)槲覀冃枰褂玫竭@個(gè)WebSocketService實(shí)例;接下來,我們就需要在onCreate方法中做一些初始化的操作:
/**
* 初始化
*
* @param startReason
* @param isFirstConnect
*/
private void initSocketWrapper(String startReason, boolean isFirstConnect) {
// 拿到Reason,打印log
Observable.just(startReason)
.filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
// 判斷當(dāng)前是否正在連接
if (isAttemptConnecting) {
Log.v(LOG_TAG, startReason + " : Should reconnect but"+
"already in process, skip.");
return Boolean.FALSE;
}
return Boolean.TRUE;
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
if ((mWebSocket == null) // 如果已經(jīng)為空
&& (!isFirstConnect) // 不是第一次連接
&& (!isAttemptConnecting)) { // 當(dāng)前沒有在嘗試連接
showUiWebSocketStatus("與服務(wù)器失去連接?。?!");
}
}
})
.observeOn(Schedulers.io())
.subscribe(s -> initSocket());
}
上面寫清楚了注釋,這個(gè)方法主要是初始化,拿到當(dāng)前的原因和是否是第一次連接,還有一個(gè)全局變量 isAttemptConnecting 來判斷當(dāng)前WebSocket是否在連接中,然后在這之間先判斷 如果websocket為空并且不是第一次連接,而且還沒有嘗試連接,則toast提示用戶斷開連接?。?!
接下來就是使用OkHttp中WebSocket創(chuàng)建連接了:
/**
* 初始化WebSocket
*/
private void initSocket() {
// ... 省略一些狀態(tài)切換代碼
...
// 開始初始化
Observable.create(new ObservableOnSubscribe<WebSocket>() {
@Override
public void subscribe(ObservableEmitter<WebSocket> emitter)
throws Exception {
//TODO 這里可以進(jìn)行登錄業(yè)務(wù)判斷
Request request = new Request.Builder()
.url(socketUrl)
.build();
OkHttpClient client = new OkHttpClient();
client.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
isAttemptConnecting = false;
connectionAttemptCount = 0;
// 連接成功之后
mWebSocket = webSocket;
dispatchStringMessage("連接成功?。。?);
emitter.onNext(mWebSocket);
emitter.onComplete();
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
dispatchStringMessage(text);
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
super.onClosing(webSocket, code, reason);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
Log.i(LOG_TAG, "ClosedCallback: WebSocket closed.");
// 等待自檢重啟,或者自然關(guān)閉
if ((!preparedShutdown) && (shouldAutoReconnect)) {
initSocketWrapper("onClose");
}
}
@Override
public void onFailure(WebSocket webSocket, Throwable t,
@Nullable Response response) {
super.onFailure(webSocket, t, response);
dispatchStringMessage("連接失敗?。?!");
emitter.onError(t != null ? t
: new ConnectException("Cannot connect we service!!!"));
}
});
client.dispatcher().executorService().shutdown();
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<WebSocket>() {
@Override
public void accept(WebSocket webSocket) throws Exception {
if (pongService == null) {
startPongDaemonService();
}
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
throwable.printStackTrace();
// 判斷是否需要執(zhí)行診斷服務(wù)
if (connectionAttemptCount >= ATTEMPT_TOLERANCE) {
// 強(qiáng)制開始診斷服務(wù)
startService(new Intent(WebSocketService.this,
NetworkDiagnosisService.class));
// 重置標(biāo)記
connectionAttemptCount = 0;
}
}
}
使用create操作符創(chuàng)建一個(gè)被觀察者對象發(fā)射器,在其中使用OkHttp的創(chuàng)建WebSocket方式創(chuàng)建WebSocket,然后根據(jù)連接的結(jié)果進(jìn)行emitter發(fā)射 onNext()、onError()、onComplete() 方法,連接成功之后,開始發(fā)送 自檢服務(wù) Pong; 連接失敗之后,開始檢測網(wǎng)絡(luò)是否有連接:NetWorkDiagnosisService 網(wǎng)絡(luò)診斷服務(wù)。
發(fā)送Pong守護(hù)進(jìn)程,先創(chuàng)建一個(gè)單線程線程池,然后發(fā)送消息:
/**
* 給服務(wù)器發(fā)送Pong自檢
*/
private void startPongDaemonService() {
pongService = Executors.newSingleThreadScheduledExecutor();
pongService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
if (mWebSocket != null) {
sendRequest(WsObjectPool.newPongRequest());
}
}
}, 10, 10, TimeUnit.SECONDS);
Log.i(LOG_TAG, "Pong service has been scheduled at " + 10 + " seconds delay.");
}
延遲十秒執(zhí)行,使用一個(gè)WebSocketObjectPool對象池,為了方便取出對象數(shù)據(jù)。
整個(gè)WebSocket初始化就在上面,流程還是比較簡單的,接下來就是從服務(wù)器拿到數(shù)據(jù)之后分發(fā)數(shù)據(jù)了, 因?yàn)槲疫@里測試只用到String數(shù)據(jù),如果需要用到Json的數(shù)據(jù),則分發(fā)json數(shù)據(jù)即可:
/**
* 方法字符串
*
* @param message
*/
private void dispatchStringMessage(String message) {
Observable.just(message)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
WsListener<String> listener =
(WsListener<String>)activeListener.get(
SocketConstants.ResponseType.RESPONSE_STRING_MESSAGE);
Log.d(LOG_TAG, "Msg entity: " + s + ".");
if (listener != null) {
listener.handleData(s);
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
拿到message String數(shù)據(jù)之后,或者到外部監(jiān)聽器,因?yàn)槭鞘褂玫絊tring字符串類型,所以這里直接獲取到一個(gè)監(jiān)聽器,然后分發(fā)給方法。
如果是方法json的話,就去GitHub下載代碼查看詳細(xì)代碼:代碼
自檢服務(wù)
連接成功之后,我們需要自檢,因?yàn)榭赡茉谶B接過程中出現(xiàn)斷連的情況,網(wǎng)絡(luò)不穩(wěn)定情況,所以需要使用循環(huán)的自檢:
/**
* 啟動(dòng)自檢服務(wù),按照周期執(zhí)行
*/
private void startSelfCheckService() {
// 自檢服務(wù)器打開
mSelfCheckDispose = Observable
.interval(10, 10, TimeUnit.SECONDS)
.filter(new Predicate<Long>() {
@Override
public boolean test(Long aLong) throws Exception {
if (!shouldAutoReconnect) {
Log.i(LOG_TAG, "Auto reconnect has been disabled,"
"maybe kicked?");
}
return shouldAutoReconnect;
}
})
.map(new Function<Long, Boolean>() {
@Override
public Boolean apply(Long aLong) throws Exception {
return checkSocketAvailable();
}
})
.subscribeOn(Schedulers.computation())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.i(LOG_TAG, "Self check task has been scheduled per "
+ 10 + " seconds.");
shouldAutoReconnect = true;
Log.i(LOG_TAG, "Auto reconnect feature has been enabled.");
}
})
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean webSocketAlive)
throws Exception {
if (webSocketAlive) {
Log.v(LOG_TAG, "WebSocket self check: is alive.");
return;
}
// 自檢服務(wù)器打開
initSocketWrapper("SelfCheckService");
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(LOG_TAG, "Error while executing self check!"
+ throwable);
}
});
}
這里我沒使用lambda表達(dá)式,為了讓方法清楚,所以代碼比較長,但是結(jié)構(gòu)比較清晰;
首先使用 interval 創(chuàng)建一個(gè)延時(shí)的周期被觀察者,然后根據(jù)當(dāng)前設(shè)置是否需要自動(dòng)連接來過濾是否進(jìn)行下面的操作,接下來判斷當(dāng)前的連接是否存在和連接,接下來判斷是否存活,如果斷連則調(diào)用初始化的方法,在上面解釋了什么情況下會(huì)調(diào)用自檢服務(wù)。
總結(jié)
整篇文章寫了OkHttp的WebSocket使用,斷連重試機(jī)制,Service使用等,主要的一些細(xì)節(jié)在文章中可能沒體現(xiàn)出來,如果有需要?jiǎng)t下載源碼自己修改運(yùn)行。 最后貼上Github地址,喜歡的話給個(gè)start! ??