rabbitmq消息發(fā)送卡頓的排查(spring-amqp)

問題描述

線上系統(tǒng)出現(xiàn)SLOW慢請求告警,經(jīng)排查,發(fā)現(xiàn)為rabbitmq的消息發(fā)送卡頓引起,卡頓時(shí)間幾秒鐘~幾十秒不等。由于只卡頓了1分鐘左右,筆數(shù)不多(30筆以內(nèi)),且后續(xù)就沒了,所以只是排查了下系統(tǒng)各項(xiàng)指標(biāo)(JVM、mq狀態(tài)、磁盤等),都正常。就沒有繼續(xù)深入了排查了。 但過了一段時(shí)間,又出現(xiàn)問題了~~(果然每個(gè)線上問題都要仔細(xì)追蹤- -#)
發(fā)生頻率:每次系統(tǒng)重啟后,都會出現(xiàn)一次,且每個(gè)實(shí)例出現(xiàn)一次后,后續(xù)就不在出現(xiàn)了,直到下一次系統(tǒng)實(shí)例重啟。 在出現(xiàn)卡頓時(shí),當(dāng)前實(shí)例仍有其他的消息發(fā)送是正常的。
單個(gè)消息大?。?/strong>1KB以內(nèi)。

使用情況描述

  1. rabbitmq:3.7.17, erlang:22.0.7
  2. 采用spirng-amqp包進(jìn)行封裝與rabbitmq的交互,版本為1.7.3.RELEASE
  3. 使用Spring-amqp的CachingConnectionFactory緩存連接工廠
  4. 調(diào)整生產(chǎn)者的cache-mode為Connection模式(原先為CHANNEL)
  5. 調(diào)整生產(chǎn)者的核心連接數(shù)量為24個(gè),最大連接數(shù)量為48個(gè)
  6. 設(shè)置生產(chǎn)者連接的channel.checkout-timeout為3秒(該參數(shù)的旨意是從CachingConnectionFactory獲取connection/channel的等待時(shí)間,CachingConnectionFactory實(shí)際就是個(gè)connection或者channel的緩存池子)

排查一: rabbitmq的per-connection連接流控引起的客戶端消息發(fā)送卡頓

由于通過日志已經(jīng)能夠確定是消息發(fā)送這一步產(chǎn)生了卡頓,那初步懷疑就是由于rabbitmq的流控機(jī)制引起的卡頓,但根據(jù)監(jiān)控,當(dāng)時(shí)卡頓時(shí)間內(nèi),rabbitmq的所有connection/channel的狀態(tài)都是正常的,沒有存在flow/block狀態(tài)的情況。(rabbitmq的監(jiān)控API:{rabbitmq-admin管理后臺域名}/api/index.html)
所以排除是rabbitmq服務(wù)器的流控引起的

排查二: 應(yīng)用客戶端自己的問題

由于出現(xiàn)問題是某次spring-amqp的緩存模式cache-mode從channel改為了connection(PS:此處調(diào)整,是因?yàn)閏hannel模式為單連接,所有的MQ消息,無論各種業(yè)務(wù)場景都是共用了這個(gè)連接,之前發(fā)生了一次per-connection的流控,導(dǎo)致這個(gè)連接下的所有消息都發(fā)不出去,為了降低業(yè)務(wù)影響,將單連接改造為多連接模式)
所以把目光轉(zhuǎn)向了spring-amqp的使用上,經(jīng)過代碼排查,果然是spring-amqp的一個(gè)BUG。。

在使用CachingConnectionFactory的createConnection()創(chuàng)建rabbitmq連接的時(shí)候,會存在一個(gè)同步鎖(由于該工廠就是個(gè)緩存池,所以需要同步鎖來確保緩存的個(gè)數(shù)不超過配置的MAX個(gè)數(shù)),知道連接創(chuàng)建完畢后,才會釋放該鎖,那么如果存在某個(gè)連接創(chuàng)建速度過慢,就會導(dǎo)致后續(xù)的連接創(chuàng)建時(shí)間也卡頓,都在等待該同步鎖的釋放


如果cache-mode是connection模式,根據(jù)源碼展示的,只要從連接緩存池沒有拿到一個(gè)空閑的connection,就直接等待一個(gè)checkout-timeout的時(shí)間(配置的是3秒)。這就代表,每一個(gè)連接的創(chuàng)建,都必須等待3秒鐘。。這就解釋了為啥線上的卡頓基本都是3、6、9秒左右。

這是一個(gè)比較明顯的BUG,后來在gitgub也找到相關(guān)的問題:https://github.com/spring-projects/spring-amqp/issues/1026

修復(fù)代碼也很簡單:


