首页 > 编程语言 >C++编程:实现一个简单的消息总线

C++编程:实现一个简单的消息总线

时间:2024-11-15 18:15:46浏览次数:3  
标签:std 订阅 end MessageBus 编程 总线 C++ 消息 超时

文章目录

0. 引言

在之前的文章中,我们介绍了使用C++11实现的非阻塞消息总线message_bus。然而,原始实现的代码结构不够优雅,且性能方面存在一些不足。

本文将介绍如何使用 C++ 编写一个改进的消息总线系统,其中包括以下功能:

  • 消息发布:发布消息给所有订阅该消息的订阅者。
  • 消息订阅:允许订阅者订阅感兴趣的消息,并注册回调函数以便处理接收到的消息。
  • 定时任务调度:支持周期性的任务调度,例如定时检查超时的订阅。
  • 超时管理:每个订阅可以设置超时回调,如果在指定时间内未收到消息,将执行该回调。

完整代码链接:message_bus

1. 设计思路

在设计该消息总线系统时,我们的核心目标是通过简单的接口实现复杂的功能,同时确保系统的性能和可靠性。为了实现这一目标,我们将把系统分成多个模块,分别处理消息的发布与订阅、超时检查和任务调度。

1.1 关键类设计

  • MessageBus:负责管理消息的发布和订阅,支持消息回调处理及超时管理。
  • PeriodicTaskScheduler:负责定期检查超时情况和执行周期性任务。
  • SubscriptionItem:表示订阅项,包含回调函数、超时设置等信息。

1.2 类图

在此处,我们使用类图来展示系统中各个类及其之间的关系。这样可以帮助我们清晰地理解各个模块如何协作。

manages 1 1..* manages 1 1..* uses 1 1..* MessageBus +publishMessage(messageId: int, messageContent: string, additionalData: int) +checkAndHandleTimeouts() +subscribeToMessage(item: SubscriptionItem) +clearAllSubscriptions() +stop() +start() PeriodicTaskScheduler +startTask(intervalMs: int, task: function) +stop() +isStopped() SubscriptionItem +messageCallback: function +timeoutCallback: function +timeoutIntervalMilliseconds: int +timeoutTimestampMicroseconds: long +subscribedMessageIds: list +subscriptionType: SubscriptionType «enumeration» SubscriptionType ALWAYS_SUBSCRIBE ONCE_SUBSCRIBE

1.3 时序图

时序图展示了在 publishMessage 函数执行时,消息如何流动并与订阅者和任务调度器进行交互。

User MessageBus SubscriptionItem TaskScheduler publishMessage(messageId, messageContent) Lock callbackMapMutex Lock timeoutCallbackListMutex Call messageCallback Handle timeout if exists checkAndHandleTimeouts() checkTimeouts() Call timeoutCallback if timeout occurs Done User MessageBus SubscriptionItem TaskScheduler

1.4 流程图

流程图描述了系统的运行流程,包括消息发布、超时检查等关键步骤。

Timeout Check Loop Yes No Yes No Yes No Timeout Reached? Timeout Check Call Timeout Callback Start MessageBus Created Start Task Scheduler Publish Message Message ID Exists? Call Message Callback Do Nothing Once Subscribe? Unsubscribe After One Time Keep Subscribed End

2. 代码结构与设计

2.1 消息回调与订阅项

为了灵活处理消息和超时回调,我们使用了 std::function 来定义回调函数,并通过结构体 SubscriptionItem 存储相关信息。

using MessageCallback = std::function<void(const std::vector<std::uint8_t>& messageContent, std::int32_t additionalData)>; 
using TimeoutCallback = std::function<void()>;

struct SubscriptionItem {
  MessageCallback messageCallback = nullptr;  // 消息回调函数
  TimeoutCallback timeoutCallback = nullptr;  // 超时回调函数
  std::int32_t timeoutIntervalMilliseconds = 1000;  // 超时时间间隔,单位:毫秒
  std::int64_t timeoutTimestampMicroseconds = 0;  // 超时戳,单位:微秒
  std::vector<std::int32_t> subscribedMessageIds;  // 订阅的消息ID
  SubscriptionType subscriptionType = SubscriptionType::ALWAYS_SUBSCRIBE;  // 订阅类型
};

每个 SubscriptionItem 包含一个消息回调函数(messageCallback)、一个超时回调函数(timeoutCallback)、订阅的消息 ID 以及超时管理信息(如超时间隔和超时时间戳)。

