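MQTT source code analysis
Why analyze the source code? First, so that we can extend it to meet our own requirements; second, to learn from well-written source code.
Connecting
Following the code layer by layer leads to the ClientComms class, which handles communication with the server by sending and receiving MQTT protocol messages.
First, look at the connect code: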
public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
    final String methodName = "connect";
    synchronized (conLock) {
        if (isDisconnected() && !closePending) {
            //@TRACE 214=state=CONNECTING
            conState = CONNECTING;
            conOptions = options;
            MqttConnect connect = new MqttConnect(client.getClientId(),
                    conOptions.getMqttVersion(),
                    conOptions.isCleanSession(),
                    conOptions.getKeepAliveInterval(),
                    conOptions.getUserName(),
                    conOptions.getPassword(),
                    conOptions.getWillMessage(),
                    conOptions.getWillDestination());
            this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval());
            this.clientState.setCleanSession(conOptions.isCleanSession());
            this.clientState.setMaxInflight(conOptions.getMaxInflight());
            tokenStore.open();
            ConnectBG conbg = new ConnectBG(this, token, connect, executorService);
            conbg.start();
        } else {
            // @TRACE 207=connect failed: not disconnected {0}
            if (isClosed() || closePending) {
                throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
            } else if (isConnecting()) {
                throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
            } else if (isDisconnecting()) {
                throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
            } else {
                throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
            }
        }
    }
}
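As you can see, it comes down to two steps: read the options we set earlier --> start the ConnectBG connection thread. If the client is not in the disconnected state, an exception matching the current state is thrown instead.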
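The state check in connect can be reduced to a small guarded state machine. The sketch below is a simplified illustration, not Paho's code: the class ConnectGuard, its State enum, and the use of IllegalStateException in place of MqttException are all assumptions made for the example.

```java
import java.util.concurrent.Executor;

// Hypothetical, simplified stand-in for the state check in ClientComms.connect.
class ConnectGuard {
    enum State { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING, CLOSED }

    private final Object conLock = new Object();
    private State state = State.DISCONNECTED;

    // Moves DISCONNECTED -> CONNECTING under the lock and hands the slow
    // network work to the executor; any other starting state is rejected.
    void connect(Executor executor, Runnable networkConnect) {
        synchronized (conLock) {
            if (state != State.DISCONNECTED) {
                throw new IllegalStateException("cannot connect while " + state);
            }
            state = State.CONNECTING;
            executor.execute(() -> {
                networkConnect.run();
                synchronized (conLock) {
                    state = State.CONNECTED;
                }
            });
        }
    }

    State state() {
        synchronized (conLock) {
            return state;
        }
    }
}
```

The point of the lock is that two callers can never both see DISCONNECTED and both start a connection attempt.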
Now look at the run method of ConnectBG:
NetworkModule networkModule = networkModules[networkModuleIndex];
networkModule.start();
receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
receiver.start("MQTT Rec: "+getClient().getClientId(), executorService);
sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
sender.start("MQTT Snd: "+getClient().getClientId(), executorService);
callback.start("MQTT Call: "+getClient().getClientId(), executorService);
internalSend(conPacket, conToken);
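The logic of this code is: establish the connection (TCP, SSL, WebSocket, or secure WebSocket) --> start the receiver thread --> start the sender thread --> start the callback thread --> send the connect packet (conPacket).
Which of these connection types is used is decided from the serverURI; see the createNetworkModules method of MqttAsyncClient.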
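To make the idea of choosing a transport from the serverURI concrete, here is a minimal sketch. It is not the body of createNetworkModules: the class TransportChooser and its enum are hypothetical, and the scheme names tcp, ssl, ws, and wss are the URI prefixes assumed here.

```java
import java.net.URI;
import java.util.Locale;

// Hypothetical helper that mirrors the idea of picking a transport from serverURI.
class TransportChooser {
    enum Transport { TCP, SSL, WEBSOCKET, WEBSOCKET_SECURE }

    static Transport fromServerUri(String serverURI) {
        String scheme = URI.create(serverURI).getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("missing scheme: " + serverURI);
        }
        switch (scheme.toLowerCase(Locale.ROOT)) {
            case "tcp": return Transport.TCP;
            case "ssl": return Transport.SSL;
            case "ws":  return Transport.WEBSOCKET;
            case "wss": return Transport.WEBSOCKET_SECURE;
            default:
                throw new IllegalArgumentException("unsupported scheme: " + scheme);
        }
    }
}
```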
Let's analyze the most common case, the TCP connection (the start method of the TCPNetworkModule class):
SocketAddress sockaddr = new InetSocketAddress(host, port);
if (factory instanceof SSLSocketFactory) {
    // SNI support
    Socket tempsocket = new Socket();
    tempsocket.connect(sockaddr, conTimeout*1000);
    socket = ((SSLSocketFactory)factory).createSocket(tempsocket, host, port, true);
} else {
    socket = factory.createSocket();
    socket.connect(sockaddr, conTimeout*1000);
}
So this is where the socket connection is finally established.
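The non-SSL branch can be tried in isolation with plain JDK classes. This is a minimal sketch under assumptions: TcpConnector is a hypothetical helper, the default SocketFactory stands in for the factory field, and conTimeoutSeconds plays the role of conTimeout.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import javax.net.SocketFactory;

// Hypothetical helper; conTimeoutSeconds plays the role of conTimeout above.
class TcpConnector {
    static Socket open(String host, int port, int conTimeoutSeconds) throws IOException {
        SocketAddress sockaddr = new InetSocketAddress(host, port);
        // createSocket() with no arguments returns an unconnected socket,
        // so the timeout can be applied on the explicit connect call.
        Socket socket = SocketFactory.getDefault().createSocket();
        try {
            socket.connect(sockaddr, conTimeoutSeconds * 1000); // connect() takes milliseconds
        } catch (IOException e) {
            socket.close(); // do not leak the socket when the connect fails
            throw e;
        }
        return socket;
    }
}
```

Creating the socket unconnected and then calling connect with a timeout is what lets the caller bound how long a connection attempt may block.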
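How messages are sent
A brief description first:
Wrap the message, topic, and other fields in an IMqttToken --> the sender thread loops, fetching the next message --> write it to the output stream
Let's look at the actual code.
Fetching the message: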
message = clientState.get();
if (message != null) {
    //@TRACE 802=network send key={0} msg={1}
    if (message instanceof MqttAck) {
        out.write(message);
        out.flush();
    } else {
        MqttToken token = tokenStore.getToken(message);
        // While quiescing the tokenstore can be cleared so need
        // to check for null for the case where clear occurs
        // while trying to send a message.
        if (token != null) {
            synchronized (token) {
                out.write(message);
                try {
                    out.flush();
                } catch (IOException ex) {
                    ....
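The shape of this loop (block until a message is available, write it, flush) can be sketched with a blocking queue. This is an illustration only: SenderLoop, its STOP sentinel, and the use of raw byte arrays instead of MqttWireMessage are assumptions, and the per-token locking shown above is left out.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sender: drains a queue of encoded messages onto an output stream.
class SenderLoop implements Runnable {
    static final byte[] STOP = new byte[0]; // sentinel that ends the loop

    private final BlockingQueue<byte[]> pending = new LinkedBlockingQueue<>();
    private final OutputStream out;

    SenderLoop(OutputStream out) {
        this.out = out;
    }

    void enqueue(byte[] encodedMessage) {
        pending.add(encodedMessage);
    }

    @Override
    public void run() {
        try {
            while (true) {
                byte[] message = pending.take(); // blocks until work is available
                if (message == STOP) {
                    return;
                }
                out.write(message);
                out.flush();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and exit
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```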
How messages are received
Read from the input stream, then dispatch notifications.
Here is the receiver thread's code:
while (running && (in != null)) {
    try {
        //@TRACE 852=network read message
        receiving = in.available() > 0;
        MqttWireMessage message = in.readMqttWireMessage();
        receiving = false;
        // instanceof checks if message is null
        if (message instanceof MqttAck) {
            token = tokenStore.getToken(message);
            if (token!=null) {
                synchronized (token) {
                    // Ensure the notify processing is done under a lock on the token
                    // This ensures that the send processing can complete before the
                    // receive processing starts! ( request and ack and ack processing
                    // can occur before request processing is complete if not!
                    clientState.notifyReceivedAck((MqttAck)message);
                }
            } else if
            ...
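The receive path pairs each incoming ack with the token that was stored when the request was sent. A minimal sketch of that pairing follows; PendingTokens is hypothetical, it keys tokens by an integer message id, uses CompletableFuture as a stand-in for MqttToken, and omits the per-token lock described in the comment above.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical token store keyed by message id.
class PendingTokens {
    private final Map<Integer, CompletableFuture<Void>> tokens = new ConcurrentHashMap<>();

    // Called on the sending side before the message is written.
    CompletableFuture<Void> register(int messageId) {
        CompletableFuture<Void> token = new CompletableFuture<>();
        tokens.put(messageId, token);
        return token;
    }

    // Called on the receiving side when an ack with this id is read.
    // Returns false when no token is waiting, e.g. after the store was cleared.
    boolean onAck(int messageId) {
        CompletableFuture<Void> token = tokens.remove(messageId);
        if (token == null) {
            return false;
        }
        token.complete(null); // wakes anyone waiting on the token
        return true;
    }
}
```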
How is the long-lived connection kept alive?
Through a scheduled alarm task (AlarmPingSender):
pendingIntent = PendingIntent.getBroadcast(service, 0, new Intent(action), PendingIntent.FLAG_UPDATE_CURRENT);
schedule(comms.getKeepAlive());
Starting from the onReceive method of the AlarmReceiver class, we arrive at the checkForActivity method of ClientState, which contains this code:
token = new MqttToken(clientComms.getClient().getClientId());
if(pingCallback != null){
    token.setActionCallback(pingCallback);
}
tokenStore.saveToken(token, pingCommand);
pendingFlows.insertElementAt(pingCommand, 0);
nextPingTime = getKeepAlive();
//Wake sender thread since it may be in wait state (in ClientState.get())
notifyQueueLock();
So the ping command is inserted at the head of the pendingFlows queue, and the sender thread, once woken, picks it up and writes it to the stream.
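The effect of inserting at index 0 is that the ping jumps ahead of anything already queued. Here is a minimal sketch of that behavior; KeepAliveQueue is hypothetical, flows are plain strings, and times are passed in as arguments so the logic is easy to follow without a real clock.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical keep-alive check built around a queue of pending flows.
class KeepAliveQueue {
    static final String PING = "PINGREQ";

    private final Deque<String> pendingFlows = new ArrayDeque<>();
    private final long keepAliveMillis;
    private long lastOutboundActivity;

    KeepAliveQueue(long keepAliveMillis, long nowMillis) {
        this.keepAliveMillis = keepAliveMillis;
        this.lastOutboundActivity = nowMillis;
    }

    synchronized void enqueue(String flow) {
        pendingFlows.addLast(flow);
    }

    // Queues a ping ahead of everything else when nothing has been sent
    // for at least one keep-alive interval. Returns true if a ping was queued.
    synchronized boolean checkForActivity(long nowMillis) {
        boolean idle = nowMillis - lastOutboundActivity >= keepAliveMillis;
        if (!idle || PING.equals(pendingFlows.peekFirst())) {
            return false;
        }
        pendingFlows.addFirst(PING);
        return true;
    }

    // What a sender would call: takes the next flow and records the send time.
    synchronized String poll(long nowMillis) {
        String flow = pendingFlows.pollFirst();
        if (flow != null) {
            lastOutboundActivity = nowMillis;
        }
        return flow;
    }
}
```

With a 60-second keep-alive and one queued flow, calling checkForActivity at the 60-second mark puts the ping in front, so the sender writes the ping before the older flow.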
Summary
The source code is clearly structured, feature-complete, and well written, so it is worth reading.
That structure also makes it easy to add your own extensions.