但升級spring-amqp包到已修復(fù)的版本是2.1.x 中間版本跨度太大, 且2.1.x需要依賴spring 5,我們spring的基礎(chǔ)版本還是4.x,貿(mào)然升級風(fēng)險(xiǎn)太大

解決

結(jié)合業(yè)務(wù)場景,此處MQ的消息場景是用于將交易請求做異步化處理,達(dá)到一個(gè)削峰填谷的目的。 改造為多連接模式,是由于之前單連接模式發(fā)生了流控后,導(dǎo)致使用該連接的所有業(yè)務(wù)場景均block。雖然改為了多連接,但實(shí)際上還是存在各個(gè)業(yè)務(wù)共享某個(gè)連接的情況,隔離性還不是特別高,應(yīng)該是按重要程度進(jìn)行分級隔離使用各自的連接。
經(jīng)過考量,決定使用Spring提供的RoutingConnectionFactory,通過將業(yè)務(wù)場景碼來分隔實(shí)際的ConnectionFactory(實(shí)際還是CachingConectionFactory,采用單連接channel模式),來達(dá)到業(yè)務(wù)場景隔離連接。

PS:連接并不是越多越好,我們此處只是將業(yè)務(wù)高等級的區(qū)分各個(gè)連接,其余實(shí)時(shí)性要求不是那么高的,單獨(dú)在共享使用另外一個(gè)連接

RoutingConnectionFactory,使用此方式,可根據(jù)Expression靈活將連接(connection)分為多個(gè),并根據(jù)自己場景定制(例如生產(chǎn)和消費(fèi)連接分離,生產(chǎn),消費(fèi)者又按照不同維度分多個(gè)連接),此時(shí),ConnectionFactory的實(shí)現(xiàn)仍然使用CachingConnectionFactory的channel模式。這樣,可以形成不同場景使用不同的連接(避免相互影響),同一連接又可為多個(gè)channel共享(提高性能)

附上CachingConnectionFactory的createConnection()方法源碼

org.springframework.amqp.rabbit.connection.CachingConnectionFactory
@Override
    public final Connection createConnection() throws AmqpException {
        Assert.state(!this.stopped, "The ApplicationContext is closed and the ConnectionFactory can no longer create connections.");
        synchronized (this.connectionMonitor) {
            if (this.cacheMode == CacheMode.CHANNEL) {
                if (this.connection.target == null) {
                    this.connection.target = super.createBareConnection();
                    // invoke the listener *after* this.connection is assigned
                    if (!this.checkoutPermits.containsKey(this.connection)) {
                        this.checkoutPermits.put(this.connection, new Semaphore(this.channelCacheSize));
                    }
                    this.connection.closeNotified.set(false);
                    getConnectionListener().onCreate(this.connection);
                }
                return this.connection;
            }
            else if (this.cacheMode == CacheMode.CONNECTION) {
                ChannelCachingConnectionProxy connection = findIdleConnection();
                long now = System.currentTimeMillis();
                while (connection == null && System.currentTimeMillis() - now < this.channelCheckoutTimeout) {
                    if (countOpenConnections() >= this.connectionLimit) {
                        try {
                            this.connectionMonitor.wait(this.channelCheckoutTimeout);
                            connection = findIdleConnection();
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new AmqpException("Interrupted while waiting for a connection", e);
                        }
                    }
                }
                if (connection == null) {
                    if (countOpenConnections() >= this.connectionLimit
                            && System.currentTimeMillis() - now >= this.channelCheckoutTimeout) {
                        throw new AmqpTimeoutException("Timed out attempting to get a connection");
                    }
                    connection = new ChannelCachingConnectionProxy(super.createBareConnection());
                    if (logger.isDebugEnabled()) {
                        logger.debug("Adding new connection '" + connection + "'");
                    }
                    this.allocatedConnections.add(connection);
                    this.allocatedConnectionNonTransactionalChannels.put(connection, new LinkedList<ChannelProxy>());
                    this.channelHighWaterMarks.put(ObjectUtils.getIdentityHexString(
                            this.allocatedConnectionNonTransactionalChannels.get(connection)), new AtomicInteger());
                    this.allocatedConnectionTransactionalChannels.put(connection, new LinkedList<ChannelProxy>());
                    this.channelHighWaterMarks.put(
                            ObjectUtils.getIdentityHexString(this.allocatedConnectionTransactionalChannels.get(connection)),
                            new AtomicInteger());
                    this.checkoutPermits.put(connection, new Semaphore(this.channelCacheSize));
                    getConnectionListener().onCreate(connection);
                }
                else if (!connection.isOpen()) {
                    try {
                        refreshProxyConnection(connection);
                    }
                    catch (Exception e) {
                        this.idleConnections.addLast(connection);
                    }
                }
                else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Obtained connection '" + connection + "' from cache");
                    }
                }
                return connection;
            }
        }
        return null;
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容