首页 > 其他分享 >基于boost的聊天服务器改进(二)

基于boost的聊天服务器改进(二)

时间:2023-08-03 20:47:40浏览次数:37  
标签:std read _. 聊天 chat msg 服务器 boost

首先,基于上一节,我们将其改进成,一个io_services在多个线程中,来并发处理。

通常server中只是监听accept,即读操作,不存在线程不安全的问题,而session中,一般会有socket的读写read/write,存在线程不安全的问题

决处理session和room可能存在的线程不安全的问题

其中chat_room中的join/leave/deliver, 我们使用strand(尽量不加锁),对participants进行对应的操作

 void chat_room::join(chat_session_ptr participant) {
        //std::lock_guard<std::mutex> lock(m_mutex);
        m_strand.post([this, participant]{
                participants_.insert(participant);
                for (const auto& msg : recent_msgs_)
                participant->deliver(msg);
                });
  }

  void chat_room::leave(chat_session_ptr participant) {
        //std::lock_guard<std::mutex> lock(m_mutex);
        m_strand.post([this,participant]{
    participants_.erase(participant);});
  }

  void chat_room::deliver(const chat_message &msg) {
        //std::lock_guard<std::mutex> lock(m_mutex);
        m_strand.post([this, msg]{
    recent_msgs_.push_back(msg);
    while (recent_msgs_.size() > max_recent_msgs)
      recent_msgs_.pop_front();

    for (auto& participant : participants_)
      participant->deliver(msg);
        });
  }

