Redis 只能支持簡單的消息發布訂閱，如果消息訂閱需求複雜，可以選擇其他 MQ 進行整合。
/**
 * Publishing side of the Redis publish/subscribe feature.
 *
 * <p>Delegates to the {@link Jedis} client supplied at construction time.
 *
 * @author whucke
 * @since 2018/12/17 11:07
 */
public class PubSubscribe {

    /** Client used to issue the publish command. */
    private Jedis jedis;

    /**
     * Creates a publisher bound to the given client.
     *
     * @param jedis client that will carry out the publish calls
     */
    public PubSubscribe(Jedis jedis) {
        this.jedis = jedis;
    }

    /**
     * Publishes a message on a channel.
     *
     * @param channel channel name
     * @param message message payload
     * @return {@code true} when the count returned by {@code jedis.publish} is
     *         non-null and greater than zero; {@code false} otherwise
     *         (presumably the count is the number of receiving subscribers;
     *         confirm against the Redis PUBLISH documentation)
     */
    public boolean publish(String channel, String message) {
        Long receivers = jedis.publish(channel, message);
        if (receivers == null) {
            return false;
        }
        return receivers > 0;
    }
}
/**
 * Subscription listener that prints every callback it receives to standard output.
 *
 * @author whucke
 * @since 2018/12/17 13:58
 */
public class Subscriber extends JedisPubSub {

    /**
     * Handles a message received on a subscribed channel by printing it.
     *
     * @param channel channel the message arrived on
     * @param message message payload
     */
    @Override
    public void onMessage(String channel, String message) {
        String template = "通道%s 接收到信息:%s \r\n";
        System.out.format(template, channel, message);
    }

    /**
     * Called when a channel subscription is reported; prints the channel and the count.
     *
     * @param channel            channel that was subscribed
     * @param subscribedChannels count passed in by the Jedis callback
     */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        String template = "訂閱通道信息調(diào)用,通道:%s ,值:%s \r\n";
        System.out.format(template, channel, subscribedChannels);
    }

    /**
     * Called when a channel unsubscription is reported; prints the channel and the count.
     *
     * @param channel            channel that was unsubscribed
     * @param subscribedChannels count passed in by the Jedis callback
     */
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        String template = "取消訂閱信息調(diào)用,通道:%s ,值:%s \r\n";
        System.out.format(template, channel, subscribedChannels);
    }
}
/**
 * Dedicated thread that runs a Redis subscription, so the subscribing call
 * does not occupy the thread that starts it.
 *
 * @author whucke
 * @since 2018/12/17 14:25
 */
public class SubThread extends Thread {

    /** Listener that receives the subscription callbacks. */
    private final Subscriber subscriber;

    /** Client on which the subscribe call is issued. */
    private final Jedis jedis;

    /** Private snapshot of the channel names to subscribe to (may be null). */
    private final String[] channels;

    /**
     * Creates the thread; the subscription itself starts only when the thread runs.
     *
     * @param jedis      client used for the subscribe call
     * @param subscriber listener that receives the callbacks
     * @param channels   channel names to subscribe to; the array is copied, so
     *                   later changes by the caller do not affect this thread
     */
    public SubThread(Jedis jedis, Subscriber subscriber, String[] channels) {
        this.subscriber = subscriber;
        this.jedis = jedis;
        // Defensive copy: the caller keeps its own array and may mutate it before
        // or while this thread runs. A null argument is passed through unchanged
        // so construction behaves as before for that input.
        this.channels = (channels == null) ? null : channels.clone();
    }

    /**
     * Subscribes the listener to the configured channels.
     *
     * <p>NOTE(review): {@code Jedis#subscribe} is expected to block while the
     * subscription is active, which is why it runs on its own thread. Confirm
     * against the Jedis documentation.
     */
    @Override
    public void run() {
        jedis.subscribe(subscriber, channels);
    }
}
/**
 * Manual test for the publish/subscribe classes.
 *
 * <p>{@link #main(String[])} starts a subscriber on {@code channel1};
 * {@link #testPub()} publishes ten messages to the same channel.
 *
 * @author whucke
 * @since 2018/12/17 14:19
 */
public class PubSubscribeTest {

    /**
     * Publishes ten messages to {@code channel1} and prints, for each one,
     * whether {@link PubSubscribe#publish(String, String)} returned {@code true}.
     */
    @Test
    public void testPub() {
        // try-with-resources closes the client even when a publish call throws;
        // previously close() was reached only on the success path.
        try (Jedis jedis = RedisFactory.getRedisClient()) {
            PubSubscribe subscribe = new PubSubscribe(jedis);
            for (int i = 0; i < 10; i++) {
                boolean result = subscribe.publish("channel1", "消息發(fā)布測(cè)試_"+i);
                System.out.println(result ? i+"_消息發(fā)送成功" : i+"_消息發(fā)送失敗");
            }
        }
    }

    /**
     * Starts a background thread that subscribes a {@link Subscriber} to
     * {@code channel1}.
     *
     * <p>The client is handed to the {@link SubThread} and is not closed here,
     * because the thread is still using it after this method returns.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Subscriber subscriber = new Subscriber();
        Jedis jedis = RedisFactory.getRedisClient();
        String[] channels = {"channel1"};
        SubThread subThread = new SubThread(jedis, subscriber, channels);
        subThread.start();
    }
}