cyber服务发现完全依赖于fastDDS,下面从底层一步一步看下服务发现的整个过程。
topology_manager.cc
首先从这个类看起,这个类是和dds接壤的,dds发现后,完全由这个类接管,然后整体开始通信。
TopologyManager::TopologyManager()
: init_(false),
node_manager_(nullptr),
channel_manager_(nullptr),
service_manager_(nullptr),
participant_(nullptr),
participant_listener_(nullptr) {
Init();
}
从上面的构造函数开始分析,除了赋值一些变量,还有就是调用了init函数, 其中node_manager_、channel_manager_、service_manager_是拓扑结构的一些类成员,participant_、participant_listener_很明显就是FAST_RTPS内部的参与者以及监听事件。紧接着进行了init初始化。
init
init函数就是整个topology_manager初始化的过程,下面逐步分析代码。
bool TopologyManager::Init() {
if (init_.exchange(true)) {
return true;
}
node_manager_ = std::make_shared<NodeManager>();
channel_manager_ = std::make_shared<ChannelManager>();
service_manager_ = std::make_shared<ServiceManager>();
CreateParticipant();
bool result =
InitNodeManager() && InitChannelManager() && InitServiceManager();
if (!result) {
AERROR << "init manager failed.";
participant_ = nullptr;
delete participant_listener_;
participant_listener_ = nullptr;
node_manager_ = nullptr;
channel_manager_ = nullptr;
service_manager_ = nullptr;
init_.store(false);
return false;
}
return true;
}
上面首先是创建了三个share_ptr,分别对应NodeManager、ChannelManager、ServiceManager单个类,然后调用CreateParticipant创建participant_。然后对于三个mangger进行了初始化操作,,我们逐步进入分析。
node_manager_ = std::make_shared<NodeManager>();
首先就是上面的代码创建nodemanger,具体要看nodemanger类里面干了啥,所以暂时跳转到nodemanger.cc.
NodeManager、ChannelManager、ServiceManager他们三个都继承了manger类,主要用于记录节点间的关系及节点网络的图结构,紧接着,调用了CreateParticipant函数创建participant,再调用了initNodeMangger去初始化,具体要看函数实现。
CreateParticipant
bool TopologyManager::CreateParticipant() {
std::string participant_name =
common::GlobalData::Instance()->HostName() + '+' +
std::to_string(common::GlobalData::Instance()->ProcessId());
participant_listener_ = new ParticipantListener(std::bind(
&TopologyManager::OnParticipantChange, this, std::placeholders::_1));
participant_ = std::make_shared<transport::Participant>(
participant_name, 11511, participant_listener_);
return true;
}
这个函数就是使用fastdds的接口创建一个用于全局服务发现的participant,对于cyber的rtps相关的操作,可以看下participant.cc,里面可以看到和rtps官方demo一样的代码,用于通信。participant.cc中官方又封装了一层,然后来初始化和创建participant。
participant
Participant::Participant(const std::string& name, int send_port,
eprosima::fastrtps::ParticipantListener* listener)
: shutdown_(false),
name_(name),
send_port_(send_port),
listener_(listener),
fastrtps_participant_(nullptr) {}
eprosima::fastrtps::Participant* Participant::fastrtps_participant() {
if (shutdown_.load()) {
return nullptr;
}
std::lock_guard<std::mutex> lk(mutex_);
if (fastrtps_participant_ != nullptr) {
return fastrtps_participant_;
}
CreateFastRtpsParticipant(name_, send_port_, listener_);
return fastrtps_participant_;
}
void Participant::CreateFastRtpsParticipant(
const std::string& name, int send_port,
eprosima::fastrtps::ParticipantListener* listener) {
uint32_t domain_id = 80;
const char* val = ::getenv("CYBER_DOMAIN_ID");
if (val != nullptr) {
try {
domain_id = std::stoi(val);
} catch (const std::exception& e) {
AERROR << "convert domain_id error " << e.what();
return;
}
}
auto part_attr_conf = std::make_shared<proto::RtpsParticipantAttr>();
auto& global_conf = common::GlobalData::Instance()->Config();
if (global_conf.has_transport_conf() &&
global_conf.transport_conf().has_participant_attr()) {
part_attr_conf->CopyFrom(global_conf.transport_conf().participant_attr());
}
eprosima::fastrtps::ParticipantAttributes attr;
attr.rtps.port.domainIDGain =
static_cast<uint16_t>(part_attr_conf->domain_id_gain());
attr.rtps.port.portBase = static_cast<uint16_t>(part_attr_conf->port_base());
attr.rtps.builtin.discovery_config.discoveryProtocol =
eprosima::fastrtps::rtps::DiscoveryProtocol_t::SIMPLE;
attr.rtps.builtin.discovery_config.m_simpleEDP
.use_PublicationWriterANDSubscriptionReader = true;
attr.rtps.builtin.discovery_config.m_simpleEDP
.use_PublicationReaderANDSubscriptionWriter = true;
attr.domainId = domain_id;
/**
* The user should set the lease_duration and the announcement_period with
* values that differ in at least 30%. Values too close to each other may
* cause the failure of the writer liveliness assertion in networks with high
* latency or with lots of communication errors.
*/
attr.rtps.builtin.discovery_config.leaseDuration.seconds =
part_attr_conf->lease_duration();
attr.rtps.builtin.discovery_config.leaseDuration_anno
标签:std,participant,attr,流程,manager,APOLLO7.0,rtps,conf,解析
From: https://blog.csdn.net/qq_38643642/article/details/139604429