Android 源碼學(xué)習(xí) -- 底層多線程task設(shè)計(jì)

以前工作多偏重于業(yè)務(wù)邏輯,較少關(guān)注到底層的邏輯實(shí)現(xiàn),自己寫(xiě)內(nèi)部工具中時(shí)也比較隨意,遇到用多線程的地方都是臨時(shí)起一個(gè)線程處理耗時(shí)的復(fù)雜任務(wù),任務(wù)結(jié)束后,自動(dòng)被主線程回收。線程的頻繁創(chuàng)建銷(xiāo)毀其實(shí)是存在很大開(kāi)銷(xiāo)的。下面的這種設(shè)計(jì)使用方式可以更高效的利用multi-thread

MessageLoopThread

循環(huán)隊(duì)列任務(wù)處理機(jī)制簡(jiǎn)單說(shuō)明:

  • 一個(gè)持續(xù)運(yùn)行的Thread(while (1) { do tasks})
  • 一個(gè) task queue (如果用的不是線程安全的標(biāo)準(zhǔn)數(shù)據(jù)結(jié)構(gòu),則需要另外加鎖來(lái)保證)
  • task queue中存放的是 類(lèi)似于 C++ Functor的對(duì)象,或者簡(jiǎn)單理解就是task function的一段代碼
  • 其他線程會(huì)將耗時(shí)任務(wù)封裝成task,加入隊(duì)列中
  • Thread 運(yùn)行時(shí),按queue中順序依次執(zhí)行其中的task function
    上面有一個(gè)很重要解決的問(wèn)題是,需要把不同的邏輯處理函數(shù) 封裝成統(tǒng)一的task function,答案就是Android里面使用了chromium的base::Bind來(lái)實(shí)現(xiàn)統(tǒng)一的封裝

可以參考關(guān)于std::bind 來(lái)理解這個(gè)base::Bind作用,實(shí)際作用有些像python的functools 里面的輔助函數(shù),也有些類(lèi)似python里的 wrapper裝飾器,簡(jiǎn)單理解就是把一個(gè)函數(shù)固化入?yún)⒑螅梢粋€(gè)新的函數(shù)。

  • 下面舉例說(shuō)明:
static bt_status_t btif_gatts_add_service(int server_if,
                                          const btgatt_db_element_t* service,
                                          size_t service_count) {
  CHECK_BTGATT_INIT();
  return do_in_jni_thread(FROM_HERE,
                          Bind(&add_service_impl, server_if,
                               std::vector(service, service + service_count)));
}

static void add_service_impl(int server_if,
                             vector<btgatt_db_element_t> service) {
  if (service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GATT_SERVER) ||
      service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GAP_SERVER)) {
    LOG_ERROR("%s: Attept to register restricted service", __func__);
    HAL_CBACK(bt_gatt_callbacks, server->service_added_cb, BT_STATUS_FAIL,
              server_if, service.data(), service.size());
    return;
  }

  BTA_GATTS_AddService(
      server_if, service,
      jni_thread_wrapper(FROM_HERE, base::Bind(&on_service_added_cb)));
}

bt_status_t do_in_jni_thread(const base::Location& from_here,
                             base::OnceClosure task) {
  if (!jni_thread.DoInThread(from_here, std::move(task))) {
    LOG(ERROR) << __func__ << ": Post task to task runner failed!";
    return BT_STATUS_FAIL;
  }
  return BT_STATUS_SUCCESS;
}

static MessageLoopThread jni_thread("bt_jni_thread");

bool MessageLoopThread::DoInThread(const base::Location& from_here,
                                   base::OnceClosure task) {
  return DoInThreadDelayed(from_here, std::move(task), base::TimeDelta());
}

bool MessageLoopThread::DoInThreadDelayed(const base::Location& from_here,
                                          base::OnceClosure task,
                                          const base::TimeDelta& delay) {
  std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
  if (is_main_ && init_flags::gd_rust_is_enabled()) {
    if (rust_thread_ == nullptr) {
      LOG(ERROR) << __func__ << ": rust thread is null for thread " << *this
                 << ", from " << from_here.ToString();
      return false;
    }

    shim::rust::main_message_loop_thread_do_delayed(
        **rust_thread_,
        std::make_unique<shim::rust::OnceClosure>(std::move(task)),
        delay.InMilliseconds());
    return true;
  }

上面這段代碼btif_gatts_add_service 就是將函數(shù)add_service_impl通過(guò)Bind入?yún)⒐袒筠D(zhuǎn)換成統(tǒng)一的OnceClosure task,放到bt_jni_thread中

這里需要注意的一點(diǎn)shim::rust::main_message_loop_thread_do_delayed最終調(diào)用的是Rust實(shí)現(xiàn),這里似乎是Android在新版本里最終底層多線程的處理都改用Rust實(shí)現(xiàn)了,目前尚不熟悉Rust語(yǔ)言和C++的調(diào)用Rust處理,對(duì)應(yīng)rust文件message_loop_thread.rs,據(jù)說(shuō)是Rust相比C++在多線程處理上使用起來(lái)更方便高效;DoInThreadDelayed函數(shù)也有相關(guān)mock實(shí)現(xiàn),可以從mock實(shí)現(xiàn)來(lái)理解上面所說(shuō)的細(xì)節(jié),這里就不列舉出來(lái)了

  • 下面這段代碼的設(shè)計(jì)也很巧妙,可以在實(shí)際應(yīng)用中借鑒:
