依據(jù)kfifo實(shí)現(xiàn)環(huán)形緩沖區(qū)

環(huán)形緩沖區(qū)經(jīng)常被使用到,尤其在生產(chǎn)者和消費(fèi)者的模型中,假設(shè)生產(chǎn)者專門用于產(chǎn)生數(shù)據(jù),而消費(fèi)者專門用于處理數(shù)據(jù),由于各種原因,可能生產(chǎn)者和消費(fèi)者產(chǎn)生數(shù)據(jù)和處理數(shù)據(jù)的速度不一,比如如果處理速度有慢又快,在慢的時(shí)候,消費(fèi)者產(chǎn)生的數(shù)據(jù)來(lái)不及處理的可能被丟棄,或者強(qiáng)制讓生產(chǎn)者降速等待,在快的時(shí)候,又有可能太快,而生產(chǎn)者供給不了,那么消費(fèi)者也必須等待.正是由于快慢不一,緩沖區(qū)的存在則恰可以進(jìn)行中和,協(xié)調(diào)生產(chǎn)者和消費(fèi)者速度不一的問(wèn)題.

一.內(nèi)核kfifo

首先學(xué)習(xí)一下linux內(nèi)核是如何設(shè)計(jì)環(huán)形緩沖區(qū)的,畢竟內(nèi)核代碼精煉之至,令人嘆為觀止.
這里是linux2.6.27的代碼


1.kfifo的結(jié)構(gòu)類型

struct kfifo {
    unsigned char *buffer;  /* the buffer holding the data */
    unsigned int size;  /* the size of the allocated buffer */
    unsigned int in;    /* data is added at offset (in % size) */
    unsigned int out;   /* data is extracted from off. (out % size) */
    spinlock_t *lock;   /* protects concurrent modifications */
};

這里發(fā)現(xiàn)我們用in out描述put get操作fifo的位置,用的是unsigned int類型,后面如果我們想獲得in實(shí)際在fifo的位置,用in&(size-1),這就是size下面要采用用2的乘方的原因.
而牽涉到in,out一起計(jì)算的時(shí)候,不需要進(jìn)行&運(yùn)算獲取實(shí)際位置,即使有溢出問(wèn)題也是滿足的,可以使用補(bǔ)碼進(jìn)行驗(yàn)算,最后都是看成無(wú)符號(hào)的數(shù).
在in out增加和減少,會(huì)自己溢出回歸.
2.kfifo_init

struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size,
             gfp_t gfp_mask, spinlock_t *lock)
{
    struct kfifo *fifo;

    /* size must be a power of 2 */
    BUG_ON(!is_power_of_2(size));

    fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
    if (!fifo)
        return ERR_PTR(-ENOMEM);

    fifo->buffer = buffer;
    fifo->size = size;
    fifo->in = fifo->out = 0;
    fifo->lock = lock;

    return fifo;
}

bool is_power_of_2(unsigned long n)
{
    return (n != 0 && ((n & (n - 1)) == 0));
}

申請(qǐng)分配一個(gè)kfifo的結(jié)構(gòu)體指針,初始化buffer使用的是函數(shù)外部的空間,in,out為0,size其中必須為2的乘方,意義為下面size-1方便進(jìn)行與運(yùn)算.

3.kfifo_alloc

struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
    unsigned char *buffer;
    struct kfifo *ret;

    /*
     * round up to the next power of 2, since our 'let the indices
     * wrap' tachnique works only in this case.
     */
    if (size & (size - 1)) {
        BUG_ON(size > 0x80000000);
        size = roundup_pow_of_two(size);
    }

    buffer = kmalloc(size, gfp_mask);
    if (!buffer)
        return ERR_PTR(-ENOMEM);

    ret = kfifo_init(buffer, size, gfp_mask, lock);

    if (IS_ERR(ret))
        kfree(buffer);

    return ret;
}

這個(gè)函數(shù)主要就是申請(qǐng)size的buffer空間,然后調(diào)用kfifo_init初始化.
4.kfifo_free

void kfifo_free(struct kfifo *fifo)
{
    kfree(fifo->buffer);
    kfree(fifo);
}

這個(gè)函數(shù)和kfifo_alloc配合使用,用于釋放內(nèi)存,先釋放buffer,再釋放結(jié)構(gòu)體指針fifo.
5.kfifo_reset

static inline void __kfifo_reset(struct kfifo *fifo)
{
    fifo->in = fifo->out = 0;
}

