目前最好的異步消息隊(duì)列處理器
阻塞隊(duì)列

寫一個(gè)生產(chǎn)者消費(fèi)者模式,使用阻塞隊(duì)列
Kafka入門

消息持久化:消息永久保存,存到硬盤里。硬盤順序讀寫性能很高,可能高于內(nèi)存的隨機(jī)讀寫。所以才能保證海量數(shù)據(jù)高速度處理
高可靠性:靠分布式,可以做集群部署
高擴(kuò)展性:簡單配配就可以加一臺(tái)服務(wù)器
術(shù)語
Broker:就是一臺(tái)服務(wù)器的意思
Zookeeper:是一個(gè)獨(dú)立的軟件和應(yīng)用,用來管理集群,可以用Zookeeper管理集群??梢詥为?dú)下載也可以內(nèi)置。
消息隊(duì)列實(shí)現(xiàn)方式
點(diǎn)對點(diǎn):生產(chǎn)者消費(fèi)者,每個(gè)數(shù)據(jù)只被一個(gè)消費(fèi)者消費(fèi)
發(fā)布訂閱模式(Kafka模式):數(shù)據(jù)可以被多個(gè)消費(fèi)者使用
Topic:生產(chǎn)者發(fā)布的位置就是Topic,可以理解為文件夾。
Partition:對Topic位置進(jìn)行分區(qū)如最開始的圖片,可以同時(shí)向多個(gè)分區(qū)寫入,追加的寫入。例如生產(chǎn)者追加數(shù)據(jù)。讀的話則按照順序讀連續(xù)的Offset作為一個(gè)消息
OffSet:就是存放的索引
RePlica:副本對數(shù)據(jù)做備份的,為了讓數(shù)據(jù)更可靠,每一個(gè)分區(qū)都存不是一個(gè),而是多個(gè)副本
Leader Replica:主副本從這個(gè)分區(qū)獲取數(shù)據(jù)的時(shí)候是主副本給你數(shù)據(jù)
Follower Replica:只負(fù)責(zé)備份數(shù)據(jù),如果主副本掛了,則會(huì)從副本中選擇一個(gè)作為主副本
- 官網(wǎng)下載推薦版本,解壓
- 配置zookeeper.properties,用于存放數(shù)據(jù)的目錄
dataDir=/home/...

配置server.properties
log.dirs=/home/...

- 命令
去官網(wǎng)快速開始部分學(xué)習(xí)
啟動(dòng)zookeeper,使用配置文件
./zookeeper-server-start.sh config/zookeeper.properties
再開啟一個(gè)窗口啟動(dòng)kafka。同上操作
再啟動(dòng)一個(gè)命令,使用kafka命令工具
//創(chuàng)建一個(gè)Topic主題,代表位置,也代表消息的分類
./kafka-topics.sh --create --bootstrap-server localhost:9092
--replication-factor 1 --partitions 1
--topic test
沒有提示則創(chuàng)建好了
查看所有的主題,指定某個(gè)服務(wù)器
./kafka-topics.sh --list --bootstrap-server localhost:9092
向主題發(fā)送消息,調(diào)用生產(chǎn)者執(zhí)行文件
//指定向哪個(gè)服務(wù)器發(fā)送消息以及主題
kafka-console-producer --broker-list localhost:9092 --topic test
//發(fā)送消息
>hello
>word
再開啟一個(gè)窗口執(zhí)行消費(fèi)者
//指定服務(wù)器,它topic 并從頭讀數(shù)據(jù)
./kafka-console-consumer.sh --bootstrap-server localhost:9092 -- topic test --from-beginning
//輸出hello word
Spring整合Kafka
- 引入依賴
spring-kafka - 配置Kafka
配置server、consumer - 訪問Kafka
- 生產(chǎn)者
kafkaTemplate.send(topic, data);
- 生產(chǎn)者
- 消費(fèi)者
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {}
配置項(xiàng)
-
consumer配置項(xiàng)
image.png
2.application.properties
# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.enable-auto-commit=true //自動(dòng)提交,是否自動(dòng)提交偏移量,要不要記錄下來做提交
spring.kafka.consumer.auto-commit-interval=3000//間隔3s
-
測試類
測試類中定義
image.png
image.png
發(fā)送系統(tǒng)通知


三類定義三個(gè)不同的主題,
-
封裝事件對象,更好的擴(kuò)展。
把set方法返回this,實(shí)現(xiàn)鏈?zhǔn)骄幊獭?/p>
image.png

-
事件生產(chǎn)者類,EventProducer
image.png - 事件的消費(fèi)者
事件的消費(fèi)者主要是接收到事件對象后把它封裝到Message類中,并存入數(shù)據(jù)庫。參照數(shù)據(jù)庫from_id為1 的數(shù)據(jù)格式。 - 評論點(diǎn)贊,關(guān)注,
取消贊沒必要通知。
評論連接到評論詳情,關(guān)注連接到用戶主頁,帖子也是連接到帖子詳情。
like方法加一個(gè)postId參數(shù),便于連接到帖子詳情。 - 注意要把kafka zookeeper啟動(dòng)。windows下可以kafka會(huì)報(bào)錯(cuò),我們把kafka-log目錄刪掉重啟即可。
顯示系統(tǒng)通知
- 通知列表
顯示評論、點(diǎn)贊、關(guān)注三種類型的通知 - 通知詳情
分頁顯示某一類主題所包含的通知 -
未讀消息
在頁面頭部顯示所有的未讀消息數(shù)量
image.png
2, MessageController
把三種類型的數(shù)據(jù)都查出來,其中content是json傳的數(shù)據(jù),我么需要把它轉(zhuǎn)成對象,但是傳進(jìn)去的有轉(zhuǎn)義字符,我們需要把轉(zhuǎn)義字符解析出來
String content = HtmlUtil.htmlUnescape(message.getContent());
Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);
這樣就得到了我們要的數(shù)據(jù)
查詢評論類通知需要的數(shù)據(jù)是

另外兩類差不多,直接復(fù)制然后修改一些名字即可。
關(guān)注去掉postId參數(shù),因?yàn)椴挥眠B接到詳情。
然后查詢私信的未讀消息數(shù)量以及未讀通知的數(shù)量。計(jì)算出總的消息數(shù)。私信列表也要查詢出通知的數(shù)量。
- 處理頁面,
letter的頭復(fù)制到notice即可唯一修改的就是active激活位置,active表示激活的頁面標(biāo)簽,修改下即可。 -
通知詳情
設(shè)置分頁,消息已讀,消息的發(fā)送人
image.png
頁面,處理返回按鈕,三個(gè)都是顯示到同一個(gè)評論處理,我們復(fù)制三份,然后通過判斷顯示。

-
消息未讀的數(shù)量,用攔截器處理
MessageInterceptor
image.png







