linux下使用hiredis異步API實現(xiàn)sub/pub消息訂閱和發(fā)布的功能

轉(zhuǎn)載自:http://blog.csdn.net/chenzba/article/details/51224715

最近使用redis的c接口——hiredis,使客戶端與redis服務(wù)器通信,實現(xiàn)消息訂閱和發(fā)布(PUB/SUB)的功能,我把遇到的一些問題和解決方法列出來供大家學(xué)習(xí)。

廢話不多說,先貼代碼。

redis_publisher.h

[cpp]view plaincopy

/*************************************************************************

>?File?Name:?redis_publisher.h

>?Author:?chenzengba

>?Mail:?chenzengba@gmail.com

>?Created?Time:?Sat?23?Apr?2016?10:15:09?PM?CST

>?Description:?封裝hiredis,實現(xiàn)消息發(fā)布給redis功能

************************************************************************/

#ifndef?REDIS_PUBLISHER_H

#define?REDIS_PUBLISHER_H

#include?

#include?

#include?

#include?

#include?

#include?

#include?

#include?

#include?

classCRedisPublisher

{

public:

CRedisPublisher();

~CRedisPublisher();

boolinit();

booluninit();

boolconnect();

booldisconnect();

boolpublish(conststd::string?&channel_name,

conststd::string?&message);

private:

//?下面三個回調(diào)函數(shù)供redis服務(wù)調(diào)用

//?連接回調(diào)

staticvoidconnect_callback(constredisAsyncContext?*redis_context,

intstatus);

//?斷開連接的回調(diào)

staticvoiddisconnect_callback(constredisAsyncContext?*redis_context,

intstatus);

//?執(zhí)行命令回調(diào)

staticvoidcommand_callback(redisAsyncContext?*redis_context,

void*reply,void*privdata);

//?事件分發(fā)線程函數(shù)

staticvoid*event_thread(void*data);

void*event_proc();

private:

//?libevent事件對象

event_base?*_event_base;

//?事件線程ID

pthread_t?_event_thread;

//?事件線程的信號量

sem_t?_event_sem;

//?hiredis異步對象

redisAsyncContext?*_redis_context;

};

#endif

redis_publisher.cpp

[cpp]view plaincopy

/*************************************************************************

>?File?Name:?redis_publisher.cpp

>?Author:?chenzengba

>?Mail:?chenzengba@gmail.com

>?Created?Time:?Sat?23?Apr?2016?10:15:09?PM?CST

>?Description:

************************************************************************/

#include?

#include?

#include?

#include?"redis_publisher.h"

CRedisPublisher::CRedisPublisher():_event_base(0),?_event_thread(0),

_redis_context(0)

{

}

CRedisPublisher::~CRedisPublisher()

{

}

boolCRedisPublisher::init()

{

//?initialize?the?event

_event_base?=?event_base_new();//?創(chuàng)建libevent對象

if(NULL?==?_event_base)

{

printf(":?Create?redis?event?failed.\n");

returnfalse;

}

memset(&_event_sem,?0,sizeof(_event_sem));

intret?=?sem_init(&_event_sem,?0,?0);

if(ret?!=?0)

{

printf(":?Init?sem?failed.\n");

returnfalse;

}

returntrue;

}

boolCRedisPublisher::uninit()

{

_event_base?=?NULL;

sem_destroy(&_event_sem);

returntrue;

}

boolCRedisPublisher::connect()

{

//?connect?redis

_redis_context?=?redisAsyncConnect("127.0.0.1",?6379);//?異步連接到redis服務(wù)器上,使用默認端口

if(NULL?==?_redis_context)

{

printf(":?Connect?redis?failed.\n");

returnfalse;

}

if(_redis_context->err)

{

printf(":?Connect?redis?error:?%d,?%s\n",

_redis_context->err,?_redis_context->errstr);//?輸出錯誤信息

returnfalse;

}

//?attach?the?event

redisLibeventAttach(_redis_context,?_event_base);//?將事件綁定到redis?context上,使設(shè)置給redis的回調(diào)跟事件關(guān)聯(lián)

//?創(chuàng)建事件處理線程

intret?=?pthread_create(&_event_thread,?0,?&CRedisPublisher::event_thread,this);

if(ret?!=?0)

{

printf(":?create?event?thread?failed.\n");

disconnect();

returnfalse;

}

//?設(shè)置連接回調(diào),當(dāng)異步調(diào)用連接后,服務(wù)器處理連接請求結(jié)束后調(diào)用,通知調(diào)用者連接的狀態(tài)

redisAsyncSetConnectCallback(_redis_context,

&CRedisPublisher::connect_callback);

//?設(shè)置斷開連接回調(diào),當(dāng)服務(wù)器斷開連接后,通知調(diào)用者連接斷開,調(diào)用者可以利用這個函數(shù)實現(xiàn)重連

redisAsyncSetDisconnectCallback(_redis_context,

&CRedisPublisher::disconnect_callback);

//?啟動事件線程

sem_post(&_event_sem);

returntrue;

}