static inline void kfifo_reset(struct kfifo *fifo)
{
    unsigned long flags;
    spin_lock_irqsave(fifo->lock, flags);
    __kfifo_reset(fifo);
    spin_unlock_irqrestore(fifo->lock, flags);
}

重置in out位置為0
6.kfifo_len

static inline unsigned int __kfifo_len(struct kfifo *fifo)
{
    return fifo->in - fifo->out;
}

static inline unsigned int kfifo_len(struct kfifo *fifo)
{
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_len(fifo);
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
}

得到fifo中數(shù)據(jù)的長(zhǎng)度,用fifo->in - fifo->out是沒(méi)有問(wèn)題的,即便在unsigned int型溢出時(shí)也是對(duì)的,具體可以使用補(bǔ)碼進(jìn)行運(yùn)算.
7.kfifo_put

unsigned int __kfifo_put(struct kfifo *fifo,
             unsigned char *buffer, unsigned int len)
{
    unsigned int l;
    //put進(jìn)去的字節(jié)數(shù)不能大于fifo剩余的字節(jié)數(shù)
    len = min(len, fifo->size - fifo->in + fifo->out);
    smp_mb();

    /*fifo->in & (fifo->size - 1)通過(guò)這個(gè)與運(yùn)算,相當(dāng)于把
      fifo->in是size的倍數(shù)給去掉了,得到的是在size里的位
      置,就是在這個(gè)buffer的位置.
      而l所表示的是要put進(jìn)去的字節(jié)數(shù)和從in開(kāi)始到buffer
      結(jié)尾字節(jié)數(shù)的小值,就是從in到buffer結(jié)尾能不能放下
      目的字節(jié)數(shù)*/
    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
    /*如果放不下,copy分兩部分,一部分從in在位置復(fù)制l字
      節(jié)數(shù).一部分從buffer開(kāi)始復(fù)制len-l字節(jié)數(shù),如果放得
      下,那len-l為0,一樣可以*/
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
    memcpy(fifo->buffer, buffer + l, len - l);
    smp_wmb();
    //更新in所在位置
    fifo->in += len;

    return len;
}

static inline unsigned int kfifo_put(struct kfifo *fifo,
                     unsigned char *buffer, unsigned int len)
{
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_put(fifo, buffer, len);
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
}

8.kfifo_get

unsigned int __kfifo_get(struct kfifo *fifo,
             unsigned char *buffer, unsigned int len)
{
    unsigned int l;
    //get的字節(jié)數(shù)和fifo buffer中字節(jié)數(shù)比較,len為最終要get的字節(jié)數(shù)
    len = min(len, fifo->in - fifo->out);
    smp_rmb();

    //要get的字節(jié)數(shù),和out所在位置到fifo buffer結(jié)尾字節(jié)數(shù),比較
    l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
    //和put同理,兩部分,一部分copy l字節(jié)數(shù),一部分copy len-l字節(jié)數(shù),注意方向
    memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
    memcpy(buffer + l, fifo->buffer, len - l);
    smp_mb();
    //更新out位置
    fifo->out += len;

    return len;
}

static inline unsigned int kfifo_get(struct kfifo *fifo,
                     unsigned char *buffer, unsigned int len)
{
    unsigned long flags;
    unsigned int ret;

    spin_lock_irqsave(fifo->lock, flags);

    ret = __kfifo_get(fifo, buffer, len);
    //如果沒(méi)有數(shù)據(jù),重置
    if (fifo->in == fifo->out)
        fifo->in = fifo->out = 0;

    spin_unlock_irqrestore(fifo->lock, flags)
     //返回get的字節(jié)數(shù)
    return ret;
}

二.仿造kfifo,編寫的環(huán)形緩沖區(qū)ring.c ring.h


/*ring.h*/
#ifndef RING_H
#define RING_H

#include <pthread.h>


struct ring{
    unsigned char *buffer;  /* the buffer holding the data */
    unsigned int size;  /* the size of the allocated buffer */
    unsigned int in;    /* data is added at offset (in % size) */
    unsigned int out;   /* data is extracted from off. (out % size) */
    pthread_mutex_t *lock;  /* protects concurrent modifications */
};



extern struct ring *ring_init(unsigned char *buffer, unsigned int size,pthread_mutex_t *lock);
extern struct ring *ring_alloc(unsigned int size,pthread_mutex_t *lock);
extern void ring_free(struct ring *fifo);
extern unsigned int __ring_put(struct ring *fifo,
                unsigned char *buffer, unsigned int len);
extern unsigned int __ring_get(struct ring *fifo,
                unsigned char *buffer, unsigned int len);

