首页 > 数据库 > redis7源码分析:redis 多线程模型解析

redis7源码分析:redis 多线程模型解析

时间:2023-10-02 18:12:18浏览次数:41  
标签:IO redis7 list server 源码 threads io 线程 多线程

多线程模式中,在main函数中会执行InitServerLast

void InitServerLast() {
    bioInit();
    // 关键一步, 这里启动了多条线程,用于执行命令,redis起名为IO 线程
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}

1. 看initThreadedIO

networking.c

void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */

    /* Indicate that io-threads are currently idle */
    io_threads_op = IO_THREADS_OP_IDLE;

    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        // 为线程创建一个 list,并将该list 的指针加入到io_threads_list中
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        setIOPendingCount(i, 0);
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        // 这里创建线程,并将线程id 假如到io_threads数组中
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}
这样在该函数中,创建了线程 并 为该线程分配了一个 list

2. 当client 发命令过来时,会调用到readQueryFromClient

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, big_arg = 0;
    size_t qblen, readlen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    // 多线程模式时,在这里会return
    if (postponeClientRead(c)) return;
......
}

3. 看postponeClientRead

int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
        io_threads_op == IO_THREADS_OP_IDLE)
    {
        // 把该client 假如到server的clients_pending_read中,放到list头
        listAddNodeHead(server.clients_pending_read中,c);
        // 互相引用一下
        c->pending_read_list_node = listFirst(server.clients_pending_read);
        return 1;
    } else {
        return 0;
    }
}

4. IO线程创建后 持续干活,会调用IOThreadMain

void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    // myid 也是对应的list数组的id
    long id = (unsigned long)myid;
    char thdname[16];

......

    while(1) {
......

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        // 拿到对应的 list
        listRewind(io_threads_list[id],&li);
        // 遍历list
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                // 执行读取操作
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        setIOPendingCount(id, 0);
    }
}

那么什么时候去将io_threads_op 设置为IO_THREADS_OP_READ
在eventloop 的 beforeSleep中, beforeSleep 会调用handleClientsWithPendingReadsUsingThreads
int handleClientsWithPendingReadsUsingThreads(void) {
......
    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    // 获取之前postponeClientRead中添加到list中 的所有元素list
    listRewind(server.clients_pending_read,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        // 将元素即对应的client,添加到对应的线程的list中去
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    // 将io线程标志 置为 IO_THREADS_OP_READ
    io_threads_op = IO_THREADS_OP_READ;
    // 设置所有 IO线程 待处理的请求的个数,求和
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        setIOPendingCount(j, count);
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    // 主线程也有自己的 list,即index == 0 的位置的list, 在这里也参与IO操作,不浪费一点性能
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    // 这里是再次求和,知道发现所有的客户端请求完成才退出循环
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    // 这里都执行完成了,再讲IO线程标志位职位IDLE
    io_threads_op = IO_THREADS_OP_IDLE;
......
}
从源码中的解释也可以看出,虽然加入了多线程,加快了命令处理的速度,但是如果某个命令特别耗时,
会影响 beforeSleep函数一直卡住,导致后续的命令一直pending,不能加入到io线程的list中

所以还是要注意命令的执行效率

5. 最后每个线程单独调用 readQueryFromClient, 由上面分析,当前io_threads_op 为 read 状态
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, big_arg = 0;
    size_t qblen, readlen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
	// 此时不会return, 会往下执行命令操作了
    if (postponeClientRead(c)) return;
......
}

int postponeClientRead(client *c) {
    // io线程来的, io_threads_op 为 read,返回0
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
        io_threads_op == IO_THREADS_OP_IDLE)
    {
        listAddNodeHead(server.clients_pending_read,c);
        c->pending_read_list_node = listFirst(server.clients_pending_read);
        return 1;
    } else {
        return 0;
    }
}

到此分析结束

多线程模型为 主线程接收 accept 和 read, write, 但不操作,将对应的client 假如到对应iothread的list中,然后由子线程处理。

