
image.png
如上圖所示,整個(gè) UserProcessor 自定義處理器從兩個(gè)角度進(jìn)行分類:
- interest(感興趣的請(qǐng)求數(shù)據(jù)類型)是單個(gè)還是多個(gè)
- UserProcessor 是異步還是同步
兩種分類組合成四種底層抽象類,由用戶選擇實(shí)現(xiàn)。其中 AbstractUserProcessor 和 AbstractMultiInterestUserProcessor 可以看做是接口的適配器(為接口的每一個(gè)方法做了默認(rèn)實(shí)現(xiàn),使得最底層的四個(gè)抽象僅僅實(shí)現(xiàn)自己需要實(shí)現(xiàn)的方法就可以),所以個(gè)人更傾向于取名為 UserProcessorAdapter 和 MultiInterestUserProcessorAdapter。
一、同步用戶處理器
public class MyServerUserProcessor extends SyncUserProcessor<MyRequest> {
@Override
public Object handleRequest(BizContext bizCtx, MyRequest request) throws Exception {
MyResponse response = new MyResponse();
if (request != null) {
System.out.println(request);
response.setResp("from server -> " + request.getReq());
}
return response;
}
@Override
public String interest() {
return MyRequest.class.getName();
}
}
二、異步用戶處理器
public class MyAsyncServerUserProcessor extends AsyncUserProcessor<MyRequest> {
private static final Executor executor = Executors.newCachedThreadPool();
@Override
public void handleRequest(BizContext bizCtx, final AsyncContext asyncCtx, MyRequest request) {
executor.execute(new AsyncTask(bizCtx, asyncCtx, request));
}
@Override
public String interest() {
return MyRequest.class.getName();
}
class AsyncTask implements Runnable {
private BizContext bizCtx;
private AsyncContext asyncCtx;
private MyRequest request;
public AsyncTask(BizContext bizCtx, AsyncContext asyncCtx, MyRequest request) {
this.bizCtx = bizCtx;
this.asyncCtx = asyncCtx;
this.request = request;
}
@Override
public void run() {
MyResponse response = new MyResponse();
if (request != null) {
System.out.println(bizCtx.getRemoteAddress() + " == " + request);
response.setResp("from server -> " + request.getReq());
}
asyncCtx.sendResponse(response);
}
}
}
異步處理器中提供了一個(gè)自定義的線程池 executor
然后將業(yè)務(wù)邏輯的執(zhí)行封裝為一個(gè) AsyncTask 任務(wù)
之后使用自定義線程池執(zhí)行該任務(wù),執(zhí)行完畢之后,使用AsyncContext返回響應(yīng)。
看一下 RpcAsyncContext
public class RpcAsyncContext implements AsyncContext {
private RemotingContext ctx;
private RpcRequestCommand cmd;
private RpcRequestProcessor processor;
public RpcAsyncContext(final RemotingContext ctx, final RpcRequestCommand cmd,
final RpcRequestProcessor processor) {
this.ctx = ctx;
this.cmd = cmd;
this.processor = processor;
}
public void sendResponse(Object responseObject) {
processor.sendResponseIfNecessary(this.ctx, cmd.getType(), processor.getCommandFactory().createResponse(responseObject, this.cmd));
}
}
RpcAsyncContext#sendResponse 依舊是使用 RpcRequestProcessor#sendResponseIfNecessary 方法進(jìn)行響應(yīng)的發(fā)送。
通過 SOFABolt 源碼分析8 - RemotingCommand 命令協(xié)議的設(shè)計(jì)我們知道最終請(qǐng)求會(huì)執(zhí)行到 RpcRequestProcessor#dispatchToUserProcessor 方法,該方法中調(diào)用了 UserProcessor 自定義處理器。
private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
final int id = cmd.getId();
final byte type = cmd.getType();
UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
// 異步自定義處理器
if (processor instanceof AsyncUserProcessor) {
processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
...
} else { // 同步自定義處理器
Object responseObject = processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), cmd.getRequestObject());
sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
...
}
}
public void sendResponseIfNecessary(final RemotingContext ctx, byte type,
final RemotingCommand response) {
final int id = response.getId();
if (type != RpcCommandType.REQUEST_ONEWAY) {
RemotingCommand serializedResponse = response;
response.serialize();
ctx.writeAndFlush(serializedResponse);
}
}
- 先進(jìn)行預(yù)處理:將 RemotingContext 封裝到 DefaultBizContext 中,避免用戶直接操作 RemotingContext,用戶后續(xù)可以操作 BizContext。preHandleRequest在 MyAsyncServerUserProcessor 中做沒有覆蓋和異步處理,所以是同步執(zhí)行;
public BizContext preHandleRequest(RemotingContext remotingCtx, T request) {
return new DefaultBizContext(remotingCtx);
}
- 進(jìn)行業(yè)務(wù)邏輯處理:這一步在 MyAsyncServerUserProcessor 中異步處理;
- 如果是異步處理,到此結(jié)束;如果是同步處理,此處還需要等待業(yè)務(wù)邏輯處理完成之后,執(zhí)行發(fā)送響應(yīng)代碼
三、多 interest 用戶處理器
public class MyMultiInterestServerUserProcessor extends SyncMutiInterestUserProcessor {
@Override
public Object handleRequest(BizContext bizCtx, Object request) throws Exception {
if (request instanceof MyRequest) {
System.out.println("MyRequest: " + request);
return newResponse("from server - MyRequest: " + request);
}
if (request instanceof String) {
System.out.println("String: " + request);
return newResponse("from server - String: " + request);
}
return null;
}
@Override
public List<String> multiInterest() {
List<String> interestList = new ArrayList<String>(2);
interestList.add(MyRequest.class.getName());
interestList.add(String.class.getName());
return interestList;
}
private MyResponse newResponse(String resp) {
MyResponse response = new MyResponse();
response.setResp(resp);
return response;
}
}
更加實(shí)用的例子見 SOFABolt github
多 interest 用戶處理器只在注冊(cè)的時(shí)候有所不同(同一個(gè) MyMultiInterestServerUserProcessor 實(shí)例會(huì)注冊(cè)到兩個(gè) multiInterest中的每一個(gè) key 上)。注冊(cè)代碼大致輪廓。
public static void registerUserProcessor(UserProcessor<?> processor,
ConcurrentHashMap<String, UserProcessor<?>> userProcessors) {
if (processor instanceof MultiInterestUserProcessor) {
registerUserProcessor((MultiInterestUserProcessor) processor, userProcessors);
} else {
// interest 不能 blank
if (StringUtils.isBlank(processor.interest())) {
throw new RuntimeException("Processor interest should not be blank!");
}
UserProcessor<?> preProcessor = userProcessors.putIfAbsent(processor.interest(), processor);
// 不能為同一個(gè) interest 注冊(cè)兩個(gè) processor + 不能重復(fù)注冊(cè)
if (preProcessor != null) {
String errMsg = "Processor with interest key ["
+ processor.interest()
+ "] has already been registered to rpc server, can not register again!";
throw new RuntimeException(errMsg);
}
}
}
private static void registerUserProcessor(MultiInterestUserProcessor<?> processor, ConcurrentHashMap<String, UserProcessor<?>> userProcessors) {
// // multiInterest 不能 epmty
if (null == processor.multiInterest() || processor.multiInterest().isEmpty()) {
throw new RuntimeException("Processor interest should not be blank!");
}
for (String interest : processor.multiInterest()) {
UserProcessor<?> preProcessor = userProcessors.putIfAbsent(interest, processor);
// 不能為同一個(gè) interest 注冊(cè)兩個(gè) processor + 不能重復(fù)注冊(cè)
if (preProcessor != null) {
String errMsg = "Processor with interest key ["
+ interest
+ "] has already been registered to rpc server, can not register again!";
throw new RuntimeException(errMsg);
}
}
}
}