一. 背景
最近的一個(gè)項(xiàng)目:需要使用 Android App 作為 Socket 的服務(wù)端,并且一個(gè)端口能夠同時(shí)監(jiān)聽(tīng) TCP/Web Socket 協(xié)議。
自然而然,項(xiàng)目決定采用 Netty 框架。Netty 服務(wù)端在收到客戶(hù)端發(fā)來(lái)的消息后,能夠做出相應(yīng)的業(yè)務(wù)處理。在某些場(chǎng)景下,服務(wù)端也需要給客戶(hù)端 App/網(wǎng)頁(yè)發(fā)送消息。
二. Netty 的使用
2.1 Netty 服務(wù)端
首先,定義好 NettyServer,它使用object聲明表示是一個(gè)單例。用于 Netty 服務(wù)端的啟動(dòng)、關(guān)閉以及發(fā)送消息。
object NettyServer {
private val TAG = "NettyServer"
private var channel: Channel?=null
private lateinit var listener: NettyServerListener<String>
private lateinit var bossGroup: EventLoopGroup
private lateinit var workerGroup: EventLoopGroup
var port = 8888
set(value) {
field = value
}
var webSocketPath = "/ws"
set(value) {
field = value
}
var isServerStart: Boolean = false
private set
fun start() {
object : Thread() {
override fun run() {
super.run()
bossGroup = NioEventLoopGroup(1)
workerGroup = NioEventLoopGroup()
try {
val b = ServerBootstrap()
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel::class.java)
.localAddress(InetSocketAddress(port))
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(NettyServerInitializer(listener,webSocketPath))
// Bind and start to accept incoming connections.
val f = b.bind().sync()
Log.i(TAG, NettyServer::class.java.name + " started and listen on " + f.channel().localAddress())
isServerStart = true
listener.onStartServer()
f.channel().closeFuture().sync()
} catch (e: Exception) {
Log.e(TAG, e.localizedMessage)
e.printStackTrace()
} finally {
isServerStart = false
listener.onStopServer()
disconnect()
}
}
}.start()
}
fun disconnect() {
workerGroup.shutdownGracefully()
bossGroup.shutdownGracefully()
}
fun setListener(listener: NettyServerListener<String>) {
this.listener = listener
}
// 異步發(fā)送TCP消息
fun sendMsgToClient(data: String, listener: ChannelFutureListener) = channel?.run {
val flag = this.isActive
if (flag) {
this.writeAndFlush(data + System.getProperty("line.separator")).addListener(listener)
}
flag
} ?: false
// 同步發(fā)送TCP消息
fun sendMsgToClient(data: String) = channel?.run {
if (this.isActive) {
return this.writeAndFlush(data + System.getProperty("line.separator")).awaitUninterruptibly().isSuccess
}
false
} ?: false
// 異步發(fā)送WebSocket消息
fun sendMsgToWS(data: String,listener: ChannelFutureListener) = channel?.run {
val flag = this.isActive
if (flag) {
this.writeAndFlush(TextWebSocketFrame(data)).addListener(listener)
}
flag
} ?: false
// 同步發(fā)送TCP消息
fun sendMsgToWS(data: String) = channel?.run {
if (this.isActive) {
return this.writeAndFlush(TextWebSocketFrame(data)).awaitUninterruptibly().isSuccess
}
false
} ?: false
/**
* 切換通道
* 設(shè)置服務(wù)端,與哪個(gè)客戶(hù)端通信
* @param channel
*/
fun selectorChannel(channel: Channel?) {
this.channel = channel
}
}
NettyServerInitializer 是服務(wù)端跟客戶(hù)端連接之后使用的 childHandler:
class NettyServerInitializer(private val mListener: NettyServerListener<String>,private val webSocketPath:String) : ChannelInitializer<SocketChannel>() {
@Throws(Exception::class)
public override fun initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline()
pipeline.addLast("active",ChannelActiveHandler(mListener))
pipeline.addLast("socketChoose", SocketChooseHandler(webSocketPath))
pipeline.addLast("string_encoder",StringEncoder(CharsetUtil.UTF_8))
pipeline.addLast("linebased",LineBasedFrameDecoder(1024))
pipeline.addLast("string_decoder",StringDecoder(CharsetUtil.UTF_8))
pipeline.addLast("commonhandler", CustomerServerHandler(mListener))
}
}
NettyServerInitializer 包含了多個(gè) Handler:連接使用的ChannelActiveHandler,協(xié)議選擇使用的 SocketChooseHandler,TCP 消息使用的 StringEncoder、LineBasedFrameDecoder、StringDecoder,以及最終處理消息的 CustomerServerHandler。
ChannelActiveHandler:
@ChannelHandler.Sharable
class ChannelActiveHandler(var mListener: NettyServerListener<String>) : ChannelInboundHandlerAdapter() {
@Throws(Exception::class)
override fun channelActive(ctx: ChannelHandlerContext) {
val insocket = ctx.channel().remoteAddress() as InetSocketAddress
val clientIP = insocket.address.hostAddress
val clientPort = insocket.port
Log.i("ChannelActiveHandler","新的連接:$clientIP : $clientPort")
mListener.onChannelConnect(ctx.channel())
}
}
SocketChooseHandler 通過(guò)讀取消息來(lái)區(qū)分是 WebSocket 還是 Socket。如果是 WebSocket 的話,去掉 Socket 使用的相關(guān) Handler。
class SocketChooseHandler(val webSocketPath:String) : ByteToMessageDecoder() {
@Throws(Exception::class)
override fun decode(ctx: ChannelHandlerContext, `in`: ByteBuf, out: List<Any>) {
val protocol = getBufStart(`in`)
if (protocol.startsWith(WEBSOCKET_PREFIX)) {
PipelineAdd.websocketAdd(ctx,webSocketPath)
ctx.pipeline().remove("string_encoder")
ctx.pipeline().remove("linebased")
ctx.pipeline().remove("string_decoder")
}
`in`.resetReaderIndex()
ctx.pipeline().remove(this.javaClass)
}
private fun getBufStart(`in`: ByteBuf): String {
var length = `in`.readableBytes()
if (length > MAX_LENGTH) {
length = MAX_LENGTH
}
// 標(biāo)記讀位置
`in`.markReaderIndex()
val content = ByteArray(length)
`in`.readBytes(content)
return String(content)
}
companion object {
/** 默認(rèn)暗號(hào)長(zhǎng)度為23 */
private val MAX_LENGTH = 23
/** WebSocket握手的協(xié)議前綴 */
private val WEBSOCKET_PREFIX = "GET /"
}
}
StringEncoder、LineBasedFrameDecoder、StringDecoder 都是 Netty 內(nèi)置的編、解碼器。其中,LineBasedFrameDecoder 用于解決 TCP粘包/拆包的問(wèn)題。
CustomerServerHandler:
@ChannelHandler.Sharable
class CustomerServerHandler(private val mListener: NettyServerListener<String>) : SimpleChannelInboundHandler<Any>() {
@Throws(Exception::class)
override fun channelReadComplete(ctx: ChannelHandlerContext) {
}
override fun exceptionCaught(ctx: ChannelHandlerContext,
cause: Throwable) {
cause.printStackTrace()
ctx.close()
}
@Throws(Exception::class)
override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) {
val buff = msg as ByteBuf
val info = buff.toString(CharsetUtil.UTF_8)
Log.d(TAG,"收到消息內(nèi)容:$info")
}
@Throws(Exception::class)
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
if (msg is WebSocketFrame) { // 處理 WebSocket 消息
val webSocketInfo = (msg as TextWebSocketFrame).text().trim { it <= ' ' }
Log.d(TAG, "收到WebSocketSocket消息:$webSocketInfo")
mListener.onMessageResponseServer(webSocketInfo , ctx.channel().id().asShortText())
} else if (msg is String){ // 處理 Socket 消息
Log.d(TAG, "收到socket消息:$msg")
mListener.onMessageResponseServer(msg, ctx.channel().id().asShortText())
}
}
// 斷開(kāi)連接
@Throws(Exception::class)
override fun channelInactive(ctx: ChannelHandlerContext) {
super.channelInactive(ctx)
Log.d(TAG, "channelInactive")
val reAddr = ctx.channel().remoteAddress() as InetSocketAddress
val clientIP = reAddr.address.hostAddress
val clientPort = reAddr.port
Log.d(TAG,"連接斷開(kāi):$clientIP : $clientPort")
mListener.onChannelDisConnect(ctx.channel())
}
companion object {
private val TAG = "CustomerServerHandler"
}
}
2.2 Netty 客戶(hù)端
客戶(hù)端也需要一個(gè)啟動(dòng)、關(guān)閉、發(fā)送消息的 NettyTcpClient,并且 NettyTcpClient 的創(chuàng)建采用 Builder 模式。
class NettyTcpClient private constructor(val host: String, val tcp_port: Int, val index: Int) {
private lateinit var group: EventLoopGroup
private lateinit var listener: NettyClientListener<String>
private var channel: Channel? = null
/**
* 獲取TCP連接狀態(tài)
*
* @return 獲取TCP連接狀態(tài)
*/
var connectStatus = false
/**
* 最大重連次數(shù)
*/
var maxConnectTimes = Integer.MAX_VALUE
private set
private var reconnectNum = maxConnectTimes
private var isNeedReconnect = true
var isConnecting = false
private set
var reconnectIntervalTime: Long = 5000
private set
/**
* 心跳間隔時(shí)間
*/
var heartBeatInterval: Long = 5
private set//單位秒
/**
* 是否發(fā)送心跳
*/
var isSendheartBeat = false
private set
/**
* 心跳數(shù)據(jù),可以是String類(lèi)型,也可以是byte[].
*/
private var heartBeatData: Any? = null
fun connect() {
if (isConnecting) {
return
}
val clientThread = object : Thread("Netty-Client") {
override fun run() {
super.run()
isNeedReconnect = true
reconnectNum = maxConnectTimes
connectServer()
}
}
clientThread.start()
}
private fun connectServer() {
synchronized(this@NettyTcpClient) {
var channelFuture: ChannelFuture?=null
if (!connectStatus) {
isConnecting = true
group = NioEventLoopGroup()
val bootstrap = Bootstrap().group(group)
.option(ChannelOption.TCP_NODELAY, true)//屏蔽Nagle算法試圖
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.channel(NioSocketChannel::class.java as Class<out Channel>?)
.handler(object : ChannelInitializer<SocketChannel>() {
@Throws(Exception::class)
public override fun initChannel(ch: SocketChannel) {
if (isSendheartBeat) {
ch.pipeline().addLast("ping", IdleStateHandler(0, heartBeatInterval, 0, TimeUnit.SECONDS)) //5s未發(fā)送數(shù)據(jù),回調(diào)userEventTriggered
}
ch.pipeline().addLast(StringEncoder(CharsetUtil.UTF_8))
ch.pipeline().addLast(StringDecoder(CharsetUtil.UTF_8))
ch.pipeline().addLast(LineBasedFrameDecoder(1024))//黏包處理,需要客戶(hù)端、服務(wù)端配合
ch.pipeline().addLast(NettyClientHandler(listener, index, isSendheartBeat, heartBeatData))
}
})
try {
channelFuture = bootstrap.connect(host, tcp_port).addListener {
if (it.isSuccess) {
Log.d(TAG, "連接成功")
reconnectNum = maxConnectTimes
connectStatus = true
channel = channelFuture?.channel()
} else {
Log.d(TAG, "連接失敗")
connectStatus = false
}
isConnecting = false
}.sync()
// Wait until the connection is closed.
channelFuture.channel().closeFuture().sync()
Log.d(TAG, " 斷開(kāi)連接")
} catch (e: Exception) {
e.printStackTrace()
} finally {
connectStatus = false
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_CLOSED, index)
if (channelFuture != null) {
if (channelFuture.channel() != null && channelFuture.channel().isOpen) {
channelFuture.channel().close()
}
}
group.shutdownGracefully()
reconnect()
}
}
}
}
fun disconnect() {
Log.d(TAG, "disconnect")
isNeedReconnect = false
group.shutdownGracefully()
}
fun reconnect() {
Log.d(TAG, "reconnect")
if (isNeedReconnect && reconnectNum > 0 && !connectStatus) {
reconnectNum--
SystemClock.sleep(reconnectIntervalTime)
if (isNeedReconnect && reconnectNum > 0 && !connectStatus) {
Log.e(TAG, "重新連接")
connectServer()
}
}
}
/**
* 異步發(fā)送
*
* @param data 要發(fā)送的數(shù)據(jù)
* @param listener 發(fā)送結(jié)果回調(diào)
* @return 方法執(zhí)行結(jié)果
*/
fun sendMsgToServer(data: String, listener: MessageStateListener) = channel?.run {
val flag = this != null && connectStatus
if (flag) {
this.writeAndFlush(data + System.getProperty("line.separator")).addListener { channelFuture -> listener.isSendSuccss(channelFuture.isSuccess) }
}
flag
} ?: false
/**
* 同步發(fā)送
*
* @param data 要發(fā)送的數(shù)據(jù)
* @return 方法執(zhí)行結(jié)果
*/
fun sendMsgToServer(data: String) = channel?.run {
val flag = this != null && connectStatus
if (flag) {
val channelFuture = this.writeAndFlush(data + System.getProperty("line.separator")).awaitUninterruptibly()
return channelFuture.isSuccess
}
false
}?:false
fun setListener(listener: NettyClientListener<String>) {
this.listener = listener
}
/**
* Builder 模式創(chuàng)建NettyTcpClient
*/
class Builder {
/**
* 最大重連次數(shù)
*/
private var MAX_CONNECT_TIMES = Integer.MAX_VALUE
/**
* 重連間隔
*/
private var reconnectIntervalTime: Long = 5000
/**
* 服務(wù)器地址
*/
private var host: String? = null
/**
* 服務(wù)器端口
*/
private var tcp_port: Int = 0
/**
* 客戶(hù)端標(biāo)識(shí),(因?yàn)榭赡艽嬖诙鄠€(gè)連接)
*/
private var mIndex: Int = 0
/**
* 是否發(fā)送心跳
*/
private var isSendheartBeat: Boolean = false
/**
* 心跳時(shí)間間隔
*/
private var heartBeatInterval: Long = 5
/**
* 心跳數(shù)據(jù),可以是String類(lèi)型,也可以是byte[].
*/
private var heartBeatData: Any? = null
fun setMaxReconnectTimes(reConnectTimes: Int): Builder {
this.MAX_CONNECT_TIMES = reConnectTimes
return this
}
fun setReconnectIntervalTime(reconnectIntervalTime: Long): Builder {
this.reconnectIntervalTime = reconnectIntervalTime
return this
}
fun setHost(host: String): Builder {
this.host = host
return this
}
fun setTcpPort(tcp_port: Int): Builder {
this.tcp_port = tcp_port
return this
}
fun setIndex(mIndex: Int): Builder {
this.mIndex = mIndex
return this
}
fun setHeartBeatInterval(intervalTime: Long): Builder {
this.heartBeatInterval = intervalTime
return this
}
fun setSendheartBeat(isSendheartBeat: Boolean): Builder {
this.isSendheartBeat = isSendheartBeat
return this
}
fun setHeartBeatData(heartBeatData: Any): Builder {
this.heartBeatData = heartBeatData
return this
}
fun build(): NettyTcpClient {
val nettyTcpClient = NettyTcpClient(host!!, tcp_port, mIndex)
nettyTcpClient.maxConnectTimes = this.MAX_CONNECT_TIMES
nettyTcpClient.reconnectIntervalTime = this.reconnectIntervalTime
nettyTcpClient.heartBeatInterval = this.heartBeatInterval
nettyTcpClient.isSendheartBeat = this.isSendheartBeat
nettyTcpClient.heartBeatData = this.heartBeatData
return nettyTcpClient
}
}
companion object {
private val TAG = "NettyTcpClient"
private val CONNECT_TIMEOUT_MILLIS = 5000
}
}
Android 的客戶(hù)端相對(duì)而言比較簡(jiǎn)單,需要的 Handler 包括:支持心跳的 IdleStateHandler, TCP 消息需要使用的 Handler (跟服務(wù)端一樣分別是StringEncoder、StringDecoder、LineBasedFrameDecoder),以及對(duì)收到 TCP 消息進(jìn)行處理的 NettyClientHandler。
NettyClientHandler:
class NettyClientHandler(private val listener: NettyClientListener<String>, private val index: Int, private val isSendheartBeat: Boolean, private val heartBeatData: Any?) : SimpleChannelInboundHandler<String>() {
/**
*
* 設(shè)定IdleStateHandler心跳檢測(cè)每x秒進(jìn)行一次讀檢測(cè),
* 如果x秒內(nèi)ChannelRead()方法未被調(diào)用則觸發(fā)一次userEventTrigger()方法
*
* @param ctx ChannelHandlerContext
* @param evt IdleStateEvent
*/
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt is IdleStateEvent) {
if (evt.state() == IdleState.WRITER_IDLE) { //發(fā)送心跳
if (isSendheartBeat) {
if (heartBeatData == null) {
ctx.channel().writeAndFlush("Heartbeat" + System.getProperty("line.separator")!!)
} else {
if (heartBeatData is String) {
Log.d(TAG, "userEventTriggered: String")
ctx.channel().writeAndFlush(heartBeatData + System.getProperty("line.separator")!!)
} else if (heartBeatData is ByteArray) {
Log.d(TAG, "userEventTriggered: byte")
val buf = Unpooled.copiedBuffer((heartBeatData as ByteArray?)!!)
ctx.channel().writeAndFlush(buf)
} else {
Log.d(TAG, "userEventTriggered: heartBeatData type error")
}
}
} else {
Log.d(TAG, "不發(fā)送心跳")
}
}
}
}
/**
*
* 客戶(hù)端上線
*
* @param ctx ChannelHandlerContext
*/
override fun channelActive(ctx: ChannelHandlerContext) {
Log.d(TAG, "channelActive")
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_SUCCESS, index)
}
/**
*
* 客戶(hù)端下線
*
* @param ctx ChannelHandlerContext
*/
override fun channelInactive(ctx: ChannelHandlerContext) {
Log.d(TAG, "channelInactive")
}
/**
* 客戶(hù)端收到消息
*
* @param channelHandlerContext ChannelHandlerContext
* @param msg 消息
*/
override fun channelRead0(channelHandlerContext: ChannelHandlerContext, msg: String) {
Log.d(TAG, "channelRead0:")
listener.onMessageResponseClient(msg, index)
}
/**
* @param ctx ChannelHandlerContext
* @param cause 異常
*/
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
Log.e(TAG, "exceptionCaught")
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_ERROR, index)
cause.printStackTrace()
ctx.close()
}
companion object {
private val TAG = "NettyClientHandler"
}
}
三. Demo 的實(shí)現(xiàn)
3.1 Socket 服務(wù)端
啟動(dòng) NettyServer:
private fun startServer() {
if (!NettyServer.isServerStart) {
NettyServer.setListener(this@MainActivity)
NettyServer.port = port
NettyServer.webSocketPath = webSocketPath
NettyServer.start()
} else {
NettyServer.disconnect()
}
}
NettyServer 異步發(fā)送 TCP 消息:
NettyServer.sendMsgToClient(msg, ChannelFutureListener { channelFuture ->
if (channelFuture.isSuccess) {
msgSend(msg)
}
})
NettyServer 異步發(fā)送 WebSocket 消息:
NettyServer.sendMsgToWS(msg, ChannelFutureListener { channelFuture ->
if (channelFuture.isSuccess) {
msgSend(msg)
}
})
Demo 可以通過(guò) startServer 來(lái)啟動(dòng) Socket 服務(wù)端,也可以在啟動(dòng)之前點(diǎn)擊 configServer 來(lái)修改服務(wù)端的端口以及 WebSocket 的 Endpoint。

