首页 > 编程语言 >ROS源码学习分享_6_ConnectionManager

ROS源码学习分享_6_ConnectionManager

时间:2024-07-16 16:25:16浏览次数:23  
标签:ROS socket ConnectionManager Connection 源码 poll boost transport 函数

        在上一章中,我们观察了PollManager节点背后的一些行为逻辑,但还有一些地方与本章有一些关联而没有讲到,这次我们就补上这些拼图。(本文章源自作者对于源码的观察理解以及其他资料的学习结合后的产物,仅用于自我复习,如有错误敬请见谅)

        按照惯例我们先看一下ConnectionManager启动的一个整体代码:

void ConnectionManager::start()
{
  poll_manager_ = PollManager::instance();
  poll_conn_ = poll_manager_->addPollThreadListener(boost::bind(&ConnectionManager::removeDroppedConnections, 
								this));

  // Bring up the TCP listener socket
  tcpserver_transport_ = boost::make_shared<TransportTCP>(&poll_manager_->getPollSet());
  if (!tcpserver_transport_->listen(network::getTCPROSPort(), 
				    MAX_TCPROS_CONN_QUEUE, 
				    boost::bind(&ConnectionManager::tcprosAcceptConnection, this, boost::placeholders::_1)))
  {
    ROS_FATAL("Listen on port [%d] failed", network::getTCPROSPort());
    ROS_BREAK();
  }

  // Bring up the UDP listener socket
  udpserver_transport_ = boost::make_shared<TransportUDP>(&poll_manager_->getPollSet());
  if (!udpserver_transport_->createIncoming(0, true))
  {
    ROS_FATAL("Listen failed");
    ROS_BREAK();
  }
}

        他先取了一个poll_manager的一个实例。之后绑定定期清除无效连接的函数到poll_manager_这个信号槽。他会在调用signal()的时候被启用。

        对于这一部分代码:

  tcpserver_transport_ = boost::make_shared<TransportTCP>(&poll_manager_->getPollSet());
  if (!tcpserver_transport_->listen(network::getTCPROSPort(), 
				    MAX_TCPROS_CONN_QUEUE, 
				    boost::bind(&ConnectionManager::tcprosAcceptConnection, this, boost::placeholders::_1)))
  {
    ROS_FATAL("Listen on port [%d] failed", network::getTCPROSPort());
    ROS_BREAK();
  }

        他先创建了一个TransportTCP的指针,这里传入了一个PollSet去进行初始化管理socket套接字。后面的这一部分是启动TCP连接,实际上是启动了listen函数,tcprosAcceptConnection是连接后产生的回调函数,这里会把它设置为tcpserver_transport_的accept_cb_成员变量,在产生连接后被调用,然后构造一个connection结构体,并添加到连接池connections_中。值得注意到是这里里只能指针里是static的poll_manager的引用,返回了一个pollset,这个非常重要,如果不这么写那么返回的Pollset之间将毫无关系

        我们来简单看一下这个监听函数的部分代码:

bool TransportTCP::listen(int port, int backlog, const AcceptCallback& accept_cb)
{
  is_server_ = true;
  accept_cb_ = accept_cb;

  if (s_use_ipv6_)
  {
    sock_ = socket(AF_INET6, SOCK_STREAM, 0);
    sockaddr_in6 *address = (sockaddr_in6 *)&server_address_;
    address->sin6_family = AF_INET6;
    address->sin6_addr = isOnlyLocalhostAllowed() ? 
                         in6addr_loopback : 
                         in6addr_any;
    address->sin6_port = htons(port);
    sa_len_ = sizeof(sockaddr_in6);
  }
  else
  {
    sock_ = socket(AF_INET, SOCK_STREAM, 0);
    sockaddr_in *address = (sockaddr_in *)&server_address_;
    address->sin_family = AF_INET;
    address->sin_addr.s_addr = isOnlyLocalhostAllowed() ? 
                               htonl(INADDR_LOOPBACK) : 
                               INADDR_ANY;
    address->sin_port = htons(port);
    sa_len_ = sizeof(sockaddr_in);
  }

        在后面listen函数会检查是否初始化了socket:

  if (!initializeSocket())
  {
    return false;
  }

        在这个init函数内部中如果前半段检查初始化成功,他会调用一个函数:

    poll_set_->addSocket(sock_, boost::bind(&TransportTCP::socketUpdate, this, boost::placeholders::_1), shared_from_this());

        这里的sock_就是之前我们创建的sock_,这里的update是不是很熟悉,没错就是PollManager里面update的func,他把他们添加到poll_set里面。socketUpdate方法添加到epoll的回调中去。socketUpdate方法中包含了socket的读写回调,因此对socket的任何操作都会在epoll唤醒时触发。

        我们看一下addSocket函数:

bool PollSet::addSocket(int fd, const SocketUpdateFunc& update_func, const TransportPtr& transport)
{
  SocketInfo info;
  info.fd_ = fd;
  info.events_ = 0;
  info.transport_ = transport;
  info.func_ = update_func;

  {
    boost::mutex::scoped_lock lock(socket_info_mutex_);

    bool b = socket_info_.insert(std::make_pair(fd, info)).second;
    if (!b)
    {
      ROSCPP_LOG_DEBUG("PollSet: Tried to add duplicate fd [%d]", fd);
      return false;
    }

    add_socket_to_watcher(epfd_, fd);

    sockets_changed_ = true;
  }

  signal();

  return true;
}

之后我们看一下update函数,梳理会讲:

        

void TransportTCP::socketUpdate(int events)
{
  {
    boost::recursive_mutex::scoped_lock lock(close_mutex_);
    if (closed_)
    {
      return;
    }

    // Handle read events before err/hup/nval, since there may be data left on the wire
    if ((events & POLLIN) && expecting_read_)
    {
      if (is_server_)
      {
        // Should not block here, because poll() said that it's ready
        // for reading
        TransportTCPPtr transport = accept();
        if (transport)
        {
          ROS_ASSERT(accept_cb_);
          accept_cb_(transport);
        }
      }
      else
      {
        if (read_cb_)
        {
          read_cb_(shared_from_this());
        }
      }
    }

 这里他就是把他加入到socket表里去了。之后就和pollmanager衔接上了。

        我们回到start函数,后面其实就是设置了TransportTCP的一个回调函数以及开了一个服务端套接字。后面则是启动一个UDP服务。

  udpserver_transport_ = boost::make_shared<TransportUDP>(&poll_manager_->getPollSet());
  if (!udpserver_transport_->createIncoming(0, true))
  {
    ROS_FATAL("Listen failed");
    ROS_BREAK();
  }
}

        我们着重看一下这个回调函数:

void ConnectionManager::tcprosAcceptConnection(const TransportTCPPtr& transport)
{
  std::string client_uri = transport->getClientURI();
  ROSCPP_LOG_DEBUG("TCPROS received a connection from [%s]", client_uri.c_str());

  ConnectionPtr conn(boost::make_shared<Connection>());
  addConnection(conn);

  conn->initialize(transport, true, boost::bind(&ConnectionManager::onConnectionHeaderReceived, this, boost::placeholders::_1, boost::placeholders::_2));
}

        首先他获得了客户端的uri也就是ip和端口。之后创建了一个ConnectionPtr的对象将他放到连接池中。这个Connection也就是负责数据的头部发送,发送,连接中断检查的负责模块,也是网络编程中的一个概念。简单来说就是每有一个需求过来,就从连接池里面拿一个连接出来,给他创建连接.之后他对这个连接进行了初始化。

        初始化函数如下:

void Connection::initialize(const TransportPtr& transport, bool is_server, const HeaderReceivedFunc& header_func)
{
  ROS_ASSERT(transport);

  transport_ = transport;
  header_func_ = header_func;
  is_server_ = is_server;

  transport_->setReadCallback(boost::bind(&Connection::onReadable, this, boost::placeholders::_1));
  transport_->setWriteCallback(boost::bind(&Connection::onWriteable, this, boost::placeholders::_1));
  transport_->setDisconnectCallback(boost::bind(&Connection::onDisconnect, this, boost::placeholders::_1));

  if (header_func)
  {
    read(4, boost::bind(&Connection::onHeaderLengthRead, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
  }
}

可能大家对transport和connection有一点迷糊了。那么这里就说一下二者的关系:

  • 依赖关系

    • Connection类依赖于Transport类进行实际的数据传输。Connection对象通常包含一个Transport对象,用于处理底层的网络通信。
  • 抽象层次

    • Transport处理的是底层的传输细节,例如如何通过TCP或UDP发送和接收数据。
    • Connection处理的是应用层的连接管理,例如连接的建立和断开、数据的编码和解码、头部信息的处理等。
  • 初始化和使用

    • 在ROS中,通常会先创建一个Transport对象,然后使用这个Transport对象来初始化一个Connection对象。
    • Connection对象使用Transport对象来发送和接收数据,同时提供更高层次的接口来简化这些操作。

        之后他设置了3种回调。之后检查ConnectionManager::onConnectionHeaderReceived函数是否为非空回调,函数如下:

bool ConnectionManager::onConnectionHeaderReceived(const ConnectionPtr& conn, const Header& header)
{
  bool ret = false;
  std::string val;
  if (header.getValue("topic", val))
  {
    ROSCPP_CONN_LOG_DEBUG("Connection: Creating TransportSubscriberLink for topic [%s] connected to [%s]", 
		     val.c_str(), conn->getRemoteString().c_str());

    TransportSubscriberLinkPtr sub_link(boost::make_shared<TransportSubscriberLink>());
    sub_link->initialize(conn);
    ret = sub_link->handleHeader(header);
  }
  else if (header.getValue("service", val))
  {
    ROSCPP_LOG_DEBUG("Connection: Creating ServiceClientLink for service [%s] connected to [%s]", 
		     val.c_str(), conn->getRemoteString().c_str());

    ServiceClientLinkPtr link(boost::make_shared<ServiceClientLink>());
    link->initialize(conn);
    ret = link->handleHeader(header);
  }
  else
  {
  	ROSCPP_LOG_DEBUG("Got a connection for a type other than 'topic' or 'service' from [%s].  Fail.", 
			 conn->getRemoteString().c_str());
    return false;
  }

  return ret;
}

梳理:看了这么多大家可能有点模糊。在这里我们梳理一下过程。首先他先取一个PollManager的静态实例,然后绑定了一个定期清除无效连接的函数到poll_manager_这个信号槽。他会在调用signal()的时候被启用。之后创建了一个transportTCP调用了listen函数,在listen函数中他会将创建一个socket套接字,并其与Update函数添加到poll_set_的epoll中,每次触发就进行回调。update如果触发则触发socket的读写回调,在update中如果是服务端且为读事件则建立新链接,否则正常读写。值得注意的是这里的建立新链接的回调则是最开始的tcprosAcceptConnection函数。之后又开了一个UDP。在之后就是pollmanager里面的内容了,pollmanager中他启动poll_manager的回调函数,运行槽函数,并设置了一个100ms的时延,去轮询的执行Update.

标签:ROS,socket,ConnectionManager,Connection,源码,poll,boost,transport,函数
From: https://blog.csdn.net/m0_64060063/article/details/140459782

相关文章

  • webpack2源码架构设计与构建流程
    入口初始化入口文件lib/webpack.jsfunctionwebpack(options,callback){ letcompiler; if(Array.isArray(options)){ compiler=newMultiCompiler(options.map(options=>webpack(options))); }elseif(typeofoptions==="object"){//......
  • Microsoft.Virtualization.Client.Common.dll文件丢失导致程序无法运行问题
    其实很多用户玩单机游戏或者安装软件的时候就出现过这种问题,如果是新手第一时间会认为是软件或游戏出错了,其实并不是这样,其主要原因就是你电脑系统的该dll文件丢失了或没有安装一些系统软件平台所需要的动态链接库,这时你可以下载这个Microsoft.Virtualization.Client.Common.dl......
  • Microsoft.Uev.SmbSyncProvider.dll文件丢失导致程序无法运行问题
    其实很多用户玩单机游戏或者安装软件的时候就出现过这种问题,如果是新手第一时间会认为是软件或游戏出错了,其实并不是这样,其主要原因就是你电脑系统的该dll文件丢失了或没有安装一些系统软件平台所需要的动态链接库,这时你可以下载这个Microsoft.Uev.SmbSyncProvider.dll文件(挑......
  • HAL库源码移植与使用之FSMC (例子加思路与理解,万字良心保证你能听懂)
    FMC和FSMC是一样的东西,只是FMC更可控地址更多又可以驱动SDRAM,用法都一样!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!总结:其实fsmc更像是一个有着特定转换时序功能的寄存器,每个fsmc映射在芯片内存里的地址都有一个这样的寄存器,你往这个映射的地址里赋值,这个赋值信息先到达对应fsmc寄存器,他不会像普通寄存器一样直接控制......
  • HAL库源码移植与使用之驱动LCD屏
    LCD屏会有烧屏残影的风险,但因其价格便宜他非常适合用于单片机显示显示屏分为以下几种:他的组成部包含玻璃基板、背光、驱动IC等LCD接口的种类MCU很简单,连51单片机都能驱动,但无法频繁刷新,一般有着20几个引脚引出的就是MCU接口我们常用的是就是MCU,下面讲的也是LCD屏幕MCU驱动......
  • HAL库源码移植与使用之正点原子OLED使用解析
    正点原子的OLED是使用SSD1306来驱动的,并设计了多种通讯方式,通过背后的焊点来选择这里以正点原子开发板最常用的8080并口通讯来讲引脚定义各正点原子开发板对OLED的接线8080时序图发送数据示例代码voidoled_wr_byte(uint8_tdata,uint8_tcmd){ OLED_RS(cmd); /......
  • 基于Java+SpringBoot+Vue的创新创业教育中心项目申报管理系统的设计与开发(源码+lw+部
    文章目录前言项目背景介绍技术栈后端框架SpringBoot前端框架Vue数据库MySQL(MyStructuredQueryLanguage)具体实现截图详细视频演示系统测试系统测试目的系统功能测试系统测试结论代码参考数据库参考源码获取前言......
  • 基于web的宠物商城设计与实现 毕业论文终稿+初稿+修改版论文+开题报告+答辩PPT+论文检
    !!!有需要的小伙伴可以通过文章末尾名片咨询我哦!!! ......
  • Springboot 校园安全通事件报告小程序-计算机毕业设计源码02445
    Springboot校园安全通事件报告小程序系统摘 要随着中国经济的飞速增长,消费者的智能化水平不断提高,许多智能手机和相关的软件正在得到更多的关注和支持。其中,校园安全通事件报告小程序系统更是深得消费者的喜爱,它的出现极大地改善了消费者的生活质量,同时,它还创造了一种快......
  • SSM豫东乡村生产信息管理系统-计算机毕业设计源码02969
     ssm豫东乡村生产信息管理系统摘要 在当前信息技术快速发展和普及的背景下,乡村地区的公共管理和农村生产活动也面临着新的挑战和需求。为了提升公共管理的效率和服务质量,以及支持农村生产的规划和组织,需要引入信息化手段来改善管理和运营。同时,随着乡村经济的发展和现代......