定時器實現(xiàn)方式很多種,定時器個數(shù)也不同。比如 go 老版本只有一個全局定時器,所以有些高性能項目拋棄了,自己實現(xiàn)用戶級別的定時器,并且開啟 n 個。但是在新版本沒必要了,默認(rèn) 64 個。dpvs 由于流表可能巨大,并且處在不同狀態(tài)的 tcp 連接超時時間還不一樣,如果實現(xiàn)的低效,會非常影響性能。所以 dpvs 在利用 dpdk 定時器的基礎(chǔ)上,自實現(xiàn)了一個,算法是常見的時間輪。
數(shù)據(jù)結(jié)構(gòu)
/* Per-lcore timing-wheel scheduler: LEVEL_DEPTH wheel levels, each an
 * array of LEVEL_SIZE list heads, driven by one periodic dpdk rte_timer. */
struct timer_scheduler {
    /* wheels and cursors */
    rte_spinlock_t lock;                  /* protects cursors/hashs for add/del/tick */
    uint32_t cursors[LEVEL_DEPTH];        /* current tick position of each wheel level */
    struct list_head *hashs[LEVEL_DEPTH]; /* per-level bucket array: LEVEL_SIZE lists of timers */

    /* leverage dpdk rte_timer to drive us */
    struct rte_timer rte_tim;             /* periodic tick source, fires DPVS_TIMER_HZ per second */
};
lock 是鎖,增刪改查都要鎖。rte_tim 是 dpdk 庫自身的定時器,他只負(fù)責(zé)時間嘀嗒,也就是 tick ++ 操作。cursors 和 hashs 配合使用,構(gòu)成時間輪。
cursors 保存嘀嗒計數(shù),從 0 到 LEVEL_SIZE (2<<18), LEVEL_DEPTH 值為 2,也就是說 cursors[0] 可以保存 2<<18 個嘀嗒,如果 DPVS_TIMER_HZ = 1000,那么就是 524s. cursors[0] 驅(qū)動 cursors[1] 時間輪,兩個輪一共 8.7 年時間。hashs 是一個長度為 2 的數(shù)組,每個元素是一個鏈表數(shù)組,長度是 LEVEL_SIZE (2<<18), 鏈表成員就是具體的定時器消息。
初始化
首先,dpvs 調(diào)用 rte_timer_subsystem_init 初始化 dpdk 庫的定時器。然后 dpvs_timer_init 初始化 dpvs 定時器。
/*
 * Initialize the dpvs timer subsystem: launch timer_lcore_init on every
 * slave lcore (per-lcore schedulers), wait for each to finish, then set
 * up the global scheduler on the master lcore.
 * Returns EDPVS_OK on success or the first lcore's error code.
 */
int dpvs_timer_init(void)
{
    lcoreid_t cid;
    int ret;

    /* per-lcore timer: one scheduler per slave logic core */
    rte_eal_mp_remote_launch(timer_lcore_init, NULL, SKIP_MASTER);
    RTE_LCORE_FOREACH_SLAVE(cid) {
        ret = rte_eal_wait_lcore(cid);
        if (ret >= 0)
            continue;
        RTE_LOG(ERR, DTIMER, "%s: lcore %d: %s.\n",
                __func__, cid, dpvs_strerror(ret));
        return ret;
    }

    /* global timer */
    return timer_init_schedler(&g_timer_sched, rte_get_master_lcore());
}
看源碼得知,每個 slave lcore 都要調(diào) timer_lcore_init 初始化自己的。然后再初始化全局的 g_timer_sched.
初始化 timer_init_schedler(timer_lcore_init 內(nèi)部也是調(diào)用它來初始化本 lcore 的調(diào)度器)
/*
 * Initialize one timer scheduler: reset the cursors, allocate the bucket
 * array for every wheel level, and register a periodic dpdk rte_timer
 * (callback rte_timer_tick_cb) that advances the wheels.
 * Returns EDPVS_OK, EDPVS_NOMEM on allocation failure, or EDPVS_INVAL
 * if the rte timer cannot be armed.
 *
 * Fix: the original returned EDPVS_NOMEM while still holding sched->lock
 * (spinlock never released on the error path) and leaked the levels
 * already allocated.  Release both before returning.
 */
