MongoDB change stream 實(shí)戰(zhàn)

背景

最近公司要上一個需求,就是部分業(yè)務(wù)數(shù)據(jù)有插入動作時,對用戶進(jìn)行通知
舉個虛假的例子,你的下屬這會兒有個成交什么的

雖然 save 的邏輯的確是在我們的微服務(wù)代碼里,
我如果在 save 這里加上這些邏輯,功能沒問題,但是不就增加耦合度了么?后面修改邏輯,難度上天
用切面,先不說性能,感覺把代碼邏輯放在api容器里,總覺得不太對,我覺得這個功能肯定是要放在離線計(jì)算的task容器里的

那用定時任務(wù),看下這段時間內(nèi)有多少新創(chuàng)建的數(shù)據(jù)?可以是可以,但是一是延遲高,二是實(shí)現(xiàn)邏輯有點(diǎn)傻

還好 MongoDB 提供了 Change Stream 的功能, 戳 mongodb changeStreams

原理和中文文檔,網(wǎng)上一搜一堆,但是就是沒有生產(chǎn)使用的具體代碼例子

所以我來分享下

Spring data

我用的是 Spring Data Mongodb, 其它持久層框架的,請自行查閱官方文檔

spring data change-streams

但是這個文檔寫的有點(diǎn)簡陋

  1. import 沒寫,根本不知道該導(dǎo)哪個包
  2. 例子太簡單了點(diǎn),消費(fèi)的時候就打印一下,看不到更多的細(xì)節(jié)

所以我盡量解釋的詳細(xì)點(diǎn),但是注意,代碼本身是運(yùn)行不起來的,包括mongodb的配置和業(yè)務(wù)代碼等,需要自行實(shí)現(xiàn)

代碼

ChangeStreamService

我自己的代碼,包名是 com.xixi 開頭的,其它都是可以引入的包的代碼


import com.xixi.SkmrActionLogsDocument;
import com.xixi.DBNameConstant;
import com.xixi.StopWatch;
import com.xixi.OnApplicationStarted;
import com.xixi.IChangeStreamStop;
import com.xixi.CommonErrorHandler;
import com.xixi.LogListener;

import com.mongodb.client.model.changestream.OperationType;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.MatchOperation;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.DefaultMessageListenerContainer;
import org.springframework.data.mongodb.core.messaging.MessageListenerContainer;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.stereotype.Service;


/**
 * @author YellowTail
 * @since 2020-06-02
 */
@Service
public class ChangeStreamService implements IChangeStreamStop, OnApplicationStarted  {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeStreamService.class);

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private LogListener logListener;

    @Autowired
    private ResumeTokenService resumeTokenService;

    private MessageListenerContainer messageListenerContainer;


    // https://docs.spring.io/spring-data/mongodb/docs/2.2.6.RELEASE/reference/html/#change-streams

    public void onStarted() {

        LOGGER.info("ChangeStreamService start");

        // 1. 啟動一個 消息監(jiān)聽容器
        // 構(gòu)造、使用了一個 spring 實(shí)現(xiàn)的線程池
        SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
        simpleAsyncTaskExecutor.setConcurrencyLimit(10);
        simpleAsyncTaskExecutor.setThreadNamePrefix("cs-mq-consumer-");

        messageListenerContainer = new DefaultMessageListenerContainer(mongoTemplate, simpleAsyncTaskExecutor);
        messageListenerContainer.start();

        // 2. 建 一個監(jiān)聽器, 當(dāng)有 消息收到的時候, 會被調(diào)用, 消息的 body 會被轉(zhuǎn)成 domain, 原始消息在 Document 里面
        // 也就是 LogListener

        
        // 3. 設(shè)置一些監(jiān)聽選項(xiàng)
        // https://docs.mongodb.com/manual/reference/change-events/#change-stream-output

        ChangeStreamRequestOptions requestOptions = new ChangeStreamRequestOptions("yourDb",
                "yourCollection", genOptions());


        // 4. 向容器注冊一個監(jiān)聽請求, 返回值是一個訂閱對象, 可以檢查當(dāng)前任務(wù)的狀態(tài), 也可以用來取消執(zhí)行來釋放資源
        CommonErrorHandler commonErrorHandler = new CommonErrorHandler();

        messageListenerContainer.register(new ChangeStreamRequest<>(logListener, requestOptions),
                SkmrActionLogsDocument.class, commonErrorHandler);
    }

    /**
     * 停止 消息監(jiān)聽容器
     * @param
     * @author YellowTail
     * @since 2020-06-02
     */
    public void stop() {
        LOGGER.info("ChangeStreamService stop MessageListenerContainer");

        StopWatch stopWatch = StopWatch.createStarted();

        // 5. 停止容器
        messageListenerContainer.stop();
        LOGGER.info("ChangeStreamService stop container success, cost {} ms", stopWatch.stopThenRestart());

        // 6. 向 redis 存入 這次最后的 token
        // dev 測試發(fā)現(xiàn),新的容器先起來了,老的容器后 stop的
        resumeTokenService.updateToken(logListener.getLastToken());

        LOGGER.info("ChangeStreamService stop success, cost {} ms", stopWatch.stop());
    }

    /**
     * 生成 change stream 的配置
     * @param
     * @author YellowTail
     * @since 2020-06-09
     */
    private ChangeStreamOptions genOptions() {
        // 使用 pipeline 來過濾

        MatchOperation matchOperation = Aggregation.match(Criteria.where("operationType").is(OperationType.INSERT.getValue()));

//        ChangeStreamOptions changeStreamOptions =  ChangeStreamOptions.empty();

        ChangeStreamOptions.ChangeStreamOptionsBuilder changeStreamOptionsBuilder = ChangeStreamOptions.builder()
                .filter(Aggregation.newAggregation(matchOperation));

        String token = resumeTokenService.getToken();
        if (StringUtils.isNotBlank(token)) {
            // 重啟的時候,可以接著上次的來
            changeStreamOptionsBuilder.resumeToken(new BsonString(token));
        }

        return changeStreamOptionsBuilder.build();
    }
}

