dpvs學(xué)習(xí)筆記: 6 定時器實現(xiàn)及連接老化超時

定時器實現(xiàn)方式很多種,定時器個數(shù)也不同。比如 go 老版本只有一個全局定時器,所以有些高性能項目拋棄了,自己實現(xiàn)用戶級別的定時器,并且開啟 n 個。但是在新版本沒必要了,默認(rèn) 64 個。dpvs 由于流表可能巨大,并且處在不同狀態(tài)的 tcp 連接超時時間還不一樣,如果實現(xiàn)的低效,會非常影響性能。所以 dpvs 在利用 dpdk 定時器的基礎(chǔ)上,自實現(xiàn)了一個,算法是常見的時間輪。

數(shù)據(jù)結(jié)構(gòu)

struct timer_scheduler {
    /* wheels and cursors */
    rte_spinlock_t      lock;
    uint32_t            cursors[LEVEL_DEPTH];
    struct list_head    *hashs[LEVEL_DEPTH];

    /* leverage dpdk rte_timer to drive us */
    struct rte_timer    rte_tim;
};

lock 是鎖,增刪改查都要鎖。rte_tim 是 dpdk 庫自身的定時器,他只負(fù)責(zé)時間嘀嗒,也就是 tick ++ 操作。cursors 和 hashs 配合使用,構(gòu)成時間輪。

cursors 保存嘀嗒計數(shù),從 0 到 LEVEL_SIZE (2<<18), LEVEL_DEPTH 值為 2,也就是說 cursors[0] 可以保存 2<<18 個嘀嗒,如果 DPVS_TIMER_HZ = 1000,那么就是 524s. cursors[0] 驅(qū)動 cursors[1] 時間輪,兩個輪一共 8.7 年時間。hashs 是一個長度為 2 的數(shù)組,每個元素是一個鏈表數(shù)組,長度是 LEVEL_SIZE (2<<18), 鏈表成員就是具體的定時器消息。

初始化

首先,dpvs 調(diào)用 rte_timer_subsystem_init 初始化 dpdk 庫的定時器。然后 dpvs_timer_init 初始化 dpvs 定時器。

int dpvs_timer_init(void)
{
    lcoreid_t cid;
    int err;

    /* per-lcore timer */
    // 每個 logic core 一個定時器
    rte_eal_mp_remote_launch(timer_lcore_init, NULL, SKIP_MASTER);
    RTE_LCORE_FOREACH_SLAVE(cid) {
        err = rte_eal_wait_lcore(cid);
        if (err < 0) {
            RTE_LOG(ERR, DTIMER, "%s: lcore %d: %s.\n", 
                    __func__, cid, dpvs_strerror(err));
            return err;
        }
    }

    /* global timer */
    return timer_init_schedler(&g_timer_sched, rte_get_master_lcore());
}

看源碼得知,每個 slave lcore 都要調(diào) timer_lcore_init 初始化自己的。然后再初始化全局的 g_timer_sched.

初始化 timer_lcore_init

static int timer_init_schedler(struct timer_scheduler *sched, lcoreid_t cid)
{
    int i, l;

    rte_spinlock_init(&sched->lock);


    rte_spinlock_lock(&sched->lock);
    for (l = 0; l < LEVEL_DEPTH; l++) {
        sched->cursors[l] = 0;

        sched->hashs[l] = rte_malloc(NULL,
                                     sizeof(struct list_head) * LEVEL_SIZE, 0);
        if (!sched->hashs[l]) {
            RTE_LOG(ERR, DTIMER, "[%02d] no memory.\n", cid);
            return EDPVS_NOMEM;
        }

        for (i = 0; i < LEVEL_SIZE; i++)
            INIT_LIST_HEAD(&sched->hashs[l][i]);
    }
    rte_spinlock_unlock(&sched->lock);

    rte_timer_init(&sched->rte_tim);
    /* ticks should be exactly same with precision */
    if (rte_timer_reset(&sched->rte_tim, rte_get_timer_hz() / DPVS_TIMER_HZ,
                        PERIODICAL, cid, rte_timer_tick_cb, sched) != 0) {
        RTE_LOG(ERR, DTIMER, "[%02d] fail to reset rte timer.\n", cid);
        return EDPVS_INVAL;
    }

    RTE_LOG(DEBUG, DTIMER, "[%02d] timer initialized %p.\n", cid, sched);
    return EDPVS_OK;
}

代碼蠻詳細(xì)了,需要注意兩點

  1. hashs 是一個二維數(shù)組,大小是 2 * sizeof(struct list_head) * LEVEL_SIZE,空間換時間
  2. 利用了 dpdk 自身的定時器去維護嘀嗒,通過 rte_timer_reset 來注冊,回調(diào)函數(shù)是 rte_timer_tick_cb,間隔是 rte_get_timer_hz() / DPVS_TIMER_HZ ticks

