多线程模式中,在main函数中会执行InitServerLast
void InitServerLast() {
bioInit();
// 关键一步, 这里启动了多条线程,用于执行命令,redis起名为IO 线程
initThreadedIO();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
1. 看initThreadedIO
networking.c
void initThreadedIO(void) {
server.io_threads_active = 0; /* We start with threads not active. */
/* Indicate that io-threads are currently idle */
io_threads_op = IO_THREADS_OP_IDLE;
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
// 为线程创建一个 list,并将该list 的指针加入到io_threads_list中
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
setIOPendingCount(i, 0);
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
// 这里创建线程,并将线程id 假如到io_threads数组中
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
这样在该函数中,创建了线程 并 为该线程分配了一个 list
2. 当client 发命令过来时,会调用到readQueryFromClient
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, big_arg = 0;
size_t qblen, readlen;
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
// 多线程模式时,在这里会return
if (postponeClientRead(c)) return;
......
}
3. 看postponeClientRead
int postponeClientRead(client *c) {
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
io_threads_op == IO_THREADS_OP_IDLE)
{
// 把该client 假如到server的clients_pending_read中,放到list头
listAddNodeHead(server.clients_pending_read中,c);
// 互相引用一下
c->pending_read_list_node = listFirst(server.clients_pending_read);
return 1;
} else {
return 0;
}
}
4. IO线程创建后 持续干活,会调用IOThreadMain
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
// myid 也是对应的list数组的id
long id = (unsigned long)myid;
char thdname[16];
......
while(1) {
......
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
// 拿到对应的 list
listRewind(io_threads_list[id],&li);
// 遍历list
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
// 执行读取操作
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
setIOPendingCount(id, 0);
}
}
那么什么时候去将io_threads_op 设置为IO_THREADS_OP_READ
呢
在eventloop 的 beforeSleep中, beforeSleep 会调用handleClientsWithPendingReadsUsingThreads
int handleClientsWithPendingReadsUsingThreads(void) {
......
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
// 获取之前postponeClientRead中添加到list中 的所有元素list
listRewind(server.clients_pending_read,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
// 将元素即对应的client,添加到对应的线程的list中去
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
// 将io线程标志 置为 IO_THREADS_OP_READ
io_threads_op = IO_THREADS_OP_READ;
// 设置所有 IO线程 待处理的请求的个数,求和
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
// 主线程也有自己的 list,即index == 0 的位置的list, 在这里也参与IO操作,不浪费一点性能
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
/* Wait for all the other threads to end their work. */
// 这里是再次求和,知道发现所有的客户端请求完成才退出循环
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
// 这里都执行完成了,再讲IO线程标志位职位IDLE
io_threads_op = IO_THREADS_OP_IDLE;
......
}
从源码中的解释也可以看出,虽然加入了多线程,加快了命令处理的速度,但是如果某个命令特别耗时,
会影响 beforeSleep函数一直卡住,导致后续的命令一直pending,不能加入到io线程的list中
所以还是要注意命令的执行效率
5. 最后每个线程单独调用 readQueryFromClient, 由上面分析,当前io_threads_op 为 read 状态
void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, big_arg = 0;
size_t qblen, readlen;
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
// 此时不会return, 会往下执行命令操作了
if (postponeClientRead(c)) return;
......
}
int postponeClientRead(client *c) {
// io线程来的, io_threads_op 为 read,返回0
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
io_threads_op == IO_THREADS_OP_IDLE)
{
listAddNodeHead(server.clients_pending_read,c);
c->pending_read_list_node = listFirst(server.clients_pending_read);
return 1;
} else {
return 0;
}
}