Android中的Looper與epoll
眾所周知,Android的消息隊列是通過Looper實現(xiàn)的,但是這與pipe有什么關(guān)系,為什么會用到epoll這樣的Linux IO多路復(fù)用機制呢。
前言
在應(yīng)用程序進程的入口函數(shù)ActivityThread的main函數(shù)中,會調(diào)用Looper的prepareMainLooper方法最終實例化一個MessageQueue對象,而MessageQueue.java的構(gòu)造函數(shù)中會通過nativeinit調(diào)用到j(luò)ni層方法,最終生成C++層的Looper對象(代碼在frameworks/base/libs/utils/Looper.cpp)
讀
在Looper.cpp的構(gòu)造函數(shù)中:
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
int wakeFds[2];
int result = pipe(wakeFds);
mWakeReadPipeFd = wakeFds[0];
mWakeWritePipeFd = wakeFds[1];
result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
// Allocate the epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
struct epoll_event eventItem;
// zero out unused members of data field union
memset(& eventItem, 0, sizeof(epoll_event));
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
}
首先通過管道pipe創(chuàng)建了讀端與寫端兩個文件描述符,最后通過epoll_create創(chuàng)建epoll專用文件描述符,最后通過epoll_ctl告訴mEpollFd需要監(jiān)控mWakeReadPipeFd描述符的EPOLLIN事件。
看到這里會有疑問了,為什么單獨監(jiān)控一個mWakeReadPipeFd描述符需要用到epoll呢,使用recvform這樣的同步阻塞IO就行了呢。實際上Looper的功能是強大的,它提供了addfd的方法,外部可調(diào)用該方法動態(tài)添加需要監(jiān)控的描述符與回調(diào),Android的Choreographer中就是通過該方法監(jiān)控VSYNC事件的。
在Java層的MessageQueue的next方法中:
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
nextPollTimeoutMillis = -1;
}
...
}
//nativePollOnce是個jni調(diào)用,在管道內(nèi)無消息時有可能導(dǎo)致線程阻塞
//首次傳入nativePollOnce的超時時間為0,表示不阻塞立即返回
//mMessages保存了下一條Message,如果存在直接返回,并將mMessages設(shè)置為當(dāng)前Message.next,下次調(diào)用next可直接返回
//如果沒有消息則設(shè)置超時時間為-1,表示無限期阻塞等待管道中有新的消息寫入。
其中的nativePollOnce方法,會通過jni調(diào)用進入Looper.cpp的pollInner方法中:
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
//這里通過epoll_wait調(diào)用,當(dāng)mEpollFd監(jiān)控的文件描述符沒有IO事件發(fā)生時,線程會阻塞住且不會被CPU分配時間片了,如果有IO事件發(fā)生或事件超時了,該方法會返回了。
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
//遍歷返回的消息,如果是管道m(xù)WakeReadPipeFd的消息,調(diào)用awoken();
//如果是其他描述符的消息,pushResponse進集合稍后執(zhí)行,也就是通過addfd方法添加的描述符;
//另外Looper.cpp也支持sendmessage方法,會在Response之前執(zhí)行。
而awoken方法:
void Looper::awoken() {
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
//該方法只是將管道中的內(nèi)容清空
寫
我們在應(yīng)用中通過handler的sendmessage想消息隊列發(fā)送消息,最終會調(diào)用到MessageQueue.java的enqueueMessage方法:
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
//根據(jù)注釋也可以知道,這里分兩種情況
//第一是消息隊列為空,第二是消息隊列已經(jīng)有新消息時,需要適當(dāng)?shù)奈恢梅胖孟ⅰ?//當(dāng)線程needWake時,說明當(dāng)前線程掛起,需要喚醒線程,會調(diào)用nativeWake方法,最終調(diào)用到Looper.cpp的wake方法。
wake方法:
void Looper::wake() {
ssize_t nWrite;
do {
nWrite = write(mWakeWritePipeFd, "W", 1);
} while (nWrite == -1 && errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
//只是往管道寫端寫入W,喚醒當(dāng)前應(yīng)用主線程。
總結(jié)
Android的應(yīng)用層通過Message.java實現(xiàn)隊列,利用管道和epoll機制實現(xiàn)線程狀態(tài)的管理,配合起來實現(xiàn)了Android主線程的消息隊列模型,而這只是Android的一部分。epoll機制通過Looper.cpp的addfd實現(xiàn)了對其他描述符的監(jiān)聽,在4.1版本之后應(yīng)用程序會等待VSYNC信號發(fā)起View的繪制操作,就是通過它實現(xiàn)的,通過分析SurfaceFlinger和Choreographer,可以很好的理解Android的繪制流程。。。