rocketmq之同一個(gè)進(jìn)程中能不能啟動(dòng)多個(gè)producer或者consumer

以consumer為例來記錄下這個(gè)問題。
1、在consumer的啟動(dòng)過程中,有兩個(gè)擋板,第一個(gè)是DefaultMQPushConsumerImpl,第二個(gè)是MQClientInstance,就是說,假如兩個(gè)consumer使用的是同一個(gè)DefaultMQPushConsumerImpl對象,那么是不能啟動(dòng)成功的;第二個(gè)亦然;擋板有先后順序,第一個(gè)擋住了,就到不了第二個(gè)了。下面記下第一個(gè)擋板的實(shí)現(xiàn):

//只有當(dāng)DefaultMQPushConsumerImpl對象的serviceState屬性的值為CREATE_JUST,才會(huì)接著做啟動(dòng)的工作:開啟一些線程,初始化一些數(shù)據(jù);
public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                .......
                this.serviceState = ServiceState.RUNNING;
                .......
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
    }

DefaultMQPushConsumerImpl對象是在DefaultMQPushConsumer對象初始化的時(shí)候new出來的,所以想要不一樣,使用不同的DefaultMQPushConsumer對象即可;
2、第二個(gè)擋板的實(shí)現(xiàn)方法和第一個(gè)是一樣的,下面重點(diǎn)記下兩個(gè)consumer如何才能拿到不同的MQClientInstance對象;MQClientInstance對象是在過了第一個(gè)擋板后初始化的,記下初始化代碼:

his.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

可以看到是先拿到一個(gè)單例的MQClientManager對象,然后再獲取MQClientInstance對象,接著看下代碼:

public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        //構(gòu)建clientId
        String clientId = clientConfig.buildMQClientId();
        //第一次進(jìn)來拿到的是null   假如我們在同一個(gè)進(jìn)程中啟動(dòng)多個(gè)生產(chǎn)者,假如clientId一樣,那么使用的MQClientInstance就是一樣的
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
        
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            //因?yàn)樗芯€程拿到的都是同一個(gè)MQClientManager對象,所以MQClientManager對象中的數(shù)據(jù)是所有線程共享的
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }

可以看到,能不能拿到不同的MQClientInstance對象,關(guān)鍵在于clientId的構(gòu)建,看下代碼:

public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());

        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }

        return sb.toString();
    }

可以看到,假如我們在構(gòu)建DefaultMQPushConsumer對象的時(shí)候,指定不同的unitName屬性,就可以構(gòu)建出不同的clientId,從而拿到不同的MQClientInstance,最終完成在同一個(gè)進(jìn)程中啟動(dòng)兩個(gè)consumer。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容