回調(diào)函數(shù) rte_timer_tick_cb

static void rte_timer_tick_cb(struct rte_timer *tim, void *arg)
{
    struct timer_scheduler *sched = arg;
    struct dpvs_timer *timer, *next;
    uint64_t left, hash, off;
    int level, lower;
    uint32_t *cursor;
    bool carry;

    rte_spinlock_lock(&sched->lock);
    /* drive timer to move and handle expired timers. */
    for (level = 0; level < LEVEL_DEPTH; level++) {
        cursor = &sched->cursors[level];
        (*cursor)++; // tick ++ 

        if (*cursor < LEVEL_SIZE) {
            carry = false;
        } else {
            /* reset the cursor and handle next level later. */
            *cursor = 0;
            carry = true;
        }
        // 遍歷當(dāng)前 cursor 鏈表
        list_for_each_entry_safe(timer, next,
                                 &sched->hashs[level][*cursor], list) {
            /* is all lower levels ticks empty ? */
            left = timer->delay % get_level_ticks(level);
            if (!left) {
                timer_expire(sched, timer);
            } else {
                /* drop to lower level wheel, note it may not drop to
                 * "next" lower level wheel. */
                list_del(&timer->list);

                lower = level;
                while (--lower >= 0) {
                    off = timer->delay / get_level_ticks(lower);
                    if (!off)
                        continue; /* next lower level */

                    hash = (*cursor + off) % LEVEL_SIZE;
                    list_add_tail(&timer->list, &sched->hashs[lower][hash]);
                    break;
                }
                assert(lower >= 0);
            }
        }
        if (!carry)
            break;
    }
    rte_spinlock_unlock(&sched->lock);
    return;
}
  1. 當(dāng) level 0 時,使用第一個輪,并調(diào)用 (*cursor)++ 將嘀嗒加一。如果嘀嗒值大于 LEVEL_SIZE 說明第一個輪用完了,重置為 0 并設(shè)置 carry 進位標(biāo)記。
  2. 遍歷當(dāng)前輪的 hash 數(shù)組,get_level_ticks 獲取每一層的輪步進一個單位代表多少嘀嗒,當(dāng) level=0 時返回 1, 當(dāng) level=1時返回 LEVEL_SIZE。left = timer->delay % get_level_ticks(level) 如果 left 為 0 說明這個定時任務(wù)屬于當(dāng)前時間輪,并且己經(jīng)過期了,觸發(fā) timer_expire 回調(diào)。
  3. 如果 left 有值,說明還沒到超時時間。但是時間輪己經(jīng)觸發(fā)一次了,所以要降級到 --lower 層。從原有的鏈表中調(diào)用 list_del 刪除,然后 timer->delay / get_level_ticks(lower) 求出在當(dāng)前層步進單位個數(shù),(*cursor + off) % LEVEL_SIZE 求出 hash 索引,然后 list_add_tail 添加到對應(yīng)鏈表中。
  4. 如果沒有進位,也就是 carry 未標(biāo)記,那么退出循環(huán)。否則處理下一層的輪。

添加超時任務(wù)

dp_vs_conn_new 新建連接時會調(diào)用 dpvs_timer_sched 添加超時任務(wù)

dpvs_timer_sched(&new->timer, &new->timeout, conn_expire, new, true);

看一下具體的 dpvs_timer_sched 實現(xiàn)

static int __dpvs_timer_sched(struct timer_scheduler *sched,
                              struct dpvs_timer *timer, struct timeval *delay,
                              dpvs_timer_cb_t handler, void *arg, bool period)
{
    uint32_t off, hash;
    int level;

    assert(timer && delay && handler);

    if (timer_pending(timer))
        RTE_LOG(WARNING, DTIMER, "schedule a pending timer ?\n");

    timer->handler = handler;
    timer->priv = arg;
    timer->is_period = period;
    timer->delay = timeval_to_ticks(delay);

    if (unlikely(timer->delay >= TIMER_MAX_TICKS)) {
        RTE_LOG(WARNING, DTIMER, "exceed timer range\n");
        return EDPVS_INVAL;
    }

    /*
     * to schedule a 0 delay timer is not make sence.
     * and it will never stopped (periodic) or never triggered (one-shut).
     */
    if (unlikely(!timer->delay)) {
        RTE_LOG(WARNING, DTIMER, "schedule 0 timeout timer.\n");
        return EDPVS_INVAL;
    }

    /* add to corresponding wheel, from higher level to lower. */
    for (level = LEVEL_DEPTH - 1; level >= 0; level--) {
        off = timer->delay / get_level_ticks(level);
        if (off > 0) {
            hash = (sched->cursors[level] + off) % LEVEL_SIZE;
            list_add_tail(&timer->list, &sched->hashs[level][hash]);
            return EDPVS_OK;
        }
    }

    /* not adopted by any wheel (never happend) */
    return EDPVS_INVAL;
}
  1. 生成 timer 任務(wù)結(jié)構(gòu)體,最重要的是 timeval_to_ticks 根據(jù)超時時間生成嘀嗒,也就是說過了 delay 個嘀嗒后超時。
  2. 各種異常 case 檢測,不能為 0,不能超過最大 TIMER_MAX_TICKS
  3. 從最大層開始遍歷時間輪,添加到指定鏈表中。off 是在當(dāng)前輪中需要多少步進單位,hash 求出索引。

