pigeon源碼分析-處理請求結(jié)果

RPC調(diào)用處理請求結(jié)果可以分為兩部分:

  • 獲取 response
  • 關(guān)聯(lián) request 和 response

這么分看起來似乎很奇怪,不是直接等待處理完獲取結(jié)果就行了嗎?

我們說 RPC 調(diào)用都是在模擬這個動作: Result result = service.call(args); 但是遠(yuǎn)程調(diào)用畢竟不是本地調(diào)用(其實(shí)稍后可以看到還是有相似之處的),將請求寫到網(wǎng)絡(luò)之后,就無法命令遠(yuǎn)端做任何事了,這次請求就已經(jīng)告一段落了。

pigeon client 只知道:

  • 向網(wǎng)絡(luò)寫數(shù)據(jù);
    • 就是寫 request
  • 處理網(wǎng)絡(luò)寫入的數(shù)據(jù);
    • 處理成 response

于是不難理解為何有此一問:網(wǎng)絡(luò)另一端寫過來的數(shù)據(jù),我怎么知道是哪個請求的返回值呢?

獲取 response

其實(shí)從網(wǎng)絡(luò)讀取數(shù)據(jù),轉(zhuǎn)化成 Object。 pigeon 基于 netty,獲取 response 就是處理網(wǎng)絡(luò)寫入。

具體實(shí)現(xiàn)在:

 // com.dianping.pigeon.remoting.netty.invoker.NettyClientHandler#messageReceived
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        CodecEvent codecEvent = (CodecEvent) e.getMessage();

        if (codecEvent.isValid() && codecEvent.getInvocation() != null) {
            client.processResponse((InvocationResponse) codecEvent.getInvocation());
        }
    }

從 netty 封裝 MessageEvent 轉(zhuǎn)換成 CodecEvent,再剝開一層就是 InvocationResponse,當(dāng)然這還是一個比較泛化的返回值,com.dianping.pigeon.remoting.invoker.process.ResponseProcessor 將會進(jìn)一步處理;

繼續(xù)往下看:

// com.dianping.pigeon.remoting.invoker.process.threadpool.ResponseThreadPoolProcessor#doProcessResponse
public void doProcessResponse(final InvocationResponse response, final Client client) {
        Runnable task = new Runnable() {
            public void run() {
                ServiceInvocationRepository.getInstance().receiveResponse(response);
            }
        };
        try {
            responseProcessThreadPool.execute(task);
        } catch (RejectedExecutionException e) {
            String error = String.format("process response failed:%s, processor stats:%s", response,
                    getProcessorStatistics());
            throw new RejectedException(error, e);
        }
    }

這里封裝成了一個 task,交給線程池處理。

再下一層:

// com.dianping.pigeon.remoting.invoker.service.ServiceInvocationRepository#receiveResponse
public class ServiceInvocationRepository {
    // 略
    private static Map<Long, RemoteInvocationBean> invocations = new ConcurrentHashMap<Long, RemoteInvocationBean>();


    public void receiveResponse(InvocationResponse response) {
        RemoteInvocationBean invocationBean = invocations.get(response.getSequence());
        if (invocationBean != null) {
            if (logger.isDebugEnabled()) {
                logger.debug("received response:" + response);
            }
            InvocationRequest request = invocationBean.request;
            try {
                Callback callback = invocationBean.callback;
                if (callback != null) {
                    Client client = callback.getClient();
                    if (client != null) {
                        ServiceStatisticsHolder.flowOut(request, client.getAddress());
                    }
                    callback.callback(response);
                    callback.run();
                }
            } finally {
                invocations.remove(response.getSequence());
            }
        }
    }
// 略
}

這里可以看到, RemoteInvocationBean invocationBean = invocations.get(response.getSequence()); invocations 維護(hù)一個 HashMap,key 是一個 long 型的 sequenceId,通過這種方式定位到 invocationBean,而 invocationBean 看實(shí)現(xiàn)可知持有 request 引用,以及一個處理返回值的 callback。

Callback 在不同調(diào)用模式(sync / future/ oneway/ callback) 下有不同實(shí)現(xiàn)類,比如 sync 模式下:

// public class CallbackFuture implements Callback, CallFuture {   
@Override
    public void callback(InvocationResponse response) {
        this.response = response;
    }

sync 和 future 調(diào)用,都是將 response 對象實(shí)例設(shè)置給相應(yīng)的引用

何時得到真正的 returnValue?

看動態(tài)代理的邏輯,com.dianping.pigeon.remoting.invoker.service.ServiceInvocationProxy#invoke

com.dianping.pigeon.remoting.invoker.process.filter.InvocationInvokeFilter 挨個執(zhí)行完之后,提取返回值

