首页 > 其他分享 >boost asio库的一些记录(个人用)

boost asio库的一些记录(个人用)

时间:2023-06-16 20:13:11浏览次数:43  
标签:asio std node 记录 data len recv boost

BOOST asio

#include<iostream>
#include<boost/asio.hpp>
#include<boost/date_time/posix_time/posix_time.hpp>
int main()
{
    boost::asio::io_service io; //上下文,事件轮询处理框架(类似libevent的event_base)
    boost::asio::deadline_timer t(io,boost::posix_time::seconds(5));//定时事件
    t.wait();//等待定时事件执行结束, 阻塞在这    
    std::cout<<"aaaaa"<<std::endl;
    return 0;
}

boost库设定定时事件

async_wait()异步等待

  1 #include<iostream>
  2 #include<boost/asio.hpp>
  3 #include<boost/date_time/posix_time/posix_time.hpp>
  4 
  5 void func1(const boost::system::error_code&){
  6     std::cout<<"任务执行"<<std::endl;
  7 }
  8 int main()
  9 {
 10     boost::asio::io_service io;
 11     boost::asio::deadline_timer t(io,boost::posix_time::seconds(5));//定时事件
        //异步等待,不会在此阻塞,继续执行下面代码,时间到后执行回调函数
 12     t.async_wait(func1);//定时等待事件到时间后执行func1 函数,登记函数到事件池
 13     std::cout<<"1111"<<std::endl;//主线程中不等待该定时事件到时
 14     io.run();//while(1) 循环 ioservice 不断循环事件池,若事件池里没有事件则run退出,有则等待执行 事件5秒后执行func1 事件池空则退出run                               
 15     std::cout<<"2222"<<std::endl;
 16     return 0;
 17 }

111
任务执行
222

boost库的另一个关于时间的头文件

  1 #include<iostream>
  2 #include<boost/asio.hpp>
  3 #include<boost/asio/steady_timer.hpp>
  4 //#include<boost/date_time/posix_time/posix_time.hpp>
  5 
  6 void func1(const boost::system::error_code&){
  7     std::cout<<"任务执行"<<std::endl;
  8 }
  9 int main()
 10 {
 11     boost::asio::io_service io;
 12     boost::asio::steady_timer t(io);//定时事件
 13     t.expires_from_now(std::chrono::seconds(5));//定时5s
 14     t.wait();//main阻塞
 15     std::cout<<"eeeee"<<std::endl;                                                                                                                                  
 16     return 0;
 17 }

使用boost::bind绑定回调函数
  1 #include<iostream>
  2 #include<boost/asio.hpp>
  3 //#include<boost/asio/steady_timer.hpp>
  4 #include<boost/date_time/posix_time/posix_time.hpp>
  5 #include<boost/bind.hpp>                                                                 
  7 void print(const boost::system::error_code&,boost::asio::deadline_timer* t,int* count)
  8 {
  9     if(*count<5){
 10         std::cout<<*count<<"   "<<std::endl;
 11         ++(*count);
 12         //t为定时事件对象,执行本函数时,t已到时失效了,t->expires_at()获取到时时间,又重新设定到时时间
 13         t->expires_at(t->expires_at()+boost::posix_time::seconds(1));
 14         //t到时后,执行下面绑定的函数   bind绑定函数名和相关参数
 15         t->async_wait(boost::bind(print,boost::asio::placeholders::error,t,count));
 16     }
 17 }
 18 int main()
 19 {
 20     boost::asio::io_service io;
 21     int count=0;
 22     boost::asio::deadline_timer t(io,boost::posix_time::seconds(1));//定时事件1s
 23     t.async_wait(boost::bind(print,boost::asio::placeholders::error,&t,&count));
 24     io.run();
 25     std::cout<<"end  count is:"<<count<<std::endl;
 26     return 0;
 27 }

 t.async_wait([&t,&count](boost::asio::placeholders::error err){print(err,&t,&count);});//到时后执行lambda函数,lambda函数里执行print()
 //改为使用lambda,上面代码语法有错

