前言
swoole 中數(shù)據(jù)的接受與發(fā)送(例如 reactor 線程接受客戶(hù)端消息、發(fā)送給客戶(hù)端的消息、接受到的來(lái)自 worker 的消息、要發(fā)送給 worker 的消息等等)都要涉及到緩沖區(qū),swoole 中的緩沖區(qū)實(shí)現(xiàn)是 swBuffer,實(shí)際上是一個(gè)單鏈表。
swBuffer 的數(shù)據(jù)結(jié)構(gòu)
-
swBuffer數(shù)據(jù)結(jié)構(gòu)中trunk_num是鏈表元素的個(gè)數(shù),trunk_size是swBuffer緩沖區(qū)創(chuàng)建時(shí),鏈表元素約定的大小(實(shí)際大小不一定是這個(gè)值),length是實(shí)際上緩沖區(qū)占用的內(nèi)存總大小。 -
swBuffer_trunk中的type有三種,分別應(yīng)用于:緩存數(shù)據(jù)、發(fā)送文件、提醒連接關(guān)閉三種情景。length指的是元素的內(nèi)存大小。
enum swBufferChunk
{
SW_CHUNK_DATA,
SW_CHUNK_SENDFILE,
SW_CHUNK_CLOSE,
};
typedef struct _swBuffer_trunk
{
uint32_t type;
uint32_t length;
uint32_t offset;
union
{
void *ptr;
struct
{
uint32_t val1;
uint32_t val2;
} data;
} store;
uint32_t size;
void (*destroy)(struct _swBuffer_trunk *chunk);
struct _swBuffer_trunk *next;
} swBuffer_trunk;
typedef struct _swBuffer
{
int fd;
uint8_t trunk_num; //trunk數(shù)量
uint16_t trunk_size;
uint32_t length;
swBuffer_trunk *head;
swBuffer_trunk *tail;
} swBuffer;
swBuffer 的創(chuàng)建
swBuffer 的創(chuàng)建很簡(jiǎn)單,只是初始化整個(gè) swBuffer 的 header 頭元素而已:
swBuffer* swBuffer_new(int trunk_size)
{
swBuffer *buffer = sw_malloc(sizeof(swBuffer));
if (buffer == NULL)
{
swWarn("malloc for buffer failed. Error: %s[%d]", strerror(errno), errno);
return NULL;
}
bzero(buffer, sizeof(swBuffer));
buffer->trunk_size = trunk_size;
return buffer;
}
swBuffer 內(nèi)存的申請(qǐng)
swBuffer 內(nèi)存的申請(qǐng)邏輯也很簡(jiǎn)單,按照傳入的 size 參數(shù)為鏈表元素申請(qǐng)內(nèi)存,初始化成員變量,然后將鏈表元素放到鏈表的尾部即可:
int swBuffer_append(swBuffer *buffer, void *data, uint32_t size)
{
swBuffer_trunk *chunk = swBuffer_new_trunk(buffer, SW_CHUNK_DATA, size);
if (chunk == NULL)
{
return SW_ERR;
}
buffer->length += size;
chunk->length = size;
memcpy(chunk->store.ptr, data, size);
swTraceLog(SW_TRACE_BUFFER, "trunk_n=%d|size=%d|trunk_len=%d|trunk=%p", buffer->trunk_num, size,
chunk->length, chunk);
return SW_OK;
}
swBuffer_trunk *swBuffer_new_trunk(swBuffer *buffer, uint32_t type, uint32_t size)
{
swBuffer_trunk *chunk = sw_malloc(sizeof(swBuffer_trunk));
if (chunk == NULL)
{
swWarn("malloc for trunk failed. Error: %s[%d]", strerror(errno), errno);
return NULL;
}
bzero(chunk, sizeof(swBuffer_trunk));
//require alloc memory
if (type == SW_CHUNK_DATA && size > 0)
{
void *buf = sw_malloc(size);
if (buf == NULL)
{
swWarn("malloc(%d) for data failed. Error: %s[%d]", size, strerror(errno), errno);
sw_free(chunk);
return NULL;
}
chunk->size = size;
chunk->store.ptr = buf;
}
chunk->type = type;
buffer->trunk_num ++;
if (buffer->head == NULL)
{
buffer->tail = buffer->head = chunk;
}
else
{
buffer->tail->next = chunk;
buffer->tail = chunk;
}
return chunk;
}
獲取 swBuffer 的元素
從 swBuffer 緩沖區(qū)拿數(shù)據(jù)只能從 head 中獲取:
#define swBuffer_get_trunk(buffer) (buffer->head)
swBuffer 元素的 pop
獲取了緩沖區(qū)的元素之后,就要相應(yīng)刪除 head 鏈表元素:
void swBuffer_pop_trunk(swBuffer *buffer, swBuffer_trunk *chunk)
{
if (chunk->next == NULL)
{
buffer->head = NULL;
buffer->tail = NULL;
buffer->length = 0;
buffer->trunk_num = 0;
}
else
{
buffer->head = chunk->next;
buffer->length -= chunk->length;
buffer->trunk_num--;
}
if (chunk->type == SW_CHUNK_DATA)
{
sw_free(chunk->store.ptr);
}
if (chunk->destroy)
{
chunk->destroy(chunk);
}
sw_free(chunk);
}
swBuffer 緩沖區(qū)的銷(xiāo)毀
int swBuffer_free(swBuffer *buffer)
{
volatile swBuffer_trunk *chunk = buffer->head;
void * *will_free_trunk; //free the point
while (chunk != NULL)
{
if (chunk->type == SW_CHUNK_DATA)
{
sw_free(chunk->store.ptr);
}
will_free_trunk = (void *) chunk;
chunk = chunk->next;
sw_free(will_free_trunk);
}
sw_free(buffer);
return SW_OK;
}