由于公司最近做即時通訊,該文章是基于netty-all-4.1.25.Final.jar。版本5在android中有問題,運行時候加載不到相關類,很是納悶。哪位同學有解決方案,可以交流下。minaDemo可看另一篇文章。
代碼地址:https://github.com/mygzk/NettyAndroidDemo.git
先看效果圖:

device-2018-06-22-170316.png
客戶端 基于android移動的:
/**
 * Netty TCP client exposed as a single shared instance (see {@link #getInstance()}).
 *
 * <p>Messages are exchanged as line-delimited strings: the pipeline installs a
 * {@code LineBasedFrameDecoder} (max frame length 1024), a {@code StringDecoder} and a
 * {@code StringEncoder} in front of the project's {@code NettyClientHandler}.
 *
 * <p>Connection and receive events are not delivered directly; they are wrapped in a
 * {@code ReplyMessage} and handed to {@code DispterMessage.handMsg}.
 *
 * <p>Threading: {@link #connect(NettyConnectListener)} runs the blocking connect loop on its own
 * thread, while {@link #reconnect()} blocks the calling thread (it sleeps and then runs the
 * connect loop inline), so it must not be called from a thread that may not block.
 */
public class NettyClient {
    private String TAG = NettyClient.class.getSimpleName();

    /** Pause, in milliseconds, applied before and during a reconnect attempt. */
    private long reconnectIntervalTime = 5000;

    /** Connection state; written from the connect thread and Netty callbacks. */
    private volatile boolean isConnect = false;

    /** Whether {@link #reconnect()} is allowed; cleared by {@link #disconnect()}. */
    private volatile boolean isNeedReconnect = true;

    /** Remaining reconnect attempts. */
    private static int reconnectNum = Integer.MAX_VALUE;

    private volatile EventLoopGroup mEventLoopGroup;
    private volatile Channel mChannel;
    // NOTE(review): this single handler instance is added to every new pipeline. Netty only
    // allows that for handlers annotated @ChannelHandler.Sharable - confirm NettyClientHandler is.
    private NettyClientHandler mNettyClientHandler;
    private DispterMessage mDispterMessage;
    private Thread mClientThread;
    private NettyConnectListener mNettyConnectListener;

    /**
     * Receive listeners. A copy-on-write list is used because the list is iterated in
     * {@link #handMsg(String)} / {@link #handErrorMsg(String)} while other methods add to or
     * clear it, and those calls are not all made under the same lock.
     */
    private final java.util.concurrent.CopyOnWriteArrayList<NettyReceiveListener> mNettyReceiveListeners =
            new java.util.concurrent.CopyOnWriteArrayList<>();

    /** Listener passed to the most recent {@link #send(String, NettyReceiveListener)} call. */
    private NettyReceiveListener mNettyReceiveListener;

    /** Holder for the shared instance. */
    private static class NettyClientHint {
        private static final NettyClient INSTANCE = new NettyClient();
    }

    private NettyClient() {
        mNettyClientHandler = new NettyClientHandler();
        mDispterMessage = new DispterMessage();
    }

    /**
     * @return the shared client instance
     */
    public static NettyClient getInstance() {
        return NettyClientHint.INSTANCE;
    }

    /**
     * Starts a new connection on a background thread. Does nothing when already connected.
     *
     * <p>Previously registered receive listeners are dropped, and reconnecting is allowed again
     * even if an earlier {@link #disconnect()} had turned it off.
     *
     * @param listener receiver of connect / fail / disconnect events; may be {@code null}
     */
    public void connect(NettyConnectListener listener) {
        if (isConnect) {
            return;
        }
        mNettyReceiveListeners.clear();
        mNettyConnectListener = listener;
        // An explicit connect request re-arms reconnecting after an earlier disconnect().
        isNeedReconnect = true;
        mClientThread = new Thread(new Runnable() {
            @Override
            public void run() {
                connectServer();
            }
        });
        mClientThread.start();
    }

