1、消息隊(duì)列簡(jiǎn)介

一、系統(tǒng)之間的通信技術(shù)

  • 分布式系統(tǒng)之間并不獨(dú)立存在的,各個(gè)系統(tǒng)之間往往需要共同完成某一個(gè)功能,這樣就涉及到系統(tǒng)之間的通信,業(yè)界通常有兩種通信方式,一種是遠(yuǎn)程過程調(diào)用(RPC),一種就是消息隊(duì)列方式(MQ)
  • 消息隊(duì)列通信:指有應(yīng)用中的某一個(gè)系統(tǒng)發(fā)送信息,由關(guān)心這條信息的系統(tǒng)負(fù)責(zé)接收,并在接收到消息后進(jìn)行各個(gè)系統(tǒng)之間的業(yè)務(wù)邏輯開發(fā)。其中這里的消息可以是簡(jiǎn)單的字符串,也可以是復(fù)雜的流或者是對(duì)象。
  • 消息隊(duì)列的實(shí)現(xiàn)方式:消息在被發(fā)送后就立即返回,由消息隊(duì)列來負(fù)責(zé)消息的傳遞,消息發(fā)布者只是管將消息發(fā)布到消息隊(duì)列而不用管消息的接受,消息使用者只是管從消息隊(duì)列中取出消息而不用管誰來發(fā)布的。

二、消息隊(duì)列由來(使用場(chǎng)合)

  • 使用消息隊(duì)列的典型場(chǎng)景是異步,同時(shí)也可以解決解耦、削峰、日志收集、事物最終一致性等問題。概括起來消息隊(duì)列最大功能就是八個(gè)字:異步解耦,削峰填谷
1、解耦

緊耦合的缺陷:一個(gè)模塊的改動(dòng)將會(huì)導(dǎo)致其他關(guān)聯(lián)模塊發(fā)生變化,各個(gè)模塊難以完美獨(dú)立演化。
解耦:所謂解耦就是一個(gè)模塊只關(guān)心自己的事情就可以,而依賴該模塊的其他模塊如果做得不是很重要的事,有通知即可,無需等待結(jié)果。
解耦實(shí)現(xiàn):要在各個(gè)系統(tǒng)之間實(shí)現(xiàn)解耦,只要加一個(gè)中間件就行。通??梢约右砸粋€(gè)消息中間件來完成系統(tǒng)之間的解耦,基于消息中間件實(shí)現(xiàn)的解耦,只關(guān)心通知,而非結(jié)果。

2、流量削峰

在流量來臨的時(shí)候,通常通過增加機(jī)器來面對(duì),提高系統(tǒng)的可用性。但是有的流量是突然來的(比如說一天的流量是波動(dòng)變化的,存在波峰情況),如果機(jī)器一直加著,高峰時(shí)候可行,但是平時(shí)確實(shí)浪費(fèi)機(jī)器。

  • 面對(duì)這種情況,可以考慮使用消息隊(duì)列先將請(qǐng)求持久化,然后逐步的進(jìn)行處理,從而削平高峰流量,改善系統(tǒng)性能。
3、日志收集

不多少,經(jīng)典的kafka

4、事物最終一致性

三、消息隊(duì)列功能特點(diǎn)

消息:指應(yīng)用之間 傳遞的數(shù)據(jù),消息的表現(xiàn)形式多樣,可以是簡(jiǎn)單的字符串等也可以是復(fù)雜的對(duì)象等。
隊(duì)列:抽象的指消息的進(jìn)和出,但是消息的進(jìn)出并不一定是同步進(jìn)行的,因此需要一個(gè)容器來暫存和處理消息。

Broker:消息處理中心,負(fù)責(zé)消息的接受、存儲(chǔ)、路由、轉(zhuǎn)發(fā)等
Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生和發(fā)送消息到消息處理中心
Consumer:消息消費(fèi)者,負(fù)責(zé)從消息處理中心獲取消息,并進(jìn)行處理。

但真實(shí)的生產(chǎn)環(huán)境,消息隊(duì)列往往不止是要有基本的消息發(fā)送、接受、存儲(chǔ)。往往還需要解決諸如消息堆積、消息持久化、消息可靠投遞、消息重復(fù)、嚴(yán)格有序性、集群高可用等等問題

1、消息堆積

消息堆積:某一個(gè)時(shí)間段內(nèi),消費(fèi)者處理速度沒有跟上生產(chǎn)者發(fā)送消息的速度,消息在消息處理中心積壓的過程。

因此需要消息隊(duì)列能夠處理這樣的場(chǎng)景,比如設(shè)置一個(gè)閾值,一旦消息堆積超過這個(gè)閾值就觸發(fā)消息不在存儲(chǔ)進(jìn)入消息中心直接丟棄。

2、消息持久化

消息持久化是消息中心必備的一個(gè)功能。通常作用是將消息進(jìn)行暫存下來,合適的時(shí)機(jī)在進(jìn)行消費(fèi)。持久化的方案很多,可以存儲(chǔ)到內(nèi)存,可以存儲(chǔ)到磁盤,通常兩者都要。

3、可靠投遞

可靠投遞:不允許存在消息丟失的場(chǎng)景發(fā)生
消息丟失可發(fā)生在:

1、從生產(chǎn)者到消息中心過程
2.、從消息中心到消費(fèi)者
3、消息中心處理持久化消息過程

4、消息重復(fù)

上面講了消息的可靠投遞,為了滿足消息的可靠投遞會(huì)把消息持久化,然后定時(shí)輪詢持久化中的消息,有可能會(huì)造成消息的重復(fù)消費(fèi),有時(shí)候消息的重復(fù)消費(fèi)比消息丟失更加影響更大。

