Python消息隊(duì)列RabbitMQ異常重試機(jī)制及Pika重連機(jī)制

最近線上服務(wù)出現(xiàn)rabbitMq隊(duì)列不消費(fèi)的情況,我們最終定位到可能是rabbitMq服務(wù)異常,而其他服務(wù)沒(méi)有建立重連機(jī)制導(dǎo)致的。

首先我們需要了解RabbitMq,RabbitMq 是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息代理中間件。消息隊(duì)列是一種應(yīng)用程序?qū)?yīng)用程序的通行方式,應(yīng)用程序通過(guò)寫消息,將消息傳遞于隊(duì)列,由另一應(yīng)用程序讀取完成通信。而作為中間件的 RabbitMq 無(wú)疑是目前最流行的消息隊(duì)列之一。

RabbitMq 應(yīng)用場(chǎng)景廣泛:

--系統(tǒng)的高可用:日常生活當(dāng)中各種商城秒殺,高流量,高并發(fā)的場(chǎng)景。當(dāng)服務(wù)器接收到如此大量請(qǐng)求處理業(yè)務(wù)時(shí),有宕機(jī)的風(fēng)險(xiǎn)。某些業(yè)務(wù)可能極其復(fù)雜,但這部分不是高時(shí)效性,不需要立即反饋給用戶,我們可以將這部分處理請(qǐng)求拋給隊(duì)列,讓程序后置去處理,減輕服務(wù)器在高并發(fā)場(chǎng)景下的壓力。
--分布式系統(tǒng),集成系統(tǒng),子系統(tǒng)之間的對(duì)接,以及架構(gòu)設(shè)計(jì)中常常需要考慮消息隊(duì)列的應(yīng)用。

python連接操作rabbitMQ 主要是使用pika庫(kù)。
pika連接rabbitMQ的連接參數(shù)主要是在使用ConnectionParametersURLParameters
URLParameters可官方自行了解,這里主要簡(jiǎn)述下ConnectionParameters
connectionParameters定義簡(jiǎn)化為:

class ConnectionParameters(Parameters):
    def __init__(self,
            host=_DEFAULT,
            port=_DEFAULT,
            virtual_host=_DEFAULT,
            credentials=_DEFAULT,
            channel_max=_DEFAULT,
            frame_max=_DEFAULT,
            heartbeat=_DEFAULT,
            ssl_options=_DEFAULT,
            connection_attempts=_DEFAULT,
            retry_delay=_DEFAULT,
            socket_timeout=_DEFAULT,
            stack_timeout=_DEFAULT,
            locale=_DEFAULT,
            blocked_connection_timeout=_DEFAULT,
            client_properties=_DEFAULT,
            tcp_options=_DEFAULT,
            **kwargs)

參數(shù)默認(rèn)值都是一個(gè)_DEFAULT的類, 這個(gè)將映射對(duì)應(yīng)的默認(rèn)值到對(duì)應(yīng)的參數(shù)
參數(shù)說(shuō)明

1.host
DEFAULT_HOST = 'localhost'
2.port
DEFAULT_PORT = 5672
3.virtual_host
DEFAULT_VIRTUAL_HOST = ‘/’
4.credentials認(rèn)證參數(shù):默認(rèn)值:DEFAULT_CREDENTIALS = pika.credentials.PlainCredentials(DEFAULT_USERNAME, DEFAULT_PASSWORD)
DEFAULT_USERNAME = 'guest'
DEFAULT_PASSWORD = 'guest'
5.channel_max最大通道數(shù)
DEFAULT_CHANNEL_MAX = pika.channel.MAX_CHANNELS
6.frame_max要使用的所需最大AMQP幀大小
DEFAULT_FRAME_MAX = spec.FRAME_MAX_SIZE
7.heartbeat心跳, 0 為關(guān)閉。連接調(diào)整期間協(xié)商的AMQP連接心跳超時(shí)值或連接調(diào)整期間調(diào)用的可調(diào)用值
DEFAULT_HEARTBEAT_TIMEOUT = None # None accepts server’s proposal
8.ssl_options傳入值pika.SSLOptions
DEFAULT_SSL_OPTIONS = None
9.connection_attempts套接字連接嘗試次數(shù)
DEFAULT_CONNECTION_ATTEMPTS = 1
10.retry_delay套接字連接嘗試重連間隔
DEFAULT_RETRY_DELAY = 2.0
11.socket_timeout
DEFAULT_SOCKET_TIMEOUT = 10.0 # socket.connect() timeout
12.stack_timeout套接字連接嘗試間隔 , None為禁用
DEFAULT_STACK_TIMEOUT = 15.0 # full-stack TCP/[SSl]/AMQP bring-up timeout
13.locale
DEFAULT_LOCALE = ‘en_US’
14.blocked_connection_timeout阻塞的超時(shí)時(shí)間,默認(rèn)不超時(shí)
DEFAULT_BLOCKED_CONNECTION_TIMEOUT = None
15.client_properties客戶端屬性,用于覆蓋通過(guò)Connection.StartOk 方法向RabbitMQ報(bào)告的默認(rèn)客戶端屬性中的字段,字典類型/None
DEFAULT_CLIENT_PROPERTIES = None
16.tcp_options
DEFAULT_TCP_OPTIONS = None
其他:
DEFAULT_SSL = False
DEFAULT_SSL_PORT = 5671

回到這個(gè)話題的出發(fā)點(diǎn),當(dāng)時(shí)排除代碼發(fā)現(xiàn),python使用pika連接rabbitMq并沒(méi)有做任何處理,導(dǎo)致rabbitMq異常,相應(yīng)的python服務(wù)會(huì)直接假死或者直接掛掉,這就導(dǎo)致了rabbitMQ隊(duì)列不消費(fèi)。

#無(wú)重試配置
@property
    def connection(self):
        if self._connection is not None:
            return self._connection
        credentials = pika.credentials.PlainCredentials('root','root')
        connectionParameters = pika.ConnectionParameters(
            host='localhost',
            virtual_host=5672,
            credentials=credentials,
            socket_timeout=10
        )
        self._connection = pika.BlockingConnection(connectionParameters)
        return self._connection

#增加重試配置及設(shè)置心跳為0(不主動(dòng)關(guān)閉連接)
@property
    def connection(self):
        if self._connection is not None:
            return self._connection
        credentials = pika.credentials.PlainCredentials('root','root')
        connectionParameters = pika.ConnectionParameters(
            host='localhost',
            virtual_host=5672,
            credentials=credentials,
            socket_timeout=10,
            heartbeat=0,
            retry_delay=10,
            connection_attempts=10
        )
        self._connection = pika.BlockingConnection(connectionParameters)
        return self._connection

最后對(duì)于Bunny, Java, .NET, Objective-C, Swiftrabbitmq客戶端擁有自動(dòng)重連機(jī)制, 但是對(duì)于python 客戶端 目前還沒(méi)有提供自動(dòng)重連機(jī)制,這就需要自行實(shí)現(xiàn)。
1.while實(shí)現(xiàn):

import pika

while True:
    try:
        connection = pika.BlockingConnection()
        channel = connection.channel()
        channel.basic_consume('test', on_message_callback)
        channel.start_consuming()
    # Don't recover if connection was closed by broker
    except pika.exceptions.ConnectionClosedByBroker:
        break
    # Don't recover on channel errors
    except pika.exceptions.AMQPChannelError:
        break
    # Recover on all other connection errors
    except pika.exceptions.AMQPConnectionError:
        continue

這種方式簡(jiǎn)單,但不夠優(yōu)雅, 因?yàn)楫惓:?,?huì)不停地進(jìn)行重試。
2.retry實(shí)現(xiàn)

from retry import retry

@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
    def consume(self, callback):
        """Start consuming AMQP messages in the current process"""
        if self.connection.is_closed:
            self.reconnect()
        try:
            channel, queue = self.setListener(callback)
            channel.start_consuming()
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            log.exception('Failed to prepare AMQP consumer')
            raise
#當(dāng)時(shí)忘記發(fā)布也進(jìn)行連接重連設(shè)置了,導(dǎo)致問(wèn)題沒(méi)有徹底解決。
@retry(pika.exceptions.AMQPConnectionError, delay=5, jitter=(1, 3))
    def publish(self, body, block=True, timeout=None, properties=None):

retry是一個(gè)用于錯(cuò)誤處理的模塊,功能類似try-except,但更加快捷方便,這里就將簡(jiǎn)單地介紹一下retry的基本用法。
retry-作為裝飾器進(jìn)行使用,不傳入?yún)?shù)時(shí)功能如下例所示:

from retry import retry
 
@retry()
def make_trouble():
    '''Retry until succeed'''
    print ('retrying...')
    raise
 
if __name__ == '__main__':
    make_trouble()
 
# 輸出: 一直重試,直到運(yùn)行成功
retrying...
retrying...
retrying...
retrying...
retrying...
retrying...

retry 參數(shù)介紹

def retry(exceptions=Exception, tries=-1, delay=0, max_delay=None, backoff=1, jitter=0, logger=logging_logger):
    """Return a retry decorator.

    :param exceptions: an exception or a tuple of exceptions to catch. default: Exception.
    :param tries: the maximum number of attempts. default: -1 (infinite).
    :param delay: initial delay between attempts. default: 0.
    :param max_delay: the maximum value of delay. default: None (no limit).
    :param backoff: multiplier applied to delay between attempts. default: 1 (no backoff).
    :param jitter: extra seconds added to delay between attempts. default: 0.
                   fixed if a number, random if a range tuple (min, max)
    :param logger: logger.warning(fmt, error, delay) will be called on failed attempts.
                   default: retry.logging_logger. if None, logging is disabled.
    """

retry()在這里的功能,是在其裝飾的函數(shù)運(yùn)行報(bào)錯(cuò)后重新運(yùn)行該函數(shù),在上例中的效果就是反復(fù)運(yùn)行make_trouble(),這也是retry()的基本用法,下面介紹其幾個(gè)主要參數(shù):

exceptions:傳入指定的錯(cuò)誤類型,默認(rèn)為Exception,即捕獲所有類型的錯(cuò)誤,也可傳入元組形式的多種指定錯(cuò)誤類型。
tries:定義捕獲錯(cuò)誤之后重復(fù)運(yùn)行次數(shù),默認(rèn)為-1,即為無(wú)數(shù)次。
delay:定義每次重復(fù)運(yùn)行之間的停頓時(shí)長(zhǎng),單位秒,默認(rèn)為0,即無(wú)停頓。
backoff:呈指數(shù)增長(zhǎng)的每次重復(fù)運(yùn)行之間的停頓時(shí)長(zhǎng),需要配合delay來(lái)使用,譬如delay設(shè)置為3,backoff設(shè)置為2,則第一次間隔為3*2^0=3秒,第二次3*2^1=6秒,第三次3*2^2=12秒,以此類推,默認(rèn)為1。
max_delay:定義backoffdelay配合下出現(xiàn)的等待時(shí)間上限,當(dāng)delay*backoff**n大于max_delay時(shí),等待間隔固定為該值而不再增長(zhǎng)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容