以及对chat_sesion中的读写回调进行包裹,避免在多线程中造成问题。

 void do_write() {
    auto self(shared_from_this());
    boost::asio::async_write(
        socket_, boost::asio::buffer(write_msgs_.front().data(),
                                     write_msgs_.front().length()),
                m_strand.wrap(
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec) {
            write_msgs_.pop_front();
            if (!write_msgs_.empty()) {
              do_write();
            }
          } else {
            room_.leave(shared_from_this());
          }
        }));
  }
 void do_read_header() {     auto self(shared_from_this());     boost::asio::async_read(         socket_,         boost::asio::buffer(read_msg_.data(), chat_message::header_length),         m_strand.wrap(         [this, self](boost::system::error_code ec, std::size_t /*length*/) {           if (!ec && read_msg_.decode_header()) {             do_read_body();           } else {             std::cout << "Player leave the room\n";             room_.leave(shared_from_this());           }         }));   }
  void do_read_body() {     auto self(shared_from_this());     boost::asio::async_read(         socket_, boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),         m_strand.wrap(         [this, self](boost::system::error_code ec, std::size_t /*length*/) {           if (!ec) {             //room_.deliver(read_msg_);             handleMessage();             do_read_header();           } else {             room_.leave(shared_from_this());           }         }));   }
 

整体代码逻辑如下:

 

#include "chat_message.h"
#include "JsonObject.h"
#include "Protocal.pb.h"

#include <boost/asio.hpp>

#include <chrono>
#include <deque>
#include <iostream>
#include <list>
#include <memory>
#include <set>
#include <thread>
#include <utility>
#include <mutex>

#include <cstdlib>

using boost::asio::ip::tcp;

//----------------------------------------------------------------------

using chat_message_queue = std::deque<chat_message>;
using chat_message_queue2 = std::list<chat_message>;
//----------------------------------------------------------------------

// stread_clock
std::chrono::system_clock::time_point base;

//----------------------------------------------------------------------

class chat_session;
using chat_session_ptr = std::shared_ptr<chat_session>;
class chat_room {
public:
    chat_room(boost::asio::io_service& io_service) : m_strand(io_service) {}
public:
    void join(chat_session_ptr);
    void leave(chat_session_ptr);
    void deliver(const chat_message&);
private:
    boost::asio::io_service::strand m_strand;
    //std::mutex m_mutex;
  std::set<chat_session_ptr> participants_;
  enum { max_recent_msgs = 100 };
  chat_message_queue recent_msgs_;
};
//----------------------------------------------------------------------

class chat_session : public std::enable_shared_from_this<chat_session> {
public:
  chat_session(tcp::socket socket, chat_room &room)
      : socket_(std::move(socket)), room_(room),
            m_strand(socket_.get_io_service()) {}

  void start() {
    room_.join(shared_from_this());
    do_read_header();
  }

  void deliver(const chat_message &msg) {
        // first false
        m_strand.post([this, msg]{
    bool write_in_progress = !write_msgs_.empty();
    write_msgs_.push_back(msg);
    if (!write_in_progress) {
            // first
      do_write();
    }});
  }

private:
  void do_read_header() {
    auto self(shared_from_this());
    boost::asio::async_read(
        socket_,
        boost::asio::buffer(read_msg_.data(), chat_message::header_length),
                m_strand.wrap(
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec && read_msg_.decode_header()) {
            do_read_body();
          } else {
            std::cout << "Player leave the room\n";
            room_.leave(shared_from_this());
          }
        }));
  }

  void do_read_body() {
    auto self(shared_from_this());
    boost::asio::async_read(
        socket_, boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
                m_strand.wrap(
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec) {
            //room_.deliver(read_msg_);
                        handleMessage();
            do_read_header();
          } else {
            room_.leave(shared_from_this());
          }
        }));
  }

  template <typename T> T toObject() {
    T obj;
    std::stringstream ss(std::string(
        read_msg_.body(), read_msg_.body() + read_msg_.body_length()));
    boost::archive::text_iarchive oa(ss);
    oa &obj;
    return obj;
  }

  bool fillProtobuf(::google::protobuf::Message* msg) {
    std::string ss(
        read_msg_.body(), read_msg_.body() + read_msg_.body_length());
        auto ok = msg->ParseFromString(ss);
    return ok;
  }

    ptree toPtree() {
        ptree obj;
        std::stringstream ss(
                std::string(read_msg_.body(),
                    read_msg_.body() + read_msg_.body_length()));
        boost::property_tree::read_json(ss, obj);
        return obj;
    }

  void handleMessage() {
        auto n = std::chrono::system_clock::now() - base;
        std::cout << "i'm in " << std::this_thread::get_id() << " time "
            << std::chrono::duration_cast<std::chrono::milliseconds>(n).count() << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(3));
    if (read_msg_.type() == MT_BIND_NAME) {
            PBindName bindName;
            if(fillProtobuf(&bindName))
                m_name = bindName.name();
    } else if (read_msg_.type() == MT_CHAT_INFO) {
            PChat chat;
            if(!fillProtobuf(&chat)) return;
            m_chatInformation = chat.information();

      auto rinfo = buildRoomInfo();
      chat_message msg;
      msg.setMessage(MT_ROOM_INFO, rinfo);
      room_.deliver(msg);

    } else {
      // not valid msg do nothing
    }
  }

  void do_write() {
    auto self(shared_from_this());
    boost::asio::async_write(
        socket_, boost::asio::buffer(write_msgs_.front().data(),
                                     write_msgs_.front().length()),
                m_strand.wrap(
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec) {
            write_msgs_.pop_front();
            if (!write_msgs_.empty()) {
              do_write();
            }
          } else {
            room_.leave(shared_from_this());
          }
        }));
  }

  tcp::socket socket_;
  chat_room &room_;
  chat_message read_msg_;
  chat_message_queue write_msgs_;
    std::string m_name;
    std::string m_chatInformation;
    boost::asio::io_service::strand m_strand;
    std::string buildRoomInfo() const {
        PRoomInformation roomInfo;
        roomInfo.set_name(m_name);
        roomInfo.set_information(m_chatInformation);
        std::string out;
        auto ok = roomInfo.SerializeToString(&out);
        assert(ok);
        return out;
    }
//    RoomInformation buildRoomInfo() const {
//        RoomInformation info;
//        info.name.nameLen = m_name.size();
//        std::memcpy(info.name.name, m_name.data(), m_name.size());
//        info.chat.infoLen = m_chatInformation.size();
//        std::memcpy(info.chat.information, m_chatInformation.data(),
//                m_chatInformation.size());
//        return info;
//    }
};


  void chat_room::join(chat_session_ptr participant) {
        //std::lock_guard<std::mutex> lock(m_mutex);
        m_strand.post([this, participant]{
                participants_.insert(participant);
                for (const auto& msg : recent_msgs_)
                participant->deliver(msg);
                });
  }

  void chat_room::leave(chat_session_ptr participant) {
        //std::lock_guard<std::mutex> lock(m_mutex);
        m_strand.post([this,participant]{
    participants_.erase(participant);});
  }

  void chat_room::deliver(const chat_message &msg) {
        //std::lock_guard<std::mutex> lock(m_mutex);
        m_strand.post([this, msg]{
    recent_msgs_.push_back(msg);
    while (recent_msgs_.size() > max_recent_msgs)
      recent_msgs_.pop_front();

    for (auto& participant : participants_)
      participant->deliver(msg);
        });
  }


