Librdkafka的基礎(chǔ)數(shù)據(jù)結(jié)構(gòu) 2 --- 定時器 原子操作與引用計數(shù)

  • Timer
  • 原子操作
  • 引用計數(shù)

Timer
  • 所在文件: sr/rdkafka_timer.c(h)
  • 主要是通過TimerManager來管理多個timer, 達到處理定時任務(wù)的效果
  • TimerManager定義:
typedef struct rd_kafka_timers_s {
        TAILQ_HEAD(, rd_kafka_timer_s) rkts_timers;
        struct rd_kafka_s *rkts_rk;
    mtx_t       rkts_lock;
    cnd_t       rkts_cond;

        int         rkts_enabled;
} rd_kafka_timers_t;
  1. 使用TAILQ來管理多個timer, 這個 隊列是個有序隊列, 按rd_kafka_timer_s中的rtmr_next從小到大排列;
  2. 對timer 隊列的操作需要加鎖保護: rkts_lock
  • Timer定義:
typedef struct rd_kafka_timer_s {
    TAILQ_ENTRY(rd_kafka_timer_s)  rtmr_link;

    rd_ts_t rtmr_next;
    rd_ts_t rtmr_interval;   /* interval in microseconds */

    void  (*rtmr_callback) (rd_kafka_timers_t *rkts, void *arg);
    void   *rtmr_arg;
} rd_kafka_timer_t;
  1. rtmr_link : TAILQ元素
  2. rtmr_next: 當前timer的下一次到期時間, 絕對時間;
  3. rtmr_interval: 執(zhí)行的時間間隔;
  4. rtmr_callback: 時期時執(zhí)行的回調(diào)函數(shù);
  • 加入新的timer到TimerManager中:
void rd_kafka_timer_start (rd_kafka_timers_t *rkts,
               rd_kafka_timer_t *rtmr, rd_ts_t interval,
               void (*callback) (rd_kafka_timers_t *rkts, void *arg),
               void *arg) {
    rd_kafka_timers_lock(rkts);
    rd_kafka_timer_stop(rkts, rtmr, 0/*!lock*/); 

    rtmr->rtmr_interval = interval;
    rtmr->rtmr_callback = callback;
    rtmr->rtmr_arg      = arg;

    rd_kafka_timer_schedule(rkts, rtmr, 0);
    rd_kafka_timers_unlock(rkts);
}
  1. 此timer已經(jīng)在隊列中的話,要先stop;
  2. 重新設(shè)置 timer的各參數(shù);
  3. 加入隊列;
  • Timer的插入: 根據(jù)rtmr_next值在隊列中找到合適的位置后插入;
static void rd_kafka_timer_schedule (rd_kafka_timers_t *rkts,
                     rd_kafka_timer_t *rtmr, int extra_us) {
    rd_kafka_timer_t *first;

    /* Timer has been stopped */
    if (!rtmr->rtmr_interval)
        return;

        /* Timers framework is terminating */
        if (unlikely(!rkts->rkts_enabled))
                return;

    rtmr->rtmr_next = rd_clock() + rtmr->rtmr_interval + extra_us;

    if (!(first = TAILQ_FIRST(&rkts->rkts_timers)) ||
        first->rtmr_next > rtmr->rtmr_next) {
        TAILQ_INSERT_HEAD(&rkts->rkts_timers, rtmr, rtmr_link);
                cnd_signal(&rkts->rkts_cond);
    } else
        TAILQ_INSERT_SORTED(&rkts->rkts_timers, rtmr,
                                    rd_kafka_timer_t *, rtmr_link,
                    rd_kafka_timer_cmp);
}
  • Timer的調(diào)度執(zhí)行:
