文章很长,且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录 博客园版 为您奉上珍贵的学习资源 :
免费赠送 :《尼恩Java面试宝典》 持续更新+ 史上最全 + 面试必备 2000页+ 面试必备 + 大厂必备 +涨薪必备
免费赠送 :《尼恩技术圣经+高并发系列PDF》 ,帮你 实现技术自由,完成职业升级, 薪酬猛涨!加尼恩免费领
免费赠送 经典图书:《Java高并发核心编程(卷1)加强版》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 经典图书:《Java高并发核心编程(卷2)加强版》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 经典图书:《Java高并发核心编程(卷3)加强版》 面试必备 + 大厂必备 +涨薪必备 加尼恩免费领
免费赠送 资源宝库: Java 必备 百度网盘资源大合集 价值>10000元 加尼恩领取
从0开始,手写Redis
说在前面
从0开始,手写一个Redis的学习价值在于:
-
可以深入地理解Redis的内部机制和原理,Redis可谓是面试的绝对重点和难点,
-
从而更好地掌握Redis的使用和优化。
-
帮助你提高编程能力和解决问题的能力。
-
手写一个Redis可以作为一个优质的简历轮子项目,注意,是高质量的简历轮子项目
很多小伙伴的项目都很low,极度缺乏轮子项目,so,轮子项目这就来了。
尼恩架构团队从架构师视角出发,基于尼恩 3高架构知识宇宙,写一本《从0开始,手写Redis》。
后面会不断升级,不断 迭代, 变成大家学习和面试的必读书籍。
最新《尼恩 架构笔记》《尼恩高并发三部曲》《尼恩Java面试宝典》的PDF文件,请到公号【技术自由圈】获取
本文目录
目录- 从0开始,手写Redis
作者介绍
一作:Khan, 架构师,十年后台开发经验 精通spring、rocketmq、多线程等框架和中间件源码。
二作: 尼恩,资深系统架构师、IT领域资深作家、著名博主。近20年高性能Web平台、高性能通信、高性能搜索、数据挖掘架构经验。资深的转架构导师,成功指导了N多个小伙升级架构师,有的小伙拿到年薪90W。
实现 Redis 协议解析器
首先,从0开始,实现协议解析器。
1、Redis网络协议详解
那实战之前,需要先搞清楚什么叫red的协议。
Redis用的什么协议?所谓的redis的协议就叫resp协议,它的全称叫REdis Serialization Portocol(RESP),就是redis的序列化协议,也就是reids的客户端和服务端通信的协议。即通过TCP连接,发送什么样的数据才能代表redis的通信。
RESP协议有五种数据的格式
-
正常回复
-
错误回复
-
整数
-
多行字符串
-
数组
1.1 正常回复
正常回复,是以+
开头,以\r\n
结尾,大家都知道\r\n
是CRLF对,是Windows系统常用的这个换行符,那么redis这里呢,也借鉴了Windows的这个换行符。
举个例子,比如说我们对redis发出一个指令,redis会回给我们一个正常。
正常就是OK,那他回给我们OK怎么回呢?
就是:
+OK\r\n
+
表示这是一个正常回复,OK是正常回复的内容,\r\n
是结束。
1.2 错误回复
错误回复是以-
开头,以\r\n
结尾的字符串形式,就是你发送的这个指令不对有问题。
redis会给你回复一个错误
例如:
-Error message\r\n
-
表示这是一个错误回复,message是错误回复的内容,\r\n
是结束。
1.3 整数
这个就不是单单是redis 服务端回复给客户端的消息了,是客户端和redis 服务端互相通信,客户端要发送一个整数的话,那就用这么一个格式,是:
开头写整数,然后以\r\n
结束。
例如:
:123456\r\n
1.4 多行字符串
多行字符串是以$
开头,后面跟字节数以杠\r\n
结尾
比如,如果要发送hello world,那么:
$11\r\nhello world\r\n
如果是空字符串,就是:
$0\r\n\r\n
如果字符串里面本来就有\r\n
,比如hello\r\nworld
那么:
$14\r\nhello\r\nworld\r\n
1.5 数组
数组是以*
开头,后面跟成员的个数
比如set key value这个字符串
TCP网络报文则为以下样式:
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*
后面的3表示有三个成员,$3
表示set是三个字符,下一个$3
表示key是三个字符,$5
表示value是五个字符。
2、实现constreply
2.1 Connection接口
Connection是在redis的协议层,它代表着一个redis的连接,之所以要写接口,因为未来Connection会有不同的实现,它跟持久化是相关的。它有什么方法呢?
一个是GetDBIndex方法,它是什么意思呢?大家知道redis通常它会有DB的概念,比如说它默认有16个DB,就是说每一个DB之间的KV是隔离开的。客户端连接查询的时候需要知道现在用的是哪个DB,就需要这个方法。
还有SelectDB方法,这个指令是切DB库,Redis的内核可能有16个库,甚至更多,那如果客户想在各个这个库之间去切换,其实就是切换 这个客户他的标识。
type Connection interface {
GetDBIndex() int
SelectDB(int)
}
2.2 Reply接口
这个接口主要是来代表一类数据,这一类数据是各种服务端对客户端的回复,叫Reply,都可以用这个接口来表示,所以就有很多的实现,它必须要实现一个方法叫ToBytes,就是把我们的回复的内容转成字节。之所以要把回复的内容转成字节,是因为TCP协议来回写的就是写字节流。
type Reply interface {
ToBytes() []byte
}
2.3 正常的常量回复
建立一个consts.go文件,consts.go文件用于保存一些固定的回复
2.3.1 PongReply
Redis有一个ping指令,客户端发ping,服务端就要回复pong,新建一个PongReply实现Reply接口:
/**
按照RESP协议的约定,正常回复以"+"开头,以"\r\n"结尾
*/
var pongbytes = []byte("+PONG\r\n")
func (r PongReply) ToBytes() []byte {
return pongbytes;
}
写一个make方法,方便调用:
func MakePongReply() *PongReply {
return &PongReply{}
}
2.3.2 OKReply
同理,再写一个回复OK的Reply:
// OkReply is +OK
type OkReply struct{}
var okBytes = []byte("+OK\r\n")
// ToBytes marshal redis.Reply
func (r *OkReply) ToBytes() []byte {
return okBytes
}
var theOkReply = new(OkReply)
// MakeOkReply returns a ok protocol
func MakeOkReply() *OkReply {
return theOkReply
}
2.3.3 NullBulkReply
这是一个空的块回复:
var nullBulkBytes = []byte("$-1\r\n")
// NullBulkReply is empty string
type NullBulkReply struct{}
// ToBytes marshal redis.Reply
func (r *NullBulkReply) ToBytes() []byte {
return nullBulkBytes
}
// MakeNullBulkReply creates a new NullBulkReply
func MakeNullBulkReply() *NullBulkReply {
return &NullBulkReply{}
}
2.3.4 EmptyMultiBulkReply
空数组回复:
var emptyMultiBulkBytes = []byte("*0\r\n")
// EmptyMultiBulkReply is a empty list
type EmptyMultiBulkReply struct{}
// ToBytes marshal redis.Reply
func (r *EmptyMultiBulkReply) ToBytes() []byte {
return emptyMultiBulkBytes
}
// MakeEmptyMultiBulkReply creates EmptyMultiBulkReply
func MakeEmptyMultiBulkReply() *EmptyMultiBulkReply {
return &EmptyMultiBulkReply{}
}
2.3.5 NoReply
空回复:
type NoReply struct{}
var noBytes = []byte("")
// ToBytes marshal redis.Reply
func (r *NoReply) ToBytes() []byte {
return noBytes
}
2.4 错误的常量回复
建立一个reply.go文件,在reply.go文件为所有错误回复类型写一个接口:
type ErrorReply interface {
Error() string
ToBytes() []byte
}
这一个接口实现了两个接口:实现了系统的builtin.go下的error接口以及resp\reply.go下的Reply接口,所以这是一个把两个接口缝合在一起的接口。
新建一个error.go,里面全是各种错误回复
2.4.1 UnknownErrReply
未知错误:
type UnknownErrReply struct{}
var unknownErrBytes = []byte("-Err unknown\r\n")
// ToBytes marshals redis.Reply
func (r *UnknownErrReply) ToBytes() []byte {
return unknownErrBytes
}
func (r *UnknownErrReply) Error() string {
return "Err unknown"
}
2.4.2 ArgNumErrReply
参数个数错误:
/*
*
Cmd用于将客户端传过来的命令回复过去
*/
type ArgNumErrReply struct {
Cmd string
}
// ToBytes marshals redis.Reply
func (r *ArgNumErrReply) ToBytes() []byte {
return []byte("-ERR wrong number of arguments for '" + r.Cmd + "' command\r\n")
}
func (r *ArgNumErrReply) Error() string {
return "ERR wrong number of arguments for '" + r.Cmd + "' command"
}
// MakeArgNumErrReply represents wrong number of arguments for command
func MakeArgNumErrReply(cmd string) *ArgNumErrReply {
return &ArgNumErrReply{
Cmd: cmd,
}
}
2.4.3 SyntaxErrReply
语法错误:
type SyntaxErrReply struct{}
var syntaxErrBytes = []byte("-Err syntax error\r\n")
var theSyntaxErrReply = &SyntaxErrReply{}
// MakeSyntaxErrReply creates syntax error
func MakeSyntaxErrReply() *SyntaxErrReply {
return theSyntaxErrReply
}
// ToBytes marshals redis.Reply
func (r *SyntaxErrReply) ToBytes() []byte {
return syntaxErrBytes
}
func (r *SyntaxErrReply) Error() string {
return "Err syntax error"
}
2.4.4 WrongTypeErrReply
数据类型错误:
type WrongTypeErrReply struct{}
var wrongTypeErrBytes = []byte("-WRONGTYPE Operation against a key holding the wrong kind of value\r\n")
// ToBytes marshals redis.Reply
func (r *WrongTypeErrReply) ToBytes() []byte {
return wrongTypeErrBytes
}
func (r *WrongTypeErrReply) Error() string {
return "WRONGTYPE Operation against a key holding the wrong kind of value"
}
2.4.5 ProtocolErrReply
接口协议错误,客户端发过来的指令不符合RESP规范时的回复:
type ProtocolErrReply struct {
Msg string
}
// ToBytes marshals redis.Reply
func (r *ProtocolErrReply) ToBytes() []byte {
return []byte("-ERR Protocol error: '" + r.Msg + "'\r\n")
}
func (r *ProtocolErrReply) Error() string {
return "ERR Protocol error '" + r.Msg + "' command"
}
2.5 实现自定义Reply
2.5.1 CRLF
定义一个变量CRLF
,用于表示Redis序列化协议中的行分隔符。Redis是一个内存数据库,使用自定义的序列化协议进行数据传输和存储。在Redis序列化协议中,每个数据结构都以某种特定的格式进行序列化,并且使用\r\n
作为行分隔符
var (
CRLF = "\r\n"
)
2.5.2 BulkReply
BulkReply是用来定义对字符串的回复的。
type BulkReply struct {
Arg []byte
}
BulkReply要实现Reply接口就要重写ToBytes()
方法,比如,如果输出是"hello world",根据RESP协议,就要转换为"$11\r\nhello world\r\n"
,strconv.Itoa(len(r.Arg))
把参数的长度转化为字符串,所以ToBytes()
的最终实现为:
func (r *BulkReply) ToBytes() []byte {
if r.Arg == nil {
return nullBulkBytes
}
return []byte("$" + strconv.Itoa(len(r.Arg)) + CRLF + string(r.Arg) + CRLF)
}
2.5.3 MultiBulkReply
数组的回复,注意参数Args [][]byte
是个二维数组:
type MultiBulkReply struct {
Args [][]byte
}
// MakeMultiBulkReply creates MultiBulkReply
func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
return &MultiBulkReply{
Args: args,
}
}
// ToBytes marshal redis.Reply
func (r *MultiBulkReply) ToBytes() []byte {
argLen := len(r.Args)
var buf bytes.Buffer
buf.WriteString("*" + strconv.Itoa(argLen) + CRLF)
for _, arg := range r.Args {
if arg == nil {
buf.WriteString("$-1" + CRLF)
} else {
buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)
}
}
return buf.Bytes()
}
上面这段代码定义了一个MultiBulkReply
类型的方法ToBytes()
,用于将MultiBulkReply
类型的对象转换为字节数组([]byte
类型)。
在Redis协议中,批量回复的数据类型由一个以*
开头的整数表示,后跟一组以$
开头的数据块。每个数据块都由一个以$
开头的长度表示,后跟实际数据和一个行分隔符组成。
在ToBytes()
方法中,首先获取MultiBulkReply
类型对象中参数的数量argLen
。然后,使用bytes.Buffer
类型的变量buf
创建一个缓冲区,并将*
和参数数量写入缓冲区。
接着,遍历MultiBulkReply
类型对象中的所有参数。如果参数为nil
,则将$-1
写入缓冲区,表示该参数为空。否则,将参数的长度和数据块写入缓冲区。
最后,将缓冲区转换为字节数组并返回。该方法返回的字节数组可以用于网络传输或持久化存储等操作。
2.5.4 StatusReply
状态回复:
type StatusReply struct {
Status string
}
// MakeStatusReply creates StatusReply
func MakeStatusReply(status string) *StatusReply {
return &StatusReply{
Status: status,
}
}
// ToBytes marshal redis.Reply
func (r *StatusReply) ToBytes() []byte {
return []byte("+" + r.Status + CRLF)
}
2.5.4 IntReply
数字回复:
type IntReply struct {
Code int64
}
// MakeIntReply creates int protocol
func MakeIntReply(code int64) *IntReply {
return &IntReply{
Code: code,
}
}
// ToBytes marshal redis.Reply
func (r *IntReply) ToBytes() []byte {
return []byte(":" + strconv.FormatInt(r.Code, 10) + CRLF)
}
2.5.4 StandardErrReply
标准错误回复:
// StandardErrReply represents server error
type StandardErrReply struct {
Status string
}
// MakeErrReply creates StandardErrReply
func MakeErrReply(status string) *StandardErrReply {
return &StandardErrReply{
Status: status,
}
}
// ToBytes marshal redis.Reply
func (r *StandardErrReply) ToBytes() []byte {
return []byte("-" + r.Status + CRLF)
}
func (r *StandardErrReply) Error() string {
return r.Status
}
判断是不是错误,根据第一个字节是不是"-":
func IsErrorReply(reply resp.Reply) bool {
return reply.ToBytes()[0] == '-'
}
3、实现ParseStream
客户端发给服务端的二进制字节流,里边有*
、$
以及\r\n
,这一节,我们就来把它们解析成真正的含义。
新建一个parser.go用于处理字节流的解析
3.1 Payload
定义一个Payload
类型,用于表示Redis命令的返回结果。
Payload
类型包含两个字段:Data
和Err
。Data
字段的类型是resp.Reply
,表示Redis命令的返回结果。resp
包提供了Redis协议的解析和序列化功能,Reply
类型是解析后的Redis命令返回结果的数据类型之一。Err
字段的类型是error
,表示Redis命令执行过程中可能出现的错误。
使用Payload
类型可以方便地封装Redis命令的返回结果,并进行传递和处理。在使用Payload
类型时,可以通过检查Err
字段来判断Redis命令是否执行成功,并通过Data
字段获取Redis命令的返回结果。
type Payload struct {
Data resp.Reply
Err error
}
3.2 readState
定义一个readState
类型,用于解析Redis协议中的读取状态。
readState
类型包含以下字段:
readingMultiLine
表示是否正在读取多行数据。expectedArgsCount
表示预期读取的参数数量。msgType
表示当前读取的消息类型。args
表示已经读取的参数列表。bulkLen
表示当前正在读取的块数据的长度。
使用readState
类型可以方便地记录Redis协议解析的状态,并在解析完整个协议时返回解析结果。在解析Redis命令时,可以通过读取状态来判断当前正在读取的协议数据类型,并逐步解析出完整的Redis命令和参数列表。
type readState struct {
readingMultiLine bool
expectedArgsCount int
msgType byte
args [][]byte
bulkLen int64
}
3.3 finished()
定义一个readState
类型的方法finished()
,用于判断是否已经完成了Redis协议的解析。
在Redis协议中,每个命令的参数数量是固定的,因此可以通过读取状态中的expectedArgsCount
字段来判断是否已经读取了所有的参数。如果已经读取了所有的参数,则说明协议解析已经完成。
在finished()
方法中,如果expectedArgsCount
大于0且已经读取的参数数量等于expectedArgsCount
,则返回true
,表示已经完成了Redis协议的解析。否则,返回false
,表示解析还未完成。
func (s *readState) finished() bool {
return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
}
3.4 ParseStream()
定义一个ParseStream
函数,用于解析Redis协议的数据流并返回一个通道(channel)以供读取解析结果。
ParseStream
函数的第一个参数是一个实现了io.Reader
接口的对象,用于从数据流中读取Redis协议的数据。第二个参数是一个只读通道,用于向调用者返回解析结果。
在ParseStream
函数中,首先创建一个缓冲区通道ch
,用于在解析过程中存储解析结果。然后,启动一个协程parse0
来执行解析操作,将解析结果写入缓冲区通道中。最后,返回缓冲区通道,供调用者读取解析结果。
通过使用通道,ParseStream
函数可以在解析Redis协议的数据流时,实现异步、非阻塞的解析过程。这种方式可以提高解析效率,避免阻塞和死锁等问题。
func ParseStream(reader io.Reader) <-chan *Payload {
ch := make(chan *Payload)
go parse0(reader, ch)
return ch
}
3.5 readLine()
定义一个readLine
函数,用来读取一行数据,它有两个参数:一个是指向 bufio.Reader
的指针,另一个是指向 readState
结构体的指针。该函数返回三个值:一个字节切片 msg
,一个布尔值表示读取是否完成,和一个错误值。
该函数从 bufio.Reader
中读取数据,直到遇到换行符('\n'
)。如果 readState
结构体中的 bulkLen
字段为零,则函数一直读取,直到遇到换行符为止。如果 bulkLen
字段为非零值,则函数读取由 bulkLen
值确定的特定字节数。
如果读取操作遇到错误,函数将返回一个错误值以及部分读取的数据。如果读取操作成功,则函数检查数据是否以回车符('\r'
)和换行符('\n'
)结尾。如果不是,则函数返回一个表示协议错误的错误值。
如果读取操作成功并且数据以回车符和换行符结尾,则函数将 bulkLen
字段设置为零,并返回数据以及一个布尔值,表示读取操作已完成(false
)。
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
var msg []byte
var err error
if state.bulkLen == 0 {
msg, err = bufReader.ReadBytes('\n')
if err != nil {
return nil, true, err
}
if len(msg) == 0 || msg[len(msg)-2] != '\r' {
return nil, false, errors.New("protocol error:" + string(msg))
}
} else {
msg = make([]byte, state.bulkLen+2)
_, err = io.ReadFull(bufReader, msg)
if err != nil {
return nil, true, err
}
if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
return nil, false, errors.New("protocol error:" + string(msg))
}
state.bulkLen = 0
}
return msg, false, nil
}
3.6 parseMultiBulkHeader
定义parseMultiBulkHeader
函数:
首先,该函数接收两个参数:msg
和 state
,其中 msg
是 Redis 返回的多行字符串的头部信息,state
是当前解析状态的一个结构体,用于存储解析过程中的相关状态信息。
然后,该函数使用 strconv.ParseUint()
方法将头部信息中的数字解析成一个无符号整数,存储在 expectedLine
变量中。如果解析失败,则返回一个错误。如果解析成功且得到的数字为 0,则表示 Redis 返回的是一个空的多行字符串,此时将 expectedArgsCount
置为 0,并返回。如果得到的数字大于 0,则表示 Redis 返回的是一个非空的多行字符串,此时将 msgType
置为头部信息的第一个字符,表示该多行字符串的类型,将 readingMultiLine
置为 true
,表示正在读取多行字符串,将 expectedArgsCount
置为 expectedLine
,表示该多行字符串中包含的行数,最后使用 make()
方法创建一个长度为 expectedLine
的空字节数组切片 args
,用于存储解析得到的每一行字符串的字节数组。
如果得到的数字小于 0,则表示头部信息格式不正确,将返回一个错误。
需要注意的是,在 Redis 协议解析器中,多行字符串是由多个行组成的,每个行都是一个简单字符串,多行字符串以 $
符号开头,后接一个数字,表示该多行字符串中行的数量,然后是多个以 \r\n
分隔的简单字符串,每个简单字符串以 $
符号开头,后接一个数字,表示该简单字符串的长度,然后是该简单字符串的内容。因此,解析多行字符串需要先解析头部信息,得到多行字符串中包含的行数,然后逐行解析。
func parseMultiBulkHeader(msg []byte, state *readState) error {
var err error
var expectedLine uint64
expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
if err != nil {
return errors.New("protocol error: " + string(msg))
}
if expectedLine == 0 {
state.expectedArgsCount = 0
return nil
} else if expectedLine > 0 {
// first line of multi bulk reply
state.msgType = msg[0]
state.readingMultiLine = true
state.expectedArgsCount = int(expectedLine)
state.args = make([][]byte, 0, expectedLine)
return nil
} else {
return errors.New("protocol error: " + string(msg))
}
}
3.7 parseBulkHeader
parseBulkHeader用于解析 Redis 返回的简单字符串和二进制安全字符串的长度信息
首先,该函数接收两个参数:msg
和 state
,其中 msg
是 Redis 返回的简单字符串或二进制安全字符串的长度信息,state
是当前解析状态的一个结构体,用于存储解析过程中的相关状态信息。
然后,该函数使用 strconv.ParseInt()
方法将长度信息解析成一个有符号整数,存储在 bulkLen
变量中。如果解析失败,则返回一个错误。如果解析成功且得到的数字为 -1,则表示 Redis 返回的是一个空的简单字符串或二进制安全字符串,此时直接返回。如果得到的数字大于 0,则表示 Redis 返回的是一个非空的二进制安全字符串,此时将 msgType
置为长度信息的第一个字符,表示该字符串的类型,将 readingMultiLine
置为 true
,表示正在读取二进制安全字符串,将 expectedArgsCount
置为 1,表示该字符串只包含一个元素,最后创建一个长度为 1 的空字节数组切片 args
,用于存储解析得到的字符串。如果得到的数字小于 0,则表示长度信息格式不正确,将返回一个错误。
需要注意的是,在 Redis 协议解析器中,简单字符串和二进制安全字符串的长度信息以 $
符号开头,后接一个数字,表示字符串的长度,然后是字符串的内容。因此,解析简单字符串和二进制安全字符串需要先解析长度信息,得到字符串的长度,然后再读取该长度的字节数组,即为字符串的内容。
func parseBulkHeader(msg []byte, state *readState) error {
var err error
state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
if err != nil {
return errors.New("protocol error: " + string(msg))
}
if state.bulkLen == -1 { // null bulk
return nil
} else if state.bulkLen > 0 {
state.msgType = msg[0]
state.readingMultiLine = true
state.expectedArgsCount = 1
state.args = make([][]byte, 0, 1)
return nil
} else {
return errors.New("protocol error: " + string(msg))
}
}
3.8 parseSingleLineReply
parseSingleLineReply用于解析 Redis 返回的单行回复。以下是该函数的详细解释:
该函数接收一个字节数组 msg
,该字节数组是 Redis 返回的单行回复,函数将该字节数组转换为字符串,然后根据字符串的第一个字符,将单行回复的类型分为三种情况:状态回复、错误回复和整数回复。
对于状态回复,函数使用 strings.TrimSuffix()
方法去掉字符串末尾的 \r\n
,然后调用 reply.MakeStatusReply()
方法创建一个状态回复对象,将该对象作为结果返回。
对于错误回复,函数也是使用 strings.TrimSuffix()
方法去掉字符串末尾的 \r\n
,然后调用 reply.MakeErrReply()
方法创建一个错误回复对象,将该对象作为结果返回。
对于整数回复,函数先将字符串转换为有符号整数,然后调用 reply.MakeIntReply()
方法创建一个整数回复对象,将该对象作为结果返回。如果转换失败,则说明返回的单行回复格式不正确,函数将返回一个错误。
需要注意的是,在 Redis 协议解析器中,单行回复以一种特定的格式返回,即以一个特定字符开头,后接一个字符串,最后以 \r\n
结尾。其中,以 +
开头的字符串表示状态回复,以 -
开头的字符串表示错误回复,以 :
开头的字符串表示整数回复。
func parseSingleLineReply(msg []byte) (resp.Reply, error) {
str := strings.TrimSuffix(string(msg), "\r\n")
var result resp.Reply
switch msg[0] {
case '+': // status reply
result = reply.MakeStatusReply(str[1:])
case '-': // err reply
result = reply.MakeErrReply(str[1:])
case ':': // int reply
val, err := strconv.ParseInt(str[1:], 10, 64)
if err != nil {
return nil, errors.New("protocol error: " + string(msg))
}
result = reply.MakeIntReply(val)
}
return result, nil
}
3.9 readBody
readBody用于读取 Redis 返回的多行字符串或二进制安全字符串中的每一行。以下是该函数的详细解释:
该函数接收两个参数:msg
和 state
,其中 msg
是 Redis 返回的多行字符串或二进制安全字符串的一行内容,state
是当前解析状态的一个结构体,用于存储解析过程中的相关状态信息。
首先,该函数将 msg
中的内容去除末尾的 \r\n
,并将结果存储在 line
变量中。然后,函数判断 line
的第一个字符是否为 $
。如果是,则表示该行是一个二进制安全字符串的长度信息,该函数使用 strconv.ParseInt()
方法将长度信息解析成一个有符号整数,存储在 bulkLen
变量中。如果解析失败,则返回一个错误。如果解析成功且得到的数字小于等于 0,则表示该行是一个空的二进制安全字符串,在多行字符串中,将空字符串添加到 args
中,并将 bulkLen
置为 0。如果得到的数字大于 0,则表示该行是一个非空的二进制安全字符串,将其添加到 args
中,并将 bulkLen
置为该字符串的长度。
如果 line
的第一个字符不是 $
,则表示该行是一个简单字符串,将其添加到 args
中。
最后,该函数返回一个空的错误。
需要注意的是,在 Redis 协议解析器中,多行字符串是由多个行组成的,每个行都是一个简单字符串或二进制安全字符串,多行字符串以 $
符号开头,后接一个数字,表示该多行字符串中行的数量,然后是多个以 \r\n
分隔的简单字符串或二进制安全字符串,每个简单字符串以 $
符号开头,后接一个数字,表示该简单字符串或二进制安全字符串的长度,然后是该简单字符串或二进制安全字符串的内容。因此,解析多行字符串需要先解析头部信息,得到多行字符串中包含的行数,然后逐行解析。
func readBody(msg []byte, state *readState) error {
line := msg[0 : len(msg)-2]
var err error
if line[0] == '$' {
// bulk reply
state.bulkLen, err = strconv.ParseInt(string(line[1:]), 10, 64)
if err != nil {
return errors.New("protocol error: " + string(msg))
}
if state.bulkLen <= 0 { // null bulk in multi bulks
state.args = append(state.args, []byte{})
state.bulkLen = 0
}
} else {
state.args = append(state.args, line)
}
return nil
}
3.10 parse0()
parse0()用于解析 Redis 返回的数据流并将解析结果发送到一个数据通道。以下是该函数的详细解释:
该函数接收两个参数:reader
和 ch
,其中 reader
是一个实现了 io.Reader
接口的对象,用于从 Redis 服务器读取数据流,ch
是一个通道,用于发送解析结果。
首先,该函数创建一个带有缓冲区的读取器 bufReader
,用于从 reader
中读取数据流。然后创建一个初始状态 state
,用于存储解析过程中的相关状态信息。接着进入一个无限循环,并在每次循环中执行以下操作:
- 读取一行数据。该函数调用
readLine()
方法从bufReader
中读取一行数据,并将读取的结果和错误信息存储在msg
、ioErr
和err
变量中。如果读取成功,则将msg
传递给解析函数进行解析,否则根据ioErr
判断是否遇到了 I/O 错误,如果是,则关闭数据通道并返回;否则说明遇到了协议错误,将错误信息发送到数据通道并重置解析状态state
,然后继续下一轮循环。 - 解析一行数据。根据读取到的数据
msg
和解析状态state
,该函数调用不同的解析函数进行解析。如果state.readingMultiLine
为false
,则表示正在解析一个新的回复,函数将根据第一个字符判断回复类型并调用相应的解析函数进行解析。如果state.readingMultiLine
为true
,则表示正在解析一个多行回复中的一行,函数将调用readBody()
方法读取该行的内容并存储在解析状态state
中,如果该多行回复中的所有行都已经读取完毕,则将解析得到的结果发送到数据通道,并重置解析状态state
,然后继续下一轮循环。
需要注意的是,在 Redis 协议解析器中,数据流由多个回复组成,每个回复由多个行组成。多行回复以 $
符号开头,后接一个数字,表示该多行回复中行的数量,然后是多个以 \r\n
分隔的简单字符串或二进制安全字符串。简单字符串和二进制安全字符串以 $
符号开头,后接一个数字,表示该简单字符串或二进制安全字符串的长度,然后是该简单字符串或二进制安全字符串的内容。单行回复以一种特定的格式返回,即以一个特定字符开头,后接一个字符串,最后以 \r\n
结尾。因此,解析数据流需要先解析头部信息,得到回复类型和行数,然后逐行解析。
func parse0(reader io.Reader, ch chan<- *Payload) {
defer func() {
if err := recover(); err != nil {
logger.Error(string(debug.Stack()))
}
}()
bufReader := bufio.NewReader(reader)
var state readState
var err error
var msg []byte
for {
// read line
var ioErr bool
msg, ioErr, err = readLine(bufReader, &state)
if err != nil {
if ioErr { // encounter io err, stop read
ch <- &Payload{
Err: err,
}
close(ch)
return
}
// protocol err, reset read state
ch <- &Payload{
Err: err,
}
state = readState{}
continue
}
// parse line
if !state.readingMultiLine {
// receive new response
if msg[0] == '*' {
// multi bulk reply
err = parseMultiBulkHeader(msg, &state)
if err != nil {
ch <- &Payload{
Err: errors.New("protocol error: " + string(msg)),
}
state = readState{} // reset state
continue
}
if state.expectedArgsCount == 0 {
ch <- &Payload{
Data: &reply.EmptyMultiBulkReply{},
}
state = readState{} // reset state
continue
}
} else if msg[0] == '$' { // bulk reply
err = parseBulkHeader(msg, &state)
if err != nil {
ch <- &Payload{
Err: errors.New("protocol error: " + string(msg)),
}
state = readState{} // reset state
continue
}
if state.bulkLen == -1 { // null bulk reply
ch <- &Payload{
Data: &reply.NullBulkReply{},
}
state = readState{} // reset state
continue
}
} else {
// single line reply
result, err := parseSingleLineReply(msg)
ch <- &Payload{
Data: result,
Err: err,
}
state = readState{} // reset state
continue
}
} else {
// receive following bulk reply
err = readBody(msg, &state)
if err != nil {
ch <- &Payload{
Err: errors.New("protocol error: " + string(msg)),
}
state = readState{} // reset state
continue
}
// if sending finished
if state.finished() {
var result resp.Reply
if state.msgType == '*' {
result = reply.MakeMultiBulkReply(state.args)
} else if state.msgType == '$' {
result = reply.MakeBulkReply(state.args[0])
}
ch <- &Payload{
Data: result,
Err: err,
}
state = readState{}
}
}
}
}
4、实现Connection
这一节,我们要写一个新的Handler让它处理用户发过来的请求,把这个请求转发给解析器,让解析器去解析。
4.1 Connection结构体
这个结构体代表我们的协议层,对每一个连接上的客户端的描述
conn
类型为net.Conn
:这个字段是一个网络套接字的连接。waitingReply
类型为wait.Wait
:这个字段是一个Wait
结构体的实例,可能用于同步并发访问连接。mu
类型为sync.Mutex
:这个字段是一个互斥锁,用于同步对连接的访问。selectDB
类型为int
:这个字段存储当前选定数据库的索引。
type Connection struct {
conn net.Conn
waitingReply wait.Wait
mu sync.Mutex
selectDB int
}
4.2 close方法
close方法的作用是关闭与此连接相关联的网络连接。具体来说,它首先调用waitingReply
字段上的WaitWithTimeout()
方法,该方法将等待所有正在进行的操作完成,或者在10秒后超时。然后它调用conn
字段上的Close()
方法来关闭连接。
最后,它返回nil
表示没有错误发生。如果在关闭连接时发生错误,将返回一个非nil
的error
值,以指示关闭连接时出现了问题。
因此,这个Close
方法允许用户以安全的方式关闭连接,并等待所有正在进行的操作完成。超时机制可以确保在某些情况下不会无限期地等待连接关闭,从而导致程序锁死。
func (c *Connection) Close() error {
c.waitingReply.WaitWithTimeout(10 * time.Second)
_ = c.conn.Close()
return nil
}
4.3 RemoteAddr
方法的作用是返回与此连接相关联的远程网络地址。它通过调用conn
字段上的RemoteAddr()
方法来实现,该方法也返回一个net.Addr
类型的值。
因此,这个RemoteAddr
方法允许用户获取连接的远程网络地址,以便进行网络编程中的相关操作,例如确定连接是从哪个主机发起的。
func (c *Connection) RemoteAddr() net.Addr {
return c.conn.RemoteAddr()
}
4.4 Write
Write是Connection
结构体类型的一个方法。这个方法的名称是Write
,它接受一个[]byte
类型的参数b
,并返回一个error
类型的值。
该方法的作用是向与此连接相关联的网络连接写入数据。具体来说,它首先检查b
的长度是否为0,如果是,则直接返回nil
表示没有错误发生。否则,它会在调用conn
字段上的Write()
方法将数据写入连接之前,使用互斥锁和等待组来同步并发访问连接。
在同步代码块内部,它将等待组的计数增加1,以表示有一个新的操作正在进行。然后,它使用defer
语句在函数返回时逆序地将等待组的计数减少1,并释放互斥锁以允许其他goroutine访问连接。
最后,它调用conn
字段上的Write()
方法来将数据写入连接,并返回可能发生的错误。
因此,这个Write
方法允许用户向连接写入数据,同时确保在并发访问连接时是安全的。等待组和互斥锁可确保多个goroutine不会同时写入连接,并且defer
语句可确保在写操作完成或发生错误后正确地释放资源。
func (c *Connection) Write(b []byte) error {
if len(b) == 0 {
return nil
}
c.mu.Lock()
c.waitingReply.Add(1)
defer func() {
c.waitingReply.Done()
c.mu.Unlock()
}()
_, err := c.conn.Write(b)
return err
}
5、实现RespHandler
将用户发过来的报文解析成实际的指令,以便用于通信
新建文件resp/handler/handler.go
和interface/database/database.go
5.1 定义Database 接口
type CmdLine = [][]byte
type Database interface {
Exec(client resp.Connection, args [][]byte) resp.Reply
Close()
AfterClientClose(c resp.Connection)
}
type DataEntity struct {
Data interface{}
}
CmdLine
是一个别名类型,它实际上是一个由[]byte
类型的切片组成的二维切片。这个类型可能被用来表示一行命令,其中每个字节切片代表一个参数。Database
是一个接口类型,它定义了与数据库相关的方法。具体来说,它有三个方法:Exec
用于执行命令,Close
用于关闭数据库,AfterClientClose
用于处理客户端连接关闭事件。Exec
方法接受一个resp.Connection
类型的参数client
,表示与客户端相关联的连接,以及一个[][]byte
类型的参数args
,表示命令参数。它返回一个resp.Reply
类型的值,表示命令的执行结果。Close
方法用于关闭数据库,没有参数和返回值。AfterClientClose
方法用于处理客户端连接关闭事件,接受一个resp.Connection
类型的参数表示已关闭的连接。
DataEntity
是一个结构体类型,它具有一个名为Data
的公共字段,类型为interface{}
。这个类型可能被用来表示数据库中的数据实体,其中Data
字段可以包含任何类型的数据。
5.2 RespHandler结构体
5.2.1 RespHandler
定义结构体RespHandler
。它有几个字段:
activeConn
类型为sync.Map
:这个字段是一个并发安全的映射,用于存储当前处于活动状态的连接。closing
类型为atomic.Boolean
:这个字段是一个原子布尔类型,用于表示服务器是否正在关闭。db
类型为databaseface.Database
:这个字段是一个实现了Database
接口的对象,表示与此响应处理程序关联的数据库。
该结构体是一个响应处理程序,用于处理来自客户端的请求。activeConn
字段用于跟踪当前处于活动状态的连接,以便在关闭服务器时关闭这些连接。closing
字段可以用于在关闭服务器时通知其他goroutine停止处理新的请求。db
字段则表示与此响应处理程序关联的数据库。
其中RespHandler
用于处理客户端发送的Redis协议请求,并与一个实现了Database
接口的对象进行交互来处理这些请求。
type RespHandler struct {
activeConn sync.Map
closing atomic.Boolean
db databaseface.Database
}
5.2.2 Close()方法
关闭协议层,即关闭整个redis
该方法的作用是关闭响应处理程序,并关闭所有当前处于活动状态的连接。具体来说,它首先将closing
字段设置为true
,表示服务器正在关闭。然后,它使用activeConn
字段中存储的连接列表来关闭所有当前处于活动状态的连接。在关闭每个连接之前,它调用client.Close()
方法来关闭连接。
最后,它调用db
字段上的Close()
方法来关闭与响应处理程序关联的数据库,并返回nil
表示没有错误发生。
需要注意的是,在关闭连接时,activeConn
字段是一个并发安全的映射,因此需要使用Range()
方法来遍历所有当前处于活动状态的连接
func (h *RespHandler) Close() error {
logger.Info("handler shutting down...")
h.closing.Set(true)
// TODO: concurrent wait
h.activeConn.Range(func(key interface{}, val interface{}) bool {
client := key.(*connection.Connection)
_ = client.Close()
return true
})
h.db.Close()
return nil
}
5.2.3 closeClient方法
该方法的作用是关闭一个客户端连接,并从activeConn
字段中删除它。具体来说,它首先调用client.Close()
方法来关闭连接。然后,它调用db
字段上的AfterClientClose()
方法来处理客户端连接关闭事件。最后,它使用activeConn
字段的Delete()
方法将连接从活动连接列表中删除。
因此,这个closeClient
方法允许响应处理程序在需要关闭某个客户端连接时,以正确的方式关闭该连接,并从活动连接列表中删除它。
func (h *RespHandler) closeClient(client *connection.Connection) {
_ = client.Close()
h.db.AfterClientClose(client)
h.activeConn.Delete(client)
}
5.2.4 实现Handle方法
Handle方法的作用是处理来自客户端的请求,直到连接关闭为止。具体来说,它首先检查服务器是否正在关闭,如果是,则拒绝新的连接并关闭连接。然后,它创建一个新的Connection
结构体,并将其与给定的网络连接相关联,并将连接添加到activeConn
字段中以跟踪它是活动的。
接下来,它使用parser.ParseStream()
方法从连接中解析出一个个请求,并对每个请求进行处理。如果解析时发生错误,则它会将错误回复发送给客户端,并关闭连接。如果请求没有数据,则它会记录一个错误并继续处理下一个请求。
如果请求是一个正确格式的多批量回复,则它将调用db
字段上的Exec()
方法来执行该命令,并将结果写回客户端。如果执行结果为空,则它将写入一个未知错误回复。
最后,在处理每个请求时,它都会检查连接是否已经关闭。如果是,则它将关闭连接,并从activeConn
字段中删除它。
因此,这个Handle
方法允许响应处理程序处理来自客户端的请求,并在客户端关闭连接时正确地关闭连接。它还确保在服务器关闭时,不会接受新的连接。
func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {
if h.closing.Get() {
// closing handler refuse new connection
_ = conn.Close()
}
client := connection.NewConn(conn)
h.activeConn.Store(client, 1)
ch := parser.ParseStream(conn)
for payload := range ch {
if payload.Err != nil {
if payload.Err == io.EOF ||
payload.Err == io.ErrUnexpectedEOF ||
strings.Contains(payload.Err.Error(), "use of closed network connection") {
// connection closed
h.closeClient(client)
logger.Info("connection closed: " + client.RemoteAddr().String())
return
}
// protocol err
errReply := reply.MakeErrReply(payload.Err.Error())
err := client.Write(errReply.ToBytes())
if err != nil {
h.closeClient(client)
logger.Info("connection closed: " + client.RemoteAddr().String())
return
}
continue
}
if payload.Data == nil {
logger.Error("empty payload")
continue
}
r, ok := payload.Data.(*reply.MultiBulkReply)
if !ok {
logger.Error("require multi bulk reply")
continue
}
result := h.db.Exec(client, r.Args)
if result != nil {
_ = client.Write(result.ToBytes())
} else {
_ = client.Write(unknownErrReplyBytes)
}
}
}
6、测试
6.1 新建一个EchoDatabase,它实现了database.Database接口:
type EchoDatabase struct {
}
func NewEchoDatabase() *EchoDatabase {
return &EchoDatabase{}
}
func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
return reply.MakeMultiBulkReply(args)
}
func (e EchoDatabase) AfterClientClose(c resp.Connection) {
logger.Info("EchoDatabase AfterClientClose")
}
func (e EchoDatabase) Close() {
logger.Info("EchoDatabase Close")
}
6.2 定义方法MakeHandler
调用NewEchoDatabase,这样他就持有了一个最简单的redis内核,他会把接收到的指令回发出去
func MakeHandler() *RespHandler {
var db databaseface.Database
db = database.NewEchoDatabase()
return &RespHandler{
db: db,
}
}
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
实现内存数据库
本章主要介绍如何实现内存KV数据库。
1、定义Dict接口
新建文件datastruct/dict/dict.go,定义Dict
该接口定义,包含了常见的字典操作方法。下面对每个方法进行简要说明:
-
Get(key string) (val interface{}, exists bool)
:根据键获取值,返回值和是否存在的布尔值。 -
Len()
:获取字典中键值对的数量。 -
Put(key string, val interface{}) (result int)
:存储一个键值对,并返回字典中键值对的数量。 -
PutIfAbsent(key string, val interface{}) (result int)
:只有当键不存在时才存储一个键值对,并返回字典中键值对的数量。 -
PutIfExists(key string, val interface{}) (result int)
:只有当键存在时才存储一个键值对,并返回字典中键值对的数量。 -
Remove(key string) (result int)
:根据键删除一个键值对,并返回字典中键值对的数量。 -
ForEach(consumer Consumer)
:遍历字典,并以每个键值对作为参数调用指定的方法。 -
Keys() []string
:获取字典中所有键的列表。 -
RandomKeys(limit int) []string
:获取一个随机的键列表,数量不超过指定的限制。 -
RandomDistinctKeysKeys(limit int) []string
:获取一个随机的不重复键列表,数量不超过指定的限制。 -
clear()
:清空字典中的所有键值对。
type Dict interface {
Get(key string) (val interface{}, exsits bool)
Len()
Put(key string, val interface{}) (result int)
PutIfAbsent(key string, val interface{}) (result int)
PutIfExists(key string, val interface{}) (result int)
Remove(key string) (result int)
ForEach(consumer Consumer)
Keys() []string
RandomKeys(limit int) []string
RandomDistinctKeysKeys(limit int) []string
clear()
}
type Consumer func(key string, val interface{}) bool
2、为Dict接口写一个实现
2.1 SyncDict结构体
该结构体定义了一个并发安全的字典,使用了Go语言标准库中的sync.Map
类型。
sync.Map是Go语言中的一个并发安全的字典类型,用于在多个goroutine之间共享数据。sync.Map的内部实现使用了一种特殊的算法,可以在无需使用锁的情况下保证并发安全性。
SyncDict结构体中包含了一个sync.Map
类型的成员变量m,用于存储键值对。通过使用sync.Map
类型,SyncDict类型可以提供并发安全的读写操作,避免在多个goroutine中出现数据竞争的问题。
type SyncDict struct {
m sync.Map
}
下面为SyncDict实现Dict
2.2 Get方法
该方法接受一个参数key,表示要获取的键。该方法返回两个值,一个是键对应的值val,另一个是表示键是否存在的布尔值exists。
在方法内部,使用sync.Map中的Load方法来获取指定键的值。如果键存在,则返回该键对应的值和true;如果键不存在,则返回nil和false。
因为SyncDict类型中的m成员变量是一个sync.Map
类型的变量,所以可以直接使用sync.Map
类型的方法Load来获取指定键的值。由于sync.Map
类型是并发安全的,因此在多个goroutine之间调用该方法时,也可以确保并发安全性。
func (dict *SyncDict) Get(key string) (val interface{}, exsits bool) {
val, ok := dict.m.Load(key)
return val, ok
}
2.3 Put方法
Put
方法用于向SyncDict
中添加一个键值对。
该方法接受两个参数,一个是key,表示要添加的键,另一个是val,表示要添加的值。该方法返回一个int类型的值,表示添加操作的结果。如果添加的键之前已经存在,则返回0;如果添加的键是新的,则返回1。
在方法内部,首先使用sync.Map
中的Load方法来判断要添加的键是否已经存在。如果键已经存在,则将新的值覆盖旧的值;如果键是新的,则使用sync.Map
中的Store方法来添加新的键值对。
最后,根据键是否已经存在来返回相应的结果。
由于SyncDict类型中的m成员变量是一个sync.Map
类型的变量,因此可以直接使用sync.Map
类型的方法Load和Store来判断键是否已经存在,以及添加键值对。由于sync.Map
类型是并发安全的,因此在多个goroutine之间调用该方法时,也可以确保并发安全性。
func (dict *SyncDict) Put(key string, val interface{}) (result int) {
_, existed := dict.m.Load(key)
dict.m.Store(key, val)
if existed {
return 0
}
return 1
}
2.4 Len方法
Len方法用于获取SyncDict中键值对的数量。
该方法不接受任何参数,返回一个int类型的值,表示SyncDict中键值对的数量。
在方法内部,首先定义一个变量length,用于记录键值对的数量。然后使用sync.Map
中的Range方法遍历所有的键值对,对每个键值对调用一个函数,该函数将length加1。最后返回length的值,即为SyncDict中键值对的数量。
由于SyncDict类型中的m成员变量是一个sync.Map
类型的变量,因此可以直接使用sync.Map
类型的方法Range来遍历所有的键值对,并对每个键值对进行处理。由于sync.Map
类型是并发安全的,因此在多个goroutine之间调用该方法时,也可以确保并发安全性。
func (dict *SyncDict) Len() int {
length := 0
dict.m.Range(func(key, value interface{}) bool {
length++
return true
})
return length
}
2.5 PutIfAbsent方法
PutIfAbsent用于向SyncDict中添加一个键值对,但仅当指定的键不存在时才添加。
该方法接受两个参数,一个是key,表示要添加的键,另一个是val,表示要添加的值。该方法返回一个int类型的值,表示添加操作的结果。如果添加的键之前已经存在,则返回0;如果添加的键是新的,则返回1。
在方法内部,首先使用sync.Map
中的Load方法来判断要添加的键是否已经存在。如果键已经存在,则直接返回0;如果键是新的,则使用sync.Map
中的Store方法来添加新的键值对。
最后,根据键是否已经存在来返回相应的结果。
由于SyncDict类型中的m成员变量是一个sync.Map
类型的变量,因此可以直接使用sync.Map
类型的方法Load和Store来判断键是否已经存在,以及添加键值对。由于sync.Map
类型是并发安全的,因此在多个goroutine之间调用该方法时,也可以确保并发安全性。
func (dict *SyncDict) PutIfAbsent(key string, val interface{}) (result int) {
_, existed := dict.m.Load(key)
if existed {
return 0
}
dict.m.Store(key, val)
return 1
}
2.6 PutIfExists方法
PutIfExists用于向SyncDict中更新一个已存在的键值对,如果键不存在则不进行任何操作。
该方法接受两个参数,一个是key,表示要更新的键,另一个是val,表示要更新的值。该方法返回一个int类型的值,表示更新操作的结果。如果更新的键存在,则返回1;如果更新的键不存在,则返回0。
在方法内部,首先使用sync.Map
中的Load方法来判断要更新的键是否已经存在。如果键已经存在,则使用sync.Map
中的Store方法来更新键值对,并返回1;如果键不存在,则直接返回0。
由于SyncDict类型中的m成员变量是一个sync.Map
类型的变量,因此可以直接使用sync.Map
类型的方法Load和Store来判断键是否已经存在,以及更新键值对。由于sync.Map
类型是并发安全的,因此在多个goroutine之间调用该方法时,也可以确保并发安全性。
func (dict *SyncDict) PutIfExists(key string, val interface{}) (result int) {
_, existed := dict.m.Load(key)
if existed {
dict.m.Store(key, val)
return 1
}
return 0
}
2.7 Remove方法
该方法用于从SyncDict中删除指定的键值对。
该方法接受一个参数key,表示要删除的键。该方法返回一个int类型的值,表示删除操作的结果。如果要删除的键存在,则返回1;如果要删除的键不存在,则返回0。
在方法内部,首先使用sync.Map
中的Load方法来判断要删除的键是否已经存在。如果键已经存在,则使用sync.Map
中的Delete方法来删除键值对,并返回1;如果键不存在,则直接返回0。
由于SyncDict类型中的m成员变量是一个sync.Map
类型的变量,因此可以直接使用sync.Map
类型的方法Load和Delete来判断键是否已经存在,以及删除键值对。由于sync.Map
类型是并发安全的,因此在多个goroutine之间调用该方法时,也可以确保并发安全性。
func (dict *SyncDict) Remove(key string) (result int) {
_, existed := dict.m.Load(key)
dict.m.Delete(key)
if existed {
return 1
}
return 0
}
2.8 ForEach方法
该方法用于对SyncDict中的所有键值对进行遍历,并对每个键值对进行操作。
该方法接受一个参数consumer,类型为Consumer。Consumer是一个函数类型,接受两个参数,一个是string类型的键,另一个是interface{}类型的值,表示对键值对进行的操作。
在方法内部,使用sync.Map
中的Range方法遍历所有的键值对,对每个键值对调用一个函数。该函数将键强制类型转换为string类型,并将键和值作为参数传递给consumer函数进行处理。如果consumer函数返回true,则继续遍历下一个键值对;如果consumer函数返回false,则停止遍历。
由于SyncDict类型中的m成员变量是一个sync.Map
类型的变量,因此可以直接使用sync.Map
类型的方法Range来遍历所有的键值对,并对每个键值对进行操作。由于sync.Map
类型是并发安全的,因此在多个goroutine之间调用该方法时,也可以确保并发安全性。
func (dict *SyncDict) ForEach(consumer Consumer) {
dict.m.Range(func(key, value interface{}) bool {
consumer(key.(string), value)
return true
})
}
2.9 Keys方法
Keys方法用于获取SyncDict中所有键的一个切片。
该方法不接受任何参数,返回一个[]string
类型的切片,其中包含了SyncDict中所有键的值。
在方法内部,首先创建一个[]string
类型的切片result,切片长度为SyncDict中键值对的数量。然后使用sync.Map
中的Range方法遍历所有的键值对,对每个键值对调用一个函数。该函数将键强制类型转换为string类型,并将键添加到result切片中。最后返回result切片即可。
由于SyncDict类型中的m成员变量是一个sync.Map
类型的变量,因此可以直接使用sync.Map
类型的方法Range来遍历所有的键值对,并获取所有的键。由于sync.Map
类型是并发安全的,因此在多个goroutine之间调用该方法时,也可以确保并发安全性。
func (dict *SyncDict) Keys() []string {
result := make([]string, dict.Len())
i := 0
dict.m.Range(func(key, value interface{}) bool {
result[i] = key.(string)
i++
return true
})
return result
}
2.10 RandomKeys方法
RandomKeys方法用于获取SyncDict中随机选取的若干个键的一个切片。
该方法接受一个参数limit,表示要获取的键的数量。方法返回一个[]string
类型的切片,其中包含了随机选取的limit个键的值。
在方法内部,首先创建一个[]string
类型的切片result,切片长度为SyncDict中键值对的数量。然后使用一个for循环,每次循环随机选取一个键,并将该键添加到result切片中。
具体地,每次循环使用sync.Map
中的Range方法遍历所有的键值对。在遍历过程中,将每个键添加到result切片中,并通过返回false来停止遍历。由于Range方法的遍历顺序是随机的,因此每次遍历得到的键也是随机的。循环limit次后,返回result切片即可。
需要注意的是,如果SyncDict中的键值对数量小于limit,则返回的切片长度将小于limit。如果需要获取的键的数量大于SyncDict中键值对的数量,则会重复选取已有的键。
func (dict *SyncDict) RandomKeys(limit int) []string {
result := make([]string, dict.Len())
for i := 0; i < limit; i++ {
dict.m.Range(func(key, value interface{}) bool {
result[i] = key.(string)
return false
})
}
return result
}
2.11 RandomDistinctKeysKeys方法
RandomDistinctKeysKeys
方法用于获取SyncDict中随机选取的若干个不重复键的一个切片。
该方法接受一个参数limit,表示要获取的键的数量。方法返回一个[]string
类型的切片,其中包含了随机选取的limit个不重复键的值。
在方法内部,首先创建一个[]string
类型的切片result,切片长度为SyncDict中键值对的数量。然后使用sync.Map
中的Range方法遍历所有的键值对,将每个键添加到result切片中。同时,在遍历过程中,判断result切片中已经包含的键的数量是否达到了limit。如果已经达到了limit,则通过返回false来停止遍历;否则,继续遍历。
接下来,对result切片进行洗牌操作,打乱其中的元素顺序。然后返回result切片的前limit个元素,即为随机选取的limit个不重复键的值。
需要注意的是,如果SyncDict中的键值对数量小于limit,则返回的切片长度将小于limit。如果需要获取的键的数量大于SyncDict中键值对的数量,则会重复选取已有的键。
func (dict *SyncDict) RandomDistinctKeysKeys(limit int) []string {
result := make([]string, dict.Len())
i := 0
dict.m.Range(func(key, value interface{}) bool {
result[i] = key.(string)
i++
if i == limit {
return false
}
return true
})
return result
}
3、DB结构体
sync.Map
是redis最底层存储结构,我们保存数据的结果到这一层,那它的上一层是什么?是DB,用过redis的同学都知道,默认redis有16个DB,你可以认为是16个分数据库。
所以需要在database这个文件夹里面新建一个叫做DB的数据结构。
3.1 DB结构体
该结构体表示一个简单的键值数据库。
该结构体有两个成员变量。第一个成员变量是index,表示当前数据库中最后一个键的索引值。第二个成员变量是data,是一个dict.Dict
类型的变量,表示存储所有键值对的字典。dict.Dict
是一个自定义的字典类型,用于实现线程安全的键值对存储。在该结构体中,键是一个string类型的值,值是一个DataEntity类型的值,DataEntity是一个自定义的数据实体类型,用于存储键值对的值。
该结构体可以用于实现一个简单的键值数据库,可以存储任意类型的数据。可以通过向data成员变量中添加键值对,实现数据的存储。可以通过查询键值对的键获取对应的值,实现数据的读取。同时,由于使用了dict.Dict
类型的变量实现存储,因此可以保证在多个goroutine之间并发访问时的线程安全性。
type DB struct {
index int
data dict.Dict
}
3.2 ExecFunc接口
定义ExecFunc接口,表示一个执行函数。
该函数类型接受两个参数。第一个参数是一个指向DB类型的指针,表示一个键值数据库。第二个参数是一个[][]byte
类型的切片,表示该函数的参数列表。函数需要根据参数列表和键值数据库进行一些操作,并返回一个resp.Reply
类型的值,表示操作的结果。
该函数类型可以用于定义键值数据库中的操作函数,例如获取键值、设置键值、删除键值等。在使用时,可以将函数定义为ExecFunc
类型的函数,并将其作为参数传递给一个执行函数,该执行函数可以根据传入的参数调用相应的操作函数,并返回操作结果。
需要注意的是,由于ExecFunc
类型的函数可以访问一个DB类型的变量,因此可以对键值数据库进行修改操作。因此,在使用时需要注意线程安全性,以避免多个goroutine并发修改同一个键值数据库时出现的问题。
type ExecFunc func(db *DB, args [][]byte) resp.Reply
3.3 Exec方法
该方法用于执行Redis协议中的命令,并返回命令执行的结果。
该方法接受两个参数,第一个参数是一个resp.Connection
类型的值,表示一个Redis客户端连接。第二个参数是一个[][]byte
类型的切片,表示Redis协议中的命令行参数列表。
在方法内部,首先根据命令行参数列表的第一个元素获取对应的命令处理函数。如果找不到对应的命令处理函数,则返回一个错误回复,表示未知命令。如果找到了对应的命令处理函数,则根据命令的参数个数检查命令行参数列表的长度是否符合要求。如果参数个数不符合要求,则返回一个错误回复,表示参数数量错误。
接着,调用命令处理函数,将DB类型的指针和除命令名以外的参数列表作为参数传递给该函数,执行命令操作,并返回执行结果。
该方法可用于实现Redis协议的命令执行,可以根据命令名和参数列表调用相应的命令处理函数,并返回执行结果。需要注意的是,由于该方法可以访问一个DB类型的变量,因此可以对键值数据库进行读写操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func (db *DB) Exec(c resp.Connection, cmdLine [][]byte) resp.Reply {
cmdName := strings.ToLower(string(cmdLine[0]))
cmd, ok := cmdTable[cmdName]
if !ok {
return reply.MakeErrReply("ERR unknown command '" + cmdName + "'")
}
if !validateArity(cmd.arity, cmdLine) {
return reply.MakeArgNumErrReply(cmdName)
}
fun := cmd.executor
return fun(db, cmdLine[1:])
}
3.4 GetEntity方法
该方法用于获取键值数据库中指定键的数据实体。
该方法接受一个参数key,表示要获取的数据实体的键。方法返回两个值,第一个值是一个指向DataEntity
类型的指针,表示获取到的数据实体。如果找不到指定键的数据实体,则该指针为nil。第二个值是一个bool类型的值,表示是否成功获取到数据实体。如果成功获取到数据实体,则该值为true;否则,该值为false。
在方法内部,首先使用db.data.Get()
方法从键值数据库中获取指定键的值。如果找不到指定键的值,则返回nil和false。如果找到了指定键的值,则将其转换为DataEntity
类型的指针,然后将该指针和true作为返回值返回。
该方法可用于从键值数据库中获取指定键的数据实体,可以根据需要在程序中调用该方法进行查询操作。需要注意的是,由于该方法可以访问一个DB类型的变量,因此可以对键值数据库进行读操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func (db *DB) GetEntity(key string) (*database.DataEntity, bool) {
raw, ok := db.data.Get(key)
if !ok {
return nil, false
}
entity, _ := raw.(*database.DataEntity)
return entity, true
}
3.5 PutEntity方法
该方法用于向键值数据库中添加一个键值对,其中键为key,值为entity。
该方法接受两个参数,第一个参数是一个string类型的值,表示要添加的键。第二个参数是一个指向DataEntity类型的指针,表示要添加的值。该方法返回一个int类型的值,表示添加操作的结果。如果添加成功,则返回一个大于等于0的整数,表示添加的键值对数量;如果添加失败,则返回一个小于0的整数,表示错误代码。
在方法内部,使用db.data.Put()
方法向键值数据库中添加指定的键值对。如果添加成功,则返回一个大于等于0的整数,表示添加的键值对数量;如果添加失败,则返回一个小于0的整数,表示错误代码。
该方法可用于向键值数据库中添加指定的键值对,可以根据需要在程序中调用该方法进行添加操作。需要注意的是,由于该方法可以访问一个DB类型的变量,因此可以对键值数据库进行写操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func (db *DB) PutEntity(key string, entity *database.DataEntity) int {
return db.data.Put(key, entity)
}
3.6 PutIfExists方法
该方法用于向键值数据库中添加一个键值对,只有在指定键已经存在时才执行添加操作。
该方法接受两个参数,第一个参数是一个string类型的值,表示要添加的键。第二个参数是一个指向DataEntity类型的指针,表示要添加的值。该方法返回一个int类型的值,表示添加操作的结果。如果添加成功,则返回一个大于等于0的整数,表示添加的键值对数量;如果添加失败,则返回一个小于0的整数,表示错误代码。
在方法内部,使用db.data.PutIfExists()
方法向键值数据库中添加指定的键值对。该方法只有在指定键已经存在时才执行添加操作。如果添加成功,则返回一个大于等于0的整数,表示添加的键值对数量;如果添加失败,则返回一个小于0的整数,表示错误代码。
该方法可用于向键值数据库中添加指定的键值对,只有在指定键已经存在时才执行添加操作,可以根据需要在程序中调用该方法进行添加操作。需要注意的是,由于该方法可以访问一个DB类型的变量,因此可以对键值数据库进行写操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func (db *DB) PutIfExists(key string, entity *database.DataEntity) int {
return db.data.PutIfExists(key, entity)
}
3.6 PutIfAbsent方法
该方法用于向键值数据库中添加一个键值对,只有在指定键不存在时才执行添加操作。
该方法接受两个参数,第一个参数是一个string类型的值,表示要添加的键。第二个参数是一个指向DataEntity类型的指针,表示要添加的值。该方法返回一个int类型的值,表示添加操作的结果。如果添加成功,则返回一个大于等于0的整数,表示添加的键值对数量;如果添加失败,则返回一个小于0的整数,表示错误代码。
在方法内部,使用db.data.PutIfAbsent()
方法向键值数据库中添加指定的键值对。该方法只有在指定键不存在时才执行添加操作。如果添加成功,则返回一个大于等于0的整数,表示添加的键值对数量;如果添加失败,则返回一个小于0的整数,表示错误代码。
该方法可用于向键值数据库中添加指定的键值对,只有在指定键不存在时才执行添加操作,可以根据需要在程序中调用该方法进行添加操作。需要注意的是,由于该方法可以访问一个DB类型的变量,因此可以对键值数据库进行写操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func (db *DB) PutIfAbsent(key string, entity *database.DataEntity) int {
return db.data.PutIfAbsent(key, entity)
}
4、command结构体
每一个指令都是一个command结构体,每一个command结构体里面有一个执行方法,我们去实现这个执行非法,施加到DB上。
该结构体用于表示一个Redis命令的定义,包括命令处理函数和命令的参数个数限制。
该结构体有两个字段,分别为executor和arity。executor是一个ExecFunc
类型的值,表示该命令的处理函数。arity是一个int类型的值,表示该命令所允许的参数个数限制。如果该值为正数,则表示该命令只允许具有该数量的参数;如果该值为负数,则表示该命令允许的最少参数个数为-arity。
该结构体可用于表示Redis命令的定义,可以在程序中定义一组命令并保存在一个命令列表中,用于执行Redis协议中的命令操作。需要注意的是,由于该结构体只是一个命令的定义,它本身并不包含命令的具体实现,因此需要在程序中定义对应的命令处理函数,并将其与该命令的定义关联起来。
type command struct {
executor ExecFunc
arity int
}
定义一个变量:
var cmdTable = make(map[string]*command)
该变量是一个map类型的值,用于保存Redis命令的定义。
该变量的键是一个string类型的值,表示Redis命令的名称。该变量的值是一个指向command类型的指针,表示对应命令的定义。
该变量可用于在程序中保存Redis命令的定义。可以通过定义一组command类型的值,并将其保存在cmdTable中,来定义一组可供Redis协议执行的命令操作。在程序中执行Redis命令时,可以根据命令名称从cmdTable中查找对应的命令定义,并执行该命令的处理函数。
需要注意的是,由于cmdTable是一个全局变量,因此可以在程序的任何部分访问它。在多线程环境下,需要注意对cmdTable的读写操作的线程安全性,以避免多个goroutine并发访问cmdTable时出现的问题。
实现一个注册指令的方法:
func RegisterCommand(name string, executor ExecFunc, arity int) {
name = strings.ToLower(name)
cmdTable[name] = &command{
executor: executor,
arity: arity,
}
}
该函数用于向Redis命令列表中添加一个新的命令定义。
该函数接受三个参数,分别为name、executor和arity。name是一个string类型的值,表示要添加的命令的名称。executor是一个ExecFunc类型的值,表示要添加的命令的处理函数。arity是一个int类型的值,表示要添加的命令所允许的参数个数限制。
在函数内部,首先将name转换为小写字母形式,并将其作为键,创建一个新的command类型的值,将executor和arity设置为对应的值,并将该值保存在cmdTable中,以完成对新命令的定义。
该函数可用于向Redis命令列表中添加一个新的命令定义,可以在程序中调用该函数来添加自定义的Redis命令。需要注意的是,由于cmdTable是一个全局变量,因此可以在程序的任何部分访问它。在多线程环境下,需要注意对cmdTable的读写操作的线程安全性,以避免多个goroutine并发访问cmdTable时出现的问题。
接下来我们就来实现我们需要的指令:
5、实现ping命令
新建一个database/ping.go文件用于实现ping命令,接收到一个ping,返回pong
Ping函数接受两个参数,分别为db和args。db是一个指向DB类型的指针,表示要操作的键值数据库。args是一个[][]byte
类型的值,表示PING命令所包含的参数列表。
在函数内部,首先判断args的长度,如果args的长度为0,则返回一个PongReply类型的值,表示对PING命令的响应为PONG。如果args的长度为1,则返回一个StatusReply类型的值,该值的内容为args[0]
所表示的字符串。如果args的长度不为0也不为1,则返回一个ErrReply类型的值,该值的内容为"ERR wrong number of arguments for 'ping' command",表示参数个数错误。
该函数可用于处理Redis协议中的PING命令,在程序中调用该函数来执行PING命令,并返回对应的响应。需要注意的是,由于该函数可以访问一个DB类型的变量,因此可以对键值数据库进行读写操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func Ping(db *DB, args [][]byte) resp.Reply {
if len(args) == 0 {
return &reply.PongReply{}
} else if len(args) == 1 {
return reply.MakeStatusReply(string(args[0]))
} else {
return reply.MakeErrReply("ERR wrong number of arguments for 'ping' command")
}
}
6、实现KEY命令
6.1 execDel方法
该函数用于处理Redis协议中的DEL命令,即删除一个或多个指定的键。
该函数接受两个参数,分别为db和args。db是一个指向DB类型的指针,表示要操作的键值数据库。args是一个[][]byte
类型的值,表示DEL
命令所包含的参数列表,即要删除的键的列表。
在函数内部,首先创建一个string类型的切片keys,将args中的所有[]byte
类型的值转换为对应的字符串,并保存到keys中。然后调用db.Removes()
方法,将keys作为参数传递给该方法,以执行删除操作。最后,将删除的键的数量作为int64类型的值,使用MakeIntReply()
方法创建一个IntReply类型的值,并将其作为函数的返回值返回。
该函数可用于处理Redis协议中的DEL
命令,在程序中调用该函数来执行DEL
命令,并返回对应的响应。需要注意的是,由于该函数可以访问一个DB类型的变量,因此可以对键值数据库进行写操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func execDel(db *DB, args [][]byte) resp.Reply {
keys := make([]string, len(args))
for i, v := range args {
keys[i] = string(v)
}
deleted := db.Removes(keys...)
return reply.MakeIntReply(int64(deleted))
}
6.2 execExists方法
该函数用于处理Redis协议中的EXISTS
命令,即检查一个或多个指定的键是否存在于键值数据库中。
该函数接受两个参数,分别为db和args。db是一个指向DB类型的指针,表示要操作的键值数据库。args是一个[][]byte
类型的值,表示EXISTS命令所包含的参数列表,即要检查的键的列表。
在函数内部,首先创建一个int64类型的变量result,用于保存存在于键值数据库中的键的数量,初始值为0。然后遍历args中的所有[][]byte
类型的值,将其转换为对应的字符串,并使用db.GetEntity()
方法检查该键是否存在于键值数据库中。如果存在,则将result的值加1。最后,使用MakeIntReply()
方法创建一个IntReply类型的值,将result的值作为参数传递给该方法,并将其作为函数的返回值返回。
该函数可用于处理Redis协议中的EXISTS
命令,在程序中调用该函数来执行EXISTS
命令,并返回对应的响应。需要注意的是,由于该函数可以访问一个DB类型的变量,因此可以对键值数据库进行读操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func execExists(db *DB, args [][]byte) resp.Reply {
result := int64(0)
for _, arg := range args {
key := string(arg)
_, exists := db.GetEntity(key)
if exists {
result++
}
}
return reply.MakeIntReply(result)
}
6.3 execFlushDB方法
该函数用于处理Redis协议中的FLUSHDB
命令,即清空当前数据库中的所有键值对。
该函数接受两个参数,分别为db和args。db是一个指向DB类型的指针,表示要操作的键值数据库。args是一个[][]byte
类型的值,表示FLUSHDB
命令所包含的参数列表,但在实际应用中该参数列表为空。
在函数内部,调用db.Flush()
方法,该方法会清空当前数据库中的所有键值对。然后使用OkReply()
方法创建一个OkReply
类型的值,并将其作为函数的返回值返回。
该函数可用于处理Redis协议中的FLUSHDB命令,在程序中调用该函数来执行FLUSHDB
命令,并返回对应的响应。需要注意的是,由于该函数可以访问一个DB类型的变量,因此可以对键值数据库进行写操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func execFlushDB(db *DB, args [][]byte) resp.Reply {
db.Flush()
return &reply.OkReply{}
}
6.4 execType方法
该函数用于处理Redis协议中的TYPE
命令,即获取指定键的数据类型。
该函数接受两个参数,分别为db和args。db是一个指向DB类型的指针,表示要操作的键值数据库。args是一个[][]byte
类型的值,表示TYPE命令所包含的参数列表,即要获取数据类型的键的名称。
在函数内部,首先将args中的第一个[][]byte
类型的值转换为对应的字符串,并使用db.GetEntity()
方法获取该键对应的Entity
类型的值。如果该键不存在于键值数据库中,则使用MakeStatusReply()
方法创建一个StatusReply
类型的值,将"none"作为参数传递给该方法,并将其作为函数的返回值返回。如果该键存在于键值数据库中,则使用switch语句判断该Entity类型的值的Data字段的类型。如果Data字段的类型为[]byte
,则使用MakeStatusReply()
方法创建一个StatusReply
类型的值,将"string"作为参数传递给该方法,并将其作为函数的返回值返回。如果Data字段的类型不为[]byte
,则使用UnknownErrReply()
方法创建一个UnknownErrReply
类型的值,并将其作为函数的返回值返回。
该函数可用于处理Redis协议中的TYPE
命令,在程序中调用该函数来执行TYPE命令,并返回对应的响应。需要注意的是,由于该函数可以访问一个DB类型的变量,因此可以对键值数据库进行读操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func execType(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
entity, exists := db.GetEntity(key)
if !exists {
return reply.MakeStatusReply("none")
}
switch entity.Data.(type) {
case []byte:
return reply.MakeStatusReply("string")
}
return &reply.UnknownErrReply{}
}
6.5 execRename方法
该函数用于处理Redis协议中的RENAME命令,即将一个键名改为另一个键名。
该函数接受两个参数,分别为db和args。db是一个指向DB类型的指针,表示要操作的键值数据库。args是一个[][]byte
类型的值,表示RENAME命令所包含的参数列表,即要被重命名的键的名称和新名称。
在函数内部,首先判断args的长度是否为2,如果不是,则使用MakeErrReply()方法创建一个ErrReply类型的值,将"ERR wrong number of arguments for 'rename' command"作为参数传递给该方法,并将其作为函数的返回值返回。如果args的长度为2,则将args中的第一个[][]byte
类型的值和第二个[][]byte
类型的值分别转换为对应的字符串,分别保存到src和dest变量中。接着使用db.GetEntity()
方法获取键名为src的Entity类型的值,如果该键不存在于键值数据库中,则使用MakeErrReply()
方法创建一个ErrReply类型的值,将"no such key"作为参数传递给该方法,并将其作为函数的返回值返回。如果该键存在于键值数据库中,则使用db.PutEntity()
方法将键名为dest的Entity类型的值设置为该Entity类型的值,并使用db.Remove()
方法删除键名为src的键值对。最后,使用OkReply()
方法创建一个OkReply
类型的值,并将其作为函数的返回值返回。
该函数可用于处理Redis协议中的RENAME
命令,在程序中调用该函数来执行RENAME
命令,并返回对应的响应。需要注意的是,由于该函数可以访问一个DB类型的变量,因此可以对键值数据库进行读写操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func execRename(db *DB, args [][]byte) resp.Reply {
if len(args) != 2 {
return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")
}
src := string(args[0])
dest := string(args[1])
entity, ok := db.GetEntity(src)
if !ok {
return reply.MakeErrReply("no such key")
}
db.PutEntity(dest, entity)
db.Remove(src)
return &reply.OkReply{}
}
6.6 execRenameNx方法
该函数用于处理Redis协议中的RENAMENX
命令,即将一个键名改为另一个键名,当且仅当新键名不存在时才执行操作。
该函数接受两个参数,分别为db和args。db是一个指向DB类型的指针,表示要操作的键值数据库。args是一个[][]byte
类型的值,表示RENAMENX
命令所包含的参数列表,即要被重命名的键的名称和新名称。
在函数内部,首先将args中的第一个[][]byte
类型的值和第二个[][]byte
类型的值分别转换为对应的字符串,分别保存到src和dest变量中。然后使用db.GetEntity()
方法检查键名为dest的键值对是否存在于键值数据库中,如果存在,则使用MakeIntReply()
方法创建一个IntReply类型的值,将0作为参数传递给该方法,并将其作为函数的返回值返回。如果不存在,则使用db.GetEntity()
方法获取键名为src的Entity类型的值,如果该键不存在于键值数据库中,则使用MakeErrReply()
方法创建一个ErrReply类型的值,将"no such key"作为参数传递给该方法,并将其作为函数的返回值返回。如果该键存在于键值数据库中,则使用db.Removes()
方法清空键名为src和dest的键值对的过期时间,然后使用db.PutEntity()
方法将键名为dest的Entity类型的值设置为该Entity类型的值。最后,使用MakeIntReply()
方法创建一个IntReply类型的值,将1作为参数传递给该方法,并将其作为函数的返回值返回。
该函数可用于处理Redis协议中的RENAMENX
命令,在程序中调用该函数来执行RENAMENX
命令,并返回对应的响应。需要注意的是,由于该函数可以访问一个DB类型的变量,因此可以对键值数据库进行读写操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func execRenameNx(db *DB, args [][]byte) resp.Reply {
src := string(args[0])
dest := string(args[1])
_, ok := db.GetEntity(dest)
if ok {
return reply.MakeIntReply(0)
}
entity, ok := db.GetEntity(src)
if !ok {
return reply.MakeErrReply("no such key")
}
db.Removes(src, dest) // clean src and dest with their ttl
db.PutEntity(dest, entity)
return reply.MakeIntReply(1)
}
6.7 execKeys方法
该函数用于处理Redis协议中的KEYS
命令,即根据给定的通配符模式,返回所有与模式匹配的键名。
该函数接受两个参数,分别为db和args。db是一个指向DB类型的指针,表示要操作的键值数据库。args是一个[][]byte
类型的值,表示KEYS
命令所包含的参数列表,即通配符模式。
在函数内部,首先将args中的第一个[][]byte
类型的值转换为对应的字符串,并使用wildcard.CompilePattern()
方法将其编译为通配符模式。然后创建一个空的[][]byte
类型的值result,用于保存所有匹配的键名。接着使用db.data.ForEach()
方法遍历整个键值数据库,对于每个键值对,判断其键名是否与模式匹配。如果匹配,则将该键名转换为[]byte
类型,并将其添加到result中。最后,使用MakeMultiBulkReply()
方法创建一个MultiBulkReply
类型的值,以result作为参数传递给该方法,并将其作为函数的返回值返回。
该函数可用于处理Redis协议中的KEYS
命令,在程序中调用该函数来执行KEYS
命令,并返回对应的响应。需要注意的是,由于该函数可以访问一个DB类型的变量,因此可以对键值数据库进行读操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func execKeys(db *DB, args [][]byte) resp.Reply {
pattern := wildcard.CompilePattern(string(args[0]))
result := make([][]byte, 0)
db.data.ForEach(func(key string, val interface{}) bool {
if pattern.IsMatch(key) {
result = append(result, []byte(key))
}
return true
})
return reply.MakeMultiBulkReply(result)
}
6.8 初始化方法
该函数用于初始化命令处理器,注册并绑定不同的命令处理函数。
在函数内部,首先使用RegisterCommand()
方法注册PING
命令的处理函数Ping
,并将该命令的参数个数设置为-1,表示该命令可以接受任意数量的参数。
该函数可用于初始化命令处理器,在程序启动时调用该函数注册不同的命令处理函数,以便后续处理客户端请求。需要注意的是,不同的命令处理函数可能需要不同数量的参数,具体参数数量需要根据命令的语义和实现进行设置。
func init() {
RegisterCommand("ping", Ping, -1)
}
7、实现string指令
7.1 getAsString方法
该方法用于获取一个键对应的字符串类型的值。
该方法接受一个参数,即要获取值的键名key。
在方法内部,首先使用db.GetEntity()
方法获取键名为key的Entity类型的值,如果该键不存在于键值数据库中,则返回nil和nil。如果该键存在于键值数据库中,则判断该Entity类型的值的Data字段的类型是否为[]byte
类型。如果是,则将该字段转换为[]byte
类型的值,并将其作为函数的第一个返回值返回。如果不是,则使用WrongTypeErrReply()
方法创建一个WrongTypeErrReply
类型的值,并将其作为函数的第二个返回值返回。
该方法可用于获取键值数据库中指定键名的字符串类型的值。需要注意的是,在使用该方法前需要确定该键名对应的值确实为字符串类型,否则将导致类型不匹配的错误。
func (db *DB) getAsString(key string) ([]byte, reply.ErrorReply) {
entity, ok := db.GetEntity(key)
if !ok {
return nil, nil
}
bytes, ok := entity.Data.([]byte)
if !ok {
return nil, &reply.WrongTypeErrReply{}
}
return bytes, nil
}
7.2 execGet方法
该函数用于处理Redis协议中的GET
命令,即获取指定键名的值。
该函数接受两个参数,分别为db和args。db是一个指向DB类型的指针,表示要操作的键值数据库。args是一个[][]byte
类型的值,表示GET
命令所包含的参数列表,即要获取值的键名。
在函数内部,首先将args中的第一个[][]byte
类型的值转换为对应的字符串,并使用db.getAsString()
方法获取该键名对应的字符串类型的值。如果该键不存在于键值数据库中,则返回一个ErrorReply类型的值err。如果该键存在于键值数据库中,则将其转换为[]byte
类型的值,并保存到bytes变量中。如果bytes为nil,则使用NullBulkReply()
方法创建一个NullBulkReply类型的值,并将其作为函数的返回值返回。否则,使用MakeBulkReply()
方法创建一个BulkReply类型的值,以bytes作为参数传递给该方法,并将其作为函数的返回值返回。
该函数可用于处理Redis协议中的GET
命令,在程序中调用该函数来执行GET
命令,并返回对应的响应。需要注意的是,由于该函数可以访问一个DB类型的变量,因此可以对键值数据库进行读操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func execGet(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
bytes, err := db.getAsString(key)
if err != nil {
return err
}
if bytes == nil {
return &reply.NullBulkReply{}
}
return reply.MakeBulkReply(bytes)
}
7.3 execSet方法
该函数用于处理Redis协议中的SET
命令,即设置键名对应的值。
该函数接受两个参数,分别为db和args。db是一个指向DB类型的指针,表示要操作的键值数据库。args是一个[][]byte
类型的值,表示SET
命令所包含的参数列表,即要设置值的键名和值。
在函数内部,首先将args中的第一个[][]byte
类型的值转换为对应的字符串,并将args中的第二个[][]byte
类型的值保存到value变量中。然后创建一个DataEntity类型的值entity,将value作为其Data字段的值,并将该值设置为键名为key的键值对的值,使用db.PutEntity()
方法将其保存到键值数据库中。最后,使用OkReply()方法创建一个OkReply类型的值,并将其作为函数的返回值返回。
该函数可用于处理Redis协议中的SET
命令,在程序中调用该函数来执行SET
命令,并返回对应的响应。需要注意的是,由于该函数可以访问一个DB类型的变量,因此可以对键值数据库进行写操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func execSet(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
value := args[1]
entity := &database.DataEntity{
Data: value,
}
db.PutEntity(key, entity)
return &reply.OkReply{}
}
7.4 execSetNX方法
该函数用于处理Redis协议中的SETNX
命令,即设置键名对应的值,当且仅当该键名不存在时才执行操作。
该函数接受两个参数,分别为db和args。db是一个指向DB类型的指针,表示要操作的键值数据库。args是一个[][]byte
类型的值,表示SETNX
命令所包含的参数列表,即要设置值的键名和值。
在函数内部,首先将args中的第一个[][]byte
类型的值转换为对应的字符串,并将args中的第二个[][]byte
类型的值保存到value变量中。然后创建一个DataEntity类型的值entity,将value作为其Data字段的值,并使用db.PutIfAbsent()方法将该值设置为键名为key的键值对的值。该方法会返回一个bool类型的值result,表示是否执行了设置操作。最后,使用MakeIntReply()方法创建一个IntReply类型的值,以result转换为int64类型的值作为参数传递给该方法,并将其作为函数的返回值返回。
该函数可用于处理Redis协议中的SETNX
命令,在程序中调用该函数来执行SETNX
命令,并返回对应的响应。需要注意的是,由于该函数可以访问一个DB类型的变量,因此可以对键值数据库进行写操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func execSetNX(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
value := args[1]
entity := &database.DataEntity{
Data: value,
}
result := db.PutIfAbsent(key, entity)
return reply.MakeIntReply(int64(result))
}
7.5 execGetSet方法
该函数用于处理Redis协议中的GETSET
命令,即设置键名对应的值,并返回该键名原来的值。
该函数接受两个参数,分别为db和args。db是一个指向DB类型的指针,表示要操作的键值数据库。args是一个[][]byte
类型的值,表示GETSET
命令所包含的参数列表,即要设置值的键名和值。
在函数内部,首先将args中的第一个[][]byte
类型的值转换为对应的字符串,并将args中的第二个[][]byte
类型的值保存到value变量中。然后使用db.GetEntity()
方法获取键名为key的Entity类型的值,并将exists变量设置为该键名是否存在于键值数据库中的bool类型的值。接着使用db.PutEntity()
方法将一个DataEntity
类型的值作为键名为key的键值对的值。最后,如果exists为false,则使用MakeNullBulkReply()
方法创建一个NullBulkReply
类型的值,并将其作为函数的返回值返回。否则,将旧的值(即entity.Data
)转换为[]byte
类型的值,并使用MakeBulkReply()
方法创建一个BulkReply
类型的值,以该值作为参数传递给该方法,并将其作为函数的返回值返回。
该函数可用于处理Redis协议中的GETSET
命令,在程序中调用该函数来执行GETSET
命令,并返回对应的响应。需要注意的是,由于该函数可以访问一个DB类型的变量,因此可以对键值数据库进行读写操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func execGetSet(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
value := args[1]
entity, exists := db.GetEntity(key)
db.PutEntity(key, &database.DataEntity{Data: value})
if !exists {
return reply.MakeNullBulkReply()
}
old := entity.Data.([]byte)
return reply.MakeBulkReply(old)
}
7.6 execStrLen方法
该函数用于处理Redis协议中的STRLEN
命令,即获取键名对应的字符串类型的值的长度。
该函数接受两个参数,分别为db和args。db是一个指向DB类型的指针,表示要操作的键值数据库。args是一个[][]byte
类型的值,表示STRLEN
命令所包含的参数列表,即要获取长度的键名。
在函数内部,首先将args中的第一个[][]byte
类型的值转换为对应的字符串,并使用db.GetEntity()
方法获取键名为key的Entity类型的值,并将exists变量设置为该键名是否存在于键值数据库中的bool类型的值。接着,如果exists为false,则使用MakeNullBulkReply()
方法创建一个NullBulkReply类型的值,并将其作为函数的返回值返回。否则,将获取到的值(即entity.Data
)转换为[]byte
类型的值,并使用len()函数获取该值的长度,然后使用MakeIntReply()
方法创建一个IntReply类型的值,以该长度转换为int64类型的值作为参数传递给该方法,并将其作为函数的返回值返回。
该函数可用于处理Redis协议中的STRLEN
命令,在程序中调用该函数来执行STRLEN
命令,并返回对应的响应。需要注意的是,由于该函数可以访问一个DB类型的变量,因此可以对键值数据库进行读操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个键值数据库时出现的问题。
func execStrLen(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
entity, exists := db.GetEntity(key)
if !exists {
return reply.MakeNullBulkReply()
}
old := entity.Data.([]byte)
return reply.MakeIntReply(int64(len(old)))
}
7.7 init方法
该函数用于初始化命令处理器,注册并绑定不同的命令处理函数。
在函数内部,首先使用RegisterCommand()
方法注册GET
命令的处理函数execGet
,并将该命令的参数个数设置为2。然后使用RegisterCommand()
方法注册SET
命令的处理函数execSet,并将该命令的参数个数设置为-3,表示该命令可以接受任意数量的参数。接着使用RegisterCommand()
方法注册SETNX
命令的处理函数execSetNX
,并将该命令的参数个数设置为3。然后使用RegisterCommand()
方法注册GETSET
命令的处理函数execGetSet
,并将该命令的参数个数设置为3。最后使用RegisterCommand()
方法注册STRLEN
命令的处理函数execStrLen
,并将该命令的参数个数设置为2。
该函数可用于初始化命令处理器,在程序启动时调用该函数注册不同的命令处理函数,以便后续处理客户端请求。需要注意的是,不同的命令处理函数可能需要不同数量的参数,具体参数数量需要根据命令的语义和实现进行设置。
func init() {
RegisterCommand("Get", execGet, 2)
RegisterCommand("Set", execSet, -3)
RegisterCommand("SetNx", execSetNX, 3)
RegisterCommand("GetSet", execGetSet, 3)
RegisterCommand("StrLen", execStrLen, 2)
}
8、Database
我们在上一章为了测试,写了一个echoDataBase,现在我们来实现一个真实的Database
type Database struct {
dbSet []*DB
}
该结构体包含一个dbSet成员变量,类型为[]*DB
,表示一个DB类型的指针数组。该结构体用于表示Redis服务器中的数据库集合,每个元素表示一个数据库。
在Redis服务器中,可以通过SELECT
命令来切换当前使用的数据库。每个数据库都是一个键值对数据库,可以存储任意类型的值。在该结构体中,dbSet数组中的每个元素都是一个DB类型的指针,表示一个键值对数据库。可以通过遍历dbSet数组来访问不同的数据库,并对其进行读写操作。
8.1 NewDatabase
该函数用于创建一个新的Database类型的值,并返回其指针。
在函数内部,首先创建一个空的Database类型的值,并将其赋值给mdb变量。然后判断配置参数中指定的数据库数量是否为0,如果为0,则将其设置为默认值16。接着,使用make()函数创建一个长度为config.Properties.Databases
的[]*DB
类型的值,并将其赋值给mdb.dbSet
变量。接下来,遍历mdb.dbSet数组中的每个元素,使用makeDB()
函数创建一个新的DB类型的值,并将其赋值给singleDB变量。然后将遍历到的元素的索引值赋值给singleDB的index成员变量,并将singleDB赋值给mdb.dbSet
数组中的相应元素。最后,将mdb的指针作为函数的返回值返回。
该函数可用于创建一个新的Database类型的值,并初始化其中的每个DB类型的值。在程序启动时调用该函数来创建Redis服务器使用的数据库集合。需要注意的是,根据配置参数的不同,该函数可能会创建不同数量的数据库。
func NewDatabase() *Database {
mdb := &Database{}
if config.Properties.Databases == 0 {
config.Properties.Databases = 16
}
mdb.dbSet = make([]*DB, config.Properties.Databases)
for i := range mdb.dbSet {
singleDB := makeDB()
singleDB.index = i
mdb.dbSet[i] = singleDB
}
return mdb
}
8.2 Exec方法
该方法用于执行Redis命令,根据命令名称调用相应的命令处理函数。
该方法接受两个参数,分别为c和cmdLine。其中,c表示一个resp.Connection
类型的值,表示一个客户端连接。cmdLine是一个[][]byte
类型的值,表示Redis命令的参数列表。
在函数内部,首先使用defer和recover函数实现了异常捕获和处理。然后将cmdLine中的第一个参数转换为小写字母,并将其赋值给cmdName变量。接着判断cmdName是否等于"select",如果是,则调用execSelect()
函数来处理SELECT
命令,并将其返回值作为函数的返回值返回。如果cmdName不等于"select",则获取当前客户端连接所选择的数据库的索引值,并从mdb.dbSet
数组中选择相应的DB类型的值。然后调用该DB类型的值的Exec方法来处理命令,并将其返回值作为函数的返回值返回。
该方法可用于执行Redis命令,并根据命令名称调用相应的命令处理函数。在程序中调用该方法来处理客户端请求。需要注意的是,由于该方法可以访问一个Database类型的变量,因此可以对其中的每个DB类型的变量进行读写操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个Database类型的变量时出现的问题。
func (mdb *Database) Exec(c resp.Connection, cmdLine [][]byte) (result resp.Reply) {
defer func() {
if err := recover(); err != nil {
logger.Warn(fmt.Sprintf("error occurs: %v\n%s", err, string(debug.Stack())))
}
}()
cmdName := strings.ToLower(string(cmdLine[0]))
if cmdName == "select" {
if len(cmdLine) != 2 {
return reply.MakeArgNumErrReply("select")
}
return execSelect(c, mdb, cmdLine[1:])
}
// normal commands
dbIndex := c.GetDBIndex()
selectedDB := mdb.dbSet[dbIndex]
return selectedDB.Exec(c, cmdLine)
}
8.3 execSelect方法
该函数用于处理Redis中的SELECT
命令,切换当前使用的数据库。
该函数接受三个参数,分别为c、mdb和args。其中,c表示一个resp.Connection类型的值,表示一个客户端连接。mdb表示一个Database类型的指针,表示Redis服务器的数据库集合。args是一个[][]byte
类型的值,表示Redis命令的参数列表。
在函数内部,首先使用strconv.Atoi()
函数将args中的第一个参数转换为整数,并将其赋值给dbIndex变量。如果转换失败,则返回一个错误响应。接着,判断dbIndex是否大于等于mdb.dbSet
数组的长度,如果是,则返回一个超出范围的错误响应。否则,调用c的SelectDB
方法,将dbIndex作为参数传入,以切换当前使用的数据库。最后,返回一个OK响应。
该函数可用于处理Redis中的SELECT
命令,切换当前使用的数据库。在程序中调用该函数来处理客户端请求。需要注意的是,由于该函数可以访问一个Database类型的变量,因此可以对其中的每个DB类型的变量进行读写操作。在使用时需要注意线程安全性,以避免多个goroutine并发访问同一个Database类型的变量时出现的问题。
func execSelect(c resp.Connection, mdb *Database, args [][]byte) resp.Reply {
dbIndex, err := strconv.Atoi(string(args[0]))
if err != nil {
return reply.MakeErrReply("ERR invalid DB index")
}
if dbIndex >= len(mdb.dbSet) {
return reply.MakeErrReply("ERR DB index is out of range")
}
c.SelectDB(dbIndex)
return reply.MakeOkReply()
}
GO实现Redis持久化
本章将介绍如何使用 golang 实现 Append Only File 持久化
1、AofHandler
新建文件aof/aof.go
1.1 AofHandler结构体
定义AofHandler结构体。该结构体用于处理Redis的AOF(Append Only File)持久化功能,将Redis命令记录到磁盘文件中以保证数据的持久化。
该结构体包含了以下成员变量:
db
:表示一个databaseface.Database
类型的值,表示Redis服务器的数据库集合。aofChan
:表示一个chan *payload
类型的通道,用于接收需要记录到AOF文件的Redis命令。aofFile
:表示一个*os.File
类型的指针,表示当前正在记录的AOF文件。aofFilename
:表示一个字符串类型的值,表示当前正在使用的AOF文件的文件名。currentDB
:表示一个整数类型的值,表示当前正在使用的数据库的索引值。
AofHandler结构体可用于实现Redis的AOF持久化功能,将Redis命令记录到磁盘文件中以保证数据的持久化。在程序中,可以通过创建一个AofHandler类型的值并调用其相应的方法来实现AOF持久化功能。
type AofHandler struct {
db databaseface.Database
aofChan chan *payload
aofFile *os.File
aofFilename string
currentDB int
}
1.2 实现AddAof
该方法用于将Redis命令添加到AOF持久化日志中。
该方法接受两个参数,分别为dbIndex和cmdLine。其中,dbIndex表示当前使用的数据库的索引值,cmdLine表示一个CmdLine类型的值,表示Redis命令的参数列表。
在函数内部,首先判断是否启用AOF持久化功能,如果未启用,则直接返回。否则,将cmdLine和dbIndex封装成一个payload类型的值,并将其发送到aofChan通道中。
该方法可用于将Redis命令添加到AOF持久化日志中。在程序中调用该方法来实现AOF持久化功能。需要注意的是,由于该方法可以访问AofHandler结构体中的成员变量,因此需要确保线程安全性,以避免多个goroutine并发访问同一个AofHandler类型的变量时出现的问题。
func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {
if config.Properties.AppendOnly && handler.aofChan != nil {
handler.aofChan <- &payload{
cmdLine: cmdLine,
dbIndex: dbIndex,
}
}
}
1.3 实现handleAof
handleAof函数会不断地从一个AOF通道(handler.aofChan)中读取命令,然后将这些命令写入到AOF文件中。具体来说,该函数会遍历通道中的每个命令,如果该命令所在的数据库不是当前正在处理的数据库,则会先发送一个SELECT
命令来切换到相应的数据库。然后,将该命令转化为Redis协议格式的字节流,写入到AOF文件中。
如果写入AOF文件失败,则会记录日志并跳过该命令。在实际应用中,该函数通常在后台线程中运行,不断地从AOF通道中读取命令,并将它们写入到AOF文件中。同时,Redis还提供了另一种持久化方式——RDB(Redis Database File),它会将整个数据库的状态保存到一个二进制文件中。与AOF相比,RDB的优点是文件体积小,恢复速度快,但缺点是可能会丢失最近的一部分写操作。
func (handler *AofHandler) handleAof() {
// serialized execution
handler.currentDB = 0
for p := range handler.aofChan {
if p.dbIndex != handler.currentDB {
// select db
data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
_, err := handler.aofFile.Write(data)
if err != nil {
logger.Warn(err)
continue // skip this command
}
handler.currentDB = p.dbIndex
}
data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
_, err := handler.aofFile.Write(data)
if err != nil {
logger.Warn(err)
}
}
}
2、实现Aof落盘功能
2.1 Database结构体
改造Database结构体,让它持有aofHandler:
type Database struct {
dbSet []*DB
aofHandler *aof.AofHandler
}
初始化aofHandler,在NewDatabase方法初始化话16个db之后初始化aofHandler
这段代码是根据配置文件中的AppendOnly参数来启用或禁用Redis的AOF持久化机制。如果配置文件中的AppendOnly参数为true,则会创建一个新的AOFHandler,并将其设置为mdb(一个Redis数据库实例)的aofHandler属性。同时,该代码还会为每个数据库(dbSet中的每个元素)设置一个addAof函数,该函数用于将命令写入到AOF文件中。
具体来说,该代码会先创建一个新的AOFHandler,并将其与mdb对象关联起来。然后,对于mdb中的每个数据库,该代码会创建一个addAof函数,该函数会将命令写入到AOFHandler中。在创建addAof函数时,为了避免闭包问题,该代码会将每个数据库单独赋值给一个局部变量singleDB,然后在addAof函数中使用该局部变量。
总的来说,这段代码是Redis中AOF持久化机制的启用代码,它会在Redis启动时根据配置文件中的参数来决定是否启用AOF持久化,并将所有的写操作写入到AOF文件中,以保证数据的持久性。
func NewDatabase() *Database {
mdb := &Database{}
if config.Properties.Databases == 0 {
config.Properties.Databases = 16
}
mdb.dbSet = make([]*DB, config.Properties.Databases)
for i := range mdb.dbSet {
singleDB := makeDB()
singleDB.index = i
mdb.dbSet[i] = singleDB
}
if config.Properties.AppendOnly {
aofHandler, err := aof.NewAOFHandler(mdb)
if err != nil {
panic(err)
}
mdb.aofHandler = aofHandler
for _, db := range mdb.dbSet {
// avoid closure
singleDB := db
singleDB.addAof = func(line CmdLine) {
mdb.aofHandler.AddAof(singleDB.index, line)
}
}
}
return mdb
}
makeDB方法执行的时候给他一个空的实现
func makeDB() *DB {
db := &DB{
data: dict.MakeSyncDict(),
addAof: func(line CmdLine) {
},
}
return db
}
2.2 改造keys指令
删除时候需要记录:
func execDel(db *DB, args [][]byte) resp.Reply {
keys := make([]string, len(args))
for i, v := range args {
keys[i] = string(v)
}
deleted := db.Removes(keys...)
/**
Aof记录指令
*/
if deleted > 0 {
db.addAof(utils.ToCmdLine2("del", args...))
}
return reply.MakeIntReply(int64(deleted))
}
FlushDB的时候:
func execFlushDB(db *DB, args [][]byte) resp.Reply {
db.Flush()
/**
Aof记录删除指令
*/
db.addAof(utils.ToCmdLine2("flushdb", args...))
return &reply.OkReply{}
}
重命名:
func execRename(db *DB, args [][]byte) resp.Reply {
if len(args) != 2 {
return reply.MakeErrReply("ERR wrong number of arguments for 'rename' command")
}
src := string(args[0])
dest := string(args[1])
entity, ok := db.GetEntity(src)
if !ok {
return reply.MakeErrReply("no such key")
}
db.PutEntity(dest, entity)
db.Remove(src)
/**
Aof记录指令
*/
db.addAof(utils.ToCmdLine2("rename", args...))
return &reply.OkReply{}
}
func execRenameNx(db *DB, args [][]byte) resp.Reply {
src := string(args[0])
dest := string(args[1])
_, ok := db.GetEntity(dest)
if ok {
return reply.MakeIntReply(0)
}
entity, ok := db.GetEntity(src)
if !ok {
return reply.MakeErrReply("no such key")
}
db.Removes(src, dest) // clean src and dest with their ttl
db.PutEntity(dest, entity)
/**
Aof记录指令
*/
db.addAof(utils.ToCmdLine2("renamenx", args...))
return reply.MakeIntReply(1)
}
2.3 改造strings指令
func execSet(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
value := args[1]
entity := &database.DataEntity{
Data: value,
}
db.PutEntity(key, entity)
/**
Aof记录指令
*/
db.addAof(utils.ToCmdLine2("set", args...))
return &reply.OkReply{}
}
func execSetNX(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
value := args[1]
entity := &database.DataEntity{
Data: value,
}
result := db.PutIfAbsent(key, entity)
/**
Aof记录指令
*/
db.addAof(utils.ToCmdLine2("setnx", args...))
return reply.MakeIntReply(int64(result))
}
func execGetSet(db *DB, args [][]byte) resp.Reply {
key := string(args[0])
value := args[1]
entity, exists := db.GetEntity(key)
db.PutEntity(key, &database.DataEntity{Data: value})
if !exists {
return reply.MakeNullBulkReply()
}
old := entity.Data.([]byte)
/**
Aof记录指令
*/
db.addAof(utils.ToCmdLine2("getset", args...))
return reply.MakeBulkReply(old)
}
3、实现Aof恢复
这一节就来实现redis重启时恢复数据的功能
实现LoadAof方法
LoadAof是一个用于加载追加日志文件(AOF)到数据库中的Go函数。以下是代码的实现思路:
- 使用
handler.aofFilename
指定的文件路径打开AOF文件。 - 如果打开文件时发生错误,则记录警告并返回。
- 设置
defer
语句,在函数返回时关闭文件。 - 使用解析器将AOF文件读取为Redis协议命令流。
- 创建一个虚拟连接对象以保存数据库索引。
- 处理流中的每个命令,使用
handler.db.Exec
在数据库上执行它。 - 如果执行命令时出现错误,则记录错误消息。
总体而言,这个函数负责从AOF文件中读取Redis协议命令,并在数据库上执行它们,记录任何出现的错误。
func (handler *AofHandler) LoadAof() {
file, err := os.Open(handler.aofFilename)
if err != nil {
logger.Warn(err)
return
}
defer file.Close()
ch := parser.ParseStream(file)
fakeConn := &connection.Connection{} // only used for save dbIndex
for p := range ch {
if p.Err != nil {
if p.Err == io.EOF {
break
}
logger.Error("parse error: " + p.Err.Error())
continue
}
if p.Data == nil {
logger.Error("empty payload")
continue
}
r, ok := p.Data.(*reply.MultiBulkReply)
if !ok {
logger.Error("require multi bulk reply")
continue
}
ret := handler.db.Exec(fakeConn, r.Args)
if reply.IsErrorReply(ret) {
logger.Error("exec err", err)
}
}
}
说在后面
本文,仅仅是《从0开始,手写Redis》 的一部分,后面的内容 更加精彩。
持续迭代、持续升级 是 尼恩团队的宗旨,
持续迭代、持续升级 也是 《从0开始,手写Redis》的灵魂。
后面会收集更多的面试真题,同时,遇到面试难题,可以来尼恩的社区《技术自由圈(原 疯狂创客圈)》中交流和求助。
咱们的目标,打造宇宙最牛的《手写Redis》面试宝典。
技术自由的实现路径 PDF:
实现你的 响应式 自由:
这是老版本 《Flux、Mono、Reactor 实战(史上最全)》
实现你的 spring cloud 自由:
《Spring cloud Alibaba 学习圣经》 PDF
《分库分表 Sharding-JDBC 底层原理、核心实战(史上最全)》
《一文搞定:SpringBoot、SLF4j、Log4j、Logback、Netty之间混乱关系(史上最全)》
实现你的 linux 自由:
实现你的 网络 自由:
《网络三张表:ARP表, MAC表, 路由表,实现你的网络自由!!》
实现你的 分布式锁 自由:
实现你的 王者组件 自由:
《队列之王: Disruptor 原理、架构、源码 一文穿透》
《缓存之王:Caffeine 源码、架构、原理(史上最全,10W字 超级长文)》
《Java Agent 探针、字节码增强 ByteBuddy(史上最全)》