RocketMQ 生產(chǎn)者 Producer 啟動(dòng)過(guò)程

MQProducer

從類(lèi)關(guān)系中可以看出,MQProducer 有兩種實(shí)現(xiàn)方式。一個(gè)是 DefaultMQProducer,另一個(gè)是 TransactionMQProducer。

  • DefaultMQProducer: 我們常用的生產(chǎn)者。
  • TransactionMQProducer:繼承自 DefaultMQProducer,并支持事務(wù)消息。

下面我們來(lái)分析下 DefaultMQProducer 啟動(dòng)的過(guò)程。

啟動(dòng)示例

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("....");
            ......
            producer.start();
            ......
        }catch(Exception e){}
    }
}

創(chuàng)建 DefaultMQProducer 實(shí)例,然后制定一些參數(shù),調(diào)用 start() 方法就開(kāi)啟了生產(chǎn)者。

DefaultMQProducer 參數(shù)分析

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    //producer 組名
    private String producerGroup;

    // Topic 名字,默認(rèn)為“TBW102”
    private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;

    // 創(chuàng)建 Topic 默認(rèn)的4個(gè)隊(duì)列
    private volatile int defaultTopicQueueNums = 4;

    // 發(fā)送消息超時(shí)時(shí)間
    private int sendMsgTimeout = 3000;

    // 當(dāng)發(fā)送的消息大于 4K 時(shí),開(kāi)始?jí)嚎s消息。
    private int compressMsgBodyOverHowmuch = 1024 * 4;

    //同步發(fā)送消息,發(fā)送失敗時(shí)再?lài)L試發(fā)送2次數(shù)
    private int retryTimesWhenSendFailed = 2;

    // 異步發(fā)送消息,發(fā)送失敗時(shí)再?lài)L試發(fā)送2次數(shù)
    private int retryTimesWhenSendAsyncFailed = 2;

    //發(fā)送broker消息存儲(chǔ)失敗時(shí),是否嘗試去試發(fā)送其他的broker
    private boolean retryAnotherBrokerWhenNotStoreOK = false;

    //最大允許發(fā)送字節(jié)數(shù)
    private int maxMessageSize = 1024 * 1024 * 4; // 4M

DefaultMQProducer 中定義的類(lèi)屬性

  • producerGroup: 生產(chǎn)者組名
  • createTopicKey :Topic 名字,默認(rèn)為“TBW102”
  • defaultTopicQueueNums :創(chuàng)建 Topic 默認(rèn)的4個(gè)隊(duì)列
  • sendMsgTimeout :默認(rèn)發(fā)送消息3秒超時(shí)
  • compressMsgBodyOverHowmuch :當(dāng)發(fā)送的消息大于 4K 時(shí),開(kāi)始?jí)嚎s消息。
  • retryTimesWhenSendFailed :同步發(fā)送消息,發(fā)送失敗時(shí)再?lài)L試發(fā)送2次數(shù)。
  • retryTimesWhenSendAsyncFailed :異步發(fā)送消息,發(fā)送失敗時(shí)再?lài)L試發(fā)送2次數(shù)
  • retryAnotherBrokerWhenNotStoreOK :發(fā)送broker消息存儲(chǔ)失敗時(shí),是否嘗試去試發(fā)送其他的broker

DefaultMQProducer 還有可以設(shè)置其他的參數(shù),這里就不說(shuō)明了。

Producer 啟動(dòng)

public void start() throws MQClientException {
    this.defaultMQProducerImpl.start();
}

public void start() throws MQClientException {
    this.start(true);
}

