本文要點(diǎn):
- netty4+protobuf3多類型傳輸實(shí)現(xiàn)
- 優(yōu)雅的實(shí)現(xiàn)消息分發(fā)
做后臺(tái)服務(wù)經(jīng)常有這樣的流程:

如何優(yōu)雅的完成這個(gè)過程呢?下面分享下基于
netty+protobuf的方案:
首先要解決的是如何在netty+protobuf中傳輸多個(gè)protobuf協(xié)議,這里采取的方案是使用一個(gè)類來做為描述協(xié)議的方案,也就是需要二次解碼的方案,IDL文件如下:
syntax = "proto3";
option java_package = "com.nonpool.proto";
option java_multiple_files = true;
message Frame {
string messageName = 1;
bytes payload = 15;
}
message TextMessage {
string text = 1;
}
Frame為描述協(xié)議,所有消息在發(fā)送的時(shí)候都序列化成byte數(shù)組寫入Frame的payload,messageName約定為要發(fā)送的message的類名,生成的時(shí)候設(shè)置java_multiple_files = true可以讓類分開生成,更清晰些,也更方便后面利用反射來獲取這些類.
生成好了protobuf,我們解包的過程就應(yīng)該是這樣的:

其中protobuf的序列化和反序列化netty已經(jīng)為我們編寫好了對(duì)應(yīng)的解碼/編碼器,直接調(diào)用即可,我們只需要編寫二次解碼/編碼器即可:
public class SecondProtobufCodec extends MessageToMessageCodec<Frame, MessageLite> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageLite msg, List<Object> out) throws Exception {
out.add(Frame.newBuilder()
.setMessageType(msg.getClass().getSimpleName())
.setPayload(msg.toByteString())
.build());
}
@Override
protected void decode(ChannelHandlerContext ctx, Frame msg, List<Object> out) throws Exception {
out.add(ParseFromUtil.parse(msg));
}
}
public abstract class ParseFromUtil {
private final static ConcurrentMap<String, Method> methodCache = new ConcurrentHashMap<>();
static {
//找到指定包下所有protobuf實(shí)體類
List<Class> classes = ClassUtil.getAllClassBySubClass(MessageLite.class, true, "com.nonpool.proto");
classes.stream()
.filter(protoClass -> !Objects.equals(protoClass, Frame.class))
.forEach(protoClass -> {
try {
//反射獲取parseFrom方法并緩存到map
methodCache.put(protoClass.getSimpleName(), protoClass.getMethod("parseFrom", ByteString.class));
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
});
}
/**
* 根據(jù)Frame類解析出其中的body
*
* @param msg
* @return
*/
public static MessageLite parse(Frame msg) throws InvocationTargetException, IllegalAccessException {
String type = msg.getMessageType();
ByteString body = msg.getPayload();
Method method = methodCache.get(type);
if (method == null) {
throw new RuntimeException("unknown Message type :" + type);
}
return (MessageLite) method.invoke(null, body);
}
}
至此,我們收發(fā)數(shù)據(jù)的解碼/編碼已經(jīng)做完配合自帶的解碼/編碼器,此時(shí)pipeline的處理鏈?zhǔn)沁@樣的:
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(Frame.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new SecondProtobufCodec())
;
數(shù)據(jù)收發(fā)完成,接下來就是把消息分發(fā)到對(duì)應(yīng)的處理方法。處理方法也利用多態(tài)特性+泛型+注解優(yōu)雅的實(shí)現(xiàn)分發(fā)。首先定義一個(gè)泛型接口:interface DataHandler<T extends MessageLite>,該接口上定義一個(gè)方法void handler(T t, ChannelHandlerContext ctx),然后每一個(gè)類型的處理類都使用自己要處理的類型實(shí)現(xiàn)該接口,并使用自定義注解映射處理類型。在項(xiàng)目啟動(dòng)的掃描實(shí)現(xiàn)該接口的所有類,使用跟上述解析Message類型相同的方法來緩存這些處理類(有一點(diǎn)不同的是這里只需要緩存一個(gè)處理類的實(shí)例而不是方法,因?yàn)?code>parseFrom是static的無法統(tǒng)一調(diào)用),做完這些我們就可以編寫我們的pipeline上的最后一個(gè)處理器:
public class DispatchHandler extends ChannelInboundHandlerAdapter {
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
HandlerUtil.getHandlerInstance(msg.getClass().getSimpleName()).handler((MessageLite) msg,ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
}
}
public abstract class HandlerUtil {
private final static ConcurrentMap<String,DataHandler> instanceCache = new ConcurrentHashMap<>();
static {
try {
List<Class> classes = ClassUtil.getAllClassBySubClass(DataHandler.class, true,"com.onescorpion");
for (Class claz : classes) {
HandlerMapping annotation = (HandlerMapping) claz.getAnnotation(HandlerMapping.class);
instanceCache.put(annotation.value(), (DataHandler) claz.newInstance());
}
System.out.println("handler init success handler Map: " + instanceCache);
} catch (Exception e) {
e.printStackTrace();
}
}
public static DataHandler getHandlerInstance(String name) {
return instanceCache.get(name);
}
}
這樣一個(gè)優(yōu)雅的處理分發(fā)就完成了。
由于代碼規(guī)律性極強(qiáng),所以所有handler類均可以使用模版來生成,完整代碼請(qǐng)看這里
ps:其實(shí)本例中由于使用了protobuf還有比較強(qiáng)約定性,所以理論上來說每個(gè)消息處理器上的自定義注解是不需要的,通過獲取泛型的真實(shí)類型即可,但是注解可以大大增加handler的靈活性,如果采用其他方案(例如json)也更有借鑒的價(jià)值。