In the previous designs we used asio in single-threaded mode. To improve the efficiency of concurrent network I/O handling, this time we design a multi-threaded way of using asio. Broadly speaking, asio offers two multi-threading models. The first starts multiple threads, each of which owns and runs its own io_context. The second starts a single io_context that is shared by multiple threads. A later article will compare the two models; here we introduce the first one: multiple threads, each managing an independent io_context service.
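For contrast, a minimal sketch of the second model (a single io_context whose run() is called from several threads) might look like the following. It is only an illustration, the thread count of 4 is arbitrary, and the rest of this post uses the first model instead:

#include <boost/asio.hpp>
#include <thread>
#include <vector>

int main() {
    boost::asio::io_context ioc;
    // keep run() from returning while there is temporarily no pending work
    auto work = boost::asio::make_work_guard(ioc);

    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        // every thread calls run() on the same io_context,
        // so a completion handler may fire on any of these threads
        threads.emplace_back([&ioc]() { ioc.run(); });
    }

    // ... register acceptors / sockets / timers on ioc here ...

    work.reset();               // let run() return once all work is done
    for (auto& t : threads) {
        t.join();
    }
    return 0;
}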
Comparing the single-threaded and multi-threaded models
The single-threaded model we used before is illustrated by the following diagram.
The multi-threaded model of our IOServicePool design looks like this:
Characteristics of the IOServicePool multi-threaded model
1. Each io_context runs on its own thread, so a given socket is registered with exactly one io_context and its completion handlers are always invoked by that one thread. Because every callback for the same socket fires on the same thread, there are no thread-safety problems for that socket; concurrency at the network I/O level is thread-safe.
2. For different sockets, however, the callbacks may fire on the same thread (when the two sockets were assigned to the same io_context) or on different threads (when they were assigned to different io_contexts). So if the upper-layer logic behind two sockets interacts or touches shared state, there is a thread-safety problem. For example, socket1 represents player 1 and socket2 represents player 2, and the two players interact at the logic layer: both are doing a guild quest and belong to the same guild, so the guild score they both increment is shared data and must be kept thread-safe. This can be solved either with a lock or with a logic queue; we currently use the latter (a minimal sketch of the lock-based alternative appears right after this list).
3. Compared with a single thread, multiple threads greatly improve concurrency. With a single thread there is only one io_context listening for read/write events, and once they are ready the callbacks run serially on that one thread, so a long-running callback inevitably delays the calls that follow it. Multiple threads reduce this effect to some extent, for example when two sockets land on different io_contexts; when they land on the same io_context, one callback can still delay the next. However, since we have already decoupled the network threads from the logic thread through the logic queue, a slow piece of business logic no longer delays the triggering of the next network callback.
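To make point 2 concrete, here is a minimal sketch of the lock-based alternative mentioned above. Guild, AddScore and the member names are hypothetical and only illustrate protecting a shared guild score when two sessions' callbacks run on different io_context threads; the server in this post uses the logic-queue approach instead:

#include <mutex>

// hypothetical shared object touched by callbacks of different sockets
class Guild {
public:
    void AddScore(int delta) {
        // the two players' callbacks may run on different io_context threads,
        // so the shared score must be protected
        std::lock_guard<std::mutex> lock(_mtx);
        _score += delta;
    }
    int GetScore() {
        std::lock_guard<std::mutex> lock(_mtx);
        return _score;
    }
private:
    std::mutex _mtx;
    int _score = 0;
};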
The code is as follows:
const.h
#pragma once
#define MAX_LENGTH 1024*2
// total header length
#define HEAD_TOTAL_LEN 4
// length of the id field in the header
#define HEAD_ID_LEN 2
// length of the data-length field in the header
#define HEAD_DATA_LEN 2
#define MAX_RECVQUE 10000
#define MAX_SENDQUE 1000

enum MSG_IDS {
    MSG_HELLO_WORD = 1001
};
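The constants above define the wire format used throughout this post: a 4-byte header (a 2-byte message id followed by a 2-byte body length, both in network byte order) and then the body. The sketch below only illustrates the layout; pack_header is a hypothetical helper, and the real packing happens in SendNode further down:

#include <cstring>
#include <boost/asio.hpp>
#include "const.h"

// | msg_id : HEAD_ID_LEN bytes | body_len : HEAD_DATA_LEN bytes | body : body_len bytes |
//   (both header fields are sent in network byte order)
static void pack_header(char* buf, short msg_id, short body_len) {
    short id_net  = boost::asio::detail::socket_ops::host_to_network_short(msg_id);
    short len_net = boost::asio::detail::socket_ops::host_to_network_short(body_len);
    std::memcpy(buf, &id_net, HEAD_ID_LEN);
    std::memcpy(buf + HEAD_ID_LEN, &len_net, HEAD_DATA_LEN);
}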
Singleton.h
#pragma once
#include <memory>
#include <mutex>
#include <iostream>
using namespace std;

template <typename T>
class Singleton {
protected:
    Singleton() = default;
    Singleton(const Singleton<T>&) = delete;
    Singleton& operator=(const Singleton<T>& st) = delete;

    static std::shared_ptr<T> _instance;
public:
    static std::shared_ptr<T> GetInstance() {
        static std::once_flag s_flag;
        std::call_once(s_flag, [&]() {
            _instance = shared_ptr<T>(new T);
        });
        return _instance;
    }

    void PrintAddress() {
        std::cout << _instance.get() << endl;
    }

    ~Singleton() {
        std::cout << "this is singleton destruct" << std::endl;
    }
};

template <typename T>
std::shared_ptr<T> Singleton<T>::_instance = nullptr;
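A class becomes a singleton by inheriting from Singleton<T> and declaring it a friend so the private constructor stays hidden. A minimal usage sketch with a hypothetical Foo class:

#include "Singleton.h"

class Foo : public Singleton<Foo> {
    friend class Singleton<Foo>;
public:
    void Hello() { std::cout << "hello from the Foo singleton" << std::endl; }
private:
    Foo() = default;
};

// usage anywhere in the program:
//   Foo::GetInstance()->Hello();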
MsgNode.h
#pragma once
#include <string>
#include "const.h"
#include <iostream>
#include <boost/asio.hpp>
using namespace std;
using boost::asio::ip::tcp;

class LogicSystem;

class MsgNode {
public:
    MsgNode(short max_len) :_total_len(max_len), _cur_len(0) {
        _data = new char[_total_len + 1]();
        _data[_total_len] = '\0';
    }

    ~MsgNode() {
        std::cout << "destruct MsgNode" << endl;
        delete[] _data;
    }

    void Clear() {
        ::memset(_data, 0, _total_len);
        _cur_len = 0;
    }

    short _cur_len;
    short _total_len;
    char* _data;
};

class RecvNode :public MsgNode {
    friend class LogicSystem;
public:
    RecvNode(short max_len, short msg_id);
private:
    short _msg_id;
};

class SendNode :public MsgNode {
    friend class LogicSystem;
public:
    SendNode(const char* msg, short max_len, short msg_id);
private:
    short _msg_id;
};
MsgNode.cpp
#include "MsgNode.h" RecvNode::RecvNode(short max_len, short msg_id):MsgNode(max_len), _msg_id(msg_id){ } SendNode::SendNode(const char* msg, short max_len, short msg_id):MsgNode(max_len + HEAD_TOTAL_LEN) , _msg_id(msg_id){ //先发送id, 转为网络字节序 short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id); memcpy(_data, &msg_id_host, HEAD_ID_LEN); //转为网络字节序 short max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len); memcpy(_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN); memcpy(_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len); }
LogicSystem.h
#pragma once #include "Singleton.h" #include <queue> #include <thread> #include "CSession.h" #include <queue> #include <map> #include <functional> #include "const.h" #include <json/json.h> #include <json/value.h> #include <json/reader.h> typedef function<void(shared_ptr<CSession>, const short &msg_id, const string &msg_data)> FunCallBack; class LogicSystem:public Singleton<LogicSystem> { friend class Singleton<LogicSystem>; public: ~LogicSystem(); void PostMsgToQue(shared_ptr < LogicNode> msg); private: LogicSystem(); void DealMsg(); void RegisterCallBacks(); void HelloWordCallBack(shared_ptr<CSession>, const short &msg_id, const string &msg_data); std::thread _worker_thread; std::queue<shared_ptr<LogicNode>> _msg_que; std::mutex _mutex; std::condition_variable _consume; bool _b_stop; std::map<short, FunCallBack> _fun_callbacks; };
LogicSystem.cpp
#include "LogicSystem.h" using namespace std; LogicSystem::LogicSystem():_b_stop(false){ RegisterCallBacks(); _worker_thread = std::thread (&LogicSystem::DealMsg, this); } LogicSystem::~LogicSystem(){ _b_stop = true; _consume.notify_one(); _worker_thread.join(); } void LogicSystem::PostMsgToQue(shared_ptr < LogicNode> msg) { std::unique_lock<std::mutex> unique_lk(_mutex); _msg_que.push(msg); //由0变为1则发送通知信号 if (_msg_que.size() == 1) { unique_lk.unlock(); _consume.notify_one(); } } void LogicSystem::DealMsg() { for (;;) { std::unique_lock<std::mutex> unique_lk(_mutex); //判断队列为空则用条件变量阻塞等待,并释放锁 while (_msg_que.empty() && !_b_stop) { _consume.wait(unique_lk); } //判断是否为关闭状态,把所有逻辑执行完后则退出循环 if (_b_stop ) { while (!_msg_que.empty()) { auto msg_node = _msg_que.front(); cout << "recv_msg id is " << msg_node->_recvnode->_msg_id << endl; auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id); if (call_back_iter == _fun_callbacks.end()) { _msg_que.pop(); continue; } call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id, std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len)); _msg_que.pop(); } break; } //如果没有停服,且说明队列中有数据 auto msg_node = _msg_que.front(); cout << "recv_msg id is " << msg_node->_recvnode->_msg_id << endl; auto call_back_iter = _fun_callbacks.find(msg_node->_recvnode->_msg_id); if (call_back_iter == _fun_callbacks.end()) { _msg_que.pop(); continue; } call_back_iter->second(msg_node->_session, msg_node->_recvnode->_msg_id, std::string(msg_node->_recvnode->_data, msg_node->_recvnode->_cur_len)); _msg_que.pop(); } } void LogicSystem::RegisterCallBacks() { _fun_callbacks[MSG_HELLO_WORD] = std::bind(&LogicSystem::HelloWordCallBack, this, placeholders::_1, placeholders::_2, placeholders::_3); } void LogicSystem::HelloWordCallBack(shared_ptr<CSession> session, const short &msg_id, const string &msg_data) { Json::Reader reader; Json::Value root; reader.parse(msg_data, root); std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is " << root["data"].asString() << endl; root["data"] = "server has received msg, msg data is " + root["data"].asString(); std::string return_str = root.toStyledString(); session->Send(return_str, root["id"].asInt()); }
CSession.h
#pragma once
#include <boost/asio.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <queue>
#include <mutex>
#include <memory>
#include "const.h"
#include "MsgNode.h"
using namespace std;
using boost::asio::ip::tcp;

class CServer;
class LogicSystem;

class CSession : public std::enable_shared_from_this<CSession>
{
public:
    CSession(boost::asio::io_context& io_context, CServer* server);
    ~CSession();
    tcp::socket& GetSocket();
    std::string& GetUuid();
    void Start();
    void Send(char* msg, short max_length, short msgid);
    void Send(std::string msg, short msgid);
    void Close();
    std::shared_ptr<CSession> SharedSelf();

private:
    void HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self);
    void HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self);
    tcp::socket _socket;
    std::string _uuid;
    char _data[MAX_LENGTH];
    CServer* _server;
    bool _b_close;
    std::queue<shared_ptr<SendNode> > _send_que;
    std::mutex _send_lock;
    // node holding the message body being received
    std::shared_ptr<RecvNode> _recv_msg_node;
    bool _b_head_parse;
    // node holding the header being received
    std::shared_ptr<MsgNode> _recv_head_node;
};

class LogicNode {
    friend class LogicSystem;
public:
    LogicNode(shared_ptr<CSession>, shared_ptr<RecvNode>);
private:
    shared_ptr<CSession> _session;
    shared_ptr<RecvNode> _recvnode;
};
CSession.cpp
#include "CSession.h" #include "CServer.h" #include <iostream> #include <sstream> #include <json/json.h> #include <json/value.h> #include <json/reader.h> #include "LogicSystem.h" CSession::CSession(boost::asio::io_context& io_context, CServer* server): _socket(io_context), _server(server), _b_close(false),_b_head_parse(false){ boost::uuids::uuid a_uuid = boost::uuids::random_generator()(); _uuid = boost::uuids::to_string(a_uuid); _recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN); } CSession::~CSession() { std::cout << "~CSession destruct" << endl; } tcp::socket& CSession::GetSocket() { return _socket; } std::string& CSession::GetUuid() { return _uuid; } void CSession::Start(){ ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, SharedSelf())); } void CSession::Send(std::string msg, short msgid) { std::lock_guard<std::mutex> lock(_send_lock); int send_que_size = _send_que.size(); if (send_que_size > MAX_SENDQUE) { std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl; return; } _send_que.push(make_shared<SendNode>(msg.c_str(), msg.length(), msgid)); if (send_que_size > 0) { return; } auto& msgnode = _send_que.front(); boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len), std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf())); } void CSession::Send(char* msg, short max_length, short msgid) { std::lock_guard<std::mutex> lock(_send_lock); int send_que_size = _send_que.size(); if (send_que_size > MAX_SENDQUE) { std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl; return; } _send_que.push(make_shared<SendNode>(msg, max_length, msgid)); if (send_que_size>0) { return; } auto& msgnode = _send_que.front(); boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len), std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf())); } void CSession::Close() { _socket.close(); _b_close = true; } std::shared_ptr<CSession>CSession::SharedSelf() { return shared_from_this(); } void CSession::HandleWrite(const boost::system::error_code& error, std::shared_ptr<CSession> shared_self) { //增加异常处理 try { if (!error) { std::lock_guard<std::mutex> lock(_send_lock); //cout << "send data " << _send_que.front()->_data+HEAD_LENGTH << endl; _send_que.pop(); if (!_send_que.empty()) { auto& msgnode = _send_que.front(); boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len), std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_self)); } } else { std::cout << "handle write failed, error is " << error.what() << endl; Close(); _server->ClearSession(_uuid); } } catch (std::exception& e) { std::cerr << "Exception code : " << e.what() << endl; } } void CSession::HandleRead(const boost::system::error_code& error, size_t bytes_transferred, std::shared_ptr<CSession> shared_self){ try { if (!error) { //已经移动的字符数 int copy_len = 0; while (bytes_transferred > 0) { if (!_b_head_parse) { //收到的数据不足头部大小 if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) { memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred); _recv_head_node->_cur_len += bytes_transferred; ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, 
std::placeholders::_2, shared_self)); return; } //收到的数据比头部多 //头部剩余未复制的长度 int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len; memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain); //更新已处理的data长度和剩余未处理的长度 copy_len += head_remain; bytes_transferred -= head_remain; //获取头部MSGID数据 short msg_id = 0; memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN); //网络字节序转化为本地字节序 msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id); std::cout << "msg_id is " << msg_id << endl; //id非法 if (msg_id > MAX_LENGTH) { std::cout << "invalid msg_id is " << msg_id << endl; _server->ClearSession(_uuid); return; } short msg_len = 0; memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN); //网络字节序转化为本地字节序 msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len); std::cout << "msg_len is " << msg_len << endl; //id非法 if (msg_len > MAX_LENGTH) { std::cout << "invalid data length is " << msg_len << endl; _server->ClearSession(_uuid); return; } _recv_msg_node = make_shared<RecvNode>(msg_len, msg_id); //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里 if (bytes_transferred < msg_len) { memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self)); //头部处理完成 _b_head_parse = true; return; } memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len); _recv_msg_node->_cur_len += msg_len; copy_len += msg_len; bytes_transferred -= msg_len; _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0'; //cout << "receive data is " << _recv_msg_node->_data << endl; //此处将消息投递到逻辑队列中 LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node)); //继续轮询剩余未处理数据 _b_head_parse = false; _recv_head_node->Clear(); if (bytes_transferred <= 0) { ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self)); return; } continue; } //已经处理完头部,处理上次未接受完的消息数据 //接收的数据仍不足剩余未处理的 int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len; if (bytes_transferred < remain_msg) { memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); _recv_msg_node->_cur_len += bytes_transferred; ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self)); return; } memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg); _recv_msg_node->_cur_len += remain_msg; bytes_transferred -= remain_msg; copy_len += remain_msg; _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0'; //cout << "receive data is " << _recv_msg_node->_data << endl; //此处将消息投递到逻辑队列中 LogicSystem::GetInstance()->PostMsgToQue(make_shared<LogicNode>(shared_from_this(), _recv_msg_node)); //继续轮询剩余未处理数据 _b_head_parse = false; _recv_head_node->Clear(); if (bytes_transferred <= 0) { ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self)); return; } continue; } } else { std::cout << "handle read failed, error is " << error.what() 
<< endl; Close(); _server->ClearSession(_uuid); } } catch (std::exception& e) { std::cout << "Exception code is " << e.what() << endl; } } LogicNode::LogicNode(shared_ptr<CSession> session, shared_ptr<RecvNode> recvnode):_session(session),_recvnode(recvnode) { }
CServer.h
#pragma once
#include <boost/asio.hpp>
#include "CSession.h"
#include <memory.h>
#include <map>
#include <mutex>
using namespace std;
using boost::asio::ip::tcp;

class CServer
{
public:
    CServer(boost::asio::io_context& io_context, short port);
    ~CServer();
    void ClearSession(std::string);
private:
    void HandleAccept(shared_ptr<CSession>, const boost::system::error_code& error);
    void StartAccept();
    boost::asio::io_context& _io_context;
    short _port;
    tcp::acceptor _acceptor;
    std::map<std::string, shared_ptr<CSession>> _sessions;
    std::mutex _mutex;
};
CServer.cpp
#include "CServer.h" #include <iostream> #include "AsioIOServicePool.h" CServer::CServer(boost::asio::io_context& io_context, short port):_io_context(io_context), _port(port), _acceptor(io_context, tcp::endpoint(tcp::v4(),port)) { cout << "Server start success, listen on port : " << _port << endl; StartAccept(); } CServer::~CServer() { cout << "Server destruct listen on port : " << _port << endl; } void CServer::HandleAccept(shared_ptr<CSession> new_session, const boost::system::error_code& error){ if (!error) { new_session->Start(); lock_guard<mutex> lock(_mutex); _sessions.insert(make_pair(new_session->GetUuid(), new_session)); } else { cout << "session accept failed, error is " << error.what() << endl; } StartAccept(); } void CServer::StartAccept() { auto &io_context = AsioIOServicePool::GetInstance()->GetIOService(); shared_ptr<CSession> new_session = make_shared<CSession>(io_context, this); _acceptor.async_accept(new_session->GetSocket(), std::bind(&CServer::HandleAccept, this, new_session, placeholders::_1)); } void CServer::ClearSession(std::string uuid) { lock_guard<mutex> lock(_mutex); _sessions.erase(uuid); }
AsioIOServicePool.h
#pragma once
#include <vector>
#include <thread>
#include <boost/asio.hpp>
#include "Singleton.h"

class AsioIOServicePool :public Singleton<AsioIOServicePool>
{
    friend Singleton<AsioIOServicePool>;
public:
    using IOService = boost::asio::io_context;
    using Work = boost::asio::io_context::work;
    using WorkPtr = std::unique_ptr<Work>;
    ~AsioIOServicePool();
    AsioIOServicePool(const AsioIOServicePool&) = delete;
    AsioIOServicePool& operator=(const AsioIOServicePool&) = delete;
    // return one of the io_contexts, chosen round-robin
    boost::asio::io_context& GetIOService();
    void Stop();
private:
    AsioIOServicePool(std::size_t size = std::thread::hardware_concurrency());
    std::vector<IOService> _ioServices;
    std::vector<WorkPtr> _works;
    std::vector<std::thread> _threads;
    std::size_t _nextIOService;
};
AsioIOServicePool.cpp
#include "AsioIOServicePool.h" #include <iostream> using namespace std; AsioIOServicePool::AsioIOServicePool(std::size_t size):_ioServices(size), _works(size), _nextIOService(0){ for (std::size_t i = 0; i < size; ++i) { _works[i] = std::unique_ptr<Work>(new Work(_ioServices[i])); } //遍历多个ioservice,创建多个线程,每个线程内部启动ioservice for (std::size_t i = 0; i < _ioServices.size(); ++i) { _threads.emplace_back([this, i]() { _ioServices[i].run(); }); } } AsioIOServicePool::~AsioIOServicePool() { std::cout << "AsioIOServicePool destruct" << endl; } boost::asio::io_context& AsioIOServicePool::GetIOService() { auto& service = _ioServices[_nextIOService++]; if (_nextIOService == _ioServices.size()) { _nextIOService = 0; } return service; } void AsioIOServicePool::Stop(){ //因为仅仅执行work.reset并不能让iocontext从run的状态中退出 //当iocontext已经绑定了读或写的监听事件后,还需要手动stop该服务。 for (auto& work : _works) { //把服务先停止 work->get_io_context().stop(); work.reset(); } for (auto& t : _threads) { t.join(); } }
main.cpp
#include <iostream> #include "CServer.h" #include "Singleton.h" #include "LogicSystem.h" #include <csignal> #include <thread> #include <mutex> #include "AsioIOServicePool.h" using namespace std; bool bstop = false; std::condition_variable cond_quit; std::mutex mutex_quit; int main() { try { auto pool = AsioIOServicePool::GetInstance(); boost::asio::io_context io_context; boost::asio::signal_set signals(io_context, SIGINT, SIGTERM); signals.async_wait([&io_context,pool](auto, auto) { io_context.stop(); pool->Stop(); }); CServer s(io_context, 10086); io_context.run(); } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << endl; } }
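To exercise the server end to end, a minimal blocking test client could look like the sketch below. It is not part of the original post; it assumes the server is listening on 127.0.0.1:10086 and that jsoncpp is available:

#include <boost/asio.hpp>
#include <json/json.h>
#include <iostream>
#include <cstring>
#include <string>
#include <vector>
using boost::asio::ip::tcp;

int main() {
    try {
        boost::asio::io_context ioc;
        tcp::socket sock(ioc);
        sock.connect(tcp::endpoint(boost::asio::ip::make_address("127.0.0.1"), 10086));

        // build a json body matching what HelloWordCallBack expects
        Json::Value root;
        root["id"] = 1001;                        // MSG_HELLO_WORD
        root["data"] = "hello world";
        std::string body = root.toStyledString();

        // pack [id | body length | body], header fields in network byte order
        short id = boost::asio::detail::socket_ops::host_to_network_short(1001);
        short len = boost::asio::detail::socket_ops::host_to_network_short(static_cast<short>(body.size()));
        std::string packet;
        packet.append(reinterpret_cast<char*>(&id), 2);
        packet.append(reinterpret_cast<char*>(&len), 2);
        packet.append(body);
        boost::asio::write(sock, boost::asio::buffer(packet));

        // read the echoed reply: the 4-byte header first, then the body
        char head[4] = { 0 };
        boost::asio::read(sock, boost::asio::buffer(head, 4));
        short rsp_len = 0;
        memcpy(&rsp_len, head + 2, 2);
        rsp_len = boost::asio::detail::socket_ops::network_to_host_short(rsp_len);
        std::vector<char> rsp(rsp_len);
        boost::asio::read(sock, boost::asio::buffer(rsp.data(), rsp_len));
        std::cout << "reply: " << std::string(rsp.data(), rsp_len) << std::endl;
    }
    catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << std::endl;
    }
    return 0;
}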
For more details see:
https://llfc.club/category?catid=225RaiVNI8pFDD5L4m807g7ZwmF#!aid/2Qld2hoFIu8ycYBJXQdxwyWEBfy