源码解读
component
node
reader/writer service/client parameter
schedule
transport
apollo/cyber/cyber.cc
CreateNode(){return std::unique_ptr<Node>(new Node(node_name,name_space))}
apollo/cyber/init.cc
Init()
OnShutdown()
apollo/cyber/state.h
WaitForShutdown() IsShutdown() OK AsyncShutdown()
CyberRT启动node的方式
main()函数中写通信逻辑并编译为单独的可执行文件的方法
主要流程
apollo::cyber::Init(argv[0]);// 启动cyber模块 初始化cyber框架
apollo::cyber::CreateNode ;// 创建一个节点 每个Node负责创建Reader, Writer, Service, Client来帮该组件获取信息或者传达信息。
Writer:
auto talker_node = apollo::cyber::CreateNode("talker");
talker_node->CreateWriter<Image>( //创建 Writer 或者 reader
while (apollo::cyber::OK()) {
auto msg = std::make_shared<Car>();
talker->Write(msg);
}
Reader :
auto listener_node = apollo::cyber::CreateNode("listener");
auto listener = listener_node->CreateReader<Car>( "car_speed", message_callback);
apollo::cyber::WaitForShutdown();
2.createNode() apollo/cyber/cyber.cc
01.NodeChannelImpl 和 NodeServiceImpl
Node是Server/Client/Writer/Reader的容器:
Server/Client/Writer/Reader是有向图的顶点。
Channel 是Writer到Reader的边缘
Service 是Server到Client的边缘
02.apollo/cyber/node/node.h
CreateWriter writer是cyber中发送消息的基本工具。每个writer都可以通过channel发送一些特定类型的消息 MessageT channel_name
std::shared_ptr<Write<Message_T>>
CreateReader
std::shared_ptr<Reader<Message_T>>
CreateService CreateClient
03. apollo/cyber/node/write.h apollo/cyber/node/writer_base.h
class Write:public WriteBase{}{
using TransmitterPtr = std::shared_ptr<transport::Transmitter<MessageT>>;
service_discovery::ChannelManagePtr
service_discovery::Managet::ChannelConnection
virtual bool Write(consst std::shared_ptr<Message_T>& msg_ptr)}
virtual bool Write(consst proto::RoleAttributes& role_attr)}: WriteBase(role_attr(), transmitter_(nullptr),channel_manage_(nulptr))
apollo/cyber/node/reader.h apollo/cyber/node/reader_base.h
class Reader:public ReaserBase{}
// blocker主要用于缓存消息,Blocker继承自BlockerBase Block 类是共享内存(Shared Memory, SHM)中的一个块,用于存储消息数据
using BlockerPtr = std::unique_ptr<blocker::Blocker<MessageT>>
using ReceiverPtr = std::shared_ptr<transport::Receiver<MessageT>>;
04: 其中Transport是工厂类,负责创建 Transmitter Receiver 以及Dispatcher(上层不直接使用)
通讯的差异通过Receiver Transmitter Dispatcher 三个类的多态形式来实现
Transmitter 和 Receiver 是 Writer 和 Reader 的通信实现,service_discovery 是二者的服务发现实现
Dispathcer也具有四种具体实现,具有单例属性,作为 Receiver 底层 Listener 运行的实体(调用回调和触发Listen)
Transmitter: 发送逻辑、
Receiver : 被动接收触发逻辑、数据缓存、数据同步
transport
apollo/cyber/transport/transmitter/transmitter.h
transport::Transmitter
intra_transmitter.h shm_transmitter.h rtps_transmitter.h hybrid_transmitter.h
apollo/cyber/transport/receiver/receiver.h
transport::Receiver
intra_receiver.h shm_receiver.h rtps_receiver.h hybrid_receiver.h
apollo/cyber/transport/dispatcher/dispatcher.h
每个Writer有Transmitter,每个Reader有Receiver。它们是负责消息发送与收取的类。Transmitter与Receiver的基类为Endpoint,
RTPS:RTPS部分基于eProsimar的Fast RTPS
RtpsReceiver 中的 dispatcher_成员指向单例 RtpsDispatcher
SHM:Segment类表示一块对应一个channel的共享内存,由SegmentFactory::CreateSegment函数创建
ShmDispatcher::OnMessage()函数进行消息派发
INTRA :用于进程内通信。由于读者和写者是在同一进程内,因此可以直接调用。在IntraTransmitter::Transmit()
intraDispatcher::OnMessage()
● 进程内传输使用的是函数直接调用(回调)的方式。调用链条是:
上层Writer---> IntraTransmitter--->IntraDispatcher--->(回调)IntraReceiver---> (回调)上层Reader。
● 进程间(本机)传输是通过共享内存辅助实现。链用链条是:
1、上层Writer---> Segment(共享内存)和Notifier(发送通知)
2、ShmDispatcher(有独立线程)---> (主动读取)Segment---> (回调)上层Reader。
● 进程间(路网络)传输是通过RTPS(DDS)实现。链用链条是:
1、上层Writer---> RtpsTransmitter打包成protobuf---> fastrtps发送到网络。
2、fastrtps接收到网络报文---> (回调)RtpsDispatcher---> (回调)RtpsReceiver---> (回调)上层Reader。
3.数据通信基础Protobuf
block时用来存储channel中的数据的,它也是其他类用来读写数据的对象
Receiver apollo-master/cyber/transport/receiver/receiver.h
4.编译后执行
talker 会一直给 listener 发送实时速度信息
加载启动Component的方式
mainboard
Cyber RT中,所有的 Comopnent 都会被编译成独立的.so文件,
Cyber RT 会根据开发者提供的配置文件,按需加载对应的 Component。
所以,开发者需要为.so文件编写好配置文.dag文件和.launch文件,以供 Cyber RT正确的加载执行Component。
Cyber RT提供两种
加载启动Component的方式,
使用 mainboard 启动component对应的dag文件。mainboard执行启动dag文件。
使用 cyber_launch工具启动component对应的launch文件,cyber_launch工具可以启动dag文件和二进制文件
mainboard
cyber/mainboard/mainboard.cc
cyber main函数中先解析dag参数,然后根据解析的参数,通过类加载器动态的加载对应的模块,然后调用Initialize方法初始化模块
apollo::cyber::mainboard::ModuleArgument
apollo::cyber::mainboard::ModuleController
1.解析参数是在"ModuleArgument"类中实现的,主要是解析加载DAG文件时候带的参数。arseArgument(argc,argv);// 2. 初始化cyber
2.apollo::cyber::Init(argv[0]); 启动cyber模块
3.ModuleController "实现cyber模块的加载,在"ModuleController::Init()"中调用"LoadAll()"来加载所有模块
cyber主函数在"ModuleController::Init()"进行模块的加载
模块首先通过classloader加载到内存,然后创建对象,并且调用模块的初始化方法。
component中每个模块都设计为可以动态加载和卸载,可以实时在线的开启和关闭模块,
实现的方式是通过classloader来进行动态的加载动态库
ModuleController::LoadModule
// 1. 加载动态库
class_loader_manager_.LoadLibrary(load_path);
// 3. 创建对象
std::shared_ptr<ComponentBase> base =class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
// 4. 调用对象的Initialize方法
base->Initialize(component.config())
其中 component Apollo一共有2种类型的组件:消息触发型和定时触发型
bool Component<M0, M1, NullType, NullType>::Initialize
// 1. 创建Node
// 2. 调用用户自定义初始化 Init() ####调用用户自定义的初始化函数Init()(子类的Init方法)
// 3. 创建reader1 CreateReader
// 5. 创建回调,回调执行Proc() ptr-> Process(msg0, msg1); ### 实际上是执行用户定义算法Proc()函数
// scheduler::Instance() 创建数据访问 器DataVisitor ; RoutineFactory 创建协程,协程绑定回调func(执行proc)
// scheduler CreateTask
5. 消息写端 和消息读端
Reader初始化时创建的另一个关键对象为 Receiver
DataVisitor类,协程和Receiver类等。其中DataVisitor主要用于消息数据的访问
DataFusion由 DataDispather、CacheBuffer、DataVisitor三部分组成
DataVisitor本质功能就是数据存储器 DataDispatcher主要是将buffer放进对应channel的buffers中
cyber/data/data_vistor.h cyber/data/data_dispatcher.h cyber/data/cache_buffer.h
Component 的DataVisitor和DataDispatcher
一个数据处理过程对应一个DataVisitor,
通过在DataVisitor中注册Notify(唤醒对应的协程,协程执行绑定的回调函数),
并且注册对应的Buffer到DataDispather
DataFusion类是一个虚类,定义了数据融合的接口"Fusion()",Apollo里只提供了一种数据融合的方式
DataVisitor只访问一个消息,则不会对消息进行融合,
如果DataVisitor访问2个以上的数据,那么需要进行融合,并且注册融合回调。
之后CacheBuffer中会调用融合回调进行数据处理,而不会把数据放入CacheBuffer中
在component中自动帮我们创建了一个DataVisitor,订阅component中的消息,融合获取最新的消息之后,执行Proc回调。
数据的访问是通过"DataVisitor"来实现,数据的分发通过"DataDispatcher"来实现
DataVisitor构造完事后,
调用了croutine::CreateRoutineFactory创建了一个croutine::RoutineFactory,主要是创建协程工厂,并构建出要封装为协程的函数
主要两种方式:一种是通过Component的Proc()接口,它被调用时参数就是最新的消息。另一种是通过Reader的Observe()函数直接拿
cyber_launch
1.示例: cyber_launch start /apollo/modules/drivers/camera/launch/camera.launch
cyber_launch就是 cyber\tools\cyber_launch\cyber_launch py脚本文件
如果是可执行文件,就用py的多进程直接调起来, ProcessWrapper
如果是lib文件,要通过g_binary_name="mainboard"这个程序来动态加载lib库文件
01.cyber_launch 文件结构
module用于区分模块 name
dag_conf
process_name
dag文件有一个或者多个module_config,而每个module_config中对应一个或者多个components
02.cyber_launch.py分析
说明
RoutineFactory 协程通过工厂模式方法创建,里面包含一个回调函数和一个dv(数据访问器)DataVisitor。
Cyber RT工厂模式的对应 ComponentBse AbstractClassFactoryBase class_factory
Apollo 启动不同模块
2种启动方式。都是通过mainboard来启动程序
1. cyber_launch
2. dreamview
Apollo Dreamview 为启动一切模块的中心
前端通过websocket发送消息到后端,后端通过调用命令行启动模
start_command"和"stop_command"来启动和关闭
参考
深入探索:CyberRT架构的创新学习之旅 https://zhuanlan.zhihu.com/p/661888327
apollo介绍之Cyber框架(十) https://zhuanlan.zhihu.com/p/91322837
苯苯的嗷呜-CyberRt 源码解读(十二) https://zhuanlan.zhihu.com/p/653159026
Apollo的启动过程1——启动命令解析 https://gutsgwh1997.github.io/2020/02/04/Apollo%E7%9A%84%E5%90%AF%E5%8A%A8/
Apollo的启动过程2——功能模块加载
https://gutsgwh1997.github.io/2020/02/05/Apollo%E7%9A%84%E5%90%AF%E5%8A%A8%E8%BF%87%E7%A8%8B2%E2%80%94%E2%80%94%E5%8A%9F%E8%83%BD%E6%A8%A1%E5%9D%97%E5%8A%A0%E8%BD%BD/
Apollo的启动过程3——Cyber RT如何加载组件 https://gutsgwh1997.github.io/2020/02/05/
Cyber通信机制实践之Listener-Talker通信(C++)
【apollo】cyber底层通信--订阅方如何获取数据 https://blog.csdn.net/SWX230162/article/details/125343967
自动驾驶开发入门(三)---浅谈Apollo Cyber RT中的Transport
标签:launch,CyberRT,DataVisitor,cyber,解读,源码,Reader,apollo,加载
From: https://www.cnblogs.com/ytwang/p/18202700