设备消息上报到规则引擎过程
第一步: 消息入口
在 org.thingsboard.server.actors.app.AppActor#doProcess
中找到分支 case QUEUE_TO_RULE_ENGINE_MSG: onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); break; 该分支调用 onQueueToRuleEngineMsg,把消息转交给对应租户的 Actor。
/**
 * Routes an incoming actor message to the handler that matches its message type.
 *
 * <p>While {@code ruleChainsInitialized} is still {@code false}, only {@code APP_INIT_MSG} is acted
 * upon: it triggers {@code initTenantActors()} and flips the flag. Any other message is consumed
 * without being dispatched, and a warning is logged unless its type reports
 * {@code isIgnoreOnStart()}.
 *
 * @param msg the message to dispatch
 * @return {@code true} if the message was consumed (including messages dropped before
 *         initialization), {@code false} if its type matches none of the cases handled here
 */
@Override
protected boolean doProcess(TbActorMsg msg) {
    if (!ruleChainsInitialized) {
        if (!MsgType.APP_INIT_MSG.equals(msg.getMsgType())) {
            // Not initialized yet and this is not the init message: drop it.
            if (!msg.getMsgType().isIgnoreOnStart()) {
                log.warn("Attempt to initialize Rule Chains by unexpected message: {}", msg);
            }
            return true;
        }
        initTenantActors();
        ruleChainsInitialized = true;
    }

    boolean handled = true;
    switch (msg.getMsgType()) {
        case APP_INIT_MSG:
            // Initialization already happened in the guard above; nothing more to do.
            break;
        case PARTITION_CHANGE_MSG:
            ctx.broadcastToChildren(msg);
            break;
        case COMPONENT_LIFE_CYCLE_MSG:
            onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
            break;
        case QUEUE_TO_RULE_ENGINE_MSG:
            onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
            break;
        case TRANSPORT_TO_DEVICE_ACTOR_MSG:
            onToDeviceActorMsg((TenantAwareMsg) msg, false);
            break;
        case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
        case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
        case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
        case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
            // Same handler as the transport case, but with the second argument set to true.
            onToDeviceActorMsg((TenantAwareMsg) msg, true);
            break;
        case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
        case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
        case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
            onToEdgeSessionMsg((EdgeSessionMsg) msg);
            break;
        case SESSION_TIMEOUT_MSG:
            ctx.broadcastToChildrenByType(msg, EntityType.TENANT);
            break;
        default:
            handled = false;
            break;
    }
    return handled;
}
/**
 * Forwards a queue-originated rule engine message to the actor of the tenant it belongs to.
 *
 * <p>A message carrying {@code TenantId.SYS_TENANT_ID} is rejected by failing its callback with a
 * {@link RuleEngineException}. Otherwise the tenant actor is looked up (or created) and the message
 * is passed to it; when no actor is available the callback is completed with {@code onSuccess()}
 * and the message is not forwarded.
 *
 * @param msg the rule engine message taken from the queue
 */
private void onQueueToRuleEngineMsg(QueueToRuleEngineMsg msg) {
    if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {
        msg.getMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!"));
        return;
    }
    getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(
            tenantActor -> tenantActor.tell(msg),
            () -> msg.getMsg().getCallback().onSuccess());
}
第二步: 消息进入 Actor 邮箱
org.thingsboard.server.actors.TbActorMailbox#enqueue
方法把消息放入队列后调用了 tryProcessQueue(true);
tryProcessQueue 再调用 dispatcher.getExecutor().execute(this::processMailbox) 提交邮箱处理任务;
processMailbox 取出消息后再调用 actor.process(msg);
/**
 * Adds a message to this mailbox, or rejects it when the mailbox is being destroyed.
 *
 * <p>While {@code destroyInProgress} is {@code false} the message is appended to the high- or
 * normal-priority queue and {@code tryProcessQueue(true)} is called. Once destruction is in
 * progress the message is handed back through {@code onTbActorStopped(stopReason)}, with one
 * exception: a high-priority {@code RULE_NODE_UPDATED_MSG} arriving while {@code stopReason} is
 * {@code INIT_FAILED} clears the destroy flag and the stop reason and calls {@code initActor()}
 * again.
 *
 * @param msg          the message to enqueue
 * @param highPriority whether the message goes to the high-priority queue
 */