// com.dianping.pigeon.remoting.invoker.service.ServiceInvocationProxy#extractResult
    public Object extractResult(InvocationResponse response, Class<?> returnType) throws Throwable {
        Object responseReturn = response.getReturn();
        // ...
}

拿到的是頂層父類實(shí)例 Object,具體的類型匹配需要客戶端和服務(wù)端自行匹配

關(guān)聯(lián) request 和 response

從上面的分析可以看出,關(guān)鍵點(diǎn)就在于 sequence,每次調(diào)用應(yīng)該有個唯一的 id 進(jìn)行匹配

這個 sequence 是唯一的嗎?

如果不唯一,就可能導(dǎo)致拿到錯誤的處理結(jié)果。

sequence 的生成位置

//com.dianping.pigeon.remoting.invoker.process.filter.ContextPrepareInvokeFilter#initRequest

private static AtomicLong requestSequenceMaker = new AtomicLong();

request.setSequence(requestSequenceMaker.incrementAndGet() * -1);

可以看到 這個 sequence 是全局唯一的,準(zhǔn)確說是同一個 JVM 中是唯一的,而且是 long 類型,足夠大;

Q:sequence 發(fā)生回繞怎么辦?

A:long 類型,即使發(fā)生回繞,也需要足夠長的時間,一般來說不會堆積有那么多的請求,導(dǎo)致兩個相同的 sequenceId 實(shí)際對應(yīng)不同請求;

分布式環(huán)境下,sequenceId 在多個機(jī)器上可能重復(fù),會出錯嗎?

A:sequence 存儲的 com.dianping.pigeon.remoting.invoker.service.ServiceInvocationRepository#invocations也是同一個 JVM 唯一的,所以只需要擔(dān)心會不會有這樣的場景:

client A 調(diào)用 server A,client B 也調(diào)用 server A,但是 server A 把 client B的請求返回值處理之后發(fā)送到了 client A?

看看服務(wù)端的處理,寫返回值:

//com.dianping.pigeon.remoting.provider.process.filter.WriteResponseProcessFilter#invoke
    public InvocationResponse invoke(ServiceInvocationHandler handler, ProviderContext invocationContext)
            throws Throwable {
        try {
            ProviderChannel channel = invocationContext.getChannel();
            InvocationRequest request = invocationContext.getRequest();
            InvocationResponse response = handler.handle(invocationContext);
            if (request.getCallType() == Constants.CALLTYPE_REPLY) {
                invocationContext.getTimeline().add(new TimePoint(TimePhase.P));
                channel.write(invocationContext, response);
                invocationContext.getTimeline().add(new TimePoint(TimePhase.P));
            }
        // ...

channel 就是對socket的封裝,可以看成是 client / server 對對方的抽象。

那么只需要保證拿到正確的 channel 就對了:

//com.dianping.pigeon.remoting.netty.provider.NettyServerHandler#messageReceived
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent message) {
        CodecEvent codecEvent = (CodecEvent) (message.getMessage());

        if (!codecEvent.isValid() || codecEvent.getInvocation() == null) {
            return;
        }

        InvocationRequest request = (InvocationRequest) codecEvent.getInvocation();

        ProviderContext invocationContext = new DefaultProviderContext(request, new NettyServerChannel(ctx.getChannel()));
        //...
    }

也就是說,再 client 端 writeRequest() 之后,server 端讀取網(wǎng)絡(luò)數(shù)據(jù)的時候就從 context 中獲取到 client 所在的 channel 了,簡單來說,”從哪里來,到哪里去“。

總的來說,不會發(fā)生以上所述的 sequenceId 錯亂的問題。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • pigeon源碼分析-同步調(diào)用和異步調(diào)用 Pigeon是美團(tuán)點(diǎn)評內(nèi)部廣泛使用的一個分布式服務(wù)通信框架(RPC),本...
    WhiteBase閱讀 1,600評論 0 0
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,534評論 19 139
  • 國家電網(wǎng)公司企業(yè)標(biāo)準(zhǔn)(Q/GDW)- 面向?qū)ο蟮挠秒娦畔?shù)據(jù)交換協(xié)議 - 報批稿:20170802 前言: 排版 ...
    庭說閱讀 12,332評論 6 13
  • 1.感謝最強(qiáng)大腦,發(fā)現(xiàn)了自己觀察視角的不同,會發(fā)現(xiàn)最強(qiáng)大腦觀眾席抓拍到的觀眾都沒有化妝,是因?yàn)槭鼙姴煌?2.發(fā)現(xiàn)...
    張洪瑜閱讀 188評論 0 0
  • 這還是高一時候的事。好吧,說我記仇也好,懷舊也罷。就這樣在心里生了根,后來發(fā)了芽。 是的,我喜歡上一個人,故事太多...
    Grexogr閱讀 194評論 0 1

友情鏈接更多精彩內(nèi)容