一、系統(tǒng)之間的通信技術(shù)
- 分布式系統(tǒng)之間并不獨(dú)立存在的,各個(gè)系統(tǒng)之間往往需要共同完成某一個(gè)功能,這樣就涉及到系統(tǒng)之間的通信,業(yè)界通常有兩種通信方式,一種是遠(yuǎn)程過程調(diào)用(RPC),一種就是消息隊(duì)列方式(MQ)。
- 消息隊(duì)列通信:指有應(yīng)用中的某一個(gè)系統(tǒng)發(fā)送信息,由關(guān)心這條信息的系統(tǒng)負(fù)責(zé)接收,并在接收到消息后進(jìn)行各個(gè)系統(tǒng)之間的業(yè)務(wù)邏輯開發(fā)。其中這里的消息可以是簡(jiǎn)單的字符串,也可以是復(fù)雜的流或者是對(duì)象。
-
消息隊(duì)列的實(shí)現(xiàn)方式:消息在被發(fā)送后就立即返回,由消息隊(duì)列來負(fù)責(zé)消息的傳遞,消息發(fā)布者只是管將消息發(fā)布到消息隊(duì)列而不用管消息的接受,消息使用者只是管從消息隊(duì)列中取出消息而不用管誰來發(fā)布的。
二、消息隊(duì)列由來(使用場(chǎng)合)
- 使用消息隊(duì)列的典型場(chǎng)景是異步,同時(shí)也可以解決解耦、削峰、日志收集、事物最終一致性等問題。概括起來消息隊(duì)列最大功能就是八個(gè)字:異步解耦,削峰填谷
1、解耦
緊耦合的缺陷:一個(gè)模塊的改動(dòng)將會(huì)導(dǎo)致其他關(guān)聯(lián)模塊發(fā)生變化,各個(gè)模塊難以完美獨(dú)立演化。
解耦:所謂解耦就是一個(gè)模塊只關(guān)心自己的事情就可以,而依賴該模塊的其他模塊如果做得不是很重要的事,有通知即可,無需等待結(jié)果。
解耦實(shí)現(xiàn):要在各個(gè)系統(tǒng)之間實(shí)現(xiàn)解耦,只要加一個(gè)中間件就行。通??梢约右砸粋€(gè)消息中間件來完成系統(tǒng)之間的解耦,基于消息中間件實(shí)現(xiàn)的解耦,只關(guān)心通知,而非結(jié)果。
2、流量削峰
在流量來臨的時(shí)候,通常通過增加機(jī)器來面對(duì),提高系統(tǒng)的可用性。但是有的流量是突然來的(比如說一天的流量是波動(dòng)變化的,存在波峰情況),如果機(jī)器一直加著,高峰時(shí)候可行,但是平時(shí)確實(shí)浪費(fèi)機(jī)器。
- 面對(duì)這種情況,可以考慮使用消息隊(duì)列先將請(qǐng)求持久化,然后逐步的進(jìn)行處理,從而削平高峰流量,改善系統(tǒng)性能。
3、日志收集
不多少,經(jīng)典的kafka
4、事物最終一致性
三、消息隊(duì)列功能特點(diǎn)
消息:指應(yīng)用之間 傳遞的數(shù)據(jù),消息的表現(xiàn)形式多樣,可以是簡(jiǎn)單的字符串等也可以是復(fù)雜的對(duì)象等。
隊(duì)列:抽象的指消息的進(jìn)和出,但是消息的進(jìn)出并不一定是同步進(jìn)行的,因此需要一個(gè)容器來暫存和處理消息。

