首页 > 其他分享 >物联网平台 Thingsboard rule engine 规则引擎 - 设备消息处理过程

物联网平台 Thingsboard rule engine 规则引擎 - 设备消息处理过程

时间:2024-06-14 09:58:10浏览次数:27  
标签:engine case DEVICE rule break MSG ACTOR msg Thingsboard

设备消息上报到规则引擎过程

第一步: 消息入口 org.thingsboard.server.actors.app.AppActor#doProcess 中找到 case QUEUE_TO_RULE_ENGINE_MSG: onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); break;

    @Override
    protected boolean doProcess(TbActorMsg msg) {
        if (!ruleChainsInitialized) {
            if (MsgType.APP_INIT_MSG.equals(msg.getMsgType())) {
                initTenantActors();
                ruleChainsInitialized = true;
            } else {
                if (!msg.getMsgType().isIgnoreOnStart()) {
                    log.warn("Attempt to initialize Rule Chains by unexpected message: {}", msg);
                }
                return true;
            }
        }
        switch (msg.getMsgType()) {
            case APP_INIT_MSG:
                break;
            case PARTITION_CHANGE_MSG:
                ctx.broadcastToChildren(msg);
                break;
            case COMPONENT_LIFE_CYCLE_MSG:
                onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
                break;
            case QUEUE_TO_RULE_ENGINE_MSG:
                onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
                break;
            case TRANSPORT_TO_DEVICE_ACTOR_MSG:
                onToDeviceActorMsg((TenantAwareMsg) msg, false);
                break;
            case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
            case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
            case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
            case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
            case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
            case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
            case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
            case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
                onToDeviceActorMsg((TenantAwareMsg) msg, true);
                break;
            case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
            case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
            case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
                onToEdgeSessionMsg((EdgeSessionMsg) msg);
                break;
            case SESSION_TIMEOUT_MSG:
                ctx.broadcastToChildrenByType(msg, EntityType.TENANT);
                break;
            default:
                return false;
        }
        return true;
    }
    private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
        if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {
            msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
        } else {
            getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(actor -> {
                actor.tell(msg);
            }, () -> msg.getMsg().getCallback().onSuccess());
        }
    }

第二步: org.thingsboard.server.actors.TbActorMailbox#enqueue 方法调用了 tryProcessQueue(true); 再调用了 dispatcher.getExecutor().execute(this::processMailbox); 再调用了actor.process(msg);

private void enqueue(TbActorMsg msg, boolean highPriority) {
        if (!destroyInProgress.get()) {
            if (highPriority) {
                highPriorityMsgs.add(msg);
            } else {
                normalPriorityMsgs.add(msg);
            }
            tryProcessQueue(true);
        } else {
            if (highPriority && msg.getMsgType().equals(MsgType.RULE_NODE_UPDATED_MSG)) {
                synchronized (this) {
                    if (stopReason == TbActorStopReason.INIT_FAILED) {
                        destroyInProgress.set(false);
                        stopReason = null;
                        initActor();
                    } else {
                        msg.onTbActorStopped(stopReason);
                    }
                }
            } else {
                msg.onTbActorStopped(stopReason);
            }
        }
    }
  private void tryProcessQueue(boolean newMsg) {
        if (ready.get() == READY) {
            if (newMsg || !highPriorityMsgs.isEmpty() || !normalPriorityMsgs.isEmpty()) {
                if (busy.compareAndSet(FREE, BUSY)) {
                    dispatcher.getExecutor().execute(this::processMailbox);
                } else {
                    log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg);
                }
            } else {
                log.trace("[{}] MessageBox is empty, new msg: {}", selfId, newMsg);
            }
        } else {
            log.trace("[{}] MessageBox is not ready, new msg: {}", selfId, newMsg);
        }
    }
    private void processMailbox() {
        boolean noMoreElements = false;
        for (int i = 0; i < settings.getActorThroughput(); i++) {
            TbActorMsg msg = highPriorityMsgs.poll();
            if (msg == null) {
                msg = normalPriorityMsgs.poll();
            }
            if (msg != null) {
                try {
                    log.debug("[{}] Going to process message: {}", selfId, msg);
                    actor.process(msg);
                } catch (TbRuleNodeUpdateException updateException) {
                    stopReason = TbActorStopReason.INIT_FAILED;
                    destroy(updateException.getCause());
                } catch (Throwable t) {
                    log.debug("[{}] Failed to process message: {}", selfId, msg, t);
                    ProcessFailureStrategy strategy = actor.onProcessFailure(t);
                    if (strategy.isStop()) {
                        system.stop(selfId);
                    }
                }
            } else {
                noMoreElements = true;
                break;
            }
        }
        if (noMoreElements) {
            busy.set(FREE);
            dispatcher.getExecutor().execute(() -> tryProcessQueue(false));
        } else {
            dispatcher.getExecutor().execute(this::processMailbox);
        }
    }