boolCRedisPublisher::disconnect()

{

if(_redis_context)

{

redisAsyncDisconnect(_redis_context);

redisAsyncFree(_redis_context);

_redis_context?=?NULL;

}

returntrue;

}

boolCRedisPublisher::publish(conststd::string?&channel_name,

conststd::string?&message)

{

intret?=?redisAsyncCommand(_redis_context,

&CRedisPublisher::command_callback,this,"PUBLISH?%s?%s",

channel_name.c_str(),?message.c_str());

if(REDIS_ERR?==?ret)

{

printf("Publish?command?failed:?%d\n",?ret);

returnfalse;

}

returntrue;

}

voidCRedisPublisher::connect_callback(constredisAsyncContext?*redis_context,

intstatus)

{

if(status?!=?REDIS_OK)

{

printf(":?Error:?%s\n",?redis_context->errstr);

}

else

{

printf(":?Redis?connected!\n");

}

}

voidCRedisPublisher::disconnect_callback(

constredisAsyncContext?*redis_context,intstatus)

{

if(status?!=?REDIS_OK)

{

//?這里異常退出,可以嘗試重連

printf(":?Error:?%s\n",?redis_context->errstr);

}

}

//?消息接收回調(diào)函數(shù)

voidCRedisPublisher::command_callback(redisAsyncContext?*redis_context,

void*reply,void*privdata)

{

printf("command?callback.\n");

//?這里不執(zhí)行任何操作

}

void*CRedisPublisher::event_thread(void*data)

{

if(NULL?==?data)

{

printf(":?Error!\n");

assert(false);

returnNULL;

}

CRedisPublisher?*self_this?=reinterpret_cast(data);

returnself_this->event_proc();

}

void*CRedisPublisher::event_proc()

{

sem_wait(&_event_sem);

//?開啟事件分發(fā),event_base_dispatch會阻塞

event_base_dispatch(_event_base);

returnNULL;

}

redis_subscriber.h

[cpp]view plaincopy

/*************************************************************************

>?File?Name:?redis_subscriber.h

>?Author:?chenzengba

>?Mail:?chenzengba@gmail.com

>?Created?Time:?Sat?23?Apr?2016?10:15:09?PM?CST

>?Description:?封裝hiredis,實現(xiàn)消息訂閱redis功能

************************************************************************/

#ifndef?REDIS_SUBSCRIBER_H

#define?REDIS_SUBSCRIBER_H

#include?

#include?

#include?

#include?

#include?

#include?

#include?

#include?

#include?

classCRedisSubscriber

{

public:

typedef? std::tr1::function <void(const char*,const char*,int)> NotifyMessageFn;//?回調(diào)函數(shù)對象類型,當(dāng)接收到消息后調(diào)用回調(diào)把消息發(fā)送出去

CRedisSubscriber();

~CRedisSubscriber();

boolinit(const NotifyMessageFn?&fn);//?傳入回調(diào)對象

bool uninit();

bool connect();

bool disconnect();

//?可以多次調(diào)用,訂閱多個頻道

bool subscribe(conststd::string?&channel_name);

private:

//?下面三個回調(diào)函數(shù)供redis服務(wù)調(diào)用

//?連接回調(diào)

static void connect_callback(constredisAsyncContext?*redis_context,

intstatus);

//?斷開連接的回調(diào)

staticvoiddisconnect_callback(constredisAsyncContext?*redis_context,

intstatus);

//?執(zhí)行命令回調(diào)

staticvoidcommand_callback(redisAsyncContext?*redis_context,

void*reply,void*privdata);

//?事件分發(fā)線程函數(shù)

staticvoid*event_thread(void*data);

void*event_proc();

private:

//?libevent事件對象

event_base?*_event_base;

//?事件線程ID

pthread_t?_event_thread;

//?事件線程的信號量

sem_t?_event_sem;

//?hiredis異步對象

redisAsyncContext?*_redis_context;

//?通知外層的回調(diào)函數(shù)對象

NotifyMessageFn?_notify_message_fn;

};

