ByteToMessageDecoder分析

ByteToMessageDecoder

該方法提供了將 ByteBuf 轉(zhuǎn)化為對象的解碼器處理流程,具體的解碼規(guī)則交由子類去實現(xiàn)。

我們以讀操作 channelRead 為例來研究一下:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            // out 是一個鏈表,存放解碼成功的對象
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                // cumulation 中存放的是上次未處理完的半包消息
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    // 本次處理,需要把上次遺留的半包和本次數(shù)據(jù)拼接后,一起處理
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // 調(diào)用解碼器解碼消息
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                // 如果有解碼成功的數(shù)據(jù),需要向后傳遞,讓其他 ChannelHandler 繼續(xù)處理
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();

                if (outSize > 0) {
                    // 如果有解碼成功的數(shù)據(jù),需要向后傳遞,讓其他 ChannelHandler 繼續(xù)處理
                    fireChannelRead(ctx, out, outSize);
                    out.clear();
                    // 如果當(dāng)前 ChannelHandler 所屬 ctx 被剔除 pipeline 上下文,就不需要繼續(xù)處理了
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
                // 解碼
                decodeRemovalReentryProtection(ctx, in, out);

                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        // 設(shè)置解碼器狀態(tài)為正在解碼,避免解碼過程中另一個線程調(diào)用了 handlerRemoved 把數(shù)據(jù)銷毀,造成混亂
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            decode(ctx, in, out);
        } finally {
            // STATE_HANDLER_REMOVED_PENDING 表示在解碼過程中,ctx 被移除,需要由當(dāng)前線程來調(diào)用 handlerRemoved 
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {
                handlerRemoved(ctx);
            }
        }
    }

    // 具體的消息解碼算法,交給子類實現(xiàn)
    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;    
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容