io_service的io.run();执行结束后,io的状态就改变了,后面再io.run()不执行了

async_write()异步写,生命周期

对于异步,在其调用回调函数前,asio对涉及的socket等的生命周期不做保证,因此用户必须保证相关对象的生命

enable_shared_from_this

shared_from_this();在bind()里/在(类成员函数里)调用本类执行该函数的对象

(获得自身对象):在类的构造函数里不能调用shared_from_this(),因为构造还没完

在多线程中,有一个线程专门执行io_service的run();其他线程要把事件加载到执行io.run()的线程,使用io_service.poet()将要加载的事件打包成lambda函数

boost库(网络通信上库,使用按照库函数,标志点,端口通信)
//客户端
int send_data_by_send(){
    std::string raw_ip_address = "127.0.0.1";
    unsigned short port_num = 3333;
    try {
        asio::ip::tcp::endpoint
            ep(asio::ip::address::from_string(raw_ip_address),
                port_num);//建立网络通信端点
        asio::io_service ios;
        // Step 1. Allocating and opening the socket.
        //socket放到io_service,协议是endpoint.protocol()
        asio::ip::tcp::socket sock(ios, ep.protocol());//socket IP v4
        sock.connect(ep);
        std::string buf = "Hello World!";
  //send要求参数是MutableBufferSequence,asio::buffer()返回mutable_buffer_1,是MutableBufferSequence的适配器,可作为MutableBufferSequence的参数
        int send_length = sock.send(asio::buffer(buf.c_str(), buf.length(endpoint)));
        if (send_length <= 0) {
            cout << "send failed" << endl;
            return 0;
        }
    }
    catch (system::system_error& e) {
        std::cout << "Error occured! Error code = " << e.code()
            << ". Message: " << e.what();
        return e.code().value();
    }
    return 0;
}
std::cin.getline(char*,length);// 从标准输入流中读取数据到char*
------------------------------------------------------------
char reply[MAX_LENGTH];  //读取服务端发送的数据
        size_t reply_length = boost::asio::read(sock,
            boost::asio::buffer(reply, request_length));//const_buffer_1
        std::cout << "Reply is: ";
        std::cout.write(reply, reply_length);//将reply的数据输出到标准输出流
服务器端进行通信相关 生成一个acceptor的socket,用来接收新的连接。
int accept_new_connection(){
    const int BACKLOG_SIZE = 30;
    unsigned short port_num = 3333;
    asio::ip::tcp::endpoint ep(asio::ip::address_v4::any(),
        port_num);
    asio::io_context  ios;
    try {
        asio::ip::tcp::acceptor acceptor(ios, ep.protocol());// 生成一个acceptor的socket,用来接收新的连接
        acceptor.bind(ep);
        acceptor.listen(BACKLOG_SIZE);
        for (;;) {
            asio::ip::tcp::socket sock(ios);//与建立连接的客户端交互通信的socket
            acceptor.accept(sock);
        }
    }
    catch (system::system_error& e) {
        std::cout << "Error occured! Error code = " << e.code()
            << ". Message: " << e.what();
        return e.code().value();
    }
}
void session(socket_ptr sock) {
    try {
        for (;;) {
            char data[max_length];
            memset(data, '\0', max_length);
            boost::system::error_code error;
            size_t length = sock->read_some(boost::asio::buffer(data, max_length), error);
            if (error == boost::asio::error::eof) {
                std::cout << "connection closed by peer" << endl;
                break;
            }
            else if (error) {
                throw boost::system::system_error(error);
            }
            //获取对端的端点,address
            cout << "receive from " << sock->remote_endpoint().address().to_string() << endl;
            cout << "receive message is " << data << endl;
            //回传信息值
            boost::asio::write(*sock, boost::asio::buffer(data, length));
        }
    }
    catch (std::exception& e) {
        std::cerr << "Exception in thread: " << e.what() << "\n" << std::endl;
    }
}
buffer
boost::asio提供了asio::mutable_buffer 和 asio::const_buffer这两个结构,他们是一段连续的空间,首字节存储了后续数据的长度。
asio::mutable_buffer用于写服务,asio::const_buffer用于读服务。但是这两个结构都没有被asio的api直接使用。
对于api的buffer参数,asio提出了MutableBufferSequence和ConstBufferSequence概念,他们是由多个asio::mutable_buffer和asio::const_buffer组成的。也就是说boost::asio为了节省空间,将一部分连续的空间组合起来,作为参数交给api使用。
我们可以理解为MutableBufferSequence的数据结构为std::vector<asio::mutable_buffer>