static inline void __ring_reset(struct ring *fifo)
{
    fifo->in = fifo->out = 0;
}
static inline void ring_reset(struct ring *fifo)
{
    unsigned long flags;

    pthread_mutex_lock(fifo->lock);

    __ring_reset(fifo);

    pthread_mutex_unlock(fifo->lock);
}



static inline unsigned int __ring_len(struct ring *fifo)
{
    return fifo->in - fifo->out;
}

static inline unsigned int ring_len(struct ring *fifo)
{
    unsigned int ret;

    pthread_mutex_lock(fifo->lock);

    ret = __ring_len(fifo);

    pthread_mutex_unlock(fifo->lock);

    return ret;
}



static inline unsigned int ring_put(struct ring *fifo,
                     unsigned char *buffer, unsigned int len)
{
    unsigned int ret;

    pthread_mutex_lock(fifo->lock);

    ret = __ring_put(fifo, buffer, len);

    pthread_mutex_unlock(fifo->lock);

    return ret;
}




static inline unsigned int ring_get(struct ring *fifo,
                     unsigned char *buffer, unsigned int len)
{
    unsigned int ret;

    pthread_mutex_lock(fifo->lock);

    ret = __ring_get(fifo, buffer, len);

    if (fifo->in == fifo->out)
        fifo->in = fifo->out = 0;

    pthread_mutex_unlock(fifo->lock);

    return ret;
}

#endif

/*ring.c*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "ring.h"

#define is_power_of_2(x) ((x) != 0 && (((x) & ((x) - 1)) == 0))
#define min(x,y) ({ \
    typeof(x) _x = (x); \
    typeof(y) _y = (y); \
    (void) (&_x == &_y);    \
    _x < _y ? _x : _y; })



struct ring *ring_init(unsigned char *buffer, unsigned int size,
                        pthread_mutex_t *lock)
{
    struct ring *fifo = NULL;

    if(!is_power_of_2(size)){
        printf("size is not power of 2\n");
        return fifo;
    }

    fifo = (struct ring *)malloc(sizeof(struct ring));
    if (!fifo){
        printf("fifo malloc error\n");
        return fifo;
    }

    fifo->buffer = buffer;
    fifo->size = size;
    fifo->in = fifo->out = 0;
    fifo->lock = lock;

    return fifo;
}



struct ring *ring_alloc(unsigned int size,pthread_mutex_t *lock)
{
    unsigned char *buffer = NULL;
    struct ring *ret = NULL;

    

    buffer = (unsigned char *)malloc(size);
    if (!buffer){
        printf("buffer malloc error\n");
        return ret;
    }

    ret = ring_init(buffer, size, lock);

    return ret;
}
void ring_free(struct ring *fifo)
{
    free(fifo->buffer);
    free(fifo);
}




unsigned int __ring_put(struct ring *fifo,
             unsigned char *buffer, unsigned int len)
{
    unsigned int l;

    len = min(len, fifo->size - fifo->in + fifo->out);

    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
    memcpy(fifo->buffer, buffer + l, len - l);

    fifo->in += len;

    return len;
}



unsigned int __ring_get(struct ring *fifo,
             unsigned char *buffer, unsigned int len)
{
    unsigned int l;

    len = min(len, fifo->in - fifo->out);

    l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
    memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
    memcpy(buffer + l, fifo->buffer, len - l);

    fifo->out += len;
    return len;
}

三.測(cè)試

測(cè)試的main.c文件如下:

/*main.c*/
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include "ring.h"

struct data{
    int a;
    time_t t;
};



pthread_t tid1;
pthread_t tid2;


void sig_handler(int sig)
{
    if(sig == SIGINT){

        if(pthread_cancel(tid1) != 0){
            perror("thread cancel fail");
            exit(0);
        }

        if(pthread_cancel(tid2) != 0){
            perror("thread cancel fail");
            exit(0);
        }
        printf("\n\n 兩個(gè)線程取消\n");
   }
}



void * put_proc(void * arg)
{
    signal(SIGINT, sig_handler);
    if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL) != 0){
        perror("pthread set cancel state fail");
        pthread_exit(NULL);
        exit(0);
    }
    if(pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL) != 0){
        perror("pthread set cancel type fail");
        pthread_exit(NULL);
        exit(0);
    }


    int i = 0;
    struct data data_put;
    struct ring * ring_buf = (struct ring *)arg;
    int len = sizeof(struct data);
    int ret;
    while(1){
        data_put.a = i;
        time(&data_put.t);
        ret = ring_put(ring_buf,(unsigned char *)&data_put,len);
        printf("ret put:%d\nput data:%d\ntime:%s\n\n",ret,data_put.a,ctime(&data_put.t));
        i++;
        sleep(2);
    }
}