第三步:调用了 org.thingsboard.server.actors.service.ContextAwareActor#doProcess 跟踪实现找到了调用了 TenantActordoProcess
在这里插入图片描述
第四步:查看org.thingsboard.server.actors.tenant.TenantActor#doProcess 方法,方法根据消息类型进行消息分发 case QUEUE_TO_RULE_ENGINE_MSG: onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); break;

    @Override
    protected boolean doProcess(TbActorMsg msg) {
        if (cantFindTenant) {
            log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg);
            if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) {
                QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg;
                queueMsg.getMsg().getCallback().onSuccess();
            } else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)) {
                TransportToDeviceActorMsgWrapper transportMsg = (TransportToDeviceActorMsgWrapper) msg;
                transportMsg.getCallback().onSuccess();
            }
            return true;
        }
        switch (msg.getMsgType()) {
            case PARTITION_CHANGE_MSG:
                PartitionChangeMsg partitionChangeMsg = (PartitionChangeMsg) msg;
                ServiceType serviceType = partitionChangeMsg.getServiceType();
                if (ServiceType.TB_RULE_ENGINE.equals(serviceType)) {
                    //To Rule Chain Actors
                    broadcast(msg);
                } else if (ServiceType.TB_CORE.equals(serviceType)) {
                    List<TbActorId> deviceActorIds = ctx.filterChildren(new TbEntityTypeActorIdPredicate(EntityType.DEVICE) {
                        @Override
                        protected boolean testEntityId(EntityId entityId) {
                            return super.testEntityId(entityId) && !isMyPartition(entityId);
                        }
                    });
                    deviceActorIds.forEach(id -> ctx.stop(id));
                }
                break;
            case COMPONENT_LIFE_CYCLE_MSG:
                onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
                break;
            case QUEUE_TO_RULE_ENGINE_MSG:
                onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
                break;
            case TRANSPORT_TO_DEVICE_ACTOR_MSG:
                onToDeviceActorMsg((DeviceAwareMsg) msg, false);
                break;
            case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
            case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
            case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
            case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
            case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
            case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
            case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
            case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
                onToDeviceActorMsg((DeviceAwareMsg) msg, true);
                break;
            case SESSION_TIMEOUT_MSG:
                ctx.broadcastToChildrenByType(msg, EntityType.DEVICE);
                break;
            case RULE_CHAIN_INPUT_MSG:
            case RULE_CHAIN_OUTPUT_MSG:
            case RULE_CHAIN_TO_RULE_CHAIN_MSG:
                onRuleChainMsg((RuleChainAwareMsg) msg);
                break;
            case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
            case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
            case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
                onToEdgeSessionMsg((EdgeSessionMsg) msg);
                break;
            default:
                return false;
        }
        return true;
    }

总结

消息到这里就投递给了RuleChainActor进行处理,就进入到规则链了进行节点的分发处理。

