CSDN搬家失败,手动导出markdown后再导入博客园
Binary vs Component
首先说明下,Apollo 的核心概念是组件,通过组件可以实现资源的自动管理和调度。Cyber RT 中只能使用 C++ 语言实现 Component,Python 版的 API 只能用来写传统的二进制可执行文件,参考官方文档中这两种方式的区别:
- Binary vs Component
There are two options to use Cyber RT framework for applications:
- Binary based: the application is compiled separately into a binary, which communicates with other cyber modules by creating its own
Reader
andWriter
. - Component based: the application is compiled into a Shared Library. By inheriting the Component class and writing the corresponding dag description file, the Cyber RT framework will load and run the application dynamically.
The essential Component interface
- The component's
Init()
function is like the main function that does some initialization of the algorithm. - Component's
Proc()
function works like Reader's callback function that is called by the framework when a message arrives.
Advantages of using Component
- Component can be loaded into different processes through the launch file, and the deployment is flexible.
- Component can change the received channel name by modifying the dag file without recompiling.
- Component supports receiving multiple types of data.
- Component supports providing multiple fusion strategies.
因此 Python 通常用来进行算法模型的前期测试,待测试通过后还是需要将 Python 版代码改写为 C++ 并编译,防止使用 Python 脚本造成代码泄露 。同时,更重要的是通过 CyberRT 的组件功能实现资源的自动调度,防止各模块之间竞争造成主次不分等情况。
新建模块
在 apollo/modules 文件夹下新建一个 my_prediction 文件夹,我们后续的所有操作都是在这个文件夹下进行。(可选 --)然后在 my_prediction 下面新建一个 BUILD 文件,用于后续的编译,这一步是为了和 C++ 版本一致,实际上 Python 版可以直接运行 py 文件,不一定需要编译成可执行文件。
# BUILD文件
load("@rules_python//python:defs.bzl", "py_binary") # 把py_binary这个函数导入
package(default_visibility = ["//visibility:public"]) # 模块可见性,使其他模块都能访问这个模块
py_binary(
name = "my_prediction_py", # 生成的可执行文件名字
main = "trajectory_prediction.py", # 指定文件,否则会去srcs里面寻找与name同名的py
srcs = [
"trajectory_prediction.py",
],
deps = [
"//cyber/python/cyber_py3:cyber",
"//modules/perception/proto:perception_obstacle_py_pb2",
"//modules/localization/proto:localization_py_pb2",
"//modules/prediction/proto:prediction_obstacle_py_pb2"
],
)
导入包
首先直接讲 Python 主程序怎么写,以预测模块为例,预测模块需要读取感知、定位等模块的数据,因此需要导入这些模块的 proto message 或者说 proto 产生的 Python 类。
需要注意的是首先要把 bazel-bin, bazel-out 等路径加入环境变量,防止找不到文件。然后从 perception_obstacle_pb2 和 localization_pb2 导入要读取的类,从 prediction_obstacle_pb2 导入要写入的类。
import sys
sys.path.append('.')
sys.path.append('bazel-bin')
sys.path.append('bazel-out')
from cyber.python.cyber_py3 import cyber
from modules.perception.proto.perception_obstacle_pb2 import PerceptionObstacles
from modules.localization.proto.localization_pb2 import LocalizationEstimate
from modules.prediction.proto.prediction_obstacle_pb2 import PredictionObstacles
读取通道数据
主函数
if __name__ == "__main__":
cyber.init()
if not cyber.ok():
print('Well, something went wrong.')
sys.exit(1)
test_node = cyber.Node('listener')
writer = test_node.create_writer('/apollo/prediction/perception_obstacles', PredictionObstacles)
test_node.create_reader('/apollo/perception/obstacles', PerceptionObstacles, callback, args={'writer': writer})
# 示例,args作为参数传入callback中
test_node.create_reader('/apollo/localization/pose', LocalizationEstimate, ego_callback)
test_node.spin()
cyber.shutdown()
上面主函数中,先建立了一个 test_node 节点,然后给节点添加了一个 writer(用于写入预测通道)和 2 个 reader(用于读取感知和定位通道)。 reader 读取了该通道的数据后会将其传入 callback 函数中进行处理,同时可以用最后一个参数 args 向 callback 中传入 writer 用于算法处理后的发布,或者直接通过 global 关键字将 writer 作为全局变量传入 callback。
回调函数
def callback(data, args=None):
"""
Reader message callback.
"""
wt = args['writer']
for obs in data.perception_obstacle:
value = gen_data_line(obs, frame_id)
######################################
## your process ##
######################################
wt.write(prediction_obstacles)
经过这些步骤,即可实现一个 Python 版本的预测模块。
发布 / 运行
如前文所述,走到这一步直接在 docker 里面运行 Python 脚本即可。
如果想规范一点,像 C++ 一样编译成可执行文件。由于我们已经编写了 BUILD 文件,因此可以直接在 Apollo 根目录下运行
bash apollo.sh build_opt my_prediction # 编译
./bazel-bin/modules/my_predicion/my_prediction_py # 运行
当然这一步本质上是生成了个映射,本质上还是去找你的 py 文件。
PS: proto message 的读写
读取
首先看下 PerceptionObstacles 的格式
message PerceptionObstacles {
repeated PerceptionObstacle perception_obstacle = 1; // An array of obstacles
optional apollo.common.Header header = 2; // Header
optional apollo.common.ErrorCode error_code = 3 [default = OK];
optional LaneMarkers lane_marker = 4;
optional CIPVInfo cipv_info = 5; // Closest In Path Vehicle (CIPV)
optional double fusion_timestamp = 6;
}
perception_obstacle 是 repeated 类型,因此是列表,需要用 for 循环读取,其他变量直接读即可。
写入
message PredictionObstacles {
// timestamp is included in header
optional apollo.common.Header header = 1;
// make prediction for multiple obstacles
repeated PredictionObstacle prediction_obstacle = 2;
// perception error code
optional apollo.common.ErrorCode perception_error_code = 3;
// start timestamp
optional double start_timestamp = 4;
// end timestamp
optional double end_timestamp = 5;
// self driving car intent
optional Intent intent = 6;
// Scenario
optional Scenario scenario = 7;
}
看下要写入的 PredictionObstacles,其中字段 2 是一个 repeated 型的 PredictionObstacle,跟上面 PerceptionObstacles.proto 中的一样,就是要把感知到的障碍物再原样填进去,所有采用了 CopyFrom 的方法直接复制。这是对 repeated 型数据的一种写法。
prediction_obstacles = PredictionObstacles()
for obs in data.perception_obstacle:
prediction_obstacle = prediction_obstacles.prediction_obstacle.add()
# 写入感知的数据
prediction_obstacle.perception_obstacle.CopyFrom(obs)
node_id = obs.measurements[0].id
另一种是第三行这种,采用 add() 方法一条一条往里加。更多细节可以参考 protobuf 的 Python API 文档
标签:perception,Python,py,Component,prediction,bazel,obstacle,模块 From: https://www.cnblogs.com/algorithmSpace/p/18200244