連接老化處理

在 tcp 每個階段,都有不同的超時時間,并且超時到期后動作也有所不同,

static int conn_expire(void *priv)
{
    struct dp_vs_conn *conn = priv;
    struct dp_vs_proto *pp;
    struct rte_mbuf *cloned_syn_mbuf;
    struct dp_vs_synproxy_ack_pakcet *ack_mbuf, *t_ack_mbuf;
    struct rte_mempool *pool;
    assert(conn);

    /* set proper timeout */
    unsigned conn_timeout = 0;

    pp = dp_vs_proto_lookup(conn->proto);
    if (((conn->proto == IPPROTO_TCP) &&
        (conn->state == DPVS_TCP_S_ESTABLISHED)) ||
        ((conn->proto == IPPROTO_UDP) &&
        (conn->state == DPVS_UDP_S_NORMAL))) {
        conn_timeout = dp_vs_get_conn_timeout(conn);
        if (unlikely(conn_timeout > 0))
            conn->timeout.tv_sec = conn_timeout;
        else if (pp && pp->timeout_table)
            conn->timeout.tv_sec = pp->timeout_table[conn->state];
        else
            conn->timeout.tv_sec = 60;
    }
    else if (pp && pp->timeout_table)
        conn->timeout.tv_sec = pp->timeout_table[conn->state];
    else
        conn->timeout.tv_sec = 60;

dp_vs_get_conn_timeout 獲取超時時間,查看源碼是在配置 dpvs 時后端服務(wù)的超時時間

    dpvs_time_rand_delay(&conn->timeout, 1000000);
    rte_atomic32_inc(&conn->refcnt);

    /* retransmit syn packet to rs */
    if (conn->syn_mbuf && rte_atomic32_read(&conn->syn_retry_max) > 0) {
        if (likely(conn->packet_xmit != NULL)) {
            pool = get_mbuf_pool(conn, DPVS_CONN_DIR_INBOUND);
            if (unlikely(!pool)) {
                RTE_LOG(WARNING, IPVS, "%s: no route for syn_proxy rs's syn "
                        "retransmit\n", __func__);
            } else {
                cloned_syn_mbuf = rte_pktmbuf_clone(conn->syn_mbuf, pool);
                if (unlikely(!cloned_syn_mbuf)) {
                    RTE_LOG(WARNING, IPVS, "%s: no memory for syn_proxy rs's syn "
                            "retransmit\n", __func__);
                } else {
                    cloned_syn_mbuf->userdata = NULL;
                    conn->packet_xmit(pp, conn, cloned_syn_mbuf);
                }
            }
        }

        rte_atomic32_dec(&conn->syn_retry_max);
        dp_vs_estats_inc(SYNPROXY_RS_ERROR);

        /* expire later */
        dp_vs_conn_put(conn);
        return DTIMER_OK;
    }

如果支持 syn proxy, 并且當(dāng)前 syn_mbuf 還在,syn_retry_max 重試次數(shù)大于 0,重發(fā)給 client

    /* somebody is controlled by me, expire later */
    if (rte_atomic32_read(&conn->n_control)) {
        dp_vs_conn_put(conn);
        return DTIMER_OK;
    }

其實不太理解這個 n_control 有什么用,慢慢看吧

    /* unhash it then no further user can get it,
     * even we cannot del it now. */
    conn_unhash(conn);

這是重點,刪除連接,都超時了留著有啥用,清理流表

    /* refcnt == 1 means we are the only referer.
     * no one is using the conn and it's timed out. */
    if (rte_atomic32_read(&conn->refcnt) == 1) {
        struct dp_vs_proto *proto = dp_vs_proto_lookup(conn->proto);

        if (conn->flags & DPVS_CONN_F_TEMPLATE)
            dpvs_timer_cancel(&conn->timer, true);
        else
            dpvs_timer_cancel(&conn->timer, false);

        /* I was controlled by someone */
        if (conn->control)
            dp_vs_control_del(conn);

        if (proto && proto->conn_expire)
            proto->conn_expire(proto, conn);

        if (conn->dest->fwdmode == DPVS_FWD_MODE_SNAT
                && conn->proto != IPPROTO_ICMP) {
            struct sockaddr_in daddr, saddr;

            memset(&daddr, 0, sizeof(daddr));
            daddr.sin_family = AF_INET;
            daddr.sin_addr = conn->caddr.in;
            daddr.sin_port = conn->cport;

            memset(&saddr, 0, sizeof(saddr));
            saddr.sin_family = AF_INET;
            saddr.sin_addr = conn->vaddr.in;
            saddr.sin_port = conn->vport;

            sa_release(conn->out_dev, &daddr, &saddr);
        }

        conn_unbind_dest(conn);
        dp_vs_laddr_unbind(conn);

        /* free stored ack packet */
        list_for_each_entry_safe(ack_mbuf, t_ack_mbuf, &conn->ack_mbuf, list) {
            list_del_init(&ack_mbuf->list);
            rte_pktmbuf_free(ack_mbuf->mbuf);
            sp_dbg_stats32_dec(sp_ack_saved);
            rte_mempool_put(this_ack_mbufpool, ack_mbuf);
        }
        conn->ack_num = 0;

        /* free stored syn mbuf */
        if (conn->syn_mbuf) {
            rte_pktmbuf_free(conn->syn_mbuf);
            sp_dbg_stats32_dec(sp_syn_saved);
        }

        rte_atomic32_dec(&conn->refcnt);

        rte_mempool_put(conn->connpool, conn);
        this_conn_count--;

#ifdef CONFIG_DPVS_IPVS_STATS_DEBUG
        conn_stats_dump("del conn", conn);
#endif
#ifdef CONFIG_DPVS_IPVS_DEBUG
        conn_dump("del conn: ", conn);
#endif
        return DTIMER_STOP;
    }

如果引用計數(shù) 1,說明只有自己在用,獲取協(xié)義,調(diào)用協(xié)義的連接老化函數(shù) proto->conn_expire,這里還要兼容 DPVS_FWD_MODE_SNAT,我覺得代碼寫的丑... conn_unbind_dest 解除連接和后端的引用,dp_vs_laddr_unbind 釋放本地地址。最后清理 ack_mbuf, syn_mbuf 等等

    conn_hash(conn);

如果走到這里,說明連接還有人在用,那么加回流表

    /* some one is using it when expire,
     * try del it again later */
    if (conn->flags & DPVS_CONN_F_TEMPLATE)
        dpvs_timer_update(&conn->timer, &conn->timeout, true);
    else
        dpvs_timer_update(&conn->timer, &conn->timeout, false);

    rte_atomic32_dec(&conn->refcnt);
    return DTIMER_OK;
}

