redis實(shí)現(xiàn)消息發(fā)布訂閱

redis只能支持簡(jiǎn)單的消息發(fā)布訂閱,如果消息訂閱需求復(fù)雜,可以選擇其他MQ進(jìn)行整合。

/**
 * Redis 訂閱發(fā)布功能
 *
 * @author whucke
 * @since 2018/12/17 11:07
 */
public class PubSubscribe {

    private Jedis jedis;

    public PubSubscribe(Jedis jedis) {
        this.jedis = jedis;
    }

    /**
     * 發(fā)送訂閱消息
     *
     * @param channel 通道名稱
     * @param message 消息
     * @return
     */
    public boolean publish(String channel, String message) {
        Long result = jedis.publish(channel, message);
        return result != null && result > 0;
    }

}
/**
 * 消息訂閱實(shí)現(xiàn)
 *
 * @author whucke
 * @since 2018/12/17 13:58
 */
public class Subscriber extends JedisPubSub {

    /**
     * 收到訂閱消息處理
     *
     * @param channel
     * @param message
     */
    @Override
    public void onMessage(String channel, String message) {
        System.out.printf("通道%s 接收到信息:%s \r\n", channel, message);
    }

    /**
     * 訂閱通道調(diào)用
     *
     * @param channel
     * @param subscribedChannels
     */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.printf("訂閱通道信息調(diào)用,通道:%s ,值:%s \r\n", channel, subscribedChannels);
    }

    /**
     * 取消訂閱調(diào)用
     *
     * @param channel
     * @param subscribedChannels
     */
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.printf("取消訂閱信息調(diào)用,通道:%s ,值:%s \r\n", channel, subscribedChannels);
    }
}

/**
 * 單獨(dú)開(kāi)啟一個(gè)線程監(jiān)聽(tīng)訂閱消息
 * @author whucke
 * @since 2018/12/17 14:25
 */
public class SubThread extends Thread {

    private Subscriber subscriber;
    private Jedis jedis;
    private String[] channels;

    public SubThread(Jedis jedis, Subscriber subscriber, String[] channels) {
        this.subscriber = subscriber;
        this.jedis = jedis;
        this.channels = channels;
    }

    @Override
    public void run() {
        jedis.subscribe(subscriber, channels);
    }
}

/**
 * 消息發(fā)布訂閱測(cè)試
 *
 * @author whucke
 * @since 2018/12/17 14:19
 */
public class PubSubscribeTest {

    @Test
    public void testPub() {
        Jedis jedis = RedisFactory.getRedisClient();
        PubSubscribe subscribe = new PubSubscribe(jedis);
        for (int i = 0; i < 10; i++) {
            boolean result = subscribe.publish("channel1", "消息發(fā)布測(cè)試_"+i);
            System.out.println(result ? i+"_消息發(fā)送成功" : i+"_消息發(fā)送失敗");
        }
        jedis.close();
    }


    public static void main(String[] args) {
        Subscriber subscriber = new Subscriber();
        Jedis jedis = RedisFactory.getRedisClient();
        String[] channels = {"channel1"};
        SubThread subThread = new SubThread(jedis, subscriber, channels);
        subThread.start();
    }

}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容