    /**
     * Connects to {@code NettyConstant.HOST:PORT} and blocks until the channel is closed.
     *
     * <p>On exit (normal close or failure) a {@code MSG_CONN_DIS} event is posted once and the
     * event loop group created by this call is shut down. The reconnect flag and the registered
     * receive listeners are left untouched so that {@link #reconnect()} can still run.
     */
    private void connectServer() {
        mChannel = null;
        // Keep a local reference: a concurrent reconnect may replace the field, and the finally
        // block must shut down the group created by *this* call only.
        EventLoopGroup group = null;
        try {
            group = new NioEventLoopGroup();
            mEventLoopGroup = group;
            Bootstrap mBootstrap = new Bootstrap();
            mBootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            // Split the byte stream into lines to deal with sticky/partial packets.
                            pipeline.addLast("line", new LineBasedFrameDecoder(1024));
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast("handler", mNettyClientHandler);
                        }
                    });
            ChannelFuture mChannelFuture = mBootstrap
                    .connect(new InetSocketAddress(NettyConstant.HOST, NettyConstant.PORT)).sync();
            mChannel = mChannelFuture.channel();
            mChannelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        isConnect = true;
                        mChannel = channelFuture.channel();
                        postConnectEvent(null, DispterMessage.MSG_CONN_SUCC);
                    } else {
                        // Defensive: sync() above already rethrows a failed connect.
                        postConnectEvent("連接失敗,channelFuture is not success", DispterMessage.MSG_CONN_FAIL);
                        isConnect = false;
                    }
                }
            });
            // Block this thread for the lifetime of the connection.
            mChannel.closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            postConnectEvent(e.getMessage(), DispterMessage.MSG_CONN_FAIL);
            isConnect = false;
            e.printStackTrace();
        } catch (Exception e) {
            postConnectEvent(e.getMessage(), DispterMessage.MSG_CONN_FAIL);
            e.printStackTrace();
        } finally {
            isConnect = false;
            postConnectEvent(null, DispterMessage.MSG_CONN_DIS);
            // Release resources only. Calling the public disconnect() here would switch off
            // reconnecting and post the disconnect event a second time.
            if (group != null) {
                group.shutdownGracefully();
            }
        }
    }

    /**
     * Caller-requested disconnect: posts {@code MSG_CONN_DIS}, removes all receive listeners,
     * disables reconnecting and shuts down the current event loop group, if any.
     * Safe to call before {@link #connect(NettyConnectListener)}.
     */
    public void disconnect() {
        postConnectEvent(null, DispterMessage.MSG_CONN_DIS);
        clearReceiveLisenter();
        isConnect = false;
        isNeedReconnect = false;
        shutdownEventLoopGroup();
    }

    /**
     * Attempts one reconnect if reconnecting is enabled, attempts remain and the client is not
     * connected. Blocks the caller: it sleeps for the reconnect interval twice and then runs the
     * connect loop inline until that connection ends.
     */
    public void reconnect() {
        Log.e(TAG, "reconnect");
        if (!isNeedReconnect || reconnectNum <= 0 || isConnect) {
            return;
        }
        reconnectNum--;
        SystemClock.sleep(reconnectIntervalTime);
        // Re-check state after the pause. The attempt counter is intentionally not re-checked:
        // it was consumed above, and testing it again would skip the last permitted attempt.
        if (!isNeedReconnect || isConnect) {
            return;
        }
        shutdownEventLoopGroup();
        SystemClock.sleep(reconnectIntervalTime);
        connectServer();
    }

    /**
     * Sends {@code msg} followed by the system property named by
     * {@code NettyConstant.MAG_SEPARATOR_1}, and registers {@code listener} for replies.
     *
     * <p>If there is no channel, or it is not writable or not active, a
     * {@code MSG_RECEIVE_FAIL} event is posted for {@code listener} and nothing is written.
     *
     * @param msg      payload without trailing separator
     * @param listener receiver of the reply / failure; may be {@code null}
     */
    public synchronized void send(String msg, NettyReceiveListener listener) {
        mNettyReceiveListener = listener;
        // Read the field once: the connect thread may null it while we are checking.
        final Channel channel = mChannel;
        if (channel == null) {
            postMsg("channel is null", null, mNettyReceiveListener, DispterMessage.MSG_RECEIVE_FAIL);
            return;
        }
        if (!channel.isWritable()) {
            postMsg("channel is not Writable", null, mNettyReceiveListener, DispterMessage.MSG_RECEIVE_FAIL);
            return;
        }
        if (!channel.isActive()) {
            postMsg("channel is not active!", null, mNettyReceiveListener, DispterMessage.MSG_RECEIVE_FAIL);
            return;
        }
        addReceiveLisenter(mNettyReceiveListener);
        channel.writeAndFlush(msg + System.getProperty(NettyConstant.MAG_SEPARATOR_1));
    }

    /**
     * Registers a receive listener; {@code null} and already-registered listeners are ignored.
     *
     * @param listener listener to add
     */
    public void addReceiveLisenter(NettyReceiveListener listener) {
        if (listener != null) {
            mNettyReceiveListeners.addIfAbsent(listener);
        }
    }

    /** Unregisters the listener passed to the most recent {@code send} call, if any. */
    public void removeCurrentReceiveLisenter() {
        if (mNettyReceiveListener != null) {
            mNettyReceiveListeners.remove(mNettyReceiveListener);
        }
    }

    /**
     * Unregisters a receive listener; {@code null} and unknown listeners are ignored.
     *
     * @param listener listener to remove
     */
    public void removeReceiveLisenter(NettyReceiveListener listener) {
        if (listener != null) {
            mNettyReceiveListeners.remove(listener);
        }
    }

    /** Unregisters every receive listener. */
    public void clearReceiveLisenter() {
        mNettyReceiveListeners.clear();
    }

    /**
     * Posts {@code msg} as {@code MSG_RECEIVE_SUCC} to every registered receive listener.
     *
     * @param msg received message
     */
    public void handMsg(String msg) {
        for (NettyReceiveListener listener : mNettyReceiveListeners) {
            if (listener != null) {
                postMsg(msg, null, listener, DispterMessage.MSG_RECEIVE_SUCC);
            }
        }
    }

    /**
     * Posts {@code msg} as {@code MSG_RECEIVE_FAIL} to every registered receive listener.
     *
     * @param msg error description
     */
    public void handErrorMsg(String msg) {
        for (NettyReceiveListener listener : mNettyReceiveListeners) {
            if (listener != null) {
                postMsg(msg, null, listener, DispterMessage.MSG_RECEIVE_FAIL);
            }
        }
    }

    /** Posts a connection event to the connect listener, if one is registered. */
    private void postConnectEvent(String msg, int type) {
        NettyConnectListener listener = mNettyConnectListener;
        if (listener != null) {
            postMsg(msg, listener, null, type);
        }
    }

    /** Shuts down the current event loop group; a no-op when none has been created yet. */
    private void shutdownEventLoopGroup() {
        EventLoopGroup group = mEventLoopGroup;
        if (group != null) {
            group.shutdownGracefully();
        }
    }

    /** Wraps the arguments in a {@code ReplyMessage} and hands it to the dispatcher. */
    private void postMsg(String msg, NettyConnectListener connectListener, NettyReceiveListener receiveListener, int type) {
        ReplyMessage message = new ReplyMessage();
        message.setConnectListener(connectListener);
        message.setMsg(msg);
        message.setReceiveListener(receiveListener);
        message.setType(type);
        mDispterMessage.handMsg(message);
    }
}
設計的主要類:
1.EventLoopGroup 可以理解為將多個EventLoop進行分組管理的一個類,是EventLoop的一個組。
2.Bootstrap 啟動幫助類 參數主要這里配置的
channel()
handler() 主要用來接收處理消息 本demo里面用到的是netty自帶的一種解碼器LineBasedFrameDecoder
option 設置socket相關參數
測試服務端代碼:
/**
 * Stand-alone test server: accepts TCP connections and passes each decoded line to a
 * {@code TestServerHandler}.
 */