static int timer_init_schedler(struct timer_scheduler *sched, lcoreid_t cid)
{
    int i, l;

    rte_spinlock_init(&sched->lock);

    rte_spinlock_lock(&sched->lock);
    for (l = 0; l < LEVEL_DEPTH; l++) {
        sched->cursors[l] = 0;

        sched->hashs[l] = rte_malloc(NULL,
                sizeof(struct list_head) * LEVEL_SIZE, 0);
        if (!sched->hashs[l]) {
            RTE_LOG(ERR, DTIMER, "[%02d] no memory.\n", cid);
            /* free the levels we already allocated and drop the lock,
             * otherwise callers would deadlock on sched->lock later. */
            while (--l >= 0) {
                rte_free(sched->hashs[l]);
                sched->hashs[l] = NULL;
            }
            rte_spinlock_unlock(&sched->lock);
            return EDPVS_NOMEM;
        }

        for (i = 0; i < LEVEL_SIZE; i++)
            INIT_LIST_HEAD(&sched->hashs[l][i]);
    }
    rte_spinlock_unlock(&sched->lock);

    rte_timer_init(&sched->rte_tim);
    /* ticks should be exactly same with precision */
    if (rte_timer_reset(&sched->rte_tim, rte_get_timer_hz() / DPVS_TIMER_HZ,
                PERIODICAL, cid, rte_timer_tick_cb, sched) != 0) {
        RTE_LOG(ERR, DTIMER, "[%02d] fail to reset rte timer.\n", cid);
        return EDPVS_INVAL;
    }

    RTE_LOG(DEBUG, DTIMER, "[%02d] timer initialized %p.\n", cid, sched);
    return EDPVS_OK;
}
代碼蠻詳細(xì)了,需要注意兩點
- hashs 是一個二維數(shù)組,大小是 2 * sizeof(struct list_head) * LEVEL_SIZE,空間換時間
- 利用了 dpdk 自身的定時器去維護嘀嗒,通過
rte_timer_reset來注冊,回調(diào)函數(shù)是rte_timer_tick_cb,間隔是 rte_get_timer_hz() / DPVS_TIMER_HZ ticks
回調(diào)函數(shù) rte_timer_tick_cb
/*
 * Periodic callback armed by timer_init_schedler; fires DPVS_TIMER_HZ
 * times per second.  Advances the wheel cursor of level 0 (and of higher
 * levels on carry), expires timers hashed at the new cursor position,
 * and demotes timers that still have residual lower-level ticks down to
 * a lower wheel.  All wheel state is touched under sched->lock.
 */
static void rte_timer_tick_cb(struct rte_timer *tim, void *arg)
{
    struct timer_scheduler *sched = arg;
    struct dpvs_timer *timer, *next;
    uint64_t left, hash, off;
    int level, lower;
    uint32_t *cursor;
    bool carry;

    rte_spinlock_lock(&sched->lock);

    /* drive timer to move and handle expired timers. */
    for (level = 0; level < LEVEL_DEPTH; level++) {
        cursor = &sched->cursors[level];
        (*cursor)++; /* one tick forward on this level */

        if (*cursor < LEVEL_SIZE) {
            carry = false;
        } else {
            /* reset the cursor and handle next level later. */
            *cursor = 0;
            carry = true;
        }

        /* walk every timer hashed at the new cursor position */
        list_for_each_entry_safe(timer, next,
                &sched->hashs[level][*cursor], list) {
            /* is all lower levels ticks empty ? */
            left = timer->delay % get_level_ticks(level);
            if (!left) {
                /* no residual ticks below this level: due now */
                timer_expire(sched, timer);
            } else {
                /* drop to lower level wheel, note it may not drop to
                 * "next" lower level wheel. */
                list_del(&timer->list);
                lower = level;
                while (--lower >= 0) {
                    off = timer->delay / get_level_ticks(lower);
                    if (!off)
                        continue; /* next lower level */

                    hash = (*cursor + off) % LEVEL_SIZE;
                    list_add_tail(&timer->list, &sched->hashs[lower][hash]);
                    break;
                }
                /* left != 0 implies some lower level adopted the timer,
                 * so the loop above must have broken with lower >= 0 */
                assert(lower >= 0);
            }
        }

        /* only continue to the next level when this one wrapped */
        if (!carry)
            break;
    }

    rte_spinlock_unlock(&sched->lock);
    return;
}
- 當(dāng) level 0 時,使用第一個輪,并調(diào)用
(*cursor)++將嘀嗒加一。如果嘀嗒值達到 LEVEL_SIZE(即不再小于 LEVEL_SIZE)說明第一個輪用完了,重置為 0 并設(shè)置 carry 進位標(biāo)記。 - 遍歷當(dāng)前輪的 hash 數(shù)組,
get_level_ticks獲取每一層的輪步進一個單位代表多少嘀嗒,當(dāng) level=0 時返回 1, 當(dāng) level=1時返回 LEVEL_SIZE。left = timer->delay % get_level_ticks(level)如果 left 為 0 說明這個定時任務(wù)屬于當(dāng)前時間輪,并且已經(jīng)過期了,觸發(fā) timer_expire 回調(diào)。 - 如果 left 有值,說明還沒到超時時間。但是時間輪已經(jīng)觸發(fā)一次了,所以要降級到 --lower 層。從原有的鏈表中調(diào)用 list_del 刪除,然后
timer->delay / get_level_ticks(lower)求出在當(dāng)前層步進單位個數(shù),(*cursor + off) % LEVEL_SIZE求出 hash 索引,然后 list_add_tail 添加到對應(yīng)鏈表中。 - 如果沒有進位,也就是 carry 未標(biāo)記,那么退出循環(huán)。否則處理下一層的輪。
添加超時任務(wù)
在 dp_vs_conn_new 新建連接時會調(diào)用 dpvs_timer_sched 添加超時任務(wù)
dpvs_timer_sched(&new->timer, &new->timeout, conn_expire, new, true);
看一下具體的 dpvs_timer_sched 實現(xiàn)
/*
 * Schedule @timer on @sched to fire after @delay (converted to ticks).
 * @handler/@arg are the expire callback and its argument; @period marks
 * a periodic timer.  The timer is hung on the highest wheel level whose
 * step covers the delay, at offset (cursor + off) from the current
 * cursor.  Caller is expected to hold the scheduler lock — TODO confirm
 * against the non-underscore wrapper.
 * Returns EDPVS_OK, or EDPVS_INVAL for a zero/out-of-range delay.
 */