#endif

redis_subscriber.cpp:

[cpp]view plaincopy

/*************************************************************************

>?File?Name:?redis_subscriber.cpp

>?Author:?chenzengba

>?Mail:?chenzengba@gmail.com

>?Created?Time:?Sat?23?Apr?2016?10:15:09?PM?CST

>?Description:

************************************************************************/

#include?

#include?

#include?

#include?"redis_subscriber.h"

CRedisSubscriber::CRedisSubscriber():_event_base(0),?_event_thread(0),

_redis_context(0)

{

}

CRedisSubscriber::~CRedisSubscriber()

{

}

boolCRedisSubscriber::init(constNotifyMessageFn?&fn)

{

//?initialize?the?event

_notify_message_fn?=?fn;

_event_base?=?event_base_new();//?創(chuàng)建libevent對象

if(NULL?==?_event_base)

{

printf(":?Create?redis?event?failed.\n");

returnfalse;

}

memset(&_event_sem,?0,sizeof(_event_sem));

intret?=?sem_init(&_event_sem,?0,?0);

if(ret?!=?0)

{

printf(":?Init?sem?failed.\n");

returnfalse;

}

returntrue;

}

boolCRedisSubscriber::uninit()

{

_event_base?=?NULL;

sem_destroy(&_event_sem);

returntrue;

}

boolCRedisSubscriber::connect()

{

//?connect?redis

_redis_context?=?redisAsyncConnect("127.0.0.1",?6379);//?異步連接到redis服務(wù)器上,使用默認端口

if(NULL?==?_redis_context)

{

printf(":?Connect?redis?failed.\n");

returnfalse;

}

if(_redis_context->err)

{

printf(":?Connect?redis?error:?%d,?%s\n",

_redis_context->err,?_redis_context->errstr);//?輸出錯誤信息

returnfalse;

}

//?attach?the?event

redisLibeventAttach(_redis_context,?_event_base);//?將事件綁定到redis?context上,使設(shè)置給redis的回調(diào)跟事件關(guān)聯(lián)

//?創(chuàng)建事件處理線程

intret?=?pthread_create(&_event_thread,?0,?&CRedisSubscriber::event_thread,this);

if(ret?!=?0)

{

printf(":?create?event?thread?failed.\n");

disconnect();

returnfalse;

}

//?設(shè)置連接回調(diào),當(dāng)異步調(diào)用連接后,服務(wù)器處理連接請求結(jié)束后調(diào)用,通知調(diào)用者連接的狀態(tài)

redisAsyncSetConnectCallback(_redis_context,

&CRedisSubscriber::connect_callback);

//?設(shè)置斷開連接回調(diào),當(dāng)服務(wù)器斷開連接后,通知調(diào)用者連接斷開,調(diào)用者可以利用這個函數(shù)實現(xiàn)重連

redisAsyncSetDisconnectCallback(_redis_context,

&CRedisSubscriber::disconnect_callback);

//?啟動事件線程

sem_post(&_event_sem);

returntrue;

}

boolCRedisSubscriber::disconnect()

{

if(_redis_context)

{

redisAsyncDisconnect(_redis_context);

redisAsyncFree(_redis_context);

_redis_context?=?NULL;

}

returntrue;

}

boolCRedisSubscriber::subscribe(conststd::string?&channel_name)

{

intret?=?redisAsyncCommand(_redis_context,

&CRedisSubscriber::command_callback,this,"SUBSCRIBE?%s",

channel_name.c_str());

if(REDIS_ERR?==?ret)

{

printf("Subscribe?command?failed:?%d\n",?ret);

returnfalse;

}

printf(":?Subscribe?success:?%s\n",?channel_name.c_str());

returntrue;

}

voidCRedisSubscriber::connect_callback(constredisAsyncContext?*redis_context,

intstatus)

{

if(status?!=?REDIS_OK)

{

printf(":?Error:?%s\n",?redis_context->errstr);

}

else

{

printf(":?Redis?connected!");

}

}

voidCRedisSubscriber::disconnect_callback(

constredisAsyncContext?*redis_context,intstatus)

{

if(status?!=?REDIS_OK)

{

//?這里異常退出,可以嘗試重連

printf(":?Error:?%s\n",?redis_context->errstr);

}

}

//?消息接收回調(diào)函數(shù)

