RabbitMQ概述
MQ全稱為Message Queue,消息隊(duì)列是應(yīng)用程序和應(yīng)用程序之間的通信方法;
RabbitMQ是開(kāi)源的,實(shí)現(xiàn)了AMQP協(xié)議的,采用Erlang(面向并發(fā)編程語(yǔ)言)編寫的,可復(fù)用的企業(yè)級(jí)消息系統(tǒng);
AMQP(高級(jí)消息隊(duì)列協(xié)議)是一個(gè)異步消息傳遞所使用應(yīng)用層協(xié)議規(guī)范,為面向消息中間件設(shè)計(jì),基于此協(xié)議的客戶端與消息中間件可以無(wú)視消息來(lái)源傳遞消息,不受客戶端、消息中間件、不同的開(kāi)發(fā)語(yǔ)言環(huán)境等條件的限制;
支持主流操作系統(tǒng):Linux、Windows,MacOX等;
支持多種客戶端開(kāi)發(fā)語(yǔ)言:Java、Python、Ruby、.NET,PHP、C/C++、Node.js等
2.RabbitMQ安裝
centos上安裝
1.安裝前準(zhǔn)備
wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
rpm -ivh epel-release-6-8.noarch.rpm
wget -P /etc/yum.repos.d/ http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo
yum clean all
yum -y install erlang
2.安裝rabbitmq
rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
wget http://www.rabbitmq.com/releases/rabbitmq-server/v2.8.5/rabbitmq-server-2.8.5-1.noarch.rpm
rpm -ivh rabbitmq-server-2.8.5-1.noarch.rpm
3.啟動(dòng)rabbitmq并設(shè)置開(kāi)機(jī)啟動(dòng)
systemctl start rabbitmq-server
chkconfig rabbitmq-server on
4.檢查rabbitmq是否啟動(dòng)
ps aux|grep rabbitmq 或者 systemctl status rabbitmq-server
5.開(kāi)啟web管理頁(yè)面
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart
6.防火墻開(kāi)放15672端口
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/etc/rc.d/init.d/iptables save
mac
brew install rabbitmq
2.基本使用
1.啟動(dòng)rabbitmq-server
2.在rabbitmq server上創(chuàng)建一個(gè)用戶
rabbitmqctl add_user zou 123
3.配置權(quán)限,允許從外面訪問(wèn)
rabbitmqctl set_permissions -p / zou ".*" ".*" ".*"
4.查看當(dāng)前隊(duì)列
rabbitmqctl list_queues
基本模型
producer
import pika
# 和本地服務(wù)器建立連接
credentials = pika.PlainCredentials("zou","123")
parameters = pika.ConnectionParameters(host="192.168.56.10", credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 聲明隊(duì)列名
channel.queue_declare(queue="hello")
# 在 RabbitMQ 中,消息并不能直接發(fā)送到隊(duì)列中,而總會(huì)被傳遞給代理(exchange)
# 我們可以在參數(shù)中將 exchange 設(shè)定為空字符串,
# 則可以使用一個(gè)默認(rèn)的 exchange。這個(gè) exchange 非常特殊:
# 允許我們指定使用哪個(gè)隊(duì)列,我們可以使用 routing_key 參數(shù)指定隊(duì)列名稱:
channel.basic_publish(
exchange="",
routing_key="hello",
body="fuck you",
)
print("Send 'fuck you' ")
# 關(guān)閉連接
connection.close()
consumer
import pika
credentials = pika.PlainCredentials('zou', '123')
parameters = pika.ConnectionParameters(host='192.168.56.10', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() # 隊(duì)列連接通道
channel.queue_declare(queue="hello")
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback, # 取到消息后,調(diào)用callback 函數(shù)
queue='hello',
no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 阻塞模式
Work Queues
消息持久化
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保證消息可靠性的呢——消息持久化。
為了保證RabbitMQ在退出或者crash等異常情況下數(shù)據(jù)沒(méi)有丟失,需要將queue,exchange和Message都持久化。
channel.queue_declare(queue="hello3",durable=True)
channel.basic_publish(
exchange="",
routing_key="hello3",
body=message,
properties=pika.BasicProperties(
delivery_mode=2 # 使消息持久化,rabbitmq掛了,隊(duì)列消息仍在
)
)
消息公平分發(fā)
如果Rabbit只管按順序把消息發(fā)到各個(gè)消費(fèi)者身上,不考慮消費(fèi)者負(fù)載的話,很可能出現(xiàn),一個(gè)機(jī)器配置不高的消費(fèi)者那里堆積了很多消息處理不完,同時(shí)配置高的消費(fèi)者卻一直很輕松。為解決此問(wèn)題,可以在各個(gè)消費(fèi)者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個(gè)消費(fèi)者當(dāng)前消息還沒(méi)處理完的時(shí)候就不要再給我發(fā)新消息了。
[圖片上傳失敗...(image-d63693-1526605794569)]
消息持久化+公平分發(fā)的完整代碼
Producer
import pika,time
# 和本地服務(wù)器建立連接
credentials = pika.PlainCredentials("zou","123")
parameters = pika.ConnectionParameters(host="192.168.56.10", credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 聲明隊(duì)列名
channel.queue_declare(queue="hello3",durable=True) # 隊(duì)列持久化
import sys
message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(
exchange="",
routing_key="hello3",
body=message,
properties=pika.BasicProperties(
delivery_mode=2 # 使消息持久化,rabbitmq掛了,隊(duì)列消息仍在
)
)
print("[p] Send %s " % (message))
# 關(guān)閉連接
connection.close()
consumer
import pika,time
credentials = pika.PlainCredentials('zou', '123')
parameters = pika.ConnectionParameters(host='192.168.56.10', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() # 隊(duì)列連接通道
# channel.queue_declare(queue="hello3")
def callback(ch, method, properties, body):
print(" [x] Received %s" % body)
time.sleep(6)
print(" [x] Done")
# print("method.delivery_tag",method.delivery_tag)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, # 取到消息后,調(diào)用callback 函數(shù)
queue='hello3',
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 阻塞模式
Publish\Subscribe(消息發(fā)布\訂閱)
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通過(guò)routingKey和exchange決定的那個(gè)唯一的queue可以接收消息
topic:所有符合routingKey(此時(shí)可以是一個(gè)表達(dá)式)的routingKey所bind的queue可以接收消息
廣播模式
Prodcer
import pika,time
# 和本地服務(wù)器建立連接
credentials = pika.PlainCredentials("zou","123")
parameters = pika.ConnectionParameters(host="192.168.56.10", credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 聲明隊(duì)列名
# channel.queue_declare(queue="hello3",durable=True)
channel.exchange_declare(exchange='logs',type='fanout') # 廣播模式
import sys
message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(
exchange="logs",
routing_key="",
body=message,
properties=pika.BasicProperties(
delivery_mode=2 # 使消息持久化,rabbitmq掛了,隊(duì)列消息仍在
)
)
print("[p] Send %s " % (message))
# 關(guān)閉連接
connection.close()
consumer
import pika,time
credentials = pika.PlainCredentials('zou', '123')
parameters = pika.ConnectionParameters(host='192.168.56.10', credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() # 隊(duì)列連接通道
#不指定queue名字,rabbit會(huì)隨機(jī)分配一個(gè)名字,
# exclusive=True會(huì)在使用此queue的消費(fèi)者斷開(kāi)后,自動(dòng)將queue刪除
channel.exchange_declare(exchange='logs',type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
def callback(ch, method, properties, body):
print(" [x] Received %s" % body)
time.sleep(1)
print(" [x] Done")
# print("method.delivery_tag",method.delivery_tag)
ch.basic_ack(delivery_tag=method.delivery_tag) # 代表消費(fèi)完畢
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, # 取到消息后,調(diào)用callback 函數(shù)
queue=queue_name,
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() # 阻塞模式
組播
Producer
__author__ = 'Administrator'
import pika
import sys
credentials = pika.PlainCredentials('alex', 'alex3714')
parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() #隊(duì)列連接通道
channel.exchange_declare(exchange='direct_log',type='direct')
log_level = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='direct_log',
routing_key=log_level,
body=message)
print(" [x] Sent %r" % message)
connection.close()
consumer
__author__ = 'Administrator'
import pika,sys
credentials = pika.PlainCredentials('alex', 'alex3714')
parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() #隊(duì)列連接通道
queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會(huì)隨機(jī)分配一個(gè)名字,exclusive=True會(huì)在使用此queue的消費(fèi)者斷開(kāi)后,自動(dòng)將queue刪除
queue_name = queue_obj.method.queue
print('queue name',queue_name,queue_obj)
log_levels = sys.argv[1:] # info warning errr
if not log_levels:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for level in log_levels:
channel.queue_bind(exchange='direct_log',
queue=queue_name,
routing_key=level) #綁定隊(duì)列到Exchange
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,queue=queue_name, no_ack=True)
channel.start_consuming()
Topic播
producer
import pika
import sys
credentials = pika.PlainCredentials('alex', 'alex3714')
parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() #隊(duì)列連接通道
channel.exchange_declare(exchange='topic_log',type='topic')
#log_level = sys.argv[1] if len(sys.argv) > 1 else 'info'
log_level = sys.argv[1] if len(sys.argv) > 1 else 'all.info'
message = ' '.join(sys.argv[1:]) or "all.info: Hello World!"
channel.basic_publish(exchange='topic_log',
routing_key=log_level,
body=message)
print(" [x] Sent %r" % message)
connection.close()
consumer
import pika,sys
credentials = pika.PlainCredentials('alex', 'alex3714')
parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() #隊(duì)列連接通道
queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會(huì)隨機(jī)分配一個(gè)名字,exclusive=True會(huì)在使用此queue的消費(fèi)者斷開(kāi)后,自動(dòng)將queue刪除
queue_name = queue_obj.method.queue
log_levels = sys.argv[1:] # info warning errr
if not log_levels:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for level in log_levels:
channel.queue_bind(exchange='topic_log',
queue=queue_name,
routing_key=level) #綁定隊(duì)列到Exchange
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,queue=queue_name, no_ack=True)
channel.start_consuming()
Remote procedure call (RPC)
Rpc-Server
__author__ = 'Administrator'
#1 。 定義fib函數(shù)
#2. 聲明接收指令的隊(duì)列名rpc_queue
#3. 開(kāi)始監(jiān)聽(tīng)隊(duì)列,收到消息后 調(diào)用fib函數(shù)
#4 把fib執(zhí)行結(jié)果,發(fā)送回客戶端指定的reply_to 隊(duì)列
import subprocess
import pika
import time
credentials = pika.PlainCredentials('zou', '123')
parameters = pika.ConnectionParameters(host='192.168.56.10',credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel() #隊(duì)列連接通道
channel.queue_declare(queue='rpc_queue2')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def run_cmd(cmd):
cmd_obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
result = cmd_obj.stdout.read() + cmd_obj.stderr.read()
return result
def on_request(ch, method, props, body):
cmd = body.decode("utf-8")
print(" [.] run (%s)" % cmd)
response = run_cmd(cmd)
ch.basic_publish(exchange='',
routing_key=props.reply_to, #隊(duì)列
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=response)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(on_request, queue='rpc_queue2')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
Rpc-Client
# 1.聲明一個(gè)隊(duì)列,作為reply_to返回消息結(jié)果的隊(duì)列
# 2. 發(fā)消息到隊(duì)列,消息里帶一個(gè)唯一標(biāo)識(shí)符uid,reply_to
# 3. 監(jiān)聽(tīng)reply_to 的隊(duì)列,直到有結(jié)果
import queue
import pika
import uuid
class CMDRpcClient(object):
def __init__(self):
credentials = pika.PlainCredentials('zou', '123')
parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
self.connection = pika.BlockingConnection(parameters)
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue #命令的執(zhí)行結(jié)果的queue
#聲明要監(jiān)聽(tīng)callback_queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
"""
收到服務(wù)器端命令結(jié)果后執(zhí)行這個(gè)函數(shù)
:param ch:
:param method:
:param props:
:param body:
:return:
"""
if self.corr_id == props.correlation_id:
self.response = body.decode("gbk") #把執(zhí)行結(jié)果賦值給Response
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4()) #唯一標(biāo)識(shí)符號(hào)
self.channel.basic_publish(exchange='',
routing_key='rpc_queue2',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events() # 檢測(cè)監(jiān)聽(tīng)的隊(duì)列里有沒(méi)有新消息,如果有,收,如果沒(méi)有,返回None
#檢測(cè)有沒(méi)有要發(fā)送的新指令
return self.response
cmd_rpc = CMDRpcClient()
print(" [x] Requesting fib(30)")
response = cmd_rpc.call('ipconfig')
print(response)