asio的buffer()函数,该函数接收多种形式的字节流,该函数返回asio::mutable_buffers_1 o或者asio::const_buffers_1结构的对象。
如果传递给buffer()的参数是一个只读类型,则函数返回asio::const_buffers_1 类型对象。
如果传递给buffer()的参数是一个可写类型,则返回asio::mutable_buffers_1 类型对象。
asio::const_buffers_1和asio::mutable_buffers_1是asio::mutable_buffer和asio::const_buffer的适配器,提供了符合MutableBufferSequence和ConstBufferSequence概念的接口,所以他们可以作为boost::asio的api函数的参数使用。
write_some可以每次向指定的空间写入固定的字节数,如果写缓冲区满了,就只写一部分,返回写入的字节数。sock.write_some(asio::buffer());
send函数会一次性将buffer中的内容发送给对端,如果有部分字节因为发送缓冲区满无法发送,则阻塞等待,直到发送缓冲区可用,则继续发送完成。 int send_length = sock.send(asio::buffer(buf.c_str(), buf.length()));
write函数,可以一次性将所有数据发送给对端,如果发送缓冲区满了则阻塞,直到发送缓冲区可用,将数据发送完成。
int send_length  = asio::write(sock, asio::buffer(buf.c_str(), buf.length()));
同步读read_some sock.read_some(asio::buffer(char*,lenth));
receive 可以一次性同步接收对方发送的数据int receive_length =  sock.receive(asio::buffer(buffer_receive, BUFF_SIZE));
read   int receive_length = asio::read(sock, asio::buffer(buffer_receive, BUFF_SIZE));

由于读写中,没有数据,会不断阻塞。为了后面继续执行,通过异步方式进行,当有数据读写时,调用回调函数处理

boost异步写async
void Session::WriteToSocketErr(const std::string& buf) {//发送数据
    _send_node = make_shared<MsgNode>(buf.c_str(), buf.length());
    //异步发送数据,因为异步所以不会一下发送完
    this->_socket->async_write_some(asio::buffer(_send_node->_msg, 
        _send_node->_total_len),//缓冲区满后,不会阻塞等待缓冲区可以写入数据,而是执行其他事,缓冲区可写数据时,执行绑定的回调函数继续发送
        std::bind(&Session::WriteCallBackErr,
            this, std::placeholders::_1, std::placeholders::_2, _send_node));
}
//std::size_t bytes_transferred获取前面发送了多少数据
void Session::WriteCallBackErr(const boost::system::error_code& ec, 
    std::size_t bytes_transferred, std::shared_ptr<MsgNode> msg_node) {
    if (bytes_transferred + msg_node->_cur_len 
        < msg_node->_total_len) {//数据没发送完
        _send_node->_cur_len += bytes_transferred;
        //继续发送,绑定回调函数
        this->_socket->async_write_some(asio::buffer(_send_node->_msg+_send_node->_cur_len,
            _send_node->_total_len-_send_node->_cur_len),
            std::bind(&Session::WriteCallBackErr,
                this, std::placeholders::_1, std::placeholders::_2, _send_node));
    }
}



