SOFABolt 源碼分析9 - UserProcessor 自定義處理器的設(shè)計(jì)

image.png

如上圖所示,整個(gè) UserProcessor 自定義處理器從兩個(gè)角度進(jìn)行分類:

  • interest(感興趣的請(qǐng)求數(shù)據(jù)類型)是單個(gè)還是多個(gè)
  • UserProcessor 是異步還是同步
    兩種分類組合成四種底層抽象類,由用戶選擇實(shí)現(xiàn)。其中 AbstractUserProcessor 和 AbstractMultiInterestUserProcessor 可以看做是接口的適配器(為接口的每一個(gè)方法做了默認(rèn)實(shí)現(xiàn),使得最底層的四個(gè)抽象僅僅實(shí)現(xiàn)自己需要實(shí)現(xiàn)的方法就可以),所以個(gè)人更傾向于取名為 UserProcessorAdapter 和 MultiInterestUserProcessorAdapter。

一、同步用戶處理器

public class MyServerUserProcessor extends SyncUserProcessor<MyRequest> {
    @Override
    public Object handleRequest(BizContext bizCtx, MyRequest request) throws Exception {
        MyResponse response = new MyResponse();
        if (request != null) {
            System.out.println(request);
            response.setResp("from server -> " + request.getReq());
        }
        return response;
    }

    @Override
    public String interest() {
        return MyRequest.class.getName();
    }
}

二、異步用戶處理器

public class MyAsyncServerUserProcessor extends AsyncUserProcessor<MyRequest> {
    private static final Executor executor = Executors.newCachedThreadPool();

    @Override
    public void handleRequest(BizContext bizCtx, final AsyncContext asyncCtx, MyRequest request) {
        executor.execute(new AsyncTask(bizCtx, asyncCtx, request));
    }

    @Override
    public String interest() {
        return MyRequest.class.getName();
    }

    class AsyncTask implements Runnable {
        private BizContext   bizCtx;
        private AsyncContext asyncCtx;
        private MyRequest request;

        public AsyncTask(BizContext bizCtx, AsyncContext asyncCtx, MyRequest request) {
            this.bizCtx = bizCtx;
            this.asyncCtx = asyncCtx;
            this.request = request;
        }

        @Override
        public void run() {
            MyResponse response = new MyResponse();
            if (request != null) {
                System.out.println(bizCtx.getRemoteAddress() + " == " + request);
                response.setResp("from server -> " + request.getReq());
            }
            asyncCtx.sendResponse(response);
        }
    }
}

異步處理器中提供了一個(gè)自定義的線程池 executor
然后將業(yè)務(wù)邏輯的執(zhí)行封裝為一個(gè) AsyncTask 任務(wù)
之后使用自定義線程池執(zhí)行該任務(wù),執(zhí)行完畢之后,使用 AsyncContext 返回響應(yīng)。

看一下 RpcAsyncContext

public class RpcAsyncContext implements AsyncContext {
    private RemotingContext     ctx;
    private RpcRequestCommand   cmd;
    private RpcRequestProcessor processor;

    public RpcAsyncContext(final RemotingContext ctx, final RpcRequestCommand cmd,
                           final RpcRequestProcessor processor) {
        this.ctx = ctx;
        this.cmd = cmd;
        this.processor = processor;
    }

    public void sendResponse(Object responseObject) {
        processor.sendResponseIfNecessary(this.ctx, cmd.getType(), processor.getCommandFactory().createResponse(responseObject, this.cmd));
    }
}

RpcAsyncContext#sendResponse 依舊是使用 RpcRequestProcessor#sendResponseIfNecessary 方法進(jìn)行響應(yīng)的發(fā)送。

通過 SOFABolt 源碼分析8 - RemotingCommand 命令協(xié)議的設(shè)計(jì)我們知道最終請(qǐng)求會(huì)執(zhí)行到 RpcRequestProcessor#dispatchToUserProcessor 方法,該方法中調(diào)用了 UserProcessor 自定義處理器。

    private void dispatchToUserProcessor(RemotingContext ctx, RpcRequestCommand cmd) {
        final int id = cmd.getId();
        final byte type = cmd.getType();
        UserProcessor processor = ctx.getUserProcessor(cmd.getRequestClass());
        // 異步自定義處理器
        if (processor instanceof AsyncUserProcessor) {
            processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), new RpcAsyncContext(ctx, cmd, this), cmd.getRequestObject());
            ...
        } else {  // 同步自定義處理器
            Object responseObject = processor.handleRequest(processor.preHandleRequest(ctx, cmd.getRequestObject()), cmd.getRequestObject());
            sendResponseIfNecessary(ctx, type, this.getCommandFactory().createResponse(responseObject, cmd));
            ...
        }
    }

    public void sendResponseIfNecessary(final RemotingContext ctx, byte type,
                                        final RemotingCommand response) {
        final int id = response.getId();
        if (type != RpcCommandType.REQUEST_ONEWAY) {
            RemotingCommand serializedResponse = response;
            response.serialize();

            ctx.writeAndFlush(serializedResponse);
        } 
    }
  • 先進(jìn)行預(yù)處理:將 RemotingContext 封裝到 DefaultBizContext 中,避免用戶直接操作 RemotingContext,用戶后續(xù)可以操作 BizContext。preHandleRequest在 MyAsyncServerUserProcessor 中做沒有覆蓋和異步處理,所以是同步執(zhí)行;
    public BizContext preHandleRequest(RemotingContext remotingCtx, T request) {
        return new DefaultBizContext(remotingCtx);
    }
  • 進(jìn)行業(yè)務(wù)邏輯處理:這一步在 MyAsyncServerUserProcessor 中異步處理;
  • 如果是異步處理,到此結(jié)束;如果是同步處理,此處還需要等待業(yè)務(wù)邏輯處理完成之后,執(zhí)行發(fā)送響應(yīng)代碼

