The previous post gave a brief analysis of the mqtt-transport code and walked through the device connection flow. Device state reporting follows essentially the same flow, so I won't repeat the analysis here; interested readers can dig into it themselves.
Next up, we'll explore the source code of core and rule-engine.
Although the official site presents them as two independent components, the two services share some overlapping functionality, so they are implemented together in a single service; the key code all lives in the application module.
This module has two entry points: ThingsboardInstallApplication and ThingsboardServerApplication. ThingsboardInstallApplication corresponds to TB's install service, which mainly initializes the database, while ThingsboardServerApplication corresponds to the core and rule-engine services. They are packaged together, and the startup class explicitly determines which service is launched.
TB really did a thorough job here: even database initialization is wrapped up as a service, so the whole database can be initialized through startup parameters without running any scripts by hand, and database upgrades are packaged in as well. Nicely done!
Digressions aside, back to the main topic: analyzing the core and rule-engine services. A rough look at the package structure shows the following main packages (I won't dig into packages like config and utils):
actors
controller
install
service
The package with some real complexity is actors, so this post gives a detailed analysis of TB's Actor model.
The Actor Model in Detail
The Actor model is the core mechanism TB uses to process messages. For general background on the Actor model, look it up yourself; here we only discuss TB's Actor design.
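Before diving into TB's classes, the core mailbox idea that TbActorMailbox builds on can be sketched in a few lines. This is a simplified illustration, not TB's actual implementation: each actor owns a queue, and a dispatcher drains it one task at a time, so an actor always processes its messages sequentially even though the dispatcher pool has many threads.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal mailbox-style actor sketch (illustrative, not TB's real API).
class SimpleActor {
    private final Queue<String> mailbox = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean busy = new AtomicBoolean(false);
    private final ExecutorService dispatcher;
    // thread-safe list so the test can observe results from another thread
    final List<String> processed = new CopyOnWriteArrayList<>();

    SimpleActor(ExecutorService dispatcher) {
        this.dispatcher = dispatcher;
    }

    void tell(String msg) {
        mailbox.add(msg);
        tryProcess();
    }

    private void tryProcess() {
        // Only one drain task may run at a time, guaranteeing that this
        // actor handles messages strictly one after another.
        if (busy.compareAndSet(false, true)) {
            dispatcher.submit(() -> {
                String msg;
                while ((msg = mailbox.poll()) != null) {
                    processed.add(msg); // the real "doProcess" would go here
                }
                busy.set(false);
                // a message may have raced in after the last poll
                if (!mailbox.isEmpty()) tryProcess();
            });
        }
    }
}
```

The `busy` flag plays the role of the actor's single-threaded processing guarantee; the dispatcher pool is shared across actors, which is exactly why TB can run thousands of device/tenant actors on a handful of threads.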
1. The Actor system is initialized by org.thingsboard.server.actors.service.DefaultActorService:
@PostConstruct
public void initActorSystem() {
    log.info("Initializing actor system.");
    actorContext.setActorService(this);
    TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
    system = new DefaultTbActorSystem(settings);
    system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
    system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
    system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
    system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));
    actorContext.setActorSystem(system);
    appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));
    // use appActor as the root actor that all messages are delegated to
    actorContext.setAppActor(appActor);
    TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));
    // use statsActor as the root delegate for statistics
    actorContext.setStatsActor(statsActor);
    log.info("Actor system initialized.");
}
2. During initialization, an actor system (org.thingsboard.server.actors.DefaultTbActorSystem) is created first, along with its thread pools (dispatchers) and the actor system context (org.thingsboard.server.actors.ActorSystemContext).
3. Next, a root actor (org.thingsboard.server.actors.app.AppActor) is created and registered as the root of the actor system. This actor is the entry point for all messages and the parent of every other actor.
4. Once initialization is complete, on Spring's ApplicationReadyEvent, DefaultActorService sends an AppInitMsg to the mailbox of DefaultTbActorSystem's root actor.
5. AppActor takes the message out of its mailbox and handles it in its process method:
@Override
protected boolean doProcess(TbActorMsg msg) {
    if (!ruleChainsInitialized) {
        initTenantActors();
        ruleChainsInitialized = true;
        if (msg.getMsgType() != MsgType.APP_INIT_MSG) {
            log.warn("Rule Chains initialized by unexpected message: {}", msg);
        }
    }
    switch (msg.getMsgType()) {
        case APP_INIT_MSG:
            break;
        case PARTITION_CHANGE_MSG:
            ctx.broadcastToChildren(msg);
            break;
        case COMPONENT_LIFE_CYCLE_MSG:
            onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
            break;
        case QUEUE_TO_RULE_ENGINE_MSG:
            onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
            break;
        case TRANSPORT_TO_DEVICE_ACTOR_MSG:
            onToDeviceActorMsg((TenantAwareMsg) msg, false);
            break;
        case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
        case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
            onToDeviceActorMsg((TenantAwareMsg) msg, true);
            break;
        default:
            return false;
    }
    return true;
}
When AppActor receives its first message, it initializes the TenantActors: it loads every row from the tenant table, and each tenant gets its own TenantActor.
private void initTenantActors() {
    log.info("Starting main system actor.");
    try {
        // This Service may be started for specific tenant only.
        Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
        if (isolatedTenantId.isPresent()) {
            Tenant tenant = systemContext.getTenantService().findTenantById(isolatedTenantId.get());
            if (tenant != null) {
                log.debug("[{}] Creating tenant actor", tenant.getId());
                getOrCreateTenantActor(tenant.getId());
                log.debug("Tenant actor created.");
            } else {
                log.error("[{}] Tenant with such ID does not exist", isolatedTenantId.get());
            }
        } else if (systemContext.isTenantComponentsInitEnabled()) {
            // load tenants page by page; each tenant gets its own TenantActor
            PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
            boolean isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
            boolean isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
            for (Tenant tenant : tenantIterator) {
                if (isCore || (isRuleEngine && !tenant.isIsolatedTbRuleEngine())) {
                    log.debug("[{}] Creating tenant actor", tenant.getId());
                    getOrCreateTenantActor(tenant.getId());
                    log.debug("[{}] Tenant actor created.", tenant.getId());
                }
            }
        }
        log.info("Main system actor started.");
    } catch (Exception e) {
        log.warn("Unknown failure", e);
    }
}
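The PageDataIterable used above hides a straightforward paging loop: fetch fixed-size pages until a short page signals the end, so tenants (and later rule chains) are loaded without reading the whole table at once. A minimal sketch of that idea, using a made-up PagedLoader helper rather than TB's actual class:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Illustrative paging helper (not TB's PageDataIterable).
class PagedLoader {
    // fetchPage takes (pageNumber, pageSize) and returns one page of results.
    static <T> List<T> loadAll(BiFunction<Integer, Integer, List<T>> fetchPage, int pageSize) {
        List<T> all = new ArrayList<>();
        int page = 0;
        while (true) {
            List<T> chunk = fetchPage.apply(page, pageSize);
            all.addAll(chunk);
            // a page shorter than pageSize means we just read the last page
            if (chunk.size() < pageSize) break;
            page++;
        }
        return all;
    }
}
```

TB iterates lazily instead of collecting everything into one list, but the termination condition is the same: stop when a page comes back short.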
6. When a TenantActor is initialized, it also initializes its corresponding RuleChainActors:
@Override
public void init(TbActorCtx ctx) throws TbActorException {
    super.init(ctx);
    log.info("[{}] Starting tenant actor.", tenantId);
    try {
        Tenant tenant = systemContext.getTenantService().findTenantById(tenantId);
        if (tenant == null) {
            cantFindTenant = true;
            log.info("[{}] Started tenant actor for missing tenant.", tenantId);
        } else {
            // This Service may be started for specific tenant only.
            Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
            isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
            isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
            if (isRuleEngineForCurrentTenant) {
                try {
                    if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenant.isIsolatedTbRuleEngine())) {
                        log.info("[{}] Going to init rule chains", tenantId);
                        initRuleChains();
                    } else {
                        isRuleEngineForCurrentTenant = false;
                    }
                } catch (Exception e) {
                    cantFindTenant = true;
                }
            }
            log.info("[{}] Tenant actor started.", tenantId);
        }
    } catch (Exception e) {
        log.warn("[{}] Unknown failure", tenantId, e);
        // TODO: throw this in 3.1?
        // throw new TbActorException("Failed to init actor", e);
    }
}

protected void initRuleChains() {
    for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChains(tenantId, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
        RuleChainId ruleChainId = ruleChain.getId();
        log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
        TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);
        visit(ruleChain, actorRef);
        log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());
    }
}
7. A tenant owns multiple rule chains, one of which is the root chain, so during initialization TenantActor stores its root chain in rootChain and keeps a reference to the root RuleChainActor's mailbox in rootChainActor.
8. When a RuleChainActor is initialized, it also creates its RuleChainActorMessageProcessor, which is responsible for the actual message handling.
9. When the RuleChainActorMessageProcessor is initialized, it creates the RuleNodeActors for the rule chain, wiring them up from the data in the rule_node and relation tables; each RuleNode gets its own RuleNodeActor:
@Override
public void start(TbActorCtx context) {
    if (!started) {
        RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);
        if (ruleChain != null) {
            List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId);
            log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
            // Creating and starting the actors;
            for (RuleNode ruleNode : ruleNodeList) {
                log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
                TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
                // RuleNodeCtx holds `self`, a reference to this rule chain's mailbox,
                // so every rule node can hand results back to the RuleChainActor through it
                nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
            }
            initRoutes(ruleChain, ruleNodeList);
            started = true;
        }
    } else {
        onUpdate(context);
    }
}
private void initRoutes(RuleChain ruleChain, List<RuleNode> ruleNodeList) {
    nodeRoutes.clear();
    // Populating the routes map;
    for (RuleNode ruleNode : ruleNodeList) {
        List<EntityRelation> relations = service.getRuleNodeRelations(TenantId.SYS_TENANT_ID, ruleNode.getId());
        log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size());
        if (relations.size() == 0) {
            nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
        } else {
            for (EntityRelation relation : relations) {
                log.trace("[{}][{}][{}] Processing rule node relation [{}]", tenantId, entityId, ruleNode.getId(), relation.getTo());
                // verify that the relation actually points at an existing rule node
                if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
                    RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
                    if (ruleNodeCtx == null) {
                        throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
                    }
                }
                nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
                        .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
            }
        }
    }
    firstId = ruleChain.getFirstRuleNodeId();
    firstNode = nodeActors.get(firstId);
    state = ComponentLifecycleState.ACTIVE;
}
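Stripped of the actor plumbing, the nodeRoutes structure built above is just a map from node id to outbound relations, which is later filtered by relation type ("Success", "Failure", "True", ...) when routing a result to the next node. A minimal sketch with illustrative types (Relation and Routes are made-up names, not TB's classes):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative relation holder: source node -> target node, labeled by type.
class Relation {
    final String from, to, type;
    Relation(String from, String to, String type) {
        this.from = from;
        this.to = to;
        this.type = type;
    }
}

class Routes {
    // node id -> outbound relations, the shape of TB's nodeRoutes map
    final Map<String, List<Relation>> nodeRoutes = new HashMap<>();

    void add(Relation r) {
        nodeRoutes.computeIfAbsent(r.from, k -> new ArrayList<>()).add(r);
    }

    // Targets reachable from `from` via one of the given relation types.
    List<String> next(String from, Set<String> types) {
        return nodeRoutes.getOrDefault(from, List.of()).stream()
                .filter(r -> types.contains(r.type))
                .map(r -> r.to)
                .collect(Collectors.toList());
    }
}
```

An empty result from `next` corresponds to the "no outbound relations to process" branch we'll see later in onTellNext.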
10. When a RuleNodeActor is initialized, it creates its RuleNodeActorMessageProcessor, which uses reflection to obtain the TbNode implementation class for the rule node; that implementation is what actually processes the messages:
private TbNode initComponent(RuleNode ruleNode) throws Exception {
    TbNode tbNode = null;
    if (ruleNode != null) {
        Class<?> componentClazz = Class.forName(ruleNode.getType());
        tbNode = (TbNode) (componentClazz.newInstance());
        tbNode.init(defaultCtx, new TbNodeConfiguration(ruleNode.getConfiguration()));
    }
    return tbNode;
}
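The reflective instantiation above can be tried with a toy interface. DemoNode, UppercaseNode and NodeFactory are made-up stand-ins for TbNode and its loading logic; getDeclaredConstructor().newInstance() is used here as the non-deprecated equivalent of the clazz.newInstance() call in TB's code:

```java
// Toy stand-in for the TbNode interface (illustrative, not TB's API).
interface DemoNode {
    String onMsg(String msg);
}

// A concrete "node" implementation, loadable by class name.
class UppercaseNode implements DemoNode {
    public String onMsg(String msg) {
        return msg.toUpperCase();
    }
}

class NodeFactory {
    // Mirrors initComponent: resolve the class by name, instantiate, cast.
    static DemoNode create(String className) throws Exception {
        Class<?> clazz = Class.forName(className);
        return (DemoNode) clazz.getDeclaredConstructor().newInstance();
    }
}
```

This is why the rule_node table only needs to store a fully qualified class name in its type column: the actor system can turn that string into a live processing object at startup.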
With that, the actor system is fully initialized. Putting it all together yields the following picture: a flow chart on the left and a structure diagram on the right.

Actor Message Processing in Detail
Let's use device state reporting as the example.
1. org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerService.launchConsumer() receives the message from the Transport layer and drops it into the actor system context, ActorSystemContext:
private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
    TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback);
    QueueToRuleEngineMsg msg;
    ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();
    Set<String> relationTypes = null;
    if (relationTypesList != null) {
        if (relationTypesList.size() == 1) {
            relationTypes = Collections.singleton(relationTypesList.get(0));
        } else {
            relationTypes = new HashSet<>(relationTypesList);
        }
    }
    msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage());
    actorContext.tell(msg);
}
2. ActorSystemContext forwards the message to the root actor's (AppActor's) mailbox:
public void tell(TbActorMsg tbActorMsg) {
    appActor.tell(tbActorMsg);
}
3. AppActor takes the message from its mailbox and handles it in its process method; based on the message type, it finds the TenantActor for the device's tenant and delivers the message into that actor's mailbox for the next step:
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
    if (SYSTEM_TENANT.equals(msg.getTenantId())) {
        msg.getTbMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
    } else {
        if (!deletedTenants.contains(msg.getTenantId())) {
            getOrCreateTenantActor(msg.getTenantId()).tell(msg);
        } else {
            msg.getTbMsg().getCallback().onSuccess();
        }
    }
}
4. TenantActor takes the message from its mailbox and handles it in its process method. It first looks up its root chain actor and delivers the message to the corresponding RuleChainActor's TbActorMailbox:
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
    if (!isRuleEngineForCurrentTenant) {
        log.warn("RECEIVED INVALID MESSAGE: {}", msg);
        return;
    }
    TbMsg tbMsg = msg.getTbMsg();
    if (tbMsg.getRuleChainId() == null) {
        if (getRootChainActor() != null) {
            getRootChainActor().tell(msg);
        } else {
            tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!"));
            log.info("[{}] No Root Chain: {}", tenantId, msg);
        }
    } else {
        try {
            ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
        } catch (TbActorNotRegisteredException ex) {
            log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId());
            // TODO: 3.1 Log it to dead letters queue;
            tbMsg.getCallback().onSuccess();
        }
    }
}
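The ctx.tell(actorId, msg) call, and the TbActorNotRegisteredException that TenantActor catches above, boil down to a registry lookup: the actor system keeps a map from actor id to mailbox, and telling an unregistered id fails. A minimal sketch with illustrative names (not TB's actual API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Illustrative analog of TbActorNotRegisteredException.
class ActorNotRegisteredException extends RuntimeException {
    ActorNotRegisteredException(String id) {
        super("No actor registered for id: " + id);
    }
}

class ActorRegistry {
    // actor id -> mailbox; a Consumer<String> stands in for a real mailbox
    private final Map<String, Consumer<String>> actors = new ConcurrentHashMap<>();

    void register(String id, Consumer<String> mailbox) {
        actors.put(id, mailbox);
    }

    void tell(String id, String msg) {
        Consumer<String> mailbox = actors.get(id);
        if (mailbox == null) throw new ActorNotRegisteredException(id);
        mailbox.accept(msg);
    }
}
```

Catching the exception and calling onSuccess(), as TenantActor does, is a deliberate choice: a message addressed to a deleted rule chain is acknowledged and dropped rather than retried forever.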
5. RuleChainActor takes the message from its mailbox and, in its process method, hands it to its RuleChainActorMessageProcessor:
@Override
protected boolean doProcess(TbActorMsg msg) {
    switch (msg.getMsgType()) {
        case COMPONENT_LIFE_CYCLE_MSG:
            onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
            break;
        case QUEUE_TO_RULE_ENGINE_MSG:
            processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
            .....
6. When RuleChainActorMessageProcessor receives the message, it first checks whether a target node is specified; if not, it hands the message to the rule chain's first RuleNodeActor:
void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {
    TbMsg msg = envelope.getTbMsg();
    log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg);
    if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {
        try {
            checkActive(envelope.getTbMsg());
            // target rule node id, if the message already carries one
            RuleNodeId targetId = msg.getRuleNodeId();
            RuleNodeCtx targetCtx;
            // normally no node is targeted, so this is null and the chain's first node is used
            if (targetId == null) {
                targetCtx = firstNode;
                msg = msg.copyWithRuleChainId(entityId);
            } else {
                targetCtx = nodeActors.get(targetId);
            }
            if (targetCtx != null) {
                log.trace("[{}][{}] Pushing message to target rule node", entityId, targetId);
                pushMsgToNode(targetCtx, msg, "");
            } else {
                // if the rule node no longer exists, simply report success
                log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId);
                msg.getCallback().onSuccess();
            }
        } catch (RuleNodeException rne) {
            envelope.getTbMsg().getCallback().onFailure(rne);
        } catch (Exception e) {
            envelope.getTbMsg().getCallback().onFailure(new RuleEngineException(e.getMessage()));
        }
    } else {
        onTellNext(envelope.getTbMsg(), envelope.getTbMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage());
    }
}
7. RuleNodeActor takes the message from its mailbox and, via its process method, forwards it to RuleNodeActorMessageProcessor for further handling.
8. RuleNodeActorMessageProcessor hands the message to the TbNode implementation for the final processing step:
void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
    checkActive(msg.getMsg());
    if (ruleNode.isDebugMode()) {
        systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
    }
    try {
        tbNode.onMsg(msg.getCtx(), msg.getMsg());
    } catch (Exception e) {
        msg.getCtx().tellFailure(msg.getMsg(), e);
    }
}
9. After the TbNode implementation finishes, it reports the result through DefaultTbContext, which re-assembles the message based on the result and sends it back to the RuleChainActor:
private void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) {
    if (nodeCtx.getSelf().isDebugMode()) {
        relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
    }
    nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
}
10. When RuleChainActor receives this message, the message type has changed, so it invokes the onTellNext method of its RuleChainActorMessageProcessor for the next step:
@Override
protected boolean doProcess(TbActorMsg msg) {
    switch (msg.getMsgType()) {
        case COMPONENT_LIFE_CYCLE_MSG:
            onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
            break;
        case QUEUE_TO_RULE_ENGINE_MSG:
            processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
            break;
        // a message travelling from a rule node to the next rule node in the chain;
        // the rule node resolves the next node via DefaultTbContext and delivers the message
        case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
            processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
            break;
        case RULE_CHAIN_TO_RULE_CHAIN_MSG:
            processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
            break;
        case PARTITION_CHANGE_MSG:
            processor.onPartitionChangeMsg((PartitionChangeMsg) msg);
            break;
        case STATS_PERSIST_TICK_MSG:
            onStatsPersistTick(id);
            break;
        default:
            return false;
    }
    return true;
}
11. RuleChainActorMessageProcessor uses the rule node's relations to find the next RuleNodeActor(s). If none is found, the message is fully processed; otherwise it is handed to pushToTarget():
private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
    try {
        checkActive(msg);
        EntityId entityId = msg.getOriginator();
        TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
        // filter the next targets by the relation types returned from the node
        List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
                .filter(r -> contains(relationTypes, r.getType()))
                .collect(Collectors.toList());
        int relationsCount = relations.size();
        if (relationsCount == 0) {
            log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
            if (relationTypes.contains(TbRelationTypes.FAILURE)) {
                RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
                if (ruleNodeCtx != null) {
                    msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
                } else {
                    log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
                    msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
                }
            } else {
                msg.getCallback().onSuccess();
            }
        } else if (relationsCount == 1) {
            for (RuleNodeRelation relation : relations) {
                log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
                pushToTarget(tpi, msg, relation.getOut(), relation.getType());
            }
        } else {
            MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
            log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relations);
            for (RuleNodeRelation relation : relations) {
                EntityId target = relation.getOut();
                putToQueue(tpi, msg, callbackWrapper, target);
            }
        }
    } catch (RuleNodeException rne) {
        msg.getCallback().onFailure(rne);
    } catch (Exception e) {
        msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
    }
}
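When a message fans out to several relations, the MultipleTbQueueTbMsgCallbackWrapper used above makes sure the original callback only succeeds after every target has acknowledged. The counting idea behind it can be sketched like this (MsgCallback and MultiCallback are illustrative stand-ins, not TB's classes):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative callback interface in the spirit of TbMsgCallback.
interface MsgCallback {
    void onSuccess();
    void onFailure(Throwable t);
}

// Wraps the original callback when one message fans out to `count` targets:
// succeed only once all targets have acked; fail eagerly on the first error.
class MultiCallback implements MsgCallback {
    private final AtomicInteger remaining;
    private final MsgCallback target;

    MultiCallback(int count, MsgCallback target) {
        this.remaining = new AtomicInteger(count);
        this.target = target;
    }

    @Override
    public void onSuccess() {
        // atomically count down; the last ack triggers the real success
        if (remaining.decrementAndGet() == 0) target.onSuccess();
    }

    @Override
    public void onFailure(Throwable t) {
        target.onFailure(t);
    }
}
```

This matters for at-least-once delivery: the queue consumer must not commit the message offset until the wrapped callback fires, so a partially delivered fan-out is redelivered after a crash.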
12. If the next target's type is RULE_NODE, the message is sent to it; if it is RULE_CHAIN, the message is re-assembled and the TenantActor that owns this RuleChainActor is notified to handle it:
private void pushToTarget(TopicPartitionInfo tpi, TbMsg msg, EntityId target, String fromRelationType) {
    if (tpi.isMyPartition()) {
        switch (target.getEntityType()) {
            case RULE_NODE:
                pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType);
                break;
            case RULE_CHAIN:
                parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType));
                break;
        }
    } else {
        putToQueue(tpi, msg, new TbQueueTbMsgCallbackWrapper(msg.getCallback()), target);
    }
}
13. When TenantActor receives the message from the RuleChainActor, it finds the next RuleChainActor to process it:
private void onRuleChainMsg(RuleChainAwareMsg msg) {
    getOrCreateActor(msg.getRuleChainId()).tell(msg);
}
That is a rough walkthrough of the whole message-processing flow; I won't belabor the repeated hops a message can make between TenantActor, RuleChainActor and RuleNodeActor.
When I get a chance I'll add a flow diagram and walk through it once more.