Redis學習之發(fā)布與訂閱

發(fā)布與訂閱

一、介紹

Redis 發(fā)布訂閱(pub/sub)是一種消息通信模式:發(fā)送者(pub)發(fā)送消息,訂閱者(sub)接收消息。

  • SUBSCRIBE channel [channel ...] 訂閱1-N個頻道

  • UNSUBSCRIBE [channel [channel ...]] 指示客戶端退訂給定的頻道。

  • PSUBSCRIBE pattern [pattern ...] 訂閱一個或多個符合給定模式的頻道。

  • PUNSUBSCRIBE [pattern [pattern ...]] 指示客戶端退訂所有給定模式。

  • PUBLISH channel message 將信息 message 發(fā)送到指定的頻道 channel 。

  • PUBSUB CHANNELS [pattern] 列出當前的活躍頻道。

  • PUBSUB NUMSUB [channel-1 ... channel-N] 返回給定頻道的訂閱者數(shù)量, 訂閱模式的客戶端不計算在內(nèi)。

  • PUBSUB NUMPAT 返回訂閱模式的數(shù)量

二、數(shù)據(jù)結(jié)構(gòu)

redis服務(wù)器通過一個(頻道名稱->客戶端鏈表)字典,和一個(匹配模式)鏈表來完成發(fā)布與訂閱的功能。

struct redisServer {
.....
    dict *pubsub_channels;  /*所有頻道的訂閱關(guān)系 字典 (頻道名稱->client鏈表)*/
    list *pubsub_patterns;  /*保存所有訂閱頻道模式鏈表 */
.....
}
//pubsub_patterns鏈表的Node
typedef struct pubsubPattern {
    client *client;//用戶
    robj *pattern;//訂閱的頻道模式
} pubsubPattern;

三、實現(xiàn)

  • subscribeCommand

    //subscribe c c c ... c
    void subscribeCommand(client *c) {
        int j;
    
        for (j = 1; j < c->argc; j++)
            pubsubSubscribeChannel(c,c->argv[j]);//訂閱每個頻道
        c->flags |= CLIENT_PUBSUB;
    }
    //返回1 成功監(jiān)聽 返回0監(jiān)聽已經(jīng)存在
    int pubsubSubscribeChannel(client *c, robj *channel) {
        dictEntry *de;
        list *clients = NULL;
        int retval = 0;
    
        if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {//自己的訂閱頻道字典加入
            retval = 1;
            incrRefCount(channel);
            /* Add the client to the channel -> list of clients hash table */
            de = dictFind(server.pubsub_channels,channel);//在服務(wù)器中訂閱頻道字典找
            if (de == NULL) {//沒找到這個頻道 創(chuàng)建
                clients = listCreate();
                dictAdd(server.pubsub_channels,channel,clients);
                incrRefCount(channel);
            } else {//找到拿出來
                clients = dictGetVal(de);
            }
            listAddNodeTail(clients,c);//加入到list里面去
        }
        addReplyPubsubSubscribed(c,channel);//通知客戶端
        return retval;
    }
    
  • psubscribeCommand

    void psubscribeCommand(client *c) {
        int j;
    
        for (j = 1; j < c->argc; j++)
            pubsubSubscribePattern(c,c->argv[j]);//訂閱匹配頻道
        c->flags |= CLIENT_PUBSUB;
    }
    //加入訂閱頻道模式成功返回1,否則返回0
    int pubsubSubscribePattern(client *c, robj *pattern) {
        int retval = 0;
        if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {//沒找到的話
            retval = 1;
            pubsubPattern *pat;
            listAddNodeTail(c->pubsub_patterns,pattern);//加入到c里面的模式頻道
            incrRefCount(pattern);
            pat = zmalloc(sizeof(*pat));
            pat->pattern = getDecodedObject(pattern);
            pat->client = c;
            listAddNodeTail(server.pubsub_patterns,pat);//加入到server里面的模式頻道
        }
        addReplyPubsubPatSubscribed(c,pattern);
        return retval;
    }
    
  • publish

    void publishCommand(client *c) {
        int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);//發(fā)送一個消息
        if (server.cluster_enabled)//如果做了Redis集群
            clusterPropagatePublish(c->argv[1],c->argv[2]);
        else
            forceCommandPropagation(c,PROPAGATE_REPL);//強制命令傳播
        addReplyLongLong(c,receivers);//告訴哦用戶發(fā)送了幾個訂閱者
    }
    /* 發(fā)布一個消息*/
    int pubsubPublishMessage(robj *channel, robj *message) {
        int receivers = 0;
        dictEntry *de;
        listNode *ln;
        listIter li;
        de = dictFind(server.pubsub_channels,channel);
        if (de) {//找到此頻道的話
            list *list = dictGetVal(de);
            listNode *ln;
            listIter li;
    
            listRewind(list,&li);
            while ((ln = listNext(&li)) != NULL) {
                client *c = ln->value;
                addReplyPubsubMessage(c,channel,message);//發(fā)送消息給每一個訂閱者
                receivers++;
            }
        }
        /* 模式匹配頻道*/
        if (listLength(server.pubsub_patterns)) {
            listRewind(server.pubsub_patterns,&li);
            channel = getDecodedObject(channel);
            while ((ln = listNext(&li)) != NULL) {
                pubsubPattern *pat = ln->value;
    
                if (stringmatchlen((char*)pat->pattern->ptr,
                                    sdslen(pat->pattern->ptr),
                                    (char*)channel->ptr,
                                    sdslen(channel->ptr),0))//匹配 發(fā)送
                {
                    addReplyPubsubPatMessage(pat->client,
                        pat->pattern,channel,message);
                    receivers++;
                }
            }
            decrRefCount(channel);
        }
        return receivers;
    }
    

    其中,反向操作就是以上的操作的逆順序。

四、參考

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容