本文質(zhì)量: 3 / 10(對Message Queue有一定了解讀會比較好)
本文修改次數(shù): 0
閱讀時長: 10 min
前言
本篇對RocketMQ Connect 以及 OpenMessaging中文文檔里的部分概念,術語做了一個整理。筆者的理解可能有些偏差,如有錯誤還請指出。
OpenMessaging, Rocket Connect以及 Message Connector
原文的行文思路有點不直觀,OpenMessaging和Message Connector的解釋交叉進行,導致有些概念不是很清晰。我們先來說OpenMessaging
OpenMessaging 是一套消息中間件領域的規(guī)范
這個規(guī)范有點類似于容器領域的標準cri,以及容器網(wǎng)絡領域的標準cni。規(guī)范希望市面上所有的Message Queue 的實現(xiàn),比如Kafka,RabbitMQ,RocketMQ可以遵循同樣一套標準。這樣的一個好處(只是其中一個好處)應用程序不用為每個Message Queue的實現(xiàn)都寫一個接入的插件。比如MongoDB想要給Kafka發(fā)送數(shù)據(jù)的話,可能要寫一個叫Mongo-connect-Kafka的插件,如果之后換到RocketMQ又要寫Mongo-connect-RocketMQ。這肯定加大了開源社區(qū)的工作量。如果Kafka,RocketMQ都去實現(xiàn)OpenMessaging這個規(guī)范,重復造輪子就可以避免了。
那么OpenMessaging Connect 是所有OpenMessaging 規(guī)范里的一個子集,這個子集能夠解決我們重復造輪子的問題。另外一個OpenMessage Runtime 也是OpenMessaging 規(guī)范里的一個子集。
我們接下來的討論都是關于OpenMessaging Connect和OpenMessaging Runtime這兩個規(guī)范。
OpenMessaging Connect規(guī)范的架構里,有三個組件
-
Source Connector: 負責把數(shù)據(jù)從Source 系統(tǒng),比如MySQL,讀到某一種Message Queue中,可以是RocketMQ -
Sink Connector: 從某一種Message Queue中讀取數(shù)據(jù)到目標系統(tǒng)中,比如一個Postgres -
Task: 可以是Source Task或者Sink Task,Connector會創(chuàng)建Task的定義,然后交給OpenMessaging Runtime去跑這個Task
期間兩種Connector會負責數(shù)據(jù)格式轉(zhuǎn)換。
這樣的設計對于數(shù)據(jù)庫同步的使用場景非常有用,不過這不代表OpenMessaging只能用于數(shù)據(jù)庫同步,它當然也可以用于普通的Producer/Consumer場景。不同的是,Producer不直接連接到某一個Message Queue集群,而是去訪問對應Source Connector,Consumer同理。(這一段不是很確定,我可能說錯了)(如果不是某種數(shù)據(jù)系統(tǒng)的同步場景的話,connector似乎有點多此一舉?)(為什么數(shù)據(jù)庫同步場景適合用Connector?這是個很有趣的問題,之后把我的一些疑問搞懂之后再來寫)
Open Messaging Runtime 為 Source Connector 以及 Sink Connector 提供運行環(huán)境。當然它還有些其他的功能,比如
OpenMessaging Runtime為其上運行的各種類型Connector及其關聯(lián)的Task任務提供統(tǒng)一的運行時環(huán)境(包括負載均衡,實例間的調(diào)度,配置管理以及集群管理)。 因此,在RocketMQ的消息路由的特性設計與實現(xiàn)中,依然不會將涉及消息復制和同步的邏輯滲透至OpenMessaging Runtime層,該層仍然是一個具體Connector實現(xiàn)無關且高度抽象統(tǒng)一的中間層。這里,只需要將runtime 組件進行一定的參數(shù)可配置化改造即可。
最后總結(jié)一下
- Source onnector和Sink Connector分別去連兩端的數(shù)據(jù)源
- Open Messaging Runtime 是用來跑Connector的,因此可以在多個平臺(go,Java)上實現(xiàn)
- Source Connector 從源數(shù)據(jù)系統(tǒng)讀數(shù)據(jù)并發(fā)到一個Message Queue,Sink Connector從Message Queue里讀數(shù)據(jù)再寫入到目標數(shù)據(jù)系統(tǒng)里。
接下來是RocketMQ Connect
先看官方的解釋
RocketMQ遵循Message Connector的設計理念,依托RocketMQ進行實現(xiàn)。
簡單理解RocketMQ Connect就是借RocketMQ從其他系統(tǒng)獲取數(shù)據(jù)發(fā)送到RocketMQ,然后從RocketMQ消費消息寫入到其他系統(tǒng)。主要由Source Connector,Sink Connector,Runtime組成。
這段話其實可以暫時忽略Message Connector,寫成
RocketMQ Connect實現(xiàn)了OpenMessaging,依托RocketMQ進行實現(xiàn)。
簡單理解RocketMQ Connect使用Source Connector 從其他系統(tǒng)獲取數(shù)據(jù)發(fā)送到RocketMQ,然后從RocketMQ消費消息寫入到其他系統(tǒng)。主要由Source Connector,Sink Connector實現(xiàn)以及RocketMQ Connect Runtime組成。
所以我理解的是
- RocketMQ Connect Runtime是 OpenMessaging Runtime的一個實現(xiàn)
- RocketMQ Connect 應該能夠使用各種已經(jīng)實現(xiàn)OpenMessaging Connect規(guī)范的Connector,而不用自己另外寫。
最后是Message Connector。當我看了RocketMQ Connect 和 OpenMessaging之后,我覺得Message Connector的很多概念和OpenMessaging是重合的,所以我決定暫時不管Message Connector的概念,因為Message Connector是和RocketMQ cluster 之間的消息同步相關的,而我發(fā)現(xiàn)不用Message Connector的概念也能把RocketMQ cluster 消息同步講清楚。
RocketMQ Connect 和 RocketMQ Connector
這是兩個完全不同的東西,RocketMQ Connect已經(jīng)講過了,是使用RocketMQ
對OpenMessaging規(guī)范的實現(xiàn)。而關于RocketMQ Connector,官方解釋是
Replicator是RocketMQ Connector的別名,用于RocketMQ集群之間的信息同步,Replicator是運行在RocketMQ Runtime上的RocketMQ 集群消息同步Connector,其主要實現(xiàn)了Connector的機制,能夠同步兩個獨立的RocketMQ集群之間的消息。
原文中有幾處混用,對開發(fā)者造成了一定的困擾,等我整理完了可以交個PR改一下。(不過我不知道這個GitBook在哪里==
RocketMQ Connector只有Source Connector。和Connector一樣,都是從某一個數(shù)據(jù)系統(tǒng)讀數(shù)據(jù)(通常是備份數(shù)據(jù)),只不過對于Rocket MQConnector來說,要讀的源數(shù)據(jù)系統(tǒng)也是RocketMQ。
Source 和 Sink Connector
目前我能在rocketmq-externals里找到的大多數(shù)Connector都是Source Connector,只有rocketmq-connect-jdbc同時實現(xiàn)了Source/Sink Connector。
我們可以注意到,很多的Source Connector都是用來備份數(shù)據(jù)或者同步數(shù)據(jù)的,有的實現(xiàn)方式是去監(jiān)聽數(shù)據(jù)庫的修改事件(比如MySQL Connector就去模擬一個備份數(shù)據(jù)庫,所有對這個Fake Database的修改都會被MySQL Connector監(jiān)聽到并記錄并通過Message Queue發(fā)到另一個集群再做相應的處理)。
這里就引出了之前說的Message Connector所想要實現(xiàn)的功能。目前來說RocketMQ Connect可以都是用于數(shù)據(jù)系統(tǒng)數(shù)據(jù)備份或者同步,比如RocketMQ Connector就是用來在多個region的RocketMQ之間同步消息。
這里有幾個小疑問,
- 是否Source Connector 可以單獨使用?(即沒有Sink Connector,我猜答案是可以的,但是不知道具體實現(xiàn))
- X Source Connector必須和X Sink Connector一起用嗎, 也就是說,我可以用MySQL Source Connector 以及 Cassandra Sink Connector把數(shù)據(jù)從MySQL 轉(zhuǎn)移到Cassandra里嗎?如果可以,那說明所有的數(shù)據(jù)都必須先被轉(zhuǎn)換成某一種中間格式,我覺得不是很可行。如果不可以,那Source 和 Sink Connector就必須就數(shù)據(jù)轉(zhuǎn)換達成共識,這種coupling似乎也不是很好。之后解決了問題會更新在下面。
問題
我注意到了一個很奇怪的地方,那就是目前所有rocketmq-externals
里的Connector都叫rocketmq-connect-xxx,可是想到我之前寫Connector的部分,所有的Connector應該是實現(xiàn)了OpenMessaging Connect的規(guī)范的Connector,那這也就意味著,他們應該是Message Queue無關的。也就是不應該綁定某個Connector到RocketMQ上。而實際上無論從命名,還是coding上Connector的實際實現(xiàn)都依賴于RocketMQ,這也就意味著我們并不能實現(xiàn)“一種Connector,所有Message Queue都能用”的最初設想。
所以我十分懷疑我對Connector的理解有問題,所以還是得繼續(xù)讀源碼或者問問RocketMQ的開發(fā)者。
后記和一些雜談
我沒有仔細深入研究RocketMQ 和 Kafka在設計上有什么取舍(RocketMQ 因為用于螞蟻金服和淘寶,對消息可靠性要求肯定比Kafka高,所以性能上也肯定有折損),但是RocketMQ能扛住國內(nèi)雙11真挺厲害的。我比較感興趣的是像OpenMessaing這樣的某一領域內(nèi)的標準。因為我之前實習用了很多Kubernetes, 所以知道一些容器領域的標準比如cri,cni。為什么要去爭奪這么一個標準制定者的地位,我想的還不是特別的透徹,懂的朋友可以私戳我給我科普一下嘿嘿。
關于Message Queue領域能不能制定一個類似的標準,其實要考慮的方面很多。一是,像Kafka這樣的市場占有率巨頭沒有必要去制定標準,它的方向肯定是向Kubernetes學習,讓自身成為某一個領域的標準,直接把其他的產(chǎn)品排擠出去。二是(我不是很懂這一點,所以可能有些基礎性的知識錯誤)對于Message Queue這樣的performance-critical的產(chǎn)品,讓一個已經(jīng)有的Message Queue 去適配一個新的標準是否會引入額外的開銷。
之后應該會寫一些關于共識算法的文章,之前看了很多遍Paxos但是一直感覺要自己將給別人還是很難。網(wǎng)上的中文資料有一大半是錯的,還有一些說得很模糊的(雖然Paxos工程實現(xiàn)本身就挺模糊的LOL)。我爭取寫一個能讓我室友看懂的版本出來。