void rd_kafka_timers_run (rd_kafka_timers_t *rkts, int timeout_us) {
    rd_ts_t now = rd_clock();
    rd_ts_t end = now + timeout_us;

        rd_kafka_timers_lock(rkts);

    while (!rd_atomic32_get(&rkts->rkts_rk->rk_terminate) && now <= end) {
        int64_t sleeptime;
        rd_kafka_timer_t *rtmr;

        if (timeout_us != RD_POLL_NOWAIT) {
            sleeptime = rd_kafka_timers_next(rkts,
                             timeout_us,
                             0/*no-lock*/);

            if (sleeptime > 0) {
                cnd_timedwait_ms(&rkts->rkts_cond,
                         &rkts->rkts_lock,
                         (int)(sleeptime / 1000));

            }
        }

        now = rd_clock();

        while ((rtmr = TAILQ_FIRST(&rkts->rkts_timers)) &&
               rtmr->rtmr_next <= now) {

            rd_kafka_timer_unschedule(rkts, rtmr);
                        rd_kafka_timers_unlock(rkts);

            rtmr->rtmr_callback(rkts, rtmr->rtmr_arg);

                        rd_kafka_timers_lock(rkts);
            /* Restart timer, unless it has been stopped, or
             * already reschedueld (start()ed) from callback. */
            if (rd_kafka_timer_started(rtmr) &&
                !rd_kafka_timer_scheduled(rtmr))
                rd_kafka_timer_schedule(rkts, rtmr, 0);
        }

        if (timeout_us == RD_POLL_NOWAIT) {
            /* Only iterate once, even if rd_clock doesn't change */
            break;
        }
    }

    rd_kafka_timers_unlock(rkts);
}
  1. 通過 rd_kafka_timers_next獲取需要wait的時間 ;
  2. 需要wait就 cnd_timedwait_ms;
  3. 執(zhí)行到期 timer的回調(diào)函數(shù), 根據(jù)需要將此timer再次加入隊列;
原子操作
  • 所在文件: src/rdatomic.h
  • 如果當前GCC支持_atomic組操作,就使用GCC的build-in函數(shù)
  • 如果不支持, 原子操作用鎖來模擬實現(xiàn);
  • 在Windows上用Interlocked族函數(shù)實現(xiàn);
引用計數(shù)
  • 所在文件: src/rd.h
  • 定義:
#ifdef RD_REFCNT_USE_LOCKS
typedef struct rd_refcnt_t {
        mtx_t lock;
        int v;
} rd_refcnt_t;
#else
typedef rd_atomic32_t rd_refcnt_t;
#endif
  1. 由定義我們可以看出可以通過鎖來實現(xiàn),也可以通過上面介紹的原子類型來實現(xiàn)這個計數(shù);
  • 引用計數(shù)的操作接口, 也是分成了鎖(實現(xiàn)成函數(shù))和原子類型(實現(xiàn)成宏)兩種不同的實現(xiàn)
static RD_INLINE RD_UNUSED int rd_refcnt_init (rd_refcnt_t *R, int v)
static RD_INLINE RD_UNUSED void rd_refcnt_destroy (rd_refcnt_t *R)
static RD_INLINE RD_UNUSED int rd_refcnt_set (rd_refcnt_t *R, int v) 
static RD_INLINE RD_UNUSED int rd_refcnt_add0 (rd_refcnt_t *R)
static RD_INLINE RD_UNUSED int rd_refcnt_sub0 (rd_refcnt_t *R)
static RD_INLINE RD_UNUSED int rd_refcnt_get (rd_refcnt_t *R)
智能指針
  • 所在文件: src/rd.h
  • 智能指針就是加了上面的引用計數(shù)的指針
  • 定義:
#define RD_SHARED_PTR_TYPE(STRUCT_NAME,WRAPPED_TYPE) WRAPPED_TYPE
//get的同時會將此用計數(shù) +1
#define rd_shared_ptr_get_src(FUNC,LINE,OBJ,REFCNT,SPTR_TYPE)   \
        (rd_refcnt_add(REFCNT), (OBJ))
#define rd_shared_ptr_get(OBJ,REFCNT,SPTR_TYPE)          \
        (rd_refcnt_add(REFCNT), (OBJ))

#define rd_shared_ptr_obj(SPTR) (SPTR)