2.2 消息总线类 MessageBus

MessageBus 类是整个消息总线的核心,提供了消息发布、订阅管理以及超时检查等功能。

class MessageBus {
 public:
  static MessageBus& instance() {
    static MessageBus instance;
    return instance;
  }

  void publishMessage(std::int32_t messageId, const std::vector<std::uint8_t>& messageContent, std::int32_t additionalData = 0);
  void checkAndHandleTimeouts();
  bool subscribeToMessage(const SubscriptionItem& item);
  void clearAllSubscriptions();
  void stop();
  void start();
};

MessageBus 提供了以下方法:

  • publishMessage:发布消息给所有订阅该消息 ID 的订阅者。
  • checkAndHandleTimeouts:检查所有订阅项是否超时并执行相应的回调。
  • subscribeToMessage:订阅指定消息 ID 的消息并注册相关的回调函数。
  • clearAllSubscriptions:清空所有订阅项。
  • startstop:启动和停止定时任务调度器。

2.3 定时任务调度器 PeriodicTaskScheduler

为了能够定期执行任务(如超时检查),我们实现了一个内嵌的定时任务调度器类。

class PeriodicTaskScheduler {
 public:
   PeriodicTaskScheduler() : stopped_(true), tryToStop_(false) {}

   void startTask(std::int32_t intervalMs, const std::function<void()>& task);
   void stop();
   bool isStopped() const;

 private:
   std::atomic<bool> stopped_;
   std::atomic<bool> tryToStop_;
   std::mutex mutex_;
   std::condition_variable stopCond_;
};

PeriodicTaskScheduler 使用 std::atomic<bool> 控制任务的启动与停止,通过 startTask 方法以指定的时间间隔启动一个新线程执行任务。

3. 核心功能实现

3.1 消息发布

void MessageBus::publishMessage(std::int32_t messageId, const std::vector<std::uint8_t>& messageContent, std::int32_t additionalData) {
  std::unique_lock<std::mutex> callbackMapLock(callbackMapMutex_, std::defer_lock);
  std::unique_lock<std::mutex> timeoutCallbackListLock(timeoutCallbackListMutex_, std::defer_lock);
  std::lock(callbackMapLock, timeoutCallbackListLock);

  // 清除已超时的订阅项
  auto it = timeoutCallbackList_.begin();
  while (it != timeoutCallbackList_.end()) {
    if (std::find((*it)->subscribedMessageIds.begin(), (*it)->subscribedMessageIds.end(), messageId) != (*it)->subscribedMessageIds.end()) {
      it = timeoutCallbackList_.erase(it);
    } else {
      ++it;
    }
  }

  // 调用回调函数
  auto callbackIt = callbackMap_.find(messageId);
  if (callbackIt

 != callbackMap_.end()) {
    for (auto& item : callbackIt->second) {
      if (item->messageCallback) {
        item->messageCallback(messageContent, additionalData);
      }
      if (item->subscriptionType == SubscriptionType::ONCE_SUBSCRIBE) {
        unsubscribe(messageId, item);
      }
    }
  }
}

3.2 超时检查

void MessageBus::checkAndHandleTimeouts() {
  std::unique_lock<std::mutex> timeoutCallbackListlck(timeoutCallbackListMutex_);
  std::int64_t currentTime = getTimeStamp();
  
  for (auto it = timeoutCallbackList_.begin(); it != timeoutCallbackList_.end();) {
    if ((*it)->timeoutTimestampMicroseconds <= currentTime) {
      if ((*it)->timeoutCallback) {
        (*it)->timeoutCallback();
      }
      for (std::int32_t msgId : (*it)->subscribedMessageIds) {
        unsubscribe(msgId, *it);
      }
      it = timeoutCallbackList_.erase(it);
    } else {
      ++it;
    }
  }
}

以上代码定期检查所有订阅项是否超时,并触发超时回调。

4. 测试代码

#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <random>
#include <string>
#include <thread>
#include <vector>

#include "message_bus.hpp"

using namespace MessageBusSystem;

// 订阅回调函数
void messageCallback(const std::vector<std::uint8_t>& messageContent, std::int32_t additionalData) {
  // 这里模拟消息的处理过程
}

void timeoutCallback() {
  std::cout << "Timeout callback triggered!" << std::endl;
}

