文章目录
0. 引言
前几天被问到“如何优化Linux中Domain Socket的线程间通信实时性?”当时的回答感觉不够好,经过仔细思考后,我整理出以下优化策略,考虑的是高并发和低延迟场景中的应用优化。
1. 使用 epoll
边缘触发模式
非不要不选择阻塞模式
阻塞式 read()
在单客户端的情况下,能够立即响应数据的到达,但其局限性在于:
- 无法同时处理多个 I/O 操作。如果同时需要接收和发送数据,阻塞式
read()
会在读取数据时阻塞当前线程,直到数据可用,这使得线程无法在等待数据时执行其他任务(例如发送数据)。 也就是处理双向通信不够高效。 - 阻塞导致线程空闲。即使线程处于阻塞状态,系统仍需要为其调度,但线程无法做任何实际工作。这样会浪费 CPU 时间,降低系统的响应性和资源利用率。
边缘触发(ET)模式
epoll
的 边缘触发 模式(ET)在文件描述符的状态发生变化时仅触发一次事件。当状态从“不可读”变为“可读”时,epoll
只会通知一次,后续不会触发事件直到状态再次变化。这减少了重复触发事件的系统调用,降低了上下文切换的频率。
优点
- 减少系统调用和上下文切换:边缘触发模式比水平触发模式(LT)减少了不必要的系统调用。
- 更低延迟:每个事件只触发一次,避免了多次触发导致的等待时间。
- 更高效率:配合非阻塞 I/O 使用,避免了重复的事件通知。
示例
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 设置为边缘触发模式
ev.data.fd = sockfd;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {
perror("epoll_ctl");
exit(EXIT_FAILURE);
}
2. 使用实时调度策略
Linux 提供了 SCHED_FIFO
和 SCHED_RR
等实时调度策略,可以降低调度延迟。通过 sched_setscheduler()
函数设置线程调度策略,有助于提升线程的响应速度。
struct sched_param param;
param.sched_priority = 99; // 设置较高的优先级
sched_setscheduler(pid, SCHED_FIFO, ¶m); // 设置实时调度策略
3. CPU 绑定
将线程绑定到特定的 CPU 核,减少跨核调度和缓存失效,降低延迟。
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu_id, &cpuset); // 将线程绑定到指定的 CPU 核
pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
4. 使用无锁缓冲区
使用无锁缓冲区可以减少CPU时间片切换次数:
- 无锁队列:使用原子操作管理数据结构,避免传统锁机制的性能瓶颈,减少线程同步的开销。
实现请见:C++生产者-消费者无锁缓冲区的简单实现
5. 优化消息传递的大小和频率
每次发送或接收的数据大小直接影响通信延迟。频繁的小数据传输会增加 I/O 操作次数,导致延迟增加。优化措施包括:
- 批量传输:将多个小消息合并为一个大消息,减少系统调用次数和上下文切换频率。
- 调整缓冲区大小:根据应用需求调整套接字的发送和接收缓冲区大小,以避免缓冲区过小导致频繁的上下文切换。
int bufsize = 8192; // 请根据实际设置合适的缓冲区大小
setsockopt(socket_fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
setsockopt(socket_fd, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));
6. 使用 SO_RCVTIMEO
和 SO_SNDTIMEO
SO_RCVTIMEO
和 SO_SNDTIMEO
是用来防止套接字在接收或发送数据时无限期阻塞的选项。当设置了这些超时选项后,套接字在等待数据时会在超时后返回错误(如 EAGAIN
或 EWOULDBLOCK
),从而提高应用程序的响应性。然而,这些选项不能直接解决由于 CPU 调度延迟引起的实时性问题。它们的作用仅仅是在指定时间内没有完成操作时返回错误,而不是保证操作在一定时间内完成。
// 设置接收超时时间
struct timeval recv_timeout = { 1, 0 }; // 1 seconds
if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &recv_timeout, sizeof(recv_timeout)) == -1) {
perror("setsockopt SO_RCVTIMEO");
close(sock);
exit(EXIT_FAILURE);
}
// 设置发送超时时间
struct timeval send_timeout = { 1, 0 }; // 1 seconds
if (setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &send_timeout, sizeof(send_timeout)) == -1) {
perror("setsockopt SO_SNDTIMEO");
close(sock);
exit(EXIT_FAILURE);
}
7. 示例代码
// g++ -o uds_server uds_server.cpp -pthread
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/un.h>
#include <cstring>
#include <cerrno>
#include <atomic>
#include <pthread.h>
#include <sched.h>
#define SOCKET_PATH "/tmp/uds_socket"
#define MAX_EVENTS 10
#define BUF_SIZE 1024
#define SOCKET_BACKLOG 5
// 无锁环形缓冲区
class LockFreeBytesBuffer {
public:
static const std::size_t kBufferSize = 10240U; // 缓冲区大小
LockFreeBytesBuffer() noexcept : readerIndex_(0U), writerIndex_(0U) {
std::memset(buffer_, 0, kBufferSize);
}
bool append(const char* data, std::size_t length) noexcept;
std::size_t beginRead(const char** target) noexcept;
void endRead(std::size_t length) noexcept;
private:
char buffer_[kBufferSize];
std::atomic<std::size_t> readerIndex_;
std::atomic<std::size_t> writerIndex_;
};
bool LockFreeBytesBuffer::append(const char* data, std::size_t length) noexcept {
const std::size_t currentWriteIndex = writerIndex_.load(std::memory_order_relaxed);
const std::size_t currentReadIndex = readerIndex_.load(std::memory_order_acquire);
const std::size_t freeSpace = (currentReadIndex + kBufferSize - currentWriteIndex - 1U) % kBufferSize;
if (length > freeSpace) {
return false; // 缓冲区满
}
const std::size_t pos = currentWriteIndex % kBufferSize;
const std::size_t firstPart = std::min(length, kBufferSize - pos);
std::memcpy(&buffer_[pos], data, firstPart);
std::memcpy(&buffer_[0], data + firstPart, length - firstPart);
writerIndex_.store(currentWriteIndex + length, std::memory_order_release);
return true;
}
std::size_t LockFreeBytesBuffer::beginRead(const char** target) noexcept {
const std::size_t currentReadIndex = readerIndex_.load(std::memory_order_relaxed);
const std::size_t currentWriteIndex = writerIndex_.load(std::memory_order_acquire);
const std::size_t availableData = (currentWriteIndex - currentReadIndex) % kBufferSize;
if (availableData == 0U) {
return 0U; // 缓冲区空
}
const std::size_t pos = currentReadIndex % kBufferSize;
*target = &buffer_[pos];
return std::min(availableData, kBufferSize - pos);
}
void LockFreeBytesBuffer::endRead(std::size_t length) noexcept {
const std::size_t currentReadIndex = readerIndex_.load(std::memory_order_relaxed);
readerIndex_.store(currentReadIndex + length, std::memory_order_release);
}
// 设置套接字为非阻塞
int setSocketNonBlocking(int sockfd) {
int flags = fcntl(sockfd, F_GETFL, 0);
if (flags == -1) {
fprintf(stderr, "Error getting socket flags: %s\n", strerror(errno));
return -1;
}
if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) == -1) {
fprintf(stderr, "Error setting socket to non-blocking: %s\n", strerror(errno));
return -1;
}
return 0;
}
// 设置实时调度策略
void setRealTimeScheduling() {
struct sched_param param;
param.sched_priority = 99; // 设置较高的优先级
if (sched_setscheduler(0, SCHED_FIFO, ¶m) == -1) {
fprintf(stderr, "Error setting real-time scheduler: %s\n", strerror(errno));
exit(EXIT_FAILURE);
}
}
// 绑定线程到指定 CPU
void setThreadAffinity(int cpuId) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpuId, &cpuset);
if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) {
fprintf(stderr, "Error setting thread affinity: %s\n", strerror(errno));
exit(EXIT_FAILURE);
}
}
// 处理新连接
void handleNewConnection(int epollFd, int sockfd) {
struct epoll_event ev;
int connfd = accept(sockfd, nullptr, nullptr);
if (connfd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return;
}
fprintf(stderr, "Error accepting connection: %s\n", strerror(errno));
return;
}
if (setSocketNonBlocking(connfd) == -1) {
close(connfd);
return;
}
ev.events = EPOLLIN | EPOLLET; // 设置为边缘触发模式
ev.data.fd = connfd;
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, connfd, &ev) == -1) {
fprintf(stderr, "Error adding connection to epoll: %s\n", strerror(errno));
close(connfd);
}
}
// 处理读取数据
void handleRead(int epollFd, struct epoll_event& event, LockFreeBytesBuffer& buffer) {
char buf[BUF_SIZE];
ssize_t nread = read(event.data.fd, buf, sizeof(buf));
if (nread == -1) {
if (errno != EAGAIN) {
fprintf(stderr, "Error reading data: %s\n", strerror(errno));
epoll_ctl(epollFd, EPOLL_CTL_DEL, event.data.fd, nullptr);
close(event.data.fd);
}
} else if (nread == 0) {
epoll_ctl(epollFd, EPOLL_CTL_DEL, event.data.fd, nullptr);
close(event.data.fd); // 连接关闭
} else {
fprintf(stdout, "Received data: %.*s\n", static_cast<int>(nread), buf);
if (!buffer.append(buf, nread)) {
fprintf(stderr, "Error appending to buffer: Buffer overflow!\n");
}
}
}
// 处理写操作
void handleWrite(int epollFd, struct epoll_event& event, LockFreeBytesBuffer& buffer) {
const char* data;
std::size_t len = buffer.beginRead(&data);
if (len > 0) {
ssize_t nwrite = write(event.data.fd, data, len);
if (nwrite == -1) {
if (errno != EAGAIN) {
fprintf(stderr, "Error writing data: %s\n", strerror(errno));
epoll_ctl(epollFd, EPOLL_CTL_DEL, event.data.fd, nullptr);
close(event.data.fd);
}
} else {
buffer.endRead(nwrite);
}
}
}
// 主函数
int main() {
int sockfd, epollFd;
struct sockaddr_un addr;
struct epoll_event ev, events[MAX_EVENTS];
// 设置实时调度
setRealTimeScheduling();
// 设置线程亲和性
setThreadAffinity(0); // 绑定到 CPU 0
// 创建 Unix Domain Socket
sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sockfd == -1) {
fprintf(stderr, "Error creating socket: %s\n", strerror(errno));
return EXIT_FAILURE;
}
// 设置套接字为非阻塞
if (setSocketNonBlocking(sockfd) == -1) {
close(sockfd);
return EXIT_FAILURE;
}
// 绑定套接字到文件路径
std::memset(&addr, 0, sizeof(struct sockaddr_un));
addr.sun_family = AF_UNIX;
std::strcpy(addr.sun_path, SOCKET_PATH);
unlink(SOCKET_PATH);
if (bind(sockfd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) == -1) {
fprintf(stderr, "Error binding socket: %s\n", strerror(errno));
close(sockfd);
return EXIT_FAILURE;
}
// 监听连接请求
if (listen(sockfd, SOCKET_BACKLOG) == -1) {
fprintf(stderr, "Error listening on socket: %s\n", strerror(errno));
close(sockfd);
return EXIT_FAILURE;
}
// 创建 epoll 实例
epollFd = epoll_create1(0);
if (epollFd == -1) {
fprintf(stderr, "Error creating epoll instance: %s\n", strerror(errno));
close(sockfd);
return EXIT_FAILURE;
}
// 将服务器套接字加入 epoll
ev.events = EPOLLIN | EPOLLET; // 边缘触发模式
ev.data.fd = sockfd;
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {
fprintf(stderr, "Error adding socket to epoll: %s\n", strerror(errno));
close(sockfd);
close(epollFd);
return EXIT_FAILURE;
}
LockFreeBytesBuffer buffer;
// 主循环,等待并处理事件
while (true) {
int n = epoll_wait(epollFd, events, MAX_EVENTS, -1);
if (n == -1) {
fprintf(stderr, "Error in epoll_wait: %s\n", strerror(errno));
break;
}
for (int i = 0; i < n; i++) {
if (events[i].data.fd == sockfd) {
// 处理新连接
handleNewConnection(epollFd, sockfd);
} else if (events[i].events & EPOLLIN) {
// 处理读取数据
handleRead(epollFd, events[i], buffer);
} else if (events[i].events & EPOLLOUT) {
// 处理写操作
handleWrite(epollFd, events[i], buffer);
}
}
}
close(epollFd);
close(sockfd);
return EXIT_SUCCESS;
}
这个程序监听 Unix 域套接字 /tmp/uds_socket
,能够处理多个客户端的连接,并异步地读取和写入数据:
- 监听和接受连接:服务器首先通过
bind
和listen
绑定套接字,然后通过accept
等待来自客户端的连接。 - 异步 I/O 事件处理:使用
epoll
来监听并处理事件(如接收数据、发送数据、错误等)。 - epoll边缘触发:通过设置非阻塞 I/O 和边缘触发模式,程序能够高效地处理大量并发连接。
- 缓冲区管理:使用环形缓冲区管理接收的数据。
其他阅读
- 非Domain Socket的优化请参考:Linux编程:嵌入式ARM平台Linux网络实时性能优化
- Linux 编程:高实时性场景下的内核线程调度与网络包发送优化
- Linux I/O编程:I/O多路复用与异步 I/O对比