// put使用rd_refcnt_destroywrapper實現(xiàn), 引用計數(shù)減為0,則調(diào)用DESTRUCTOR作清理釋放
#define rd_shared_ptr_put(SPTR,REF,DESTRUCTOR)                  \
                rd_refcnt_destroywrapper(REF,DESTRUCTOR)
  • 在C中實現(xiàn)引用計數(shù), 哪里要+1, 哪里要-1, 全憑使用者自己根據(jù)代碼邏輯需要來控制,因此很容易導(dǎo)致少+1, 多+1, 少-1, 多-1的情況, 因此rdkafka作者又提供了一個debug版本的實現(xiàn), 跟蹤了調(diào)用函數(shù), 所在行等信息, 供調(diào)試排查問題用,其實實現(xiàn)也很簡單, 但還是比較巧妙的
#define RD_SHARED_PTR_TYPE(STRUCT_NAME, WRAPPED_TYPE) \
        struct STRUCT_NAME {                          \
                LIST_ENTRY(rd_shptr0_s) link;         \
                WRAPPED_TYPE *obj;                     \
                rd_refcnt_t *ref;                     \
                const char *typename;                 \
                const char *func;                     \
                int line;                             \
        }
/* Common backing struct compatible with RD_SHARED_PTR_TYPE() types */
typedef RD_SHARED_PTR_TYPE(rd_shptr0_s, void) rd_shptr0_t;

LIST_HEAD(rd_shptr0_head, rd_shptr0_s);
extern struct rd_shptr0_head rd_shared_ptr_debug_list;
extern mtx_t rd_shared_ptr_debug_mtx;

引用了一個新的struct來將引用計數(shù)和調(diào)用信息結(jié)合起來, 使用鏈表來管理這個struct的對象. 每次對引用計數(shù)的操作都要操作這個鏈表.

static RD_INLINE RD_UNUSED RD_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
rd_shptr0_t *rd_shared_ptr_get0 (const char *func, int line,
                                 const char *typename,
                                 rd_refcnt_t *ref, void *obj) {
        //創(chuàng)建shared ptr struct結(jié)構(gòu)
        rd_shptr0_t *sptr = rd_calloc(1, sizeof(*sptr));
        sptr->obj = obj;
        sptr->ref = ref;
        sptr->typename = typename;
        sptr->func = func;
        sptr->line = line;

       //加入鏈表
        mtx_lock(&rd_shared_ptr_debug_mtx);
        LIST_INSERT_HEAD(&rd_shared_ptr_debug_list, sptr, link);
        mtx_unlock(&rd_shared_ptr_debug_mtx);
        return sptr;
}

#define rd_shared_ptr_put(SPTR,REF,DESTRUCTOR) do {               \
               // 引用計數(shù) -1, 到0話清理釋放
                if (rd_refcnt_sub(REF) == 0)                      \
                        DESTRUCTOR;                               \
                mtx_lock(&rd_shared_ptr_debug_mtx);               \
                //從鏈表中移除struct 對象
                LIST_REMOVE(SPTR, link);                          \
                mtx_unlock(&rd_shared_ptr_debug_mtx);             \
                rd_free(SPTR);                                    \
        } while (0)

Librdkafka源碼分析-Content Table

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • layout: posttitle: 《Java并發(fā)編程的藝術(shù)》筆記categories: Javaexcerpt...
    xiaogmail閱讀 6,022評論 1 19
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,234評論 25 708
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,663評論 19 139
  • 輕柔、和緩的音樂正適合這半夜0點的氛圍。 我不能明白,為什么我的朋友和我做一樣工作的時候生活過得比我有味道的多,現(xiàn)...
    麥克斯韋L閱讀 238評論 0 0
  • 集合了一些關(guān)于香港ins上網(wǎng)紅打卡地 除了在香港買買買 還有超適合拍照的小眾地方 試一下這些小眾玩法 你可能會發(fā)現(xiàn)...
    catingtan閱讀 460評論 0 1

友情鏈接更多精彩內(nèi)容