首页 > 数据库 >Redis源码浅析二:命令执行

Redis源码浅析二:命令执行

时间:2024-08-25 23:24:44浏览次数:13  
标签:querybuf cmd Redis server 源码 flags client && 浅析

1. 入口:readQueryFromClient

在redis启动的时候,我们还要关注一个重点,在initServer的时候,会执行aeCreateFileEvent,这里我们还有深入学习一下
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

记住这个readQueryFromClient,它是client request到server端处理的入口,有点类似于netty里面的入栈处理器

2. readQueryFromClient

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;  // 获取客户端对象
    int nread, readlen;
    size_t qblen;

    UNUSED(el);
    UNUSED(mask);

    // 读取的最大长度,默认是16KB
    readlen = PROTO_IOBUF_LEN;

    // 如果是多条批量请求,并且当前正在处理的批量回复的大小大于一定阈值
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        // 计算当前需要读取的数据量
        ssize_t remaining = (size_t)(c->bulklen + 2) - sdslen(c->querybuf);

        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    // 获取当前查询缓冲区的长度
    qblen = sdslen(c->querybuf);

    // 如果当前查询缓冲区的长度超过了历史峰值,则更新峰值
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;

    // 为查询缓冲区预留足够的空间以读取新的数据
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);

    // 从客户端 socket 读取数据
    nread = read(fd, c->querybuf + qblen, readlen);

    // 如果读取出错
    if (nread == -1) {
        if (errno == EAGAIN) {
            // 如果错误是由于资源暂时不可用,直接返回,等待下次事件循环
            return;
        } else {
            // 处理其他错误,如网络错误
            serverLog(LL_VERBOSE, "Reading from client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    // 如果读取到的数据为 0,表示客户端已经关闭连接
    else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }
    // 如果客户端是主节点(用于复制的客户端)
    else if (c->flags & CLIENT_MASTER) {
        // 将读取到的数据追加到主节点的待处理缓冲区
        c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf + qblen, nread);
    }

    // 增加查询缓冲区的已用大小
    sdsIncrLen(c->querybuf, nread);

    // 更新客户端最后一次交互时间
    c->lastinteraction = server.unixtime;

    // 如果客户端是主节点,更新读取偏移量(用于复制)
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;

    // 增加统计数据中的网络输入字节数
    server.stat_net_input_bytes += nread;

    // 如果查询缓冲区长度超过最大允许长度,关闭客户端连接
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(), c), bytes = sdsempty();
        bytes = sdscatrepr(bytes, c->querybuf, 64);
        serverLog(LL_WARNING, "Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    // 处理输入缓冲区的命令,并将其传播到从节点(如果有)
    processInputBufferAndReplicate(c);
}

总结:readQueryFromClient 函数负责从客户端 socket 读取请求数据,将其存储在查询缓冲区中,并根据不同的情况进行处理(如命令解析、复制状态更新等)。如果数据读取不完整或发生错误,它会正确处理这些情况,并确保不会由于某个客户端的异常状态影响到整个 Redis 服务器的稳定性。通过这种方式,Redis 在单线程模式下实现了高效的非阻塞 I/O 操作和多客户端管理

3. processInputBufferAndReplicate

void processInputBufferAndReplicate(client *c) {
	//非主client
    if (!(c->flags & CLIENT_MASTER)) {
    	//命令解析
        processInputBuffer(c);
    }
    //主client master
    else {
        size_t prev_offset = c->reploff;

        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        //传播到从机
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}

4. processInputBuffer

void processInputBuffer(client *c) {
    server.current_client = c;

    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        //如果是内联模式请求 (telnet)
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;

        }
        // todo 如果是普通模式请求 (resp)
        else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. */
        	//todo 执行命令
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    /* Update the applied replication offset of our master. */
                    c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
                }

                /* Don't reset the client structure for clients blocked in a
                 * module blocking command, so that the reply callback will
                 * still be able to access the client argv and argc field.
                 * The client will be reset in unblockClientFromModule(). */
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    //重置client
                	resetClient(c);
            }
            /* freeMemoryIfNeeded may flush slave output buffers. This may
             * result into a slave, that may be the active client, to be
             * freed. */
            if (server.current_client == NULL) break;
        }
    }

    /* Trim to pos */
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }

    server.current_client = NULL;
}

5. processCommand

