最近幫別人看一個(gè)問(wèn)題,其項(xiàng)目使用了 rabbit mq,一是業(yè)務(wù)代碼使用;二是配合 spring cloud config & spring cloud bus做配置動(dòng)態(tài)刷新。
在測(cè)試環(huán)境偶爾會(huì)瞬時(shí)的網(wǎng)絡(luò)中斷,在幾秒內(nèi)即恢復(fù),但之后項(xiàng)目日志內(nèi)會(huì)一直報(bào) rabbit mq 的重連錯(cuò)誤:一開(kāi)始的報(bào)錯(cuò)是 :
#method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'springCloudBus.anonymous.xxxxx' in vhost '/', class-id=50, method-id=20)
過(guò)了一段時(shí)候后,報(bào)錯(cuò)變成 :
#method<channel.close>(reply-code=405, reply-text=NOT_FOUND.....
經(jīng)過(guò)測(cè)試發(fā)現(xiàn),業(yè)務(wù)隊(duì)列收發(fā)沒(méi)有收到影響,結(jié)合報(bào)錯(cuò)內(nèi)容內(nèi)的 springCloudBus.anonymous.xxxxxx ,明顯是和 spring cloud bus 相關(guān)的隊(duì)列,項(xiàng)目里用到的地方只有配置中心的配置動(dòng)態(tài)刷新,一測(cè)果然配置無(wú)法動(dòng)態(tài)刷新了。
原因分析:
第一個(gè)報(bào)錯(cuò)里,rabbit mq 返回了405 RESOURCE_LOCKED,搜一下就知道是因?yàn)?rabbit 排他性 隊(duì)列的特性,通過(guò)查閱 spring cloud bus 源碼確認(rèn)了這一點(diǎn)(RabbitExchangeQueueProvisioner.provisionConsumerDestination):
@Override
public ConsumerDestination provisionConsumerDestination(String name, String group,
ExtendedConsumerProperties<RabbitConsumerProperties> properties) {
boolean anonymous = !StringUtils.hasText(group);
String baseQueueName = anonymous ? groupedName(name, ANONYMOUS_GROUP_NAME_GENERATOR.generateName())
: properties.getExtension().isQueueNameGroupOnly() ? group : groupedName(name, group);
if (this.logger.isInfoEnabled()) {
this.logger.info("declaring queue for inbound: " + baseQueueName + ", bound to: " + name);
}
String prefix = properties.getExtension().getPrefix();
final String exchangeName = applyPrefix(prefix, name);
Exchange exchange = buildExchange(properties.getExtension(), exchangeName);
if (properties.getExtension().isDeclareExchange()) {
declareExchange(exchangeName, exchange);
}
String queueName = applyPrefix(prefix, baseQueueName);
boolean partitioned = !anonymous && properties.isPartitioned();
boolean durable = !anonymous && properties.getExtension().isDurableSubscription();
Queue queue;
if (anonymous) {
queue = new Queue(queueName, false, true, true, queueArgs(queueName, properties.getExtension(), false));
}
else {
if (partitioned) {
String partitionSuffix = "-" + properties.getInstanceIndex();
queueName += partitionSuffix;
}
if (durable) {
queue = new Queue(queueName, true, false, false,
queueArgs(queueName, properties.getExtension(), false));
}
else {
queue = new Queue(queueName, false, false, true,
queueArgs(queueName, properties.getExtension(), false));
}
}
declareQueue(queueName, queue);
Binding binding = null;
if (properties.getExtension().isBindQueue()) {
binding = declareConsumerBindings(name, properties, exchange, partitioned, queue);
}
if (durable) {
autoBindDLQ(applyPrefix(properties.getExtension().getPrefix(), baseQueueName), queueName,
properties.getExtension());
}
return new RabbitConsumerDestination(queue, binding);
}
可以看到 group 參數(shù)為空的時(shí)候,就會(huì)自動(dòng)創(chuàng)建匿名的排他隊(duì)列。
那么為什么第二個(gè)報(bào)錯(cuò)變成了 rabbit 返回 404 NOT FOUND?
首先,第一段報(bào)錯(cuò)和第二段報(bào)錯(cuò)的間隔一般很穩(wěn)定,三次報(bào) 405 后就會(huì)變成 404,跟異常棧對(duì)應(yīng)的源碼,可以發(fā)現(xiàn)這段間隔對(duì)應(yīng)的配置:
private int declarationRetries = 3;
這個(gè)字段是 spring 封裝的 rabbit mq 包的 BlockingQueueConsumer 類,這個(gè)封裝的消費(fèi)者類,會(huì)在與指定隊(duì)列綁定消費(fèi)連接時(shí),試圖重聲明隊(duì)列,重試間隔默認(rèn) 5000ms,在它試圖重新聲明這個(gè)匿名的排他隊(duì)列時(shí),會(huì)被無(wú)情的返回 405 拒絕連接,即使這個(gè)排他隊(duì)列是它之前創(chuàng)建...
在三次重試過(guò)后差不多的時(shí)間點(diǎn),這個(gè)隊(duì)列會(huì)自動(dòng)刪除,沒(méi)錯(cuò),這個(gè)匿名隊(duì)列不但是排他性 的,而且是 自動(dòng)刪除 的。在 rabbit 之后的消費(fèi)者重連嘗試中,就會(huì)返回 404 找不到指定隊(duì)列的報(bào)錯(cuò)。
注意:需要區(qū)分 rabbit
消費(fèi)者重連重試和隊(duì)列重聲明重試機(jī)制。
消費(fèi)者重連若不進(jìn)行手動(dòng)配置,在RabbitAdmin中就可以看到其實(shí)也是代碼中寫死的——5次嘗試,但不管怎樣肯定要比隊(duì)列的重試周期長(zhǎng)。
解決方案:
廢了這么多話,各位很容易就能想出一個(gè)解決方案:把 隊(duì)列重聲明 的次數(shù)配置多一些就好了嘛,等舊的匿名排他隊(duì)列自動(dòng)刪掉了,就可以正常的重聲明出新的匿名排他隊(duì)列了。那讓我們看看怎么配、配完效果是什么?
嘗試解決:加大 隊(duì)列重聲明 的次數(shù)
配置
在 github 的 spring cloud bus 的倉(cāng)庫(kù)并沒(méi)有找到相關(guān)配置,但在 spring cloud stream binder rabbit 的倉(cāng)庫(kù)找到了:

該配置項(xiàng)是:
spring.cloud.stream.rabbit.bingings.<channelName>.consumer.queue-declaration-retries,那么問(wèn)題來(lái)了,這個(gè) <channelName> 填什么???只好從配置項(xiàng)注入代碼附近入手,通過(guò)打斷點(diǎn)在運(yùn)行時(shí)的拿參數(shù)出來(lái),可以知道 spring cloud bus 通過(guò) spring cloud stream binder rabbit 創(chuàng)建隊(duì)列的 channelName 屬性值為
springCloudBusInput,那么帶入配置一下,設(shè)一個(gè)很大數(shù)如999999即可。結(jié)果
運(yùn)行測(cè)試,依然在幾次 405 報(bào)錯(cuò)后返回 404......
繼續(xù)從源碼分析入手,隊(duì)列重聲明調(diào)用的是
Channel.queueDeclarePassive 方法,搜了下發(fā)現(xiàn)用這個(gè)方法聲明隊(duì)列時(shí),如果隊(duì)列不存在就會(huì)報(bào)404....WTF....
最終解決:配置指定名稱隊(duì)列
那么現(xiàn)在還能怎樣解決這個(gè)問(wèn)題?第一次聲明匿名隊(duì)列的源碼中有一段(RabbitExchangeQueueProvisioner.provisionConsumerDestination):
boolean anonymous = !StringUtils.hasText(group);
.......
if (anonymous) {
queue = new Queue(queueName, false, true, true, queueArgs(queueName, properties.getExtension(), false));
}else {
if (partitioned) {
String partitionSuffix = "-" + properties.getInstanceIndex();
queueName += partitionSuffix;
}
if (durable) {
queue = new Queue(queueName, true, false, false, queueArgs(queueName, properties.getExtension(), false));
}else {
queue = new Queue(queueName, false, false, queueArgs(queueName, properties.getExtension(), false));
}
}
可以看出非匿名隊(duì)列是不會(huì)設(shè)置 排他性 和 自動(dòng)刪除 的,而group 這個(gè)參數(shù)不為空時(shí),就會(huì)用 group 為隊(duì)列名進(jìn)行聲明。這個(gè)參數(shù)同樣是可以配置的,同樣在之前的github倉(cāng)庫(kù)中有介紹:

簡(jiǎn)言之:如果該值設(shè)為true,則使用
group作為隊(duì)列的名稱。
為指定 channel 配置 group 沒(méi)有找到可用配置,但可以通過(guò)設(shè)置一個(gè)全局默認(rèn) group 做到同樣效果:
spring.cloud.stream.default.group=springCloudBus-${spring.application.name}-${spring.cloud.client.ip-address}-${server.port}
完整配置
spring:
cloud:
stream:
default:
group: ${spring.application.name}-${spring.cloud.client.ip-address}-${server.port}
rabbit:
bindings:
springCloudBusInput:
consumer:
# 隊(duì)列聲明重試次數(shù)
queue-declaration-retries: 2000
# 重試間隔(ms)
recovery-interval: 5000
# 為true時(shí),使用‘group’作為配置刷新隊(duì)列的名稱
queue-name-group-only: true
經(jīng)測(cè)試,網(wǎng)絡(luò)斷連恢復(fù)后,程序即時(shí)地恢復(fù)了消費(fèi)連接,沒(méi)有報(bào)錯(cuò)。
小小的感悟
其實(shí)整個(gè)問(wèn)題解決下來(lái),最后只用了一段配置就搞定了。官方文檔里幾十個(gè)配置項(xiàng),一個(gè)個(gè)讀下去每個(gè)都像是解決的方式,甚至有一些官方文檔沒(méi)介紹的配置,其實(shí)都直接可以寫在配置文件里,啟動(dòng)時(shí)會(huì)自動(dòng)注入。
總而言之,現(xiàn)在的開(kāi)發(fā)工作很少離得開(kāi)功能完善的開(kāi)源庫(kù)了,閱讀源碼是發(fā)現(xiàn)、解決問(wèn)題的不二手段,甚至通過(guò)源碼,你可以發(fā)現(xiàn)一些巧妙的間接的解決方式。