一、準(zhǔn)備工作
1、整體流程圖

<figcaption></figcaption>
基于事務(wù)消息分布式事物方案是 事務(wù)消息最終是分布式事務(wù)的最終一致性跟我之前寫(xiě)的分布式的seata的方案不太一樣,比如a給b打錢(qián),當(dāng)a的錢(qián)扣除成功之后(扣除失敗則本次失敗),。 如果系統(tǒng) B 的本地事務(wù)執(zhí)行失敗了咋辦? 基于 mq 重試咯,mq 會(huì)自動(dòng)不斷重試直到成功,如果實(shí)在是不行,可以發(fā)送報(bào)警由人工來(lái)手工回滾和補(bǔ)償。 這種方案的要點(diǎn)就是可以基于 mq 來(lái)進(jìn)行不斷重試,最終一定會(huì)執(zhí)行成功的。 因?yàn)橐话銏?zhí)行失敗的原因是網(wǎng)絡(luò)抖動(dòng)或者數(shù)據(jù)庫(kù)瞬間負(fù)載太高,都是暫時(shí)性問(wèn)題。 通過(guò)這種方案,99.9%的情況都是可以保證數(shù)據(jù)最終一致性的,剩下的 0.1%出問(wèn)題的時(shí)候,就人工修復(fù)數(shù)據(jù)唄。
2、安裝rocketmq
這里使用docker安裝rocketmq,先安裝docker
- Centos系統(tǒng)
sudo yum install -y yum-utils device-mapper-persistent-data lvm2
sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
sudo yum install docker-ce -y
sudo systemctl enable docker
sudo systemctl start docker
復(fù)制代碼
在安裝過(guò)程中,也許會(huì)遇到Requires: container-selinux >= 2.9 的異常; 可以打開(kāi)Centos下載包中的最新container-selinux包的地址, 然后運(yùn)行:
sudo yum install -y http://mirror.centos.org/centos/7/extras/x86_64/Packages/container-selinux-2.68-1.el7.noarch.rpm
復(fù)制代碼
- Ubuntu系統(tǒng)
sudo apt-get update
sudo apt-get install apt-transport-https ca-certificates curl software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
sudo apt-key fingerprint 0EBFCD88
sudo add-apt-repository \
"deb [arch=amd64] https://download.docker.com/linux/ubuntu \
$(lsb_release -cs) \
stable"
sudo apt-get update
sudo apt-get install docker-ce
復(fù)制代碼
- Mac系統(tǒng) docs.docker.com/docker-for-…
- Windows docs.docker.com/docker-for-…
再安裝docker-compose
pip install docker-compose -i https://mirrors.aliyun.com/pypi/simple/
復(fù)制代碼
如果pip不存在,可以嘗試
sudo yum install python-pip
sudo pip install --upgrade pip
復(fù)制代碼
部署rocketmq(這里以centos為例)
mkdir /usr/local/docker/rocketmq
cd /usr/local/docker/rocketmq
vi broker.conf
復(fù)制代碼
broker.conf內(nèi)容如下
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
# 修改為你宿主機(jī)的 IP
brokerIP1=192.168.30.131
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
maxMessageSize=65536
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
復(fù)制代碼
cd /usr/local/docker/rocketmq
vi docker-compose.yml
復(fù)制代碼
docker-compose.yml 內(nèi)容如下
version: '3.5'
services:
rmqnamesrv:
image: foxiswho/rocketmq:server
container_name: rmqnamesrv
ports:
- 9876:9876
volumes:
- ./data/logs:/opt/logs
- ./data/store:/opt/store
networks:
rmq:
aliases:
- rmqnamesrv
rmqbroker:
image: foxiswho/rocketmq:broker
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
volumes:
- ./data/logs:/opt/logs
- ./data/store:/opt/store
- ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
environment:
NAMESRV_ADDR: "rmqnamesrv:9876"
JAVA_OPTS: " -Duser.home=/opt"
JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
command: mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqbroker
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rmqconsole
ports:
- 8080:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- rmqnamesrv
networks:
rmq:
aliases:
- rmqconsole
networks:
rmq:
name: rmq
driver: bridge
復(fù)制代碼
cd /usr/local/docker/rocketmq
docker-compose up -d
復(fù)制代碼
啟動(dòng)完成后docker ps -a 正常如下圖

<figcaption></figcaption>
輸入 ip:8080 可進(jìn)入 rockermq 控制臺(tái)
3、下載啟動(dòng)nacos
nacos-server-1.1.:github.com/alibaba/nac… 啟動(dòng)nacos訪(fǎng)問(wèn):http://localhost:8848/nacos 用戶(hù)名密碼 nacos/nacos
4、 demo 工程代碼:github.com/leo20131231…
5、建數(shù)據(jù)庫(kù)
示例工程order訂單 的數(shù)據(jù)庫(kù)腳本:github.com/leo20131231…
示例工程storage 的數(shù)據(jù)庫(kù)腳本:github.com/leo20131231…
三、啟動(dòng)項(xiàng)目
修改相應(yīng)的數(shù)據(jù)庫(kù)鏈接
請(qǐng)求 http://localhost:9091/order/placeOrder/commit
1、訂單系統(tǒng)發(fā)送prepare消息

<figcaption></figcaption>
2、訂單系統(tǒng)執(zhí)行本地事物 1).如果本地事務(wù)執(zhí)行失敗,回滾消息,Broker端會(huì)刪除半消息 2)如果本地事務(wù)執(zhí)行成功,會(huì)確認(rèn)發(fā)送消息 3)如果當(dāng)本地事務(wù)執(zhí)行一班系統(tǒng)掛掉了,系統(tǒng)會(huì)重新調(diào)用下面checkLocalTransaction方法

<figcaption></figcaption>
3、檢查本地事務(wù)執(zhí)行狀態(tài)

<figcaption></figcaption>
4、庫(kù)存系統(tǒng)消息成功,扣減庫(kù)存

作者:leo2012
鏈接:https://juejin.im/post/6854573218851160071
來(lái)源:掘金