RocketMQ源碼閱讀(十)-Consumer消費消息

1.消費方式和消費者組

1.消費方式: 拉取和推送兩種(事實上所有從遠程獲取數(shù)據(jù)都是這兩種方式).
2.消費者組與消費模式
多個消費者組成一個消費組, 兩種模式: 集群(消息被其中任何一個消息者消費), 廣播模式(全部消費者消費).

2.Consumer消費消息的基本流程

RocketMQ 分別使用 DefaultMQPullConsumer 和 DefaultMQPushConsumer 實現(xiàn)了拉取和推送兩種方式. 下面主要以DefaultMQPullConsumer為例進行分析.

先看源碼中給出的Demo:

public class PullConsumerTest {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5"); //@1
        consumer.start(); //@2

        try {
            MessageQueue mq = new MessageQueue();
            mq.setQueueId(0);
            mq.setTopic("TopicTest3");
            mq.setBrokerName("vivedeMacBook-Pro.local");

            long offset = 26;

            long beginTime = System.currentTimeMillis();
            PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, offset, 32); //@3
            System.out.printf("%s%n", System.currentTimeMillis() - beginTime);
            System.out.printf("%s%n", pullResult);
        } catch (Exception e) {
            e.printStackTrace();
        }

        consumer.shutdown();
    }
}

首先在@1處構建Consumer并且制定其所屬的消費者組. 在@2處啟動Consumer, 并且在@3處拉取消息.

Consumer啟動

事實上DefaultMQPullConsumer將所有操作都委托給DefaultMQPullConsumerImpl, 下面看DefaultMQPullConsumerImpl#start.

public void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();  //@1 

            this.copySubscription(); //@2

            if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPullConsumer.changeInstanceNameToPID(); //@3
            }

            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook); //@4

            //@5            
            this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(//
                mQClientFactory, //
                this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
            
            //@6            
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
            //@7
            if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPullConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
            }

            this.offsetStore.load();

            //@8
            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;

                throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            //@9
            mQClientFactory.start();
            log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
            //@10
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PullConsumer service state not OK, maybe started once, "http://
                + this.serviceState//
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
}

首先判斷當前consumer的狀態(tài), 除了CREATE_JUST之外, 全部是非法狀態(tài).(這個容易理解, 因為時剛剛啟動, 不應該處于其他狀態(tài)).
狀態(tài)合法后, 大體過程如下:

  • @1校驗各配置項是否合法(consumerGroup, allocateMessageQueueStrategy, messageModel)
  • @2將當前defaultMQPullConsumer中的訂閱關系復制到當前rebalanceImpl(負載均衡器, 主要負責決定, 當前的consumer應該從哪些Queue中消費消息)中.
  • @3如果是集群模式,則將當前defaultMQPullConsumer實例名改為線程ID.
  • @4實例化MQClientInstance(這個類是一個大雜燴,負責管理client(consumer, producer), 并提供多中功能接口供各個Service(Rebalance, PullMessage等)調(diào)用)
  • @5初始化rebalance變量
  • @6初始化pullAPIWrapper(長連接, 負責從broker處拉取消息, 然后利用ConsumeMessageService回調(diào)用戶的Listener執(zhí)行消息消費邏輯)
  • @7構建offsetStore消費進度存儲對象(有兩種實現(xiàn), Local和Rmote, Local存儲在本地磁盤上, 適用于BROADCASTING廣播消費模式; 而Remote則將消費進度存儲在Broker上, 適用于CLUSTERING集群消費模式).
  • @8向mqClientFactory注冊本消費者
  • @9啟動mqClientFactory(啟動各種定時任務, 如定時獲取nameserver地址, 定時清理下線的borker, 啟動各種service, 如拉消息服務, 負載均衡服務)
  • @10將serviceState修改為ServiceState.RUNNING

Consumer獲取消息

Consumer獲取消息使用pullBlockIfNotFound方法, 方法簽名如下:

PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) 
      throws MQClientException, RemotingException, MQBrokerException, InterruptedException

該方法一共有4個參數(shù).

  • mq, 從哪個隊列中拉取消息;
  • subExpression, SubscriptionData中的subString;
  • offset, 消息拉取的offset;
  • maxNums, 最大拉取的消息數(shù)目.
    跟蹤該方法,最終會委托給DefaultMQPullConsumerImpl中的pullSyncImpl方法執(zhí)行, 代碼如下:
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();     //@1
    //@2
    if (null == mq) {
        throw new MQClientException("mq is null", null);

    }

    if (offset < 0) {
        throw new MQClientException("offset < 0", null);
    }

    if (maxNums <= 0) {
        throw new MQClientException("maxNums <= 0", null);
    }

    this.subscriptionAutomatically(mq.getTopic()); //@3

    int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); //@4
    //@5
    SubscriptionData subscriptionData;
    try {
        subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
            mq.getTopic(), subExpression);
    } catch (Exception e) {
        throw new MQClientException("parse subscription error", e);
    }

    long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;   //@6

    PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
        mq, 
        subscriptionData.getSubString(), 
        0L, 
        offset, 
        maxNums, 
        sysFlag, 
        0, 
        this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), 
        timeoutMillis, 
        CommunicationMode.SYNC, 
        null
    );   //@7
    this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);  //@8
    //@9
    if (!this.consumeMessageHookList.isEmpty()) {
        ConsumeMessageContext consumeMessageContext = null;
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setConsumerGroup(this.groupName());
        consumeMessageContext.setMq(mq);
        consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
        consumeMessageContext.setSuccess(false);
        this.executeHookBefore(consumeMessageContext);
        consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
        consumeMessageContext.setSuccess(true);
        this.executeHookAfter(consumeMessageContext);
    }
    return pullResult;
}

過程概括如下:

  • @1檢驗當前consumer客戶端是否處于RUNNING狀態(tài), 否則非法;
  • @2檢查mq, offset, maxNums三個參數(shù)是否合法;
  • @3構建rebalanceImpl中的SubscriptionData;
  • @4構建sysFlag;
  • @5構建當前consumer的SubscriptionData(這一步和@3有點重復);
  • @6從broker拉取消息時的超時時間;
  • @7從broker拉取消息;
  • @8對pullresult進行處理, 這一步主要進行兩個操作, a.更新消息隊列拉取消息Broker編號的映射, b.解析消息,并根據(jù)訂閱信息消息tagCode匹配合適消息.
  • @9如果HookList不為空, 執(zhí)行HookList中的操作.
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,900評論 13 425
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,659評論 19 139
  • 背景介紹 Kafka簡介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。主要設計目標如下: 以時間復雜度為O...
    高廣超閱讀 13,057評論 8 167
  • consumer 1.啟動 有別于其他消息中間件由broker做負載均衡并主動向consumer投遞消息,Rock...
    veShi文閱讀 5,081評論 0 2
  • 馬驍在哄笑中極力鎮(zhèn)定著自己的情緒,想盡快入睡。 可是事與愿違。 總是這樣,在人們極力追求和夢想的事情上,老天總是那...
    行走的M閱讀 416評論 0 0

友情鏈接更多精彩內(nèi)容