标签:engine,case,DEVICE,rule,break,MSG,ACTOR,msg,Thingsboard
From: https://blog.csdn.net/gzcsschen/article/details/139671859

相关文章

  • kubernetes-ingress-nginx-rule的配置-将外部流量路由到集群内部的规则配置
    KubernetesIngress资源配置文件主要定义了如何通过NGINXIngress控制器来处理进入集群的HTTP/HTTPS流量apiVersion:networking.k8s.io/v1#表示这是一个Ingress资源,并使用了Kubernetes网络APIv1版本kind:Ingress#kind:定义了资源的类型。在这里是Ingr......
  • 【异常】使用Dbeaver链接TDengine提示SQL错误[9684]:ERROR (2318): Connection reset
    一、异常内容使用Dbeaver链接TDengine提示SQL错误[9684]:ERROR(2318):Connectionreset,报错截图如下二、报错说明“ERROR(2318):Connectionreset”表示客户端与服务器之间的连接被意外地重置。这通常发生在一个应用程序试图读取或写入数据,但是连接的另一端已经关......
  • How to be a senior Solidity Engineer
    一、能够熟练完成常见功能开发Solidity+Hardhat+Ethers熟悉Solidity语法和数据结构,能使用hardhat完成智能合约的自动化测试、优化、部署、交互和SDK封装。Openzipplin+Upgradeable熟悉Openzipplin的所有库文件,能够完成可升级合约的部署和升级。SmartcontractA......
  • ThingsBoard前端Vue版本开源啦!!!!
    ThingsVue......
  • Tdengine的时序数据库简介、单机部署、操作语句及java应用
    Tdengine的时序数据库简介、单机部署、操作语句及java应用   本文介绍了Tdengine的功能特点、应用场景、超级表和子表等概念,讲述了Tdengine2.6.0.34的单机部署,并介绍了taos数据库的常见使用方法及特色窗口查询方法,最后介绍了在java中的应用。一、tdengine简要介绍及应......
  • pcb-ruler
    白嫖PCB尺子--简易教程成果展示5把尺子所需材料:一台电脑(手机也行)可用的网络手STEPs进捷配官网,注册一个账号。之后进入在线计价的pcb计价页面,填写好我打框的参数。其他都直接默认。开始立即计价,选择免费的立即下单,加入购物车。在购物车结算中选择......
  • 将来自 Telegraf 的 JSON 数据扁平化,以便在 ThingsBoard 中使用
    我连接了ThingsBoard和Telegraf以可视化CPU使用率,但收到的数据是嵌套JSON格式。我尝试了不同的方法,但无法以扁平化的JSON格式获取数据。使用Telegraf1.30.0版本,数据以以下格式返回:[{"fields":{"usage_guest":0、"usage_guest_nice":0、......
  • css44 CSS The !important Rule
    https://www.w3schools.com/css/css_important.asp Whatis!important?The!importantruleinCSSisusedtoaddmoreimportancetoaproperty/valuethannormal.Infact,ifyouusethe!importantrule,itwilloverrideALLpreviousstylingrulesforthat......
  • java.lang.UnsatisfiedLinkError: no taos in java.library.path, TDengine 访问数
     TDengine linux部署连接驱动问题: java.lang.UnsatisfiedLinkError:notaosinjava.library.path解决方案有有两种:方法一:使用原生的连接需要安装客户端,docker应用的话需要安装tdengine客户端到相应应用容器里面:windows端的需要安装tdengine客户端注意使用driver驱动......
  • TDengine docker安装方法
    dockerrun-d--privileged=true\--restart=always--name=tdengine\-v/opt/taos/data:/var/lib/taos\-v/opt/taos/log:/var/log/taos\-v/usr/local/taos/driver/libtaos.so.3.2.0.0:/usr/lib/libtaos.so\-v/usr/share/zoneinfo:/usr/share/zoneinfo\-eTZ=A......