public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        // 1. 只有 serviceState 狀態(tài)為 CREATE_JUST 時(shí),才啟動(dòng) Producer
        case CREATE_JUST:
            //2. 防止啟動(dòng)多個(gè) Producer,先把 serviceState 狀態(tài)修改為 START_FAILED。
            this.serviceState = ServiceState.START_FAILED;
            // 3. 檢查 groupName 是否合法
            this.checkConfig();

            //4. 判斷是否需要設(shè)置 InstanceName 。
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // 5. 構(gòu)建 MQClientInstance 對(duì)象。
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // 6. 將 DefaultMQProducerImpl 對(duì)象注冊(cè)到 ConcurrentHashMap<String/* group */, MQProducerInner> producerTable 中
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // 7.以主題名"TBW102"為key值,新初始化的TopicPublishInfo對(duì)象為value值存入DefaultMQProducerImpl.topicPublishInfoTable變量中
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            // 8. 啟動(dòng) 第五步創(chuàng)建的 MQClientInstance 實(shí)例。
            if (startFactory) {
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            // 9. 設(shè)置DefaultMQProducerImpl的ServiceState為RUNNING
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    // 10.  向所有的 broker 發(fā)送心跳和上傳 FilterClass
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
  1. 啟動(dòng)Producer的時(shí)候判斷 serviceState 的當(dāng)前狀態(tài),只有 serviceState 狀態(tài)為 CREATE_JUST 時(shí),才啟動(dòng) Producer。否則拋出異常信息。

2、同時(shí)防止啟動(dòng)多個(gè) Producer,先把 serviceState 狀態(tài)修改為 START_FAILED。

3、 檢查 groupName 是否合法。比如不能為空,是否符合正則 ^[%|a-zA-Z0-9_-]+$,并且最大長(zhǎng)度不能超過(guò) 255(CHARACTER_MAX_LENGTH = 255);
groupName 也不能等于 DEFAULT_PRODUCER。只要滿(mǎn)足上面條件,則拋異常信息。

4、如果 producerGroup 不等于 CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER" ,然后調(diào)用 changeInstanceNameToPID() 方法判斷名字不是 "DEFAULT" 則更改 instanceName。

public void changeInstanceNameToPID() {
    if (this.instanceName.equals("DEFAULT")) {
        this.instanceName = String.valueOf(UtilAll.getPid());
    }
}
public static int getPid() {
    RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
    String name = runtime.getName(); // format: "pid@hostname"
    try {
        return Integer.parseInt(name.substring(0, name.indexOf('@')));
    .....
}

5、構(gòu)建 MQClientInstance 對(duì)象。

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }
  • 5.1 首先生成 clientId:ip@instanceName 或 ip@instanceName@unitName
  • 5.2 如果 factoryTable 中是不已經(jīng)存在 MQClientInstance 實(shí)例,則創(chuàng)建。 (下面有單獨(dú)分析該源碼)

6、將 DefaultMQProducerImpl 對(duì)象注冊(cè)到 ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();

private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();

public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    if (null == group || null == producer) {
        return false;
    }
    MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
    if (prev != null) {
        log.warn("the producer group[{}] exist already.", group);
        return false;
    }
    return true;
}

7、以主題名"TBW102"為key值,新初始化的TopicPublishInfo對(duì)象為value值存入DefaultMQProducerImpl.topicPublishInfoTable變量中

8、調(diào)用 第五步創(chuàng)建的 MQClientInstance 實(shí)例 的start() 方法。
該方法做了很多事情:

  • 獲取NameServer地址
  • 啟動(dòng) Netty 客戶(hù)端服務(wù)
  • 設(shè)置多個(gè)定時(shí)任務(wù)
  • 開(kāi)啟 pullMessageService 服務(wù)
  • 開(kāi)啟 rebalanceService 服務(wù)
  • 開(kāi)啟 發(fā)送消息服務(wù)

下面有具體代碼分析MQClientInstance.start() 方法。

9、設(shè)置DefaultMQProducerImpl的ServiceState為RUNNING
10、向所有的 broker 發(fā)送心跳和上傳 FilterClass

創(chuàng)建MQClientInstance實(shí)例(第5.2步)

上面 5.2 步驟中創(chuàng)建MQClientInstance 的代碼如下:

public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
    this.clientConfig = clientConfig;
    this.instanceIndex = instanceIndex;
    this.nettyClientConfig = new NettyClientConfig();
    this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
    this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
    this.clientRemotingProcessor = new ClientRemotingProcessor(this);
    // Netty 中注冊(cè)接收請(qǐng)求的處理器。
    this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
    //設(shè)置 NameServer 地址。
    if (this.clientConfig.getNamesrvAddr() != null) {
        this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
        log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
    }
    // 客戶(hù)端ID
    this.clientId = clientId;
    //創(chuàng)建 MQAdminImpl 對(duì)象進(jìn)行和 NameServer 進(jìn)行交互,比如創(chuàng)建Topic、獲取 Queue等
    this.mQAdminImpl = new MQAdminImpl(this);
    // 創(chuàng)建 pullMessageService 服務(wù)
    this.pullMessageService = new PullMessageService(this);
    // 創(chuàng)建 rebalanceService  服務(wù)
    this.rebalanceService = new RebalanceService(this);
    // 創(chuàng)建 DefaultMQProducer 服務(wù)
    this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
    this.defaultMQProducer.resetClientConfig(clientConfig);
    // 開(kāi)啟 Comsumer 統(tǒng)計(jì)服務(wù)
    this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

    log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
        this.instanceIndex,
        this.clientId,
        this.clientConfig,
        MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}

主要功能:

  • 創(chuàng)建 MQAdminImpl 對(duì)象進(jìn)行和 NameServer 進(jìn)行交互,比如創(chuàng)建Topic、獲取 Queue等
  • 創(chuàng)建 pullMessageService 服務(wù)
  • 創(chuàng)建 rebalanceService 服務(wù),供 Consumer 端使用
  • 創(chuàng)建 DefaultMQProducer 服務(wù),
  • 開(kāi)啟 Comsumer 統(tǒng)計(jì)服務(wù)。統(tǒng)計(jì)最近一段時(shí)間內(nèi),消費(fèi)成功個(gè)數(shù)、消費(fèi)失敗個(gè)數(shù)等信息。