三、多 interest 用戶處理器

public class MyMultiInterestServerUserProcessor extends SyncMutiInterestUserProcessor {
    @Override
    public Object handleRequest(BizContext bizCtx, Object request) throws Exception {
        if (request instanceof MyRequest) {
            System.out.println("MyRequest: " + request);
            return newResponse("from server - MyRequest: " + request);
        }
        if (request instanceof String) {
            System.out.println("String: " + request);
            return newResponse("from server - String: " + request);
        }
        return null;
    }

    @Override
    public List<String> multiInterest() {
        List<String> interestList = new ArrayList<String>(2);
        interestList.add(MyRequest.class.getName());
        interestList.add(String.class.getName());
        return interestList;
    }

    private MyResponse newResponse(String resp) {
        MyResponse response = new MyResponse();
        response.setResp(resp);
        return response;
    }
}

更加實(shí)用的例子見 SOFABolt github

多 interest 用戶處理器只在注冊(cè)的時(shí)候有所不同(同一個(gè) MyMultiInterestServerUserProcessor 實(shí)例會(huì)注冊(cè)到兩個(gè) multiInterest中的每一個(gè) key 上)。注冊(cè)代碼大致輪廓。

    public static void registerUserProcessor(UserProcessor<?> processor,
                                             ConcurrentHashMap<String, UserProcessor<?>> userProcessors) {
        if (processor instanceof MultiInterestUserProcessor) {
            registerUserProcessor((MultiInterestUserProcessor) processor, userProcessors);
        } else {
            // interest 不能 blank
            if (StringUtils.isBlank(processor.interest())) {
                throw new RuntimeException("Processor interest should not be blank!");
            }
            UserProcessor<?> preProcessor = userProcessors.putIfAbsent(processor.interest(), processor);
            // 不能為同一個(gè) interest 注冊(cè)兩個(gè) processor + 不能重復(fù)注冊(cè)
            if (preProcessor != null) {
                String errMsg = "Processor with interest key ["
                                + processor.interest()
                                + "] has already been registered to rpc server, can not register again!";
                throw new RuntimeException(errMsg);
            }
        }
    }

    private static void registerUserProcessor(MultiInterestUserProcessor<?> processor, ConcurrentHashMap<String, UserProcessor<?>> userProcessors) {
        // // multiInterest 不能 epmty
        if (null == processor.multiInterest() || processor.multiInterest().isEmpty()) {
            throw new RuntimeException("Processor interest should not be blank!");
        }
        for (String interest : processor.multiInterest()) {
            UserProcessor<?> preProcessor = userProcessors.putIfAbsent(interest, processor);
             // 不能為同一個(gè) interest 注冊(cè)兩個(gè) processor + 不能重復(fù)注冊(cè)
            if (preProcessor != null) {
                String errMsg = "Processor with interest key ["
                                + interest
                                + "] has already been registered to rpc server, can not register again!";
                throw new RuntimeException(errMsg);
            }
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1.ios高性能編程 (1).內(nèi)層 最小的內(nèi)層平均值和峰值(2).耗電量 高效的算法和數(shù)據(jù)結(jié)構(gòu)(3).初始化時(shí)...
    歐辰_OSR閱讀 30,231評(píng)論 8 265
  • 1、通過CocoaPods安裝項(xiàng)目名稱項(xiàng)目信息 AFNetworking網(wǎng)絡(luò)請(qǐng)求組件 FMDB本地?cái)?shù)據(jù)庫(kù)組件 SD...
    陽明AI閱讀 16,203評(píng)論 3 119
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評(píng)論 19 139
  • 【杭州】 申請(qǐng)材料: 1、《杭州市職工生育待遇申領(lǐng)表》;【表格下載】*2 2、生殖健康服務(wù)證復(fù)印件(即準(zhǔn)生證); ...
    美麗的大仙閱讀 331評(píng)論 0 0
  • 最幸福的時(shí)刻 (文/亦濃) 最幸福的時(shí)刻 是收到你的信息的時(shí)候 收到你的信息的時(shí)候 是最幸福的時(shí)刻 可是啊,等待 ...
    開在夜里的花兒閱讀 319評(píng)論 16 13

友情鏈接更多精彩內(nèi)容