Exploring the ThingsBoard Source Code (2)

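The previous post briefly analyzed the mqtt-transport code and walked through the device connection flow. The flow for device status reporting is essentially the same, so I will not analyze it separately; interested readers can dig into it on their own.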

Next, I am going to explore the source code of core and rule-engine.

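Although the official website presents them as two independent parts, the two services share some overlapping functionality, so they are implemented together in a single service. The key code all lives in the application module.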

This module has two entry points, ThingsboardInstallApplication and ThingsboardServerApplication. ThingsboardInstallApplication corresponds to TB's install service, which mainly initializes database-related data, while ThingsboardServerApplication corresponds to the core and rule-engine services. Both are packaged together, and the service to start is chosen explicitly by specifying the main class.

TB is really thorough here: even database initialization is wrapped in a service, so the whole database can be initialized through startup parameters instead of running scripts by hand. Database upgrades are wrapped in the same way, which is very nice!

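Enough preamble; back to the topic. Let's start analyzing the core and rule-engine services by looking at the rough package structure first. The main packages are the following (packages such as config and utils are not covered in depth):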

actors
controller
install
service

The actors package is the one with some real difficulty, so this post takes a detailed look at the Actor model.

The Actor Model in Detail

The Actor model is TB's core mechanism for processing messages. General background on the Actor model is easy to find online, so here I only discuss TB's Actor design.

1. The Actor system is initialized by org.thingsboard.server.actors.service.DefaultActorService.

    @PostConstruct
    public void initActorSystem() {
        log.info("Initializing actor system.");
        actorContext.setActorService(this);
        TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
        system = new DefaultTbActorSystem(settings);

        system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
        system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
        system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
        system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));

        actorContext.setActorSystem(system);

        appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));

        // Use appActor as the root actor that proxies message delivery
        actorContext.setAppActor(appActor);

        TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));

        // Use statsActor as the root proxy for statistics data
        actorContext.setStatsActor(statsActor);

        log.info("Actor system initialized.");
    }

2. Initialization first creates an Actor system (org.thingsboard.server.actors.DefaultTbActorSystem), configures its thread pools (the dispatchers), and registers it on the Actor system context (org.thingsboard.server.actors.ActorSystemContext).

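3. Next, a root Actor (org.thingsboard.server.actors.app.AppActor) is created and set as the root Actor of the Actor system. This Actor is the entry point for all messages and the parent of the tenant actors and everything below them (StatsActor is created as a second, separate root actor).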

4. Once initialization is complete, DefaultActorService reacts to Spring's ApplicationReadyEvent by sending an AppInitMsg to the mailbox of the root Actor of DefaultTbActorSystem.
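To make the mailbox idea in steps 1 to 4 more concrete, here is a minimal sketch of an actor whose messages are queued and then drained on a dispatcher thread pool. All names (SimpleActor, SimpleMailbox, MailboxSketch) are hypothetical and plain strings stand in for TbActorMsg; this is not TB's TbActorMailbox, only an illustration of the general technique under those assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical names throughout; this is not ThingsBoard code.
interface SimpleActor {
    void process(String msg);
}

class SimpleMailbox {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean busy = new AtomicBoolean(false);
    private final SimpleActor actor;
    private final ExecutorService dispatcher;

    SimpleMailbox(SimpleActor actor, ExecutorService dispatcher) {
        this.actor = actor;
        this.dispatcher = dispatcher;
    }

    // Senders only enqueue; they never invoke the actor directly.
    void tell(String msg) {
        queue.add(msg);
        if (busy.compareAndSet(false, true)) {
            dispatcher.execute(this::drain);
        }
    }

    // Only the holder of the busy flag drains, so the actor sees one message at a time.
    private void drain() {
        do {
            String msg;
            while ((msg = queue.poll()) != null) {
                actor.process(msg);
            }
            busy.set(false);
            // Re-check: a message may have arrived just before the flag was released.
        } while (!queue.isEmpty() && busy.compareAndSet(false, true));
    }
}

class MailboxSketch {
    // Sends the messages to one actor and returns them in the order the actor processed them.
    static List<String> run(String... msgs) {
        List<String> seen = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(msgs.length);
        ExecutorService dispatcher = Executors.newFixedThreadPool(2);
        SimpleMailbox root = new SimpleMailbox(m -> { seen.add(m); done.countDown(); }, dispatcher);
        try {
            for (String m : msgs) {
                root.tell(m);
            }
            done.await(5, TimeUnit.SECONDS); // wait until every message has been handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            dispatcher.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run("APP_INIT_MSG", "QUEUE_TO_RULE_ENGINE_MSG"));
    }
}
```

The point of the busy flag is that the thread pool can have many threads while each individual actor still handles its messages strictly one after another, which is what lets actor code avoid locks on its own state.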

5. AppActor takes the message out of its mailbox and handles it in its process method (doProcess in the code below).

    @Override
    protected boolean doProcess(TbActorMsg msg) {
        if (!ruleChainsInitialized) {
            initTenantActors();
            ruleChainsInitialized = true;
            if (msg.getMsgType() != MsgType.APP_INIT_MSG) {
                log.warn("Rule Chains initialized by unexpected message: {}", msg);
            }
        }
        switch (msg.getMsgType()) {
            case APP_INIT_MSG:
                break;
            case PARTITION_CHANGE_MSG:
                ctx.broadcastToChildren(msg);
                break;
            case COMPONENT_LIFE_CYCLE_MSG:
                onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
                break;
            case QUEUE_TO_RULE_ENGINE_MSG:
                onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
                break;
            case TRANSPORT_TO_DEVICE_ACTOR_MSG:
                onToDeviceActorMsg((TenantAwareMsg) msg, false);
                break;
            case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
            case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
            case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
            case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
            case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
                onToDeviceActorMsg((TenantAwareMsg) msg, true);
                break;
            default:
                return false;
        }
        return true;
    }
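The two things worth noticing in doProcess are the "initialize on first message" guard and the convention of returning false for message types the actor does not handle. Stripped of everything TB-specific, the pattern looks roughly like this; the class name and the reduced set of message types are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; message types are a reduced subset picked for illustration.
class LazyInitActorSketch {
    enum MsgType { APP_INIT_MSG, PARTITION_CHANGE_MSG, SOMETHING_ELSE }

    private boolean childrenInitialized = false;
    final List<String> actions = new ArrayList<>();

    // Returns true when the message was handled, false when this actor does not know the type.
    boolean doProcess(MsgType type) {
        if (!childrenInitialized) {
            // Runs exactly once, whichever message happens to arrive first.
            actions.add("init tenant actors");
            childrenInitialized = true;
        }
        switch (type) {
            case APP_INIT_MSG:
                break; // nothing else to do, the init above was the point
            case PARTITION_CHANGE_MSG:
                actions.add("broadcast to children");
                break;
            default:
                return false;
        }
        return true;
    }

    public static void main(String[] args) {
        LazyInitActorSketch actor = new LazyInitActorSketch();
        System.out.println(actor.doProcess(MsgType.PARTITION_CHANGE_MSG));
        System.out.println(actor.doProcess(MsgType.APP_INIT_MSG));
        System.out.println(actor.doProcess(MsgType.SOMETHING_ELSE));
        System.out.println(actor.actions);
    }
}
```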

On the first message it receives, AppActor initializes the TenantActors before doing anything else: it loads the tenants from the tenant table in the database and creates one TenantActor per Tenant.

    private void initTenantActors() {
        log.info("Starting main system actor.");
        try {
            // This Service may be started for specific tenant only.
            Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
            if (isolatedTenantId.isPresent()) {
                Tenant tenant = systemContext.getTenantService().findTenantById(isolatedTenantId.get());
                if (tenant != null) {
                    log.debug("[{}] Creating tenant actor", tenant.getId());
                    getOrCreateTenantActor(tenant.getId());
                    log.debug("Tenant actor created.");
                } else {
                    log.error("[{}] Tenant with such ID does not exist", isolatedTenantId.get());
                }
            } else if (systemContext.isTenantComponentsInitEnabled()) {
                // Load tenants page by page; each tenant gets its own TenantActor
                PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, ENTITY_PACK_LIMIT);
                boolean isRuleEngine = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
                boolean isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
                for (Tenant tenant : tenantIterator) {
                    if (isCore || (isRuleEngine && !tenant.isIsolatedTbRuleEngine())) {
                        log.debug("[{}] Creating tenant actor", tenant.getId());
                        getOrCreateTenantActor(tenant.getId());
                        log.debug("[{}] Tenant actor created.", tenant.getId());
                    }
                }
            }
            log.info("Main system actor started.");
        } catch (Exception e) {
            log.warn("Unknown failure", e);
        }
    }
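The combination used here, walking a paged query and creating each actor at most once, can be sketched in plain Java. This is only an illustration under my own assumptions: the paging contract (a short page means "last page"), the string ids, and every name in the block are hypothetical and are not how PageDataIterable or getOrCreateTenantActor are actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical sketch; names do not match ThingsBoard's classes.
class TenantPagingSketch {
    // Walks a paged source until a short (or empty) page signals the end.
    static <T> List<T> loadAll(BiFunction<Integer, Integer, List<T>> fetchPage, int pageSize) {
        List<T> all = new ArrayList<>();
        for (int page = 0; ; page++) {
            List<T> chunk = fetchPage.apply(page, pageSize);
            all.addAll(chunk);
            if (chunk.size() < pageSize) {
                return all;
            }
        }
    }

    private final Map<String, String> tenantActors = new ConcurrentHashMap<>();
    int created = 0;

    // Returns the existing actor for the tenant, or creates it exactly once.
    String getOrCreateTenantActor(String tenantId) {
        return tenantActors.computeIfAbsent(tenantId, id -> {
            created++;
            return "TenantActor[" + id + "]";
        });
    }

    // Stand-in for a database query that returns one page of tenant ids.
    static List<String> page(List<String> source, int page, int size) {
        int from = Math.min(page * size, source.size());
        int to = Math.min(from + size, source.size());
        return source.subList(from, to);
    }

    public static void main(String[] args) {
        List<String> tenants = List.of("t1", "t2", "t3", "t4", "t5");
        TenantPagingSketch app = new TenantPagingSketch();
        List<String> ids = loadAll((p, s) -> page(tenants, p, s), 2);
        for (String id : ids) {
            app.getOrCreateTenantActor(id);
        }
        app.getOrCreateTenantActor("t1"); // already exists, nothing new is created
        System.out.println(app.created);
    }
}
```

The same get-or-create call is what later lets AppActor route a message to a tenant whose actor does not exist yet.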

6. Initializing a TenantActor also initializes its RuleChainActors.

    @Override
    public void init(TbActorCtx ctx) throws TbActorException {
        super.init(ctx);
        log.info("[{}] Starting tenant actor.", tenantId);
        try {
            Tenant tenant = systemContext.getTenantService().findTenantById(tenantId);
            if (tenant == null) {
                cantFindTenant = true;
                log.info("[{}] Started tenant actor for missing tenant.", tenantId);
            } else {
                // This Service may be started for specific tenant only.
                Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();

                isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
                isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);

                if (isRuleEngineForCurrentTenant) {
                    try {
                        if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenant.isIsolatedTbRuleEngine())) {
                            log.info("[{}] Going to init rule chains", tenantId);
                            initRuleChains();
                        } else {
                            isRuleEngineForCurrentTenant = false;
                        }
                    } catch (Exception e) {
                        cantFindTenant = true;
                    }
                }
                log.info("[{}] Tenant actor started.", tenantId);
            }
        } catch (Exception e) {
            log.warn("[{}] Unknown failure", tenantId, e);
//            TODO: throw this in 3.1?
//            throw new TbActorException("Failed to init actor", e);
        }
    }

    protected void initRuleChains() {
        for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChains(tenantId, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
            RuleChainId ruleChainId = ruleChain.getId();
            log.debug("[{}|{}] Creating rule chain actor", ruleChainId.getEntityType(), ruleChain.getId());
            TbActorRef actorRef = getOrCreateActor(ruleChainId, id -> ruleChain);
            visit(ruleChain, actorRef);
            log.debug("[{}|{}] Rule Chain actor created.", ruleChainId.getEntityType(), ruleChainId.getId());
        }
    }

7. A Tenant has multiple Rule Chains, one of which is the root chain. During initialization, TenantActor therefore assigns the root chain to rootChain and keeps a reference to the mailbox of the corresponding RuleChainActor in rootChainActor.

8. When a RuleChainActor is initialized, it also initializes its RuleChainActorMessageProcessor, which is responsible for the actual message handling.

9. When RuleChainActorMessageProcessor is initialized, it creates the RuleNodeActors of the RuleChain, one per RuleNode, and uses the data in the rule_node and relation tables to work out how the nodes are ordered and connected.

    @Override
    public void start(TbActorCtx context) {
        if (!started) {
            RuleChain ruleChain = service.findRuleChainById(tenantId, entityId);
            if (ruleChain != null) {
                List<RuleNode> ruleNodeList = service.getRuleChainNodes(tenantId, entityId);
                log.trace("[{}][{}] Starting rule chain with {} nodes", tenantId, entityId, ruleNodeList.size());
                // Creating and starting the actors;
                for (RuleNode ruleNode : ruleNodeList) {
                    log.trace("[{}][{}] Creating rule node [{}]: {}", entityId, ruleNode.getId(), ruleNode.getName(), ruleNode);
                    TbActorRef ruleNodeActor = createRuleNodeActor(context, ruleNode);
                    // RuleNodeCtx holds self, a reference to the rule chain's mailbox; every rule node can use it to pass data back to the RuleChainActor
                    nodeActors.put(ruleNode.getId(), new RuleNodeCtx(tenantId, self, ruleNodeActor, ruleNode));
                }
                initRoutes(ruleChain, ruleNodeList);
                started = true;
            }
        } else {
            onUpdate(context);
        }
    }

    private void initRoutes(RuleChain ruleChain, List<RuleNode> ruleNodeList) {
        nodeRoutes.clear();
        // Populating the routes map;
        for (RuleNode ruleNode : ruleNodeList) {
            List<EntityRelation> relations = service.getRuleNodeRelations(TenantId.SYS_TENANT_ID, ruleNode.getId());
            log.trace("[{}][{}][{}] Processing rule node relations [{}]", tenantId, entityId, ruleNode.getId(), relations.size());
            if (relations.size() == 0) {
                nodeRoutes.put(ruleNode.getId(), Collections.emptyList());
            } else {
                for (EntityRelation relation : relations) {
                    log.trace("[{}][{}][{}] Processing rule node relation [{}]", tenantId, entityId, ruleNode.getId(), relation.getTo());
                    // Check that a relation pointing at a rule node refers to a node that actually exists in this chain
                    if (relation.getTo().getEntityType() == EntityType.RULE_NODE) {
                        RuleNodeCtx ruleNodeCtx = nodeActors.get(new RuleNodeId(relation.getTo().getId()));
                        if (ruleNodeCtx == null) {
                            throw new IllegalArgumentException("Rule Node [" + relation.getFrom() + "] has invalid relation to Rule node [" + relation.getTo() + "]");
                        }
                    }
                    nodeRoutes.computeIfAbsent(ruleNode.getId(), k -> new ArrayList<>())
                            .add(new RuleNodeRelation(ruleNode.getId(), relation.getTo(), relation.getType()));
                }
            }
        }

        firstId = ruleChain.getFirstRuleNodeId();
        firstNode = nodeActors.get(firstId);
        state = ComponentLifecycleState.ACTIVE;
    }
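The result of initRoutes is essentially a lookup table from a node to its outgoing relations. A stripped-down sketch of building such a table might look like the following; the Relation class, the string ids, and the method names are hypothetical, and unlike the real code this version only knows rule-node targets (TB also allows a relation to point at another rule chain).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a routing table; not ThingsBoard's own classes.
class RouteTableSketch {
    static final class Relation {
        final String from;
        final String to;
        final String type;

        Relation(String from, String to, String type) {
            this.from = from;
            this.to = to;
            this.type = type;
        }
    }

    // Maps every node id to its outgoing relations; nodes without relations get an empty list.
    static Map<String, List<Relation>> buildRoutes(Set<String> nodeIds, List<Relation> relations) {
        Map<String, List<Relation>> routes = new HashMap<>();
        for (String id : nodeIds) {
            routes.put(id, new ArrayList<>());
        }
        for (Relation r : relations) {
            // Reject relations whose endpoints are not nodes of this chain.
            if (!nodeIds.contains(r.from) || !nodeIds.contains(r.to)) {
                throw new IllegalArgumentException("Invalid relation " + r.from + " -> " + r.to);
            }
            routes.get(r.from).add(r);
        }
        return routes;
    }

    public static void main(String[] args) {
        Set<String> nodes = Set.of("filter", "save", "log");
        List<Relation> relations = List.of(
                new Relation("filter", "save", "True"),
                new Relation("filter", "log", "False"));
        Map<String, List<Relation>> routes = buildRoutes(nodes, relations);
        System.out.println(routes.get("filter").size());
        System.out.println(routes.get("save").isEmpty());
    }
}
```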

10. When a RuleNodeActor is initialized, it creates its RuleNodeActorMessageProcessor. That processor in turn uses reflection to obtain the TbNode implementation class for the RuleNode, and messages are ultimately handled by this implementation.

    private TbNode initComponent(RuleNode ruleNode) throws Exception {
        TbNode tbNode = null;
        if (ruleNode != null) {
            Class<?> componentClazz = Class.forName(ruleNode.getType());
            tbNode = (TbNode) (componentClazz.newInstance());
            tbNode.init(defaultCtx, new TbNodeConfiguration(ruleNode.getConfiguration()));
        }
        return tbNode;
    }
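For readers less familiar with reflection, here is the same "class name in, plugin instance out" idea as a self-contained sketch. NodeSketch, UpperCaseNode, and ReflectiveNodeLoader are hypothetical stand-ins for TbNode and its implementations. One deliberate difference: Class.newInstance() has been deprecated since Java 9, so the sketch goes through getDeclaredConstructor() instead.

```java
// Hypothetical plugin interface standing in for TbNode.
interface NodeSketch {
    String onMsg(String msg);
}

// A trivial implementation that the loader will find by class name.
class UpperCaseNode implements NodeSketch {
    @Override
    public String onMsg(String msg) {
        return msg.toUpperCase();
    }
}

class ReflectiveNodeLoader {
    // Instantiates the class with the given name through its no-argument constructor.
    static NodeSketch load(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            // Class.newInstance() is deprecated since Java 9; use the constructor object instead.
            Object instance = clazz.getDeclaredConstructor().newInstance();
            return (NodeSketch) instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot create node of type " + className, e);
        }
    }

    public static void main(String[] args) {
        // In TB the class name comes from the stored rule node type; here it is taken from the class itself.
        NodeSketch node = load(UpperCaseNode.class.getName());
        System.out.println(node.onMsg("hi"));
    }
}
```

Storing only the class name in the database is what allows new node types to be added without touching the actor code.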

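At this point the Actor system is fully initialized. Putting it all together gives the following diagram, with the flow chart on the left and the structure diagram on the right.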


tb-actor.png

Actor Message Processing in Detail

Device status reporting is used as the example here.

1. The org.thingsboard.server.service.queue.DefaultTbRuleEngineConsumerService.launchConsumer() method receives messages from Transport and hands them to the Actor system context, ActorSystemContext.

    private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
        TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback);
        QueueToRuleEngineMsg msg;
        ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();
        Set<String> relationTypes = null;
        if (relationTypesList != null) {
            if (relationTypesList.size() == 1) {
                relationTypes = Collections.singleton(relationTypesList.get(0));
            } else {
                relationTypes = new HashSet<>(relationTypesList);
            }
        }
        msg = new QueueToRuleEngineMsg(tenantId, tbMsg, relationTypes, toRuleEngineMsg.getFailureMessage());
        actorContext.tell(msg);
    }

2. ActorSystemContext sends the message to the mailbox of the root Actor (that is, AppActor).

    public void tell(TbActorMsg tbActorMsg) {
        appActor.tell(tbActorMsg);
    }

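3. AppActor receives the message from its mailbox and handles it in its process method. Based on the message type, it finds the TenantActor that the device belongs to (looked up by the tenant id carried in the message) and delivers the message to that actor's mailbox for the next step.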

    private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
        if (SYSTEM_TENANT.equals(msg.getTenantId())) {
            msg.getTbMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
        } else {
            if (!deletedTenants.contains(msg.getTenantId())) {
                getOrCreateTenantActor(msg.getTenantId()).tell(msg);
            } else {
                msg.getTbMsg().getCallback().onSuccess();
            }
        }
    }

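4. TenantActor takes the message from its mailbox and handles it in its process method. If the message does not name a rule chain, TenantActor looks up its root chain actor and delivers the message to the TbActorMailbox of that Rule Chain Actor; otherwise the message goes to the rule chain named in the message.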

    private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
        if (!isRuleEngineForCurrentTenant) {
            log.warn("RECEIVED INVALID MESSAGE: {}", msg);
            return;
        }
        TbMsg tbMsg = msg.getTbMsg();
        if (tbMsg.getRuleChainId() == null) {
            if (getRootChainActor() != null) {
                getRootChainActor().tell(msg);
            } else {
                tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!"));
                log.info("[{}] No Root Chain: {}", tenantId, msg);
            }
        } else {
            try {
                ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
            } catch (TbActorNotRegisteredException ex) {
                log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId());
                //TODO: 3.1 Log it to dead letters queue;
                tbMsg.getCallback().onSuccess();
            }
        }
    }

5. RuleChainActor takes the message from its mailbox and, in its process method, passes it to its RuleChainActorMessageProcessor.

    @Override
    protected boolean doProcess(TbActorMsg msg) {
        switch (msg.getMsgType()) {
            case COMPONENT_LIFE_CYCLE_MSG:
                onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
                break;
            case QUEUE_TO_RULE_ENGINE_MSG:
                processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
.....

6. On receiving the message, RuleChainActorMessageProcessor first checks whether a target node is specified. If not, it passes the message to the first RuleNodeActor of this RuleChain.

    void onQueueToRuleEngineMsg(QueueToRuleEngineMsg envelope) {
        TbMsg msg = envelope.getTbMsg();
        log.trace("[{}][{}] Processing message [{}]: {}", entityId, firstId, msg.getId(), msg);
        if (envelope.getRelationTypes() == null || envelope.getRelationTypes().isEmpty()) {
            try {
                checkActive(envelope.getTbMsg());
                // Read the target rule node id carried by the message, if any
                RuleNodeId targetId = msg.getRuleNodeId();
                RuleNodeCtx targetCtx;
                // Normally no target is specified, so this is null and the chain's first node is used
                if (targetId == null) {
                    targetCtx = firstNode;
                    msg = msg.copyWithRuleChainId(entityId);
                } else {
                    targetCtx = nodeActors.get(targetId);
                }
                if (targetCtx != null) {
                    log.trace("[{}][{}] Pushing message to target rule node", entityId, targetId);
                    pushMsgToNode(targetCtx, msg, "");
                } else {
                    // If the rule node does not exist, report success right away
                    log.trace("[{}][{}] Rule node does not exist. Probably old message", entityId, targetId);
                    msg.getCallback().onSuccess();
                }
            } catch (RuleNodeException rne) {
                envelope.getTbMsg().getCallback().onFailure(rne);
            } catch (Exception e) {
                envelope.getTbMsg().getCallback().onFailure(new RuleEngineException(e.getMessage()));
            }
        } else {
            onTellNext(envelope.getTbMsg(), envelope.getTbMsg().getRuleNodeId(), envelope.getRelationTypes(), envelope.getFailureMessage());
        }
    }

7. RuleNodeActor receives the message from its mailbox and, in its process method, forwards it to RuleNodeActorMessageProcessor for further handling.

8. RuleNodeActorMessageProcessor passes the message to the TbNode implementation, which does the final processing.

    void onRuleChainToRuleNodeMsg(RuleChainToRuleNodeMsg msg) throws Exception {
        checkActive(msg.getMsg());
        if (ruleNode.isDebugMode()) {
            systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), msg.getFromRelationType());
        }
        try {
            tbNode.onMsg(msg.getCtx(), msg.getMsg());
        } catch (Exception e) {
            msg.getCtx().tellFailure(msg.getMsg(), e);
        }
    }

9. When the TbNode implementation is done, it calls DefaultTbContext to report the result. A new message is then assembled from that result and sent back to RuleChainActor.

    private void tellNext(TbMsg msg, Set<String> relationTypes, Throwable th) {
        if (nodeCtx.getSelf().isDebugMode()) {
            relationTypes.forEach(relationType -> mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), msg, relationType, th));
        }
        nodeCtx.getChainActor().tell(new RuleNodeToRuleChainTellNextMsg(nodeCtx.getSelf().getId(), relationTypes, msg, th != null ? th.getMessage() : null));
    }

10. RuleChainActor receives the message. Because the message type has changed, it now calls the onTellNext method of its RuleChainActorMessageProcessor for the next step.

    @Override
    protected boolean doProcess(TbActorMsg msg) {
        switch (msg.getMsgType()) {
            case COMPONENT_LIFE_CYCLE_MSG:
                onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
                break;
            case QUEUE_TO_RULE_ENGINE_MSG:
                processor.onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
                break;
            // Message sent from a rule node on its way to the next rule node in the rule chain
            // The rule node reports its result through DefaultTbContext; the processor then looks up the next node and delivers the message
            case RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
                processor.onTellNext((RuleNodeToRuleChainTellNextMsg) msg);
                break;
            case RULE_CHAIN_TO_RULE_CHAIN_MSG:
                processor.onRuleChainToRuleChainMsg((RuleChainToRuleChainMsg) msg);
                break;
            case PARTITION_CHANGE_MSG:
                processor.onPartitionChangeMsg((PartitionChangeMsg) msg);
                break;
            case STATS_PERSIST_TICK_MSG:
                onStatsPersistTick(id);
                break;
            default:
                return false;
        }
        return true;
    }

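11. RuleChainActorMessageProcessor uses the relations between rule nodes to find the next RuleNodeActor to handle the message. If none is found, processing of this message is finished. If exactly one is found, the message is handed to pushToTarget(); if there are several, a copy is put on the queue for each target.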

    private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
        try {
            checkActive(msg);
            EntityId entityId = msg.getOriginator();
            TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
            // Filter the outgoing relations by relationTypes to find the next target(s)
            List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
                    .filter(r -> contains(relationTypes, r.getType()))
                    .collect(Collectors.toList());
            int relationsCount = relations.size();
            if (relationsCount == 0) {
                log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
                if (relationTypes.contains(TbRelationTypes.FAILURE)) {
                    RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
                    if (ruleNodeCtx != null) {
                        msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
                    } else {
                        log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
                        msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
                    }
                } else {
                    msg.getCallback().onSuccess();
                }
            } else if (relationsCount == 1) {
                for (RuleNodeRelation relation : relations) {
                    log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
                    pushToTarget(tpi, msg, relation.getOut(), relation.getType());
                }
            } else {
                MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
                log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relations);
                for (RuleNodeRelation relation : relations) {
                    EntityId target = relation.getOut();
                    putToQueue(tpi, msg, callbackWrapper, target);
                }
            }
        } catch (RuleNodeException rne) {
            msg.getCallback().onFailure(rne);
        } catch (Exception e) {
            msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
        }
    }
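The multi-target branch wraps the original callback so that it is acknowledged only once, after every branch has finished. The source of MultipleTbQueueTbMsgCallbackWrapper is not shown in this post, so the following is only a sketch of how such a fan-out wrapper can be written with an AtomicInteger; the interface and class names are hypothetical and the behavior (first failure is reported straight away) is my assumption.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical callback interface standing in for TbMsgCallback.
interface MsgCallbackSketch {
    void onSuccess();

    void onFailure(Throwable t);
}

// Reports success to the original callback only after all branches have succeeded.
class FanOutCallbackSketch implements MsgCallbackSketch {
    private final AtomicInteger pending;
    private final MsgCallbackSketch original;

    FanOutCallbackSketch(int branches, MsgCallbackSketch original) {
        this.pending = new AtomicInteger(branches);
        this.original = original;
    }

    @Override
    public void onSuccess() {
        // decrementAndGet is atomic, so exactly one caller observes zero.
        if (pending.decrementAndGet() == 0) {
            original.onSuccess();
        }
    }

    @Override
    public void onFailure(Throwable t) {
        // Assumption: a failure on any branch is passed through immediately.
        original.onFailure(t);
    }
}

// Simple callback that counts how often it was invoked.
class CountingCallback implements MsgCallbackSketch {
    int successes = 0;
    int failures = 0;

    @Override
    public void onSuccess() {
        successes++;
    }

    @Override
    public void onFailure(Throwable t) {
        failures++;
    }
}

class FanOutDemo {
    public static void main(String[] args) {
        CountingCallback original = new CountingCallback();
        FanOutCallbackSketch wrapper = new FanOutCallbackSketch(3, original);
        wrapper.onSuccess();
        wrapper.onSuccess();
        System.out.println(original.successes); // still waiting for the third branch
        wrapper.onSuccess();
        System.out.println(original.successes);
    }
}
```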

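12. If the type of the next target is RULE_NODE, the message is sent to that node. If it is RULE_CHAIN, the message is re-wrapped and handed to the TenantActor that is the parent of this RuleChainActor. Both cases apply only when the partition belongs to the current service instance; otherwise the message is put back on the queue.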

    private void pushToTarget(TopicPartitionInfo tpi, TbMsg msg, EntityId target, String fromRelationType) {
        if (tpi.isMyPartition()) {
            switch (target.getEntityType()) {
                case RULE_NODE:
                    pushMsgToNode(nodeActors.get(new RuleNodeId(target.getId())), msg, fromRelationType);
                    break;
                case RULE_CHAIN:
                    parent.tell(new RuleChainToRuleChainMsg(new RuleChainId(target.getId()), entityId, msg, fromRelationType));
                    break;
            }
        } else {
            putToQueue(tpi, msg, new TbQueueTbMsgCallbackWrapper(msg.getCallback()), target);
        }
    }

13. When TenantActor receives the message sent by the RuleChainActor, it looks up the next RuleChainActor to handle it.

    private void onRuleChainMsg(RuleChainAwareMsg msg) {
        getOrCreateActor(msg.getRuleChainId()).tell(msg);
    }

That is a rough walk-through of the whole message processing flow. The repeated hops of a message between TenantActor, RuleChainActor and RuleNodeActor are not covered in more detail here.
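As a recap of the loop between the chain and its nodes, here is a small synchronous model: each node returns a relation type, and the chain uses its route table to pick the next node until there is no outbound relation left. It is a deliberately simplified sketch under my own assumptions. The class and method names are hypothetical, there are no mailboxes or queues, and each relation type leads to at most one target, whereas the real rule chain supports several.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, synchronous stand-in for the RuleChainActor / RuleNodeActor message loop.
class MiniRuleChainSketch {
    // node id -> processing function that returns a relation type such as "True" or "Success"
    private final Map<String, Function<String, String>> nodes = new HashMap<>();
    // node id -> (relation type -> next node id)
    private final Map<String, Map<String, String>> routes = new HashMap<>();
    private final String firstNodeId;

    MiniRuleChainSketch(String firstNodeId) {
        this.firstNodeId = firstNodeId;
    }

    MiniRuleChainSketch node(String id, Function<String, String> fn) {
        nodes.put(id, fn);
        routes.putIfAbsent(id, new HashMap<>());
        return this;
    }

    MiniRuleChainSketch route(String from, String relationType, String to) {
        routes.computeIfAbsent(from, k -> new HashMap<>()).put(relationType, to);
        return this;
    }

    // Returns the ids of the nodes that handled the message, in order.
    // Note: a cyclic route table would loop forever in this simplified model.
    List<String> process(String msg) {
        List<String> visited = new ArrayList<>();
        String current = firstNodeId;
        while (current != null && nodes.containsKey(current)) {
            visited.add(current);
            String relationType = nodes.get(current).apply(msg); // the node "tells next"
            current = routes.get(current).get(relationType);      // null: no outbound relation, done
        }
        return visited;
    }

    static MiniRuleChainSketch demo() {
        return new MiniRuleChainSketch("filter")
                .node("filter", m -> m.startsWith("temperature") ? "True" : "False")
                .node("save", m -> "Success")
                .node("log", m -> "Success")
                .route("filter", "True", "save")
                .route("filter", "False", "log");
    }

    public static void main(String[] args) {
        System.out.println(demo().process("temperature=21"));
        System.out.println(demo().process("humidity=40"));
    }
}
```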

When I have time, I will add a flow chart and go through it all once more.