3.2 Socket 客戶(hù)端
NettyTcpClient 通過(guò) Builder 模式創(chuàng)建:
mNettyTcpClient = NettyTcpClient.Builder()
.setHost(ip) //設(shè)置服務(wù)端地址
.setTcpPort(port) //設(shè)置服務(wù)端端口號(hào)
.setMaxReconnectTimes(5) //設(shè)置最大重連次數(shù)
.setReconnectIntervalTime(5) //設(shè)置重連間隔時(shí)間。單位:秒
.setSendheartBeat(false) //設(shè)置發(fā)送心跳
.setHeartBeatInterval(5) //設(shè)置心跳間隔時(shí)間。單位:秒
.setHeartBeatData("I'm is HeartBeatData") //設(shè)置心跳數(shù)據(jù),可以是String類(lèi)型,也可以是byte[],以后設(shè)置的為準(zhǔn)
.setIndex(0) //設(shè)置客戶(hù)端標(biāo)識(shí).(因?yàn)榭赡艽嬖诙鄠€(gè)tcp連接)
.build()
mNettyTcpClient.setListener(this@MainActivity) //設(shè)置TCP監(jiān)聽(tīng)
啟動(dòng)、關(guān)閉客戶(hù)端連接:
private fun connect() {
Log.d(TAG, "connect")
if (!mNettyTcpClient.connectStatus) {
mNettyTcpClient.connect()//連接服務(wù)器
} else {
mNettyTcpClient.disconnect()
}
}
NettyTcpClient 異步發(fā)送 TCP 消息到服務(wù)端:
mNettyTcpClient.sendMsgToServer(msg, object : MessageStateListener {
override fun isSendSuccss(isSuccess: Boolean) {
if (isSuccess) {
msgSend(msg)
}
}
})
Demo 的客戶(hù)端 App 也可以在啟動(dòng)之前點(diǎn)擊 configClient 來(lái)修改要連接的服務(wù)端 IP 、端口。


WebSocket 的測(cè)試可以通過(guò):http://www.websocket-test.com/
Netty Server 端跟網(wǎng)頁(yè)通信:

WebSocket在線測(cè)試:

四. 總結(jié)
借助 Kotlin 的特性以及 Netty 框架,我們?cè)?Android 上也實(shí)現(xiàn)了一個(gè) Socket 服務(wù)端。
本文 demo github 地址:https://github.com/fengzhizi715/Netty4Android
本文的例子很簡(jiǎn)單,只是發(fā)送簡(jiǎn)單的消息。在實(shí)際生產(chǎn)環(huán)境中,我們采用的消息格式可能是 json ,因?yàn)?json 更加靈活,通過(guò)解析 json 獲取消息的內(nèi)容。
參考資料: