rabbitmq使用

RabbitMQ概述

參考博客

MQ全稱為Message Queue,消息隊(duì)列是應(yīng)用程序和應(yīng)用程序之間的通信方法;

RabbitMQ是開(kāi)源的,實(shí)現(xiàn)了AMQP協(xié)議的,采用Erlang(面向并發(fā)編程語(yǔ)言)編寫的,可復(fù)用的企業(yè)級(jí)消息系統(tǒng);

AMQP(高級(jí)消息隊(duì)列協(xié)議)是一個(gè)異步消息傳遞所使用應(yīng)用層協(xié)議規(guī)范,為面向消息中間件設(shè)計(jì),基于此協(xié)議的客戶端與消息中間件可以無(wú)視消息來(lái)源傳遞消息,不受客戶端、消息中間件、不同的開(kāi)發(fā)語(yǔ)言環(huán)境等條件的限制;

支持主流操作系統(tǒng):Linux、Windows,MacOX等;

支持多種客戶端開(kāi)發(fā)語(yǔ)言:Java、Python、Ruby、.NET,PHP、C/C++、Node.js等

2.RabbitMQ安裝

centos上安裝

1.安裝前準(zhǔn)備

  wget http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
  rpm -ivh epel-release-6-8.noarch.rpm
  wget -P /etc/yum.repos.d/ http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo 
  yum clean all 
  yum -y install erlang

2.安裝rabbitmq

 rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc 
 wget http://www.rabbitmq.com/releases/rabbitmq-server/v2.8.5/rabbitmq-server-2.8.5-1.noarch.rpm 
 rpm -ivh rabbitmq-server-2.8.5-1.noarch.rpm

3.啟動(dòng)rabbitmq并設(shè)置開(kāi)機(jī)啟動(dòng)

systemctl start rabbitmq-server
chkconfig rabbitmq-server on

4.檢查rabbitmq是否啟動(dòng)

ps aux|grep rabbitmq    或者    systemctl status rabbitmq-server

5.開(kāi)啟web管理頁(yè)面

rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart

6.防火墻開(kāi)放15672端口

/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/etc/rc.d/init.d/iptables save

mac

brew install rabbitmq

windows安裝

2.基本使用

1.啟動(dòng)rabbitmq-server

2.在rabbitmq server上創(chuàng)建一個(gè)用戶
rabbitmqctl add_user zou 123

3.配置權(quán)限,允許從外面訪問(wèn)
rabbitmqctl set_permissions -p / zou ".*" ".*" ".*"

4.查看當(dāng)前隊(duì)列
rabbitmqctl list_queues

基本模型

producer

import pika