voidCRedisSubscriber::command_callback(redisAsyncContext?*redis_context,

void*reply,void*privdata)

{

if(NULL?==?reply?||?NULL?==?privdata)?{

return;

}

//?靜態(tài)函數(shù)中,要使用類的成員變量,把當(dāng)前的this指針傳進來,用this指針間接訪問

CRedisSubscriber?*self_this?=reinterpret_cast(privdata);

redisReply?*redis_reply?=reinterpret_cast(reply);

//?訂閱接收到的消息是一個帶三元素的數(shù)組

if(redis_reply->type?==?REDIS_REPLY_ARRAY?&&

redis_reply->elements?==?3)

{

printf(":?Recieve?message:%s:%d:%s:%d:%s:%d\n",

redis_reply->element[0]->str,?redis_reply->element[0]->len,

redis_reply->element[1]->str,?redis_reply->element[1]->len,

redis_reply->element[2]->str,?redis_reply->element[2]->len);

//?調(diào)用函數(shù)對象把消息通知給外層

self_this->_notify_message_fn(redis_reply->element[1]->str,

redis_reply->element[2]->str,?redis_reply->element[2]->len);

}

}

void*CRedisSubscriber::event_thread(void*data)

{

if(NULL?==?data)

{

printf(":?Error!\n");

assert(false);

returnNULL;

}

CRedisSubscriber?*self_this?=reinterpret_cast(data);

returnself_this->event_proc();

}

void*CRedisSubscriber::event_proc()

{

sem_wait(&_event_sem);

//?開啟事件分發(fā),event_base_dispatch會阻塞

event_base_dispatch(_event_base);

returnNULL;

}

問題1:hiredis官網(wǎng)沒有異步接口的實現(xiàn)例子。

hiredis提供了幾個異步通信的API,一開始根據(jù)API名字的理解,我們實現(xiàn)了跟redis服務(wù)器建立連接、訂閱和發(fā)布的功能,可在實際使用的時候,程序并沒有像我們預(yù)想的那樣,除了能夠建立連接外,任何事情都沒發(fā)生。

網(wǎng)上查了很多資料,原來hiredis的異步實現(xiàn)是通過事件來分發(fā)redis發(fā)送過來的消息的,hiredis可以使用libae、libev、libuv和libevent中的任何一個實現(xiàn)事件的分發(fā),網(wǎng)上的資料提示使用libae、libev和libuv可能發(fā)生其他問題,這里為了方便就選用libevent。hireds官網(wǎng)并沒有對libevent做任何介紹,也沒用說明使用異步機制需要引入事件的接口,所以一開始走了很多彎路。

關(guān)于libevent的使用這里就不再贅述,詳情可以見libevent官網(wǎng)。

libevent官網(wǎng):http://libevent.org/

libevent api文檔:https://www.monkey.org/~provos/libevent/doxygen-2.0.1/include_2event2_2event_8h.html#6e9827de8c3014417b11b48f2fe688ae

CRedisPublisher和CRedisSubscriber的初始化過程:

初始化事件處理,并獲得事件處理的實例:

[cpp]view plaincopy

_event_base?=?event_base_new();

在獲得redisAsyncContext *之后,調(diào)用

[cpp]view plaincopy

redisLibeventAttach(_redis_context,?_event_base);

這樣就將事件處理和redis關(guān)聯(lián)起來,最后在另一個線程調(diào)用

[cpp]view plaincopy

event_base_dispatch(_event_base);

啟動事件的分發(fā),這是一個阻塞函數(shù),因此,創(chuàng)建了一個新的線程處理事件分發(fā),值得注意的是,這里用信號燈_event_sem控制線程的啟動,意在程序調(diào)用

[cpp]view plaincopy

redisAsyncSetConnectCallback(_redis_context,

&CRedisSubscriber::connect_callback);

redisAsyncSetDisconnectCallback(_redis_context,

&CRedisSubscriber::disconnect_callback);

之后,能夠完全捕捉到這兩個回調(diào)。

問題2 奇特的‘ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context’錯誤

有些人會覺得這兩個類設(shè)計有點冗余,我們發(fā)現(xiàn)CRedisPublisher和CRedisSubscriber很多邏輯是一樣的,為什么不把他們整合到一起成一個類,既能夠發(fā)布消息也能夠訂閱消息。其實一開始我就是這么干的,在使用的時候發(fā)現(xiàn),用同個redisAsynContex *對象進行消息訂閱和發(fā)布,與redis服務(wù)連接會自動斷開,disconnect_callback回調(diào)會被調(diào)用,并且返回奇怪的錯誤:ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context,因此,不能使用同個redisAsyncContext *對象實現(xiàn)發(fā)布和訂閱。這里為了減少設(shè)計的復(fù)雜性,就將兩個類的邏輯分開了。