public class TestServer {
    /**
     * Starts the server on port 8080 and blocks until its channel is closed.
     *
     * @param agrs command-line arguments (unused)
     */
    public static void main(String[] agrs) {
        new TestServer().bind(8080);
    }

    /**
     * Binds to {@code port} and blocks until the server channel closes; both event loop groups
     * are shut down on exit.
     *
     * @param port TCP port to listen on
     */
    private void bind(int port) {
        // One group accepts incoming connections, the other serves the accepted channels.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // Same line-based string pipeline as the client.
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast("line", new LineBasedFrameDecoder(1024));
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast("handler", new TestServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 1024);
            System.out.println("SimpleChatServer 啟動");
            // Bind the port and start accepting incoming connections.
            ChannelFuture f = b.bind(port).sync();
            // Block until the server channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently consuming it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            System.out.println("SimpleChatServer 關閉了");
        }
    }
}
TestServerHandler 消息處理器
/**
 * Inbound handler of the test server: prints every decoded line and writes back the same text
 * prefixed with {@code [reply]:}.
 */
public class TestServerHandler extends SimpleChannelInboundHandler<String> {
    /** Name of the system property whose value is appended to every reply. */
    String TAG_LINE = "line.separator";

    /** Adds nothing on activation; simply defers to the superclass. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /**
     * Logs the received line to standard output and sends the reply.
     *
     * @param channelHandlerContext context used to write the reply
     * @param s                     decoded line received from the client
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println("server receive msg:" + s);
        final String terminator = System.getProperty(TAG_LINE);
        final String reply = "[reply]:" + s + terminator;
        channelHandlerContext.writeAndFlush(reply);
    }

    /** Prints the stack trace of the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}