//----------------------------------------------------------------------

class chat_server {
public:
  chat_server(boost::asio::io_service &io_service,
              const tcp::endpoint &endpoint)
      : acceptor_(io_service, endpoint), socket_(io_service), room_(io_service) {
    do_accept();
  }

private:
  void do_accept() {
    acceptor_.async_accept(socket_, [this](boost::system::error_code ec) {
      if (!ec) {
        auto session =
            std::make_shared<chat_session>(std::move(socket_), room_);
        session->start();
      }

      do_accept();
    });
  }

  tcp::acceptor acceptor_;
  tcp::socket socket_;
  chat_room room_;
};

//----------------------------------------------------------------------

int main(int argc, char *argv[]) {
  try {
        GOOGLE_PROTOBUF_VERIFY_VERSION;
    if (argc < 2) {
      std::cerr << "Usage: chat_server <port> [<port> ...]\n";
      return 1;
    }
        base = std::chrono::system_clock::now();

    boost::asio::io_service io_service;

    std::list<chat_server> servers;
    for (int i = 1; i < argc; ++i) {
      tcp::endpoint endpoint(tcp::v4(), std::atoi(argv[i]));
      servers.emplace_back(io_service, endpoint);
    }

        std::vector<std::thread> threadGroup;
        for(int i = 0; i < 5; ++i) {
            threadGroup.emplace_back([&io_service, i]{
                    std::cout << i << " name is " << std::this_thread::get_id() << std::endl;
                    io_service.run();});
        }

        std::cout << "main thread name is " << std::this_thread::get_id() << std::endl;
        io_service.run();
        for(auto& v : threadGroup) v.join();
  } catch (std::exception &e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }

    google::protobuf::ShutdownProtobufLibrary();
  return 0;
}

client.cpp

#include "chat_message.h"
#include "structHeader.h"
#include "JsonObject.h"
#include "SerilizationObject.h"
#include "Protocal.pb.h"

#include <boost/asio.hpp>

#include <chrono>
#include <deque>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>

#include <cstdlib>
#include <cassert>

using boost::asio::ip::tcp;

using chat_message_queue = std::deque<chat_message>;


class chat_client {
public:
  chat_client(boost::asio::io_service &io_service,
              tcp::resolver::iterator endpoint_iterator)
      : io_service_(io_service), socket_(io_service) {
    do_connect(endpoint_iterator);
  }

  void write(const chat_message &msg) {
    io_service_.post([this, msg]() {
      bool write_in_progress = !write_msgs_.empty();
      write_msgs_.push_back(msg);
      if (!write_in_progress) {
        do_write();
      }
    });
  }

  void close() {
    io_service_.post([this]() { socket_.close(); });
  }

private:
  void do_connect(tcp::resolver::iterator endpoint_iterator) {
    boost::asio::async_connect(
        socket_, endpoint_iterator,
        [this](boost::system::error_code ec, tcp::resolver::iterator) {
          if (!ec) {
            do_read_header();
          }
        });
  }

  void do_read_header() {
    boost::asio::async_read(
        socket_,
        boost::asio::buffer(read_msg_.data(), chat_message::header_length),
        [this](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec && read_msg_.decode_header()) {
            do_read_body();
          } else {
            socket_.close();
          }
        });
  }

  void do_read_body() {
    boost::asio::async_read(
        socket_, boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
        [this](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec) {
            if (read_msg_.type() == MT_ROOM_INFO) {
              //SRoomInfo info;
                        std::string buffer(read_msg_.body(),
                              read_msg_.body() + read_msg_.body_length());

                        PRoomInformation roomInfo;
                        auto ok = roomInfo.ParseFromString(buffer);
                        //if(!ok) throw std::runtime_error("not valid message");
              //std::stringstream ss(buffer);
                            //ptree tree;
                            //boost::property_tree::read_json(ss, tree);
                        if (ok) {
                        std::cout << "client: '";
                        std::cout << roomInfo.name();
                        std::cout << "' says '";
                        std::cout
                        << roomInfo.information();
                        std::cout << "'\n";
                        }
//              boost::archive::text_iarchive ia(ss);
//              ia & info;
//              std::cout << "client: '";
//              std::cout << info.name();
//              std::cout << "' says '";
//              std::cout << info.information();
//              std::cout << "'\n";
            }
            do_read_header();
          } else {
            socket_.close();
          }
        });
  }

  void do_write() {
    boost::asio::async_write(
        socket_, boost::asio::buffer(write_msgs_.front().data(),
                                     write_msgs_.front().length()),
        [this](boost::system::error_code ec, std::size_t /*length*/) {
          if (!ec) {
            write_msgs_.pop_front();
            if (!write_msgs_.empty()) {
              do_write();
            }
          } else {
            socket_.close();
          }
        });
  }