啟動(dòng)MQClientInstance 服務(wù) (第8步)

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 1. 如果配置NameServer地址,則從默認(rèn)服務(wù)器地址中獲?。ㄔ摰刂凡豢筛淖儯?                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // 2. 啟動(dòng) Netty 客戶(hù)端服務(wù)
                this.mQClientAPIImpl.start();
                // 3. 設(shè)置多個(gè)定時(shí)任務(wù)
                this.startScheduledTask();
                // 4. 開(kāi)啟 pullMessageService 服務(wù)
                this.pullMessageService.start();
                // 5. 開(kāi)啟 rebalanceService 服務(wù)
                this.rebalanceService.start();
                // 6. 開(kāi)啟 發(fā)送消息服務(wù)
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

主要這幾步驟操作。

  • 1、獲取NameServer地址
    如果啟動(dòng) Producer 時(shí)沒(méi)有指定 NameServer,則程序會(huì)向一個(gè)Http地址發(fā)送請(qǐng)求來(lái)獲取NameServer地址。通過(guò)這種方式可以動(dòng)態(tài)的配置 NameServer。從而達(dá)到動(dòng)態(tài)增加和刪除NameServer服務(wù)。
public static String getWSAddr() {
    String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
    String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
    String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
    if (wsDomainName.indexOf(":") > 0) {
        wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
    }
    return wsAddr;
}
  • 2、啟動(dòng) Netty 客戶(hù)端服務(wù)
  • 3、調(diào)用startScheduledTask() 方法設(shè)置多個(gè)定時(shí)任務(wù)
  • 4、開(kāi)啟 pullMessageService 服務(wù)
  • 5、開(kāi)啟 rebalanceService 服務(wù)
  • 6、開(kāi)啟 發(fā)送消息服務(wù)

startScheduledTask() 方法:定時(shí)任務(wù)

private void startScheduledTask() {
    // 1.如果 NameServer 地址默認(rèn)沒(méi)配置,則定時(shí)向一個(gè)Http地址獲取 
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }

    // 2. 定時(shí)的從 NameServer 中獲取 Topic、broker、queue 相關(guān)信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

    // 3. 定時(shí)清理無(wú)效的Broker,并向所有的Broker 發(fā)送心跳數(shù)據(jù)
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

    // 4. 定時(shí)的持久化 Consumer 端消費(fèi)每個(gè) queue的 offset 數(shù)據(jù)。
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // 5. 調(diào)整消費(fèi)端的線(xiàn)程數(shù)
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
  • 1、定時(shí)更新 NameServer 地址
    每個(gè)2分鐘,程序會(huì)向一個(gè)Http地址發(fā)送請(qǐng)求來(lái)獲取NameServer地址來(lái)動(dòng)態(tài)更新NameServer地址。

  • 2、 定時(shí)的從 NameServer 中獲取 Topic、broker、queue 相關(guān)信息
    默認(rèn)每隔 30秒去 NameServer 中獲取Topic、broker、queue等相關(guān)信息。
    如果有新broker注冊(cè)或下線(xiàn),producer端會(huì)在30秒之內(nèi)感知。

  • 3、定時(shí)清理無(wú)效的Broker,并向所有的Broker 發(fā)送心跳數(shù)據(jù).
    默認(rèn)每隔 30 秒向 Broker 發(fā)送心跳數(shù)據(jù) 和 用戶(hù)自定義的 filterclass 類(lèi)。

  • 4、定時(shí)的持久化 Consumer 端消費(fèi)每個(gè) queue的 offset 數(shù)據(jù)。
    默認(rèn)每隔 5 秒持久或 Consumer 消費(fèi)的 queue 的 offset信息。
    持久化分為,遠(yuǎn)程持久化和本地持久化。
    MessageModel.CLUSTERING 模式 queue的offset 保存到 broker上。
    BROADCASTING("BROADCASTING") 模式 queue 的 offset 保存在本地。

  • 5、調(diào)整消費(fèi)端的線(xiàn)程數(shù)
    每隔 1 分鐘計(jì)算每一個(gè)queue中消息擠壓的數(shù)量,如果超過(guò)100000條,則增加消費(fèi)線(xiàn)程的并發(fā)數(shù),如果小于80000條則減少消費(fèi)者的線(xiàn)程數(shù)。
    不過(guò)進(jìn)入源碼中看,調(diào)整消費(fèi)者的線(xiàn)程數(shù)都注釋掉了。


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容