Skip to content

Commit a5e0272

Browse files
committed
Produce RDB 11 for replicas older than 9.0
Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
1 parent 8182f4a commit a5e0272

File tree

9 files changed

+203
-69
lines changed

9 files changed

+203
-69
lines changed

src/aof.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2401,7 +2401,7 @@ int rewriteAppendOnlyFile(char *filename) {
24012401

24022402
if (server.aof_use_rdb_preamble) {
24032403
int error;
2404-
if (rdbSaveRio(REPLICA_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) {
2404+
if (rdbSaveRio(REPLICA_REQ_NONE, RDB_VERSION, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) {
24052405
errno = error;
24062406
goto werr;
24072407
}

src/cluster.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,10 @@ void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) {
126126
/* Serialize the object in an RDB-like format. It consist of an object type
127127
* byte followed by the serialized object. This is understood by RESTORE. */
128128
rioInitWithBuffer(payload, sdsempty());
129-
serverAssert(rdbSaveObjectType(payload, o));
130-
serverAssert(rdbSaveObject(payload, o, key, dbid));
129+
int rdbtype = rdbGetObjectType(o, RDB_VERSION);
130+
serverAssert(rdbtype >= 0);
131+
serverAssert(rdbSaveType(payload, rdbtype));
132+
serverAssert(rdbSaveObject(payload, o, key, dbid, rdbtype));
131133

132134
/* Write the footer, this is how it looks like:
133135
* ----------------+---------------------+---------------+

src/rdb.c

Lines changed: 45 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -707,43 +707,47 @@ int rdbLoadBinaryFloatValue(rio *rdb, float *val) {
707707
return 0;
708708
}
709709

710-
/* Save the object type of object "o". */
711-
int rdbSaveObjectType(rio *rdb, robj *o) {
710+
/* Return the RDB object type to use for saving object "o", or -1 if the object
711+
* can't be represented in the given RDB version (only for older RDB). */
712+
int rdbGetObjectType(robj *o, int rdbver) {
712713
switch (o->type) {
713-
case OBJ_STRING: return rdbSaveType(rdb, RDB_TYPE_STRING);
714+
case OBJ_STRING: return RDB_TYPE_STRING;
714715
case OBJ_LIST:
715716
if (o->encoding == OBJ_ENCODING_QUICKLIST || o->encoding == OBJ_ENCODING_LISTPACK)
716-
return rdbSaveType(rdb, RDB_TYPE_LIST_QUICKLIST_2);
717+
return RDB_TYPE_LIST_QUICKLIST_2;
717718
else
718719
serverPanic("Unknown list encoding");
719720
case OBJ_SET:
720721
if (o->encoding == OBJ_ENCODING_INTSET)
721-
return rdbSaveType(rdb, RDB_TYPE_SET_INTSET);
722+
return RDB_TYPE_SET_INTSET;
722723
else if (o->encoding == OBJ_ENCODING_HASHTABLE)
723-
return rdbSaveType(rdb, RDB_TYPE_SET);
724+
return RDB_TYPE_SET;
724725
else if (o->encoding == OBJ_ENCODING_LISTPACK)
725-
return rdbSaveType(rdb, RDB_TYPE_SET_LISTPACK);
726+
return RDB_TYPE_SET_LISTPACK;
726727
else
727728
serverPanic("Unknown set encoding");
728729
case OBJ_ZSET:
729730
if (o->encoding == OBJ_ENCODING_LISTPACK)
730-
return rdbSaveType(rdb, RDB_TYPE_ZSET_LISTPACK);
731+
return RDB_TYPE_ZSET_LISTPACK;
731732
else if (o->encoding == OBJ_ENCODING_SKIPLIST)
732-
return rdbSaveType(rdb, RDB_TYPE_ZSET_2);
733+
return RDB_TYPE_ZSET_2;
733734
else
734735
serverPanic("Unknown sorted set encoding");
735736
case OBJ_HASH:
736737
if (o->encoding == OBJ_ENCODING_LISTPACK)
737-
return rdbSaveType(rdb, RDB_TYPE_HASH_LISTPACK);
738+
return RDB_TYPE_HASH_LISTPACK;
738739
else if (o->encoding == OBJ_ENCODING_HASHTABLE)
739740
if (hashTypeHasVolatileFields(o))
740-
return rdbSaveType(rdb, RDB_TYPE_HASH_2);
741+
if (rdbver >= 80)
742+
return RDB_TYPE_HASH_2;
743+
else
744+
return -1; /* skip the key; can't be sent over old RDB */
741745
else
742-
return rdbSaveType(rdb, RDB_TYPE_HASH);
746+
return RDB_TYPE_HASH;
743747
else
744748
serverPanic("Unknown hash encoding");
745-
case OBJ_STREAM: return rdbSaveType(rdb, RDB_TYPE_STREAM_LISTPACKS_3);
746-
case OBJ_MODULE: return rdbSaveType(rdb, RDB_TYPE_MODULE_2);
749+
case OBJ_STREAM: return RDB_TYPE_STREAM_LISTPACKS_3;
750+
case OBJ_MODULE: return RDB_TYPE_MODULE_2;
747751
default: serverPanic("Unknown object type");
748752
}
749753
return -1; /* avoid warning */
@@ -860,7 +864,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
860864

861865
/* Save an Object.
862866
* Returns -1 on error, number of bytes written on success. */
863-
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
867+
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid, unsigned char rdbtype) {
864868
ssize_t n = 0, nwritten = 0;
865869
if (o->type == OBJ_STRING) {
866870
/* Save a string value */
@@ -978,14 +982,15 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
978982
if ((n = rdbSaveRawString(rdb, o->ptr, l)) == -1) return -1;
979983
nwritten += n;
980984
} else if (o->encoding == OBJ_ENCODING_HASHTABLE) {
985+
serverAssert(rdbtype == RDB_TYPE_HASH || rdbtype == RDB_TYPE_HASH_2);
981986
hashtable *ht = o->ptr;
982987

983988
if ((n = rdbSaveLen(rdb, hashtableSize(ht))) == -1) {
984989
return -1;
985990
}
986991
nwritten += n;
987992
/* check if need to add expired time for the hash fields */
988-
bool add_expiry = hashTypeHasVolatileFields(o);
993+
bool add_expiry = (rdbtype == RDB_TYPE_HASH_2);
989994
hashtableIterator iter;
990995
hashtableInitIterator(&iter, ht, HASHTABLE_ITER_SKIP_VALIDATION);
991996
void *next;
@@ -1164,15 +1169,17 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
11641169
* this length with very little changes to the code. In the future
11651170
* we could switch to a faster solution. */
11661171
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid) {
1167-
ssize_t len = rdbSaveObject(NULL, o, key, dbid);
1172+
int rdbtype = rdbGetObjectType(o, RDB_VERSION);
1173+
serverAssert(rdbtype != -1);
1174+
ssize_t len = rdbSaveObject(NULL, o, key, dbid, rdbtype);
11681175
serverAssertWithInfo(NULL, o, len != -1);
11691176
return len;
11701177
}
11711178

11721179
/* Save a key-value pair, with expire time, type, key, value.
11731180
* On error -1 is returned.
11741181
* On success if the key was actually saved 1 is returned. */
1175-
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid) {
1182+
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid, int rdbver) {
11761183
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
11771184
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
11781185

@@ -1203,9 +1210,12 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, in
12031210
}
12041211

12051212
/* Save type, key, value */
1206-
if (rdbSaveObjectType(rdb, val) == -1) return -1;
1207-
if (rdbSaveStringObject(rdb, key) == -1) return -1;
1208-
if (rdbSaveObject(rdb, val, key, dbid) == -1) return -1;
1213+
int rdbtype = rdbGetObjectType(val, rdbver);
1214+
if (rdbtype != -1) {
1215+
if (rdbSaveType(rdb, rdbtype) == -1) return -1;
1216+
if (rdbSaveStringObject(rdb, key) == -1) return -1;
1217+
if (rdbSaveObject(rdb, val, key, dbid, rdbtype) == -1) return -1;
1218+
}
12091219

12101220
/* Delay return if required (for testing) */
12111221
if (server.rdb_key_save_delay) debugDelay(server.rdb_key_save_delay);
@@ -1364,7 +1374,7 @@ ssize_t rdbSaveFunctions(rio *rdb) {
13641374
return -1;
13651375
}
13661376

1367-
ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
1377+
ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, int rdbver, long *key_counter) {
13681378
ssize_t written = 0;
13691379
ssize_t res;
13701380
kvstoreIterator *kvs_it = NULL;
@@ -1419,7 +1429,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
14191429

14201430
initStaticStringObject(key, keystr);
14211431
expire = objectGetExpire(o);
1422-
if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid)) < 0) goto werr;
1432+
if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid, rdbver)) < 0) goto werr;
14231433
written += res;
14241434

