A while back I ran into a rather interesting bug, and with some free time over Spring Festival I dug into the root cause and wrote it up.
Some background: we have a debugging tool that runs on Android devices for remote debugging over the LAN; essentially it uses nanohttpd to receive and forward HTTP requests.
One of its features forwards notifications to the client over SSE. Our implementation works like this: the client issues a GET request, and the server returns an unbounded response of type text/event-stream whose body is a PipedInputStream; whenever a message needs to be forwarded, we write it to the corresponding PipedOutputStream:
private fun onCreateSse(uuid: String): Response {
    val input = PipedInputStream()
    val output = PipedOutputStream(input)
    val handler = ISseHandler { event, data ->
        try {
            output.write("event: $event\ndata: ${data}\n\n".toByteArray())
        } catch (e: IOException) {
            notifyHandlers.remove(uuid)
        }
    }
    notifyHandlers[uuid] = handler
    return newFixedLengthResponse(Response.Status.OK, "text/event-stream", input, -1)
        .apply {
            addHeader("Cache-Control", "no-cache")
            addHeader("Access-Control-Allow-Origin", "*")
        }
}
Then a developer noticed that this SSE connection would always drop with an IOException within a few seconds of a POST request to one particular URL. At first I assumed the socket between the HTTP server and the client had been torn down, but looking closely at the exception I found it was actually IOException("Pipe broken"), thrown from PipedInputStream.read:
// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:libcore/ojluni/src/main/java/java/io/PipedInputStream.java
public synchronized int read() throws IOException {
    ...
    readSide = Thread.currentThread();
    int trials = 2;
    while (in < 0) {
        ...
        if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
            throw new IOException("Pipe broken"); // this is where the exception is thrown!!!
        }
        ...
        wait(1000);
        ...
    }
    int ret = buffer[out++] & 0xFF;
    if (out >= buffer.length) {
        out = 0;
    }
    if (in == out) {
        /* now empty */
        in = -1;
    }
    return ret;
}
As the code above shows, PipedInputStream's approach is pretty brute-force: it repeatedly waits one second at a time for PipedOutputStream to write data. When data is written, PipedInputStream.receive runs, which assigns writeSide and in:
// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:libcore/ojluni/src/main/java/java/io/PipedInputStream.java
synchronized void receive(byte b[], int off, int len) throws IOException {
    checkStateForReceive();
    writeSide = Thread.currentThread();
    int bytesToTransfer = len;
    while (bytesToTransfer > 0) {
        if (in == out)
            awaitSpace();
        int nextTransferAmount = 0;
        if (out < in) {
            nextTransferAmount = buffer.length - in;
        } else if (in < out) {
            if (in == -1) {
                in = out = 0;
                nextTransferAmount = buffer.length - in;
            } else {
                nextTransferAmount = out - in;
            }
        }
        if (nextTransferAmount > bytesToTransfer)
            nextTransferAmount = bytesToTransfer;
        assert(nextTransferAmount > 0);
        System.arraycopy(b, off, buffer, in, nextTransferAmount);
        bytesToTransfer -= nextTransferAmount;
        off += nextTransferAmount;
        in += nextTransferAmount;
        if (in >= buffer.length) {
            in = 0;
        }
    }
}
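To make the in/out bookkeeping above easier to follow, here is a standalone sketch of the same circular-buffer convention (in == -1 means empty, in == out means full). This is an illustration of the index logic, not the actual libcore class:

```java
public class RingBuffer {
    // Mirrors PipedInputStream's index convention:
    //   in == -1         -> buffer empty
    //   in == out (>= 0) -> buffer full
    private final byte[] buffer;
    private int in = -1; // index of the next slot to fill
    private int out = 0; // index of the next byte to read

    public RingBuffer(int size) {
        buffer = new byte[size];
    }

    public boolean isEmpty() { return in < 0; }

    public boolean isFull() { return in == out; }

    public void put(byte b) {
        if (isFull()) throw new IllegalStateException("buffer full");
        if (in < 0) in = out = 0; // first byte after being empty
        buffer[in++] = b;
        if (in >= buffer.length) in = 0; // wrap around
    }

    public byte take() {
        if (isEmpty()) throw new IllegalStateException("buffer empty");
        byte b = buffer[out++];
        if (out >= buffer.length) out = 0; // wrap around
        if (in == out) in = -1;            // now empty
        return b;
    }

    public static void main(String[] args) {
        RingBuffer rb = new RingBuffer(4);
        for (byte b : "abc".getBytes()) rb.put(b);
        StringBuilder sb = new StringBuilder();
        while (!rb.isEmpty()) sb.append((char) rb.take());
        System.out.println(sb); // prints abc
    }
}
```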
Judging from the throw condition if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)), the writing thread must have exited. But our writer registers a binder callback with another process and performs the write inside that callback. Could a thread in the binder thread pool have been reclaimed? From the reproduction steps, though, only a POST to that one URL causes the disconnect; with POSTs to other URLs the PipedInputStream keeps working fine. A binder thread shouldn't be reclaimed that quickly, and if thread reclamation were the cause, it wouldn't discriminate between nanohttpd's URL handlers.
So I added a stack-trace print before output.write to confirm the pid, and after the disconnect used kill -3 to force-dump all thread stacks of the process to /data/anr. The writing thread had indeed exited after the disconnect, but the stacks showed something odd:
01-31 09:27:54.801 9805 11368 D testtest: at xx.xx.xx.xx.HttpServer.lambda$vSE0qxyqGQt4NJp3RjyeRZx9JR8(Unknown Source:0)
...
01-31 09:27:54.801 9805 11368 D testtest: at xx.xx.xx.xx.xx.BinderEventHelper$1.onEvent(BinderEventHelper.java:53)
01-31 09:27:54.801 9805 11368 D testtest: at xx.xx.xx.xx.binder.aidl.IEventListener$Stub.onTransact(IEventListener.java:63)
01-31 09:27:54.801 9805 11368 D testtest: at android.os.Binder.execTransactInternal(Binder.java:1179)
01-31 09:27:54.801 9805 11368 D testtest: at android.os.Binder.execTransact(Binder.java:1143)
01-31 09:27:54.801 9805 11368 D testtest: at android.os.BinderProxy.transactNative(Native Method)
01-31 09:27:54.801 9805 11368 D testtest: at android.os.BinderProxy.transact(BinderProxy.java:571)
01-31 09:27:54.801 9805 11368 D testtest: at xx.xx.xx.xx.binder.aidl.IServer$Stub$Proxy.sendRequest(IServer.java:167)
...
01-31 09:27:54.801 9805 11368 D testtest: at xx.xx.xx.xx.HttpServer.serve(HttpServer.kt:116)
01-31 09:27:54.801 9805 11368 D testtest: at org.nanohttpd.protocols.http.HTTPSession.execute(HTTPSession.java:142)
01-31 09:27:54.801 9805 11368 D testtest: at org.nanohttpd.protocols.http.ClientHandler.run(ClientHandler.java:12)
01-31 09:27:54.801 9805 11368 D testtest: at java.lang.Thread.run(Thread.java:920)
The onEvent aidl callback was not being invoked on a binder thread at all, but on a nanohttpd thread, and that thread is a Thread newed directly in DefaultAsyncRunner:
// https://github.com/NanoHttpd/nanohttpd/blob/efb2ebf85a2b06f7c508aba9eaad5377e3a01e81/core/src/main/java/org/nanohttpd/protocols/http/threading/DefaultAsyncRunner.java
public void exec(ClientHandler clientHandler) {
    ++this.requestCount;
    this.running.add(clientHandler);
    createThread(clientHandler).start();
}

protected Thread createThread(ClientHandler clientHandler) {
    Thread t = new Thread(clientHandler);
    t.setDaemon(true);
    t.setName("NanoHttpd Request Processor (#" + this.requestCount + ")");
    return t;
}
A thread like that exits as soon as the Runnable's run method completes, so it's no surprise that PipedInputStream.read, after waiting a second or so, finds the writeSide thread dead and throws an IOException.
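The failure mode is easy to reproduce off-device with nothing but java.io: write from a short-lived thread (standing in for nanohttpd's per-request thread), let it exit, and the next read that finds the buffer empty fails after the two one-second waits. A minimal sketch:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;

public class PipeBrokenDemo {
    // Returns the IOException message from the second read, plus the
    // byte value the first read still managed to return.
    public static String secondReadError() throws Exception {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        // Write from a short-lived thread, like nanohttpd's
        // one-thread-per-request DefaultAsyncRunner does.
        Thread writer = new Thread(() -> {
            try {
                out.write('x');
            } catch (IOException ignored) {
            }
        });
        writer.start();
        writer.join(); // writeSide now refers to a dead thread

        int first = in.read(); // the buffered byte is still readable: 'x' == 120
        try {
            // The buffer is empty now; read() waits in 1-second rounds,
            // sees the dead writeSide, and throws after ~2 seconds.
            in.read();
            return "no exception, first=" + first;
        } catch (IOException e) {
            return e.getMessage() + ", first=" + first;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(secondReadError()); // prints "Pipe broken, first=120"
    }
}
```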
Looking closer, what we actually do on this thread is use aidl to call a method in another process, and that method happens to trigger an aidl callback, which comes back on the very same thread. That is why only requests to this one URL trigger the IOException: other requests never call that aidl method.
01-31 09:27:54.801 9805 11368 D testtest: at xx.xx.xx.xx.binder.aidl.IEventListener$Stub.onTransact(IEventListener.java:63) // the service side calling back the listener registered by the client
01-31 09:27:54.801 9805 11368 D testtest: at android.os.Binder.execTransactInternal(Binder.java:1179)
01-31 09:27:54.801 9805 11368 D testtest: at android.os.Binder.execTransact(Binder.java:1143)
01-31 09:27:54.801 9805 11368 D testtest: at android.os.BinderProxy.transactNative(Native Method)
01-31 09:27:54.801 9805 11368 D testtest: at android.os.BinderProxy.transact(BinderProxy.java:571)
01-31 09:27:54.801 9805 11368 D testtest: at xx.xx.xx.xx.binder.aidl.IServer$Stub$Proxy.sendRequest(IServer.java:167) // the client calling a method on the service side
Once the cause is clear, the fix is obvious: simply use a Handler to post the write to the main thread.
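Android's Handler isn't available off-device, but the essence of the fix, keeping the writing thread alive for the lifetime of the pipe, can be sketched with a single-thread executor standing in for the main-thread Handler (SseWriter, send, and demo are illustrative names, not our actual code):

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SseWriter {
    // A single long-lived thread performs every write, so the pipe's
    // writeSide always refers to a live thread. On Android, a Handler
    // bound to the main looper plays the same role.
    private static final ExecutorService WRITER = Executors.newSingleThreadExecutor();

    public static void send(PipedOutputStream out, String event, String data) {
        WRITER.execute(() -> {
            try {
                out.write(("event: " + event + "\ndata: " + data + "\n\n").getBytes());
            } catch (IOException ignored) {
                // the real code would unregister this uuid's handler here
            }
        });
    }

    public static String demo() throws Exception {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);
        StringBuilder received = new StringBuilder();

        send(out, "ping", "1");
        received.append(new String(in.readNBytes("event: ping\ndata: 1\n\n".length())));

        // Pause longer than PipedInputStream's ~2-second grace period:
        // the pipe stays usable because the writer thread is still alive.
        Thread.sleep(2500);

        send(out, "ping", "2");
        received.append(new String(in.readNBytes("event: ping\ndata: 2\n\n".length())));
        WRITER.shutdown();
        return received.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.print(demo());
    }
}
```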
The binder request flow
In hindsight this behavior is quite reasonable: if I call a method that internally triggers a listener callback, then the callback runs on the same thread as the call. The puzzle is that here we are using aidl to call a method in another process, and the callback also comes back from that other process. How does Android make the call and the callback land on the same thread?
Since the stacks only show the Java side, answering this requires digging into how the native method BinderProxy.transactNative implements the cross-process call:
// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/base/core/jni/android_util_Binder.cpp
static jboolean android_os_BinderProxy_transact(JNIEnv* env, jobject obj,
        jint code, jobject dataObj, jobject replyObj, jint flags) // throws RemoteException
{
    ...
    IBinder* target = getBPNativeData(env, obj)->mObject.get();
    ...
    status_t err = target->transact(code, *data, reply, flags);
    ...
}

// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/BpBinder.cpp
status_t BpBinder::transact(
    uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags)
{
    ...
    status = IPCThreadState::self()->transact(binderHandle(), code, data, reply, flags);
    ...
}

// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/IPCThreadState.cpp
status_t IPCThreadState::transact(int32_t handle,
                                  uint32_t code, const Parcel& data,
                                  Parcel* reply, uint32_t flags)
{
    ...
    err = writeTransactionData(BC_TRANSACTION, flags, handle, code, data, nullptr); // write the request data
    ...
    // wait for the response
    if (reply) {
        err = waitForResponse(reply);
    } else {
        Parcel fakeReply;
        err = waitForResponse(&fakeReply);
    }
    ...
}
So BinderProxy.transactNative ultimately goes through IPCThreadState. From the name, and from the way it is obtained via IPCThreadState::self(), we can tell it is bound to the current thread. Its writeTransactionData sends the request: in practice it just writes the BC_TRANSACTION cmd into mOut, along with a binder_transaction_data packing the call parameters:
// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/IPCThreadState.cpp
status_t IPCThreadState::writeTransactionData(int32_t cmd, uint32_t binderFlags,
    int32_t handle, uint32_t code, const Parcel& data, status_t* statusBuffer)
{
    binder_transaction_data tr;

    tr.target.ptr = 0; /* Don't pass uninitialized stack data to a remote process */
    tr.target.handle = handle;
    tr.code = code;
    tr.flags = binderFlags;
    tr.cookie = 0;
    tr.sender_pid = 0;
    tr.sender_euid = 0;

    const status_t err = data.errorCheck();
    if (err == NO_ERROR) {
        tr.data_size = data.ipcDataSize();
        tr.data.ptr.buffer = data.ipcData();
        tr.offsets_size = data.ipcObjectsCount()*sizeof(binder_size_t);
        tr.data.ptr.offsets = data.ipcObjects();
    } else if (statusBuffer) {
        tr.flags |= TF_STATUS_CODE;
        *statusBuffer = err;
        tr.data_size = sizeof(status_t);
        tr.data.ptr.buffer = reinterpret_cast<uintptr_t>(statusBuffer);
        tr.offsets_size = 0;
        tr.data.ptr.offsets = 0;
    } else {
        return (mLastError = err);
    }

    mOut.writeInt32(cmd);
    mOut.write(&tr, sizeof(tr));

    return NO_ERROR;
}
When the service side receives the transaction (the driver delivers it as BR_TRANSACTION), it calls into the aidl Stub's onTransact and returns, while the client waits for the result in IPCThreadState::waitForResponse:
// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/IPCThreadState.cpp
status_t IPCThreadState::waitForResponse(Parcel *reply, status_t *acquireResult)
{
    uint32_t cmd;
    int32_t err;

    while (1) {
        if ((err=talkWithDriver()) < NO_ERROR) break;
        err = mIn.errorCheck();
        if (err < NO_ERROR) break;
        if (mIn.dataAvail() == 0) continue;

        cmd = (uint32_t)mIn.readInt32();

        IF_LOG_COMMANDS() {
            alog << "Processing waitForResponse Command: "
                << getReturnString(cmd) << endl;
        }

        switch (cmd) {
        case BR_ONEWAY_SPAM_SUSPECT:
            ...
        case BR_TRANSACTION_COMPLETE:
            ...
        case BR_DEAD_REPLY:
            ...
        case BR_FAILED_REPLY:
            ...
        case BR_FROZEN_REPLY:
            ...
        case BR_ACQUIRE_RESULT:
            ...
        case BR_REPLY:
            {
                binder_transaction_data tr;
                err = mIn.read(&tr, sizeof(tr));
                ALOG_ASSERT(err == NO_ERROR, "Not enough command data for brREPLY");
                if (err != NO_ERROR) goto finish;

                if (reply) {
                    if ((tr.flags & TF_STATUS_CODE) == 0) {
                        reply->ipcSetDataReference(
                            reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
                            tr.data_size,
                            reinterpret_cast<const binder_size_t*>(tr.data.ptr.offsets),
                            tr.offsets_size/sizeof(binder_size_t),
                            freeBuffer);
                    } else {
                        err = *reinterpret_cast<const status_t*>(tr.data.ptr.buffer);
                        freeBuffer(nullptr,
                            reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
                            tr.data_size,
                            reinterpret_cast<const binder_size_t*>(tr.data.ptr.offsets),
                            tr.offsets_size/sizeof(binder_size_t));
                    }
                } else {
                    freeBuffer(nullptr,
                        reinterpret_cast<const uint8_t*>(tr.data.ptr.buffer),
                        tr.data_size,
                        reinterpret_cast<const binder_size_t*>(tr.data.ptr.offsets),
                        tr.offsets_size/sizeof(binder_size_t));
                    continue;
                }
            }
            goto finish;

        default:
            err = executeCommand(cmd);
            if (err != NO_ERROR) goto finish;
            break;
        }
    }
    ...
}
In the normal case it reads BR_REPLY directly, fills in the reply data, and returns to the Java layer. But if the service's onTransact calls back into the client, then because IPCThreadState::self() on the client resolves to the same IPCThreadState, what the waiting thread reads next is the incoming transaction for the callback (BR_TRANSACTION) rather than the BR_REPLY it was expecting. Execution then falls into the default branch, where executeCommand dispatches the callback to the client listener Stub's onTransact:
// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/IPCThreadState.cpp
status_t IPCThreadState::executeCommand(int32_t cmd)
{
    BBinder* obj;
    RefBase::weakref_type* refs;
    status_t result = NO_ERROR;

    switch ((uint32_t)cmd) {
    ...
    case BR_TRANSACTION:
        {
            ...
        }
        break;
    ...
    default:
        ALOGE("*** BAD COMMAND %d received from Binder driver\n", cmd);
        result = UNKNOWN_ERROR;
        break;
    }

    if (result != NO_ERROR) {
        mLastError = result;
    }

    return result;
}
So the whole flow is: inside IPCThreadState::transact we call the service method, and then, while reading the service's response, we discover that what arrived is actually a callback to the client's listener, so we execute it. Both the outgoing call to the service and the callback to the client listener therefore complete inside the same IPCThreadState::transact invocation, which is why they run on the same thread.
After the listener callback has been handled, since waitForResponse is a while(1) loop, it goes back to reading the return value of the original service call:
// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/IPCThreadState.cpp
status_t IPCThreadState::waitForResponse(Parcel *reply, status_t *acquireResult)
{
    uint32_t cmd;
    int32_t err;
    while (1) { // this is a while(1)
        ...
        switch (cmd) {
        ...
        case BR_REPLY:
            {
                ... // 2. then read the return value of the original service call
            }
            goto finish; // 3. finally leave waitForResponse
        default:
            err = executeCommand(cmd); // 1. first handle BR_TRANSACTION, calling back the client's listener
            if (err != NO_ERROR) goto finish;
            break;
        }
    }
    ...
}
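The control flow can be simulated in plain Java with a toy command queue standing in for the binder driver (Cmd, transact, and the queue are all illustrative; the real commands come from /dev/binder): the nested transaction is dispatched on the calling thread before the reply is consumed.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class NestedTransactDemo {
    // Toy stand-ins for the binder driver's return commands; a real
    // IPCThreadState reads these from /dev/binder via talkWithDriver().
    public enum Cmd { BR_TRANSACTION, BR_REPLY }

    // Mirrors the shape of waitForResponse(): keep reading commands on
    // the calling thread until BR_REPLY arrives; a nested BR_TRANSACTION
    // (the service calling our listener back) is executed inline first.
    public static boolean transact(Queue<Cmd> fromDriver) {
        Thread callbackThread = null;
        while (true) {
            Cmd cmd = fromDriver.poll();
            if (cmd == Cmd.BR_TRANSACTION) {
                // executeCommand(): dispatch to our listener Stub.onTransact()
                callbackThread = Thread.currentThread();
            } else {
                // BR_REPLY: the original outgoing call finally returns;
                // report whether the callback ran on this same thread
                return callbackThread == Thread.currentThread();
            }
        }
    }

    public static void main(String[] args) {
        Queue<Cmd> q = new ArrayDeque<>();
        q.add(Cmd.BR_TRANSACTION); // service calls the listener before replying
        q.add(Cmd.BR_REPLY);
        System.out.println(transact(q)); // prints true
    }
}
```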
Incidentally, the binder thread pool on the service side also ends up handling incoming transactions (delivered as BR_TRANSACTION) through executeCommand:
// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/ProcessState.cpp
class PoolThread : public Thread
{
public:
    explicit PoolThread(bool isMain)
        : mIsMain(isMain)
    {
    }

protected:
    virtual bool threadLoop()
    {
        IPCThreadState::self()->joinThreadPool(mIsMain);
        return false;
    }

    const bool mIsMain;
};

// https://cs.android.com/android/platform/superproject/+/android-13.0.0_r74:frameworks/native/libs/binder/IPCThreadState.cpp
void IPCThreadState::joinThreadPool(bool isMain)
{
    ...
    status_t result;
    do {
        ...
        result = getAndExecuteCommand();
        ...
        // Let this thread exit the thread pool if it is no longer
        // needed and it is not the main process thread.
        if(result == TIMED_OUT && !isMain) {
            break;
        }
    } while (result != -ECONNREFUSED && result != -EBADF);
    ...
}

status_t IPCThreadState::getAndExecuteCommand()
{
    ...
    result = executeCommand(cmd);
    ...
}
Combined with the analysis of data transfer in my earlier deep-dive post on the binder mechanism, the whole cross-process binder request flow is now clear.