private void enqueue(TbActorMsg msg, boolean highPriority) {
    if (!destroyInProgress.get()) {
        if (highPriority) {
            highPriorityMsgs.add(msg);
        } else {
            normalPriorityMsgs.add(msg);
        }
        tryProcessQueue(true);
        return;
    }

    boolean ruleNodeUpdate = highPriority && msg.getMsgType().equals(MsgType.RULE_NODE_UPDATED_MSG);
    if (!ruleNodeUpdate) {
        msg.onTbActorStopped(stopReason);
        return;
    }

    synchronized (this) {
        if (stopReason != TbActorStopReason.INIT_FAILED) {
            msg.onTbActorStopped(stopReason);
            return;
        }
        // The actor stopped because initialization failed: reset the state and initialize again.
        destroyInProgress.set(false);
        stopReason = null;
        initActor();
    }
}
/**
 * Schedules {@code processMailbox} on the dispatcher's executor if there is work and nobody else
 * is already processing this mailbox.
 *
 * <p>Nothing is scheduled when the mailbox is not {@code READY}, when there is neither a new
 * message nor a queued one, or when the {@code busy} flag could not be switched from {@code FREE}
 * to {@code BUSY}; each of those outcomes is only logged at trace level.
 *
 * @param newMsg {@code true} when called right after a message was enqueued
 */
private void tryProcessQueue(boolean newMsg) {
    if (ready.get() != READY) {
        log.trace("[{}] MessageBox is not ready, new msg: {}", selfId, newMsg);
        return;
    }
    if (!newMsg && highPriorityMsgs.isEmpty() && normalPriorityMsgs.isEmpty()) {
        log.trace("[{}] MessageBox is empty, new msg: {}", selfId, newMsg);
        return;
    }
    // Only the caller that wins the FREE -> BUSY switch submits the processing task.
    if (!busy.compareAndSet(FREE, BUSY)) {
        log.trace("[{}] MessageBox is busy, new msg: {}", selfId, newMsg);
        return;
    }
    dispatcher.getExecutor().execute(this::processMailbox);
}
/**
 * Processes up to {@code settings.getActorThroughput()} queued messages, taking high-priority
 * messages before normal-priority ones, and then reschedules itself as needed.
 *
 * <p>A {@code TbRuleNodeUpdateException} thrown by the actor marks the stop reason as
 * {@code INIT_FAILED} and destroys the actor with the exception's cause. Any other
 * {@code Throwable} is logged at debug level and passed to {@code actor.onProcessFailure}; the
 * actor is stopped when the returned strategy says so.
 *
 * <p>If both queues were found empty, the {@code busy} flag is released and
 * {@code tryProcessQueue(false)} is scheduled. Otherwise the throughput limit was reached, so
 * another {@code processMailbox} run is scheduled while {@code busy} stays set.
 */
private void processMailbox() {
    boolean mailboxDrained = false;
    for (int processed = 0; processed < settings.getActorThroughput(); processed++) {
        TbActorMsg next = highPriorityMsgs.poll();
        if (next == null) {
            next = normalPriorityMsgs.poll();
        }
        if (next == null) {
            mailboxDrained = true;
            break;
        }
        try {
            log.debug("[{}] Going to process message: {}", selfId, next);
            actor.process(next);
        } catch (TbRuleNodeUpdateException updateException) {
            stopReason = TbActorStopReason.INIT_FAILED;
            destroy(updateException.getCause());
        } catch (Throwable t) {
            log.debug("[{}] Failed to process message: {}", selfId, next, t);
            ProcessFailureStrategy failureStrategy = actor.onProcessFailure(t);
            if (failureStrategy.isStop()) {
                system.stop(selfId);
            }
        }
    }

    if (!mailboxDrained) {
        // Throughput limit hit with messages possibly left: continue in a fresh task.
        dispatcher.getExecutor().execute(this::processMailbox);
        return;
    }
    busy.set(FREE);
    // NOTE(review): presumably re-checks for messages enqueued while busy was still set - confirm.
    dispatcher.getExecutor().execute(() -> tryProcessQueue(false));
}
第三步: actor.process(msg) 调用了
org.thingsboard.server.actors.service.ContextAwareActor#doProcess
跟踪其实现,可以找到这里实际调用的是 TenantActor
的 doProcess
第四步: 查看 org.thingsboard.server.actors.tenant.TenantActor#doProcess
方法。该方法根据消息类型进行消息分发,其中处理规则引擎消息的分支是 case QUEUE_TO_RULE_ENGINE_MSG: onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg); break;
/**
 * Dispatches a message addressed to this tenant to the matching handler.
 *
 * <p>When {@code cantFindTenant} is set, no message is dispatched: queue-to-rule-engine and
 * transport-to-device messages have their callbacks completed with {@code onSuccess()}, all other
 * messages are simply consumed.
 *
 * @param msg the message to dispatch
 * @return {@code true} if the message was consumed, {@code false} if its type matches none of the
 *         cases handled here
 */
