2. NAPI機(jī)制
上一篇博客已經(jīng)分析了,內(nèi)核是如何通過(guò)驅(qū)動(dòng)收包的。但其中說(shuō)到軟中斷時(shí),對(duì)使用的NAPI接口的描述不是很清楚。所以這篇就來(lái)看看NAPI是何方圣神。
一, 框架
隨著網(wǎng)絡(luò)帶寬的發(fā)展,網(wǎng)速越來(lái)越快,之前的中斷收包模式已經(jīng)無(wú)法適應(yīng)目前千兆,萬(wàn)兆的帶寬了。如果每個(gè)數(shù)據(jù)包大小等于MTU大小1460字節(jié)。當(dāng)驅(qū)動(dòng)以千兆網(wǎng)速收包時(shí),CPU將每秒被中斷91829次。在以MTU收包的情況下都會(huì)出現(xiàn)每秒被中斷10萬(wàn)次的情況。過(guò)多的中斷會(huì)引起一個(gè)問(wèn)題,CPU一直陷入硬中斷而沒(méi)有時(shí)間來(lái)處理別的事情了。為了解決這個(gè)問(wèn)題,內(nèi)核在2.6中引入了NAPI機(jī)制。
NAPI就是混合中斷和輪詢的方式來(lái)收包,當(dāng)有中斷來(lái)了,驅(qū)動(dòng)關(guān)閉中斷,通知內(nèi)核收包,內(nèi)核軟中斷輪詢當(dāng)前網(wǎng)卡,在規(guī)定時(shí)間盡可能多的收包。時(shí)間用盡或者沒(méi)有數(shù)據(jù)可收,內(nèi)核再次開(kāi)啟中斷,準(zhǔn)備下一次收包。
二, NAPI接口
struct napi_struct 是內(nèi)核處理軟中斷的入口,每個(gè)net_device都對(duì)應(yīng)一個(gè)napi_struct,驅(qū)動(dòng)在硬中斷中將自己的napi_struct掛載到CPU的收包隊(duì)列softnet_data。內(nèi)核在軟中斷中輪詢?cè)撽?duì)列,并執(zhí)行napi_sturct中的回調(diào)函數(shù)int(*poll)(struct napi_struct *, int);,在poll函數(shù)中,驅(qū)動(dòng)將網(wǎng)卡數(shù)據(jù)轉(zhuǎn)換成skb_buff形式,最終發(fā)往協(xié)議棧。也就是說(shuō),協(xié)議棧對(duì)數(shù)據(jù)包的處理,使用的是軟中斷的時(shí)間片。如果協(xié)議棧處理耗費(fèi)了過(guò)多的CPU時(shí)間的化,會(huì)直接影響到設(shè)備的網(wǎng)絡(luò)性能。
/*
* Structure for NAPI scheduling similar to tasklet but with weighting
*/
struct napi_struct {
/* The poll_list must only be managed by the entity which
* changes the state of the NAPI_STATE_SCHED bit. This means
* whoever atomically sets that bit can add this napi_struct
* to the per-CPU poll_list, and whoever clears that bit
* can remove from the list right before clearing the bit.
*/
struct list_head poll_list;
unsigned long state;//設(shè)備狀態(tài)
int weight; //每次輪詢最大處理數(shù)據(jù)包數(shù)量
unsigned int gro_count;
int (*poll)(struct napi_struct *, int);//輪詢?cè)O(shè)備的回調(diào)函數(shù)
#ifdef CONFIG_NETPOLL
int poll_owner;
#endif
struct net_device *dev;
struct sk_buff *gro_list;
struct sk_buff *skb;
struct hrtimer timer;
struct list_head dev_list;
struct hlist_node napi_hash_node;
unsigned int napi_id;
};
有了保存數(shù)據(jù)的結(jié)構(gòu)體,讓我們?cè)诳纯礊樗涮滋峁┑慕涌诤瘮?shù)吧:
netif_napi_add函數(shù)
驅(qū)動(dòng)在初始化net_device時(shí)通過(guò)這函數(shù)將通過(guò)這個(gè)函數(shù)綁定一個(gè)napi_struct結(jié)構(gòu)。驅(qū)動(dòng)需要在這里注冊(cè)軟中斷中用于輪詢的網(wǎng)卡的poll函數(shù)。
void netif_napi_add(struct net_device *dev, struct napi_struct *napi,
int (*poll)(struct napi_struct *, int), int weight)
{
INIT_LIST_HEAD(&napi->poll_list);
hrtimer_init(&napi->timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL_PINNED);
napi->timer.function = napi_watchdog;
napi->gro_count = 0;
napi->gro_list = NULL;
napi->skb = NULL;
napi->poll = poll;
if (weight > NAPI_POLL_WEIGHT)
pr_err_once("netif_napi_add() called with weight %d on device %s\n",
weight, dev->name);
napi->weight = weight;
list_add(&napi->dev_list, &dev->napi_list);
napi->dev = dev;
#ifdef CONFIG_NETPOLL
napi->poll_owner = -1;
#endif
set_bit(NAPI_STATE_SCHED, &napi->state);
napi_hash_add(napi);
}
__napi_schedule函數(shù)
__napi_schedule函數(shù),為驅(qū)動(dòng)硬件中斷提供的接口,驅(qū)動(dòng)在硬件中斷中,將自己的napi_struct掛載到當(dāng)前CPU的softnet_data上。
/**
* __napi_schedule - schedule for receive
* @n: entry to schedule
*
* The entry's receive function will be scheduled to run.
* Consider using __napi_schedule_irqoff() if hard irqs are masked.
*/
void __napi_schedule(struct napi_struct *n)
{
unsigned long flags;
local_irq_save(flags);
____napi_schedule(this_cpu_ptr(&softnet_data), n);
local_irq_restore(flags);
}
/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
struct napi_struct *napi)
{
list_add_tail(&napi->poll_list, &sd->poll_list);
__raise_softirq_irqoff(NET_RX_SOFTIRQ); //設(shè)置了軟中斷接收標(biāo)志位
}
napi_schedule_prep函數(shù)
napi_schedule_prep函數(shù)是上面__napi_schedule的配套函數(shù),用于__napi_schedule調(diào)用前對(duì)napi_struct進(jìn)行檢查。前面博文e1000網(wǎng)卡的中斷函數(shù)就是這樣調(diào)用的。
if (likely(napi_schedule_prep(&adapter->napi))) {
adapter->total_tx_bytes = 0;
adapter->total_tx_packets = 0;
adapter->total_rx_bytes = 0;
adapter->total_rx_packets = 0;
__napi_schedule(&adapter->napi);
}
判斷NAPI是否可以調(diào)度。如果NAPI沒(méi)有被禁止,且不存在已被調(diào)度的NAPI,則允許調(diào)度NAPI,因?yàn)橥粫r(shí)刻只允許有一個(gè)NAPI poll instance。測(cè)試napi.state字段,只有當(dāng)其不是NAPI_STATE_SCHED時(shí),返回真,并設(shè)置為NAPI_STATE_SCHED.
/**
* napi_schedule_prep - check if napi can be scheduled
* @n: napi context
*
* Test if NAPI routine is already running, and if not mark
* it as running. This is used as a condition variable
* insure only one NAPI poll instance runs. We also make
* sure there is no pending NAPI disable.
*/
bool napi_schedule_prep(struct napi_struct *n)
{
unsigned long val, new;
do {
val = READ_ONCE(n->state);
if (unlikely(val & NAPIF_STATE_DISABLE))
return false;
new = val | NAPIF_STATE_SCHED;
/* Sets STATE_MISSED bit if STATE_SCHED was already set
* This was suggested by Alexander Duyck, as compiler
* emits better code than :
* if (val & NAPIF_STATE_SCHED)
* new |= NAPIF_STATE_MISSED;
*/
new |= (val & NAPIF_STATE_SCHED) / NAPIF_STATE_SCHED *
NAPIF_STATE_MISSED;
} while (cmpxchg(&n->state, val, new) != val);
return !(val & NAPIF_STATE_SCHED);
}
上面的三個(gè)函數(shù)netif_napi_add,__napi_schedule,napi_schedule_prep是驅(qū)動(dòng)使用NAPI收包機(jī)制的接口,下面再看看內(nèi)核軟中斷使用NAPI的接口函數(shù)吧。
napi_poll函數(shù)
這函數(shù)是被軟中斷處理函數(shù)net_rx_action調(diào)用的。這個(gè)函數(shù)將在napi_struct.weight規(guī)定的時(shí)間內(nèi),被net_rx_action循環(huán)調(diào)用,直到時(shí)間片用盡或者網(wǎng)卡當(dāng)前DMA中所有緩存的數(shù)據(jù)包被處理完。如果是由于時(shí)間片用盡而退出的的話,napi_struct會(huì)重新掛載到softnet_data上,而如果是所有數(shù)據(jù)包處理完退出的,napi_struct會(huì)從softnet_data上移除并重新打開(kāi)網(wǎng)卡硬件中斷。
static int napi_poll(struct napi_struct *n, struct list_head *repoll)
{
void *have;
int work, weight;
list_del_init(&n->poll_list);
have = netpoll_poll_lock(n);
weight = n->weight;
/* This NAPI_STATE_SCHED test is for avoiding a race
* with netpoll's poll_napi(). Only the entity which
* obtains the lock and sees NAPI_STATE_SCHED set will
* actually make the ->poll() call. Therefore we avoid
* accidentally calling ->poll() when NAPI is not scheduled.
*/
work = 0;
if (test_bit(NAPI_STATE_SCHED, &n->state)) {
work = n->poll(n, weight); //調(diào)用網(wǎng)卡注冊(cè)的poll函數(shù)
trace_napi_poll(n, work, weight);
}
WARN_ON_ONCE(work > weight);
if (likely(work < weight))
goto out_unlock;
/* Drivers must not modify the NAPI state if they
* consume the entire weight. In such cases this code
* still "owns" the NAPI instance and therefore can
* move the instance around on the list at-will.
*/
if (unlikely(napi_disable_pending(n))) {
napi_complete(n);
goto out_unlock;
}
if (n->gro_list) {
/* flush too old packets
* If HZ < 1000, flush all packets.
*/
napi_gro_flush(n, HZ >= 1000);
}
/* Some drivers may have called napi_schedule
* prior to exhausting their budget.
*/
if (unlikely(!list_empty(&n->poll_list))) {
pr_warn_once("%s: Budget exhausted after napi rescheduled\n",
n->dev ? n->dev->name : "backlog");
goto out_unlock;
}
list_add_tail(&n->poll_list, repoll);
out_unlock:
netpoll_poll_unlock(have);
return work;
}
napi_gro_receive函數(shù)
準(zhǔn)確來(lái)說(shuō)napi_gro_receive函數(shù)是驅(qū)動(dòng)通過(guò)poll注冊(cè),內(nèi)核調(diào)用的函數(shù)。通過(guò)這函數(shù)的的調(diào)用,skb將會(huì)傳給協(xié)議棧的入口函數(shù)__netif_receive_skb。dev_gro_receive函數(shù)用于對(duì)數(shù)據(jù)包的合并,他將合并napi_struct.gro_list鏈表上的skb。GRO是一個(gè)網(wǎng)絡(luò)子系統(tǒng)的另一套機(jī)制,以后再看。
gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
skb_mark_napi_id(skb, napi);
trace_napi_gro_receive_entry(skb);
skb_gro_reset_offset(skb);
return napi_skb_finish(dev_gro_receive(napi, skb), skb);
}
總結(jié)
netif_napi_add:驅(qū)動(dòng)初始時(shí)向內(nèi)核注冊(cè)軟軟中斷處理回調(diào)poll函數(shù)
__napi_schedule:網(wǎng)卡硬件中斷用來(lái)觸發(fā)軟中斷
napi_poll:軟中斷處理函數(shù)net_rx_action用來(lái)回調(diào)上面驅(qū)動(dòng)初始化是通過(guò)netif_napi_add注冊(cè)的回調(diào)收包poll函數(shù)
napi_gro_receive:poll函數(shù)用來(lái)將網(wǎng)卡上的數(shù)據(jù)包發(fā)給協(xié)議棧處理。
到這,NAPI機(jī)制下的收包處理流程就很清晰了。IRQ->__napi_schedule->進(jìn)入軟中斷->net_rx_action->napi_poll->驅(qū)動(dòng)注冊(cè)的poll->napi_gro_receive。