Redis persists its data to disk via the bgsave command; on startup it can load the RDB file that bgsave produced and restore the data from it.
The save command does the same work in the main process and blocks all other commands while it runs, so it is not recommended.
Overview of the RDB persistence mechanism
The layout of a Redis RDB file is roughly as follows, using a hash as the example value (a simplified write-out sketch follows the field list below):
REDIS|db_version|SELECTDB|0|REDIS_TYPE_HASH|hash_size|key1_len|key1|key1_value_len|key1_value|EOF|checksum
- REDIS: magic string placed at the start of the file
- db_version: version of this RDB file
- SELECTDB: opcode indicating that the next value to read is the index of a database in the server
- 0: database number 0 (Redis has 16 databases by default)
- REDIS_TYPE_HASH: type flag saying that a hash table structure is stored in this db
- hash_size: number of entries in the hash table
- key1_len: number of bytes of the first key
- key1: the literal bytes of the first key
- key1_value_len: number of bytes of the value associated with the first key
- key1_value: the value associated with the first key
- EOF: marker indicating that there is no more data
- checksum: checksum of the RDB file, used to verify its integrity
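To make the layout concrete, here is a minimal sketch that writes a toy file in the field order above. It is not the real RDB encoder: the 0xFE/0xFF bytes mirror Redis's SELECTDB/EOF opcodes, but the fixed-width lengths, the type byte and the zero checksum are simplifications made purely for illustration.

```c
#include <stdio.h>
#include <stdint.h>
#include <string.h>

/* Toy writer for: REDIS | version | SELECTDB | db index | type | hash_size |
 * key_len | key | value_len | value | EOF | checksum.
 * Real RDB uses variable-length encodings; fixed widths are used here only
 * to keep the sketch readable. */
int main(void) {
    FILE *fp = fopen("toy.rdb", "wb");
    if (!fp) return 1;

    fwrite("REDIS0009", 1, 9, fp);           /* magic + 4-digit version      */

    uint8_t selectdb = 0xFE, dbnum = 0;       /* SELECTDB opcode + db index 0 */
    fwrite(&selectdb, 1, 1, fp);
    fwrite(&dbnum, 1, 1, fp);

    uint8_t type_hash = 4;                    /* stand-in for the hash type   */
    uint32_t hash_size = 1;                   /* one field/value pair         */
    fwrite(&type_hash, 1, 1, fp);
    fwrite(&hash_size, sizeof(hash_size), 1, fp);

    const char *key = "field1", *val = "value1";
    uint32_t klen = (uint32_t)strlen(key), vlen = (uint32_t)strlen(val);
    fwrite(&klen, sizeof(klen), 1, fp);       /* key1_len        */
    fwrite(key, 1, klen, fp);                 /* key1            */
    fwrite(&vlen, sizeof(vlen), 1, fp);       /* key1_value_len  */
    fwrite(val, 1, vlen, fp);                 /* key1_value      */

    uint8_t eof = 0xFF;                       /* EOF opcode                   */
    fwrite(&eof, 1, 1, fp);
    uint64_t checksum = 0;                    /* real RDB stores a CRC64 here */
    fwrite(&checksum, sizeof(checksum), 1, fp);

    fclose(fp);
    return 0;
}
```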
Saving with bgsave
When the user runs the bgsave command, Redis forks a child process to do the work, so other commands are not blocked while the snapshot is written.
Code.SLICE.source("if ((childpid = fork()) == 0) {" +
" //..." +
" retval = rdbSave(filename,rsi);" +
" if (retval == C_OK) {" +
" //..." +
" server.child_info_data.cow_size = private_dirty;" +
" sendChildInfo(CHILD_INFO_TYPE_RDB);" +
" }" +
" exitFromChild((retval == C_OK) ? 0 : 1);" +
" } else {" +
" /* Parent */" +
" //..." +
" server.rdb_save_time_start = time(NULL);" +
" server.rdb_child_pid = childpid;" +
" server.rdb_child_type = RDB_CHILD_TYPE_DISK;" +
" updateDictResizePolicy();" +
" return C_OK;" +
" }")
.interpretation("创建子进程,子进程负责做rdb相关的处理,父进程记下处理中的子进程ID,返回当前bgsave的执行,也就是说bgsave不会阻塞其它命令的执行");
When writing data into the RDB file, Redis first writes the string REDIS at the head of the file, followed by the current RDB version.
Code.SLICE.source("snprintf(magic,sizeof(magic),\"REDIS%04d\",RDB_VERSION);" +
" if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;")
.interpretation("首先在文件中写下 REDIS字符串和RDB的版本");
Right after that, Redis iterates over every database in the server and writes the data one entry at a time: each value is tagged with a TYPE flag that depends on its data type, then its length is recorded, and finally the value itself is stored. For example, suppose the object being saved is a hash:
Code.SLICE.source("else if (o->type == OBJ_HASH) {" +
" /* Save a hash value */" +
" if (o->encoding == OBJ_ENCODING_ZIPLIST) {" +
" size_t l = ziplistBlobLen((unsigned char*)o->ptr);" +
"" +
" if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;" +
" nwritten += n;" +
"" +
" } else if (o->encoding == OBJ_ENCODING_HT) {" +
" dictIterator *di = dictGetIterator(o->ptr);" +
" dictEntry *de;" +
"" +
" if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) {" +
" dictReleaseIterator(di);" +
" return -1;" +
" }" +
" nwritten += n;" +
"" +
" while((de = dictNext(di)) != NULL) {" +
" sds field = dictGetKey(de);" +
" sds value = dictGetVal(de);" +
"" +
" if ((n = rdbSaveRawString(rdb,(unsigned char*)field," +
" sdslen(field))) == -1)" +
" {" +
" dictReleaseIterator(di);" +
" return -1;" +
" }" +
" nwritten += n;" +
" if ((n = rdbSaveRawString(rdb,(unsigned char*)value," +
" sdslen(value))) == -1)" +
" {" +
" dictReleaseIterator(di);" +
" return -1;" +
" }" +
" nwritten += n;" +
" }" +
" dictReleaseIterator(di);" +
" } else {" +
" serverPanic(\"Unknown hash encoding\");" +
" }" +
" } ")
.interpretation("以hash的编码方式为例,看底层的实现")
.interpretation("1: hash的底层实现如果是ziplist,那么拿到ziplist的长度,将ziplist转为字符串存储")
.interpretation("2: hash的底层实现方式为 hasttable,那么一个个的遍历key,value,将它们分别转成String的形式再存储");
Once all the data has been written, an EOF marker is appended, followed by the checksum. At that point the in-memory data has been fully serialized and stored on disk.
Code.SLICE.source("if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;")
.interpretation("写入EOF标记,代表所有db的数据都已经写入了");
Code.SLICE.source("cksum = rdb->cksum;" +
" memrev64ifbe(&cksum);" +
" if (rioWrite(rdb,&cksum,8) == 0) goto werr;")
.interpretation("写入校验和,完整的内存数据写入完毕");
Loading on startup
During startup Redis loads the RDB file, which is essentially the reverse of the save process: deserialization. The first step is to read the string REDIS:
Code.SLICE.source("if (rioRead(rdb,buf,9) == 0) goto eoferr;" +
" buf[9] = '\\0';" +
" if (memcmp(buf,\"REDIS\",5) != 0)")
.interpretation("读取文件的前9个字节,前5个必定是REDIS字符,否则出错");
From there, the file can be deserialized by following the same rules used during serialization, until everything has been read.
Code.SLICE.source("while(1) {..." +
"if ((type = rdbLoadType(rdb)) == -1) goto eoferr;" +
"..." +
" else if (type == RDB_OPCODE_EOF) {" +
" /* EOF: End of file, exit the main loop. */" +
" break;" +
"..." +
"else if (type == RDB_OPCODE_RESIZEDB){...}" +
"..." +
"if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;" +
"if ((val = rdbLoadObject(type,rdb)) == NULL) goto eoferr;" +
"}")
.interpretation("循环读取文件的内容,首先读到接下来的类型")
.interpretation("1: 读到EOF结束")
.interpretation("2: 读取到对应的标记,就继续读取后面的字节,直到读到key")
.interpretation("3: 读取key,读取val");
Taking a hash as the example value again, the corresponding structure is rebuilt:
Code.SLICE.source("else if (rdbtype == RDB_TYPE_HASH) {" +
" len = rdbLoadLen(rdb, NULL);" +
"..." +
" o = createHashObject();" +
" /* ... */" +
" while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) {" +
" len--;" +
" /* Load raw strings */" +
" if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))" +
" == NULL) return NULL;" +
" if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))" +
" == NULL) return NULL;" +
"" +
" /* Add pair to ziplist */" +
" o->ptr = ziplistPush(o->ptr, (unsigned char*)field," +
" sdslen(field), ZIPLIST_TAIL);" +
" o->ptr = ziplistPush(o->ptr, (unsigned char*)value," +
" sdslen(value), ZIPLIST_TAIL);" +
"" +
" /* Convert to hash table if size threshold is exceeded */" +
" if (sdslen(field) > server.hash_max_ziplist_value ||" +
" sdslen(value) > server.hash_max_ziplist_value)" +
" {" +
" sdsfree(field);" +
" sdsfree(value);" +
" hashTypeConvert(o, OBJ_ENCODING_HT);" +
" break;" +
" }" +
" sdsfree(field);" +
" sdsfree(value);" +
" }" +
" ........"+
" /* Load remaining fields and values into the hash table */" +
" while (o->encoding == OBJ_ENCODING_HT && len > 0) {" +
" len--;" +
" /* Load encoded strings */" +
" if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))" +
" == NULL) return NULL;" +
" if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL))" +
" == NULL) return NULL;" +
"" +
" /* Add pair to hash table */" +
" ret = dictAdd((dict*)o->ptr, field, value);" +
" if (ret == DICT_ERR) {" +
" rdbExitReportCorruptRDB(\"Duplicate keys detected\");" +
" }" +
" }" +
" }")
.interpretation("以hashtable为例,读取到对应的数据长度,创建对象,根据对象的编码方式,分别解析成ziplist或者是hashtable来存储");
Summary
1. bgsave does not block the execution of other Redis commands; this is achieved by forking a child process.
2. RDB serializes in-memory objects by first writing a type flag, then the number of entries, then the length of each piece of data, and finally the data itself.
3. Loading the RDB file on startup is simply deserialization following the same rules used when saving.
Advantages and disadvantages of RDB
- Advantages: an RDB file is a compact, compressed binary snapshot, well suited to backups and full replication, and restoring from it is much faster than replaying an AOF.
- Disadvantages: it is not suitable for real-time persistence, because snapshotting on every change is too expensive; and older Redis versions cannot read RDB files produced by newer versions.
Appendix
Source code of RDB loading on startup
Source code of bgsave execution
Books: Redis设计与实现 (Redis Design and Implementation), Redis开发与运维 (Redis Development and Operations)
Introduction to the AOF mechanism