private:
  boost::asio::io_service &io_service_;
  tcp::socket socket_;
  chat_message read_msg_;
  chat_message_queue write_msgs_;
};

int main(int argc, char *argv[]) {
  try {
        GOOGLE_PROTOBUF_VERIFY_VERSION;
    if (argc != 3) {
      std::cerr << "Usage: chat_client <host> <port>\n";
      return 1;
    }

    boost::asio::io_service io_service;
        std::vector<std::unique_ptr<chat_client>> clientGroup;

    tcp::resolver resolver(io_service);
    auto endpoint_iterator = resolver.resolve({argv[1], argv[2]});
        for(int i = 0; i < 10; ++i) {
            clientGroup.emplace_back(std::make_unique<chat_client>(
                        io_service, endpoint_iterator));
        }

    std::thread t([&io_service]() { io_service.run(); });

    char line[chat_message::max_body_length + 1];
        // ctrl-d
    while (std::cin.getline(line, chat_message::max_body_length + 1)) {
      chat_message msg;
            auto type = 0;
            std::string input(line, line + std::strlen(line));
            std::string output;
            if(parseMessage4(input, &type, output)) {
                msg.setMessage(type, output.data(), output.size());
                for(auto& v : clientGroup)
                    v->write(msg);
                std::cout << "write message for server " << output.size() << std::endl;
            }
    }

        for(auto& v: clientGroup)
            v->close();
    t.join();
  } catch (std::exception &e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }

    google::protobuf::ShutdownProtobufLibrary();
  return 0;
}

、多线程调度情况:

asio规定:只能在调用io_service::run的线程中才能调用事件完成处理器。

注:事件完成处理器就是你async_accept、async_write等注册的句柄,类似于回调的东西。

单线程:

如果只有一个线程调用io_service::run,根据asio的规定,事件完成处理器也只能在这个线程中执行。也就是说,你所有代码都在同一个线程中运行,因此变量的访问是安全的。

多线程:

如果有多个线程同时调用io_service::run以实现多线程并发处理。对于asio来说,这些线程都是平等的,没有主次之分。如果你投递的一个请求比如async_write完成时,asio将随机的激活调用io_service::run的线程。并在这个线程中调用事件完成处理器(async_write当时注册的句柄)。如果你的代码耗时较长,这个时候你投递的另一个async_write请求完成时,asio将不等待你的代码处理完成,它将在另外的一个调用io_service::run线程中,调用async_write当时注册的句柄。也就是说,你注册的事件完成处理器有可能同时在多个线程中调用。

当然你可以使用 boost::asio::io_service::strand让完成事件处理器的调用【当处理函数不是线程安全的,强烈建议使用这种方式】,在同一时间只有一个, 比如下面的的代码:

socket_.async_read_some(boost::asio::buffer(buffer_),
strand_.wrap(
boost::bind(&connection::handle_read, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));

...

boost::asio::io_service::strand strand_;

 

此时async_read_some完成后调用handle_read时,必须等待其它handle_read调用完成时才能被执行(async_read_some引起的handle_read调用)。

多线程调用时,还有一个重要的问题,那就是无序化。比如说,你短时间内投递多个async_write,那么完成处理器的调用并不是按照你投递async_write的顺序调用的。asio第一次调用完成事件处理器,有可能是第二次async_write返回的结果,也有可能是第3次的。使用strand也是这样的。strand只是保证同一时间只运行一个完成处理器,但它并不保证顺序。

详细见

https://blog.csdn.net/KnightOnHourse/article/details/80292713

https://www.jianshu.com/p/70286c2ab544

https://www.cnblogs.com/my_life/articles/5331789.html

标签:std,read,_.,聊天,chat,msg,服务器,boost
From: https://www.cnblogs.com/bwbfight/p/17604396.html

相关文章

  • 基于GPT搭建私有知识库聊天机器人(四)问答实现
    前文链接:基于GPT搭建私有知识库聊天机器人(一)实现原理基于GPT搭建私有知识库聊天机器人(二)环境安装基于GPT搭建私有知识库聊天机器人(三)向量数据训练在前面的文章中,我们介绍了如何使用GPT模型搭建私有知识库聊天机器人的基本原理、环境安装、数据向量化。本文将进一步介绍如何使用lang......
  • 基于GPT搭建私有知识库聊天机器人(五)函数调用
    文章链接:基于GPT搭建私有知识库聊天机器人(一)实现原理基于GPT搭建私有知识库聊天机器人(二)环境安装基于GPT搭建私有知识库聊天机器人(三)向量数据训练基于GPT搭建私有知识库聊天机器人(四)问答实现OpenAI在6月13日发布了几个重磅更新,其中包括:开放了16k上下文的GPT-3.5-Turbo模型gpt-3.5-t......
  • 基于GPT搭建私有知识库聊天机器人(六)仿chatGPT打字机效果
    文章链接:基于GPT搭建私有知识库聊天机器人(一)实现原理基于GPT搭建私有知识库聊天机器人(二)环境安装基于GPT搭建私有知识库聊天机器人(三)向量数据训练基于GPT搭建私有知识库聊天机器人(四)问答实现基于GPT搭建私有知识库聊天机器人(五)函数调用在前几篇文章中,我们已经了解了如何使用GPT模......
  • 基于GPT搭建私有知识库聊天机器人(三)向量数据训练
    前文链接:基于GPT搭建私有知识库聊天机器人(一)实现原理基于GPT搭建私有知识库聊天机器人(二)环境安装基于GPT搭建私有知识库聊天机器人(四)问答实现在前面的文章中,我们介绍了实现原理和基本环境安装。本文将重点介绍数据训练的流程,以及如何加载、切割、训练数据,并使用向量数据库Milvus进......
  • 基于GPT搭建私有知识库聊天机器人(二)环境安装
    文章链接:基于GPT搭建私有知识库聊天机器人(一)实现原理基于GPT搭建私有知识库聊天机器人(三)向量数据训练基于GPT搭建私有知识库聊天机器人(四)问答实现1、需要安装的包pip3installflask//python开发web框架pip3installlangchain//LLM开发框架pip3installopenai......
  • 基于GPT搭建私有知识库聊天机器人(一)实现原理
    文章链接:基于GPT搭建私有知识库聊天机器人(二)环境安装基于GPT搭建私有知识库聊天机器人(三)向量数据训练基于GPT搭建私有知识库聊天机器人(四)问答实现1、成品演示支持微信聊天支持网页聊天支持微信语音对话支持私有知识文件训练,并针对文件提问步骤1:准备本地文件a.txt,支持pdf、txt、mark......
  • 64核RISC-V服务器能打了吗?
    最近看到“澎峰科技”的微信公众号,看到他们发布了第一款RISC-V服务器,芯片是算能的SG2042,带64个RISC-V核心(阿里平头哥的C910v核),2.0GHz主频,最大支持128GB内存。这应该算是全球第一款RISC-V服务器吧,找了一些资料。和大家一起品品。处理器采用贴片,不是那种socket。没什么好评论的。......
  • 【软件工具安装使用】boost的安装使用和交叉编译
    前言 一、编译安装 二、交叉编译安装注意空格;注意路径; 参考1. BoostC++Libraries;2. ubuntu22.04源码编译安装boost库_奇妙之二进制的博客-CSDN博客;3. Boost1.74.0交叉编译_hrx-@@的博客-CSDN博客;4. BoostGettingStartedonUnixVariants-1.82.0;5. Bo......
  • Typecho建站:腾讯云轻量应用服务器搭建博客网站教程
    腾讯云轻量应用服务器自带Typecho应用模板镜像,腾讯云提供的Typecho模板镜像是基于CentOS7.664位操作系统,并已预置Nginx、PHP、MariaDB软件程序,使用Typecho应用模板可以快速搭建博客、企业官网、电商及论坛等各类网站。腾讯云服务器网分享使用腾讯云轻量应用服务器Typecho应用模板......
  • RTMP流媒体服务器LiteCVR安防监控平台有关帧数的知识点讲解
    RTMP流媒体服务器LiteCVR平台支持多协议方式接入,例如市场主流标准协议(国标GB/T28181协议、RTMP、RTSP/Onvif)与厂家私有协议和SDK(海康Ehome、海康SDK、大华SDK、华为SDK、宇视SDK、乐橙SDK、萤石SDK)等。平台可运用于智慧工地、智慧安防、智慧工厂、智慧园区等场景。接下来我们将为大......