static void add_service_impl(int server_if,
                             vector<btgatt_db_element_t> service) {
  if (service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GATT_SERVER) ||
      service[0].uuid == Uuid::From16Bit(UUID_SERVCLASS_GAP_SERVER)) {
    LOG_ERROR("%s: Attept to register restricted service", __func__);
    HAL_CBACK(bt_gatt_callbacks, server->service_added_cb, BT_STATUS_FAIL,
              server_if, service.data(), service.size());
    return;
  }

  BTA_GATTS_AddService(
      server_if, service,
      jni_thread_wrapper(FROM_HERE, base::Bind(&on_service_added_cb)));
}

extern void BTA_GATTS_AddService(tGATT_IF server_if,
                                 std::vector<btgatt_db_element_t> service,
                                 BTA_GATTS_AddServiceCb cb) {
  do_in_main_thread(FROM_HERE,
                    base::Bind(&bta_gatts_add_service_impl, server_if,
                               std::move(service), std::move(cb)));
}

template <typename R, typename... Args>
base::Callback<R(Args...)> jni_thread_wrapper(const base::Location& from_here,
                                              base::Callback<R(Args...)> cb) {
  return base::Bind(
      [](const base::Location& from_here, base::Callback<R(Args...)> cb,
         Args... args) {
        do_in_jni_thread(from_here,
                         base::Bind(cb, std::forward<Args>(args)...));
      },
      from_here, std::move(cb));
}

static void on_service_added_cb(tGATT_STATUS status, int server_if,
                                vector<btgatt_db_element_t> service) {
  HAL_CBACK(bt_gatt_callbacks, server->service_added_cb, status, server_if,
            service.data(), service.size());
}

有兩點(diǎn):

  • 用了兩個(gè)線程來(lái)完成一個(gè)task,準(zhǔn)確的說(shuō)是一個(gè)線程完成主體任務(wù)處理,另一個(gè)線程完成任務(wù)結(jié)果callback處理,這樣主體任務(wù)線程的處理不會(huì)因?yàn)閱我蝗蝿?wù)callback而阻塞 (這個(gè)比較經(jīng)典的場(chǎng)景就是在UI設(shè)計(jì)里,以前最開(kāi)始使用Qt做tool時(shí)有遇到:后臺(tái)處理,前臺(tái)刷新邏輯做線程分離,后臺(tái)處理不會(huì)導(dǎo)致UI界面卡住,UI界面重繪不會(huì)導(dǎo)致后臺(tái)任務(wù)阻塞,甚至復(fù)雜處理邏輯可以使用更多的線程來(lái)保證)
  • jni_thread_wrapper中使用了Bind和C++ 匿名函數(shù)對(duì)callback函數(shù)做統(tǒng)一封裝,實(shí)際處理時(shí)調(diào)用cb.Run(GATT_ERROR, server_if, std::move(service));
void bta_gatts_add_service_impl(tGATT_IF server_if,
                                std::vector<btgatt_db_element_t> service,
                                BTA_GATTS_AddServiceCb cb) {
  uint8_t rcb_idx =
      bta_gatts_find_app_rcb_idx_by_app_if(&bta_gatts_cb, server_if);

  LOG(INFO) << __func__ << ": rcb_idx=" << +rcb_idx;

  if (rcb_idx == BTA_GATTS_INVALID_APP) {
    cb.Run(GATT_ERROR, server_if, std::move(service));
    return;
  }

Notes:

  • 當(dāng)然上面任務(wù)的處理是屬于異步的,對(duì)于時(shí)效性要求特別高的場(chǎng)景,可能不適合使用;如果有很多產(chǎn)生任務(wù)的線程,那可能需要合理規(guī)劃任務(wù)處理線程的個(gè)數(shù)以及實(shí)際分配協(xié)同等
  • 下面rust中多線程處理代碼,rust在多線程處理上比C++更高效,這個(gè)需要更多研究。
pub fn main_message_loop_thread_do_delayed(
    thread: &mut MessageLoopThread,
    closure: cxx::UniquePtr<ffi::OnceClosure>,
    delay_ms: i64,
) {
    assert!(init_flags::gd_rust_is_enabled());
    if delay_ms == 0 {
        if thread.tx.send(closure).is_err() {
            log::error!("could not post task - shutting down?");
        }
    } else {
        thread.rt.spawn(async move {
            // NOTE: tokio's sleep can't wake up the system...
            // but hey, neither could the message loop from libchrome.
            //
            // ...and this way we don't use timerfds arbitrarily.
            //
            // #yolo
            tokio::time::sleep(Duration::from_millis(delay_ms.try_into().unwrap_or(0))).await;
            closure.Run();
        });
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容