Exploring Binder's Request Handling Flow

A while back I ran into a rather interesting bug; with some free time over Spring Festival, I dug down to the root cause and wrote it up.

Some background: we have a debugging tool that runs on Android devices for remote debugging over the LAN. It is essentially a NanoHttpd server that receives HTTP requests and forwards them.

One feature pushes notifications to the client via SSE (Server-Sent Events). Our implementation: the client issues a GET request, and the server returns an unbounded response of type text/event-stream whose body is a PipedInputStream. Whenever a message needs to be forwarded, we write it into the corresponding PipedOutputStream:

private fun onCreateSse(uuid: String): Response {
    val input = PipedInputStream()
    val output = PipedOutputStream(input)
    val handler = ISseHandler { event, data ->
        try {
            output.write("event: $event\ndata: ${data}\n\n".toByteArray())
        } catch (e: IOException) {
            notifyHandlers.remove(uuid)
        }
    }

    notifyHandlers[uuid] = handler
    return newFixedLengthResponse(Response.Status.OK, "text/event-stream", input, -1)
        .apply {
            addHeader("Cache-Control", "no-cache")
            addHeader("Access-Control-Allow-Origin", "*")
        }
}

But a developer noticed that this SSE connection would always drop with an IOException within a few seconds of receiving a POST to one particular URL. At first I assumed the socket between the HTTP server and the client had broken, but a closer look at the exception showed it was actually thrown by PipedInputStream.read as IOException("Pipe broken"):

// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:libcore/ojluni/src/main/java/java/io/PipedInputStream.java
public synchronized int read()  throws IOException {
    ...
    readSide = Thread.currentThread();
    int trials = 2;
    while (in < 0) {
        ...
        if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
            throw new IOException("Pipe broken"); // the exception is thrown right here!!!
        }
        ...
        wait(1000);
        ...
    }
    int ret = buffer[out++] & 0xFF;
    if (out >= buffer.length) {
        out = 0;
    }
    if (in == out) {
        /* now empty */
        in = -1;
    }

    return ret;
}

As the code above shows, PipedInputStream works in a rather brute-force way: it repeatedly waits one second at a time for PipedOutputStream to write data. When data is written, PipedInputStream.receive runs, which assigns writeSide and in:

// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:libcore/ojluni/src/main/java/java/io/PipedInputStream.java
synchronized void receive(byte b[], int off, int len)  throws IOException {
    checkStateForReceive();
    writeSide = Thread.currentThread();
    int bytesToTransfer = len;
    while (bytesToTransfer > 0) {
        if (in == out)
            awaitSpace();
        int nextTransferAmount = 0;
        if (out < in) {
            nextTransferAmount = buffer.length - in;
        } else if (in < out) {
            if (in == -1) {
                in = out = 0;
                nextTransferAmount = buffer.length - in;
            } else {
                nextTransferAmount = out - in;
            }
        }
        if (nextTransferAmount > bytesToTransfer)
            nextTransferAmount = bytesToTransfer;
        assert(nextTransferAmount > 0);
        System.arraycopy(b, off, buffer, in, nextTransferAmount);
        bytesToTransfer -= nextTransferAmount;
        off += nextTransferAmount;
        in += nextTransferAmount;
        if (in >= buffer.length) {
            in = 0;
        }
    }
}

Judging from the throw condition if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)), the writing thread must have exited. But our writer is a binder callback registered with another process, and the writes happen inside that callback. Could the binder thread pool have reclaimed the thread? Yet from the reproduction steps, only POSTs to that one URL caused the disconnect; with POSTs to other URLs the PipedInputStream kept working fine. Binder threads should not be reclaimed that quickly, and if thread reclamation were the cause, it would not discriminate between NanoHttpd URL handlers.

So I added a stack trace print before output.write to confirm the pid, then after the disconnect used kill -3 to force-dump all thread stacks of the process to /data/anr. The writing thread had indeed exited after the disconnect, but the stack revealed something odd:

01-31 09:27:54.801 9805 11368 D testtest:   at xx.xx.xx.xx.HttpServer.lambda$vSE0qxyqGQt4NJp3RjyeRZx9JR8(Unknown Source:0)
...
01-31 09:27:54.801 9805 11368 D testtest:   at xx.xx.xx.xx.xx.BinderEventHelper$1.onEvent(BinderEventHelper.java:53)
01-31 09:27:54.801 9805 11368 D testtest:   at xx.xx.xx.xx.binder.aidl.IEventListener$Stub.onTransact(IEventListener.java:63)
01-31 09:27:54.801 9805 11368 D testtest:   at android.os.Binder.execTransactInternal(Binder.java:1179)
01-31 09:27:54.801 9805 11368 D testtest:   at android.os.Binder.execTransact(Binder.java:1143)
01-31 09:27:54.801 9805 11368 D testtest:   at android.os.BinderProxy.transactNative(Native Method)
01-31 09:27:54.801 9805 11368 D testtest:   at android.os.BinderProxy.transact(BinderProxy.java:571)
01-31 09:27:54.801 9805 11368 D testtest:   at xx.xx.xx.xx.binder.aidl.IServer$Stub$Proxy.sendRequest(IServer.java:167)
...
01-31 09:27:54.801 9805 11368 D testtest:   at xx.xx.xx.xx.HttpServer.serve(HttpServer.kt:116)
01-31 09:27:54.801 9805 11368 D testtest:   at org.nanohttpd.protocols.http.HTTPSession.execute(HTTPSession.java:142)
01-31 09:27:54.801 9805 11368 D testtest:   at org.nanohttpd.protocols.http.ClientHandler.run(ClientHandler.java:12)
01-31 09:27:54.801 9805 11368 D testtest:   at java.lang.Thread.run(Thread.java:920)

The onEvent AIDL callback was not invoked on a binder thread at all, but on a NanoHttpd thread. And that thread is a plain new Thread created in DefaultAsyncRunner:

// https://github.com/NanoHttpd/nanohttpd/blob/efb2ebf85a2b06f7c508aba9eaad5377e3a01e81/core/src/main/java/org/nanohttpd/protocols/http/threading/DefaultAsyncRunner.java
public void exec(ClientHandler clientHandler) {
    ++this.requestCount;
    this.running.add(clientHandler);
    createThread(clientHandler).start();
}

protected Thread createThread(ClientHandler clientHandler) {
    Thread t = new Thread(clientHandler);
    t.setDaemon(true);
    t.setName("NanoHttpd Request Processor (#" + this.requestCount + ")");
    return t;
}

A thread like this exits as soon as the Runnable's run method completes. No wonder PipedInputStream.read, after its one-second wait, finds the writeSide thread dead and throws an IOException.
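This failure mode reproduces outside Android with plain Java (a minimal sketch; the class name is made up): a writer thread that dies leaves its buffered bytes readable, but the next read on the drained pipe gives up after roughly two seconds of waiting.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeBrokenDemo {
    public static void main(String[] args) throws Exception {
        PipedInputStream input = new PipedInputStream();
        PipedOutputStream output = new PipedOutputStream(input);

        // Writer thread writes once, then exits -- just like a NanoHttpd request thread.
        Thread writer = new Thread(() -> {
            try {
                output.write("hello".getBytes());
            } catch (IOException ignored) {
            }
        });
        writer.start();
        writer.join(); // the writer thread is now dead

        // Data already in the buffer can still be read out...
        for (int i = 0; i < 5; i++) {
            input.read();
        }

        // ...but the next read finds writeSide dead and, after its retry waits,
        // throws IOException("Pipe broken").
        try {
            input.read();
        } catch (IOException e) {
            System.out.println(e.getMessage()); // prints "Pipe broken"
        }
    }
}
```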

Looking more closely: on this thread we use AIDL to call a method in another process, and that method happens to trigger an AIDL callback, which comes back on the same thread. That is why only requests to this one URL trigger the IOException; other requests never call that AIDL method:

01-31 09:27:54.801 9805 11368 D testtest:   at xx.xx.xx.xx.binder.aidl.IEventListener$Stub.onTransact(IEventListener.java:63) // here the service side calls back the listener registered by the client
01-31 09:27:54.801 9805 11368 D testtest:   at android.os.Binder.execTransactInternal(Binder.java:1179)
01-31 09:27:54.801 9805 11368 D testtest:   at android.os.Binder.execTransact(Binder.java:1143)
01-31 09:27:54.801 9805 11368 D testtest:   at android.os.BinderProxy.transactNative(Native Method)
01-31 09:27:54.801 9805 11368 D testtest:   at android.os.BinderProxy.transact(BinderProxy.java:571)
01-31 09:27:54.801 9805 11368 D testtest:   at xx.xx.xx.xx.binder.aidl.IServer$Stub$Proxy.sendRequest(IServer.java:167) // here the client calls the service-side method

Once the cause is clear, the fix is obvious: use a Handler to post the write to the main thread.
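The same idea can be sketched without Android APIs (the actual fix posts to the main thread via a Handler; here a hypothetical single-thread executor stands in as the long-lived writer, and all names are illustrative). What matters is that writeSide becomes a thread that never exits:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SseWriterDemo {
    // A single long-lived writer thread stands in for the main-thread Handler:
    // writeSide stays alive, so PipedInputStream.read never sees a dead writer.
    private static final ExecutorService writerThread = Executors.newSingleThreadExecutor();

    public static void main(String[] args) throws Exception {
        PipedInputStream input = new PipedInputStream();
        PipedOutputStream output = new PipedOutputStream(input);

        // Simulate two callbacks arriving on short-lived request threads.
        for (int i = 0; i < 2; i++) {
            Thread callback = new Thread(() ->
                    writerThread.execute(() -> {
                        try {
                            output.write("event: ping\ndata: {}\n\n".getBytes());
                        } catch (IOException ignored) {
                        }
                    }));
            callback.start();
            callback.join(); // the callback thread dies, but the writer thread does not
        }

        // Reads succeed even though the callback threads have exited.
        byte[] buf = new byte[64];
        int n = input.read(buf);
        System.out.println(n > 0 ? "read ok" : "read failed");
        writerThread.shutdown();
    }
}
```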

Binder's request flow

In hindsight this behavior makes sense: the method I call triggers a listener callback internally, so the callback runs on the same thread as the call. The puzzle is that we are using AIDL to call a method in another process, and the callback also comes back from that other process. How does Android make the call and the callback land on the same thread?

Since the stack only shows Java frames, answering this requires digging into how the native method BinderProxy.transactNative implements the cross-process call:

// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/base/core/jni/android_util_Binder.cpp
static jboolean android_os_BinderProxy_transact(JNIEnv* env, jobject obj,
        jint code, jobject dataObj, jobject replyObj, jint flags) // throws RemoteException
{
    ...
    IBinder* target = getBPNativeData(env, obj)->mObject.get();
    ...
    status_t err = target->transact(code, *data, reply, flags);
    ...
}

// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/BpBinder.cpp
status_t BpBinder::transact(
    uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags)
{
    ...
    status = IPCThreadState::self()->transact(binderHandle(), code, data, reply, flags);
    ...
}

// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/IPCThreadState.cpp
status_t IPCThreadState::transact(int32_t handle,
                                  uint32_t code, const Parcel& data,
                                  Parcel* reply, uint32_t flags)
{
    ...
    err = writeTransactionData(BC_TRANSACTION, flags, handle, code, data, nullptr); // write the request data
    ...
    // wait for the response
    if (reply) {
        err = waitForResponse(reply);
    } else {
        Parcel fakeReply;
        err = waitForResponse(&fakeReply);
    }
    ...
}

So BinderProxy.transactNative ultimately goes through IPCThreadState. From the name, and from the way it is obtained via IPCThreadState::self(), we can tell it is bound to the current thread. Its writeTransactionData sends the request, which in practice just means writing into mOut the BC_TRANSACTION cmd together with a binder_transaction_data struct packing all the call parameters:

// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/IPCThreadState.cpp
status_t IPCThreadState::writeTransactionData(int32_t cmd, uint32_t binderFlags,
    int32_t handle, uint32_t code, const Parcel& data, status_t* statusBuffer)
{
    binder_transaction_data tr;

    tr.target.ptr = 0; /* Don't pass uninitialized stack data to a remote process */
    tr.target.handle = handle;
    tr.code = code;
    tr.flags = binderFlags;
    tr.cookie = 0;
    tr.sender_pid = 0;
    tr.sender_euid = 0;

    const status_t err = data.errorCheck();
    if (err == NO_ERROR) {
        tr.data_size = data.ipcDataSize();
        tr.data.ptr.buffer = data.ipcData();
        tr.offsets_size = data.ipcObjectsCount()*sizeof(binder_size_t);
        tr.data.ptr.offsets = data.ipcObjects();
    } else if (statusBuffer) {
        tr.flags |= TF_STATUS_CODE;
        *statusBuffer = err;
        tr.data_size = sizeof(status_t);
        tr.data.ptr.buffer = reinterpret_cast<uintptr_t>(statusBuffer);
        tr.offsets_size = 0;
        tr.data.ptr.offsets = 0;
    } else {
        return (mLastError = err);
    }

    mOut.writeInt32(cmd);
    mOut.write(&tr, sizeof(tr));

    return NO_ERROR;
}

On the service side, the driver delivers this transaction as a BR_TRANSACTION command, which dispatches to the AIDL Stub's onTransact and returns; the client, meanwhile, waits for the reply in IPCThreadState::waitForResponse:

// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/IPCThreadState.cpp
status_t IPCThreadState::waitForResponse(Parcel *reply, status_t *acquireResult)
{
    uint32_t cmd;
    int32_t err;

    while (1) {
        if ((err=talkWithDriver()) < NO_ERROR) break;
        err = mIn.errorCheck();
        if (err < NO_ERROR) break;
        if (mIn.dataAvail() == 0) continue;

        cmd = (uint32_t)mIn.readInt32();

        IF_LOG_COMMANDS() {
            alog << "Processing waitForResponse Command: "
                << getReturnString(cmd) << endl;
        }

        switch (cmd) {
        case BR_ONEWAY_SPAM_SUSPECT:
            ...
        case BR_TRANSACTION_COMPLETE:
            ...
        case BR_DEAD_REPLY:
            ...
        case BR_FAILED_REPLY:
            ...
        case BR_FROZEN_REPLY:
            ...
        case BR_ACQUIRE_RESULT:
            ...
        case BR_REPLY:
            {
                binder_transaction_data tr;
                err = mIn.read(&tr, sizeof(tr));
                ALOG_ASSERT(err == NO_ERROR, "Not enough command data for brREPLY");
                if (err != NO_ERROR) goto finish;

                if (reply) {
                    if ((tr.flags & TF_STATUS_CODE) == 0) {
                        reply->ipcSetDataReference(
                            reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
                            tr.data_size,
                            reinterpret_cast<const binder_size_t*>(tr.data.ptr.offsets),
                            tr.offsets_size/sizeof(binder_size_t),
                            freeBuffer);
                    } else {
                        err = *reinterpret_cast<const status_t*>(tr.data.ptr.buffer);
                        freeBuffer(nullptr,
                            reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
                            tr.data_size,
                            reinterpret_cast<const binder_size_t*>(tr.data.ptr.offsets),
                            tr.offsets_size/sizeof(binder_size_t));
                    }
                } else {
                    freeBuffer(nullptr,
                        reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
                        tr.data_size,
                        reinterpret_cast<const binder_size_t*>(tr.data.ptr.offsets),
                        tr.offsets_size/sizeof(binder_size_t));
                    continue;
                }
            }
            goto finish;

        default:
            err = executeCommand(cmd);
            if (err != NO_ERROR) goto finish;
            break;
        }
    }
    ...
}

Normally this loop reads a BR_REPLY directly, fills in the reply data, and returns to the Java layer. But if the service's onTransact calls back into the client, then since IPCThreadState::self() on this thread yields the same IPCThreadState, the cmd read back from the driver is BR_TRANSACTION rather than the expected BR_REPLY. Execution falls into the default branch's executeCommand, which dispatches the callback to the client-side listener Stub's onTransact:

// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/IPCThreadState.cpp
status_t IPCThreadState::executeCommand(int32_t cmd)
{
    BBinder* obj;
    RefBase::weakref_type* refs;
    status_t result = NO_ERROR;

    switch ((uint32_t)cmd) {
    ...
    case BR_TRANSACTION:
        {
            ...

        }
        break;
    ...
    default:
        ALOGE("*** BAD COMMAND %d received from Binder driver\n", cmd);
        result = UNKNOWN_ERROR;
        break;
    }

    if (result != NO_ERROR) {
        mLastError = result;
    }

    return result;
}

So the whole flow is: IPCThreadState::transact calls the service-side method, and while reading the service's response it discovers a callback to the client's listener and executes it. Both the call into the service and the callback into the client's listener therefore complete inside the same IPCThreadState::transact invocation, and hence on the same thread.

And after the listener callback is handled, since waitForResponse is a while(1) loop, it goes back to reading the reply of the original service-side call:

// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/IPCThreadState.cpp
status_t IPCThreadState::waitForResponse(Parcel *reply, status_t *acquireResult)
{
    uint32_t cmd;
    int32_t err;

    while (1) { // this is a while(1) loop
        ...
        switch (cmd) {
        ...
        case BR_REPLY:
            {
                ... // 2. then read the return value of the service method
            }
            goto finish; // 3. finally exit waitForResponse

        default:
            err = executeCommand(cmd); // 1. first handle BR_TRANSACTION and call back the client's listener
            if (err != NO_ERROR) goto finish;
            break;
        }
    }
    ...
}
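The ordering can be illustrated with a toy model of that loop (this is not real binder code, just a simulation: the queue stands in for the driver, and the command names mirror the real BR_* values). Everything, callback included, runs on the thread that made the original call:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class WaitForResponseModel {
    // Toy stand-ins for the binder return commands.
    enum Cmd { BR_TRANSACTION, BR_REPLY }

    public static void main(String[] args) {
        // Commands the fake "driver" hands back to the calling thread, in order:
        // first the nested callback, then the reply to the original call.
        Deque<Cmd> driver = new ArrayDeque<>();
        driver.add(Cmd.BR_TRANSACTION);
        driver.add(Cmd.BR_REPLY);

        // Mirrors the while(1) in waitForResponse: anything that is not BR_REPLY
        // is dispatched (executeCommand), all on the current thread.
        while (true) {
            Cmd cmd = driver.poll();
            if (cmd == Cmd.BR_REPLY) {
                System.out.println("reply received on " + Thread.currentThread().getName());
                break; // goto finish
            }
            System.out.println("callback executed on " + Thread.currentThread().getName());
        }
    }
}
```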

Incidentally, on the service side the binder thread pool also ends up handling the incoming BR_TRANSACTION command through executeCommand:

// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/ProcessState.cpp
class PoolThread : public Thread
{
public:
    explicit PoolThread(bool isMain)
        : mIsMain(isMain)
    {
    }

protected:
    virtual bool threadLoop()
    {
        IPCThreadState::self()->joinThreadPool(mIsMain);
        return false;
    }

    const bool mIsMain;
};

// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/IPCThreadState.cpp
void IPCThreadState::joinThreadPool(bool isMain)
{
    ...
    status_t result;
    do {
        ...
        result = getAndExecuteCommand();
        ...
        // Let this thread exit the thread pool if it is no longer
        // needed and it is not the main process thread.
        if(result == TIMED_OUT && !isMain) {
            break;
        }
    } while (result != -ECONNREFUSED && result != -EBADF);
    ...
}

status_t IPCThreadState::getAndExecuteCommand()
{
    ...
    result = executeCommand(cmd);
    ...
}

Combined with the analysis of data transfer in my earlier deep-dive post on the binder mechanism, the whole cross-process binder request flow is now clear.