@Override
protected boolean doProcess(TbActorMsg msg) {
    if (cantFindTenant) {
        log.info("[{}] Processing missing Tenant msg: {}", tenantId, msg);
        switch (msg.getMsgType()) {
            case QUEUE_TO_RULE_ENGINE_MSG:
                ((QueueToRuleEngineMsg) msg).getMsg().getCallback().onSuccess();
                break;
            case TRANSPORT_TO_DEVICE_ACTOR_MSG:
                ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess();
                break;
            default:
                // Other message types carry no callback that is completed here.
                break;
        }
        return true;
    }

    switch (msg.getMsgType()) {
        case PARTITION_CHANGE_MSG: {
            ServiceType changedService = ((PartitionChangeMsg) msg).getServiceType();
            if (ServiceType.TB_RULE_ENGINE.equals(changedService)) {
                // To Rule Chain Actors
                broadcast(msg);
            } else if (ServiceType.TB_CORE.equals(changedService)) {
                // Stop the device actors whose entity no longer passes isMyPartition.
                List<TbActorId> foreignDeviceActors = ctx.filterChildren(new TbEntityTypeActorIdPredicate(EntityType.DEVICE) {
                    @Override
                    protected boolean testEntityId(EntityId entityId) {
                        return super.testEntityId(entityId) && !isMyPartition(entityId);
                    }
                });
                for (TbActorId actorId : foreignDeviceActors) {
                    ctx.stop(actorId);
                }
            }
            break;
        }
        case COMPONENT_LIFE_CYCLE_MSG:
            onComponentLifecycleMsg((ComponentLifecycleMsg) msg);
            break;
        case QUEUE_TO_RULE_ENGINE_MSG:
            onQueueToRuleEngineMsg((QueueToRuleEngineMsg) msg);
            break;
        case TRANSPORT_TO_DEVICE_ACTOR_MSG:
            onToDeviceActorMsg((DeviceAwareMsg) msg, false);
            break;
        case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_EDGE_UPDATE_TO_DEVICE_ACTOR_MSG:
        case DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG:
        case DEVICE_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
        case SERVER_RPC_RESPONSE_TO_DEVICE_ACTOR_MSG:
        case REMOVE_RPC_TO_DEVICE_ACTOR_MSG:
            // Same handler as the transport case, but with the second argument set to true.
            onToDeviceActorMsg((DeviceAwareMsg) msg, true);
            break;
        case SESSION_TIMEOUT_MSG:
            ctx.broadcastToChildrenByType(msg, EntityType.DEVICE);
            break;
        case RULE_CHAIN_INPUT_MSG:
        case RULE_CHAIN_OUTPUT_MSG:
        case RULE_CHAIN_TO_RULE_CHAIN_MSG:
            onRuleChainMsg((RuleChainAwareMsg) msg);
            break;
        case EDGE_EVENT_UPDATE_TO_EDGE_SESSION_MSG:
        case EDGE_SYNC_REQUEST_TO_EDGE_SESSION_MSG:
        case EDGE_SYNC_RESPONSE_FROM_EDGE_SESSION_MSG:
            onToEdgeSessionMsg((EdgeSessionMsg) msg);
            break;
        default:
            return false;
    }
    return true;
}
总结
标签: engine, case, DEVICE, rule, break, MSG, ACTOR, msg, Thingsboard
From: https://blog.csdn.net/gzcsschen/article/details/139671859
消息到这里就投递给了 RuleChainActor 进行处理,随后进入规则链,进行节点的分发处理。