标签:IO,redis7,list,server,源码,threads,io,线程,多线程
From: https://www.cnblogs.com/gradyblog/p/17740286.html

相关文章

  • redis7源码分析:redis 单线程模型解析,一条get命令执行流程
    有了下文的梳理后redis启动流程再来解析redis在单线程模式下解析并处理客户端发来的命令1.当clientfd可读时,会回调readQueryFromClient函数voidreadQueryFromClient(connection*conn){client*c=connGetPrivateData(conn);intnread,big_arg=0;size_......
  • redis7源码分析:redis 启动流程
    1.redis由server.c的main函数启动intmain(intargc,char**argv){...//上面的部分为读取配置和启动命令参数解析,看到这一行下面为启动流程serverLog(LL_WARNING,"oO0OoO0OoO0OoRedisisstartingoO0OoO0OoO0Oo");... //这里对服务进行初始化操作ini......
  • Python爬虫源码,Behance 作品图片及内容 selenium 采集爬虫
    前面有分享过requests采集Behance作品信息的爬虫,这篇带来另一个版本供参考,使用的是无头浏览器selenium采集,主要的不同方式是使用selenium驱动浏览器获取到页面源码,后面获取信息的话与前篇一致。Python爬虫源码,Behance作品图片及内容采集爬虫附工具脚本!理论上,几乎所有的页面内......
  • redis 源码分析:Jedis 哨兵模式连接原理
    1.可以从单元测试开始入手查看类JedisSentinelPoolprivatestaticfinalStringMASTER_NAME="mymaster";protectedstaticfinalHostAndPortsentinel1=HostAndPorts.getSentinelServers().get(1);protectedstaticfinalHostAndPortsentinel2=HostAndPorts......
  • pthread实现多线程矩阵乘法
    #include<pthread.h>#include<stdio.h>#include<windows.h>#include<iostream>usingnamespacestd;#pragmacomment(lib,"pthreadVC2.lib")#definerowCount1300#definemediumCount1500#definecolumnCount5000#definen_threa......
  • diskqueue的数据定义,运转核心ioloop()源码详解
     nsq中diskqueue是nsq消息持久化的核心,内容较多,一共分为多篇1.diskqueue是什么,为什么需要它,整体架构图,对外接口2.diskqueue的元数据文件,数据文件,启动入口,元数据文件读写及保存3.diskqueue的数据定义,运转核心ioloop()源码详解4. diskqueue怎么写入消息,怎么对外发送消息前面一篇......
  • 9.1 运用API创建多线程
    在Windows平台下创建多线程有两种方式,读者可以使用CreateThread函数,或者使用beginthreadex函数均可,两者虽然都可以用于创建多线程环境,但还是存在一些差异的,首先CreateThread函数它是Win32API的一部分,而_beginthreadex是C/C++运行库的一部分,在参数返回值类型方面,CreateThread返回......
  • lapce源码学习-编译调试
    master分支调试1、报错:`#![feature]`maynotbeusedonthestablereleasechannel2、Channel切换到nightly,报错:thetraitbound`file_type::FileType:std::sealed::Sealed`isnotsatisfied3、Channel切换到beta,编译ok,但提示不能调试rustupinstallbeta4、编译成功后,......
  • Android中OkHttp源码阅读二(责任链模式)
    AndroidOkHttp源码阅读详解一看OkHttp源码,发现OkHttp里面使用了责任链设计模式,所以才要学习责任链设计模式小节2最终会返回ResponseResponsegetResponseWithInterceptorChain()throwsIOException{//Buildafullstackofinterceptors.List<Interceptor>inte......
  • qemu源码分析(9)--Apple的学习笔记
    一, 前言本章节主要是再把GPIO创建的内容进行细化,搞明白gpio是否一个object,还和其它什么内容有关。二,分析 GPIOA,GPIOB等包括他们的寄存器都是object。每个对象都会再object_new的时候分配空间,比如GPIOA和GPIOB都有自己的空间。创建GPIOA,主要包括创建goio-peripheral类型及在conta......