消息中間件:RabbitMQ

(1)理論基礎(chǔ)

Erlang語言

最初用于交換機(jī)領(lǐng)域的架構(gòu)模式,這樣使得RabbitMQ在Broker之間進(jìn)行數(shù)據(jù)交互的性能非常優(yōu)秀(Erlang有著和原生Socket一樣的延遲)。

RabbitMQ

RabbitMQ是一個(gè)開源的消息代理和隊(duì)列服務(wù)器,用來通過普通協(xié)議在不同的應(yīng)用之間共享數(shù)據(jù)(跨平臺(tái)跨語言)。RabbitMQ是使用Erlang語言編寫,并且基于AMQP協(xié)議實(shí)現(xiàn)。

RabbitMQ是一個(gè)消息中間件:它接受并轉(zhuǎn)發(fā)消息。你可以把它當(dāng)做一個(gè)快遞站點(diǎn),當(dāng)你要發(fā)送一個(gè)包裹時(shí),你把你的包裹放到快遞站,快遞員最終會(huì)把你的快遞送到收件人那里,按照這種邏輯RabbitMQ是一個(gè)快遞站,一個(gè)快遞員幫你傳遞快件。RabbitMQ與快遞站的主要區(qū)別在于,它不處理快件而是接收,存儲(chǔ)和轉(zhuǎn)發(fā)消息數(shù)據(jù)。

(2)配置

(1)安裝otp、并配置系統(tǒng)環(huán)境變量PATH

(2)安裝 RabbitMQ、并sbin目錄下運(yùn)行

rabbitmq-plugins enable rabbitmq_management

(3)rabbitmq重新啟動(dòng)、并訪問管理頁面

http://127.0.0.1:15672

賬號(hào):guest

密碼:guest

(4)Vue訪問參數(shù)

export const PROTOCOL = 'ws'? ? //連接協(xié)議

export const IP = 'localhost'? //ip

export const PORT = '15674'? ? ? //端口

export const ADDRESS = 'ws'?

export const ACCOUNT = 'guest'? //賬號(hào)

export const PASSWORD = 'guest'? //密碼

Rabbit主要的端口說明:

4369 – erlang發(fā)現(xiàn)口

5672 --client端通信口

15672 – 管理界面ui端口

25672 – server間內(nèi)部通信口

(3)生產(chǎn)者消費(fèi)者模型

生產(chǎn)者消費(fèi)者模型:添加了一個(gè)隊(duì)列,并創(chuàng)建了兩個(gè)消費(fèi)者用于監(jiān)聽隊(duì)列消息,我們發(fā)現(xiàn),當(dāng)有消息到達(dá)時(shí),兩個(gè)消費(fèi)者會(huì)交替收到消息。這一過程雖然不用創(chuàng)建交換機(jī),但會(huì)使用默認(rèn)的交換機(jī),并用默認(rèn)的直連(default-direct)策略連接隊(duì)列

//生產(chǎn)者消費(fèi)者模式的配置,包括一個(gè)隊(duì)列和兩個(gè)對(duì)應(yīng)的消費(fèi)者

@Configuration

public class ProducerConsumerConfig {

@Bean

? ? public Queue myQueue() {

? ? ? Queue queue=new Queue("myqueue");

? ? ? return queue;

? ? }


}

@Component

public class QueueListener1 {

@RabbitListener(queues = "myqueue")

public void displayMail(Mail mail) throws Exception {

System.out.println("隊(duì)列監(jiān)聽器1號(hào)收到消息"+mail.toString());

}

}

@Component

public class QueueListener2 {

@RabbitListener(queues = "myqueue")

public void displayMail(Mail mail) throws Exception {

System.out.println("隊(duì)列監(jiān)聽器2號(hào)收到消息"+mail.toString());

}

}

(4)發(fā)布訂閱模型

發(fā)布訂閱模型,添加兩個(gè)隊(duì)列,分別各用一個(gè)消費(fèi)者監(jiān)聽,設(shè)置一個(gè)交換機(jī),類型為廣播(fanout),交換機(jī)會(huì)將收到的消息廣播給所有相連的隊(duì)列

//發(fā)布訂閱模式的配置,包括兩個(gè)隊(duì)列和對(duì)應(yīng)的訂閱者,發(fā)布者的交換機(jī)類型使用fanout(子網(wǎng)廣播),兩根網(wǎng)線binding用來綁定隊(duì)列到交換機(jī)

@Configuration

public class PublishSubscribeConfig {

@Bean

? ? public Queue myQueue1() {

? ? ? Queue queue=new Queue("queue1");

? ? ? return queue;

? ? }

@Bean

? ? public Queue myQueue2() {

? ? ? Queue queue=new Queue("queue2");

? ? ? return queue;

? ? }

@Bean

public FanoutExchange fanoutExchange(){

FanoutExchange fanoutExchange=new FanoutExchange("fanout");

return fanoutExchange;

}

@Bean

public Binding binding1(){

Binding binding=BindingBuilder.bind(myQueue1()).to(fanoutExchange());

return binding;

}

@Bean

public Binding binding2(){

Binding binding=BindingBuilder.bind(myQueue2()).to(fanoutExchange());

return binding;

}

}