//最大报文接收大小                        要发送的数据通过MsgNode发送(MsgNode接收要发送的数据)
const int RECVSIZE = 1024;
class  MsgNode {
public :
    MsgNode(const char* msg,  int total_len): _total_len(total_len), _cur_len(0){
        _msg = new char[total_len];
        memcpy(_msg, msg, total_len);
    }
    MsgNode(int total_len) :_total_len(total_len), _cur_len(0) {
        _msg = new char[total_len];
    }
    ~MsgNode(){
        delete[]_msg;
    }
    //消息首地址
    char* _msg;
    //总长度
    int _total_len;
    //当前长度
    int _cur_len;
};

要发送的总数据保存在MsgNode里,指针_send_node指向该类,通过服务端的socket调用async_write_some异步发送MsgNode类里的数据,调用bind绑定的类函数继续执行发送(本对象) std::placeholders::_1,std::placeholders::_1(保留执行回调函数的第一、二个参数),把存储数据的MsgNode指针传给WriteCallBackErr函数,std::size_t bytes_transferred为成功发送数据的数量,判断发送的数量与总数量的关系,没全部发送则继续socket调用async_write_some异步发送,继续bind()本函数(回调)

在WriteCallBackErr里使用shared_ptr 增加Msg存活时间(增加共享指针引用),writetosocket函数结束后,MsgNode对象会释放掉,会自动释放掉MsgNode,指针也失效

异步发送,对于多个要发送数据的线程,执行发送函数,发送顺序没保证,改为通过队列方式发送

对于每个要发送的消息,将其放到MsgNode队列里,通过_send_pending判断当前数据是否发送完

class Session{
public:
    void WriteCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
    void WriteToSocket(const std::string &buf);
private:
    std::queue<std::shared_ptr<MsgNode>> _send_queue;
    std::shared_ptr<asio::ip::tcp::socket> _socket;
    bool _send_pending;
};
void Session::WriteToSocket(const std::string& buf){
    _send_queue.emplace(new MsgNode(buf.c_str(), buf.length())); //将要发送的数据装进类对象插入发送队列,按顺序发送
    if (_send_pending) {//pending状态true说明上一次有未发送完的数据
        return;//return掉,没有继续发送,但是插入到了发送队列里,在发送完前面的后,从队列里取出继续发送
    }
    //异步发送数据,因为异步所以不会一下发送完
    this->_socket->async_write_some(asio::buffer(buf), std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));
    _send_pending = true;//没发送完,true,不发送新数据
}
void Session::WriteCallBack(const boost::system::error_code & ec,  std::size_t bytes_transferred){
    if (ec.value() != 0) {
        std::cout << "Error , code is " << ec.value() << " . Message is " << ec.message();
        return;
    }
    auto & send_data = _send_queue.front();//取出队首元素即当前未发送完数据继续发送
    send_data->_cur_len += bytes_transferred;
    //数据未发送完, 则继续发送
    if (send_data->_cur_len < send_data->_total_len) {
        this->_socket->async_write_some(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len-send_data->_cur_len),
            std::bind(&Session::WriteCallBack,
            this, std::placeholders::_1, std::placeholders::_2));
        return;
    }
    //如果发送完,则pop出队首元素
    _send_queue.pop();
    //如果队列为空,则说明所有数据都发送完,将pending设置为false
    if (_send_queue.empty()) {
        _send_pending = false;
    }
    //如果队列不是空,则继续将队首元素发送(发送等待发送的数据)
    if (!_send_queue.empty()) {
        auto& send_data = _send_queue.front();
        this->_socket->async_write_some(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len),
            std::bind(&Session::WriteCallBack,
                this, std::placeholders::_1, std::placeholders::_2));
    }
}

改为async_send()一次发送完数据(内部多次调用async_write_some)( 异步)

