1.RabbitMQ概述
簡介:
- MQ全稱為Message Queue,消息隊列是應(yīng)用程序和應(yīng)用程序之間的通信方法;
- RabbitMQ是開源的,實現(xiàn)了AMQP協(xié)議的,采用Erlang(面向并發(fā)編程語言)編寫的,可復(fù)用的企業(yè)級消息系統(tǒng);
- AMQP(高級消息隊列協(xié)議)是一個異步消息傳遞所使用應(yīng)用層協(xié)議規(guī)范,為面向消息中間件設(shè)計,基于此協(xié)議的客戶端與消息中間件可以無視消息來源傳遞消息,不受客戶端、消息中間件、不同的開發(fā)語言環(huán)境等條件的限制;
- 支持主流操作系統(tǒng):Linux、Windows,MacOX等;
- 支持多種客戶端開發(fā)語言:Java、Python、Ruby、.NET,PHP、C/C++、Node.js等
術(shù)語說明:
- Server(Broker):接收客戶端連接,實現(xiàn)AMQP協(xié)議的消息隊列和路由功能的進(jìn)程;
- Virtual Host:虛擬主機的概念,類似權(quán)限控制組,一個Virtual Host里可以有多個Exchange和Queue,權(quán)限控制的最小麗都是Virtual Host;
- Exchange:交換機,接收生產(chǎn)者發(fā)送的消息,并根據(jù)Routing Key將消息路由到服務(wù)器中的隊列Queue。
- ExchangeType:交換機類型決定了路由消息行為,RabbitMQ中有三種類型Exchange,分別是fanout、direct、topic;
- Message Queue:消息隊列,用于存儲還未被消費者消費的消息;
- Message:由Header和body組成,Header是由生產(chǎn)者添加的各種屬性的集合,包括Message是否被持久化、優(yōu)先級是多少、由哪個Message Queue接收等;body是真正需要發(fā)送的數(shù)據(jù)內(nèi)容;
BindingKey:綁定關(guān)鍵字,將一個特定的Exchange和一個特定的Queue綁定起來。
2.RabbitMQ安裝啟動與管理
2.1 Windows64位環(huán)境下安裝RabbitMQ
到RabbitMQ官網(wǎng)下載win64位最新版erlang和rabbitmq-server的安裝包,分別是 erlang otp_win64_19.3和rabbitmq-server-3.6.9。注意安裝時計算機全名最好是英文,先安裝erlang,再安裝rabbitmq-server,根據(jù)安裝向?qū)?,采用默認(rèn)安裝配置即可。安裝完成后,可以從開始-所有程序中找到RabbitMQ Server如下圖所示:
點RabbitMQ Command Prompt啟動命令行,輸入rabbitmq-plugins enable rabbitmq_management
啟動管理工具,在瀏覽器中輸入http://127.0.0.1:15672/即可打開管理登錄界面,默認(rèn)超級管理員用戶名guest,密碼guest
2.2 Linux環(huán)境下安裝RabbitMQ
先安裝Erlang
wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc
sudo yum install erlang
再安裝RabbitMQ
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.9-1.noarch.rpm
操作命令:
啟動 service rabbitmq-server start
停止 service rabbitmq-server stop
重啟 service rabbitmq-server restart
設(shè)置開機啟動 chkconfig rabbitmq-server on
開啟web界面管理工具
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart
防火墻開放15672端口訪問
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/etc/rc.d/init.d/iptables save
2.3RabbitMQ管理界面添加用戶和Virtual host
Admin-Users-Add a user
Tags:用戶角色說明
** 超級管理員(administrator)**
可登陸管理控制臺,可查看所有的信息,并且可以對用戶,策略(policy)進(jìn)行操作。
監(jiān)控者(monitoring)
可登陸管理控制臺,同時可以查看rabbitmq節(jié)點的相關(guān)信息(進(jìn)程數(shù),內(nèi)存使用情況,磁盤使用情況等)
策略制定者(policymaker)
可登陸管理控制臺, 同時可以對policy進(jìn)行管理,但無法查看節(jié)點的相關(guān)信息。
普通管理者(management)
僅可登陸管理控制臺,無法看到節(jié)點信息,也無法對策略進(jìn)行管理。
其他none
無法登陸管理控制臺,通常就是普通的生產(chǎn)者和消費者。
Admin-Virtual Host-Add virtual host
添加virtual host和用戶后,需要為用戶指定virtual host,之后用該用戶可以登錄
3.RabbitMQ的五種隊列模式與實例
3.1 簡單模式Hello World
功能:一個生產(chǎn)者P發(fā)送消息到隊列Q,一個消費者C接收
生產(chǎn)者實現(xiàn)思路:
創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊列queue,使用通道channel向隊列中發(fā)送消息,關(guān)閉通道和連接。
消費者實現(xiàn)思路
創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊列queue, 創(chuàng)建消費者并監(jiān)聽隊列,從隊列中讀取消息。
3.2 工作隊列模式Work Queue
功能:一個生產(chǎn)者,多個消費者,每個消費者獲取到的消息唯一,多個消費者只有一個隊列
任務(wù)隊列:避免立即做一個資源密集型任務(wù),必須等待它完成,而是把這個任務(wù)安排到稍后再做。我們將任務(wù)封裝為消息并將其發(fā)送給隊列。后臺運行的工作進(jìn)程將彈出任務(wù)并最終執(zhí)行作業(yè)。當(dāng)有多個worker同時運行時,任務(wù)將在它們之間共享。
生產(chǎn)者實現(xiàn)思路:
創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊列queue,使用通道channel向隊列中發(fā)送消息,2條消息之間間隔一定時間,關(guān)閉通道和連接。
消費者實現(xiàn)思路:
創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊列queue,創(chuàng)建消費者C1并監(jiān)聽隊列,獲取消息并暫停10ms,另外一個消費者C2暫停1000ms,由于消費者C1消費速度快,所以C1可以執(zhí)行更多的任務(wù)。
3.3發(fā)布/訂閱模式 Publish/Subscribe
功能:一個生產(chǎn)者發(fā)送的消息會被多個消費者獲取。一個生產(chǎn)者、一個交換機、多個隊列、多個消費者
生產(chǎn)者:可以將消息發(fā)送到隊列或者是交換機。
消費者:只能從隊列中獲取消息。
如果消息發(fā)送到?jīng)]有隊列綁定的交換機上,那么消息將丟失。
交換機不能存儲消息,消息存儲在隊列中
生產(chǎn)者實現(xiàn)思路:
創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊列queue,使用通道channel創(chuàng)建交換機并指定交換機類型為fanout,使用通道向交換機發(fā)送消息,關(guān)閉通道和連接。
消費者實現(xiàn)思路:
創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊列queue,綁定隊列到交換機,設(shè)置Qos=1,創(chuàng)建消費者并監(jiān)聽隊列,使用手動方式返回完成。可以有多個隊列綁定到交換機,多個消費者進(jìn)行監(jiān)聽。
3.4路由模式Routing
說明:生產(chǎn)者發(fā)送消息到交換機并且要指定路由key,消費者將隊列綁定到交換機時需要指定路由key
生產(chǎn)者實現(xiàn)思路:
創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊列queue,使用通道channel創(chuàng)建交換機并指定交換機類型為direct,使用通道向交換機發(fā)送消息并指定key=b,關(guān)閉通道和連接。
消費者實現(xiàn)思路:
創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊列queue,綁定隊列到交換機,設(shè)置Qos=1,創(chuàng)建消費者并監(jiān)聽隊列,使用手動方式返回完成??梢杂卸鄠€隊列綁定到交換機,但只要綁定key=b的隊列key接收到消息,多個消費者進(jìn)行監(jiān)聽。
3.5通配符模式Topic
說明:生產(chǎn)者P發(fā)送消息到交換機X,type=topic,交換機根據(jù)綁定隊列的routing key的值進(jìn)行通配符匹配;
符號#:匹配一個或者多個詞 lazy.# 可以匹配 lazy.irs或者lazy.irs.cor
符號*:只能匹配一個詞 lazy.* 可以匹配 lazy.irs或者lazy.cor
生產(chǎn)者實現(xiàn)思路:
創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊列queue,使用通道channel創(chuàng)建交換機并指定交換機類型為topic,使用通道向交換機發(fā)送消息并指定key=key.1,關(guān)閉通道和連接。
消費者實現(xiàn)思路:
創(chuàng)建連接工廠ConnectionFactory,設(shè)置服務(wù)地址127.0.0.1,端口號5672,設(shè)置用戶名、密碼、virtual host,從連接工廠中獲取連接connection,使用連接創(chuàng)建通道channel,使用通道channel創(chuàng)建隊列queue,綁定隊列到交換機,設(shè)置Qos=1,創(chuàng)建消費者并監(jiān)聽隊列,使用手動方式返回完成。可以有多個隊列綁定到交換機,凡是綁定規(guī)則符合通配符規(guī)則的隊列均可以接收到消息,比如key.*,key.#,多個消費者進(jìn)行監(jiān)聽。
4.Spring集成RabbitMQ配置
Spring提供了AMQP的一個實現(xiàn),并且spring-rabbit是RabbitMQ的一個實現(xiàn),下面給出訂閱者模式的事例配置如下:
5.總結(jié)
RabbitMQ提供6種模式,分別是Hello,Work Queue,Publish/Subscribe,Routing,Topics,RPC Request/reply,本文詳細(xì)講述了前5種,并給出代碼實現(xiàn)和思路。其中Publish/Subscribe,Routing,Topics三種模式可以統(tǒng)一歸為Exchange模式,只是創(chuàng)建時交換機的類型不一樣,分別是fanout、direct、topic。Spring提供了rabbitmq的一個實現(xiàn),所以集成起來很方便,本文第4章給出了訂閱者模式的一種spring配置。
本訂閱號提供Java相關(guān)技術(shù)分享,從Java編程基礎(chǔ)到Java高級技術(shù),從JavaWeb技術(shù)基礎(chǔ)Jsp、Servlet、>JDBC到SSH、SSM開發(fā)框架,從REST風(fēng)格接口設(shè)計到分布式項目實戰(zhàn)。剖析主流開源技術(shù)框架,用親身
實踐來譜寫深度Java技術(shù)日志。
歡迎關(guān)注 Java技術(shù)日志 微信訂閱號