@Component

public class SubscribeListener1 {

@RabbitListener(queues = "queue1")

public void subscribe(Mail mail) throws IOException {

System.out.println("訂閱者1收到消息"+mail.toString());

}

}

@Component

public class SubscribeListener2 {

@RabbitListener(queues = "queue2")

public void subscribe(Mail mail) throws IOException {

System.out.println("訂閱者2收到消息"+mail.toString());

}

}

(5)direct直連交換機(jī)通信模型

direct直連交換機(jī)通信模型,包括一個(gè)direct交換機(jī),三個(gè)binding,兩個(gè)隊(duì)列,兩個(gè)消費(fèi)者監(jiān)聽器,消息只會(huì)被投入到routingkey一致的隊(duì)列中

//direct直連模式的交換機(jī)配置,包括一個(gè)direct交換機(jī),兩個(gè)隊(duì)列,三根網(wǎng)線binding

@Configuration

public class DirectExchangeConfig {

@Bean

public DirectExchange directExchange(){

DirectExchange directExchange=new DirectExchange("direct");

return directExchange;

}

@Bean

? ? public Queue directQueue1() {

? ? ? Queue queue=new Queue("directqueue1");

? ? ? return queue;

? ? }

@Bean

? ? public Queue directQueue2() {

? ? ? Queue queue=new Queue("directqueue2");

? ? ? return queue;

? ? }

//3個(gè)binding將交換機(jī)和相應(yīng)隊(duì)列連起來

@Bean

public Binding bindingorange(){

Binding binding=BindingBuilder.bind(directQueue1()).to(directExchange()).with("orange");

return binding;

}

@Bean

public Binding bindingblack(){

Binding binding=BindingBuilder.bind(directQueue2()).to(directExchange()).with("black");

return binding;

}

@Bean

public Binding bindinggreen(){

Binding binding=BindingBuilder.bind(directQueue2()).to(directExchange()).with("green");

return binding;

}

}

@Component

public class DirectListener1 {

@RabbitListener(queues = "directqueue1")

public void displayMail(Mail mail) throws Exception {

System.out.println("directqueue1隊(duì)列監(jiān)聽器1號(hào)收到消息"+mail.toString());

}

}

@Component

public class DirectListener2 {

@RabbitListener(queues = "directqueue2")

public void displayMail(Mail mail) throws Exception {

System.out.println("directqueue2隊(duì)列監(jiān)聽器2號(hào)收到消息"+mail.toString());

}

}

(6)topic主題交換機(jī)通信

topic主題交換機(jī)通信,包括一個(gè)topic交換機(jī),三個(gè)binding,兩個(gè)隊(duì)列,兩個(gè)消費(fèi)者監(jiān)聽器,消息只會(huì)被投入到routingkey能夠匹配的隊(duì)列中,#表示0個(gè)或若干個(gè)關(guān)鍵字,*表示一個(gè)關(guān)鍵字

//topic交換機(jī)模型,需要一個(gè)topic交換機(jī),兩個(gè)隊(duì)列和三個(gè)binding

@Configuration

public class TopicExchangeConfig {

@Bean

public TopicExchange topicExchange(){

TopicExchange topicExchange=new TopicExchange("mytopic");

return topicExchange;

}

@Bean

? ? public Queue topicQueue1() {

? ? ? Queue queue=new Queue("topicqueue1");

? ? ? return queue;

? ? }

@Bean

? ? public Queue topicQueue2() {

? ? ? Queue queue=new Queue("topicqueue2");

? ? ? return queue;

? ? }

//3個(gè)binding將交換機(jī)和相應(yīng)隊(duì)列連起來

@Bean

public Binding bindingtopic1(){

Binding binding=BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.orange.*");//binding key

return binding;

}

@Bean

public Binding bindingtopic2(){

Binding binding=BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("*.*.rabbit");

return binding;

}

@Bean

public Binding bindingtopic3(){

Binding binding=BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("lazy.#");//#表示0個(gè)或若干個(gè)關(guān)鍵字,*表示一個(gè)關(guān)鍵字

return binding;

}

}

@Component

public class TopicListener1 {

@RabbitListener(queues = "topicqueue1")

public void displayTopic(Mail mail) throws IOException {

System.out.println("從topicqueue1取出消息"+mail.toString());

}

}

@Component

public class TopicListener2 {

@RabbitListener(queues = "topicqueue2")

public void displayTopic(Mail mail) throws IOException {

System.out.println("從topicqueue2取出消息"+mail.toString());

}

}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容