Redis在互联网架构存储系统中是使用最为广泛的中间件。基于内存实现了多中数据结构,通常被用作与内存数据 库、缓存、消息队列和流引擎。Redis 提供多种数据结构,如:字符串、散列表、列表、集合、范围 询的排序集合、位图、hyperloglogs、地理空间索引和流(注意:我们研究的基础版2.6 将不包含 hyperloglogs、地理空间索引、流)。Redis 内置了副本集自动复制、Lua 脚本、数据LRU策略、事务和不同级别的数据持久化策略,并通过两种方式提供高可用性:Redis Sentinel(哨兵模式)、Redis Cluster
所有的这些数据类型都是原子性操作,例如:像字符串追加值、增加某个哈希类型的value值、将元素推入列表、计算交集、并集 、差集、获取排序集中排名最高的成员。
为了达到Redis的最佳性能,Redis 使用基于内存的数据集,而不是基于磁盘的数据。Redis 通过设置 可以定期的将内存中的数据集持久化磁盘或在每次执行指令后都将数据持久化磁盘。如果您只需要一个功能丰富的网络化内存缓存,您也可以禁用该持久化特性。(注:使用磁盘持久化是为了保证数据不会丢失,因为内存是一个掉电就丢数据的存储器,而对于缓存而言为什么能够不启用持久化特性?因为缓 存就是对后端的真实数据的临时存储,允许丢失,丢失后只需要读后端的真实数据即可,但是这属于缓 存击穿,那么,是不是考虑对后端访问进行限流呢?但笔者发现:很多使用Redis的开发人员,压根就不是把它当作缓存使用,这很危险,因为redis最初就是设计来当缓存的,但是却被妖魔化出很多的功能,不是不能用,而是需要根据实际场景谨慎使用~)
Redis 支持副本集异步复制,其具有快速的非阻塞同步和副本集自动重新连接以及在网络发生阻塞时的 部分重新同步(注:为了保证高可用,那么Redis肯定设计出了异步复制当前数据到副本集服务器上, 但这里得考虑下如果复制途中,副本集服务器发生网络或者机器异常,当重新连接时如何做?全量重新 同步?或者部分同步?)
Redis由ANSIC编写,能在大多数POSIX系统上运行,如Linux、和MacOS X 除了C函数库没有外部依赖。Linux 和 OS X 是 Redis 开发和测试最多的两个操作系统,我们推荐使用 Linux 进行部署。Redis 可以在 Solaris 派生的系统(如 SmartOS)中工作,但支持非常有限。Windows 版本没有官方支持。
ANSI C与GNU C
ANSI C与GNU C均为C的标准定义。ANSI C标准 ANSI C标准被几乎所有广泛使用的编译器所支持, 多数C语言代码是在ANSI C基础上进行编写。ANSI C是美国国家标准协会(ANSI)对C语言发布的标 准,使用C的软件开发者被鼓励遵循ANSI C文档的要求。ANSI C经历了以下的历史过程:
1、C89标准:1983年,美国国家标准协会组成了一个委员会为了创建C语言的一套标准,经过漫 长的过程,该标准于1989年完成。因此,这个版本的语言也经常被称为C89标准。
2、 C90及后续标准:在1990年,ANSI C标准有了一些小的改动,这个版本有时候也被、称为C90 标准。随着时代的发展,ANSI C标准也在不断与时俱进。出现了C99标准、C11标准等 GNU C标准起源于GNU计划,它由Richard Stallman在1983年9月27日公开发起,目标是创建一套完全 自由的操作系统,但是它们设计了一整套编写操作系统的套件:编译器、汇编器、链接器等,唯独没有 操作系统,当时,Linus正在编写Linux,而这时正好编写系统的套件,就是这么巧:这时直接使用 GNU C的套件,而GNU 套件定义了GNU C 的标准(所以Linux也被称之为:GNU / Linux )。我们知 道任何东西都不会可能独立存在,因为ANSI C的影响甚远,GCC也不得不支持其中的标准,否则就会 成为孤家寡人,所以在GCC 4.7版本时,GCC 编译器已支持4种C语言标准:C89/C90、C94/C95、 C99/C11(不完全支持)。 那么GNU C与ANSI C有何区别呢?我们可以这样认为:GNU C 对 ANSI C进行了增强,补充了很多新特性:零长度和变量长度数组、case语句范围、语句表达式、typeof关键字、可变参数宏、标号元素、 当前函数名、特殊属性声明、内建函数等等特性
Redis 源码结构主要为下面几点:
1 deps:依赖库函数
2 src:主要源代码
3 tests:测试用例
4 utils:工具库
main函数在redis.c 中,我们知道C语言中,main函数作为入口函数,Redis的初始化和启动肯定也在其中,便从该函数逐个分析Redis的启动流程,Redis 就是:事件循环 + 数据结构。内部大量使用了ANSI C的语法和库函数,这需要我们了解它们才能更好理解,当然我们也可以直接使用 main 命令来查询这些函数的作用。现在我们就来从main 函数开始来看看Redis在初始化和启动时完成的操作,同时找到接受并处理客户端请求的入口。
main 函数
int main(int argc, char **argv) { struct timeval tv; zmalloc_enable_thread_safeness(); // 设置变量 static int zmalloc_thread_safe = 1; zmalloc_set_oom_handler(redisOutOfMemoryHandler); // 设置发生OOM时回调的函数 srand(time(NULL)^getpid()); // 使用当前进程的ID(getpid())与当前时间一起初始化随机数种子 gettimeofday(&tv,NULL); // 获取当前时间信息,放入timeval结构体 dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid()); // 使用当前时间的秒信息(tv_sec) 与毫秒信息(tv_usec) 与进程id 设置变量 static uint32_t dict_hash_function_seed = 5381,在hash函数中是用来散列key-value server.sentinel_mode = checkForSentinelMode(argc,argv); // 从参数中解析,看看是否以哨兵模式启动,这里我们忽略 initServerConfig(); // 初始化Redis 服务器的相关配置信息 if (server.sentinel_mode) { // 初始化哨兵模式(忽略) initSentinelConfig(); initSentinel(); } if (argc >= 2) { // 解析参数 int j = 1; sds options = sdsempty(); // 分配一个redis 字符串来保存参数 char *configfile = NULL;
// 根据参数值完成相应处理 if (strcmp(argv[1], "-v") == 0 || strcmp(argv[1], "--version") == 0) version(); if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-h") == 0) usage(); if (strcmp(argv[1], "--test-memory") == 0) { if (argc == 3) { memtest(atoi(argv[2]),50); exit(0); } else { fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n"); fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n"); exit(1); } } if (argv[j][0] != '-' || argv[j][1] != '-') configfile = argv[j++]; while(j != argc) { if (argv[j][0] == '-' && argv[j][1] == '-') { // 解析参数名 if (sdslen(options)) options = sdscat(options,"\n"); options = sdscat(options,argv[j]+2); options = sdscat(options," "); } else { // 解析参数值 options = sdscatrepr(options,argv[j],strlen(argv[j])); options = sdscat(options," "); } j++; } resetServerSaveParams(); // 重置服务器参数:server.saveparams = NULL loadServerConfig(configfile,options); // 根据参数中指定的配置文件和参数初始化服务器配置 sdsfree(options); // 启动参数主要用于初始化服务器配置,使用完毕后释放其占用空间 } else { // 没有指定配置文件,那么打印警告日志 redisLog(REDIS_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis"); } // 如果指定Redis 作为后台进程运行,那么调用daemonize fork出另外的进程在后台执行 if (server.daemonize) daemonize(); initServer(); // 完成Redis Server的剩余变量初始化 if (server.daemonize) createPidFile(); // 若Redis在后台执行,那么创建pid文件,告知当前正在后台 执行的Redis pid信息 redisAsciiArt(); // 打印Redis ascii编码的图形(忽略) if (!server.sentinel_mode) { // 执行只有在不运行在哨兵模式时才需要的动作 redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION); // 如果在Linux平台下,指定了 OvercommitMemory 为 0,那么打印警告日志(注: overcommit_memory=0, 表示内核将检查是否有足够的可用内存供应用进程使用,如果有足够 的可 用内存,内存申请允许,否则,内存申请失败,并把错误返回给应用进程。 overcommit_memory=1, 表示内核允许分配所有的物理内存,而不管当前的内存状态如何。 overcommit_memory=2, 表示内 核允许分配超过所有物理内存和swap空间总和的内存) #ifdef __linux__ linuxOvercommitMemoryWarning(); #endif loadDataFromDisk(); // 从磁盘中加载之前保存的RDB或者AOF文件 if (server.ipfd > 0) // 正确设置TCP socket FD 文件描述符,表明当前可以正确接收来自客户端的 TCP连接 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); if (server.sofd > 0) // 正确设置Unix socket FD 文件描述符,表明当前可以正确接收来自本机进程的 Unix套接字连接(什么是Unix套接字?一种本地进程通讯的方式,不经过协议栈,但可以使用Socket 来完成通讯) redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); } // 检查用户设置的内存是否足够,需要大于1M if (server.maxmemory > 0 && server.maxmemory < 1024*1024) { redisLog(REDIS_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); } aeSetBeforeSleepProc(server.el,beforeSleep); // 设置执行睡眠前的回调函数:eventLoop- >beforesleep = beforesleep; aeMain(server.el); // 执行主事件循环,处理准备好的事件 aeDeleteEventLoop(server.el); // 退出循环,表明当前需要关闭Redis,那么关闭打开的事件循环 return 0; }
通过源码我们得知如下流程信息:
1 Redis在启动时,将会初始化服务器响应的配置信息,这些配置信息我们在后面用到时再详细讲
2 解析参数并根据客户端参数加载并解析配置文件,然后进一步初始化Redis Server的变量信息
3 检测内核的内存分配策略参数 检测TCP FD与Unix FD
4 设置事件循环eventloop并启动主循环处理程序 结束后删除事件循环结构
initServerConfig 函数
该函数用于初始化全局结构redisServer的变量,读者这里只需要观察初始化了哪些变量即可,不需要 关注初始化的细节,因为这里面的值都是写死的默认值,对于可配置的变量将可以通过后面解析config 文件的变量进行覆盖,当我们不需要使用这些变量时,不需要了解其意思,在后面函数中用到对应参数 时,再详细讲解。源码如下。 // Redis 服务器全局结构,其中的变量,我们在用到时在分析,这里只需要了解拥有该结构即可 struct redisServer { }; struct redisServer server; void initServerConfig() { ... // 初始化普通变量 /* 初始化副本集配置 */ server.masterauth = NULL; server.masterhost = NULL; server.masterport = 6379; server.master = NULL; server.repl_state = REDIS_REPL_NONE; server.repl_syncio_timeout = REDIS_REPL_SYNCIO_TIMEOUT; server.repl_serve_stale_data = 1; server.repl_slave_ro = 1; server.repl_down_since = time(NULL); server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY; /* 初始化客户端输出缓冲区限制值 */ server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0; server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_bytes = 0; server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].soft_limit_seconds = 0; server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].hard_limit_bytes = 1024*1024*256; server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_bytes = 1024*1024*64; server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_SLAVE].soft_limit_seconds = 60; server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].hard_limit_bytes = 1024*1024*32; server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_bytes = 1024*1024*8; server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_PUBSUB].soft_limit_seconds = 60; /* 初始化浮点值计算变量 */ R_Zero = 0.0; R_PosInf = 1.0/R_Zero; R_NegInf = -1.0/R_Zero; R_Nan = R_Zero/R_Zero; /* 初始化命令行参数表(采用hash表结构来存储) */ server.commands = dictCreate(&commandTableDictType,NULL); populateCommandTable(); server.delCommand = lookupCommandByCString("del"); server.multiCommand = lookupCommandByCString("multi"); server.lpushCommand = lookupCommandByCString("lpush"); server.lpopCommand = lookupCommandByCString("lpop"); server.rpopCommand = lookupCommandByCString("rpop"); /* 初始化慢日志参数 */ server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN; server.slowlog_max_len = REDIS_SLOWLOG_MAX_LEN; /* 初始化调试参数 */ server.assert_failed = "<no assertion failed>"; server.assert_file = "<no file>"; server.assert_line = 0; server.bug_report_start = 0; server.watchdog_period = 0; }
initServer 函数
该函数用于初始化RedisServer结构的剩余参数。该函数将会把Redis的动态信息进行初始化:
1 信号处理相关
2 全局结构
3 处理客户端连接的TCP端口、Unix端口和对应的处理函数:acceptTcpHandler、 acceptUnixHandler(至此:我们看到客户端的输入事件将会在这两个函数处理,而该两个函数如 何注册到事件循环中,以及事件循环如何接收客户端连接,我们下一篇详细讲解,这里了解即 可,因为我们的目的是观察Redis的启动流程,以及涉及到的全局结构)
4 Redis 数据库
5 LUA 执行脚本
6 慢查询日志
7 后台执行线程
void initServer() { int j; signal(SIGHUP, SIG_IGN); // 屏蔽SIGHUP信号,SIG_IGN表示 SIG_IGNORE 忽略该信号(该信号讲在控制进程或者终端退出时发出。手册描述:Hangup detected on controlling terminal or death ofcontrolling process) signal(SIGPIPE, SIG_IGN);// 屏蔽SIGPIPE信号(该信号: 在写入pipe管道时,没有进程读取数据时发出。手册描述:Broken pipe: write to pipe with no readers) setupSignalHandlers(); // 设置其他信号的处理函数 server.current_client = NULL; server.clients = listCreate(); server.clients_to_close = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); createSharedObjects(); // 创建redis共享对象:struct sharedObjectsStruct {},节省每次在使用共享对象时都需要分配和释放内存导致的性能损耗,考虑下常量池的设计 adjustOpenFilesLimit();// 调整redis 打开文件描述符的限制 server.el = aeCreateEventLoop(server.maxclients+1024); // 创建事件循环线程 server.db = zmalloc(sizeof(redisDb)*server.dbnum); // 创建redis 数据库对象:typedef struct redisDb {} redisDb // 初始化 TCP 端口 if (server.port != 0) { server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr); if (server.ipfd == ANET_ERR) { redisLog(REDIS_WARNING, "Opening port %d: %s", server.port, server.neterr); exit(1); } } // 初始化Unix 端口 if (server.unixsocket != NULL) { unlink(server.unixsocket); /* don't care if this fails */ server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm); if (server.sofd == ANET_ERR) { redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr); exit(1); } } // 检测端口是否有效 if (server.ipfd < 0 && server.sofd < 0) { redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting."); exit(1); } // 创建Redis 数据库 for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].ready_keys = dictCreate(&setDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); server.db[j].id = j; } // 初始化其他参数 ... aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); // 创建时间事件函数 :serverCron if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event."); // 创建处理TCP客户端连接事件处理函数:acceptTcpHandler if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event."); // 创建处理unix客户端连接处理函数:acceptUnixHandler if (server.aof_state == REDIS_AOF_ON) { // 打开AOF持久化文件 server.aof_fd = open(server.aof_filename, O_WRONLY|O_APPEND|O_CREAT,0644); if (server.aof_fd == -1) { redisLog(REDIS_WARNING, "Can't open the append-only file: %s" strerror(errno)); exit(1); } } scriptingInit(); // 初始化LUA 脚本执行环境 slowlogInit(); // 初始化慢查询日志模块 bioInit(); // 创建后台执行线程 } setupSignalHandlers 函数 该函数主要用于设置进程信号处理函数。响应描述如下。该函数不重要,如果读者没有OS信号处理机 制,较难理解它干了什么,这里只需要知道信号对应处理函数即可,等后面了解了进程如何处理信号 时,将手到擒来。源码如下。 static void sigtermHandler(int sig) { // 接收到终止信号时调用,打印日志 REDIS_NOTUSED(sig); redisLogFromHandler(REDIS_WARNING,"Received SIGTERM, scheduling shutdown..."); server.shutdown_asap = 1; } void setupSignalHandlers(void) { struct sigaction act; sigemptyset(&act.sa_mask); act.sa_flags = 0; act.sa_handler = sigtermHandler; sigaction(SIGTERM, &act, NULL); // 设置SIGTERM信号处理函数(接收终止信号时发出该信号) #ifdef HAVE_BACKTRACE // 使用backtrace函数时设置处理函数为:sigsegvHandler sigemptyset(&act.sa_mask); act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO; act.sa_sigaction = sigsegvHandler; sigaction(SIGSEGV, &act, NULL); sigaction(SIGBUS, &act, NULL); sigaction(SIGFPE, &act, NULL); sigaction(SIGILL, &act, NULL); #endif return; }
createSharedObjects 函数
void createSharedObjects(void) { int j; shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n")); shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n")); shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n")); shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n")); shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n")); shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n")); shared.cnegone = createObject(REDIS_STRING,sdsnew(":-1\r\n")); shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n")); shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n")); shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n")); shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n")); shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n")); shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew( "-ERR Operation against a key holding the wrong kind of value\r\n")); shared.nokeyerr = createObject(REDIS_STRING,sdsnew( "-ERR no such key\r\n")); shared.syntaxerr = createObject(REDIS_STRING,sdsnew( "-ERR syntax error\r\n")); shared.sameobjecterr = createObject(REDIS_STRING,sdsnew( "-ERR source and destination objects are the same\r\n")); shared.outofrangeerr = createObject(REDIS_STRING,sdsnew( "-ERR index out of range\r\n")); shared.noscripterr = createObject(REDIS_STRING,sdsnew( "-NOSCRIPT No matching script. Please use EVAL.\r\n")); shared.loadingerr = createObject(REDIS_STRING,sdsnew( "-LOADING Redis is loading the dataset in memory\r\n")); shared.slowscripterr = createObject(REDIS_STRING,sdsnew( "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n")); shared.masterdownerr = createObject(REDIS_STRING,sdsnew( "-MASTERDOWN Link with MASTER is down and slave-serve-stale-data is set to 'no'.\r\n")); shared.bgsaveerr = createObject(REDIS_STRING,sdsnew( "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Commands that may modify the data set are disabled. Please check Redis logs for details about the error.\r\n")); shared.roslaveerr = createObject(REDIS_STRING,sdsnew( "-READONLY You can't write against a read only slave.\r\n")); shared.oomerr = createObject(REDIS_STRING,sdsnew( "-OOM command not allowed when used memory > 'maxmemory'.\r\n")); shared.space = createObject(REDIS_STRING,sdsnew(" ")); shared.colon = createObject(REDIS_STRING,sdsnew(":")); shared.plus = createObject(REDIS_STRING,sdsnew("+")); for (j = 0; j < REDIS_SHARED_SELECT_CMDS; j++) { shared.select[j] = createObject(REDIS_STRING, sdscatprintf(sdsempty(),"select %d\r\n", j)); } shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13); shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14); shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15); shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18); shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); shared.del = createStringObject("DEL",3); shared.rpop = createStringObject("RPOP",4); shared.lpop = createStringObject("LPOP",4); shared.lpush = createStringObject("LPUSH",5); for (j = 0; j < REDIS_SHARED_INTEGERS; j++) { shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j); shared.integers[j]->encoding = REDIS_ENCODING_INT; } for (j = 0; j < REDIS_SHARED_BULKHDR_LEN; j++) { shared.mbulkhdr[j] = createObject(REDIS_STRING sdscatprintf(sdsempty(),"*%d\r\n",j)); shared.bulkhdr[j] = createObject(REDIS_STRING, sdscatprintf(sdsempty(),"$%d\r\n",j)); } }
adjustOpenFilesLimit 函数
我们知道Linux操作系统将会在rlimit中限制进程打开的文件数,该函数将尝试根据配置的最大客户端数 来提高打开文件的最大数量。 它还将包含32个额外的文件描述符,因为Redis还需要一些文件描述符用 于持久性、侦听套接字、日志文件等。 如果不能将限制相应地设置为配置的最大客户端数量,该函数 将执行保留当前服务器的设置。源码如下。 void adjustOpenFilesLimit(void) { rlim_t maxfiles = server.maxclients+32; // 增加描述符数量 struct rlimit limit; if (getrlimit(RLIMIT_NOFILE,&limit) == -1) { // 获取当前系统的描述符信息 redisLog(REDIS_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.", strerror(errno)); server.maxclients = 1024-32; } else { // 尝试修改当前进程的FD配置 rlim_t oldlimit = limit.rlim_cur; if (oldlimit < maxfiles) { rlim_t f; f = maxfiles; while(f > oldlimit) { limit.rlim_cur = f; limit.rlim_max = f; if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break; f -= 128; } if (f < oldlimit) f = oldlimit; if (f != maxfiles) { server.maxclients = f-32; redisLog(REDIS_WARNING,"Unable to set the max number of files limit to %d (%s), setting the max clients configuration to %d.", (int) maxfiles, strerror(errno), (int) server.maxclients); } else { redisLog(REDIS_NOTICE,"Max number of open files set to %d", (int) maxfiles); } } } }
slowlogInit 函数
该函数用于初始化慢查询日志,可以看到这里将使用list列表来完成日志的存储。同样,对于列表对象的组织.
void slowlogInit(void) { server.slowlog = listCreate(); server.slowlog_entry_id = 0; listSetFreeMethod(server.slowlog,slowlogFreeEntry); }
bioInit 函数
该函数用于创建后台任务执行线程,可以看到所谓Redis是单线程,指的仅仅是响应客户端的线程为单 线程,并不代表Redis进程本身只有一个线程。该方法将利用pthread 线程库完成相应处理.
void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; int j; /* 初始化线程互斥锁和条件变量,也即:控制线程互斥的结构 */ for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { pthread_mutex_init(&bio_mutex[j],NULL); pthread_cond_init(&bio_condvar[j],NULL); bio_jobs[j] = listCreate(); // 初始化每个线程的任务队列列表 bio_pending[j] = 0; } /* 初始化线程栈大小 */ pthread_attr_init(&attr); pthread_attr_getstacksize(&attr,&stacksize); if (!stacksize) stacksize = 1; while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; pthread_attr_setstacksize(&attr, stacksize); /* 创建线程,参数为:当前线程的序号,函数执行函数为 bioProcessBackgroundJobs */ for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { void *arg = (void*)(unsigned long) j; if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); exit(1); } } } // 每个线程执行的函数 void *bioProcessBackgroundJobs(void *arg) { struct bio_job *job; unsigned long type = (unsigned long) arg; // 该值为线程序号,上面看到过,可用于获取当前线程的 互斥锁信息 sigset_t sigset; pthread_detach(pthread_self()); pthread_mutex_lock(&bio_mutex[type]); // 获取当前线程自己的互斥锁 // 设置线程处理信号 sigemptyset(&sigset); sigaddset(&sigset, SIGALRM); if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) redisLog(REDIS_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); // 循环处理任务列表中放入的任务 while(1) { listNode *ln; if (listLength(bio_jobs[type]) == 0) { pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]); continue; } ln = listFirst(bio_jobs[type]); job = ln->value; pthread_mutex_unlock(&bio_mutex[type]); if (type == REDIS_BIO_CLOSE_FILE) { // 当前线程类型为执行BIO关闭文件操作 close((long)job->arg1); } else if (type == REDIS_BIO_AOF_FSYNC) {// 当前线程类型为执行AOF操作 aof_fsync((long)job->arg1); } else { redisPanic("Wrong job type in bioProcessBackgroundJobs()."); } zfree(job); pthread_mutex_lock(&bio_mutex[type]); listDelNode(bio_jobs[type],ln); bio_pending[type]--; } }
aeSetBeforeSleepProc 函数
该函数仅仅保留一个函数指针而已。在事件循环线程执行sleep操作之前回调。这里的回调仅仅处理未处理的命令并写入AOF数据,因为每次执行后,如果配置了AOF,那么需要持久化到磁盘
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) { eventLoop->beforesleep = beforesleep; } // 回调函数实现 void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); listNode *ln; redisClient *c; /* 尝试为刚刚解除阻塞的客户端处理未执行的命令 */ while (listLength(server.unblocked_clients)) { ln = listFirst(server.unblocked_clients); redisAssert(ln != NULL); c = ln->value; listDelNode(server.unblocked_clients,ln); c->flags &= ~REDIS_UNBLOCKED; /* 处理客户端输入缓冲区中的剩余数据 */ if (c->querybuf && sdslen(c->querybuf) > 0) { server.current_client = c; processInputBuffer(c); server.current_client = NULL; } } /* 将AOF缓冲区的数据写入磁盘 */ flushAppendOnlyFile(0); }
aeMain 函数
该函数将有主线程完成调用,接收事件循环的事件完成相应处理。 aeProcessEvents(eventLoop, AE_ALL_EVENTS)为主执行入口
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { // 当没有显示标志停止时不断循环处理事件循环中的线程 if (eventLoop->beforesleep != NULL) // 在执行处理事件时回调beforesleep,因为操作底层IO事件时可能导致线程阻塞 eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS); // 完成实际事件处理,可以看到这里将处理所有类型的事件(AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) #define AE_FILE_EVENTS 1 #define AE_TIME_EVENTS 2 ,事件类型分为:文件fd的事件、时间超时事件) } }
aeDeleteEventLoop 函数
该函数将在主线程完成处理后退出,然后释放事件循环结构
void aeDeleteEventLoop(aeEventLoop *eventLoop) { aeApiFree(eventLoop); // 处理实际事件循环fd的处理,比如:关闭epll的fd // 释放结构占用空间 zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); }
标签:STRING,Redis,REDIS,createObject,server,理解,深入,shared From: https://www.cnblogs.com/daomeidan/p/16357050.html