static int __dpvs_timer_sched(struct timer_scheduler *sched,
        struct dpvs_timer *timer, struct timeval *delay,
        dpvs_timer_cb_t handler, void *arg, bool period)
{
    uint32_t off, hash;
    int level;

    assert(timer && delay && handler);

    if (timer_pending(timer))
        RTE_LOG(WARNING, DTIMER, "schedule a pending timer ?\n");

    timer->handler = handler;
    timer->priv = arg;
    timer->is_period = period;
    timer->delay = timeval_to_ticks(delay);

    if (unlikely(timer->delay >= TIMER_MAX_TICKS)) {
        RTE_LOG(WARNING, DTIMER, "exceed timer range\n");
        return EDPVS_INVAL;
    }

    /*
     * scheduling a 0-delay timer makes no sense:
     * it would never stop (periodic) or never trigger (one-shot).
     */
    if (unlikely(!timer->delay)) {
        RTE_LOG(WARNING, DTIMER, "schedule 0 timeout timer.\n");
        return EDPVS_INVAL;
    }

    /* add to corresponding wheel, from higher level to lower. */
    for (level = LEVEL_DEPTH - 1; level >= 0; level--) {
        off = timer->delay / get_level_ticks(level);
        if (off > 0) {
            hash = (sched->cursors[level] + off) % LEVEL_SIZE;
            list_add_tail(&timer->list, &sched->hashs[level][hash]);
            return EDPVS_OK;
        }
    }

    /* not adopted by any wheel (should never happen: delay >= 1 tick,
     * so level 0 with get_level_ticks(0) == 1 always yields off > 0) */
    return EDPVS_INVAL;
}
- 生成 timer 任務(wù)結(jié)構(gòu)體,最重要的是
timeval_to_ticks根據(jù)超時時間生成嘀嗒,也就是說過了 delay 個嘀嗒后超時。 - 各種異常 case 檢測,不能為 0,不能超過最大 TIMER_MAX_TICKS
- 從最大層開始遍歷時間輪,添加到指定鏈表中。off 是在當(dāng)前輪中需要多少步進單位,hash 求出索引。
連接老化處理
在 tcp 每個階段,都有不同的超時時間,并且超時到期后動作也有所不同,
/*
 * Timer callback that ages out a connection.  NOTE(review): in the
 * original file this function continues further below (the blog
 * interleaves commentary); this first part only recomputes the
 * connection's timeout from its protocol and state.
 */