14251435
/* In fork child process, we can try to release memory back to the
@@ -1455,14 +1465,16 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
14551465
* When the function returns C_ERR and if 'error' is not NULL, the
14561466
* integer pointed by 'error' is set to the value of errno just after the I/O
14571467
* error. */
1458-
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
1468+
int rdbSaveRio(int req, int rdbver, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
14591469
char magic[10];
14601470
uint64_t cksum;
14611471
long key_counter = 0;
14621472
int j;
14631473

14641474
if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum;
1465-
snprintf(magic, sizeof(magic), "VALKEY%03d", RDB_VERSION);
1475+
const char *magic_prefix = rdbUseValkeyMagic(rdbver) ? "VALKEY" : "REDIS0";
1476+
serverAssert(rdbver >= 0 && rdbver <= RDB_VERSION);
1477+
snprintf(magic, sizeof(magic), "%s%03d", magic_prefix, rdbver);
14661478
if (rdbWriteRaw(rdb, magic, 9) == -1) goto werr;
14671479
if (rdbSaveInfoAuxFields(rdb, rdbflags, rsi) == -1) goto werr;
14681480
if (!(req & REPLICA_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, VALKEYMODULE_AUX_BEFORE_RDB) == -1) goto werr;
@@ -1474,9 +1486,9 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
14741486
if (!(req & REPLICA_REQ_RDB_EXCLUDE_DATA)) {
14751487
/* RDB slot import info is encoded in a required opcode since exposing
14761488
* importing slots is a consistency problem. */
1477-
if (clusterRDBSaveSlotImports(rdb) == C_ERR) goto werr;
1489+
if (rdbver >= 80 && clusterRDBSaveSlotImports(rdb) == C_ERR) goto werr;
14781490
for (j = 0; j < server.dbnum; j++) {
1479-
if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr;
1491+
if (rdbSaveDb(rdb, j, rdbflags, rdbver, &key_counter) == -1) goto werr;
14801492
}
14811493
}
14821494

@@ -1506,7 +1518,7 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
15061518
* While the suffix is the 40 bytes hex string we announced in the prefix.
15071519
* This way processes receiving the payload can understand when it ends
15081520
* without doing any processing of the content. */
1509-
int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) {
1521+
int rdbSaveRioWithEOFMark(int req, int rdbver, rio *rdb, int *error, rdbSaveInfo *rsi) {
15101522
char eofmark[RDB_EOF_MARK_SIZE];
15111523

15121524
startSaving(RDBFLAGS_REPLICATION);
@@ -1515,7 +1527,7 @@ int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) {
15151527
if (rioWrite(rdb, "$EOF:", 5) == 0) goto werr;
15161528
if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr;
15171529
if (rioWrite(rdb, "\r\n", 2) == 0) goto werr;
1518-
if (rdbSaveRio(req, rdb, error, RDBFLAGS_REPLICATION, rsi) == C_ERR) goto werr;
1530+
if (rdbSaveRio(req, rdbver, rdb, error, RDBFLAGS_REPLICATION, rsi) == C_ERR) goto werr;
15191531
if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr;
15201532
stopSaving(1);
15211533
return C_OK;
@@ -1554,7 +1566,7 @@ static int rdbSaveInternal(int req, const char *filename, rdbSaveInfo *rsi, int
15541566
if (!(rdbflags & RDBFLAGS_KEEP_CACHE)) rioSetReclaimCache(&rdb, 1);
15551567
}
15561568

1557-
if (rdbSaveRio(req, &rdb, &error, rdbflags, rsi) == C_ERR) {
1569+
if (rdbSaveRio(req, RDB_VERSION, &rdb, &error, rdbflags, rsi) == C_ERR) {
15581570
errno = error;
15591571
err_op = "rdbSaveRio";
15601572
goto werr;
@@ -3655,7 +3667,7 @@ void killRDBChild(void) {
36553667

36563668
/* Spawn an RDB child that writes the RDB to the sockets of the replicas
36573669
* that are currently in REPLICA_STATE_WAIT_BGSAVE_START state. */
3658-
int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
3670+
int rdbSaveToReplicasSockets(int req, int rdbver, rdbSaveInfo *rsi) {
36593671
listNode *ln;
36603672
listIter li;
36613673
pid_t childpid;
@@ -3711,6 +3723,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
37113723
if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) {
37123724
/* Check replica has the exact requirements */
37133725
if (replica->repl_data->replica_req != req) continue;
3726+
if (replicaRdbVersion(replica) != rdbver) continue;
37143727

37153728
conns[connsnum++] = replica->conn;
37163729
if (dual_channel) {
@@ -3756,7 +3769,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
37563769

37573770
if (skip_rdb_checksum) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;
37583771

3759-
retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi);
3772+
retval = rdbSaveRioWithEOFMark(req, rdbver, &rdb, NULL, rsi);
37603773
if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR;
37613774

37623775
if (retval == C_OK) {

src/rdb.h

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ static inline bool rdbIsForeignVersion(int rdbver) {
6161
return rdbver >= RDB_FOREIGN_VERSION_MIN && rdbver <= RDB_FOREIGN_VERSION_MAX;
6262
}
6363

64+
static inline bool rdbUseValkeyMagic(int rdbver) {
65+
return rdbver > RDB_FOREIGN_VERSION_MAX;
66+
}
67+
6468
/* Defines related to the dump file format. To store 32 bits lengths for short
6569
* keys requires a lot of space, so we check the most significant 2 bits of
6670
* the first byte to interpreter the length:
@@ -110,13 +114,13 @@ enum RdbType {
110114
RDB_TYPE_HASH_ZIPLIST = 13,
111115
RDB_TYPE_LIST_QUICKLIST = 14,
112116
RDB_TYPE_STREAM_LISTPACKS = 15,
113-
RDB_TYPE_HASH_LISTPACK = 16,
117+
RDB_TYPE_HASH_LISTPACK = 16, /* Added in RDB 10 (7.0) */
114118
RDB_TYPE_ZSET_LISTPACK = 17,
115119
RDB_TYPE_LIST_QUICKLIST_2 = 18,
116120
RDB_TYPE_STREAM_LISTPACKS_2 = 19,
117-
RDB_TYPE_SET_LISTPACK = 20,
121+
RDB_TYPE_SET_LISTPACK = 20, /* Added in RDB 11 (7.2) */
118122
RDB_TYPE_STREAM_LISTPACKS_3 = 21,
119-
RDB_TYPE_HASH_2 = 22, /* Hash with field-level expiration (Valkey 9.0) */
123+
RDB_TYPE_HASH_2 = 22, /* Hash with field-level expiration, RDB 80 (9.0) */
120124
RDB_TYPE_LAST
121125
};
122126
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdb_type_string[] */
@@ -131,7 +135,7 @@ enum RdbType {
131135

132136
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType).
133137
* These are special RDB types, but they start from 255 and grow down. */
134-
#define RDB_OPCODE_SLOT_IMPORT 243 /* Slot import state. */
138+
#define RDB_OPCODE_SLOT_IMPORT 243 /* Slot import state (9.0). */
135139
#define RDB_OPCODE_SLOT_INFO 244 /* Foreign slot info, safe to ignore. */
136140
#define RDB_OPCODE_FUNCTION2 245 /* function library data */
137141
#define RDB_OPCODE_FUNCTION_PRE_GA 246 /* old function library data for 7.0 rc1 and rc2 */
@@ -183,19 +187,19 @@ ssize_t rdbSaveMillisecondTime(rio *rdb, long long t);
183187
long long rdbLoadMillisecondTime(rio *rdb, int rdbver);
184188
uint64_t rdbLoadLen(rio *rdb, int *isencoded);
185189
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
186-
int rdbSaveObjectType(rio *rdb, robj *o);
190+
int rdbGetObjectType(robj *o, int rdbver);
187191
int rdbLoadObjectType(rio *rdb);
188192
int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags);
189193
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags);
190-
int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi);
194+
int rdbSaveToReplicasSockets(int req, int rdbver, rdbSaveInfo *rsi);
191195
void rdbRemoveTempFile(pid_t childpid, int from_signal);
192196
int rdbSaveToFile(const char *filename);
193197
int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags);
194-
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid);
198+
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid, unsigned char type);
195199
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid);
196200
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error);
197201
void backgroundSaveDoneHandler(int exitcode, int bysignal);
198-
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid);
202+
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid, int rdbver);
199203
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);
200204
robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename);
201205
robj *rdbLoadStringObject(rio *rdb);
@@ -209,7 +213,7 @@ int rdbLoadBinaryFloatValue(rio *rdb, float *val);
209213
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
210214
int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
211215
int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, sds *err);
212-
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
216+
int rdbSaveRio(int req, int rdbver, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
213217
ssize_t rdbSaveFunctions(rio *rdb);
214218
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
215219

0 commit comments

Comments
 (0)