//不能与async_write_some混合使用
void Session::WriteAllToSocket(const std::string& buf) {
    //插入发送队列
    _send_queue.emplace(new MsgNode(buf.c_str(), buf.length()));
    //pending状态说明上一次有未发送完的数据
    if (_send_pending) {
        return;
    }
    //异步发送数据,因为异步所以不会一下发送完
    this->_socket->async_send(asio::buffer(buf), 
        std::bind(&Session::WriteAllCallBack, this,
            std::placeholders::_1, std::placeholders::_2));
    _send_pending = true;
}
void Session::WriteAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred){
    if (ec.value() != 0) {
        std::cout << "Error occured! Error code = "
            << ec.value()
            << ". Message: " << ec.message();
        return;
    }
    //如果发送完,则pop出队首元素
    _send_queue.pop();
    //如果队列为空,则说明所有数据都发送完,将pending设置为false
    if (_send_queue.empty()) {
        _send_pending = false;
    }
    //如果队列不是空,则继续将队首元素发送
    if (!_send_queue.empty()) {
        auto& send_data = _send_queue.front();
        this->_socket->async_send(asio::buffer(send_data->_msg + send_data->_cur_len, send_data->_total_len - send_data->_cur_len),
            std::bind(&Session::WriteAllCallBack,
                this, std::placeholders::_1, std::placeholders::_2));
    }
}
异步读(没读完,不接收新的读,在读中有新数据到达,丢弃)
class Session {
public:
    void ReadFromSocket();
    void ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred);
private:
    std::shared_ptr<asio::ip::tcp::socket> _socket;
    std::shared_ptr<MsgNode> _recv_node;
    bool _recv_pending;
};
void Session::ReadFromSocket() {
    if (_recv_pending) {
        return;
    }
    //可以调用构造函数直接构造,但不可用已经构造好的智能指针赋值
    /*auto _recv_nodez = std::make_unique<MsgNode>(RECVSIZE);
    _recv_node = _recv_nodez;*/
    _recv_node = std::make_shared<MsgNode>(RECVSIZE);//初始化一个MsgNode对象存储读到的数据
    _socket->async_read_some(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadCallBack, this,std::placeholders::_1, std::placeholders::_2));
    _recv_pending = true;//正在读,置为true
}
void Session::ReadCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred){
    _recv_node->_cur_len += bytes_transferred;
    //没读完继续读
    if (_recv_node->_cur_len < _recv_node->_total_len) {
        _socket->async_read_some(asio::buffer(_recv_node->_msg+_recv_node->_cur_len,
            _recv_node->_total_len - _recv_node->_cur_len), std::bind(&Session::ReadCallBack, this,
            std::placeholders::_1, std::placeholders::_2));
        return;
    }
    //判断为已读完
    //将数据投递到队列里交给逻辑线程处理,此处略去
    //如果读完了则将标记置为false
    _recv_pending = false;
    //指针置空
    _recv_node = nullptr;    
}
不管读没读完,都调用bind()绑定的函数,在bind()函数里判断读没读完,读完则置为false;

上面可改为使用async_receive()

async_read_some和async_receive不能混合使用,否则会出现逻辑问题。