當(dāng)然,你也可以將相同的邏輯抽象到一個基類里,并實現(xiàn)publish和subscribe接口。

問題3 相關(guān)依賴的庫

編譯之前,需要安裝hiredis、libevent和boost庫,我是用的是Ubuntu x64系統(tǒng)。

hiredis官網(wǎng):https://github.com/redis/hiredis

下載源碼解壓,進入解壓目錄,執(zhí)行make && make install命令。

libevent官網(wǎng):http://libevent.org/下載最新的穩(wěn)定版

解壓后進入解壓目錄,執(zhí)行命令

./configure -prefix=/usr

sudo make && make install

boost庫:直接執(zhí)行安裝:sudo apt-get install libboost-dev

如果你不是用std::tr1::function的函數(shù)對象來給外層通知消息,就不需要boost庫。你可以用接口的形式實現(xiàn)回調(diào),把接口傳給CRedisSubscribe類,讓它在接收到消息后調(diào)用接口回調(diào),通知外層。

問題4 如何使用

最后貼出例子代碼。

publisher.cpp,實現(xiàn)發(fā)布消息:

[cpp]view plaincopy

/*************************************************************************

>?File?Name:?publisher.cpp

>?Author:?chenzengba

>?Mail:?chenzengba@gmail.com

>?Created?Time:?Sat?23?Apr?2016?12:13:24?PM?CST

************************************************************************/

#include?"redis_publisher.h"

intmain(intargc,char*argv[])

{

CRedisPublisher?publisher;

boolret?=?publisher.init();

if(!ret)

{

printf("Init?failed.\n");

return0;

}

ret?=?publisher.connect();

if(!ret)

{

printf("connect?failed.");

return0;

}

while(true)

{

publisher.publish("test-channel","Test?message");

sleep(1);

}

publisher.disconnect();

publisher.uninit();

return0;

}

subscriber.cpp實現(xiàn)訂閱消息:

[cpp]view plaincopy

/*************************************************************************

>?File?Name:?subscriber.cpp

>?Author:?chenzengba

>?Mail:?chenzengba@gmail.com

>?Created?Time:?Sat?23?Apr?2016?12:26:42?PM?CST

************************************************************************/

#include?"redis_subscriber.h"

voidrecieve_message(constchar*channel_name,

constchar*message,intlen)

{

printf("Recieve?message:\n????channel?name:?%s\n????message:?%s\n",

channel_name,?message);

}

intmain(intargc,char*argv[])

{

CRedisSubscriber?subscriber;

CRedisSubscriber::NotifyMessageFn?fn?=

bind(recieve_message,?std::tr1::placeholders::_1,

std::tr1::placeholders::_2,?std::tr1::placeholders::_3);

boolret?=?subscriber.init(fn);

if(!ret)

{

printf("Init?failed.\n");

return0;

}

ret?=?subscriber.connect();

if(!ret)

{

printf("Connect?failed.\n");

return0;

}

subscriber.subscribe("test-channel");

while(true)

{

sleep(1);

}

subscriber.disconnect();

subscriber.uninit();

return0;

}

關(guān)于編譯的問題:在g++中編譯,注意要加上-lhiredis -levent參數(shù),下面是一個簡單的Makefile:

[cpp]view plaincopy

EXE=server_main?client_main

CC=g++

FLAG=-lhiredis?-levent

OBJ=redis_publisher.o?publisher.o?redis_subscriber.o?subscriber.o

all:$(EXE)

$(EXE):$(OBJ)

$(CC)?-o?publisher?redis_publisher.o?publisher.o?$(FLAG)

$(CC)?-o?subscriber?redis_subscriber.o?subscriber.o?$(FLAG)

redis_publisher.o:redis_publisher.h

redis_subscriber.o:redis_subscriber.h

publisher.o:publisher.cpp

$(CC)?-c?publisher.cpp

subscriber.o:subscriber.cpp

$(CC)?-c?subscriber.cpp

clean:

rm?publisher?subscriber?*.o

致謝:

redis異步API使用libevent:http://www.tuicool.com/articles/N73uuu

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容