/**
 * 命令的执行
 */
int processCommand(client *c) {
    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
	// quit校验
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    // 在命令字典里查找命令是否存在 并赋值到client的 当前执行命令和最后执行命令
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        flagTransaction(c);
        sds args = sdsempty();
        int i;
        for (i=1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    }
    //验证参数数目
    else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }

    /* Check if the user is authenticated */
    //校验是否有权限
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        flagTransaction(c);
        addReply(c,shared.noautherr);
        return C_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    //集群
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            clusterRedirectClient(c,n,hashslot,error_code);
            return C_OK;
        }
    }

    /* Handle the maxmemory directive.
     *
     * First we try to free some memory if possible (if there are volatile
     * keys in the dataset). If there are not the only thing we can do
     * is returning an error.
     *
     * Note that we do not want to reclaim memory if we are here re-entering
     * the event loop since there is a busy Lua script running in timeout
     * condition, to avoid mixing the propagation of scripts with the propagation
     * of DELs due to eviction. */
    //最大内存
    if (server.maxmemory && !server.lua_timedout) {
    	//缓存淘汰
        int out_of_memory = freeMemoryIfNeeded() == C_ERR;
        /* freeMemoryIfNeeded may flush slave output buffers. This may result
         * into a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return C_ERR;

        /* It was impossible to free enough memory, and the command the client
         * is trying to execute is denied during OOM conditions? Error. */
        if ((c->cmd->flags & CMD_DENYOOM) && out_of_memory) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return C_OK;
        }
    }

    /* Don't accept write commands if there are problems persisting on disk
     * and if this is a master instance. */
    int deny_write_type = writeCommandsDeniedByDiskError();
    if (deny_write_type != DISK_ERROR_TYPE_NONE &&
        server.masterhost == NULL &&
        (c->cmd->flags & CMD_WRITE ||
         c->cmd->proc == pingCommand))
    {
        flagTransaction(c);
        if (deny_write_type == DISK_ERROR_TYPE_RDB)
            addReply(c, shared.bgsaveerr);
        else
            addReplySds(c,
                sdscatprintf(sdsempty(),
                "-MISCONF Errors writing to the AOF file: %s\r\n",
                strerror(server.aof_last_write_errno)));
        return C_OK;
    }

    /* Don't accept write commands if there are not enough good slaves and
     * user configured the min-slaves-to-write option. */
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        flagTransaction(c);
        addReply(c, shared.noreplicaserr);
        return C_OK;
    }

    /* Don't accept write commands if this is a read only slave. But
     * accept write commands if this is our master. */
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        c->cmd->flags & CMD_WRITE)
    {
        addReply(c, shared.roslaveerr);
        return C_OK;
    }

    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }

    /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
     * when slave-serve-stale-data is no and we are a slave with a broken
     * link with master. */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & CMD_STALE))
    {
        flagTransaction(c);
        addReply(c, shared.masterdownerr);
        return C_OK;
    }

    /* Loading DB? Return an error if the command has not the
     * CMD_LOADING flag. */
    if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
        addReply(c, shared.loadingerr);
        return C_OK;
    }

    /* Lua script too slow? Only allow a limited number of commands. */
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        flagTransaction(c);
        addReply(c, shared.slowscripterr);
        return C_OK;
    }

    /* Exec the command */
    //事务
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    }
    // 真正执行
    else {
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}

看到这里基本上就可以了,不再往下追了,核心逻辑就是根据命令,比如get命令来找一个redisCommand结构体的指针,这个结构体包含了get命令处理函数(getCommand)、参数数量、命令属性;接下来,redis通过call函数调用get命令的处理函数getCommand,getCommand 函数根据提供的键 mykey 从数据库中获取对应的值,并将结果返回给客户端

6. addReply

void addReply(client *c, robj *obj) {
	// 判断是否需求返回
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
    	//先写入缓冲区(buffer) 如果写入失败
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
        	//则写入应答列表
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyStringToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

prepareClientToWrite:
在这里插入图片描述
这里会写入到用户待处理列表,在beforeSleep函数那里会遍历
在这里插入图片描述

7. 总结

在这里插入图片描述

标签:querybuf,cmd,Redis,server,源码,flags,client,&&,浅析
From: https://blog.csdn.net/LittleStar_Cao/article/details/141536484

相关文章