void Session::ReadAllFromSocket(const std::string& buf) {
    if (_recv_pending) {
        return;
    }
    //可以调用构造函数直接构造,但不可用已经构造好的智能指针赋值
    /*auto _recv_nodez = std::make_unique<MsgNode>(RECVSIZE);
    _recv_node = _recv_nodez;*/
    _recv_node = std::make_shared<MsgNode>(RECVSIZE);
    _socket->async_receive(asio::buffer(_recv_node->_msg, _recv_node->_total_len), std::bind(&Session::ReadAllCallBack, this,
        std::placeholders::_1, std::placeholders::_2));
    _recv_pending = true;
}
void Session::ReadAllCallBack(const boost::system::error_code& ec, std::size_t bytes_transferred) {
    _recv_node->_cur_len += bytes_transferred;
    //将数据投递到队列里交给逻辑线程处理,此处略去
    //如果读完了则将标记置为false
    _recv_pending = false;
    //指针置空
    _recv_node = nullptr;
}
简单读写
session:
void Session::Start(){//服务器开启后,就监听客户端发来的消息
    memset(_data, 0, max_length);
    //读取客户端发送的消息,读到的信息到handle_read中处理(不用管能一次读到多少数据,读到多少后面发送多少)
    _socket.async_read_some(boost::asio::buffer(_data, max_length),
        std::bind(&Session::handle_read, this, placeholders::_1,
            placeholders::_2)
    );
}
void Session::handle_read(const boost::system::error_code& error, size_t bytes_transfered) {
    if (!error) {
        //读到多少数据就发送多少数据,async_write 发送后到handle_write处理,继续读数据
        cout << "server receive data is " << _data << endl;
        boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transfered), 
            std::bind(&Session::handle_write, this, placeholders::_1));
    }
    else {
        delete this;
    }
}
void Session::handle_write(const boost::system::error_code& error) {
    if (!error) {
        memset(_data, 0, max_length);
        //继续读数据 ,读后又到handle_read中处理读到多少数据
        _socket.async_read_some(boost::asio::buffer(_data, max_length),
                                std::bind(&Session::handle_read,
            this, placeholders::_1, placeholders::_2));
    }
    else {
        delete this;
    }
}
利用C11模拟伪闭包实现连接的安全回收

主要通过智能指针增加引用的方式增加Session的生命周期,同时把session的指针 指针放到map里

同时为了使函数结束后指针不被释放,将session放入map中,保证session不被自动释放。

方便Server管理Session,因为我们后期会做一些重连踢人等业务逻辑,我们在Server类中添加成员变量,该变量为一个map类型,key为Session的uid,value为该Session的智能指针。

在map中,用pair<uuid,session>管理具体的指针

队列全双工
void CSession::Send(char* msg, int max_length) {//要发送消息
    bool pending = false;
    std::lock_guard<std::mutex> lock(_send_lock);
    if (_send_que.size() > 0) {//如果队列里有数据要发生,就将消息放到队列里,同时pending=true
        pending = true;
    }
    _send_que.push(make_shared<MsgNode>(msg, max_length));//将消息放到队列里
    if (pending) {
        return;
    }
    //原先队列里没有消息要发送,即发送消息
    boost::asio::async_write(_socket, boost::asio::buffer(msg, max_length), 
        std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_from_this()));
}
处理沾包

处理粘包的方式主要采用应用层定义收发包格式的方式

在发送数据时,将要发送的数据格式化:

格式变为消息长度+消息内容的方式(整体存储的数据)(将实际要发送多少数据的记录也一同发送出去)

如要发送的消息10个字节,用2个字节存10(2qwertyuiop)

class MsgNode
{
    friend class CSession;
public:
    //发送的消息变为max_len + HEAD_LENGTH(实际要发送有效消息的长度+记录要发出去的有效消息的长度)
    MsgNode(char * msg, short max_len):_total_len(max_len + HEAD_LENGTH),_cur_len(0){
        _data = new char[_total_len+1]();
        memcpy(_data, &max_len, HEAD_LENGTH);//如要发送的消息10个字节,用2个字节存10
        memcpy(_data+ HEAD_LENGTH, msg, max_len);
        _data[_total_len] = '\0';
    }
    MsgNode(short max_len):_total_len(max_len),_cur_len(0) {
        _data = new char[_total_len +1]();
    }
    ~MsgNode() {
        delete[] _data;
    }
    void Clear() {
        ::memset(_data, 0, _total_len);
        _cur_len = 0;
    }
private:
    short _cur_len;
    short _total_len;
    char* _data;
};

max_len + HEAD_LENGTH HEAD_LENGTH为头部大小

_recv_msg_node用来存储接受的消息体信息
_recv_head_node用来存储接收的头部信息
_b_head_parse表示是否处理完头部信息

不全代码的理解:

void CSession::HandleRead(const boost::system::error_code& error, size_t  bytes_transferred, std::shared_ptr<CSession> shared_self){
    if (!error) {
//已经移动的字符数(意思是读到的总数据长度(bytes_transferred加上头部已读的多,在读到的数据中减去头部需要的,减去的部分即为copy_len))
        int copy_len = 0;
        while (bytes_transferred>0) {//读到的数据
            if (!_b_head_parse) {//头部数据长度还未读完
                //收到的数据又不足头部大小,把数据在头部尾部全复制给头部
                if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) {
                    memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+ copy_len, bytes_transferred);
                   //更新头部已接收信息的长度
                    _recv_head_node->_cur_len += bytes_transferred;
                    ::memset(_data, 0, MAX_LENGTH);
                    //继续接收数据
                    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), 
                        std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                    return;//后面代码不执行
                }
                //头部还未读完,但收到的数据比头部多
                //头部剩余未复制的长度
                int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len;//头部还需要读的数据长度
                memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+copy_len, head_remain);
                //更新已处理的data长度和剩余未处理的长度
                copy_len += head_remain;//从读出的数据中已使用掉的长度
                bytes_transferred -= head_remain;
                //获取头部数据
                short data_len = 0;
                memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH);
                cout << "data_len is " << data_len << endl;
                //头部长度非法
                if (data_len > MAX_LENGTH) {
                    std::cout << "invalid data length is " << data_len << endl;
                    _server->ClearSession(_uuid);
                    return;
                }
                _recv_msg_node = make_shared<MsgNode>(data_len);//根据头部表示的有效数据长度初始化MsgNode长度
                //消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
                if (bytes_transferred < data_len) {//剩余读取到的数据长度
                    memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
                    _recv_msg_node->_cur_len += bytes_transferred;
                    ::memset(_data, 0, MAX_LENGTH);
                    //继续读完成实际内容的读取
                    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), 
                        std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                    //头部处理完成
                    _b_head_parse = true;//该消息头部已完成
                    return;
                }
                消息的长度大于头部规定的长度
                memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, data_len);
                _recv_msg_node->_cur_len += data_len;
                copy_len += data_len;
                bytes_transferred -= data_len;
                _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
                cout << "receive data is " << _recv_msg_node->_data << endl;
                //此处可以调用Send发送测试
                Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
                //继续轮询剩余未处理数据
                _b_head_parse = false;//新的数据其头部置为false;
                _recv_head_node->Clear(); //void Clear() ::memset(_data, 0, _total_len);_cur_len = 0;}
                if (bytes_transferred <= 0) {
                    ::memset(_data, 0, MAX_LENGTH);
                    _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), 
                        std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                    return;
                }
                //继续while() 到新数据的头部那继续
                continue;
            }
            //已经处理完头部,处理上次未接受完的消息数据
            //接收的数据仍不足剩余未处理的
            int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
            if (bytes_transferred < remain_msg) {
                memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
                _recv_msg_node->_cur_len += bytes_transferred;
                ::memset(_data, 0, MAX_LENGTH);
                _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), 
                    std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                return;
            }
            memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
            _recv_msg_node->_cur_len += remain_msg;
            bytes_transferred -= remain_msg;
            copy_len += remain_msg;
            _recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
            cout << "receive data is " << _recv_msg_node->_data << endl;
            //此处可以调用Send发送测试
            Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
            //继续轮询剩余未处理数据
            _b_head_parse = false;
            _recv_head_node->Clear();
            if (bytes_transferred <= 0) {
                ::memset(_data, 0, MAX_LENGTH);
                _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
                    std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
                return;
            }
            continue;
        }
    }
    else {
        std::cout << "handle read failed, error is " << error.what() << endl;
        Close();
        _server->ClearSession(_uuid);
    }
}

客户端修改

客户端的发送也要遵循先发送数据2个字节的数据长度,再发送数据消息的结构。
接收时也是先接收两个字节数据获取数据长度,再根据长度接收消息。

字节序
protobuf

............