現(xiàn)在來講解一下代碼細(xì)節(jié)

onStarted 這個方法做了幾件事

  1. 新建一個消息監(jiān)聽容器,并啟動起來
  2. 新建一個監(jiān)聽器,有消息來的時候,去消費(fèi)
  3. 根據(jù)需要,設(shè)置一些監(jiān)聽選項(xiàng)
  4. 消息監(jiān)聽容器注冊一個監(jiān)聽請求,關(guān)聯(lián)上監(jiān)聽器

新建一個消息監(jiān)聽容器

我偷懶,沒有做什么更多的設(shè)置,基本是全默認(rèn)的
設(shè)置了線程數(shù)(應(yīng)該是這個作用吧,沒詳細(xì)了解)
設(shè)置了線程名字前綴(方便在日志文件里搜索日志)

新建一個監(jiān)聽器

我新建了一個 Service 來做這件事


import com.xixi.SkmrActionLogsDocument;
import com.xixi.IdTokenModel;
import com.xixi.ResumeTokenStack;
import com.xixi.VisitorNotifyMqObject;
import com.xixi.VisitorMqProducerService;

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.MessageListener;
import org.springframework.stereotype.Service;

/**
 * @author YellowTail
 * @since 2020-06-02
 */
@Service
public class LogListener implements MessageListener<ChangeStreamDocument<Document>, SkmrActionLogsDocument> {

    private static final Logger LOGGER = LoggerFactory.getLogger(LogListener.class);

    @Autowired
    private MqProducerService mqProducerService;

    private final ResumeTokenStack resumeTokenStack = new ResumeTokenStack();

    /**
     * 收到消息的時候,這個方法會被調(diào)用
     * @param message
     * @author YellowTail
     * @since 2020-06-03
     */
    public void onMessage(Message<ChangeStreamDocument<Document>, SkmrActionLogsDocument> message) {
        LOGGER.info("LogListener receive a message");

        String token = message.getRaw().getResumeToken().get("_data").asString().getValue();

        SkmrActionLogsDocument document = message.getBody();
        String _id = document.get_id();

        LOGGER.info("document _id is {}, targetType {}", _id, document.getTargetType());

        // 1. 把 _id 和 token 存起來
        resumeTokenStack.push(new IdTokenModel(_id, token));

        // 2. 做一些事情
      

        // 3. 發(fā)送 mq 消息
        mqProducerService.send(xxx);

    }

    /**
     * 得到消費(fèi)的最后一個 token
     * @author YellowTail
     * @since 2020-06-08
     */
    public String getLastToken() {
        IdTokenModel pop = resumeTokenStack.pop();

        if (null == pop) {
            return null;
        }

        return pop.getToken();
    }

}

錯誤處理

簡單實(shí)現(xiàn)了一下

public class CommonErrorHandler implements ErrorHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(CommonErrorHandler.class);

    public void handleError(Throwable t) {
        LOGGER.error("occur error, ", t);
    }
}

優(yōu)雅停機(jī)和避免重復(fù)消費(fèi)

因?yàn)槲覀兪?code>k8s,容器重啟的時候,可能會有多個容器并存,我擔(dān)心多個容器同時去 watch,導(dǎo)致重復(fù)提醒,
所以加了優(yōu)雅停機(jī), 在重啟的時候,讓老容器不再去watch

邏輯就是

  1. 寫一個 controller
  2. controller 調(diào)用接口IChangeStreamStopstop 方法

這樣,k8s在對容器關(guān)停的時候,容器就能停止容器,避免重復(fù)消費(fèi);具體代碼實(shí)現(xiàn)就不貼了

定義一個接口

public interface IChangeStreamStop {

    void stop();
}

所以可以看到前面的代碼

ChangeStreamService implements IChangeStreamStop

k8s yaml 配置

containers:
- name: change-stream
  env:
    - name: aliyun_logs_xxx
      value: /ms/logs/*.log
  image: rxxx:latest
  imagePullPolicy: Always   
  ports:
    - containerPort: 8080
  #就緒檢查
  readinessProbe:
    failureThreshold: 10
    httpGet:
      path: /xxx
      port: 8080
      scheme: HTTP
    initialDelaySeconds: 20
    periodSeconds: 2
    successThreshold: 1
    timeoutSeconds: 1
  #健康檢查
  livenessProbe:
    failureThreshold: 10
    initialDelaySeconds: 20
    periodSeconds: 2
    successThreshold: 1
    tcpSocket:
      port: 8080
    timeoutSeconds: 1
  # 優(yōu)雅停機(jī)
  lifecycle:
    preStop:
        httpGet:
            path: /xxx/stop
            port: 8080
            scheme: HTTP
  #資源限制
  resources:
    limits:
      memory: 2Gi
    requests:
      memory: 1500Mi

啟動

那么 ChangeStreamService里的消息監(jiān)聽容器 什么時候啟動起來呢?

定義一個接口

/**
 * IOC 容器啟動之后會自動調(diào)用的接口
 * @author YellowTail
 * @since 2019-04-03
 */
public interface OnApplicationStarted {

    /**
     * <br>IOC 容器啟動之后會調(diào)用的方法
     *
     * @author YellowTail
     * @since 2019-04-03
     */
    void onStarted();
}

對應(yīng)代碼

ChangeStreamService implements IChangeStreamStop, OnApplicationStarted 

具體自行實(shí)現(xiàn)

參考

mongodb changeStreams

spring data change-streams

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容