發(fā)布與訂閱
一、介紹
Redis 發(fā)布訂閱(pub/sub)是一種消息通信模式:發(fā)送者(pub)發(fā)送消息,訂閱者(sub)接收消息。
SUBSCRIBE channel [channel ...] 訂閱1-N個頻道
UNSUBSCRIBE [channel [channel ...]] 指示客戶端退訂給定的頻道。
PSUBSCRIBE pattern [pattern ...] 訂閱一個或多個符合給定模式的頻道。
PUNSUBSCRIBE [pattern [pattern ...]] 指示客戶端退訂所有給定模式。
PUBLISH channel message 將信息 message 發(fā)送到指定的頻道 channel 。
PUBSUB CHANNELS [pattern] 列出當前的活躍頻道。
PUBSUB NUMSUB [channel-1 ... channel-N] 返回給定頻道的訂閱者數(shù)量, 訂閱模式的客戶端不計算在內(nèi)。
PUBSUB NUMPAT 返回訂閱模式的數(shù)量
二、數(shù)據(jù)結(jié)構(gòu)
redis服務(wù)器通過一個(頻道名稱->客戶端鏈表)字典,和一個(匹配模式)鏈表來完成發(fā)布與訂閱的功能。
struct redisServer {
.....
dict *pubsub_channels; /*所有頻道的訂閱關(guān)系 字典 (頻道名稱->client鏈表)*/
list *pubsub_patterns; /*保存所有訂閱頻道模式鏈表 */
.....
}
//pubsub_patterns鏈表的Node
typedef struct pubsubPattern {
client *client;//用戶
robj *pattern;//訂閱的頻道模式
} pubsubPattern;
三、實現(xiàn)
-
subscribeCommand
//subscribe c c c ... c void subscribeCommand(client *c) { int j; for (j = 1; j < c->argc; j++) pubsubSubscribeChannel(c,c->argv[j]);//訂閱每個頻道 c->flags |= CLIENT_PUBSUB; } //返回1 成功監(jiān)聽 返回0監(jiān)聽已經(jīng)存在 int pubsubSubscribeChannel(client *c, robj *channel) { dictEntry *de; list *clients = NULL; int retval = 0; if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {//自己的訂閱頻道字典加入 retval = 1; incrRefCount(channel); /* Add the client to the channel -> list of clients hash table */ de = dictFind(server.pubsub_channels,channel);//在服務(wù)器中訂閱頻道字典找 if (de == NULL) {//沒找到這個頻道 創(chuàng)建 clients = listCreate(); dictAdd(server.pubsub_channels,channel,clients); incrRefCount(channel); } else {//找到拿出來 clients = dictGetVal(de); } listAddNodeTail(clients,c);//加入到list里面去 } addReplyPubsubSubscribed(c,channel);//通知客戶端 return retval; }
-
psubscribeCommand
void psubscribeCommand(client *c) { int j; for (j = 1; j < c->argc; j++) pubsubSubscribePattern(c,c->argv[j]);//訂閱匹配頻道 c->flags |= CLIENT_PUBSUB; } //加入訂閱頻道模式成功返回1,否則返回0 int pubsubSubscribePattern(client *c, robj *pattern) { int retval = 0; if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {//沒找到的話 retval = 1; pubsubPattern *pat; listAddNodeTail(c->pubsub_patterns,pattern);//加入到c里面的模式頻道 incrRefCount(pattern); pat = zmalloc(sizeof(*pat)); pat->pattern = getDecodedObject(pattern); pat->client = c; listAddNodeTail(server.pubsub_patterns,pat);//加入到server里面的模式頻道 } addReplyPubsubPatSubscribed(c,pattern); return retval; }
-
publish
void publishCommand(client *c) { int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);//發(fā)送一個消息 if (server.cluster_enabled)//如果做了Redis集群 clusterPropagatePublish(c->argv[1],c->argv[2]); else forceCommandPropagation(c,PROPAGATE_REPL);//強制命令傳播 addReplyLongLong(c,receivers);//告訴哦用戶發(fā)送了幾個訂閱者 } /* 發(fā)布一個消息*/ int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; dictEntry *de; listNode *ln; listIter li; de = dictFind(server.pubsub_channels,channel); if (de) {//找到此頻道的話 list *list = dictGetVal(de); listNode *ln; listIter li; listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { client *c = ln->value; addReplyPubsubMessage(c,channel,message);//發(fā)送消息給每一個訂閱者 receivers++; } } /* 模式匹配頻道*/ if (listLength(server.pubsub_patterns)) { listRewind(server.pubsub_patterns,&li); channel = getDecodedObject(channel); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; if (stringmatchlen((char*)pat->pattern->ptr, sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0))//匹配 發(fā)送 { addReplyPubsubPatMessage(pat->client, pat->pattern,channel,message); receivers++; } } decrRefCount(channel); } return receivers; }其中,反向操作就是以上的操作的逆順序。
四、參考
- https://redis.io/
- 《Redis設(shè)計與實現(xiàn)》