标签:asio,std,node,记录,data,len,recv,boost
From: https://www.cnblogs.com/persistencejunjie/p/17486419.html

相关文章

  • c++一些零碎记录
    c++11alignasstructalignas(8)S{}//定义结构体同时指定分配给结构体的大小为8字节alignof(与内存对齐相关)structobj{chara;intb;}alignof(obj)=4;//alignof(obj)=4表示对于obj,其内存对齐是以多少字节为单位对齐对于单个变量char其alignof(char)=1,单个字节对齐......
  • 2023-06-16:给你一份工作时间表 hours,上面记录着某一位员工每天的工作小时数。 我们认
    2023-06-16:给你一份工作时间表hours,上面记录着某一位员工每天的工作小时数。我们认为当员工一天中的工作小时数大于8小时的时候,那么这一天就是「劳累的一天」。所谓「表现良好的时间段」,意味在这段时间内,「劳累的天数」是严格大于「不劳累的天数」。请你返回「表现良好时间段」......
  • 【React工作记录一百零八】前端小知识点扫盲笔记记录9
    前言我是歌谣放弃很容易但是坚持一定很酷微信公众号关注前端小歌谣带你进入前端巅峰交流群今天继续对前端知识的小结如何截取字符串<!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metahttp-equiv="X-UA-Compatible"content="IE=edge">......
  • 【React工作记录一百零九】前端小知识点扫盲笔记记录10
    前言我是歌谣放弃很容易但是坚持一定很酷微信公众号关注前端小歌谣带你进入前端巅峰交流群今天继续对前端知识的小结对称数<!DOCTYPEhtml><htmllang="en"> <head> <metacharset="UTF-8"/> <metahttp-equiv="X-UA-Compatible"content="IE=edge"/>......
  • Linux 使用交叉编译工具链编译boost
    参考:Boost交叉编译执行./bootstrap.sh后,会生成project-config.jam。修改project-config.jam文件:#if!gccin[feature.values<toolset>]#{#usinggcc:;#}if!gccin[feature.values<toolset>]{usinggcc::/cross-tools/aarch64-poky-linux-gcc--sysro......
  • 记录--设计一个可选择不连续的时间范围的日期选择器
    这里给大家分享我在网上总结出来的一些知识,希望对大家有所帮助npm包:sta-datepicker效果图 需求普通的时间选择器要么只能单选,要么只能选范围,不可以随意选择若干个时间,同时大多数现成的时间选择器选择结束会收起来,很不方便。现在需求如下1、可以自己控制展开收起2、可......
  • 过滤实现条件查询记录:
    publicList<TbRemouldAirVO>airQueryByMap(List<TbRemouldAirVO>airVO,Map<String,String>map){//1.改造时间if(!StringUtils.isEmpty(map.get("remouldTimeAir"))){airVO=airVO.stream().filter(x->Objects.equals......
  • 关于vue2路由跳转问题记录
    1.vue路由间跳转和新开窗口的方式(query,params)路由间跳转配置:query方式:参数会在url中显示this.$router.push({path:'路由地址',query:{msg:'helloworld'}})params方式:传参数据不会在导航栏中显示,需要配合路由的name属性使用。this.$......
  • Markdown语法学习记录
    ##小记markdown语法是写博客所需要的基本的语法,而且也比较容易掌握,以下是我个人学习的基础的语法。##标题一共有六级标题,先说一级标题一级标题的语法是#+空格+标题二级标题的语法是##+空格+标题 ......想创建多少级的标题就在前面加多少个#号##字体**粗体***斜体*......
  • 2023 6月记录
    6.6[CF1830E]BullySort很离谱的结论题!这个操作很奇怪,但是它有一个重要的性质:一个数永远只会向一个方向移动。考虑这个操作的过程,每次其实是将\(p_i\neqi\)的最大的\(i\)归位,所以如果一个数向右移,它以后就不会向左移。而一个数如果向右移,那么它一定是一个后缀\(min\),而......