首页 > 编程语言 >iceoryx源码阅读(八)——IPC通信机制

iceoryx源码阅读(八)——IPC通信机制

时间:2024-05-08 23:44:50浏览次数:31  
标签:IPC return cxx iceoryx 发送 源码 error const LINE

目录

1 整体结构

通过前面的介绍,订阅者、发布者与Roudi守护进程之间也需要通信,如上文介绍的,请求Roudi守护进村创建并配置端口数据。整体结构如下图所示:

image

由于通信层在类Unix操作系统和Windows操作系统下实现不同(见下面的代码片段),所以我们分开介绍其实现。

#if defined(_WIN32)
using IoxIpcChannelType = iox::posix::NamedPipe;
#else
using IoxIpcChannelType = iox::posix::UnixDomainSocket;
#endif

接下来我们从数据的序列化和反序列化开始。

2 序列化与反序列化

前一篇文章中,这部分通信没有使用三方框架,使用简单的字符串拼接的方式进行序列化,如下所示:

template <typename T>
void IpcMessage::addEntry(const T& entry) noexcept
{
    std::stringstream newEntry;
    newEntry << entry;

    if (!isValidEntry(newEntry.str()))
    {
        LogError() << "\'" << newEntry.str().c_str() << "\' is an invalid IPC channel entry";
        m_isValid = false;
    }
    else
    {
        m_msg.append(newEntry.str() + m_separator);
        ++m_numberOfElements;
    }
}

template <typename T>
IpcMessage& IpcMessage::operator<<(const T& entry) noexcept
{
    addEntry(entry);
    return *this;
}

上面的代码较为简单,这里不作详细解释了。反序列化也很简单,这里就贴一下代码了,非常简单粗暴的实现:

std::string IpcMessage::getElementAtIndex(const uint32_t index) const noexcept
{
    std::string messageRemainder(m_msg);
    size_t startPos = 0u;
    size_t endPos = messageRemainder.find_first_of(m_separator, startPos);

    for (uint32_t counter = 0u; endPos != std::string::npos; ++counter)
    {
        if (counter == index)
        {
            return messageRemainder.substr(startPos, endPos - startPos);
        }

        startPos = endPos + 1u;
        endPos = messageRemainder.find_first_of(m_separator, startPos);
    }

    return std::string();
}

3 类Unix系统的实现

正如在 引言 中介绍的,类Unix系统使用Unix域套接字实现IPC通信机制。由UnixDomainSocket封装初始化、销毁、发送和接收等逻辑,这里我们主要介绍发送和接收逻辑的具体实现。

3.1 发送函数send

职责:
封装客户端的消息发送逻辑

参数:

  • msg:待发送的消息。
cxx::expected<IpcChannelError> UnixDomainSocket::send(const std::string& msg) const noexcept
{
    // we also support timedSend. The setsockopt call sets the timeout for all further sendto calls, so we must set
    // it to 0 to turn the timeout off
    return timedSend(msg, units::Duration::fromSeconds(0ULL));
}

发送函数send只是简单地调用地超时时间的发送函数timedSend。输入的超时时间为0,意味着立即发送。timedSend的实现如下所示:

cxx::expected<IpcChannelError> UnixDomainSocket::timedSend(const std::string& msg,
                                                           const units::Duration& timeout) const noexcept
{
    if (msg.size() > m_maxMessageSize)
    {
        return cxx::error<IpcChannelError>(IpcChannelError::MESSAGE_TOO_LONG);
    }

    if (IpcChannelSide::SERVER == m_channelSide)
    {
        std::cerr << "sending on server side not supported for unix domain socket \"" << m_name << "\"" << std::endl;
        return cxx::error<IpcChannelError>(IpcChannelError::INTERNAL_LOGIC_ERROR);
    }

    auto tv = timeout.timeval();
    auto setsockoptCall = posixCall(iox_setsockopt)(m_sockfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv))
                              .failureReturnValue(ERROR_CODE)
                              .ignoreErrnos(EWOULDBLOCK)
                              .evaluate();

    if (setsockoptCall.has_error())
    {
        return cxx::error<IpcChannelError>(convertErrnoToIpcChannelError(setsockoptCall.get_error().errnum));
    }
    auto sendCall = posixCall(iox_sendto)(m_sockfd, msg.c_str(), msg.size() + NULL_TERMINATOR_SIZE, 0, nullptr, 0)
                        .failureReturnValue(ERROR_CODE)
                        .evaluate();

    if (sendCall.has_error())
    {
        return cxx::error<IpcChannelError>(convertErrnoToIpcChannelError(sendCall.get_error().errnum));
    }
    return cxx::success<void>();
}

