文章目录
- 一、TCP分包问题
- 1、长连接和短连接
- 2、长连接和短连接的分包方法
- 3、长连接和短连接的应用场景
- 二、TCP粘包问题
- 三、Buffer类的设计与使用
- 1、为什么需要应用层buffer?
- 2、如何设计并使用应用层Buffer?
- 3、Buffer类的设计
一、TCP分包问题
在TCP这种字节流协议上做应用层分包是网络编程的基本要求。分包是指在发生一个消息(message)或者一帧(frame)数据时,通过一定的处理,让接收方能从字节流中识别并截取(还原)出一个个消息。
1、长连接和短连接
【长连接】:
所谓长连接,指在一个TCP连接上可以连续发送多个数据包,在TCP连接保持期间,如果没有数据包发送,需要双方发检测包以维持此连接,一般需要自己做在线维持(不发生RST包和四次挥手)。
连接→数据传输→保持连接(心跳)→数据传输→保持连接(心跳)→……→关闭连接
这就要求长连接在没有数据通信时,定时发送数据包(心跳),以维持连接状态。
【短连接】:
短连接是指通信双方有数据交互时,建立一个TCP连接,数据发送完成后,则断开此TCP连接(管理起来比较简单,存在的连接都是有用的连接,不需要额外的控制手段);
连接→数据传输→关闭连接
2、长连接和短连接的分包方法
对于短连接的TCP服务,分包不是一个问题,只要发送方主动关闭连接,就表示一条消息发送完毕,接收方read返回0,从而知道消息的结尾。
对于长连接的TCP服务,分包有四种方法:
- 消息长度固定;
- 使用特殊字符或者字符串作为消息的边界,例如"\r\n";
- 在每条消息的头部增加一个长度字段,这是很常见的做法。
- 使用消息本身的格式来分包,例如xml格式和json格式的数据。
3、长连接和短连接的应用场景
长连接多用于操作频繁(读写),点对点的通讯,而且连接数不能太多情况。每个TCP连接都需要三步握手,这需要时间,如果每个操作都是先连接,再操作的话那么处理速度会降低很多,所以每个操作完后都不断开,次处理时直接发送数据包就OK了,不用建立TCP连接。例如:数据库的连接用长连接, 如果用短连接频繁的通信会造成socket错误,而且频繁的socket 创建也是对资源的浪费。
而像WEB网站的http服务一般都用短链接(http1.0只支持短连接,1.1keep alive 带时间,操作次数限制的长连接),因为长连接对于服务端来说会耗费一定的资源,而像WEB网站这么频繁的成千上万甚至上亿客户端的连接用短连接会更省一些资源,如果用长连接,而且同时有成千上万的用户,如果每个用户都占用一个连接的话,那可想而知吧。所以并发量大,但每个用户无需频繁操作情况下需用短连接好。
二、TCP粘包问题
数据的接收和发送是无关的,read()/recv() 函数不管数据发送了多少次,都会尽可能多的接收数据。也就是说,read()/recv() 和 write()/send() 的执行次数可能不同。
例如,write()/send() 重复执行三次,每次都发送字符串"abc",那么目标机器上的 read()/recv() 可能分三次接收,每次都接收"abc";也可能分两次接收,第一次接收"abcab",第二次接收"cabc";也可能一次就接收到字符串"abcabcabc"。
这就是数据的“粘包”问题,客户端发送的多个数据包被当做一个数据包接收。也称数据的无边界性,read()/recv() 函数不知道数据包的开始或结束标志(实际上也没有任何开始或结束标志),只把它们当做连续的数据流来处理。
三、Buffer类的设计与使用
1、为什么需要应用层buffer?
非阻塞I/O的核心思想是避免阻塞在read()或者write()或者其他I/O系统调用上,这样就可以最大限度地复用线程,让一个线程可以为多个socket连接服务。I/O线程只能阻塞在I/O复用上例如epoll_wait(),这样一来,应用层缓冲区是必须的。
【什么情况下用到output buffer?】:
假设我们要通过TCP连接来发送100k的数据,在执行write()时,操作系统受某些因素的影响只接受了80k数据,那么由于还剩20k我们该如何处理?是等待内核腾出空间来接受余下的20k?那如果等很久杂么办?难道我们的事件循要在你的等待时间内什么都做不成么?针对以上的问题我们就可以用一个应用层的write buffer来解决,把那些暂时内核无法接受的数据先存在buffer中,然后注册POLLOUT事件,一旦该socket可写了,我们就把缓存中的数据写进去,如果下次还写不完那么就继续注册POLLOUT事件下次继续写,如果20k数据已经写完了,立即停止关注POLLOUT事件,以免造成busy loop(不断地被触发)。如果buffer中的数据还没写完,程序又来了数据,那么此时的数据就应该追加到buffer数据尾部中。有了应用层的write buffer后我们的程序就完全不用关心数据到底一次能不能发完,这些都有网络库来操心。
【什么情况下用到input buffer?】:
TCP是一个无边界的字节流协议,接收方必须要处理“收到的数据尚不构成一条完整的消息”和“一次收到两条消息的数据”等等情况。一个常见的场景是,发送方send了两条10k字节的消息(共20k),接收方收到数据的情况可能是:
- 一次性收到20k数据;
- 分两次收到,第一次5k,第二次15k;
- 分三次收到,第一次6k,第二次8k,第三次6k;
- 等等任何可能。
以上情况俗称“粘包”问题。
我们在处理socket可读事件时,必须一次把socket数据读完,否则就会反复触发POLLIN事件(这里所述针对epoll的LT模式),造成主循环busy-loop,但是话又说回来,我们如果一次把socket的数据都读完,就没法保证具体某条消息的完整性了。那么我们的网络库该杂么做呢?我们的网络库因该先把读到的内容存在input buffer中,根据应用层协议判定是否是一个完整的包,等input buffer中有完整的消息了,再通知业务程序,这样就可提高速度。
2、如何设计并使用应用层Buffer?
在非阻塞网络中,如何设计并使用缓冲区?一方面我们希望减少系统调用,一次读的数据越多越划算,那么似乎应该准备一个大的缓冲区;另一方面,我们又希望减少占用内存,如果有10000个并发连接,每个连接都分配50k的读写缓冲区的,那么将占用1G内存。使用栈上空间可以解决这个问题。
底层默认会有一个1KB大小的vector,除此之外,muduo在栈上申请一个64KB大小的临时空间,这是当网络套接字数据太大时,可以将超出1KB的数据先放入这个临时空间,然后在追加到Buffer里面(这个时候Buffer就会resize一个适应的大小)。利用readv()系统调用可以实现。
【Note】:
readv和writev函数的功能可以概括为:对数据进行整合传输以及发送。通过writev函数可以将分散保存在多个buff的数据一并进行发送,通过readv可以由多个buff分别接受数据,适当的使用这两个函数可以减少I/O函数的调用次数。
#include <muduo/net/Buffer.h>
#include <muduo/net/SocketsOps.h>
#include <errno.h>
#include <sys/uio.h>
using namespace muduo;
using namespace muduo::net;
const char Buffer::kCRLF[] = "\r\n";
const size_t Buffer::kCheapPrepend;
const size_t Buffer::kInitialSize;
/********************************************************************
Modify : Eric Lee
Date : 2018-01-23
Description : 从socket读到缓冲区的方法是使用readv先读至Buffer_,
Buffer_空间如果不够会读入到栈上65536个字节大小的空间,然后以append的
方式追加入Buffer_。既考虑了避免系统调用带来开销,又不影响数据的接收。
*********************************************************************/
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
/*
栈额外空间,用于从套接字往出来读时,当buffer暂时不够用时暂存数据,
待buffer重新分配足够空间后,在把数据交换给buffer。
*/
char extrabuf[65536];
/*
struct iovec {
ptr_t iov_base; // iov_base指向的缓冲区存放的是readv所接收的数据或是writev将要发送的数据
size_t iov_len; // iov_len在各种情况下分别确定了接收的最大长度以及实际写入的长度
};
*/
// 使用iovec分配两个连续的缓冲区
struct iovec vec[2];
const size_t writable = writableBytes();
// 第一块缓冲区,指向可写空间
vec[0].iov_base = begin() + writerIndex_;
vec[0].iov_len = writable;
// 第二块缓冲区,指向栈空间
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
/*
通过writev函数可以将分散保存在多个(writev的第三个参数表示缓冲区的数量)buff的数据一并进行发送,
通过readv可以由多个buff分别接受数据,适当的使用这两个函数可以减少I/O函数的调用次数
*/
// 如果可写空间大于65536,那么把数据直接写到缓冲区即可,否则就一个额外的栈空间
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
const ssize_t n = sockets::readv(fd, vec, iovcnt);
if (n < 0)
{
*savedErrno = errno;
}
// 第一块缓冲区足够容纳数据
else if (implicit_cast<size_t>(n) <= writable)
{
// 向后移动writerIndex
writerIndex_ += n;
}
// 第一块缓冲区空间不够,数据被接收到第二块缓冲区extrabuf,将其append至buffer
else
{
// 更新当前writerIndex(到buffer的末尾)
writerIndex_ = buffer_.size();
// 将额外空间的数据写入到缓冲区中
append(extrabuf, n - writable);
}
return n;
}
3、Buffer类的设计
muduo使用vector<char>
保存数据,因此size()是可变的,使用时像一个queue,在尾部写入数据,往头部写入数据,通过两个int索引(readIndex可类比队列的front 和 writeIndex可类比队列的rear)来维护。为什么要使用2个int的索引而不是用迭代器或者指针?因为一旦vector发生扩容,迭代器将会失效。
另外,muduo的设计者将Buffer前添加了一段8字节的预留空间(称之为prependable)。这样做的好处,比如Input Buffer收到来自网络的消息,并计算它的长度后,可以将这个长度值写入到prependable区域,简化使用。
【基本读写操作】:
写入操作将writeIndex向后移动。 读出操作将readIndex向后移动。
【自动增长】:
当要写入的数据大于Buffer的可写入大小后需要对Buffer进行扩容。 由底层的vector特性帮助实现。
【内部腾挪】:
跟队列类似,设想当读写操作频繁时,readIndex和writeIndex都会往后移动,留下了巨大的prependable空间,此时如果再写入数据,将会不会重新分配内存,而是将已有的数据往前挪动(同时挪动readIndex和writeIndex)。
【前方添加】:
这也就是为什么要预留8个字节的prependable空间,可以写入数据的头部信息,方便使用。
class Buffer : public muduo::copyable
{
public:
//初始化prepend为8个字节大小
static const size_t kCheapPrepend = 8;
//初始化Buffer大小为1024字节 默认
static const size_t kInitialSize = 1024;
//计算可读
size_t readableBytes() const
{ return writerIndex_ - readerIndex_; }
//计算可写
size_t writableBytes() const
{ return buffer_.size() - writerIndex_; }
//计算prependable大小
size_t prependableBytes() const
{ return readerIndex_; }
//用来返回数据内容的起始位置
const char* peek() const
{ return begin() + readerIndex_; }
//该函数用来回收Buffer空间,在读取Buffer的内容后
//调用此函数来挪动索引
void retrieve(size_t len)
{
assert(len <= readableBytes());
if (len < readableBytes())
{
readerIndex_ += len;
}
else
{
//当要回收的大小超过可读的个数,直接回收所有空间
retrieveAll();
}
}
//回收所有Buffer,将两个索引回归到初始位置
void retrieveAll()
{
readerIndex_ = kCheapPrepend;
writerIndex_ = kCheapPrepend;
}
//写入数据
void append(const char* /*restrict*/ data, size_t len)
{
//ensure保证Buffer有足够空间可写,其中关键函数makeSpace见后
ensureWritableBytes(len);
//copy(iterator first,iterator last,iterator result)
std::copy(data, data+len, beginWrite());
hasWritten(len);//挪动writeIndex
}
void ensureWritableBytes(size_t len)
{
if (writableBytes() < len)
{
makeSpace(len);
}
assert(writableBytes() >= len);
}
private:
void makeSpace(size_t len)
{
if (writableBytes() + prependableBytes() < len + kCheapPrepend)
{
//resize,这个时候是真的不够了
buffer_.resize(writerIndex_+len);
}
else
{
//将数据挪动到Buffer的前面,因为前面prependable还有富余空间
assert(kCheapPrepend < readerIndex_);
size_t readable = readableBytes();
std::copy(begin()+readerIndex_,
begin()+writerIndex_,
begin()+kCheapPrepend);
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ + readable;
assert(readable == readableBytes());
}
}
private:
std::vector<char> buffer_;//底层存储
//读写索引
size_t readerIndex_;
size_t writerIndex_;
};