環(huán)形緩沖區(qū)經(jīng)常被使用到,尤其在生產(chǎn)者和消費(fèi)者的模型中,假設(shè)生產(chǎn)者專門用于產(chǎn)生數(shù)據(jù),而消費(fèi)者專門用于處理數(shù)據(jù),由于各種原因,可能生產(chǎn)者和消費(fèi)者產(chǎn)生數(shù)據(jù)和處理數(shù)據(jù)的速度不一,比如如果處理速度有慢又快,在慢的時(shí)候,消費(fèi)者產(chǎn)生的數(shù)據(jù)來(lái)不及處理的可能被丟棄,或者強(qiáng)制讓生產(chǎn)者降速等待,在快的時(shí)候,又有可能太快,而生產(chǎn)者供給不了,那么消費(fèi)者也必須等待.正是由于快慢不一,緩沖區(qū)的存在則恰可以進(jìn)行中和,協(xié)調(diào)生產(chǎn)者和消費(fèi)者速度不一的問(wèn)題.
一.內(nèi)核kfifo
首先學(xué)習(xí)一下linux內(nèi)核是如何設(shè)計(jì)環(huán)形緩沖區(qū)的,畢竟內(nèi)核代碼精煉之至,令人嘆為觀止.
這里是linux2.6.27的代碼
1.kfifo的結(jié)構(gòu)類型
struct kfifo {
unsigned char *buffer; /* the buffer holding the data */
unsigned int size; /* the size of the allocated buffer */
unsigned int in; /* data is added at offset (in % size) */
unsigned int out; /* data is extracted from off. (out % size) */
spinlock_t *lock; /* protects concurrent modifications */
};
這里發(fā)現(xiàn)我們用in out描述put get操作fifo的位置,用的是unsigned int類型,后面如果我們想獲得in實(shí)際在fifo的位置,用in&(size-1),這就是size下面要采用用2的乘方的原因.
而牽涉到in,out一起計(jì)算的時(shí)候,不需要進(jìn)行&運(yùn)算獲取實(shí)際位置,即使有溢出問(wèn)題也是滿足的,可以使用補(bǔ)碼進(jìn)行驗(yàn)算,最后都是看成無(wú)符號(hào)的數(shù).
在in out增加和減少,會(huì)自己溢出回歸.
2.kfifo_init
struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size,
gfp_t gfp_mask, spinlock_t *lock)
{
struct kfifo *fifo;
/* size must be a power of 2 */
BUG_ON(!is_power_of_2(size));
fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
if (!fifo)
return ERR_PTR(-ENOMEM);
fifo->buffer = buffer;
fifo->size = size;
fifo->in = fifo->out = 0;
fifo->lock = lock;
return fifo;
}
bool is_power_of_2(unsigned long n)
{
return (n != 0 && ((n & (n - 1)) == 0));
}
申請(qǐng)分配一個(gè)kfifo的結(jié)構(gòu)體指針,初始化buffer使用的是函數(shù)外部的空間,in,out為0,size其中必須為2的乘方,意義為下面size-1方便進(jìn)行與運(yùn)算.
3.kfifo_alloc
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
unsigned char *buffer;
struct kfifo *ret;
/*
* round up to the next power of 2, since our 'let the indices
* wrap' tachnique works only in this case.
*/
if (size & (size - 1)) {
BUG_ON(size > 0x80000000);
size = roundup_pow_of_two(size);
}
buffer = kmalloc(size, gfp_mask);
if (!buffer)
return ERR_PTR(-ENOMEM);
ret = kfifo_init(buffer, size, gfp_mask, lock);
if (IS_ERR(ret))
kfree(buffer);
return ret;
}
這個(gè)函數(shù)主要就是申請(qǐng)size的buffer空間,然后調(diào)用kfifo_init初始化.
4.kfifo_free
void kfifo_free(struct kfifo *fifo)
{
kfree(fifo->buffer);
kfree(fifo);
}
這個(gè)函數(shù)和kfifo_alloc配合使用,用于釋放內(nèi)存,先釋放buffer,再釋放結(jié)構(gòu)體指針fifo.
5.kfifo_reset
static inline void __kfifo_reset(struct kfifo *fifo)
{
fifo->in = fifo->out = 0;
}
static inline void kfifo_reset(struct kfifo *fifo)
{
unsigned long flags;
spin_lock_irqsave(fifo->lock, flags);
__kfifo_reset(fifo);
spin_unlock_irqrestore(fifo->lock, flags);
}
重置in out位置為0
6.kfifo_len
static inline unsigned int __kfifo_len(struct kfifo *fifo)
{
return fifo->in - fifo->out;
}
static inline unsigned int kfifo_len(struct kfifo *fifo)
{
unsigned long flags;
unsigned int ret;
spin_lock_irqsave(fifo->lock, flags);
ret = __kfifo_len(fifo);
spin_unlock_irqrestore(fifo->lock, flags);
return ret;
}
得到fifo中數(shù)據(jù)的長(zhǎng)度,用fifo->in - fifo->out是沒(méi)有問(wèn)題的,即便在unsigned int型溢出時(shí)也是對(duì)的,具體可以使用補(bǔ)碼進(jìn)行運(yùn)算.
7.kfifo_put
unsigned int __kfifo_put(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
//put進(jìn)去的字節(jié)數(shù)不能大于fifo剩余的字節(jié)數(shù)
len = min(len, fifo->size - fifo->in + fifo->out);
smp_mb();
/*fifo->in & (fifo->size - 1)通過(guò)這個(gè)與運(yùn)算,相當(dāng)于把
fifo->in是size的倍數(shù)給去掉了,得到的是在size里的位
置,就是在這個(gè)buffer的位置.
而l所表示的是要put進(jìn)去的字節(jié)數(shù)和從in開(kāi)始到buffer
結(jié)尾字節(jié)數(shù)的小值,就是從in到buffer結(jié)尾能不能放下
目的字節(jié)數(shù)*/
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
/*如果放不下,copy分兩部分,一部分從in在位置復(fù)制l字
節(jié)數(shù).一部分從buffer開(kāi)始復(fù)制len-l字節(jié)數(shù),如果放得
下,那len-l為0,一樣可以*/
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
memcpy(fifo->buffer, buffer + l, len - l);
smp_wmb();
//更新in所在位置
fifo->in += len;
return len;
}
static inline unsigned int kfifo_put(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned long flags;
unsigned int ret;
spin_lock_irqsave(fifo->lock, flags);
ret = __kfifo_put(fifo, buffer, len);
spin_unlock_irqrestore(fifo->lock, flags);
return ret;
}
8.kfifo_get
unsigned int __kfifo_get(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
//get的字節(jié)數(shù)和fifo buffer中字節(jié)數(shù)比較,len為最終要get的字節(jié)數(shù)
len = min(len, fifo->in - fifo->out);
smp_rmb();
//要get的字節(jié)數(shù),和out所在位置到fifo buffer結(jié)尾字節(jié)數(shù),比較
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
//和put同理,兩部分,一部分copy l字節(jié)數(shù),一部分copy len-l字節(jié)數(shù),注意方向
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
memcpy(buffer + l, fifo->buffer, len - l);
smp_mb();
//更新out位置
fifo->out += len;
return len;
}
static inline unsigned int kfifo_get(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned long flags;
unsigned int ret;
spin_lock_irqsave(fifo->lock, flags);
ret = __kfifo_get(fifo, buffer, len);
//如果沒(méi)有數(shù)據(jù),重置
if (fifo->in == fifo->out)
fifo->in = fifo->out = 0;
spin_unlock_irqrestore(fifo->lock, flags)
//返回get的字節(jié)數(shù)
return ret;
}
二.仿造kfifo,編寫的環(huán)形緩沖區(qū)ring.c ring.h
/*ring.h*/
#ifndef RING_H
#define RING_H
#include <pthread.h>
struct ring{
unsigned char *buffer; /* the buffer holding the data */
unsigned int size; /* the size of the allocated buffer */
unsigned int in; /* data is added at offset (in % size) */
unsigned int out; /* data is extracted from off. (out % size) */
pthread_mutex_t *lock; /* protects concurrent modifications */
};
extern struct ring *ring_init(unsigned char *buffer, unsigned int size,pthread_mutex_t *lock);
extern struct ring *ring_alloc(unsigned int size,pthread_mutex_t *lock);
extern void ring_free(struct ring *fifo);
extern unsigned int __ring_put(struct ring *fifo,
unsigned char *buffer, unsigned int len);
extern unsigned int __ring_get(struct ring *fifo,
unsigned char *buffer, unsigned int len);
static inline void __ring_reset(struct ring *fifo)
{
fifo->in = fifo->out = 0;
}
static inline void ring_reset(struct ring *fifo)
{
unsigned long flags;
pthread_mutex_lock(fifo->lock);
__ring_reset(fifo);
pthread_mutex_unlock(fifo->lock);
}
static inline unsigned int __ring_len(struct ring *fifo)
{
return fifo->in - fifo->out;
}
static inline unsigned int ring_len(struct ring *fifo)
{
unsigned int ret;
pthread_mutex_lock(fifo->lock);
ret = __ring_len(fifo);
pthread_mutex_unlock(fifo->lock);
return ret;
}
static inline unsigned int ring_put(struct ring *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int ret;
pthread_mutex_lock(fifo->lock);
ret = __ring_put(fifo, buffer, len);
pthread_mutex_unlock(fifo->lock);
return ret;
}
static inline unsigned int ring_get(struct ring *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int ret;
pthread_mutex_lock(fifo->lock);
ret = __ring_get(fifo, buffer, len);
if (fifo->in == fifo->out)
fifo->in = fifo->out = 0;
pthread_mutex_unlock(fifo->lock);
return ret;
}
#endif
/*ring.c*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "ring.h"
#define is_power_of_2(x) ((x) != 0 && (((x) & ((x) - 1)) == 0))
#define min(x,y) ({ \
typeof(x) _x = (x); \
typeof(y) _y = (y); \
(void) (&_x == &_y); \
_x < _y ? _x : _y; })
struct ring *ring_init(unsigned char *buffer, unsigned int size,
pthread_mutex_t *lock)
{
struct ring *fifo = NULL;
if(!is_power_of_2(size)){
printf("size is not power of 2\n");
return fifo;
}
fifo = (struct ring *)malloc(sizeof(struct ring));
if (!fifo){
printf("fifo malloc error\n");
return fifo;
}
fifo->buffer = buffer;
fifo->size = size;
fifo->in = fifo->out = 0;
fifo->lock = lock;
return fifo;
}
struct ring *ring_alloc(unsigned int size,pthread_mutex_t *lock)
{
unsigned char *buffer = NULL;
struct ring *ret = NULL;
buffer = (unsigned char *)malloc(size);
if (!buffer){
printf("buffer malloc error\n");
return ret;
}
ret = ring_init(buffer, size, lock);
return ret;
}
void ring_free(struct ring *fifo)
{
free(fifo->buffer);
free(fifo);
}
unsigned int __ring_put(struct ring *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->size - fifo->in + fifo->out);
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
memcpy(fifo->buffer, buffer + l, len - l);
fifo->in += len;
return len;
}
unsigned int __ring_get(struct ring *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->in - fifo->out);
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
memcpy(buffer + l, fifo->buffer, len - l);
fifo->out += len;
return len;
}
三.測(cè)試
測(cè)試的main.c文件如下:
/*main.c*/
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include "ring.h"
struct data{
int a;
time_t t;
};
pthread_t tid1;
pthread_t tid2;
void sig_handler(int sig)
{
if(sig == SIGINT){
if(pthread_cancel(tid1) != 0){
perror("thread cancel fail");
exit(0);
}
if(pthread_cancel(tid2) != 0){
perror("thread cancel fail");
exit(0);
}
printf("\n\n 兩個(gè)線程取消\n");
}
}
void * put_proc(void * arg)
{
signal(SIGINT, sig_handler);
if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL) != 0){
perror("pthread set cancel state fail");
pthread_exit(NULL);
exit(0);
}
if(pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL) != 0){
perror("pthread set cancel type fail");
pthread_exit(NULL);
exit(0);
}
int i = 0;
struct data data_put;
struct ring * ring_buf = (struct ring *)arg;
int len = sizeof(struct data);
int ret;
while(1){
data_put.a = i;
time(&data_put.t);
ret = ring_put(ring_buf,(unsigned char *)&data_put,len);
printf("ret put:%d\nput data:%d\ntime:%s\n\n",ret,data_put.a,ctime(&data_put.t));
i++;
sleep(2);
}
}
void * get_proc(void * arg)
{
signal(SIGINT, sig_handler);
if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL) != 0){
perror("pthread set cancel state fail");
pthread_exit(NULL);
exit(0);
}
if(pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL) != 0){
perror("pthread set cancel type fail");
pthread_exit(NULL);
exit(0);
}
struct ring * ring_buf = (struct ring *)arg;
int len = sizeof(struct data);
struct data data_get;
int ret;
while(1){
ret = ring_get(ring_buf,(unsigned char *)&data_get,len);
printf("ret get:%d\nget data:%d\ntime:%s\n\n",ret,data_get.a,ctime(&data_get.t));
sleep(2);
}
}
int main(int argc, char const *argv[])
{
signal(SIGINT, sig_handler);
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
struct ring * ring_buf = NULL;
ring_buf = ring_alloc(32,&lock);
int err;
err = pthread_create(&tid1, NULL, put_proc, ring_buf);
if(err){
printf("fail create thread 1\n");
goto end;
}
err = pthread_create(&tid2, NULL, get_proc, ring_buf);
if(err){
printf("fail create thread 2\n");
goto end;
}
pthread_join(tid1,NULL);
pthread_join(tid2,NULL);
printf("program end\n");
end:
ring_free(ring_buf);
return 0;
}
其中,建立了兩個(gè)線程,一個(gè)用于向ring_buf寫數(shù)據(jù),一個(gè)用于向ring_buf讀數(shù)據(jù),數(shù)據(jù)定義時(shí)加上了時(shí)間信息便于查看.互斥量的使用主要用于線程同步,比如兩個(gè)線程如果都向緩沖區(qū)寫數(shù)據(jù)時(shí),必須保證臨界區(qū)的安全,當(dāng)然也可以使用讀寫鎖,其實(shí)更好一些,因?yàn)樽x的時(shí)候,也可以寫.
信號(hào)處理函數(shù),用于ctrl+c強(qiáng)制結(jié)束時(shí),異步取消線程.
結(jié)果:
ret get:0
get data:1032341248
time:Sun Apr 21 12:30:00 4461252
ret put:16
put data:0
time:Mon Jan 8 17:35:33 2018
ret get:16
get data:0
time:Mon Jan 8 17:35:33 2018
ret put:16
put data:1
time:Mon Jan 8 17:35:35 2018
ret get:16
get data:1
time:Mon Jan 8 17:35:35 2018
ret put:16
put data:2
time:Mon Jan 8 17:35:37 2018
ret get:16
get data:2
time:Mon Jan 8 17:35:37 2018
ret put:16
put data:3
time:Mon Jan 8 17:35:39 2018
ret get:16
get data:3
time:Mon Jan 8 17:35:39 2018
ret put:16
put data:4
time:Mon Jan 8 17:35:41 2018
^C
兩個(gè)線程取消
program end
剛開(kāi)始,get線程沒(méi)有讀出數(shù)據(jù)
后面就是put一個(gè),get一個(gè),沒(méi)問(wèn)題.