void * get_proc(void * arg)
{
    signal(SIGINT, sig_handler);
    if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL) != 0){
        perror("pthread set cancel state fail");
        pthread_exit(NULL);
        exit(0);
    }
    if(pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL) != 0){
        perror("pthread set cancel type fail");
        pthread_exit(NULL);
        exit(0);
    }

    struct ring * ring_buf = (struct ring *)arg;
    int len = sizeof(struct data);
    struct data data_get;
    int ret;
    while(1){
        ret = ring_get(ring_buf,(unsigned char *)&data_get,len);
        printf("ret get:%d\nget data:%d\ntime:%s\n\n",ret,data_get.a,ctime(&data_get.t));
        sleep(2);
    }

}



int main(int argc, char const *argv[])
{
    signal(SIGINT, sig_handler);

    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    struct ring * ring_buf = NULL;
    ring_buf = ring_alloc(32,&lock);


    int err;

    err = pthread_create(&tid1, NULL, put_proc, ring_buf);
    if(err){
        printf("fail create thread 1\n");
        goto end;
    }
    err = pthread_create(&tid2, NULL, get_proc, ring_buf);
    if(err){
        printf("fail create thread 2\n");
        goto end;
    }
    pthread_join(tid1,NULL);
    pthread_join(tid2,NULL);


    printf("program end\n");
end:
    ring_free(ring_buf);
    return 0;
}

其中,建立了兩個(gè)線程,一個(gè)用于向ring_buf寫數(shù)據(jù),一個(gè)用于向ring_buf讀數(shù)據(jù),數(shù)據(jù)定義時(shí)加上了時(shí)間信息便于查看.互斥量的使用主要用于線程同步,比如兩個(gè)線程如果都向緩沖區(qū)寫數(shù)據(jù)時(shí),必須保證臨界區(qū)的安全,當(dāng)然也可以使用讀寫鎖,其實(shí)更好一些,因?yàn)樽x的時(shí)候,也可以寫.
信號(hào)處理函數(shù),用于ctrl+c強(qiáng)制結(jié)束時(shí),異步取消線程.

結(jié)果:

ret get:0
get data:1032341248
time:Sun Apr 21 12:30:00 4461252


ret put:16
put data:0
time:Mon Jan  8 17:35:33 2018


ret get:16
get data:0
time:Mon Jan  8 17:35:33 2018


ret put:16
put data:1
time:Mon Jan  8 17:35:35 2018


ret get:16
get data:1
time:Mon Jan  8 17:35:35 2018


ret put:16
put data:2
time:Mon Jan  8 17:35:37 2018


ret get:16
get data:2
time:Mon Jan  8 17:35:37 2018


ret put:16
put data:3
time:Mon Jan  8 17:35:39 2018


ret get:16
get data:3
time:Mon Jan  8 17:35:39 2018


ret put:16
put data:4
time:Mon Jan  8 17:35:41 2018


^C

 兩個(gè)線程取消
program end

剛開(kāi)始,get線程沒(méi)有讀出數(shù)據(jù)
后面就是put一個(gè),get一個(gè),沒(méi)問(wèn)題.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,545評(píng)論 19 139
  • 本篇文章是基于谷歌有關(guān)Graphic的一篇概覽文章的翻譯:http://source.android.com/de...
    lee_3do閱讀 7,451評(píng)論 2 21
  • Tips:在學(xué)習(xí)Celery過(guò)程中,使用的系統(tǒng)為Windows 10、Celery版本為3.1.18①、中間人使用...
    嘿嘿_小于同學(xué)閱讀 5,866評(píng)論 7 10
  • 以前聽(tīng)不懂Eason,更甚的覺(jué)得粵語(yǔ)歌冗長(zhǎng)拗口,可最近越發(fā)的覺(jué)得歌里唱的不止有詞曲旋律,還有含著的浮沉漲落...
    森嶼北魚閱讀 246評(píng)論 0 1
  • 我從來(lái)不相信愛(ài)情 可是 你來(lái)了 你如三月的春風(fēng) 暖暖的輕撫在我的臉上 你似天上的彩虹 絢麗的渲染著你的風(fēng)采 我們從...
    望風(fēng)的少年閱讀 760評(píng)論 0 1

友情鏈接更多精彩內(nèi)容