Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -2401,7 +2401,7 @@ int rewriteAppendOnlyFile(char *filename) {

if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(REPLICA_REQ_NONE, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) {
if (rdbSaveRio(REPLICA_REQ_NONE, RDB_VERSION, &aof, &error, RDBFLAGS_AOF_PREAMBLE, NULL) == C_ERR) {
errno = error;
goto werr;
}
Expand Down
6 changes: 4 additions & 2 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,10 @@ void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) {
/* Serialize the object in an RDB-like format. It consist of an object type
* byte followed by the serialized object. This is understood by RESTORE. */
rioInitWithBuffer(payload, sdsempty());
serverAssert(rdbSaveObjectType(payload, o));
serverAssert(rdbSaveObject(payload, o, key, dbid));
int rdbtype = rdbGetObjectType(o, RDB_VERSION);
serverAssert(rdbtype >= 0);
serverAssert(rdbSaveType(payload, rdbtype));
serverAssert(rdbSaveObject(payload, o, key, dbid, rdbtype));

/* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+
Expand Down
6 changes: 5 additions & 1 deletion src/cluster_migrateslots.c
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ void fireModuleSlotMigrationEvent(slotMigrationJob *job, int subevent) {

/* Save the active slot imports to the RDB file. The import job name and the
* slot ranges are saved. */
int clusterRDBSaveSlotImports(rio *rdb) {
int clusterRDBSaveSlotImports(rio *rdb, int rdbver) {
if (!server.cluster_enabled) return C_OK;
if (listLength(server.cluster->slot_migration_jobs) == 0) return C_OK;
listNode *ln;
Expand All @@ -371,6 +371,10 @@ int clusterRDBSaveSlotImports(rio *rdb) {
slotMigrationJob *job = ln->value;
if (isSlotMigrationJobFinished(job)) continue;
if (job->type == SLOT_MIGRATION_EXPORT) continue;
if (rdbver < 80) {
serverLog(LL_WARNING, "Can't store slot migrations in RDB version %d", rdbver);
return C_ERR;
}
if (rdbSaveType(rdb, RDB_OPCODE_SLOT_IMPORT) < 0) return C_ERR;
if (rdbSaveRawString(rdb, (unsigned char *)job->name, CLUSTER_NAMELEN) < 0) return C_ERR;
if (rdbSaveLen(rdb, listLength(job->slot_ranges)) < 0) return C_ERR;
Expand Down
2 changes: 1 addition & 1 deletion src/cluster_migrateslots.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void clusterCleanSlotImportsOnFullSync(void);
void clusterCleanSlotImportsOnPromotion(void);
void clusterCleanSlotImportsBeforeLoad(void);
void clusterCleanSlotImportsAfterLoad(void);
int clusterRDBSaveSlotImports(rio *rdb);
int clusterRDBSaveSlotImports(rio *rdb, int rdbver);
int clusterRDBLoadSlotImport(rio *rdb);

#endif /* __CLUSTER_MIGRATESLOTS_H */
78 changes: 47 additions & 31 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -707,43 +707,47 @@ int rdbLoadBinaryFloatValue(rio *rdb, float *val) {
return 0;
}

/* Save the object type of object "o". */
int rdbSaveObjectType(rio *rdb, robj *o) {
/* Return the RDB object type to use for saving object "o", or -1 if the object
* can't be represented in the given RDB version (only for older RDB). */
int rdbGetObjectType(robj *o, int rdbver) {
switch (o->type) {
case OBJ_STRING: return rdbSaveType(rdb, RDB_TYPE_STRING);
case OBJ_STRING: return RDB_TYPE_STRING;
case OBJ_LIST:
if (o->encoding == OBJ_ENCODING_QUICKLIST || o->encoding == OBJ_ENCODING_LISTPACK)
return rdbSaveType(rdb, RDB_TYPE_LIST_QUICKLIST_2);
return RDB_TYPE_LIST_QUICKLIST_2;
else
serverPanic("Unknown list encoding");
case OBJ_SET:
if (o->encoding == OBJ_ENCODING_INTSET)
return rdbSaveType(rdb, RDB_TYPE_SET_INTSET);
return RDB_TYPE_SET_INTSET;
else if (o->encoding == OBJ_ENCODING_HASHTABLE)
return rdbSaveType(rdb, RDB_TYPE_SET);
return RDB_TYPE_SET;
else if (o->encoding == OBJ_ENCODING_LISTPACK)
return rdbSaveType(rdb, RDB_TYPE_SET_LISTPACK);
return RDB_TYPE_SET_LISTPACK;
else
serverPanic("Unknown set encoding");
case OBJ_ZSET:
if (o->encoding == OBJ_ENCODING_LISTPACK)
return rdbSaveType(rdb, RDB_TYPE_ZSET_LISTPACK);
return RDB_TYPE_ZSET_LISTPACK;
else if (o->encoding == OBJ_ENCODING_SKIPLIST)
return rdbSaveType(rdb, RDB_TYPE_ZSET_2);
return RDB_TYPE_ZSET_2;
else
serverPanic("Unknown sorted set encoding");
case OBJ_HASH:
if (o->encoding == OBJ_ENCODING_LISTPACK)
return rdbSaveType(rdb, RDB_TYPE_HASH_LISTPACK);
return RDB_TYPE_HASH_LISTPACK;
else if (o->encoding == OBJ_ENCODING_HASHTABLE)
if (hashTypeHasVolatileFields(o))
return rdbSaveType(rdb, RDB_TYPE_HASH_2);
if (rdbver >= 80)
return RDB_TYPE_HASH_2;
else
return -1; /* skip the key; can't be sent over old RDB */
else
return rdbSaveType(rdb, RDB_TYPE_HASH);
return RDB_TYPE_HASH;
else
serverPanic("Unknown hash encoding");
case OBJ_STREAM: return rdbSaveType(rdb, RDB_TYPE_STREAM_LISTPACKS_3);
case OBJ_MODULE: return rdbSaveType(rdb, RDB_TYPE_MODULE_2);
case OBJ_STREAM: return RDB_TYPE_STREAM_LISTPACKS_3;
case OBJ_MODULE: return RDB_TYPE_MODULE_2;
default: serverPanic("Unknown object type");
}
return -1; /* avoid warning */
Expand Down Expand Up @@ -860,7 +864,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {

/* Save an Object.
* Returns -1 on error, number of bytes written on success. */
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid, unsigned char rdbtype) {
ssize_t n = 0, nwritten = 0;
if (o->type == OBJ_STRING) {
/* Save a string value */
Expand Down Expand Up @@ -978,14 +982,15 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
if ((n = rdbSaveRawString(rdb, o->ptr, l)) == -1) return -1;
nwritten += n;
} else if (o->encoding == OBJ_ENCODING_HASHTABLE) {
serverAssert(rdbtype == RDB_TYPE_HASH || rdbtype == RDB_TYPE_HASH_2);
hashtable *ht = o->ptr;

if ((n = rdbSaveLen(rdb, hashtableSize(ht))) == -1) {
return -1;
}
nwritten += n;
/* check if need to add expired time for the hash fields */
bool add_expiry = hashTypeHasVolatileFields(o);
bool add_expiry = (rdbtype == RDB_TYPE_HASH_2);
hashtableIterator iter;
hashtableInitIterator(&iter, ht, HASHTABLE_ITER_SKIP_VALIDATION);
void *next;
Expand Down Expand Up @@ -1164,15 +1169,17 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
* this length with very little changes to the code. In the future
* we could switch to a faster solution. */
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid) {
ssize_t len = rdbSaveObject(NULL, o, key, dbid);
int rdbtype = rdbGetObjectType(o, RDB_VERSION);
serverAssert(rdbtype != -1);
ssize_t len = rdbSaveObject(NULL, o, key, dbid, rdbtype);
serverAssertWithInfo(NULL, o, len != -1);
return len;
}

/* Save a key-value pair, with expire time, type, key, value.
* On error -1 is returned.
* On success if the key was actually saved 1 is returned. */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid) {
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid, int rdbver) {
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;

Expand Down Expand Up @@ -1203,9 +1210,15 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, in
}

/* Save type, key, value */
if (rdbSaveObjectType(rdb, val) == -1) return -1;
int rdbtype = rdbGetObjectType(val, rdbver);
if (rdbtype == -1) {
serverLog(LL_WARNING, "Can't store key '%s' (db %d) in RDB version %d",
(char *)key->ptr, dbid, rdbver);
return -1;
}
if (rdbSaveType(rdb, rdbtype) == -1) return -1;
if (rdbSaveStringObject(rdb, key) == -1) return -1;
if (rdbSaveObject(rdb, val, key, dbid) == -1) return -1;
if (rdbSaveObject(rdb, val, key, dbid, rdbtype) == -1) return -1;

/* Delay return if required (for testing) */
if (server.rdb_key_save_delay) debugDelay(server.rdb_key_save_delay);
Expand Down Expand Up @@ -1364,7 +1377,7 @@ ssize_t rdbSaveFunctions(rio *rdb) {
return -1;
}

ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, int rdbver, long *key_counter) {
ssize_t written = 0;
ssize_t res;
kvstoreIterator *kvs_it = NULL;
Expand Down Expand Up @@ -1419,7 +1432,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {

initStaticStringObject(key, keystr);
expire = objectGetExpire(o);
if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid)) < 0) goto werr;
if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid, rdbver)) < 0) goto werr;
written += res;

/* In fork child process, we can try to release memory back to the
Expand Down Expand Up @@ -1455,14 +1468,16 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
* When the function returns C_ERR and if 'error' is not NULL, the
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
int rdbSaveRio(int req, int rdbver, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
char magic[10];
uint64_t cksum;
long key_counter = 0;
int j;

if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic, sizeof(magic), "VALKEY%03d", RDB_VERSION);
const char *magic_prefix = rdbUseValkeyMagic(rdbver) ? "VALKEY" : "REDIS0";
serverAssert(rdbver >= 0 && rdbver <= RDB_VERSION);
snprintf(magic, sizeof(magic), "%s%03d", magic_prefix, rdbver);
if (rdbWriteRaw(rdb, magic, 9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb, rdbflags, rsi) == -1) goto werr;
if (!(req & REPLICA_REQ_RDB_EXCLUDE_DATA) && rdbSaveModulesAux(rdb, VALKEYMODULE_AUX_BEFORE_RDB) == -1) goto werr;
Expand All @@ -1474,9 +1489,9 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
if (!(req & REPLICA_REQ_RDB_EXCLUDE_DATA)) {
/* RDB slot import info is encoded in a required opcode since exposing
* importing slots is a consistency problem. */
if (clusterRDBSaveSlotImports(rdb) == C_ERR) goto werr;
if (clusterRDBSaveSlotImports(rdb, rdbver) == C_ERR) goto werr;
for (j = 0; j < server.dbnum; j++) {
if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr;
if (rdbSaveDb(rdb, j, rdbflags, rdbver, &key_counter) == -1) goto werr;
}
}

Expand Down Expand Up @@ -1506,7 +1521,7 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
* While the suffix is the 40 bytes hex string we announced in the prefix.
* This way processes receiving the payload can understand when it ends
* without doing any processing of the content. */
int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) {
int rdbSaveRioWithEOFMark(int req, int rdbver, rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];

startSaving(RDBFLAGS_REPLICATION);
Expand All @@ -1515,7 +1530,7 @@ int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) {
if (rioWrite(rdb, "$EOF:", 5) == 0) goto werr;
if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb, "\r\n", 2) == 0) goto werr;
if (rdbSaveRio(req, rdb, error, RDBFLAGS_REPLICATION, rsi) == C_ERR) goto werr;
if (rdbSaveRio(req, rdbver, rdb, error, RDBFLAGS_REPLICATION, rsi) == C_ERR) goto werr;
if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr;
stopSaving(1);
return C_OK;
Expand Down Expand Up @@ -1554,7 +1569,7 @@ static int rdbSaveInternal(int req, const char *filename, rdbSaveInfo *rsi, int
if (!(rdbflags & RDBFLAGS_KEEP_CACHE)) rioSetReclaimCache(&rdb, 1);
}