Broker:消息處理中心,負(fù)責(zé)消息的接受、存儲(chǔ)、路由、轉(zhuǎn)發(fā)等
Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生和發(fā)送消息到消息處理中心
Consumer:消息消費(fèi)者,負(fù)責(zé)從消息處理中心獲取消息,并進(jìn)行處理。
但真實(shí)的生產(chǎn)環(huán)境,消息隊(duì)列往往不止是要有基本的消息發(fā)送、接受、存儲(chǔ)。往往還需要解決諸如消息堆積、消息持久化、消息可靠投遞、消息重復(fù)、嚴(yán)格有序性、集群高可用等等問題
1、消息堆積
消息堆積:某一個(gè)時(shí)間段內(nèi),消費(fèi)者處理速度沒有跟上生產(chǎn)者發(fā)送消息的速度,消息在消息處理中心積壓的過程。
因此需要消息隊(duì)列能夠處理這樣的場(chǎng)景,比如設(shè)置一個(gè)閾值,一旦消息堆積超過這個(gè)閾值就觸發(fā)消息不在存儲(chǔ)進(jìn)入消息中心直接丟棄。
2、消息持久化
消息持久化是消息中心必備的一個(gè)功能。通常作用是將消息進(jìn)行暫存下來,合適的時(shí)機(jī)在進(jìn)行消費(fèi)。持久化的方案很多,可以存儲(chǔ)到內(nèi)存,可以存儲(chǔ)到磁盤,通常兩者都要。
3、可靠投遞
可靠投遞:不允許存在消息丟失的場(chǎng)景發(fā)生
消息丟失可發(fā)生在:
1、從生產(chǎn)者到消息中心過程
2.、從消息中心到消費(fèi)者
3、消息中心處理持久化消息過程
4、消息重復(fù)
上面講了消息的可靠投遞,為了滿足消息的可靠投遞會(huì)把消息持久化,然后定時(shí)輪詢持久化中的消息,有可能會(huì)造成消息的重復(fù)消費(fèi),有時(shí)候消息的重復(fù)消費(fèi)比消息丟失更加影響更大。
5、嚴(yán)格有序
有時(shí)候要求消息是按順序消費(fèi)的,比如購買商品過程中先下訂單,在支付。
6、高可用
作為中間件,高可用方案是必須的
四、簡(jiǎn)單的消息中間系統(tǒng)實(shí)現(xiàn)
消息隊(duì)列至少包含的的三個(gè)角色:
Broker:消息處理中心,負(fù)責(zé)消息的接受、存儲(chǔ)、路由、轉(zhuǎn)發(fā)等
Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生和發(fā)送消息到消息處理中心
Consumer:消息消費(fèi)者,負(fù)責(zé)從消息處理中心獲取消息,并進(jìn)行處理。
其中,消息處理中心是核心,必須具備消息發(fā)送、消息接收和消息暫存的功能。下面就用socket結(jié)合多線程知識(shí)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的消息隊(duì)列應(yīng)用。
1、消息處理中心
- 消息處理消息主要實(shí)現(xiàn)的功能有存儲(chǔ)消息、消息接收、消息發(fā)送。并要對(duì)外暴露相應(yīng)的端口和ip,用戶請(qǐng)求。
public class Broker {
//存儲(chǔ)的最大消息條數(shù)
private final static int max_size = 10;
//保存消息的容器
private static BlockingQueue<String> messageQueue = new ArrayBlockingQueue(max_size);
public static void produce(String message) {
if (messageQueue.offer(message)) {
System.out.println("成功向消息中心投遞消息:" + message + ",當(dāng)前的消息總數(shù)為:" + messageQueue.size());
} else {
System.out.println("當(dāng)前消息容易已經(jīng)滿了,不能在進(jìn)行消息投遞!");
}
System.out.println("===============================");
}
public static String consume() {
String message = messageQueue.poll();
if (message != null) {
System.out.println("已經(jīng)消費(fèi)消息:" + message + ",當(dāng)前消息總數(shù):" + messageQueue.size());
} else {
System.out.println("當(dāng)前消息容器中沒有消息可以消費(fèi)!");
}
System.out.println("===================================");
return message;
}
}
public class BrokerServer implements Runnable {
public static int server_port = 9999;
private final Socket socket;
public BrokerServer(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter printWriter = new PrintWriter(socket.getOutputStream());
while (true) {
String str = bufferedReader.readLine();
if (str == null) {
continue;
}
System.out.println("接收到原始信息:" + str);
if (str.equalsIgnoreCase("consume")) {
//消費(fèi)一條消息
String message = Broker.consume();
printWriter.println(message);
printWriter.flush();
} else {
//生產(chǎn)消息
Broker.produce(str);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//消息處理中心的啟動(dòng)
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(server_port);
while (true) {
BrokerServer brokerServer = new BrokerServer(serverSocket.accept());
new Thread(brokerServer).start();
}
}
}
2、生產(chǎn)者和消費(fèi)者
//封裝了生產(chǎn)和消費(fèi)的方法客戶端
public class MqClient {
public static void produce(String msg) throws IOException {
Socket socket=new Socket(InetAddress.getLocalHost(),BrokerServer.server_port);
PrintWriter out =new PrintWriter(socket.getOutputStream());
out.println(msg);
out.flush();
}
public static String consume() throws IOException {
Socket socket=new Socket(InetAddress.getLocalHost(),BrokerServer.server_port);
BufferedReader bufferedReader=new BufferedReader(new
InputStreamReader(socket.getInputStream()));
PrintWriter printWriter=new PrintWriter(socket.getOutputStream());
printWriter.println("consume");
printWriter.flush();
String message=bufferedReader.readLine();
return message;
}
}
//生產(chǎn)者
public class Produce1 {
public static void main(String[] args) throws IOException, InterruptedException {
int i=0;
while (true){
MqClient.produce("hello wolrd "+i);
i++;
Thread.sleep(3000);
}
}
}
//消費(fèi)者
public class Consume1 {
public static void main(String[] args) throws IOException, InterruptedException {
while (true){
String consume = MqClient.consume();
if (consume!=null){
System.out.println("consum1-->獲取消費(fèi)信息為:"+consume);
}
Thread.sleep(100000);
}
}
}
