前言
本篇隨筆將匯總一些我對(duì)消息隊(duì)列 RabbitMQ 的認(rèn)識(shí),順便談?wù)勂湓诟卟l(fā)和秒殺系統(tǒng)中的具體應(yīng)用。

1. 預(yù)備示例
想了下,還是先拋出一個(gè)簡(jiǎn)單示例,隨后再根據(jù)其具體應(yīng)用場(chǎng)景進(jìn)行擴(kuò)展,我覺得這樣表述條理更清晰些。
RabbitConfig:
@Configuration
public class RabbitConfig {
@Bean
public Queue callQueue() {
return new Queue(MQConstant.CALL);
}
}
Client:
@Component
public class Client {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendCall(String content) {
for (int i = 0; i < 10000; i++) {
String message = i + "-" + content;
System.out.println(String.format("Sender: %s", message));
rabbitTemplate.convertAndSend(MQConstant.CALL, message);
}
}
}
Server:
@Component
public class Server {
@RabbitHandler
@RabbitListener(queues = MQConstant.CALL)
public void callProcess(String message) throws InterruptedException {
Thread.sleep(1000);
System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message));
}
}
Result:
Sender: Hello, are you there!
Receiver: reply("Hello, are you there!") Yes, I just saw your message!
以上示例會(huì)在 rabbitmq 中創(chuàng)建一條隊(duì)列 CALL, 消息在其中等待消費(fèi):

在此基礎(chǔ)上的簡(jiǎn)單擴(kuò)展我就不再寫案例了,比如領(lǐng)域模塊完成了其核心業(yè)務(wù)規(guī)則之后可能需要更新緩存、寫個(gè)郵件、記個(gè)復(fù)雜日志、做個(gè)統(tǒng)計(jì)報(bào)表等等,這些不需要及時(shí)反饋或者耗時(shí)的附屬業(yè)務(wù)都可以通過異步隊(duì)列分發(fā),以此來(lái)提升核心業(yè)務(wù)的響應(yīng)速度,同時(shí)如此處理能讓領(lǐng)域邊界更加清晰,代碼的可維護(hù)性和持續(xù)拓展的能力也會(huì)有所提升。
2. 削峰
上個(gè)示例中我提到的應(yīng)用場(chǎng)景是解耦和通知,再接著擴(kuò)展,因其具備良好的緩沖性質(zhì),所以還有一個(gè)非常適合的應(yīng)用場(chǎng)景那就是削峰。對(duì)于突如其來(lái)的極高并發(fā)請(qǐng)求,我們可以先瞬速地將其加入隊(duì)列并回復(fù)用戶一個(gè)友好提示,然后服務(wù)器可在其能承受的范圍內(nèi)慢慢處理,以此來(lái)防止突發(fā)的 CPU 和內(nèi)存 “爆表”。
改造之后對(duì)于發(fā)送方來(lái)說當(dāng)然是比較爽的,他只是將請(qǐng)求加入消息隊(duì)列而已,處理壓力都?xì)w到了消費(fèi)端。接著思考,這樣處理有沒有副作用?如果這個(gè)請(qǐng)求剛好是線程阻塞的,那還要加入隊(duì)列慢慢排隊(duì)處理,那不是完蛋了,用戶要猴年馬月才能得到反饋?所以針對(duì)此,我覺得應(yīng)該將消費(fèi)端的方法改為異步調(diào)用(即多線程)以提升吞吐量,在 Spring Boot 中的寫法也非常簡(jiǎn)單:
@Component
public class Server {
@Async
@RabbitHandler
@RabbitListener(queues = MQConstant.CALL)
public void callProcess(String message) throws InterruptedException {
Thread.sleep(100);
System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message));
}
}
參照示例一的方法,我發(fā)布了 10000 條消息加入隊(duì)列,且消費(fèi)端的調(diào)用每次阻塞一秒,那可有意思了,什么時(shí)候能處理完?但如果開幾百個(gè)線程同時(shí)處理的話,那幾十秒就夠了,當(dāng)然具體多少合適還應(yīng)根據(jù)具體的業(yè)務(wù)場(chǎng)景和服務(wù)器配置酌情考慮。另外,別忘了配線程池:
@Configuration
public class AsyncConfig {
@Bean
public Executor asyncExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(500);
executor.setQueueCapacity(10);
executor.setThreadNamePrefix("MyExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
3. Exchange
RabbitMQ 可能為 N 個(gè)應(yīng)用同時(shí)提供服務(wù),要是你和你的藍(lán)顏知己突然心有靈犀,在不同的業(yè)務(wù)上使用了同一個(gè) routingKey,想想就刺激。因此,隊(duì)列多了自然要進(jìn)行分組管理,限定好 Exchange 的規(guī)則,接下來(lái)就可以獨(dú)自玩耍了。
MQConstant:
public class MQConstant {
public static final String EXCHANGE = "YOUCLK-MESSAGE-EXCHANGE";
public static final String CALL = MQConstant.EXCHANGE + ".CALL";
public static final String ALL = MQConstant.EXCHANGE + ".#";
}
RabbitConfig:
@Configuration
public class RabbitConfig {
@Bean
public Queue callQueue() {
return new Queue(MQConstant.CALL);
}
@Bean
TopicExchange exchange() {
return new TopicExchange(MQConstant.EXCHANGE);
}
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with(MQConstant.ALL);
}
}
此時(shí)我們?cè)偃ゲ殛?duì)列 CALL,可以看到已經(jīng)綁定了Exchange:

當(dāng)然 Exchange 的作用遠(yuǎn)不止如此,以上示例為 Topic 模式,除此之外還有 Direct、Headers 和 Fanout 模式,寫法都差不多,感興趣的童鞋可以去查看 “官方文檔” 進(jìn)行更深入了解。
4. 延時(shí)隊(duì)列
延時(shí)任務(wù)的場(chǎng)景相信小伙伴們都接觸過,特別是搶購(gòu)的時(shí)候,在規(guī)定時(shí)間內(nèi)未付款訂單就被回收了。微信支付的 API 里面也有一個(gè)支付完成后的延時(shí)再確認(rèn)消息推送,實(shí)現(xiàn)原理應(yīng)該都差不多。
利用 RabbitMQ 實(shí)現(xiàn)該功能首先要了解他的兩個(gè)特性,分別是 Time-To-Live Extensions 和 Dead Letter Exchanges,字面意思上就能理解個(gè)大概,一個(gè)是生存時(shí)間,一個(gè)是死信。整個(gè)過程也很容易理解,TTL 相當(dāng)于一個(gè)緩沖隊(duì)列,等待其過期之后消息會(huì)由 DLX 轉(zhuǎn)發(fā)到實(shí)際消費(fèi)隊(duì)列,如此便實(shí)現(xiàn)了他的延時(shí)過程。
MQConstant:
public class MQConstant {
public static final String PER_DELAY_EXCHANGE = "PER_DELAY_EXCHANGE";
public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
public static final String DELAY_CALL_TTL = "DELAY_CALL_TTL";
public static final String CALL = "CALL";
}
ExpirationMessagePostProcessor:
public class ExpirationMessagePostProcessor implements MessagePostProcessor {
private final Long ttl;
public ExpirationMessagePostProcessor(Long ttl) {
this.ttl = ttl;
}
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties()
.setExpiration(ttl.toString());
return message;
}
}
Client:
@Component
public class Client {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendCall(String content) {
for (int i = 1; i <= 3; i++) {
long expiration = i * 5000;
String message = i + "-" + content;
System.out.println(String.format("Sender: %s", message));
rabbitTemplate.convertAndSend(MQConstant.DELAY_CALL_TTL, (Object) message, new ExpirationMessagePostProcessor(expiration));
}
}
}
Server:
@Component
public class Server {
@Async
@RabbitHandler
@RabbitListener(queues = MQConstant.CALL)
public void callProcess(String message) throws InterruptedException {
String date = (new SimpleDateFormat("HH:mm:ss")).format(new Date());
System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!- %s", message, date));
}
}
Result:
Sender: 1-Hello, are you there!
Sender: 2-Hello, are you there!
Sender: 3-Hello, are you there!
Receiver: reply("1-Hello, are you there!") Yes, I just saw your message!- 23:04:12
Receiver: reply("2-Hello, are you there!") Yes, I just saw your message!- 23:04:17
Receiver: reply("3-Hello, are you there!") Yes, I just saw your message!- 23:04:22
結(jié)果一目了然,分別在隊(duì)列中延遲了 5秒,10秒,15秒,當(dāng)然,以上只是我的簡(jiǎn)單示例,童鞋們可翻閱官方文檔(“ ttl ” && “ dlx ”)進(jìn)一步深入學(xué)習(xí)。
結(jié)語(yǔ)
本篇隨筆不該就這么結(jié)束,但晚上心情不好,百感交集,無(wú)法繼續(xù)寫作,無(wú)奈至此。近期正在尋覓新的工作機(jī)會(huì),我的微信:youclk,無(wú)論有沒有推薦的,給我點(diǎn)鼓勵(lì),謝謝!
我的公眾號(hào)《有刻》,我們共同成長(zhǎng)!
