Problem description
While consuming messages from RabbitMQ with the Python pika library, I ran into a connection reset error. The full error output is:
Traceback (most recent call last):
  File "/app/utils/rabbit.py", line 27, in message_callback
    channel.basic_ack(delivery_tag=method.delivery_tag)
  File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 2130, in basic_ack
    self._flush_output()
  File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 1353, in _flush_output
    self._connection._flush_output(lambda: self.is_closed, *waiters)
  File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: ConnectionResetError(104, 'Connection reset by peer')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/app/app.py", line 14, in main
    rabbit.start_listen()
  File "/app/utils/rabbit.py", line 52, in start_listen
    channel.start_consuming()
  File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 1883, in start_consuming
    self._process_data_events(time_limit=None)
  File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 2044, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 851, in process_data_events
    self._dispatch_channel_events()
  File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 567, in _dispatch_channel_events
    impl_channel._get_cookie()._dispatch_events()
  File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 1510, in _dispatch_events
    consumer_info.on_message_callback(self, evt.method,
  File "/app/utils/rabbit.py", line 34, in message_callback
    channel.basic_nack(delivery_tag=method.delivery_tag)
  File "/usr/local/lib/python3.11/site-packages/pika/adapters/blocking_connection.py", line 2151, in basic_nack
    self._impl.basic_nack(
  File "/usr/local/lib/python3.11/site-packages/pika/channel.py", line 401, in basic_nack
    self._raise_if_not_open()
  File "/usr/local/lib/python3.11/site-packages/pika/channel.py", line 1403, in _raise_if_not_open
    raise exceptions.ChannelWrongStateError('Channel is closed.')
pika.exceptions.ChannelWrongStateError: Channel is closed.
My code:
import pika
from utils.logger import logger
import traceback
import json
from utils.config import get_rabbit_config
from utils.mail import send_mail
import time

EXCHANGE = 'DEFAULT_EXCHANGE'
ROUTING_KEY = 'purchasing.system.contract'
QUEUE_NAME = 'PURCHASING_CONTRACT_CONSUMER'


class Rabbit():
    def __init__(self, callback=None) -> None:
        self.callback = callback

    def message_callback(self, channel, method, properties, body):
        try:
            logger.info(f'receive message: {body}')
            message = json.loads(body)
            if self.callback:
                result = self.callback(message)
                # time.sleep(5)
                # result = False
            else:
                logger.warn('self.callback is None')
            if result:
                channel.basic_ack(delivery_tag=method.delivery_tag)
            else:
                channel.basic_nack(delivery_tag=method.delivery_tag)
        except Exception as e:
            logger.error(traceback.format_exc())
            send_mail('purchasing-contract-consumer error',
                      traceback.format_exc())
            channel.basic_nack(delivery_tag=method.delivery_tag)
            time.sleep(3)

    def start_listen(self):
        config = get_rabbit_config()
        credentials = pika.PlainCredentials(
            username=config['username'], password=config['password'])
        parameters = pika.ConnectionParameters(
            # host=config['host'], port=config['port'], credentials=credentials, heartbeat=300, stack_timeout=300, socket_timeout=300, blocked_connection_timeout=300)
            host=config['host'], port=config['port'], credentials=credentials, heartbeat=120, socket_timeout=60)
        connection = pika.BlockingConnection(parameters=parameters)
        channel = connection.channel()
        # channel.exchange_declare(exchange=EXCHANGE,exchange_type='topic',durable=True)
        channel.queue_declare(queue=QUEUE_NAME, durable=True)
        channel.queue_bind(queue=QUEUE_NAME, exchange=EXCHANGE,
                           routing_key=ROUTING_KEY)
        channel.basic_consume(
            queue=QUEUE_NAME, on_message_callback=self.message_callback, auto_ack=False)
        logger.info('start listening...')
        channel.start_consuming()
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
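My use case: consume messages from RabbitMQ and report each one to a web service through a RESTful API (self.callback is that web API call). The web service responds very slowly, each call takes more than a minute, and a large backlog of messages has built up in RabbitMQ.
Cause analysis
At first I assumed a RabbitMQ connection timeout was to blame, so I raised the timeouts to 300 s: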
parameters = pika.ConnectionParameters(
    host=config['host'], port=config['port'], credentials=credentials,
    heartbeat=300, stack_timeout=300, socket_timeout=300,
    blocked_connection_timeout=300)
The connection reset persisted. Next I suspected that result = self.callback(message) was taking too long (over a minute), so I commented it out and replaced it with:
# result = self.callback(message)
time.sleep(5)
result = False
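The error still occurred, which ruled out long blocking in self.callback(message) as the cause.
After some research I found the real cause: prefetch_count was never set. Without a basic_qos call there is no prefetch limit (prefetch_count=0 means unlimited), so the broker keeps pushing unacknowledged messages to the consumer. When the consumer processes messages very slowly and RabbitMQ holds a large backlog, the client's socket receive buffer fills up and the client's TCP stack advertises a zero receive window to the server. Because the client's buffer stays full, the server cannot send anything for a long time, not even heartbeat frames, and the connection is eventually reset.
When the client later finishes the RESTful API call and calls channel.basic_ack(delivery_tag=method.delivery_tag), the connection has already been reset, so pika raises StreamLostError (ConnectionResetError 104). The except branch then calls channel.basic_nack on the now-closed channel, which produces the second exception, ChannelWrongStateError: Channel is closed.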
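To make the effect of prefetch_count concrete, here is a back-of-the-envelope sketch. The function names, the 10,000-message backlog, and the one-message-at-a-time processing model are assumptions for illustration only; the sketch ignores TCP buffer sizes and simply counts how many unacknowledged messages the broker is allowed to push to one consumer, and how long the last of them waits at roughly one minute per web API call.

```python
# Hypothetical helpers, for illustration only (not part of pika).

def max_unacked(backlog: int, prefetch_count: int) -> int:
    """Upper bound on unacknowledged messages the broker may push to one consumer.

    In AMQP 0-9-1, prefetch_count == 0 means "no limit".
    """
    if prefetch_count < 0:
        raise ValueError("prefetch_count must be >= 0")
    return backlog if prefetch_count == 0 else min(backlog, prefetch_count)


def worst_wait_seconds(backlog: int, prefetch_count: int,
                       seconds_per_message: float) -> float:
    """Time until the last delivered-but-unacked message has been processed,
    assuming the consumer handles messages strictly one at a time."""
    return max_unacked(backlog, prefetch_count) * seconds_per_message


if __name__ == "__main__":
    backlog = 10_000   # assumed backlog size
    per_msg = 60.0     # roughly the one-minute web API call described above
    for prefetch in (0, 1):
        n = max_unacked(backlog, prefetch)
        hours = worst_wait_seconds(backlog, prefetch, per_msg) / 3600
        print(f"prefetch_count={prefetch}: up to {n} unacked messages, "
              f"last one processed after ~{hours:.2f} h")
```

Under these assumptions, unlimited prefetch lets the whole backlog pile up on the client (the last message waits about 166.67 h), while prefetch_count=1 keeps only one message in flight.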
Solution
Set prefetch_count in the code (via channel.basic_qos):
import pika
from utils.logger import logger
import traceback
import json
from utils.config import get_rabbit_config
from utils.mail import send_mail
import time

EXCHANGE = 'DEFAULT_EXCHANGE'
ROUTING_KEY = 'purchasing.system.contract'
QUEUE_NAME = 'PURCHASING_CONTRACT_CONSUMER'


class Rabbit():
    def __init__(self, callback=None) -> None:
        self.callback = callback

    def message_callback(self, channel, method, properties, body):
        try:
            logger.info(f'receive message: {body}')
            message = json.loads(body)
            result = False  # nack the message if there is no callback
            if self.callback:
                result = self.callback(message)
                # time.sleep(5)
                # result = False
            else:
                logger.warning('self.callback is None')
            if result:
                channel.basic_ack(delivery_tag=method.delivery_tag)
            else:
                channel.basic_nack(delivery_tag=method.delivery_tag)
        except Exception:
            logger.error(traceback.format_exc())
            send_mail('purchasing-contract-consumer error',
                      traceback.format_exc())
            channel.basic_nack(delivery_tag=method.delivery_tag)
            time.sleep(3)

    def start_listen(self):
        config = get_rabbit_config()
        credentials = pika.PlainCredentials(
            username=config['username'], password=config['password'])
        parameters = pika.ConnectionParameters(
            # host=config['host'], port=config['port'], credentials=credentials, heartbeat=300, stack_timeout=300, socket_timeout=300, blocked_connection_timeout=300)
            host=config['host'], port=config['port'], credentials=credentials, heartbeat=120, socket_timeout=60)
        connection = pika.BlockingConnection(parameters=parameters)
        channel = connection.channel()
        # Limit unacknowledged deliveries to this consumer so its buffer cannot fill up.
        channel.basic_qos(prefetch_count=1)
        # channel.exchange_declare(exchange=EXCHANGE,exchange_type='topic',durable=True)
        channel.queue_declare(queue=QUEUE_NAME, durable=True)
        channel.queue_bind(queue=QUEUE_NAME, exchange=EXCHANGE,
                           routing_key=ROUTING_KEY)
        channel.basic_consume(
            queue=QUEUE_NAME, on_message_callback=self.message_callback, auto_ack=False)
        logger.info('start listening...')
        channel.start_consuming()
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
        logger.warning('warning...')
After this change, messages are consumed normally and the connection reset errors no longer occur.