逐段代码分析:

  • LINE 04 ~ LINE 13: 错误处理——消息长度过长、类型服务端。整体结构图中,黄色的

  • LINE 15 ~ LINE 24: 调用POSIX接口(类Unix系统调用)setsockopt,设置超时时间。

  • LINE 25 ~ LINE 32: 调用POSIX接口(类Unix系统调用)sendto发送数据。

可以看到,Unix版本的发送实现就是简单地调用系统调用。

3.2 接收函数receive

职责:
封装消息接收逻辑。

返回:
消息字符串或错误类型。

cxx::expected<std::string, IpcChannelError> UnixDomainSocket::receive() const noexcept
{
    // we also support timedReceive. The setsockopt call sets the timeout for all further recvfrom calls, so we must set
    // it to 0 to turn the timeout off
    struct timeval tv = {};
    tv.tv_sec = 0;
    tv.tv_usec = 0;

    return timedReceive(units::Duration(tv));
}

接收函数receive只是简单地调用地超时时间的发送函数timedReceive。输入的超时时间为0,即没有结果立即返回。timedReceive的实现如下所示:

cxx::expected<std::string, IpcChannelError>
UnixDomainSocket::timedReceive(const units::Duration& timeout) const noexcept
{
    if (IpcChannelSide::CLIENT == m_channelSide)
    {
        std::cerr << "receiving on client side not supported for unix domain socket \"" << m_name << "\"" << std::endl;
        return cxx::error<IpcChannelError>(IpcChannelError::INTERNAL_LOGIC_ERROR);
    }

    auto tv = timeout.timeval();
    auto setsockoptCall = posixCall(iox_setsockopt)(m_sockfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))
                              .failureReturnValue(ERROR_CODE)
                              .ignoreErrnos(EWOULDBLOCK)
                              .evaluate();

    if (setsockoptCall.has_error())
    {
        return cxx::error<IpcChannelError>(convertErrnoToIpcChannelError(setsockoptCall.get_error().errnum));
    }
    // NOLINTJUSTIFICATION needed for recvfrom
    // NOLINTNEXTLINE(hicpp-avoid-c-arrays, cppcoreguidelines-avoid-c-arrays)
    char message[MAX_MESSAGE_SIZE + 1];

    auto recvCall = posixCall(iox_recvfrom)(m_sockfd, &message[0], MAX_MESSAGE_SIZE, 0, nullptr, nullptr)
                        .failureReturnValue(ERROR_CODE)
                        .suppressErrorMessagesForErrnos(EAGAIN, EWOULDBLOCK)
                        .evaluate();
    message[MAX_MESSAGE_SIZE] = 0;

    if (recvCall.has_error())
    {
        return cxx::error<IpcChannelError>(convertErrnoToIpcChannelError(recvCall.get_error().errnum));
    }
    return cxx::success<std::string>(&message[0]);
}

逐段代码分析:

  • LINE 04 ~ LINE 08: 错误处理——通道类型服务端。整体结构图中,黄色的。

  • LINE 10 ~ LINE 19: 调用POSIX接口(类Unix系统调用)setsockopt,设置超时时间。

  • LINE 22 ~ LINE 33: 调用POSIX接口(类Unix系统调用)recvfrom接收数据。

4 Windows系统的实现

由于Windows不支持Unix域套接字,使用共享内存的方式来模拟。每引入一个发布者或订阅者,都需要开辟两条通道——收和发,每条通道会使用单独一块共享内存,即需要开辟两块共享内存。

4.1 发送函数send

职责:
封装消息发送逻辑。

参数:

  • msg:待发送的消息。
