文章目录
0. 引言
在之前的文章中,我们介绍了使用C++11实现的非阻塞消息总线message_bus。然而,原始实现的代码结构不够优雅,且性能方面存在一些不足。
本文将介绍如何使用 C++ 编写一个改进的消息总线系统,其中包括以下功能:
- 消息发布:发布消息给所有订阅该消息的订阅者。
- 消息订阅:允许订阅者订阅感兴趣的消息,并注册回调函数以便处理接收到的消息。
- 定时任务调度:支持周期性的任务调度,例如定时检查超时的订阅。
- 超时管理:每个订阅可以设置超时回调,如果在指定时间内未收到消息,将执行该回调。
完整代码链接:message_bus
1. 设计思路
在设计该消息总线系统时,我们的核心目标是通过简单的接口实现复杂的功能,同时确保系统的性能和可靠性。为了实现这一目标,我们将把系统分成多个模块,分别处理消息的发布与订阅、超时检查和任务调度。
1.1 关键类设计
- MessageBus:负责管理消息的发布和订阅,支持消息回调处理及超时管理。
- PeriodicTaskScheduler:负责定期检查超时情况和执行周期性任务。
- SubscriptionItem:表示订阅项,包含回调函数、超时设置等信息。
1.2 类图
在此处,我们使用类图来展示系统中各个类及其之间的关系。这样可以帮助我们清晰地理解各个模块如何协作。
1.3 时序图
时序图展示了在 publishMessage
函数执行时,消息如何流动并与订阅者和任务调度器进行交互。
1.4 流程图
流程图描述了系统的运行流程,包括消息发布、超时检查等关键步骤。
2. 代码结构与设计
2.1 消息回调与订阅项
为了灵活处理消息和超时回调,我们使用了 std::function
来定义回调函数,并通过结构体 SubscriptionItem
存储相关信息。
using MessageCallback = std::function<void(const std::vector<std::uint8_t>& messageContent, std::int32_t additionalData)>;
using TimeoutCallback = std::function<void()>;
struct SubscriptionItem {
MessageCallback messageCallback = nullptr; // 消息回调函数
TimeoutCallback timeoutCallback = nullptr; // 超时回调函数
std::int32_t timeoutIntervalMilliseconds = 1000; // 超时时间间隔,单位:毫秒
std::int64_t timeoutTimestampMicroseconds = 0; // 超时戳,单位:微秒
std::vector<std::int32_t> subscribedMessageIds; // 订阅的消息ID
SubscriptionType subscriptionType = SubscriptionType::ALWAYS_SUBSCRIBE; // 订阅类型
};
每个 SubscriptionItem
包含一个消息回调函数(messageCallback
)、一个超时回调函数(timeoutCallback
)、订阅的消息 ID 以及超时管理信息(如超时间隔和超时时间戳)。
2.2 消息总线类 MessageBus
MessageBus
类是整个消息总线的核心,提供了消息发布、订阅管理以及超时检查等功能。
class MessageBus {
public:
static MessageBus& instance() {
static MessageBus instance;
return instance;
}
void publishMessage(std::int32_t messageId, const std::vector<std::uint8_t>& messageContent, std::int32_t additionalData = 0);
void checkAndHandleTimeouts();
bool subscribeToMessage(const SubscriptionItem& item);
void clearAllSubscriptions();
void stop();
void start();
};
MessageBus
提供了以下方法:
publishMessage
:发布消息给所有订阅该消息 ID 的订阅者。checkAndHandleTimeouts
:检查所有订阅项是否超时并执行相应的回调。subscribeToMessage
:订阅指定消息 ID 的消息并注册相关的回调函数。clearAllSubscriptions
:清空所有订阅项。start
和stop
:启动和停止定时任务调度器。
2.3 定时任务调度器 PeriodicTaskScheduler
为了能够定期执行任务(如超时检查),我们实现了一个内嵌的定时任务调度器类。
class PeriodicTaskScheduler {
public:
PeriodicTaskScheduler() : stopped_(true), tryToStop_(false) {}
void startTask(std::int32_t intervalMs, const std::function<void()>& task);
void stop();
bool isStopped() const;
private:
std::atomic<bool> stopped_;
std::atomic<bool> tryToStop_;
std::mutex mutex_;
std::condition_variable stopCond_;
};
PeriodicTaskScheduler
使用 std::atomic<bool>
控制任务的启动与停止,通过 startTask
方法以指定的时间间隔启动一个新线程执行任务。
3. 核心功能实现
3.1 消息发布
void MessageBus::publishMessage(std::int32_t messageId, const std::vector<std::uint8_t>& messageContent, std::int32_t additionalData) {
std::unique_lock<std::mutex> callbackMapLock(callbackMapMutex_, std::defer_lock);
std::unique_lock<std::mutex> timeoutCallbackListLock(timeoutCallbackListMutex_, std::defer_lock);
std::lock(callbackMapLock, timeoutCallbackListLock);
// 清除已超时的订阅项
auto it = timeoutCallbackList_.begin();
while (it != timeoutCallbackList_.end()) {
if (std::find((*it)->subscribedMessageIds.begin(), (*it)->subscribedMessageIds.end(), messageId) != (*it)->subscribedMessageIds.end()) {
it = timeoutCallbackList_.erase(it);
} else {
++it;
}
}
// 调用回调函数
auto callbackIt = callbackMap_.find(messageId);
if (callbackIt
!= callbackMap_.end()) {
for (auto& item : callbackIt->second) {
if (item->messageCallback) {
item->messageCallback(messageContent, additionalData);
}
if (item->subscriptionType == SubscriptionType::ONCE_SUBSCRIBE) {
unsubscribe(messageId, item);
}
}
}
}
3.2 超时检查
void MessageBus::checkAndHandleTimeouts() {
std::unique_lock<std::mutex> timeoutCallbackListlck(timeoutCallbackListMutex_);
std::int64_t currentTime = getTimeStamp();
for (auto it = timeoutCallbackList_.begin(); it != timeoutCallbackList_.end();) {
if ((*it)->timeoutTimestampMicroseconds <= currentTime) {
if ((*it)->timeoutCallback) {
(*it)->timeoutCallback();
}
for (std::int32_t msgId : (*it)->subscribedMessageIds) {
unsubscribe(msgId, *it);
}
it = timeoutCallbackList_.erase(it);
} else {
++it;
}
}
}
以上代码定期检查所有订阅项是否超时,并触发超时回调。
4. 测试代码
#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <random>
#include <string>
#include <thread>
#include <vector>
#include "message_bus.hpp"
using namespace MessageBusSystem;
// 订阅回调函数
void messageCallback(const std::vector<std::uint8_t>& messageContent, std::int32_t additionalData) {
// 这里模拟消息的处理过程
}
void timeoutCallback() {
std::cout << "Timeout callback triggered!" << std::endl;
}
void singleThreadTest() {
std::cout << "Running Single Thread Test..." << std::endl;
MessageBus& bus = MessageBus::instance();
// 创建一个订阅项
SubscriptionItem item;
item.messageCallback = messageCallback;
item.timeoutCallback = timeoutCallback;
item.timeoutIntervalMilliseconds = 1000; // 1秒超时
item.subscribedMessageIds.push_back(1);
bus.subscribeToMessage(item);
const std::string str{"Test message"};
const std::vector<std::uint8_t> testMsg(str.begin(), str.end());
// 开始消息总线
bus.start();
// 发布大量消息
auto start = std::chrono::high_resolution_clock::now();
for (int i = 0; i < 100000; ++i) {
bus.publishMessage(1, testMsg, i);
}
// 测量时间
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> duration = end - start;
std::cout << "Single Thread Test Completed in " << duration.count() << " seconds." << std::endl;
bus.stop();
}
void multiThreadTest() {
std::cout << "Running Multi Thread Test..." << std::endl;
MessageBus& bus = MessageBus::instance();
// 创建订阅项
SubscriptionItem item;
item.messageCallback = messageCallback;
item.timeoutCallback = timeoutCallback;
item.timeoutIntervalMilliseconds = 500; // 0.5秒超时
item.subscribedMessageIds.push_back(1);
bus.subscribeToMessage(item);
const std::string str{"Test message"};
const std::vector<std::uint8_t> testMsg(str.begin(), str.end());
// 启动消息总线
bus.start();
// 发布消息的线程数
const int numThreads = 8;
std::vector<std::thread> threads;
auto start = std::chrono::high_resolution_clock::now();
// 多线程发布消息
for (int i = 0; i < numThreads; ++i) {
threads.emplace_back([&]() {
for (int j = 0; j < 10000; ++j) {
bus.publishMessage(1, testMsg, j);
}
});
}
// 等待所有线程完成
for (auto& t : threads) {
t.join();
}
// 测量时间
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> duration = end - start;
std::cout << "Multi Thread Test Completed in " << duration.count() << " seconds." << std::endl;
bus.stop();
}
void messageTimeoutTest() {
std::cout << "Running Message Timeout Test..." << std::endl;
MessageBus& bus = MessageBus::instance();
// 创建订阅项,设置为超时回调
SubscriptionItem item;
item.messageCallback = messageCallback;
item.timeoutCallback = timeoutCallback;
item.timeoutIntervalMilliseconds = 2000; // 2秒超时
item.subscribedMessageIds.push_back(1);
bus.subscribeToMessage(item);
const std::string str{"Test message"};
const std::vector<std::uint8_t> testMsg(str.begin(), str.end());
// 启动消息总线
bus.start();
// 发布消息
bus.publishMessage(1, testMsg, 0);
// 等待超时回调
std::this_thread::sleep_for(std::chrono::seconds(3)); // 等待超过超时
// 停止消息总线
bus.stop();
}
void highFrequencyTest() {
std::cout << "Running High Frequency Test..." << std::endl;
MessageBus& bus = MessageBus::instance();
// 创建订阅项
SubscriptionItem item;
item.messageCallback = messageCallback;
item.timeoutCallback = timeoutCallback;
item.timeoutIntervalMilliseconds = 500; // 0.5秒超时
item.subscribedMessageIds.push_back(1);
bus.subscribeToMessage(item);
const std::string str{"High frequency message"};
const std::vector<std::uint8_t> testMsg(str.begin(), str.end());
// 启动消息总线
bus.start();
// 发布高频消息
auto start = std::chrono::high_resolution_clock::now();
for (int i = 0; i < 100000; ++i) {
bus.publishMessage(1, testMsg, i);
}
// 测量时间
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double> duration = end - start;
std::cout << "High Frequency Test Completed in " << duration.count() << " seconds." << std::endl;
bus.stop();
}
int main() {
// 单线程压力测试
singleThreadTest();
// 多线程压力测试
multiThreadTest();
// 消息超时测试
messageTimeoutTest();
// 高频消息测试
highFrequencyTest();
return 0;
}
执行结果如下:
Running Single Thread Test...
Single Thread Test Completed in 0.179179 seconds.
Running Multi Thread Test...
Multi Thread Test Completed in 0.224865 seconds.
Running Message Timeout Test...
Running High Frequency Test...
High Frequency Test Completed in 0.170551 seconds.
标签:std,订阅,end,MessageBus,编程,总线,C++,消息,超时
From: https://blog.csdn.net/stallion5632/article/details/143785271