RocketMQ源碼閱讀——debug環(huán)境構(gòu)建

rocketMQ是阿里開源的一款十分優(yōu)秀的消息隊(duì)列,rocketMQ具有很多其他消息隊(duì)列不具有的特性,像事務(wù)消息,推拉結(jié)合的Api等等。更重要的是rocketMQ是用java開發(fā)的,像RabbitMQ這種用erlang進(jìn)行開發(fā),對(duì)于我們搞java的來說學(xué)習(xí)成本不言而喻。從14年開始使用rocketMQ開始,就對(duì)rocketMQ有很大的興趣,對(duì)于消息隊(duì)列的實(shí)現(xiàn)機(jī)制也是十分的好奇,并且,rocketMQ更是加入了apache。這也進(jìn)一步的激發(fā)了我的學(xué)習(xí)興趣,和幾個(gè)小伙伴商量開始學(xué)習(xí)rocketMQ的源碼。
目前RocketMQ的代碼托管在github上:
老的地址:https://github.com/alibaba/RocketMQ
新的地址:https://github.com/apache/incubator-rocketmq
由于阿里向apache捐贈(zèng)了rocketMQ,所以下面的地址算是遷移過去的。
但是兩者之間是有差異的,Apache上的代碼注釋幾乎全被刪掉了,并且有一些啟動(dòng)腳本也被刪掉了,兩者的代碼目前差別不大,我選擇的是后者,同時(shí)和前者對(duì)照著學(xué)習(xí)。

無論學(xué)什么,尤其是這種比較大的開源項(xiàng)目,那么首先就是通過一個(gè)使用場景進(jìn)行切入。首先我們需要在本地搭建一個(gè)可以debug的開發(fā)環(huán)境,通過producer生產(chǎn)一個(gè)消息和consumer消費(fèi)一個(gè)消息這樣的流程來學(xué)習(xí)rocketMQ,首先將rocketMQ fork一下,方便自己來注釋。

目前整體的代碼結(jié)構(gòu)是這樣的:

Paste_Image.png

可以看到rocketMQ的分包還是十分的清晰的,主要幾塊是broker、namesrv、client和store。remoting是一個(gè)通信層,只關(guān)注實(shí)現(xiàn)的話,可以暫且放一放remoting。

首先我們需要在我的idea下跑通namesrv和broker,同時(shí)可以利用producer和consumer進(jìn)行收發(fā)消息。對(duì)于Apache版本的項(xiàng)目編譯打包可能有些困難,我們可以參考老的版本的install.sh進(jìn)行打包:

Paste_Image.png

打包歸打包,但是我們暫時(shí)可能用不上,我們的目的是在idea上能將namesrv和broker跑起來,我們找到namesrv的啟動(dòng)腳本mqnamesrv,這里主要調(diào)用了runserver腳本,namesrv的啟動(dòng)入口是com.alibaba.rocketmq.namesrv.Namesrv
Startup類。runserver.sh里主要做了一些jdk參數(shù)的配置:

error_exit ()
{
    echo "ERROR: $1 !!"
    exit 1
}

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/opt/taobao/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}

#===========================================================================================
# JVM Configuration
#===========================================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:+DisableExplicitGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${HOME}/rmq_srv_gc.log -XX:+PrintGCDetails"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

$JAVA ${JAVA_OPT} $@

這里主要設(shè)置了堆內(nèi)存的大小,垃圾回收方式和gc日志等。我們不使用腳本啟動(dòng),因?yàn)槲覀円{(diào)試,那么直接切到NamesrvStartup類去啟動(dòng)。
如果我們直接運(yùn)行的話會(huì)報(bào)一個(gè)rocketMQ home not set 的錯(cuò)誤,那么我們這里需要稍微改一下代碼:

            final NamesrvConfig namesrvConfig = new NamesrvConfig();
            namesrvConfig.setRocketmqHome("/home/zhiming/git/incubator-rocketmq");
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            nettyServerConfig.setListenPort(9876);

這里主要做的是手動(dòng)的將namesrv的端口號(hào)和rocketMQhome進(jìn)行設(shè)置,這樣就能保證rocketMQ的namesrv能正常啟動(dòng)了。
接下來啟動(dòng)broker,同樣的方式我們找到broker的啟動(dòng)類:com.alibaba.rocketmq.broker.BrokerStartup。同樣我們也需要做一些改動(dòng):

            final BrokerConfig brokerConfig = new BrokerConfig();
            brokerConfig.setRocketmqHome("/home/zhiming/git/incubator-rocketmq");
            brokerConfig.setNamesrvAddr("localhost:9876");

主要是設(shè)置了rocketMQhome和namesrv的地址。這樣broker就可以連接上namesrv。
接下來我們就要寫producer和namesrv了,其實(shí)生產(chǎn)者和消費(fèi)者的例子是自帶的,在example包中的com.alibaba.rocketmq.example.quickstart下,有一個(gè)Consumer和Producer:
Producer:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.example.quickstart;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.start();
        producer.setNamesrvAddr("localhost:9876");
        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest",
                        "TagA",
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                SendResult sendResult = producer.send(msg);
                LocalTransactionExecuter tranExecuter = new LocalTransactionExecuter() {
                    @Override
                    public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                        return null;
                    }
                };
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

這里主要是循環(huán)想TopicTest發(fā)送1000次消息。

Consumer:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.example.quickstart;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

這里主要是以監(jiān)聽的方式進(jìn)行消費(fèi)。

那么先啟動(dòng)namesrv再啟動(dòng)broker,然后開始啟動(dòng)producer最后啟動(dòng)consumer。我們可以看到消息可以正常的接收和發(fā)送了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,888評(píng)論 13 425
  • 分布式開放消息系統(tǒng)(RocketMQ)的原理與實(shí)踐 來源:http://www.itdecent.cn/p/453...
    meng_philip123閱讀 13,217評(píng)論 6 104
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,506評(píng)論 19 139
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,583評(píng)論 0 34
  • 今天早晨代老師讓我6點(diǎn)40到校,指導(dǎo)我從那大群里邊兒寫日記,代老師還讓我看了她錄的一段機(jī)器人跳舞的視頻,我從來沒看...
    朱秉政閱讀 162評(píng)論 0 0

友情鏈接更多精彩內(nèi)容