# 和本地服務(wù)器建立連接
credentials = pika.PlainCredentials("zou","123")
parameters = pika.ConnectionParameters(host="192.168.56.10", credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# 聲明隊(duì)列名
channel.queue_declare(queue="hello")

# 在 RabbitMQ 中,消息并不能直接發(fā)送到隊(duì)列中,而總會(huì)被傳遞給代理(exchange)
# 我們可以在參數(shù)中將 exchange 設(shè)定為空字符串,
# 則可以使用一個(gè)默認(rèn)的 exchange。這個(gè) exchange 非常特殊:
# 允許我們指定使用哪個(gè)隊(duì)列,我們可以使用 routing_key 參數(shù)指定隊(duì)列名稱:
channel.basic_publish(
    exchange="",
    routing_key="hello",
    body="fuck you",
)

print("Send 'fuck you' ")

# 關(guān)閉連接
connection.close()

consumer

import pika

credentials = pika.PlainCredentials('zou', '123')

parameters = pika.ConnectionParameters(host='192.168.56.10', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # 隊(duì)列連接通道
channel.queue_declare(queue="hello")


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


channel.basic_consume(callback,  # 取到消息后,調(diào)用callback 函數(shù)
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 阻塞模式

Work Queues

消息持久化

消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保證消息可靠性的呢——消息持久化。
為了保證RabbitMQ在退出或者crash等異常情況下數(shù)據(jù)沒(méi)有丟失,需要將queue,exchange和Message都持久化。

channel.queue_declare(queue="hello3",durable=True)
channel.basic_publish(
    exchange="",
    routing_key="hello3",
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2  # 使消息持久化,rabbitmq掛了,隊(duì)列消息仍在
    )
)

消息公平分發(fā)

如果Rabbit只管按順序把消息發(fā)到各個(gè)消費(fèi)者身上,不考慮消費(fèi)者負(fù)載的話,很可能出現(xiàn),一個(gè)機(jī)器配置不高的消費(fèi)者那里堆積了很多消息處理不完,同時(shí)配置高的消費(fèi)者卻一直很輕松。為解決此問(wèn)題,可以在各個(gè)消費(fèi)者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個(gè)消費(fèi)者當(dāng)前消息還沒(méi)處理完的時(shí)候就不要再給我發(fā)新消息了。

[圖片上傳失敗...(image-d63693-1526605794569)]

消息持久化+公平分發(fā)的完整代碼

Producer

import pika,time

# 和本地服務(wù)器建立連接
credentials = pika.PlainCredentials("zou","123")
parameters = pika.ConnectionParameters(host="192.168.56.10", credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# 聲明隊(duì)列名
channel.queue_declare(queue="hello3",durable=True) # 隊(duì)列持久化

import sys
message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(
    exchange="",
    routing_key="hello3",
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2  # 使消息持久化,rabbitmq掛了,隊(duì)列消息仍在
    )
)
print("[p] Send %s " % (message))

# 關(guān)閉連接
connection.close()

consumer

import pika,time

credentials = pika.PlainCredentials('zou', '123')

parameters = pika.ConnectionParameters(host='192.168.56.10', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # 隊(duì)列連接通道
# channel.queue_declare(queue="hello3")

def callback(ch, method, properties, body):
    print(" [x] Received %s" % body)
    time.sleep(6)
    print(" [x] Done")
    # print("method.delivery_tag",method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,  # 取到消息后,調(diào)用callback 函數(shù)
                      queue='hello3',
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 阻塞模式

Publish\Subscribe(消息發(fā)布\訂閱)

fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通過(guò)routingKey和exchange決定的那個(gè)唯一的queue可以接收消息
topic:所有符合routingKey(此時(shí)可以是一個(gè)表達(dá)式)的routingKey所bind的queue可以接收消息

廣播模式

Prodcer

import pika,time

# 和本地服務(wù)器建立連接
credentials = pika.PlainCredentials("zou","123")
parameters = pika.ConnectionParameters(host="192.168.56.10", credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# 聲明隊(duì)列名
# channel.queue_declare(queue="hello3",durable=True)
channel.exchange_declare(exchange='logs',type='fanout') # 廣播模式

import sys
message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
channel.basic_publish(
    exchange="logs",
    routing_key="",
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2  # 使消息持久化,rabbitmq掛了,隊(duì)列消息仍在
    )
)
print("[p] Send %s " % (message))

# 關(guān)閉連接
connection.close()

consumer

import pika,time

credentials = pika.PlainCredentials('zou', '123')

parameters = pika.ConnectionParameters(host='192.168.56.10', credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()  # 隊(duì)列連接通道

#不指定queue名字,rabbit會(huì)隨機(jī)分配一個(gè)名字,
# exclusive=True會(huì)在使用此queue的消費(fèi)者斷開(kāi)后,自動(dòng)將queue刪除
channel.exchange_declare(exchange='logs',type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
                   queue=queue_name)

def callback(ch, method, properties, body):
    print(" [x] Received %s" % body)
    time.sleep(1)
    print(" [x] Done")
    # print("method.delivery_tag",method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 代表消費(fèi)完畢

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,  # 取到消息后,調(diào)用callback 函數(shù)
                      queue=queue_name,
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()  # 阻塞模式

組播

Producer

__author__ = 'Administrator'
import pika
import sys

credentials = pika.PlainCredentials('alex', 'alex3714')

parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊(duì)列連接通道

channel.exchange_declare(exchange='direct_log',type='direct')

log_level =  sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ' '.join(sys.argv[1:]) or "info: Hello World!"

channel.basic_publish(exchange='direct_log',
                      routing_key=log_level,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

consumer

__author__ = 'Administrator'
import pika,sys
credentials = pika.PlainCredentials('alex', 'alex3714')

parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊(duì)列連接通道

queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會(huì)隨機(jī)分配一個(gè)名字,exclusive=True會(huì)在使用此queue的消費(fèi)者斷開(kāi)后,自動(dòng)將queue刪除
queue_name = queue_obj.method.queue
print('queue name',queue_name,queue_obj)

log_levels = sys.argv[1:] # info warning errr

if not log_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)


for level in log_levels:
    channel.queue_bind(exchange='direct_log',
                       queue=queue_name,
                       routing_key=level) #綁定隊(duì)列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name, no_ack=True)

channel.start_consuming()

Topic播

producer

import pika
import sys

credentials = pika.PlainCredentials('alex', 'alex3714')

parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊(duì)列連接通道

channel.exchange_declare(exchange='topic_log',type='topic')

#log_level =  sys.argv[1] if len(sys.argv) > 1 else 'info'
log_level =  sys.argv[1] if len(sys.argv) > 1 else 'all.info'

message = ' '.join(sys.argv[1:]) or "all.info: Hello World!"

channel.basic_publish(exchange='topic_log',
                      routing_key=log_level,
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

consumer

import pika,sys
credentials = pika.PlainCredentials('alex', 'alex3714')

parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊(duì)列連接通道

queue_obj = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會(huì)隨機(jī)分配一個(gè)名字,exclusive=True會(huì)在使用此queue的消費(fèi)者斷開(kāi)后,自動(dòng)將queue刪除
queue_name = queue_obj.method.queue


log_levels = sys.argv[1:] # info warning errr

if not log_levels:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)


for level in log_levels:
    channel.queue_bind(exchange='topic_log',
                       queue=queue_name,
                       routing_key=level) #綁定隊(duì)列到Exchange

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name, no_ack=True)

channel.start_consuming()

Remote procedure call (RPC)

Rpc-Server

__author__ = 'Administrator'

#1 。 定義fib函數(shù)
#2. 聲明接收指令的隊(duì)列名rpc_queue
#3. 開(kāi)始監(jiān)聽(tīng)隊(duì)列,收到消息后 調(diào)用fib函數(shù)
#4 把fib執(zhí)行結(jié)果,發(fā)送回客戶端指定的reply_to 隊(duì)列
import subprocess
import pika
import time
credentials = pika.PlainCredentials('zou', '123')

parameters = pika.ConnectionParameters(host='192.168.56.10',credentials=credentials)
connection = pika.BlockingConnection(parameters)

channel = connection.channel() #隊(duì)列連接通道

channel.queue_declare(queue='rpc_queue2')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)


def run_cmd(cmd):
    cmd_obj = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    result = cmd_obj.stdout.read() + cmd_obj.stderr.read()

    return result


def on_request(ch, method, props, body):
    cmd = body.decode("utf-8")

    print(" [.] run (%s)" % cmd)
    response = run_cmd(cmd)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to, #隊(duì)列
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=response)

    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(on_request, queue='rpc_queue2')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

Rpc-Client

# 1.聲明一個(gè)隊(duì)列,作為reply_to返回消息結(jié)果的隊(duì)列
# 2.  發(fā)消息到隊(duì)列,消息里帶一個(gè)唯一標(biāo)識(shí)符uid,reply_to
# 3.  監(jiān)聽(tīng)reply_to 的隊(duì)列,直到有結(jié)果
import queue

import pika
import uuid

class CMDRpcClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials('zou', '123')
        parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue #命令的執(zhí)行結(jié)果的queue

        #聲明要監(jiān)聽(tīng)callback_queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """
        收到服務(wù)器端命令結(jié)果后執(zhí)行這個(gè)函數(shù)
        :param ch:
        :param method:
        :param props:
        :param body:
        :return:
        """
        if self.corr_id == props.correlation_id:
            self.response = body.decode("gbk") #把執(zhí)行結(jié)果賦值給Response

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4()) #唯一標(biāo)識(shí)符號(hào)
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue2',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=str(n))


        while self.response is None:
            self.connection.process_data_events()  # 檢測(cè)監(jiān)聽(tīng)的隊(duì)列里有沒(méi)有新消息,如果有,收,如果沒(méi)有,返回None
            #檢測(cè)有沒(méi)有要發(fā)送的新指令
        return self.response

cmd_rpc = CMDRpcClient()

print(" [x] Requesting fib(30)")
response = cmd_rpc.call('ipconfig')

print(response)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,534評(píng)論 19 139
  • 來(lái)源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,507評(píng)論 2 34
  • 1. 歷史 RabbitMQ是一個(gè)由erlang開(kāi)發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,233評(píng)論 3 51
  • 本文大綱 RabbitMQ 歷史 RabbitMQ 應(yīng)用場(chǎng)景 RabbitMQ 系統(tǒng)架構(gòu) RabbitMQ 基本概...
    Java_Explorer閱讀 16,797評(píng)論 1 40
  • 消息隊(duì)列是后臺(tái)開(kāi)發(fā)常用的中間件,使用消息隊(duì)列有下列好處:1、可以使系統(tǒng)異步化,降低響應(yīng)時(shí)間;2、減少不同模塊的耦合...
    millions_chan閱讀 5,577評(píng)論 0 5

友情鏈接更多精彩內(nèi)容