cxx::expected<IpcChannelError> NamedPipe::send(const std::string& message) const noexcept
{
    if (!m_isInitialized)
    {
        return cxx::error<IpcChannelError>(IpcChannelError::NOT_INITIALIZED);
    }

    if (message.size() > MAX_MESSAGE_SIZE)
    {
        return cxx::error<IpcChannelError>(IpcChannelError::MESSAGE_TOO_LONG);
    }

    cxx::Expects(!m_data->sendSemaphore().wait().has_error());
    IOX_DISCARD_RESULT(m_data->messages.push(Message_t(cxx::TruncateToCapacity, message)));
    cxx::Expects(!m_data->receiveSemaphore().post().has_error());

    return cxx::success<>();
}

逐段代码分析:

  • LINE 03 ~ LINE 11: 错误处理——未初始化(消息队列共享内存未创建)、消息长度过长。这里没有判断是服务端还是客户端,估计是不同人实现的。

  • LINE 13 ~ LINE 15: 第14行,往消息队列(共享内存)中存入消息。第13行是通过发送信号量判断消息队列是否已满,若已满,则一直等待,直到接收端读取消息,唤醒发送端。第15行是唤醒接收端读取消息。

iceoryx还提供了timedSend函数,带有超时机制,即超时则发送失败。还提供了不等待的版本trySend,若队列已满,则发送失败。这两个函数本文不做介绍。

4.2 接收函数receive

职责:
封装消息接收逻辑。

返回:
消息字符串或错误类型。

cxx::expected<std::string, IpcChannelError> NamedPipe::receive() const noexcept
{
    if (!m_isInitialized)
    {
        return cxx::error<IpcChannelError>(IpcChannelError::NOT_INITIALIZED);
    }

    cxx::Expects(!m_data->receiveSemaphore().wait().has_error());
    auto message = m_data->messages.pop();
    if (message.has_value())
    {
        cxx::Expects(!m_data->sendSemaphore().post().has_error());
        return cxx::success<std::string>(message->c_str());
    }
    return cxx::error<IpcChannelError>(IpcChannelError::INTERNAL_LOGIC_ERROR);
}

逐段代码分析:

  • LINE 03 ~ LINE 06: 错误处理——未初始化(消息队列共享内存未创建)。这里没有判断是服务端还是客户端,估计是不同人实现的。

  • LINE 08 ~ LINE 14: 第14行,往消息队列(共享内存)中存入消息。第8行是通过接收信号量判断消息队列是否为空,若为空,则一直等待,直到发送端发送消息,唤醒发送端。第12行是唤醒发送端发送消息。

iceoryx还提供了timedReceive函数,带有超时机制,即超时则接收失败。还提供了不等待的版本tryReceive,若队列为空,则接收失败。这两个函数本文不做介绍。

5 Roudi的监听逻辑

Roudi启动后,会开启一个线程来监听和处理来自客户端(订阅者、发布者)的请求,如下所示:

void RouDi::startProcessRuntimeMessagesThread() noexcept
{
    m_handleRuntimeMessageThread = std::thread(&RouDi::processRuntimeMessages, this);
    posix::setThreadName(m_handleRuntimeMessageThread.native_handle(), "IPC-msg-process");
}

线程执行函数为processRuntimeMessages,内部就是一个循环,如下所示:

void RouDi::processRuntimeMessages() noexcept
{
    runtime::IpcInterfaceCreator roudiIpcInterface{IPC_CHANNEL_ROUDI_NAME};

    // the logger is intentionally not used, to ensure that this message is always printed
    std::cout << "RouDi is ready for clients" << std::endl;

    while (m_runHandleRuntimeMessageThread)
    {
        // read RouDi's IPC channel
        runtime::IpcMessage message;
        if (roudiIpcInterface.timedReceive(m_runtimeMessagesThreadTimeout, message))
        {
            auto cmd = runtime::stringToIpcMessageType(message.getElementAtIndex(0).c_str());
            std::string runtimeName = message.getElementAtIndex(1);

            processMessage(message, cmd, RuntimeName_t(cxx::TruncateToCapacity, runtimeName));
        }
    }
}

通过上述代码可知,发送给Roudi的所有消息,第一项为请求类型,第二项为运行。这里调用了processMessage函数,这和上一篇文章中的 3.5 RouDi::processMessage 关联了。

标签:IPC,return,cxx,iceoryx,发送,源码,error,const,LINE
From: https://www.cnblogs.com/lijihong-jerry/p/18156475

