轉(zhuǎn)載自:http://blog.csdn.net/chenzba/article/details/51224715
最近使用redis的c接口——hiredis,使客戶端與redis服務(wù)器通信,實現(xiàn)消息訂閱和發(fā)布(PUB/SUB)的功能,我把遇到的一些問題和解決方法列出來供大家學(xué)習(xí)。
廢話不多說,先貼代碼。
redis_publisher.h
[cpp]view plaincopy
/*************************************************************************
>?File?Name:?redis_publisher.h
>?Author:?chenzengba
>?Mail:?chenzengba@gmail.com
>?Created?Time:?Sat?23?Apr?2016?10:15:09?PM?CST
>?Description:?封裝hiredis,實現(xiàn)消息發(fā)布給redis功能
************************************************************************/
#ifndef?REDIS_PUBLISHER_H
#define?REDIS_PUBLISHER_H
#include?
#include?
#include?
#include?
#include?
#include?
#include?
#include?
#include?
classCRedisPublisher
{
public:
CRedisPublisher();
~CRedisPublisher();
boolinit();
booluninit();
boolconnect();
booldisconnect();
boolpublish(conststd::string?&channel_name,
conststd::string?&message);
private:
//?下面三個回調(diào)函數(shù)供redis服務(wù)調(diào)用
//?連接回調(diào)
staticvoidconnect_callback(constredisAsyncContext?*redis_context,
intstatus);
//?斷開連接的回調(diào)
staticvoiddisconnect_callback(constredisAsyncContext?*redis_context,
intstatus);
//?執(zhí)行命令回調(diào)
staticvoidcommand_callback(redisAsyncContext?*redis_context,
void*reply,void*privdata);
//?事件分發(fā)線程函數(shù)
staticvoid*event_thread(void*data);
void*event_proc();
private:
//?libevent事件對象
event_base?*_event_base;
//?事件線程ID
pthread_t?_event_thread;
//?事件線程的信號量
sem_t?_event_sem;
//?hiredis異步對象
redisAsyncContext?*_redis_context;
};
#endif
redis_publisher.cpp
[cpp]view plaincopy
/*************************************************************************
>?File?Name:?redis_publisher.cpp
>?Author:?chenzengba
>?Mail:?chenzengba@gmail.com
>?Created?Time:?Sat?23?Apr?2016?10:15:09?PM?CST
>?Description:
************************************************************************/
#include?
#include?
#include?
#include?"redis_publisher.h"
CRedisPublisher::CRedisPublisher():_event_base(0),?_event_thread(0),
_redis_context(0)
{
}
CRedisPublisher::~CRedisPublisher()
{
}
boolCRedisPublisher::init()
{
//?initialize?the?event
_event_base?=?event_base_new();//?創(chuàng)建libevent對象
if(NULL?==?_event_base)
{
printf(":?Create?redis?event?failed.\n");
returnfalse;
}
memset(&_event_sem,?0,sizeof(_event_sem));
intret?=?sem_init(&_event_sem,?0,?0);
if(ret?!=?0)
{
printf(":?Init?sem?failed.\n");
returnfalse;
}
returntrue;
}
boolCRedisPublisher::uninit()
{
_event_base?=?NULL;
sem_destroy(&_event_sem);
returntrue;
}
boolCRedisPublisher::connect()
{
//?connect?redis
_redis_context?=?redisAsyncConnect("127.0.0.1",?6379);//?異步連接到redis服務(wù)器上,使用默認端口
if(NULL?==?_redis_context)
{
printf(":?Connect?redis?failed.\n");
returnfalse;
}
if(_redis_context->err)
{
printf(":?Connect?redis?error:?%d,?%s\n",
_redis_context->err,?_redis_context->errstr);//?輸出錯誤信息
returnfalse;
}
//?attach?the?event
redisLibeventAttach(_redis_context,?_event_base);//?將事件綁定到redis?context上,使設(shè)置給redis的回調(diào)跟事件關(guān)聯(lián)
//?創(chuàng)建事件處理線程
intret?=?pthread_create(&_event_thread,?0,?&CRedisPublisher::event_thread,this);
if(ret?!=?0)
{
printf(":?create?event?thread?failed.\n");
disconnect();
returnfalse;
}
//?設(shè)置連接回調(diào),當(dāng)異步調(diào)用連接后,服務(wù)器處理連接請求結(jié)束后調(diào)用,通知調(diào)用者連接的狀態(tài)
redisAsyncSetConnectCallback(_redis_context,
&CRedisPublisher::connect_callback);
//?設(shè)置斷開連接回調(diào),當(dāng)服務(wù)器斷開連接后,通知調(diào)用者連接斷開,調(diào)用者可以利用這個函數(shù)實現(xiàn)重連
redisAsyncSetDisconnectCallback(_redis_context,
&CRedisPublisher::disconnect_callback);
//?啟動事件線程
sem_post(&_event_sem);
returntrue;
}
boolCRedisPublisher::disconnect()
{
if(_redis_context)
{
redisAsyncDisconnect(_redis_context);
redisAsyncFree(_redis_context);
_redis_context?=?NULL;
}
returntrue;
}
boolCRedisPublisher::publish(conststd::string?&channel_name,
conststd::string?&message)
{
intret?=?redisAsyncCommand(_redis_context,
&CRedisPublisher::command_callback,this,"PUBLISH?%s?%s",
channel_name.c_str(),?message.c_str());
if(REDIS_ERR?==?ret)
{
printf("Publish?command?failed:?%d\n",?ret);
returnfalse;
}
returntrue;
}
voidCRedisPublisher::connect_callback(constredisAsyncContext?*redis_context,
intstatus)
{
if(status?!=?REDIS_OK)
{
printf(":?Error:?%s\n",?redis_context->errstr);
}
else
{
printf(":?Redis?connected!\n");
}
}
voidCRedisPublisher::disconnect_callback(
constredisAsyncContext?*redis_context,intstatus)
{
if(status?!=?REDIS_OK)
{
//?這里異常退出,可以嘗試重連
printf(":?Error:?%s\n",?redis_context->errstr);
}
}
//?消息接收回調(diào)函數(shù)
voidCRedisPublisher::command_callback(redisAsyncContext?*redis_context,
void*reply,void*privdata)
{
printf("command?callback.\n");
//?這里不執(zhí)行任何操作
}
void*CRedisPublisher::event_thread(void*data)
{
if(NULL?==?data)
{
printf(":?Error!\n");
assert(false);
returnNULL;
}
CRedisPublisher?*self_this?=reinterpret_cast(data);
returnself_this->event_proc();
}
void*CRedisPublisher::event_proc()
{
sem_wait(&_event_sem);
//?開啟事件分發(fā),event_base_dispatch會阻塞
event_base_dispatch(_event_base);
returnNULL;
}
redis_subscriber.h
[cpp]view plaincopy
/*************************************************************************
>?File?Name:?redis_subscriber.h
>?Author:?chenzengba
>?Mail:?chenzengba@gmail.com
>?Created?Time:?Sat?23?Apr?2016?10:15:09?PM?CST
>?Description:?封裝hiredis,實現(xiàn)消息訂閱redis功能
************************************************************************/
#ifndef?REDIS_SUBSCRIBER_H
#define?REDIS_SUBSCRIBER_H
#include?
#include?
#include?
#include?
#include?
#include?
#include?
#include?
#include?
classCRedisSubscriber
{
public:
typedef? std::tr1::function <void(const char*,const char*,int)> NotifyMessageFn;//?回調(diào)函數(shù)對象類型,當(dāng)接收到消息后調(diào)用回調(diào)把消息發(fā)送出去
CRedisSubscriber();
~CRedisSubscriber();
boolinit(const NotifyMessageFn?&fn);//?傳入回調(diào)對象
bool uninit();
bool connect();
bool disconnect();
//?可以多次調(diào)用,訂閱多個頻道
bool subscribe(conststd::string?&channel_name);
private:
//?下面三個回調(diào)函數(shù)供redis服務(wù)調(diào)用
//?連接回調(diào)
static void connect_callback(constredisAsyncContext?*redis_context,
intstatus);
//?斷開連接的回調(diào)
staticvoiddisconnect_callback(constredisAsyncContext?*redis_context,
intstatus);
//?執(zhí)行命令回調(diào)
staticvoidcommand_callback(redisAsyncContext?*redis_context,
void*reply,void*privdata);
//?事件分發(fā)線程函數(shù)
staticvoid*event_thread(void*data);
void*event_proc();
private:
//?libevent事件對象
event_base?*_event_base;
//?事件線程ID
pthread_t?_event_thread;
//?事件線程的信號量
sem_t?_event_sem;
//?hiredis異步對象
redisAsyncContext?*_redis_context;
//?通知外層的回調(diào)函數(shù)對象
NotifyMessageFn?_notify_message_fn;
};
#endif
redis_subscriber.cpp:
[cpp]view plaincopy
/*************************************************************************
>?File?Name:?redis_subscriber.cpp
>?Author:?chenzengba
>?Mail:?chenzengba@gmail.com
>?Created?Time:?Sat?23?Apr?2016?10:15:09?PM?CST
>?Description:
************************************************************************/
#include?
#include?
#include?
#include?"redis_subscriber.h"
CRedisSubscriber::CRedisSubscriber():_event_base(0),?_event_thread(0),
_redis_context(0)
{
}
CRedisSubscriber::~CRedisSubscriber()
{
}
boolCRedisSubscriber::init(constNotifyMessageFn?&fn)
{
//?initialize?the?event
_notify_message_fn?=?fn;
_event_base?=?event_base_new();//?創(chuàng)建libevent對象
if(NULL?==?_event_base)
{
printf(":?Create?redis?event?failed.\n");
returnfalse;
}
memset(&_event_sem,?0,sizeof(_event_sem));
intret?=?sem_init(&_event_sem,?0,?0);
if(ret?!=?0)
{
printf(":?Init?sem?failed.\n");
returnfalse;
}
returntrue;
}
boolCRedisSubscriber::uninit()
{
_event_base?=?NULL;
sem_destroy(&_event_sem);
returntrue;
}
boolCRedisSubscriber::connect()
{
//?connect?redis
_redis_context?=?redisAsyncConnect("127.0.0.1",?6379);//?異步連接到redis服務(wù)器上,使用默認端口
if(NULL?==?_redis_context)
{
printf(":?Connect?redis?failed.\n");
returnfalse;
}
if(_redis_context->err)
{
printf(":?Connect?redis?error:?%d,?%s\n",
_redis_context->err,?_redis_context->errstr);//?輸出錯誤信息
returnfalse;
}
//?attach?the?event
redisLibeventAttach(_redis_context,?_event_base);//?將事件綁定到redis?context上,使設(shè)置給redis的回調(diào)跟事件關(guān)聯(lián)
//?創(chuàng)建事件處理線程
intret?=?pthread_create(&_event_thread,?0,?&CRedisSubscriber::event_thread,this);
if(ret?!=?0)
{
printf(":?create?event?thread?failed.\n");
disconnect();
returnfalse;
}
//?設(shè)置連接回調(diào),當(dāng)異步調(diào)用連接后,服務(wù)器處理連接請求結(jié)束后調(diào)用,通知調(diào)用者連接的狀態(tài)
redisAsyncSetConnectCallback(_redis_context,
&CRedisSubscriber::connect_callback);
//?設(shè)置斷開連接回調(diào),當(dāng)服務(wù)器斷開連接后,通知調(diào)用者連接斷開,調(diào)用者可以利用這個函數(shù)實現(xiàn)重連
redisAsyncSetDisconnectCallback(_redis_context,
&CRedisSubscriber::disconnect_callback);
//?啟動事件線程
sem_post(&_event_sem);
returntrue;
}
boolCRedisSubscriber::disconnect()
{
if(_redis_context)
{
redisAsyncDisconnect(_redis_context);
redisAsyncFree(_redis_context);
_redis_context?=?NULL;
}
returntrue;
}
boolCRedisSubscriber::subscribe(conststd::string?&channel_name)
{
intret?=?redisAsyncCommand(_redis_context,
&CRedisSubscriber::command_callback,this,"SUBSCRIBE?%s",
channel_name.c_str());
if(REDIS_ERR?==?ret)
{
printf("Subscribe?command?failed:?%d\n",?ret);
returnfalse;
}
printf(":?Subscribe?success:?%s\n",?channel_name.c_str());
returntrue;
}
voidCRedisSubscriber::connect_callback(constredisAsyncContext?*redis_context,
intstatus)
{
if(status?!=?REDIS_OK)
{
printf(":?Error:?%s\n",?redis_context->errstr);
}
else
{
printf(":?Redis?connected!");
}
}
voidCRedisSubscriber::disconnect_callback(
constredisAsyncContext?*redis_context,intstatus)
{
if(status?!=?REDIS_OK)
{
//?這里異常退出,可以嘗試重連
printf(":?Error:?%s\n",?redis_context->errstr);
}
}
//?消息接收回調(diào)函數(shù)
voidCRedisSubscriber::command_callback(redisAsyncContext?*redis_context,
void*reply,void*privdata)
{
if(NULL?==?reply?||?NULL?==?privdata)?{
return;
}
//?靜態(tài)函數(shù)中,要使用類的成員變量,把當(dāng)前的this指針傳進來,用this指針間接訪問
CRedisSubscriber?*self_this?=reinterpret_cast(privdata);
redisReply?*redis_reply?=reinterpret_cast(reply);
//?訂閱接收到的消息是一個帶三元素的數(shù)組
if(redis_reply->type?==?REDIS_REPLY_ARRAY?&&
redis_reply->elements?==?3)
{
printf(":?Recieve?message:%s:%d:%s:%d:%s:%d\n",
redis_reply->element[0]->str,?redis_reply->element[0]->len,
redis_reply->element[1]->str,?redis_reply->element[1]->len,
redis_reply->element[2]->str,?redis_reply->element[2]->len);
//?調(diào)用函數(shù)對象把消息通知給外層
self_this->_notify_message_fn(redis_reply->element[1]->str,
redis_reply->element[2]->str,?redis_reply->element[2]->len);
}
}
void*CRedisSubscriber::event_thread(void*data)
{
if(NULL?==?data)
{
printf(":?Error!\n");
assert(false);
returnNULL;
}
CRedisSubscriber?*self_this?=reinterpret_cast(data);
returnself_this->event_proc();
}
void*CRedisSubscriber::event_proc()
{
sem_wait(&_event_sem);
//?開啟事件分發(fā),event_base_dispatch會阻塞
event_base_dispatch(_event_base);
returnNULL;
}
問題1:hiredis官網(wǎng)沒有異步接口的實現(xiàn)例子。
hiredis提供了幾個異步通信的API,一開始根據(jù)API名字的理解,我們實現(xiàn)了跟redis服務(wù)器建立連接、訂閱和發(fā)布的功能,可在實際使用的時候,程序并沒有像我們預(yù)想的那樣,除了能夠建立連接外,任何事情都沒發(fā)生。
網(wǎng)上查了很多資料,原來hiredis的異步實現(xiàn)是通過事件來分發(fā)redis發(fā)送過來的消息的,hiredis可以使用libae、libev、libuv和libevent中的任何一個實現(xiàn)事件的分發(fā),網(wǎng)上的資料提示使用libae、libev和libuv可能發(fā)生其他問題,這里為了方便就選用libevent。hireds官網(wǎng)并沒有對libevent做任何介紹,也沒用說明使用異步機制需要引入事件的接口,所以一開始走了很多彎路。
關(guān)于libevent的使用這里就不再贅述,詳情可以見libevent官網(wǎng)。
libevent官網(wǎng):http://libevent.org/
libevent api文檔:https://www.monkey.org/~provos/libevent/doxygen-2.0.1/include_2event2_2event_8h.html#6e9827de8c3014417b11b48f2fe688ae
CRedisPublisher和CRedisSubscriber的初始化過程:
初始化事件處理,并獲得事件處理的實例:
[cpp]view plaincopy
_event_base?=?event_base_new();
在獲得redisAsyncContext *之后,調(diào)用
[cpp]view plaincopy
redisLibeventAttach(_redis_context,?_event_base);
這樣就將事件處理和redis關(guān)聯(lián)起來,最后在另一個線程調(diào)用
[cpp]view plaincopy
event_base_dispatch(_event_base);
啟動事件的分發(fā),這是一個阻塞函數(shù),因此,創(chuàng)建了一個新的線程處理事件分發(fā),值得注意的是,這里用信號燈_event_sem控制線程的啟動,意在程序調(diào)用
[cpp]view plaincopy
redisAsyncSetConnectCallback(_redis_context,
&CRedisSubscriber::connect_callback);
redisAsyncSetDisconnectCallback(_redis_context,
&CRedisSubscriber::disconnect_callback);
之后,能夠完全捕捉到這兩個回調(diào)。
問題2 奇特的‘ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context’錯誤
有些人會覺得這兩個類設(shè)計有點冗余,我們發(fā)現(xiàn)CRedisPublisher和CRedisSubscriber很多邏輯是一樣的,為什么不把他們整合到一起成一個類,既能夠發(fā)布消息也能夠訂閱消息。其實一開始我就是這么干的,在使用的時候發(fā)現(xiàn),用同個redisAsynContex *對象進行消息訂閱和發(fā)布,與redis服務(wù)連接會自動斷開,disconnect_callback回調(diào)會被調(diào)用,并且返回奇怪的錯誤:ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context,因此,不能使用同個redisAsyncContext *對象實現(xiàn)發(fā)布和訂閱。這里為了減少設(shè)計的復(fù)雜性,就將兩個類的邏輯分開了。
當(dāng)然,你也可以將相同的邏輯抽象到一個基類里,并實現(xiàn)publish和subscribe接口。
編譯之前,需要安裝hiredis、libevent和boost庫,我是用的是Ubuntu x64系統(tǒng)。
hiredis官網(wǎng):https://github.com/redis/hiredis
下載源碼解壓,進入解壓目錄,執(zhí)行make && make install命令。
libevent官網(wǎng):http://libevent.org/下載最新的穩(wěn)定版
解壓后進入解壓目錄,執(zhí)行命令
./configure -prefix=/usr
sudo make && make install
boost庫:直接執(zhí)行安裝:sudo apt-get install libboost-dev
如果你不是用std::tr1::function的函數(shù)對象來給外層通知消息,就不需要boost庫。你可以用接口的形式實現(xiàn)回調(diào),把接口傳給CRedisSubscribe類,讓它在接收到消息后調(diào)用接口回調(diào),通知外層。
最后貼出例子代碼。
publisher.cpp,實現(xiàn)發(fā)布消息:
[cpp]view plaincopy
/*************************************************************************
>?File?Name:?publisher.cpp
>?Author:?chenzengba
>?Mail:?chenzengba@gmail.com
>?Created?Time:?Sat?23?Apr?2016?12:13:24?PM?CST
************************************************************************/
#include?"redis_publisher.h"
intmain(intargc,char*argv[])
{
CRedisPublisher?publisher;
boolret?=?publisher.init();
if(!ret)
{
printf("Init?failed.\n");
return0;
}
ret?=?publisher.connect();
if(!ret)
{
printf("connect?failed.");
return0;
}
while(true)
{
publisher.publish("test-channel","Test?message");
sleep(1);
}
publisher.disconnect();
publisher.uninit();
return0;
}
subscriber.cpp實現(xiàn)訂閱消息:
[cpp]view plaincopy
/*************************************************************************
>?File?Name:?subscriber.cpp
>?Author:?chenzengba
>?Mail:?chenzengba@gmail.com
>?Created?Time:?Sat?23?Apr?2016?12:26:42?PM?CST
************************************************************************/
#include?"redis_subscriber.h"
voidrecieve_message(constchar*channel_name,
constchar*message,intlen)
{
printf("Recieve?message:\n????channel?name:?%s\n????message:?%s\n",
channel_name,?message);
}
intmain(intargc,char*argv[])
{
CRedisSubscriber?subscriber;
CRedisSubscriber::NotifyMessageFn?fn?=
bind(recieve_message,?std::tr1::placeholders::_1,
std::tr1::placeholders::_2,?std::tr1::placeholders::_3);
boolret?=?subscriber.init(fn);
if(!ret)
{
printf("Init?failed.\n");
return0;
}
ret?=?subscriber.connect();
if(!ret)
{
printf("Connect?failed.\n");
return0;
}
subscriber.subscribe("test-channel");
while(true)
{
sleep(1);
}
subscriber.disconnect();
subscriber.uninit();
return0;
}
關(guān)于編譯的問題:在g++中編譯,注意要加上-lhiredis -levent參數(shù),下面是一個簡單的Makefile:
[cpp]view plaincopy
EXE=server_main?client_main
CC=g++
FLAG=-lhiredis?-levent
OBJ=redis_publisher.o?publisher.o?redis_subscriber.o?subscriber.o
all:$(EXE)
$(EXE):$(OBJ)
$(CC)?-o?publisher?redis_publisher.o?publisher.o?$(FLAG)
$(CC)?-o?subscriber?redis_subscriber.o?subscriber.o?$(FLAG)
redis_publisher.o:redis_publisher.h
redis_subscriber.o:redis_subscriber.h
publisher.o:publisher.cpp
$(CC)?-c?publisher.cpp
subscriber.o:subscriber.cpp
$(CC)?-c?subscriber.cpp
clean:
rm?publisher?subscriber?*.o
致謝:
redis異步API使用libevent:http://www.tuicool.com/articles/N73uuu