阅读源码
首先快速地作了一个每个模块的简单功能介绍,后续会按照程序的顺序对每个模块深入源码学习
核心功能: 数据通信和任务调度
Cyber RT采用了基于Component模块和有向无环图(DAG)的动态加载配置
role_attributes
cyber/proto/role_attributes.proto中定义了两个数据结构:
SocketAddr:推测它应该是告知当前write应该往哪个地址中写
RoleAttributes:
host_name host_ip qos_profile socket_addr
process_id
node_name node_id
channel_name channel_id message_type proto_desc
service_name service_id
角色使用 RoleAttributes 进行标识
SocketAddr ip port
cyber/proto/topology_change.proto
enum ChangeType { CHANGE_NODE CHANGE_CHANNEL CHANGE_SERVICE CHANGE_PARTICIPANT};
enum OperateType { OPT_JOIN OPT_LEAVE};
enum RoleType {
ROLE_NODE
ROLE_WRITER ROLE_READER
ROLE_SERVER ROLE_CLIENT
ROLE_PARTICIPANT
};
message ChangeMsg {
timestamp
change_type ChangeType { CHANGE_NODE CHANGE_CHANNEL CHANGE_SERVICE CHANGE_PARTICIPANT};
role_type RoleType { ROLE_NODE ROLE_WRITER ROLE_READER ROLE_SERVER ROLE_CLIENT ROLE_PARTICIPANT }
operate_type OperateType { OPT_JOIN OPT_LEAVE};
RoleAttributes role_attr node_name node_id
channel_name channel_id message_type proto_desc
service_name service_id
};
NodeManager 和 ServiceManager中节点存储比较简单,使用Role-ID和RoleAttributes映射的map存储
通过 TopologyManager 的 AddChangeListener 来监听整个系统中拓扑的变化
拓扑消息的 Publisher和Subscriber
拓扑发现--channel加入、离开等信息的交换
cyber/service_discovery/specific_manager/channel_manager.cc
ChannelManager()--channel_nm= "channel_change_broadcast"
getMsgType()
GetWriters() 获取 Node的写对象 获取通过CyberRT框架中的 通道发送的数据
GetWritesOfNode 获取channel的写对象
GetWritesOfChannel 获取 发布channel_name的 Reader
GetReaders GetReadersOfNode GetReaderOfChanmel
GetUpstreamOfNode() GetDownstreamOfNode() GetFlowDirection
Reader/Writer基于ChannelManager管理channel信息
在 ReceiverManager::GetReceiver 调用了 transport::Transport::Instance()->CreateReceiver 进行Receiver的创建:
Participant
Participant 类封装fastrtps接口
Manager
cyber/transport/rtps/participant.h
cyber/transport/rtps/publisher.h cyber/transport/rtps/subscriber.h
Manager基于Participant
NodeManager ChannelManager ServiceManager 并有共同的基类Manager
class ChannelManager : public Manager { friend class TopologyManager;
class NodeManager : public Manager {
class ServiceManager : public Manager {
TopologyManager 初始化三个子管理器,。 它们分别为:
- NodeManager用于管理网络拓扑中的节点。
- ChannelManager用于管理channel,即网络拓扑中的边。
- ServiceManager用于管理Service和Client。
Participant 类封装fastrtps接口
成员 participant_ 是 Participant实例
Manager 基于 Participant 管理 topology消息的收发
cyber Fast RTPS的两种方式:
cyber/transport/rtps/participant.h
cyber/transport/rtps/publisher.h
cyber/transport/rtps/subscriber.h
两种方式
Publisher-Subscriber方式:对RTPS的简单抽象,用户只需要定义某个Topic的Publisher和Subscriber、传输方式就可以发布订阅数据。
Writer-Reader方式:更接近RTPS标准中的概念,能直接操作RTPS的读写端点的HistoryCache
核心功能
数据通信和任务调度
Node 是整个数据拓扑网络中的基本单元。
通过在节点中定义 Reader/Writer 或 Service/Client,
transport 多进程通信,为上层封装了底层数据传输的细节
通信功能基础分为 Transmitter、Receiver 和 service_discovery
Transmitter 和 Receiver 是 Writer 和 Reader 的通信实现,
都具有 SHM、RTPS、INTRA、HYBRID 的实现,对应不同的通信方式
过程: 数据分发器 DataDispather 和数据访问器 DataVistor DataNotifier 用于唤醒协程任务
ChannelBuffer 实现缓冲区管理,DataDispatcher进行消息分发,DataVisitor允许用户自定义通道和缓冲区大小
service_discovery 是二者的服务发现实现。
schedule 的功能是为 CRoutine(其实就是线程池中的task)提供调度策略
Resource Management Executor(执行器) Task
scheduler的调度单元-协程(coroutine)。
Scheduler 会将 node 与 CRoutine 建立联系,然后与 Processor 也建立联系
数据处理过程是通过协程来实现的
Component 是封装好的数据处理流程
Dag 文件是模块拓扑关系的配置文件
Launch 文件提供了一种启动模块的简单方法
message
cyber/node
cyber/data
Data目录下有如下类:
DataFusion,AllLatest,ChannelBuffer ,CacheBuffer ,DataDispatcher,DataVisitor,VisitorConfig,DataNotifier等 。
消息融合策略的定义AllLatest,多路消息下设定msg0为主消息,由msg0根据条件是否满足触发多路消息融合
Cyber的 Transport 为上层封装了底层数据传输的细节。
上层主要使用Transport,Transmitter,Receiver三个类,其中Transport是工厂类,负责创建Transmitter、Receiver以及Dispatcher(上层不直接使用)
cyber/message
整个数据通道channel 内的数据可以分为两部分:
CyberRT 框架定义的元数据 和 通道对应的消息格式(Message)所定义的消息数据
元数据
ChannelName、MessageType、FrameRatio、RawMessage Size 数据字
消息格式(Message)所定义的消息数据
cyber/transport/message/message_info.h
Message:消息是Cyber RT框架中数据交换的基本单位。消息定义了数据的结构和类型,用于在模块之间传递信息
// 创建消息对象 // 填充消息内容 // 发布消息
// 创建消息读取器 // 消息处理回调函数
调度
scheduler类是基类,其拥有两个子类,分别为
SchedulerClassic和SchedulerChoreography,分别对应两种策略,Classic(经典)策略与Choreophgray(编排)策略
Scheduler是个单例 实例化过程由scheduler_factory.cc提供
// 根据策略,实例化不同的调度器
调度配置 :
cyber/proto/scheduler_conf.proto
cyber/proto/classic_conf.proto
cyber/proto/choreography_conf.proto
ProcessorContext 只是基类,根据Scheduler的两种策略,分别对应着两种ProcessorContext,
分别是:
ClassicContext
ChoreographyContext
ProcessorContext
为每个group创建对应数量的Processor,并设置相关策略,这是由函数SchedulerClassic::CreateProcessor()来实现的。
Processor::Run这个线程不断的尝试从context_成员变量中获取协程
Scheduler::CreateTask这个函数是Scheduler基类的函数,
其内部会调用子类SchedulerClassic或SchedulerChoreography的DispatchTask()函数,
最后将子类的NotifyProcessor函数注册给DataVistor对象
C++ 语法
shared_from_this():
shared_from_this 获取一个 shared_ptr,指向当前对象,以确保在异步操作完成之前对象不会被销毁
weak_ptr的lock方法用于获取一个指向对象的shared_ptr
lambda表达式是C++11最重要也是最常用的特性之一
[capture](params) opt -> ret {body;};
auto x6 = [this, x, y] {return m_number + x + y; }; //
匿名函数内部,需要通过lambda表达式的捕获列表控制如何捕获外部变量,以及访问哪些变量。默认状态下lambda表达式无法修改通过复制方式捕获外部变
参考
http://epsilonjohn.club/2021/11/28/Cyber-RT%E7%B3%BB%E5%88%97%E4%B9%8B%E4%B8%AD%E6%9E%A2%E8%B0%83%E5%BA%A6Scheduler/
标签:name,proto,CyberRT,cyber,源码,ROLE,transport,简单,channel
From: https://www.cnblogs.com/ytwang/p/18677476