相关文章

  • [附源码]秦时明月6.2魔改版_搭建架设教程_附GM工具_安卓&苹果
    本教程仅限学习使用,禁止商用,一切后果与本人无关,此声明具有法律效应!!!!教程是本人亲自搭建成功的,绝对是完整可运行的,踩过的坑都给你们填上了一.演示视频 https://githubs.xyz/boot?app=50二.环境联网环境:centos7.6,放开所有端口单机环境:VM虚拟机......
  • 探索网站支付系统的奥秘,从Vue3和Spring Boot开始(入门级项目实战+在线教程)附赠项目源码
    你是否曾经在购物时,对着电脑屏幕前的“支付成功”四个字感到好奇?这背后的秘密究竟是什么?今天,让我们一起揭开支付系统的神秘面纱,探索其背后的技术实现。在这个基于Vue3和SpringBoot的支付项目实战中,我们将带你一步步了解支付系统的实现思路。这个项目不仅解决了常用支付方式的......
  • Android 源码单模块编译及调试
    以下内容基于Android12源码进行整理在整编完整个Android系统后,需要进行系统源码修改,不能每修改一次代码就对整个系统进行一次编译,这个很不现实,这时就需要对单模块进行编译并调试,下面的方法就很有用了。framework编译及替换编译自从Android12之后framework编译方式和之......
  • 27-Spring源码分析(二)
    AOP源码分析1.AOP概述AOP(AspectOrientProgramming)利用代理模式,通过代理对象对被代理的对象增加功能。所以,关键在于AOP框架自动创建AOP代理对象,代理模式分为静态代理和动态代理。AspectJ使用静态代理,编译时增强,在编译期生成代理对象;SpringAOP使用动态代理,运行时......
  • pdf.js源码分析-textLayer中的坐标计算
    在pdf.js中显示pdf内容和选择pdf文字属于不同的层,一个是canvas绘制,一个是使用dom进行布局,那么接下来先看一下在textLayer中的文字节点div是怎么计算每段文字的布局位置的吧。首先找到pdf.js源码中的text_layer.js文件,然后得到下面方法appendText方法,下面的解释是在字体没有发生旋......
  • FiddlerCore源码
    这是FiddlerCore5.0.2版本源码,支持.netframework和.netcore,百分百c#源码,功能全而且免费,无功能限制,还修复了多处bug。介绍:FiddlerCore可捕获和修改HTTP和HTTPS流量,而无需任何FiddlerUI。特点:HTTP和HTTPS流量捕获和修改。用于内容过滤和修改的强大对象模型。存......
  • JUC源码解析:深入解读偏向锁
    JUC源码解析:深入解读偏向锁本文使用jdk8几种锁状态介绍先介绍一下锁状态吧看偏向锁这一栏,它的内存存储了线程ID和Epoch,这一点尤为关键,意味着偏向锁没有内存可以存储对象头的hashCode,而其他锁是有地方存的.。也就意味着,,当锁对象被隐式(父类)或显试调用了has......
  • 在IDEA中加载OpenJDK源码
    之所以要阅读OpenJDK源码,是因为SunJDK的某些源码是缺失的,以JDK1.8为例,sun.reflect,sun.rmi及其子包下的类都是没有源码的。如下以下载OpenJDK1.8源码为例进行说明。下载OpenJDK源码文件,如下载zip格式的压缩包。解压OpenJDK源码压缩包文件,在IDEA中按如下路径加载:【File】......
  • qt 属性控件 使用qt提供的源码 qtpropertybrowser(D:\Qt\5.15.2\Src\qttools\src
    效果:   直接将头文件h和源文件cpp文件添加到项目中。cmakeLists.txt:file(GLOBqtpropertybrowser${QTPROPERTYBROWSER_DIR}/*.cpp${QTPROPERTYBROWSER_DIR}/*.h)include_directories("${QTPROPERTYBROWSER_DIR}")设置了源文件路径 只有一个cpp文件:#includ......
  • Kafka源码分析(四) - Server端-请求处理框架
    系列文章目录https://zhuanlan.zhihu.com/p/367683572一.总体结构先给一张概览图:服务端请求处理过程涉及到两个模块:kafka.network和kafka.server。1.1kafka.network该包是kafka底层模块,提供了服务端NIO通信能力基础。有4个核心类:SocketServer、Acceptor、Processor、Req......