void singleThreadTest() {
  std::cout << "Running Single Thread Test..." << std::endl;
  MessageBus& bus = MessageBus::instance();

  // 创建一个订阅项
  SubscriptionItem item;
  item.messageCallback = messageCallback;
  item.timeoutCallback = timeoutCallback;
  item.timeoutIntervalMilliseconds = 1000;  // 1秒超时
  item.subscribedMessageIds.push_back(1);
  bus.subscribeToMessage(item);
  const std::string str{"Test message"};
  const std::vector<std::uint8_t> testMsg(str.begin(), str.end());

  // 开始消息总线
  bus.start();

  // 发布大量消息
  auto start = std::chrono::high_resolution_clock::now();
  for (int i = 0; i < 100000; ++i) {
    bus.publishMessage(1, testMsg, i);
  }

  // 测量时间
  auto end = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double> duration = end - start;
  std::cout << "Single Thread Test Completed in " << duration.count() << " seconds." << std::endl;
  bus.stop();
}

void multiThreadTest() {
  std::cout << "Running Multi Thread Test..." << std::endl;
  MessageBus& bus = MessageBus::instance();

  // 创建订阅项
  SubscriptionItem item;
  item.messageCallback = messageCallback;
  item.timeoutCallback = timeoutCallback;
  item.timeoutIntervalMilliseconds = 500;  // 0.5秒超时
  item.subscribedMessageIds.push_back(1);
  bus.subscribeToMessage(item);
  const std::string str{"Test message"};
  const std::vector<std::uint8_t> testMsg(str.begin(), str.end());
  // 启动消息总线
  bus.start();

  // 发布消息的线程数
  const int numThreads = 8;
  std::vector<std::thread> threads;
  auto start = std::chrono::high_resolution_clock::now();

  // 多线程发布消息
  for (int i = 0; i < numThreads; ++i) {
    threads.emplace_back([&]() {
      for (int j = 0; j < 10000; ++j) {
        bus.publishMessage(1, testMsg, j);
      }
    });
  }

  // 等待所有线程完成
  for (auto& t : threads) {
    t.join();
  }

  // 测量时间
  auto end = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double> duration = end - start;
  std::cout << "Multi Thread Test Completed in " << duration.count() << " seconds." << std::endl;
  bus.stop();
}

void messageTimeoutTest() {
  std::cout << "Running Message Timeout Test..." << std::endl;
  MessageBus& bus = MessageBus::instance();

  // 创建订阅项,设置为超时回调
  SubscriptionItem item;
  item.messageCallback = messageCallback;
  item.timeoutCallback = timeoutCallback;
  item.timeoutIntervalMilliseconds = 2000;  // 2秒超时
  item.subscribedMessageIds.push_back(1);
  bus.subscribeToMessage(item);
  const std::string str{"Test message"};
  const std::vector<std::uint8_t> testMsg(str.begin(), str.end());
  // 启动消息总线
  bus.start();

  // 发布消息
  bus.publishMessage(1, testMsg, 0);

  // 等待超时回调
  std::this_thread::sleep_for(std::chrono::seconds(3));  // 等待超过超时

  // 停止消息总线
  bus.stop();
}

void highFrequencyTest() {
  std::cout << "Running High Frequency Test..." << std::endl;
  MessageBus& bus = MessageBus::instance();

  // 创建订阅项
  SubscriptionItem item;
  item.messageCallback = messageCallback;
  item.timeoutCallback = timeoutCallback;
  item.timeoutIntervalMilliseconds = 500;  // 0.5秒超时
  item.subscribedMessageIds.push_back(1);
  bus.subscribeToMessage(item);
  const std::string str{"High frequency message"};
  const std::vector<std::uint8_t> testMsg(str.begin(), str.end());
  // 启动消息总线
  bus.start();

  // 发布高频消息
  auto start = std::chrono::high_resolution_clock::now();
  for (int i = 0; i < 100000; ++i) {
    bus.publishMessage(1, testMsg, i);
  }

  // 测量时间
  auto end = std::chrono::high_resolution_clock::now();
  std::chrono::duration<double> duration = end - start;
  std::cout << "High Frequency Test Completed in " << duration.count() << " seconds." << std::endl;
  bus.stop();
}

int main() {
  // 单线程压力测试
  singleThreadTest();
  // 多线程压力测试
  multiThreadTest();
  // 消息超时测试
  messageTimeoutTest();
  // 高频消息测试
  highFrequencyTest();

  return 0;
}

