
(1)理論基礎(chǔ)
Erlang語言
最初用于交換機(jī)領(lǐng)域的架構(gòu)模式,這樣使得RabbitMQ在Broker之間進(jìn)行數(shù)據(jù)交互的性能非常優(yōu)秀(Erlang有著和原生Socket一樣的延遲)。
RabbitMQ
RabbitMQ是一個(gè)開源的消息代理和隊(duì)列服務(wù)器,用來通過普通協(xié)議在不同的應(yīng)用之間共享數(shù)據(jù)(跨平臺(tái)跨語言)。RabbitMQ是使用Erlang語言編寫,并且基于AMQP協(xié)議實(shí)現(xiàn)。
RabbitMQ是一個(gè)消息中間件:它接受并轉(zhuǎn)發(fā)消息。你可以把它當(dāng)做一個(gè)快遞站點(diǎn),當(dāng)你要發(fā)送一個(gè)包裹時(shí),你把你的包裹放到快遞站,快遞員最終會(huì)把你的快遞送到收件人那里,按照這種邏輯RabbitMQ是一個(gè)快遞站,一個(gè)快遞員幫你傳遞快件。RabbitMQ與快遞站的主要區(qū)別在于,它不處理快件而是接收,存儲(chǔ)和轉(zhuǎn)發(fā)消息數(shù)據(jù)。
(2)配置
(1)安裝otp、并配置系統(tǒng)環(huán)境變量PATH
(2)安裝 RabbitMQ、并sbin目錄下運(yùn)行
rabbitmq-plugins enable rabbitmq_management
(3)rabbitmq重新啟動(dòng)、并訪問管理頁面
http://127.0.0.1:15672
賬號(hào):guest
密碼:guest
(4)Vue訪問參數(shù)
export const PROTOCOL = 'ws'? ? //連接協(xié)議
export const IP = 'localhost'? //ip
export const PORT = '15674'? ? ? //端口
export const ADDRESS = 'ws'?
export const ACCOUNT = 'guest'? //賬號(hào)
export const PASSWORD = 'guest'? //密碼
Rabbit主要的端口說明:
4369 – erlang發(fā)現(xiàn)口
5672 --client端通信口
15672 – 管理界面ui端口
25672 – server間內(nèi)部通信口
(3)生產(chǎn)者消費(fèi)者模型
生產(chǎn)者消費(fèi)者模型:添加了一個(gè)隊(duì)列,并創(chuàng)建了兩個(gè)消費(fèi)者用于監(jiān)聽隊(duì)列消息,我們發(fā)現(xiàn),當(dāng)有消息到達(dá)時(shí),兩個(gè)消費(fèi)者會(huì)交替收到消息。這一過程雖然不用創(chuàng)建交換機(jī),但會(huì)使用默認(rèn)的交換機(jī),并用默認(rèn)的直連(default-direct)策略連接隊(duì)列
//生產(chǎn)者消費(fèi)者模式的配置,包括一個(gè)隊(duì)列和兩個(gè)對(duì)應(yīng)的消費(fèi)者
@Configuration
public class ProducerConsumerConfig {
@Bean
? ? public Queue myQueue() {
? ? ? Queue queue=new Queue("myqueue");
? ? ? return queue;
? ? }
}
@Component
public class QueueListener1 {
@RabbitListener(queues = "myqueue")
public void displayMail(Mail mail) throws Exception {
System.out.println("隊(duì)列監(jiān)聽器1號(hào)收到消息"+mail.toString());
}
}
@Component
public class QueueListener2 {
@RabbitListener(queues = "myqueue")
public void displayMail(Mail mail) throws Exception {
System.out.println("隊(duì)列監(jiān)聽器2號(hào)收到消息"+mail.toString());
}
}
(4)發(fā)布訂閱模型
發(fā)布訂閱模型,添加兩個(gè)隊(duì)列,分別各用一個(gè)消費(fèi)者監(jiān)聽,設(shè)置一個(gè)交換機(jī),類型為廣播(fanout),交換機(jī)會(huì)將收到的消息廣播給所有相連的隊(duì)列
//發(fā)布訂閱模式的配置,包括兩個(gè)隊(duì)列和對(duì)應(yīng)的訂閱者,發(fā)布者的交換機(jī)類型使用fanout(子網(wǎng)廣播),兩根網(wǎng)線binding用來綁定隊(duì)列到交換機(jī)
@Configuration
public class PublishSubscribeConfig {
@Bean
? ? public Queue myQueue1() {
? ? ? Queue queue=new Queue("queue1");
? ? ? return queue;
? ? }
@Bean
? ? public Queue myQueue2() {
? ? ? Queue queue=new Queue("queue2");
? ? ? return queue;
? ? }
@Bean
public FanoutExchange fanoutExchange(){
FanoutExchange fanoutExchange=new FanoutExchange("fanout");
return fanoutExchange;
}
@Bean
public Binding binding1(){
Binding binding=BindingBuilder.bind(myQueue1()).to(fanoutExchange());
return binding;
}
@Bean
public Binding binding2(){
Binding binding=BindingBuilder.bind(myQueue2()).to(fanoutExchange());
return binding;
}
}
@Component
public class SubscribeListener1 {
@RabbitListener(queues = "queue1")
public void subscribe(Mail mail) throws IOException {
System.out.println("訂閱者1收到消息"+mail.toString());
}
}
@Component
public class SubscribeListener2 {
@RabbitListener(queues = "queue2")
public void subscribe(Mail mail) throws IOException {
System.out.println("訂閱者2收到消息"+mail.toString());
}
}
(5)direct直連交換機(jī)通信模型
direct直連交換機(jī)通信模型,包括一個(gè)direct交換機(jī),三個(gè)binding,兩個(gè)隊(duì)列,兩個(gè)消費(fèi)者監(jiān)聽器,消息只會(huì)被投入到routingkey一致的隊(duì)列中
//direct直連模式的交換機(jī)配置,包括一個(gè)direct交換機(jī),兩個(gè)隊(duì)列,三根網(wǎng)線binding
@Configuration
public class DirectExchangeConfig {
@Bean
public DirectExchange directExchange(){
DirectExchange directExchange=new DirectExchange("direct");
return directExchange;
}
@Bean
? ? public Queue directQueue1() {
? ? ? Queue queue=new Queue("directqueue1");
? ? ? return queue;
? ? }
@Bean
? ? public Queue directQueue2() {
? ? ? Queue queue=new Queue("directqueue2");
? ? ? return queue;
? ? }
//3個(gè)binding將交換機(jī)和相應(yīng)隊(duì)列連起來
@Bean
public Binding bindingorange(){
Binding binding=BindingBuilder.bind(directQueue1()).to(directExchange()).with("orange");
return binding;
}
@Bean
public Binding bindingblack(){
Binding binding=BindingBuilder.bind(directQueue2()).to(directExchange()).with("black");
return binding;
}
@Bean
public Binding bindinggreen(){
Binding binding=BindingBuilder.bind(directQueue2()).to(directExchange()).with("green");
return binding;
}
}
@Component
public class DirectListener1 {
@RabbitListener(queues = "directqueue1")
public void displayMail(Mail mail) throws Exception {
System.out.println("directqueue1隊(duì)列監(jiān)聽器1號(hào)收到消息"+mail.toString());
}
}
@Component
public class DirectListener2 {
@RabbitListener(queues = "directqueue2")
public void displayMail(Mail mail) throws Exception {
System.out.println("directqueue2隊(duì)列監(jiān)聽器2號(hào)收到消息"+mail.toString());
}
}
(6)topic主題交換機(jī)通信
topic主題交換機(jī)通信,包括一個(gè)topic交換機(jī),三個(gè)binding,兩個(gè)隊(duì)列,兩個(gè)消費(fèi)者監(jiān)聽器,消息只會(huì)被投入到routingkey能夠匹配的隊(duì)列中,#表示0個(gè)或若干個(gè)關(guān)鍵字,*表示一個(gè)關(guān)鍵字
//topic交換機(jī)模型,需要一個(gè)topic交換機(jī),兩個(gè)隊(duì)列和三個(gè)binding
@Configuration
public class TopicExchangeConfig {
@Bean
public TopicExchange topicExchange(){
TopicExchange topicExchange=new TopicExchange("mytopic");
return topicExchange;
}
@Bean
? ? public Queue topicQueue1() {
? ? ? Queue queue=new Queue("topicqueue1");
? ? ? return queue;
? ? }
@Bean
? ? public Queue topicQueue2() {
? ? ? Queue queue=new Queue("topicqueue2");
? ? ? return queue;
? ? }
//3個(gè)binding將交換機(jī)和相應(yīng)隊(duì)列連起來
@Bean
public Binding bindingtopic1(){
Binding binding=BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");//binding key
return binding;
}
@Bean
public Binding bindingtopic2(){
Binding binding=BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");
return binding;
}
@Bean
public Binding bindingtopic3(){
Binding binding=BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");//#表示0個(gè)或若干個(gè)關(guān)鍵字,*表示一個(gè)關(guān)鍵字
return binding;
}
}
@Component
public class TopicListener1 {
@RabbitListener(queues = "topicqueue1")
public void displayTopic(Mail mail) throws IOException {
System.out.println("從topicqueue1取出消息"+mail.toString());
}
}
@Component
public class TopicListener2 {
@RabbitListener(queues = "topicqueue2")
public void displayTopic(Mail mail) throws IOException {
System.out.println("從topicqueue2取出消息"+mail.toString());
}
}