在上一章中,我们观察了PollManager节点背后的一些行为逻辑,但还有一些地方与本章有一些关联而没有讲到,这次我们就补上这些拼图。(本文章源自作者对于源码的观察理解以及其他资料的学习结合后的产物,仅用于自我复习,如有错误敬请见谅)
按照惯例我们先看一下ConnectionManager启动的一个整体代码:
void ConnectionManager::start()
{
poll_manager_ = PollManager::instance();
poll_conn_ = poll_manager_->addPollThreadListener(boost::bind(&ConnectionManager::removeDroppedConnections,
this));
// Bring up the TCP listener socket
tcpserver_transport_ = boost::make_shared<TransportTCP>(&poll_manager_->getPollSet());
if (!tcpserver_transport_->listen(network::getTCPROSPort(),
MAX_TCPROS_CONN_QUEUE,
boost::bind(&ConnectionManager::tcprosAcceptConnection, this, boost::placeholders::_1)))
{
ROS_FATAL("Listen on port [%d] failed", network::getTCPROSPort());
ROS_BREAK();
}
// Bring up the UDP listener socket
udpserver_transport_ = boost::make_shared<TransportUDP>(&poll_manager_->getPollSet());
if (!udpserver_transport_->createIncoming(0, true))
{
ROS_FATAL("Listen failed");
ROS_BREAK();
}
}
他先取了一个poll_manager的一个实例。之后绑定定期清除无效连接的函数到poll_manager_这个信号槽。他会在调用signal()的时候被启用。
对于这一部分代码:
tcpserver_transport_ = boost::make_shared<TransportTCP>(&poll_manager_->getPollSet());
if (!tcpserver_transport_->listen(network::getTCPROSPort(),
MAX_TCPROS_CONN_QUEUE,
boost::bind(&ConnectionManager::tcprosAcceptConnection, this, boost::placeholders::_1)))
{
ROS_FATAL("Listen on port [%d] failed", network::getTCPROSPort());
ROS_BREAK();
}
他先创建了一个TransportTCP的指针,这里传入了一个PollSet去进行初始化管理socket套接字。后面的这一部分是启动TCP连接,实际上是启动了listen函数,tcprosAcceptConnection是连接后产生的回调函数,这里会把它设置为tcpserver_transport_的accept_cb_成员变量,在产生连接后被调用,然后构造一个connection结构体,并添加到连接池connections_中。值得注意到是这里里只能指针里是static的poll_manager的引用,返回了一个pollset,这个非常重要,如果不这么写那么返回的Pollset之间将毫无关系
我们来简单看一下这个监听函数的部分代码:
bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
{
is_server_ = true;
accept_cb_ = accept_cb;
if (s_use_ipv6_)
{
sock_ = socket(AF_INET6, SOCK_STREAM, 0);
sockaddr_in6 *address = (sockaddr_in6 *)&server_address_;
address->sin6_family = AF_INET6;
address->sin6_addr = isOnlyLocalhostAllowed() ?
in6addr_loopback :
in6addr_any;
address->sin6_port = htons(port);
sa_len_ = sizeof(sockaddr_in6);
}
else
{
sock_ = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in *address = (sockaddr_in *)&server_address_;
address->sin_family = AF_INET;
address->sin_addr.s_addr = isOnlyLocalhostAllowed() ?
htonl(INADDR_LOOPBACK) :
INADDR_ANY;
address->sin_port = htons(port);
sa_len_ = sizeof(sockaddr_in);
}
在后面listen函数会检查是否初始化了socket:
if (!initializeSocket())
{
return false;
}
在这个init函数内部中如果前半段检查初始化成功,他会调用一个函数:
poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, boost::placeholders::_1), shared_from_this());
这里的sock_就是之前我们创建的sock_,这里的update是不是很熟悉,没错就是PollManager里面update的func,他把他们添加到poll_set里面。socketUpdate方法添加到epoll的回调中去。socketUpdate方法中包含了socket的读写回调,因此对socket的任何操作都会在epoll唤醒时触发。
我们看一下addSocket函数:
bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const TransportPtr& transport)
{
SocketInfo info;
info.fd_ = fd;
info.events_ = 0;
info.transport_ = transport;
info.func_ = update_func;
{
boost::mutex::scoped_lock lock(socket_info_mutex_);
bool b = socket_info_.insert(std::make_pair(fd, info)).second;
if (!b)
{
ROSCPP_LOG_DEBUG("PollSet: Tried to add duplicate fd [%d]", fd);
return false;
}
add_socket_to_watcher(epfd_, fd);
sockets_changed_ = true;
}
signal();
return true;
}
之后我们看一下update函数,梳理会讲:
void TransportTCP::socketUpdate(int events)
{
{
boost::recursive_mutex::scoped_lock lock(close_mutex_);
if (closed_)
{
return;
}
// Handle read events before err/hup/nval, since there may be data left on the wire
if ((events & POLLIN) && expecting_read_)
{
if (is_server_)
{
// Should not block here, because poll() said that it's ready
// for reading
TransportTCPPtr transport = accept();
if (transport)
{
ROS_ASSERT(accept_cb_);
accept_cb_(transport);
}
}
else
{
if (read_cb_)
{
read_cb_(shared_from_this());
}
}
}
这里他就是把他加入到socket表里去了。之后就和pollmanager衔接上了。
我们回到start函数,后面其实就是设置了TransportTCP的一个回调函数以及开了一个服务端套接字。后面则是启动一个UDP服务。
udpserver_transport_ = boost::make_shared<TransportUDP>(&poll_manager_->getPollSet());
if (!udpserver_transport_->createIncoming(0, true))
{
ROS_FATAL("Listen failed");
ROS_BREAK();
}
}
我们着重看一下这个回调函数:
void ConnectionManager::tcprosAcceptConnection(const TransportTCPPtr& transport)
{
std::string client_uri = transport->getClientURI();
ROSCPP_LOG_DEBUG("TCPROS received a connection from [%s]", client_uri.c_str());
ConnectionPtr conn(boost::make_shared<Connection>());
addConnection(conn);
conn->initialize(transport, true, boost::bind(&ConnectionManager::onConnectionHeaderReceived, this, boost::placeholders::_1, boost::placeholders::_2));
}
首先他获得了客户端的uri也就是ip和端口。之后创建了一个ConnectionPtr的对象将他放到连接池中。这个Connection也就是负责数据的头部发送,发送,连接中断检查的负责模块,也是网络编程中的一个概念。简单来说就是每有一个需求过来,就从连接池里面拿一个连接出来,给他创建连接.之后他对这个连接进行了初始化。
初始化函数如下:
void Connection::initialize(const TransportPtr& transport, bool is_server, const HeaderReceivedFunc& header_func)
{
ROS_ASSERT(transport);
transport_ = transport;
header_func_ = header_func;
is_server_ = is_server;
transport_->setReadCallback(boost::bind(&Connection::onReadable, this, boost::placeholders::_1));
transport_->setWriteCallback(boost::bind(&Connection::onWriteable, this, boost::placeholders::_1));
transport_->setDisconnectCallback(boost::bind(&Connection::onDisconnect, this, boost::placeholders::_1));
if (header_func)
{
read(4, boost::bind(&Connection::onHeaderLengthRead, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
}
}
可能大家对transport和connection有一点迷糊了。那么这里就说一下二者的关系:
-
依赖关系:
Connection
类依赖于Transport
类进行实际的数据传输。Connection
对象通常包含一个Transport
对象,用于处理底层的网络通信。
-
抽象层次:
Transport
处理的是底层的传输细节,例如如何通过TCP或UDP发送和接收数据。Connection
处理的是应用层的连接管理,例如连接的建立和断开、数据的编码和解码、头部信息的处理等。
-
初始化和使用:
- 在ROS中,通常会先创建一个
Transport
对象,然后使用这个Transport
对象来初始化一个Connection
对象。 Connection
对象使用Transport
对象来发送和接收数据,同时提供更高层次的接口来简化这些操作。
- 在ROS中,通常会先创建一个
之后他设置了3种回调。之后检查ConnectionManager::onConnectionHeaderReceived函数是否为非空回调,函数如下:
bool ConnectionManager::onConnectionHeaderReceived(const ConnectionPtr& conn, const Header& header)
{
bool ret = false;
std::string val;
if (header.getValue("topic", val))
{
ROSCPP_CONN_LOG_DEBUG("Connection: Creating TransportSubscriberLink for topic [%s] connected to [%s]",
val.c_str(), conn->getRemoteString().c_str());
TransportSubscriberLinkPtr sub_link(boost::make_shared<TransportSubscriberLink>());
sub_link->initialize(conn);
ret = sub_link->handleHeader(header);
}
else if (header.getValue("service", val))
{
ROSCPP_LOG_DEBUG("Connection: Creating ServiceClientLink for service [%s] connected to [%s]",
val.c_str(), conn->getRemoteString().c_str());
ServiceClientLinkPtr link(boost::make_shared<ServiceClientLink>());
link->initialize(conn);
ret = link->handleHeader(header);
}
else
{
ROSCPP_LOG_DEBUG("Got a connection for a type other than 'topic' or 'service' from [%s]. Fail.",
conn->getRemoteString().c_str());
return false;
}
return ret;
}
梳理:看了这么多大家可能有点模糊。在这里我们梳理一下过程。首先他先取一个PollManager的静态实例,然后绑定了一个定期清除无效连接的函数到poll_manager_这个信号槽。他会在调用signal()的时候被启用。之后创建了一个transportTCP调用了listen函数,在listen函数中他会将创建一个socket套接字,并其与Update函数添加到poll_set_的epoll中,每次触发就进行回调。update如果触发则触发socket的读写回调,在update中如果是服务端且为读事件则建立新链接,否则正常读写。值得注意的是这里的建立新链接的回调则是最开始的tcprosAcceptConnection函数。之后又开了一个UDP。在之后就是pollmanager里面的内容了,pollmanager中他启动poll_manager的回调函数,运行槽函数,并设置了一个100ms的时延,去轮询的执行Update.
标签:ROS,socket,ConnectionManager,Connection,源码,poll,boost,transport,函数 From: https://blog.csdn.net/m0_64060063/article/details/140459782