背景
最近公司要上一個需求,就是部分業(yè)務(wù)數(shù)據(jù)有插入動作時,對用戶進(jìn)行通知
舉個虛假的例子,你的下屬這會兒有個成交什么的
雖然 save 的邏輯的確是在我們的微服務(wù)代碼里,
我如果在 save 這里加上這些邏輯,功能沒問題,但是不就增加耦合度了么?后面修改邏輯,難度上天
用切面,先不說性能,感覺把代碼邏輯放在api容器里,總覺得不太對,我覺得這個功能肯定是要放在離線計(jì)算的task容器里的
那用定時任務(wù),看下這段時間內(nèi)有多少新創(chuàng)建的數(shù)據(jù)?可以是可以,但是一是延遲高,二是實(shí)現(xiàn)邏輯有點(diǎn)傻
還好 MongoDB 提供了 Change Stream 的功能, 戳 mongodb changeStreams
原理和中文文檔,網(wǎng)上一搜一堆,但是就是沒有生產(chǎn)使用的具體代碼例子
所以我來分享下
Spring data
我用的是 Spring Data Mongodb, 其它持久層框架的,請自行查閱官方文檔
但是這個文檔寫的有點(diǎn)簡陋
- import 沒寫,根本不知道該導(dǎo)哪個包
- 例子太簡單了點(diǎn),消費(fèi)的時候就打印一下,看不到更多的細(xì)節(jié)
所以我盡量解釋的詳細(xì)點(diǎn),但是注意,代碼本身是運(yùn)行不起來的,包括mongodb的配置和業(yè)務(wù)代碼等,需要自行實(shí)現(xiàn)
代碼
ChangeStreamService
我自己的代碼,包名是 com.xixi 開頭的,其它都是可以引入的包的代碼
import com.xixi.SkmrActionLogsDocument;
import com.xixi.DBNameConstant;
import com.xixi.StopWatch;
import com.xixi.OnApplicationStarted;
import com.xixi.IChangeStreamStop;
import com.xixi.CommonErrorHandler;
import com.xixi.LogListener;
import com.mongodb.client.model.changestream.OperationType;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.MatchOperation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.stereotype.Service;
/**
* @author YellowTail
* @since 2020-06-02
*/
@Service
public class ChangeStreamService implements IChangeStreamStop, OnApplicationStarted {
private static final Logger LOGGER = LoggerFactory.getLogger(ChangeStreamService.class);
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private LogListener logListener;
@Autowired
private ResumeTokenService resumeTokenService;
private MessageListenerContainer messageListenerContainer;
// https://docs.spring.io/spring-data/mongodb/docs/2.2.6.RELEASE/reference/html/#change-streams
public void onStarted() {
LOGGER.info("ChangeStreamService start");
// 1. 啟動一個 消息監(jiān)聽容器
// 構(gòu)造、使用了一個 spring 實(shí)現(xiàn)的線程池
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setConcurrencyLimit(10);
simpleAsyncTaskExecutor.setThreadNamePrefix("cs-mq-consumer-");
messageListenerContainer = new DefaultMessageListenerContainer(mongoTemplate, simpleAsyncTaskExecutor);
messageListenerContainer.start();
// 2. 建 一個監(jiān)聽器, 當(dāng)有 消息收到的時候, 會被調(diào)用, 消息的 body 會被轉(zhuǎn)成 domain, 原始消息在 Document 里面
// 也就是 LogListener
// 3. 設(shè)置一些監(jiān)聽選項(xiàng)
// https://docs.mongodb.com/manual/reference/change-events/#change-stream-output
ChangeStreamRequestOptions requestOptions = new ChangeStreamRequestOptions("yourDb",
"yourCollection", genOptions());
// 4. 向容器注冊一個監(jiān)聽請求, 返回值是一個訂閱對象, 可以檢查當(dāng)前任務(wù)的狀態(tài), 也可以用來取消執(zhí)行來釋放資源
CommonErrorHandler commonErrorHandler = new CommonErrorHandler();
messageListenerContainer.register(new ChangeStreamRequest<>(logListener, requestOptions),
SkmrActionLogsDocument.class, commonErrorHandler);
}
/**
* 停止 消息監(jiān)聽容器
* @param
* @author YellowTail
* @since 2020-06-02
*/
public void stop() {
LOGGER.info("ChangeStreamService stop MessageListenerContainer");
StopWatch stopWatch = StopWatch.createStarted();
// 5. 停止容器
messageListenerContainer.stop();
LOGGER.info("ChangeStreamService stop container success, cost {} ms", stopWatch.stopThenRestart());
// 6. 向 redis 存入 這次最后的 token
// dev 測試發(fā)現(xiàn),新的容器先起來了,老的容器后 stop的
resumeTokenService.updateToken(logListener.getLastToken());
LOGGER.info("ChangeStreamService stop success, cost {} ms", stopWatch.stop());
}
/**
* 生成 change stream 的配置
* @param
* @author YellowTail
* @since 2020-06-09
*/
private ChangeStreamOptions genOptions() {
// 使用 pipeline 來過濾
MatchOperation matchOperation = Aggregation.match(Criteria.where("operationType").is(OperationType.INSERT.getValue()));
// ChangeStreamOptions changeStreamOptions = ChangeStreamOptions.empty();
ChangeStreamOptions.ChangeStreamOptionsBuilder changeStreamOptionsBuilder = ChangeStreamOptions.builder()
.filter(Aggregation.newAggregation(matchOperation));
String token = resumeTokenService.getToken();
if (StringUtils.isNotBlank(token)) {
// 重啟的時候,可以接著上次的來
changeStreamOptionsBuilder.resumeToken(new BsonString(token));
}
return changeStreamOptionsBuilder.build();
}
}
現(xiàn)在來講解一下代碼細(xì)節(jié)
onStarted 這個方法做了幾件事
- 新建一個
消息監(jiān)聽容器,并啟動起來 - 新建一個
監(jiān)聽器,有消息來的時候,去消費(fèi) - 根據(jù)需要,設(shè)置一些監(jiān)聽選項(xiàng)
- 向
消息監(jiān)聽容器注冊一個監(jiān)聽請求,關(guān)聯(lián)上監(jiān)聽器
新建一個消息監(jiān)聽容器
我偷懶,沒有做什么更多的設(shè)置,基本是全默認(rèn)的
設(shè)置了線程數(shù)(應(yīng)該是這個作用吧,沒詳細(xì)了解)
設(shè)置了線程名字前綴(方便在日志文件里搜索日志)
新建一個監(jiān)聽器
我新建了一個 Service 來做這件事
import com.xixi.SkmrActionLogsDocument;
import com.xixi.IdTokenModel;
import com.xixi.ResumeTokenStack;
import com.xixi.VisitorNotifyMqObject;
import com.xixi.VisitorMqProducerService;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.stereotype.Service;
/**
* @author YellowTail
* @since 2020-06-02
*/
@Service
public class LogListener implements MessageListener<ChangeStreamDocument<Document>, SkmrActionLogsDocument> {
private static final Logger LOGGER = LoggerFactory.getLogger(LogListener.class);
@Autowired
private MqProducerService mqProducerService;
private final ResumeTokenStack resumeTokenStack = new ResumeTokenStack();
/**
* 收到消息的時候,這個方法會被調(diào)用
* @param message
* @author YellowTail
* @since 2020-06-03
*/
public void onMessage(Message<ChangeStreamDocument<Document>, SkmrActionLogsDocument> message) {
LOGGER.info("LogListener receive a message");
String token = message.getRaw().getResumeToken().get("_data").asString().getValue();
SkmrActionLogsDocument document = message.getBody();
String _id = document.get_id();
LOGGER.info("document _id is {}, targetType {}", _id, document.getTargetType());
// 1. 把 _id 和 token 存起來
resumeTokenStack.push(new IdTokenModel(_id, token));
// 2. 做一些事情
// 3. 發(fā)送 mq 消息
mqProducerService.send(xxx);
}
/**
* 得到消費(fèi)的最后一個 token
* @author YellowTail
* @since 2020-06-08
*/
public String getLastToken() {
IdTokenModel pop = resumeTokenStack.pop();
if (null == pop) {
return null;
}
return pop.getToken();
}
}
錯誤處理
簡單實(shí)現(xiàn)了一下
public class CommonErrorHandler implements ErrorHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(CommonErrorHandler.class);
public void handleError(Throwable t) {
LOGGER.error("occur error, ", t);
}
}
優(yōu)雅停機(jī)和避免重復(fù)消費(fèi)
因?yàn)槲覀兪?code>k8s,容器重啟的時候,可能會有多個容器并存,我擔(dān)心多個容器同時去 watch,導(dǎo)致重復(fù)提醒,
所以加了優(yōu)雅停機(jī), 在重啟的時候,讓老容器不再去watch
邏輯就是
- 寫一個 controller
- controller 調(diào)用接口
IChangeStreamStop的stop方法
這樣,k8s在對容器關(guān)停的時候,容器就能停止容器,避免重復(fù)消費(fèi);具體代碼實(shí)現(xiàn)就不貼了
定義一個接口
public interface IChangeStreamStop {
void stop();
}
所以可以看到前面的代碼
ChangeStreamService implements IChangeStreamStop
k8s yaml 配置
containers:
- name: change-stream
env:
- name: aliyun_logs_xxx
value: /ms/logs/*.log
image: rxxx:latest
imagePullPolicy: Always
ports:
- containerPort: 8080
#就緒檢查
readinessProbe:
failureThreshold: 10
httpGet:
path: /xxx
port: 8080
scheme: HTTP
initialDelaySeconds: 20
periodSeconds: 2
successThreshold: 1
timeoutSeconds: 1
#健康檢查
livenessProbe:
failureThreshold: 10
initialDelaySeconds: 20
periodSeconds: 2
successThreshold: 1
tcpSocket:
port: 8080
timeoutSeconds: 1
# 優(yōu)雅停機(jī)
lifecycle:
preStop:
httpGet:
path: /xxx/stop
port: 8080
scheme: HTTP
#資源限制
resources:
limits:
memory: 2Gi
requests:
memory: 1500Mi
啟動
那么 ChangeStreamService里的消息監(jiān)聽容器 什么時候啟動起來呢?
定義一個接口
/**
* IOC 容器啟動之后會自動調(diào)用的接口
* @author YellowTail
* @since 2019-04-03
*/
public interface OnApplicationStarted {
/**
* <br>IOC 容器啟動之后會調(diào)用的方法
*
* @author YellowTail
* @since 2019-04-03
*/
void onStarted();
}
對應(yīng)代碼
ChangeStreamService implements IChangeStreamStop, OnApplicationStarted
具體自行實(shí)現(xiàn)