如果走到這里,說明連接還有人在用,那么更新超時時間加回定時器。

tcp連接清除

static int tcp_conn_expire(struct dp_vs_proto *proto, 
                       struct dp_vs_conn *conn)
{
    int err;
    assert(proto && conn && conn->dest);

    if (conn->dest->fwdmode == DPVS_FWD_MODE_NAT 
            || conn->dest->fwdmode == DPVS_FWD_MODE_FNAT) {
        /* send RST to RS and client */
        err = tcp_send_rst(proto, conn, DPVS_CONN_DIR_INBOUND);
        if (err != EDPVS_OK)
            RTE_LOG(WARNING, IPVS, "%s: fail RST RS.\n", __func__);
        err = tcp_send_rst(proto, conn, DPVS_CONN_DIR_OUTBOUND);
        if (err != EDPVS_OK)
            RTE_LOG(WARNING, IPVS, "%s: fail RST Client.\n", __func__);
    }

    return EDPVS_OK;
}

代碼很好理解,雙向發(fā)送 rst

小結(jié)

這塊實現(xiàn)的比較標(biāo)準(zhǔn),大量連接時流表就是會很膨脹,很考驗定時器性能。時間輪+多個定時器,想要高性能必備。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1. 簡介 本文檔包含DPDK軟件安裝和配置的相關(guān)說明。旨在幫助用戶快速啟動和運行軟件。文檔主要描述了在Linux...
    半天妖閱讀 18,114評論 0 22
  • 3. 環(huán)境抽象層 環(huán)境抽象層(Environment Abstraction Layer,下文簡稱EAL)是對操作...
    希爾哥哥s閱讀 4,867評論 0 1
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,539評論 19 139
  • 讓別人幫你做事,讓別人心悅誠服的幫你做事,首先得尊重別人,有讓他死心塌地的決心。 若是命令式的,我相信沒有幾個人會...
    七姑娘_7閱讀 179評論 0 0
  • 日記,呵呵,久遠的東西,記的上次寫日記是在小學(xué),語文老師留的作業(yè),每天必須寫一篇,一周一交.懶惰的我總是在周末的時...
    莫名清醒閱讀 310評論 1 0

友情鏈接更多精彩內(nèi)容