5、嚴(yán)格有序

有時(shí)候要求消息是按順序消費(fèi)的,比如購買商品過程中先下訂單,在支付。

6、高可用

作為中間件,高可用方案是必須的

四、簡(jiǎn)單的消息中間系統(tǒng)實(shí)現(xiàn)

消息隊(duì)列至少包含的的三個(gè)角色:
Broker:消息處理中心,負(fù)責(zé)消息的接受、存儲(chǔ)、路由、轉(zhuǎn)發(fā)等
Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生和發(fā)送消息到消息處理中心
Consumer:消息消費(fèi)者,負(fù)責(zé)從消息處理中心獲取消息,并進(jìn)行處理。

其中,消息處理中心是核心,必須具備消息發(fā)送、消息接收和消息暫存的功能。下面就用socket結(jié)合多線程知識(shí)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的消息隊(duì)列應(yīng)用。

1、消息處理中心
  • 消息處理消息主要實(shí)現(xiàn)的功能有存儲(chǔ)消息、消息接收、消息發(fā)送。并要對(duì)外暴露相應(yīng)的端口和ip,用戶請(qǐng)求。
public class Broker {
    //存儲(chǔ)的最大消息條數(shù)
    private final static int max_size = 10;
    //保存消息的容器
    private static BlockingQueue<String> messageQueue = new ArrayBlockingQueue(max_size);

    public static void produce(String message) {
        if (messageQueue.offer(message)) {
            System.out.println("成功向消息中心投遞消息:" + message + ",當(dāng)前的消息總數(shù)為:" + messageQueue.size());
        } else {
            System.out.println("當(dāng)前消息容易已經(jīng)滿了,不能在進(jìn)行消息投遞!");
        }
        System.out.println("===============================");
    }

    public static String consume() {
        String message = messageQueue.poll();
        if (message != null) {
            System.out.println("已經(jīng)消費(fèi)消息:" + message + ",當(dāng)前消息總數(shù):" + messageQueue.size());
        } else {
            System.out.println("當(dāng)前消息容器中沒有消息可以消費(fèi)!");
        }
        System.out.println("===================================");
        return message;
    }
}

public class BrokerServer implements Runnable {

    public static int server_port = 9999;
    private final Socket socket;

    public BrokerServer(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter printWriter = new PrintWriter(socket.getOutputStream());
            while (true) {
                String str = bufferedReader.readLine();
                if (str == null) {
                    continue;
                }
                System.out.println("接收到原始信息:" + str);
                if (str.equalsIgnoreCase("consume")) {
                    //消費(fèi)一條消息
                    String message = Broker.consume();
                    printWriter.println(message);
                    printWriter.flush();
                } else {
                    //生產(chǎn)消息
                    Broker.produce(str);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
  //消息處理中心的啟動(dòng)
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(server_port);
        while (true) {
            BrokerServer brokerServer = new BrokerServer(serverSocket.accept());
            new Thread(brokerServer).start();
        }
    }
}
2、生產(chǎn)者和消費(fèi)者
//封裝了生產(chǎn)和消費(fèi)的方法客戶端
public class MqClient {
    public static void produce(String msg) throws IOException {
        Socket socket=new Socket(InetAddress.getLocalHost(),BrokerServer.server_port);
        PrintWriter out =new PrintWriter(socket.getOutputStream());
        out.println(msg);
        out.flush();
    }
    public static String consume() throws IOException {
        Socket socket=new Socket(InetAddress.getLocalHost(),BrokerServer.server_port);
        BufferedReader bufferedReader=new BufferedReader(new 
InputStreamReader(socket.getInputStream()));
        PrintWriter printWriter=new PrintWriter(socket.getOutputStream());
        printWriter.println("consume");
        printWriter.flush();

        String message=bufferedReader.readLine();
        return message;
    }

}
//生產(chǎn)者
public class Produce1 {
    public static void main(String[] args) throws IOException, InterruptedException {
        int i=0;
        while (true){
            MqClient.produce("hello wolrd  "+i);
            i++;
            Thread.sleep(3000);
        }
    }
}
//消費(fèi)者
public class Consume1 {
    public static void main(String[] args) throws IOException, InterruptedException {
        while (true){
            String consume = MqClient.consume();
            if (consume!=null){
                System.out.println("consum1-->獲取消費(fèi)信息為:"+consume);
            }
            Thread.sleep(100000);
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 消息隊(duì)列 什么是消息隊(duì)列(Message Queue,MQ)呢? 首先回憶下生活中在餐館點(diǎn)餐的場(chǎng)景,當(dāng)你點(diǎn)完餐之后...
    JunChow520閱讀 3,153評(píng)論 0 29
  • 以下是消息隊(duì)列以下的大綱,本文主要介紹消息隊(duì)列概述,消息隊(duì)列應(yīng)用場(chǎng)景和消息中間件示例(電商,日志系統(tǒng))。 本次分享...
    文檔隨手記閱讀 1,931評(píng)論 0 28
  • 一、 消息隊(duì)列概述 消息隊(duì)列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合、異步消息、流量削鋒等問題。實(shí)現(xiàn)高性能...
    步積閱讀 57,422評(píng)論 10 138
  • 轉(zhuǎn)自:https://tech.meituan.com/2016/07/01/mq-design.html 一、何...
    小manong閱讀 500評(píng)論 0 0
  • 1.什么是消息隊(duì)列 消息隊(duì)列允許應(yīng)用間通過消息的發(fā)送與接收的方式進(jìn)行通信,當(dāng)消息接收方服務(wù)忙或不可用時(shí),其提供了一...
    zhuke閱讀 4,622評(píng)論 0 12

友情鏈接更多精彩內(nèi)容