执行结果如下:

Running Single Thread Test...
Single Thread Test Completed in 0.179179 seconds.
Running Multi Thread Test...
Multi Thread Test Completed in 0.224865 seconds.
Running Message Timeout Test...
Running High Frequency Test...
High Frequency Test Completed in 0.170551 seconds.

标签:std,订阅,end,MessageBus,编程,总线,C++,消息,超时
From: https://blog.csdn.net/stallion5632/article/details/143785271

相关文章

  • 仓颉_Cangjie-函数式编程
    函数定义CC语言中,函数的声明告诉编译器函数的名称、返回类型和参数列表。函数的定义则提供了函数的实际体C++返回类型函数名(参数列表){//函数体//执行的操作//返回返回类型的值}Java函数的定义分为函数的声明和函数的实现Rust使用fn关键字定义函数。函......
  • 【C++】list 类深度解析:探索双向链表的奇妙世界
    ......
  • CSAPP 并发编程
    frompixiv前置知识进程逻辑控制流(简称逻辑流)CSAPPP508:一系列的程序计数器PC的值唯一地对应于包含在程序的可执目标文件中的指令或包含在运行时动态链接到程序的共享对象指令。这个PC值的序列叫逻辑控制流一个逻辑流的执行在时间上与另一个流重叠,称为并发流,这两个流被......
  • Python并发编程入门:使用concurrent.futures与asyncio
    Python并发编程入门:使用concurrent.futures与asyncio在现代应用中,并发编程已成为一种提升性能和效率的重要手段。Python提供了多种实现并发的方式,尤其是concurrent.futures和asyncio,分别适用于不同的并发场景。本文将带你深入了解这两种并发编程方式,帮助你轻松上手并......
  • C++命名空间介绍、定义、作用、是否允许嵌套
    本文章代码块默认为写了std命名空间的条件下,所以代码里面的输出直接写了cout,没写作用域什么是c++命名空间C++命名空间是一种机制,用于解决全局变量名或函数名之间的冲突问题。它可以将一组相关的变量、函数和类组织在一起,形成一个独立的命名空间,避免命名冲突。命名空间通过在......
  • 什么是 C++ 中的常量表达式? 有什么用途?and如何判断一个表达式是否是常量表达式?
    参考文献:constexpr介绍以及与const的区别-CSDN博客定义在C++中,常量表达式是一种在编译期间就能计算出结果的表达式。字面量常量:如整数字面量(1、2、3等)、字符字面量('a'、'b'等)、布尔字面量(true、false)和浮点字面量(3.14、2.718等)。例如,表达式3+4中的3和4就是整数字面量,整......
  • 【C++源码编译】
    C++源码到二进制可执行文件的过程与C语言类似,包括四个过程:预编译、编译、汇编、链接1、预编译C/C++编译过程中的第一个阶段,主要目的是对源代码进行处理和准备工作。下面是预编译的主要步骤:去除宏定义:将所有的#define删除,并展开所有的宏定义,将宏替换为具体的值或表达......
  • 2020年计挑赛往届真题(C++)
    因为17号要开赛了,甚至是用云端编辑器,debuff拉满,只能临时抱佛脚了各个选择题的选择项我就不标出来了,默认ABCD排,手打太麻烦了目录单选题:1.阅读以下语句:doublem=0;for(inti=3;i>0;i--)m+=1/i;将m保留三位小数输出,结果为()2.下列选项中,不是C++关键字的是()    3.下列选......
  • inline 函数:让你的 C++ 代码飞起来——深度剖析与实战技巧
    你是否曾经为C++代码中的函数调用开销感到烦恼?每次函数调用都需要创建栈帧、传递参数、跳转执行,这些看似微小的操作,累计起来就会成为性能瓶颈。在对性能要求苛刻的程序中,这些开销可能会影响到整体表现。今天,我们要聊的就是一个解决方案——inline函数。想象一下,如果编译器......
  • 深入探索 C++11 第一弹:现代 C++ 编程的基石与革新
    1、C++的发展历史C++11是C++的第⼆个主要版本,并且是从C++98起的最重要更新。C++11对C++语言的发展具有深远的影响,它使C++语言更加现代化、高效、灵活和易于使用,为开发者提供了更强大的工具和更好的编程体验,推动了C++在各个领域的广泛应用和持续发展。话不多说,下......