MQProducer

從類(lèi)關(guān)系中可以看出,MQProducer 有兩種實(shí)現(xiàn)方式。一個(gè)是 DefaultMQProducer,另一個(gè)是 TransactionMQProducer。
- DefaultMQProducer: 我們常用的生產(chǎn)者。
- TransactionMQProducer:繼承自 DefaultMQProducer,并支持事務(wù)消息。
下面我們來(lái)分析下 DefaultMQProducer 啟動(dòng)的過(guò)程。
啟動(dòng)示例
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("....");
......
producer.start();
......
}catch(Exception e){}
}
}
創(chuàng)建 DefaultMQProducer 實(shí)例,然后制定一些參數(shù),調(diào)用 start() 方法就開(kāi)啟了生產(chǎn)者。
DefaultMQProducer 參數(shù)分析
public class DefaultMQProducer extends ClientConfig implements MQProducer {
//producer 組名
private String producerGroup;
// Topic 名字,默認(rèn)為“TBW102”
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
// 創(chuàng)建 Topic 默認(rèn)的4個(gè)隊(duì)列
private volatile int defaultTopicQueueNums = 4;
// 發(fā)送消息超時(shí)時(shí)間
private int sendMsgTimeout = 3000;
// 當(dāng)發(fā)送的消息大于 4K 時(shí),開(kāi)始?jí)嚎s消息。
private int compressMsgBodyOverHowmuch = 1024 * 4;
//同步發(fā)送消息,發(fā)送失敗時(shí)再?lài)L試發(fā)送2次數(shù)
private int retryTimesWhenSendFailed = 2;
// 異步發(fā)送消息,發(fā)送失敗時(shí)再?lài)L試發(fā)送2次數(shù)
private int retryTimesWhenSendAsyncFailed = 2;
//發(fā)送broker消息存儲(chǔ)失敗時(shí),是否嘗試去試發(fā)送其他的broker
private boolean retryAnotherBrokerWhenNotStoreOK = false;
//最大允許發(fā)送字節(jié)數(shù)
private int maxMessageSize = 1024 * 1024 * 4; // 4M
DefaultMQProducer 中定義的類(lèi)屬性
- producerGroup: 生產(chǎn)者組名
- createTopicKey :Topic 名字,默認(rèn)為“TBW102”
- defaultTopicQueueNums :創(chuàng)建 Topic 默認(rèn)的4個(gè)隊(duì)列
- sendMsgTimeout :默認(rèn)發(fā)送消息3秒超時(shí)
- compressMsgBodyOverHowmuch :當(dāng)發(fā)送的消息大于 4K 時(shí),開(kāi)始?jí)嚎s消息。
- retryTimesWhenSendFailed :同步發(fā)送消息,發(fā)送失敗時(shí)再?lài)L試發(fā)送2次數(shù)。
- retryTimesWhenSendAsyncFailed :異步發(fā)送消息,發(fā)送失敗時(shí)再?lài)L試發(fā)送2次數(shù)
- retryAnotherBrokerWhenNotStoreOK :發(fā)送broker消息存儲(chǔ)失敗時(shí),是否嘗試去試發(fā)送其他的broker
DefaultMQProducer 還有可以設(shè)置其他的參數(shù),這里就不說(shuō)明了。
Producer 啟動(dòng)
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}
public void start() throws MQClientException {
this.start(true);
}
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
// 1. 只有 serviceState 狀態(tài)為 CREATE_JUST 時(shí),才啟動(dòng) Producer
case CREATE_JUST:
//2. 防止啟動(dòng)多個(gè) Producer,先把 serviceState 狀態(tài)修改為 START_FAILED。
this.serviceState = ServiceState.START_FAILED;
// 3. 檢查 groupName 是否合法
this.checkConfig();
//4. 判斷是否需要設(shè)置 InstanceName 。
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 5. 構(gòu)建 MQClientInstance 對(duì)象。
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 6. 將 DefaultMQProducerImpl 對(duì)象注冊(cè)到 ConcurrentHashMap<String/* group */, MQProducerInner> producerTable 中
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 7.以主題名"TBW102"為key值,新初始化的TopicPublishInfo對(duì)象為value值存入DefaultMQProducerImpl.topicPublishInfoTable變量中
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// 8. 啟動(dòng) 第五步創(chuàng)建的 MQClientInstance 實(shí)例。
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 9. 設(shè)置DefaultMQProducerImpl的ServiceState為RUNNING
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 10. 向所有的 broker 發(fā)送心跳和上傳 FilterClass
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
- 啟動(dòng)Producer的時(shí)候判斷 serviceState 的當(dāng)前狀態(tài),只有 serviceState 狀態(tài)為 CREATE_JUST 時(shí),才啟動(dòng) Producer。否則拋出異常信息。
2、同時(shí)防止啟動(dòng)多個(gè) Producer,先把 serviceState 狀態(tài)修改為 START_FAILED。
3、 檢查 groupName 是否合法。比如不能為空,是否符合正則 ^[%|a-zA-Z0-9_-]+$,并且最大長(zhǎng)度不能超過(guò) 255(CHARACTER_MAX_LENGTH = 255);
groupName 也不能等于 DEFAULT_PRODUCER。只要滿(mǎn)足上面條件,則拋異常信息。
4、如果 producerGroup 不等于 CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER" ,然后調(diào)用 changeInstanceNameToPID() 方法判斷名字不是 "DEFAULT" 則更改 instanceName。
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}
public static int getPid() {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
String name = runtime.getName(); // format: "pid@hostname"
try {
return Integer.parseInt(name.substring(0, name.indexOf('@')));
.....
}
5、構(gòu)建 MQClientInstance 對(duì)象。
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
- 5.1 首先生成 clientId:ip@instanceName 或 ip@instanceName@unitName
- 5.2 如果 factoryTable 中是不已經(jīng)存在 MQClientInstance 實(shí)例,則創(chuàng)建。 (下面有單獨(dú)分析該源碼)
6、將 DefaultMQProducerImpl 對(duì)象注冊(cè)到 ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();中
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
}
7、以主題名"TBW102"為key值,新初始化的TopicPublishInfo對(duì)象為value值存入DefaultMQProducerImpl.topicPublishInfoTable變量中
8、調(diào)用 第五步創(chuàng)建的 MQClientInstance 實(shí)例 的start() 方法。
該方法做了很多事情:
- 獲取NameServer地址
- 啟動(dòng) Netty 客戶(hù)端服務(wù)
- 設(shè)置多個(gè)定時(shí)任務(wù)
- 開(kāi)啟 pullMessageService 服務(wù)
- 開(kāi)啟 rebalanceService 服務(wù)
- 開(kāi)啟 發(fā)送消息服務(wù)
下面有具體代碼分析MQClientInstance.start() 方法。
9、設(shè)置DefaultMQProducerImpl的ServiceState為RUNNING
10、向所有的 broker 發(fā)送心跳和上傳 FilterClass
創(chuàng)建MQClientInstance實(shí)例(第5.2步)
上面 5.2 步驟中創(chuàng)建MQClientInstance 的代碼如下:
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
// Netty 中注冊(cè)接收請(qǐng)求的處理器。
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
//設(shè)置 NameServer 地址。
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
// 客戶(hù)端ID
this.clientId = clientId;
//創(chuàng)建 MQAdminImpl 對(duì)象進(jìn)行和 NameServer 進(jìn)行交互,比如創(chuàng)建Topic、獲取 Queue等
this.mQAdminImpl = new MQAdminImpl(this);
// 創(chuàng)建 pullMessageService 服務(wù)
this.pullMessageService = new PullMessageService(this);
// 創(chuàng)建 rebalanceService 服務(wù)
this.rebalanceService = new RebalanceService(this);
// 創(chuàng)建 DefaultMQProducer 服務(wù)
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
// 開(kāi)啟 Comsumer 統(tǒng)計(jì)服務(wù)
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
this.instanceIndex,
this.clientId,
this.clientConfig,
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
主要功能:
- 創(chuàng)建 MQAdminImpl 對(duì)象進(jìn)行和 NameServer 進(jìn)行交互,比如創(chuàng)建Topic、獲取 Queue等
- 創(chuàng)建 pullMessageService 服務(wù)
- 創(chuàng)建 rebalanceService 服務(wù),供 Consumer 端使用
- 創(chuàng)建 DefaultMQProducer 服務(wù),
- 開(kāi)啟 Comsumer 統(tǒng)計(jì)服務(wù)。統(tǒng)計(jì)最近一段時(shí)間內(nèi),消費(fèi)成功個(gè)數(shù)、消費(fèi)失敗個(gè)數(shù)等信息。
啟動(dòng)MQClientInstance 服務(wù) (第8步)
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 1. 如果配置NameServer地址,則從默認(rèn)服務(wù)器地址中獲?。ㄔ摰刂凡豢筛淖儯? if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 2. 啟動(dòng) Netty 客戶(hù)端服務(wù)
this.mQClientAPIImpl.start();
// 3. 設(shè)置多個(gè)定時(shí)任務(wù)
this.startScheduledTask();
// 4. 開(kāi)啟 pullMessageService 服務(wù)
this.pullMessageService.start();
// 5. 開(kāi)啟 rebalanceService 服務(wù)
this.rebalanceService.start();
// 6. 開(kāi)啟 發(fā)送消息服務(wù)
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
主要這幾步驟操作。
- 1、獲取NameServer地址
如果啟動(dòng) Producer 時(shí)沒(méi)有指定 NameServer,則程序會(huì)向一個(gè)Http地址發(fā)送請(qǐng)求來(lái)獲取NameServer地址。通過(guò)這種方式可以動(dòng)態(tài)的配置 NameServer。從而達(dá)到動(dòng)態(tài)增加和刪除NameServer服務(wù)。
public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
if (wsDomainName.indexOf(":") > 0) {
wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
}
return wsAddr;
}
- 2、啟動(dòng) Netty 客戶(hù)端服務(wù)
- 3、調(diào)用startScheduledTask() 方法設(shè)置多個(gè)定時(shí)任務(wù)
- 4、開(kāi)啟 pullMessageService 服務(wù)
- 5、開(kāi)啟 rebalanceService 服務(wù)
- 6、開(kāi)啟 發(fā)送消息服務(wù)
startScheduledTask() 方法:定時(shí)任務(wù)
private void startScheduledTask() {
// 1.如果 NameServer 地址默認(rèn)沒(méi)配置,則定時(shí)向一個(gè)Http地址獲取
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 2. 定時(shí)的從 NameServer 中獲取 Topic、broker、queue 相關(guān)信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// 3. 定時(shí)清理無(wú)效的Broker,并向所有的Broker 發(fā)送心跳數(shù)據(jù)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// 4. 定時(shí)的持久化 Consumer 端消費(fèi)每個(gè) queue的 offset 數(shù)據(jù)。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 5. 調(diào)整消費(fèi)端的線(xiàn)程數(shù)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
1、定時(shí)更新 NameServer 地址
每個(gè)2分鐘,程序會(huì)向一個(gè)Http地址發(fā)送請(qǐng)求來(lái)獲取NameServer地址來(lái)動(dòng)態(tài)更新NameServer地址。2、 定時(shí)的從 NameServer 中獲取 Topic、broker、queue 相關(guān)信息
默認(rèn)每隔 30秒去 NameServer 中獲取Topic、broker、queue等相關(guān)信息。
如果有新broker注冊(cè)或下線(xiàn),producer端會(huì)在30秒之內(nèi)感知。3、定時(shí)清理無(wú)效的Broker,并向所有的Broker 發(fā)送心跳數(shù)據(jù).
默認(rèn)每隔 30 秒向 Broker 發(fā)送心跳數(shù)據(jù) 和 用戶(hù)自定義的 filterclass 類(lèi)。4、定時(shí)的持久化 Consumer 端消費(fèi)每個(gè) queue的 offset 數(shù)據(jù)。
默認(rèn)每隔 5 秒持久或 Consumer 消費(fèi)的 queue 的 offset信息。
持久化分為,遠(yuǎn)程持久化和本地持久化。
MessageModel.CLUSTERING 模式 queue的offset 保存到 broker上。
BROADCASTING("BROADCASTING") 模式 queue 的 offset 保存在本地。-
5、調(diào)整消費(fèi)端的線(xiàn)程數(shù)
每隔 1 分鐘計(jì)算每一個(gè)queue中消息擠壓的數(shù)量,如果超過(guò)100000條,則增加消費(fèi)線(xiàn)程的并發(fā)數(shù),如果小于80000條則減少消費(fèi)者的線(xiàn)程數(shù)。
不過(guò)進(jìn)入源碼中看,調(diào)整消費(fèi)者的線(xiàn)程數(shù)都注釋掉了。
