前一陣在工作中用到了RabbitMQ,因此對幾種常見的消息隊列產(chǎn)生了興趣。首先從GitHub上下載了RocketMQ的源碼打算一探究竟。在閱讀remoting這個模塊時遇到了很大的障礙。RocketMQ的網(wǎng)絡(luò)編程完全基于Netty,而本人對Netty的理解還只停留在了知道這是一款封裝了NIO的優(yōu)秀框架上。于是正好就借此機會先揭開Netty的面紗。
閱讀完《Netty IN ACTION》后有些手癢,就用Netty實現(xiàn)了一個簡易的應(yīng)用層協(xié)議以及一個同步調(diào)用的方法。
github:https://github.com/ztglcy/netty-protocol
整體結(jié)構(gòu)

圖里demo中是client和server的簡易demo;handler中則是自定義協(xié)議的編碼器和解碼器;message中是與傳輸?shù)南⑾嚓P(guān)的類;processor是服務(wù)端的業(yè)務(wù)處理類;service中則是client和server的啟動類。
傳輸消息格式及編解碼

length是一個表示消息大小的int型數(shù)字,自定義長度解碼器解決TCP黏包問題。headerLength則是表示消息頭大小的int型數(shù)字,用以將傳輸?shù)南㈩^與消息體分開進行序列化。header和content分別存儲消息的消息頭以及消息體。
public class MessageHeader{
private int messageId;
private int clientId;
private int serverId;
private int code;
private MessageHeader(){}
public MessageHeader(int code) {
this.code = code;
}
public int getCode() {
return code;
}
public int getMessageId() {
return messageId;
}
public void setMessageId(int messageId) {
this.messageId = messageId;
}
public int getClientId() {
return clientId;
}
public void setClientId(int clientId) {
this.clientId = clientId;
}
public int getServerId() {
return serverId;
}
public void setServerId(int serverId) {
this.serverId = serverId;
}
public void setCode(int code) {
this.code = code;
}
}
消息頭包含messageId,clientId,serverId,code四個參數(shù),分別用以表征Message的ID,客戶端ID,服務(wù)端ID,以及消息體的格式code(維護在一個常量中)。公有的構(gòu)造方法中必傳消息體格式code,私有的構(gòu)造方法用于fastjson的反序列化。
public class Message {
private MessageHeader messageHeader;
private byte[] content;
private Message(){
}
public static Message createMessage(MessageHeader messageHeader){
Message message = new Message();
message.messageHeader = messageHeader;
return message;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
public MessageHeader getMessageHeader() {
return messageHeader;
}
//還有編碼與解碼的方法
......
}
Message的結(jié)構(gòu)比較直白,包含消息頭和消息體以及編碼與解碼的方法。解碼與編碼的方法在解碼器與編碼器中進行調(diào)用。先來介紹一下編碼的過程。
public class ProtocolEncoder extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
ByteBuffer byteBuffer = message.encode();
byteBuf.writeBytes(byteBuffer);
}
}
編碼器繼承了MessageToByteEncoder并實現(xiàn)了其encode()方法進行編碼。編碼器直接調(diào)用傳進來的Message自己的編碼方法,將編碼后的ByteBuffer寫入ByteBuf中。再來看一下Message怎么實現(xiàn)這個方法的。
public ByteBuffer encode(){
int length = 4;
byte[] bytes = SerializableHelper.encode(messageHeader);
if(bytes != null){
length += bytes.length;
}
if(content!=null){
length += content.length;
}
ByteBuffer byteBuffer = ByteBuffer.allocate(length + 4);
byteBuffer.putInt(length);
if(bytes != null){
byteBuffer.putInt(bytes.length);
byteBuffer.put(bytes);
}else{
byteBuffer.putInt(0);
}
if(content!=null){
byteBuffer.put(content);
}
byteBuffer.flip();
return byteBuffer;
}
length用以表示整個消息大小,計算方式為表示消息頭大小的int+序列化后的消息頭大小+消息體大小。計算完成后申請一塊length+4大小的ByteBuffer(因為length本身存儲也要4個字節(jié))。將消息內(nèi)容按照上面給出的格式依次寫入Buffer中。
public class ProtocolDecoder extends LengthFieldBasedFrameDecoder {
public ProtocolDecoder() {
super(16777216, 0, 4,0,4);
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf byteBuf = (ByteBuf) super.decode(ctx, in);
if(byteBuf == null){
return null;
}
ByteBuffer byteBuffer = byteBuf.nioBuffer();
return Message.decode(byteBuffer);
}
}
解碼器繼承了LengthFieldBasedFrameDecoder用以解決粘包的問題。構(gòu)造函數(shù)中的參數(shù)分別表示包的最大值、長度字段的偏移量、長度字段占的字節(jié)數(shù)、添加到長度字段的補償值以及從解碼幀中第一次去除的字節(jié)數(shù)。因為消息的頭部存儲了4個字節(jié)的表示消息大小的Int型,所以后4個參數(shù)為0、4、0、4。經(jīng)過處理后的消息已經(jīng)剝離掉了最消息頭部的Int型。再調(diào)用Message自身的decode()方法進行解碼。
public static Message decode(ByteBuffer byteBuffer){
int length = byteBuffer.limit();
int headerLength = byteBuffer.getInt();
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
MessageHeader messageHeader = SerializableHelper.decode(headerData,MessageHeader.class);
byte[] content = new byte[length - headerLength -4];
byteBuffer.get(content);
Message message = Message.createMessage(messageHeader);
message.setContent(content);
return message;
}
解碼時首先將消息頭的長度從ByteBuffer中取出,然后讀取該長度的字節(jié)作為消息頭進行反序列化,其他部分則作為消息體,重新組裝成新的Message。
服務(wù)端與客戶端的引導(dǎo)
public interface ProtocolService {
void start();
void shutdown();
}
服務(wù)端與客戶端都繼承了ProtocolService接口,實現(xiàn)了start()和shutdown()兩個方法。
public class NettyProtocolServer implements ProtocolService {
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup workerGroup = new NioEventLoopGroup();
private Map<Integer,ProtocolProcessor> processorMap = new HashMap<>();
@Override
public void start(){
try{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(8888)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new ProtocolEncoder()
,new ProtocolDecoder()
,new ProtocolServerProcessor()
);
}
});
ChannelFuture cf = bootstrap.bind().sync();
} catch (InterruptedException e) {
}
}
@Override
public void shutdown() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
public void registerProcessor(Integer code,ProtocolProcessor protocolProcessor){
processorMap.put(code,protocolProcessor);
}
public class ProtocolServerProcessor extends SimpleChannelInboundHandler<Message> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {
Integer code = message.getMessageHeader().getCode();
ProtocolProcessor processor = processorMap.get(code);
if(processor != null){
Message response = processor.process(message);
channelHandlerContext.writeAndFlush(response);
}
}
}
}
服務(wù)端的start()方法中完成了服務(wù)端的初始化,很常見的netty寫法,將編碼器、解碼器以及業(yè)務(wù)處理器ProtocolServerProcessor加入了Worker的Pipeline中。這里業(yè)務(wù)處理器也可以放在線程池里執(zhí)行,防止業(yè)務(wù)處理時間太長造成堵塞。shutdown()方法則將兩個EventLoopGroup進行關(guān)閉,防止資源泄露。registerProcessor()方法則是將業(yè)務(wù)處理器以KV的形式注冊到服務(wù)端中,ProtocolServerProcessor會根據(jù)消息頭中的code在map中查找對應(yīng)的業(yè)務(wù)處理器進行業(yè)務(wù)的處理。
public class DemoProcessor implements ProtocolProcessor{
@Override
public Message process(Message message) {
byte[] bodyDate = message.getContent();
DemoMessageBody messageBody = SerializableHelper.decode(bodyDate,DemoMessageBody.class);
System.out.println(messageBody.getDemo());
MessageHeader messageHeader = new MessageHeader(1);
messageHeader.setMessageId(message.getMessageHeader().getMessageId());
DemoMessageBody responseBody = new DemoMessageBody();
responseBody.setDemo("I received!");
Message response = Message.createMessage(messageHeader);
response.setContent(SerializableHelper.encode(responseBody));
return response;
}
}
DemoProcessor是一個示例的業(yè)務(wù)處理器,將傳來的消息體解碼后返回一個I received!的回復(fù),這里注意的是messageId要與請求的消息一致,用以表征這是哪個請求的返回。
public class NettyProtocolClient implements ProtocolService {
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
private Bootstrap bootstrap = new Bootstrap();
private ConcurrentHashMap<Integer,Response> responseMap = new ConcurrentHashMap<>();
@Override
public void start() {
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new ProtocolDecoder(),
new ProtocolEncoder(),
new ProtocolClientProcessor());
}
});
}
@Override
public void shutdown() {
eventLoopGroup.shutdownGracefully();
}
public Message send(String address, Message message){
try {
Response response = new Response();
responseMap.put(message.getMessageHeader().getMessageId(),response);
Channel channel = bootstrap.connect(address,8888).sync().channel();
channel.writeAndFlush(message);
Message responseMessage = response.waitResponse();
responseMap.remove(message.getMessageHeader().getMessageId());
channel.close();
return responseMessage;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
public class ProtocolClientProcessor extends SimpleChannelInboundHandler<Message> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Message message) throws Exception {
Response response = responseMap.get(message.getMessageHeader().getMessageId());
if (response != null){
response.putResponse(message);
}
}
}
}
客戶端與服務(wù)端的start()方法和shutdown()方法類似??蛻舳颂峁┝艘粋€send()方法用以消息的同步調(diào)用,send()方法在發(fā)送信息后以消息的messageId為key生成一個Response的實例緩存在responseMap中,調(diào)用Response中的countDownLatch.await()方法堵塞住等待返回(這里應(yīng)該加一個時間限制以防止線程無限期地堵塞?。rotocolClientProcessor會處理返回的消息,將其存入對應(yīng)的Response中,并調(diào)用countDownLatch.countDown()。這樣客戶端線程就可以收到結(jié)果同步返回。還可以改進的一點在于保持客戶端與服務(wù)端的長連接,將其緩存在客戶端中,每次發(fā)送消息都用已緩存的連接,減少開銷。
DEMO
最后分別編寫一個客戶端與服務(wù)端的demo用以測試我們的協(xié)議。
public class ServerDemo {
public static void main(String[] args) {
NettyProtocolServer nettyProtocolServer = new NettyProtocolServer();
nettyProtocolServer.registerProcessor(1,new DemoProcessor());
nettyProtocolServer.start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
nettyProtocolServer.shutdown();
}
}
服務(wù)端的demo首先構(gòu)建一個NettyProtocolServer的實例,將DemoProcessor注冊到服務(wù)端中之后掛起主線程等待客戶端的消息,最后shutdown掉NettyProtocolServer。
public class ClientDemo {
public static void main(String[] args) {
NettyProtocolClient client = new NettyProtocolClient();
client.start();
Message message = demoMessage();
Message messageResponse = client.send("localhost",message);
System.out.println(SerializableHelper.decode(messageResponse.getContent(),DemoMessageBody.class).getDemo());
client.shutdown();
}
private static Message demoMessage(){
MessageHeader messageHeader = new MessageHeader(1);
messageHeader.setMessageId(1);
messageHeader.setClientId(1);
messageHeader.setServerId(1);
Message message =Message.createMessage(messageHeader);
DemoMessageBody responseBody = new DemoMessageBody();
responseBody.setDemo("Hello World!");
message.setContent(SerializableHelper.encode(responseBody));
return message;
}
}
客戶端的demo也很簡單。構(gòu)建一個NettyProtocolClient的實例,拼裝一個消息,調(diào)用send()方法,再對返回的消息稍加處理就OK啦(客戶端拼裝和處理消息可以再抽出一個中間層)。
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。