if (rdbSaveRio(req, &rdb, &error, rdbflags, rsi) == C_ERR) {
if (rdbSaveRio(req, RDB_VERSION, &rdb, &error, rdbflags, rsi) == C_ERR) {
errno = error;
err_op = "rdbSaveRio";
goto werr;
Expand Down Expand Up @@ -3655,7 +3670,7 @@ void killRDBChild(void) {

/* Spawn an RDB child that writes the RDB to the sockets of the replicas
* that are currently in REPLICA_STATE_WAIT_BGSAVE_START state. */
int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
int rdbSaveToReplicasSockets(int req, int rdbver, rdbSaveInfo *rsi) {
listNode *ln;
listIter li;
pid_t childpid;
Expand Down Expand Up @@ -3711,6 +3726,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
if (replica->repl_data->repl_state == REPLICA_STATE_WAIT_BGSAVE_START) {
/* Check replica has the exact requirements */
if (replica->repl_data->replica_req != req) continue;
if (replicaRdbVersion(replica) != rdbver) continue;

conns[connsnum++] = replica->conn;
if (dual_channel) {
Expand Down Expand Up @@ -3756,7 +3772,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {

if (skip_rdb_checksum) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;

retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi);
retval = rdbSaveRioWithEOFMark(req, rdbver, &rdb, NULL, rsi);
if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR;

if (retval == C_OK) {
Expand Down
22 changes: 13 additions & 9 deletions src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ static inline bool rdbIsForeignVersion(int rdbver) {
return rdbver >= RDB_FOREIGN_VERSION_MIN && rdbver <= RDB_FOREIGN_VERSION_MAX;
}

static inline bool rdbUseValkeyMagic(int rdbver) {
return rdbver > RDB_FOREIGN_VERSION_MAX;
}

/* Defines related to the dump file format. To store 32 bits lengths for short
* keys requires a lot of space, so we check the most significant 2 bits of
* the first byte to interpreter the length:
Expand Down Expand Up @@ -110,13 +114,13 @@ enum RdbType {
RDB_TYPE_HASH_ZIPLIST = 13,
RDB_TYPE_LIST_QUICKLIST = 14,
RDB_TYPE_STREAM_LISTPACKS = 15,
RDB_TYPE_HASH_LISTPACK = 16,
RDB_TYPE_HASH_LISTPACK = 16, /* Added in RDB 10 (7.0) */
RDB_TYPE_ZSET_LISTPACK = 17,
RDB_TYPE_LIST_QUICKLIST_2 = 18,
RDB_TYPE_STREAM_LISTPACKS_2 = 19,
RDB_TYPE_SET_LISTPACK = 20,
RDB_TYPE_SET_LISTPACK = 20, /* Added in RDB 11 (7.2) */
RDB_TYPE_STREAM_LISTPACKS_3 = 21,
RDB_TYPE_HASH_2 = 22, /* Hash with field-level expiration (Valkey 9.0) */
RDB_TYPE_HASH_2 = 22, /* Hash with field-level expiration, RDB 80 (9.0) */
RDB_TYPE_LAST
};
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdb_type_string[] */
Expand All @@ -131,7 +135,7 @@ enum RdbType {

/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType).
* These are special RDB types, but they start from 255 and grow down. */
#define RDB_OPCODE_SLOT_IMPORT 243 /* Slot import state. */
#define RDB_OPCODE_SLOT_IMPORT 243 /* Slot import state (9.0). */
#define RDB_OPCODE_SLOT_INFO 244 /* Foreign slot info, safe to ignore. */
#define RDB_OPCODE_FUNCTION2 245 /* function library data */
#define RDB_OPCODE_FUNCTION_PRE_GA 246 /* old function library data for 7.0 rc1 and rc2 */
Expand Down Expand Up @@ -183,19 +187,19 @@ ssize_t rdbSaveMillisecondTime(rio *rdb, long long t);
long long rdbLoadMillisecondTime(rio *rdb, int rdbver);
uint64_t rdbLoadLen(rio *rdb, int *isencoded);
int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
int rdbSaveObjectType(rio *rdb, robj *o);
int rdbGetObjectType(robj *o, int rdbver);
int rdbLoadObjectType(rio *rdb);
int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags);
int rdbSaveBackground(int req, char *filename, rdbSaveInfo *rsi, int rdbflags);
int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi);
int rdbSaveToReplicasSockets(int req, int rdbver, rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid, int from_signal);
int rdbSaveToFile(const char *filename);
int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags);
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid);
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid, unsigned char type);
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid);
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid, int rdbver);
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);
robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename);
robj *rdbLoadStringObject(rio *rdb);
Expand All @@ -209,7 +213,7 @@ int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, sds *err);
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
int rdbSaveRio(int req, int rdbver, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
ssize_t rdbSaveFunctions(rio *rdb);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);

Expand Down
Loading
Loading