static int conn_expire(void *priv)
{
struct dp_vs_conn *conn = priv;
struct dp_vs_proto *pp;
struct rte_mbuf *cloned_syn_mbuf;
struct dp_vs_synproxy_ack_pakcet *ack_mbuf, *t_ack_mbuf;
struct rte_mempool *pool;
assert(conn);
/* set proper timeout */
unsigned conn_timeout = 0;
pp = dp_vs_proto_lookup(conn->proto);
/* established TCP / "normal" UDP: prefer the per-service configured
 * timeout, then the protocol's per-state table, then 60s fallback */
if (((conn->proto == IPPROTO_TCP) &&
(conn->state == DPVS_TCP_S_ESTABLISHED)) ||
((conn->proto == IPPROTO_UDP) &&
(conn->state == DPVS_UDP_S_NORMAL))) {
conn_timeout = dp_vs_get_conn_timeout(conn);
if (unlikely(conn_timeout > 0))
conn->timeout.tv_sec = conn_timeout;
else if (pp && pp->timeout_table)
conn->timeout.tv_sec = pp->timeout_table[conn->state];
else
conn->timeout.tv_sec = 60; /* fallback default, seconds */
}
/* any other state: per-state table, else 60s fallback */
else if (pp && pp->timeout_table)
conn->timeout.tv_sec = pp->timeout_table[conn->state];
else
conn->timeout.tv_sec = 60;
dp_vs_get_conn_timeout 獲取超時時間,查看源碼是在配置 dpvs 時后端服務(wù)的超時時間
/* add a random delay (arg presumably in microseconds — verify against
 * dpvs_time_rand_delay) so connections don't all expire in one burst */
dpvs_time_rand_delay(&conn->timeout, 1000000);
/* take a reference while we work on the conn */
rte_atomic32_inc(&conn->refcnt);
/* retransmit syn packet to rs */
if (conn->syn_mbuf && rte_atomic32_read(&conn->syn_retry_max) > 0) {
if (likely(conn->packet_xmit != NULL)) {
pool = get_mbuf_pool(conn, DPVS_CONN_DIR_INBOUND);
if (unlikely(!pool)) {
RTE_LOG(WARNING, IPVS, "%s: no route for syn_proxy rs's syn "
"retransmit\n", __func__);
} else {
/* clone: keep the saved syn_mbuf for possible further retries */
cloned_syn_mbuf = rte_pktmbuf_clone(conn->syn_mbuf, pool);
if (unlikely(!cloned_syn_mbuf)) {
RTE_LOG(WARNING, IPVS, "%s: no memory for syn_proxy rs's syn "
"retransmit\n", __func__);
} else {
cloned_syn_mbuf->userdata = NULL;
conn->packet_xmit(pp, conn, cloned_syn_mbuf);
}
}
}
rte_atomic32_dec(&conn->syn_retry_max);
dp_vs_estats_inc(SYNPROXY_RS_ERROR);
/* expire later */
dp_vs_conn_put(conn);
return DTIMER_OK;
}
如果支持 syn proxy, 并且當(dāng)前 syn_mbuf 還在,syn_retry_max 重試次數(shù)大于 0,把保存的 syn 包重發(fā)給后端 RS(注意代碼注釋 retransmit syn packet to rs,不是發(fā)給 client)
/* somebody is controlled by me, expire later */
/* NOTE(review): n_control counts connections controlled by this one
 * (presumably related sessions such as a data channel tied to its
 * control channel — confirm against dp_vs_control_* callers) */
if (rte_atomic32_read(&conn->n_control)) {
dp_vs_conn_put(conn);
return DTIMER_OK;
}
其實不太理解這個 n_control 有什么用,慢慢看吧
/* unhash it then no further user can get it,
 * even we cannot del it now. */
/* removes the conn from the flow table only; memory is freed below
 * when refcnt shows we are the last referer */
conn_unhash(conn);
這是重點,把連接從流表中摘除(unhash),后續(xù)查找就拿不到它了;注意這里只是清理流表索引,連接本身要等引用計數(shù)歸位后才真正釋放
/* refcnt == 1 means we are the only referer.
 * no one is using the conn and it's timed out. */
if (rte_atomic32_read(&conn->refcnt) == 1) {
struct dp_vs_proto *proto = dp_vs_proto_lookup(conn->proto);
/* cancel our own timer; the bool arg presumably selects the global
 * vs per-lcore scheduler — confirm against dpvs_timer_cancel */
if (conn->flags & DPVS_CONN_F_TEMPLATE)
dpvs_timer_cancel(&conn->timer, true);
else
dpvs_timer_cancel(&conn->timer, false);
/* I was controlled by someone */
if (conn->control)
dp_vs_control_del(conn);
/* protocol-specific teardown, e.g. tcp_conn_expire sends RSTs */
if (proto && proto->conn_expire)
proto->conn_expire(proto, conn);
/* SNAT (non-ICMP): release the local sockaddr pair taken for the
 * outbound direction */
if (conn->dest->fwdmode == DPVS_FWD_MODE_SNAT
&& conn->proto != IPPROTO_ICMP) {
struct sockaddr_in daddr, saddr;
memset(&daddr, 0, sizeof(daddr));
daddr.sin_family = AF_INET;
daddr.sin_addr = conn->caddr.in;
daddr.sin_port = conn->cport;
memset(&saddr, 0, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_addr = conn->vaddr.in;
saddr.sin_port = conn->vport;
sa_release(conn->out_dev, &daddr, &saddr);
}
/* drop references to the backend dest and the local address */
conn_unbind_dest(conn);
dp_vs_laddr_unbind(conn);
/* free stored ack packet */
list_for_each_entry_safe(ack_mbuf, t_ack_mbuf, &conn->ack_mbuf, list) {
list_del_init(&ack_mbuf->list);
rte_pktmbuf_free(ack_mbuf->mbuf);
sp_dbg_stats32_dec(sp_ack_saved);
rte_mempool_put(this_ack_mbufpool, ack_mbuf);
}
conn->ack_num = 0;
/* free stored syn mbuf */
if (conn->syn_mbuf) {
rte_pktmbuf_free(conn->syn_mbuf);
sp_dbg_stats32_dec(sp_syn_saved);
}
/* drop our reference and return the conn to its mempool */
rte_atomic32_dec(&conn->refcnt);
rte_mempool_put(conn->connpool, conn);
this_conn_count--;
#ifdef CONFIG_DPVS_IPVS_STATS_DEBUG
conn_stats_dump("del conn", conn);
#endif
#ifdef CONFIG_DPVS_IPVS_DEBUG
conn_dump("del conn: ", conn);
#endif
return DTIMER_STOP;
}
如果引用計數(shù) 1,說明只有自己在用,獲取協(xié)議,調(diào)用協(xié)議的連接老化函數(shù) proto->conn_expire,這里還要兼容 DPVS_FWD_MODE_SNAT,我覺得代碼寫的丑... conn_unbind_dest 解除連接和后端的引用,dp_vs_laddr_unbind 釋放本地地址。最后清理 ack_mbuf, syn_mbuf 等等
/* still referenced by others: put the conn back into the flow table */
conn_hash(conn);
如果走到這里,說明連接還有人在用,那么加回流表
/* some one is using it when expire,
 * try del it again later */
/* re-arm the timer with the refreshed timeout; the bool arg presumably
 * selects the global vs per-lcore scheduler — confirm against
 * dpvs_timer_update */
if (conn->flags & DPVS_CONN_F_TEMPLATE)
dpvs_timer_update(&conn->timer, &conn->timeout, true);
else
dpvs_timer_update(&conn->timer, &conn->timeout, false);
/* drop the reference taken at the top of conn_expire */
rte_atomic32_dec(&conn->refcnt);
return DTIMER_OK;
}
如果走到這里,說明連接還有人在用,那么更新超時時間加回定時器。
tcp連接清除
/*
 * Protocol-specific expire hook for TCP.  For NAT/FNAT forwarding modes
 * both endpoints still believe the session exists, so tell them it is
 * gone by sending RST in each direction.  Always returns EDPVS_OK;
 * send failures are only logged.
 */
static int tcp_conn_expire(struct dp_vs_proto *proto,
        struct dp_vs_conn *conn)
{
    int ret;

    assert(proto && conn && conn->dest);

    /* only NAT/FNAT keep per-direction state worth resetting */
    if (conn->dest->fwdmode != DPVS_FWD_MODE_NAT
            && conn->dest->fwdmode != DPVS_FWD_MODE_FNAT)
        return EDPVS_OK;

    /* send RST to RS and client */
    ret = tcp_send_rst(proto, conn, DPVS_CONN_DIR_INBOUND);
    if (ret != EDPVS_OK)
        RTE_LOG(WARNING, IPVS, "%s: fail RST RS.\n", __func__);

    ret = tcp_send_rst(proto, conn, DPVS_CONN_DIR_OUTBOUND);
    if (ret != EDPVS_OK)
        RTE_LOG(WARNING, IPVS, "%s: fail RST Client.\n", __func__);

    return EDPVS_OK;
}
代碼很好理解,雙向發(fā)送 rst
小結(jié)
這塊實現(xiàn)的比較標(biāo)準(zhǔn),大量連接時流表就是會很膨脹,很考驗定時器性能。時間輪+多個定時器,想要高性能必備。