@@ -707,43 +707,47 @@ int rdbLoadBinaryFloatValue(rio *rdb, float *val) {
707707 return 0 ;
708708}
709709
710- /* Save the object type of object "o". */
711- int rdbSaveObjectType (rio * rdb , robj * o ) {
710+ /* Return the RDB object type to use for saving object "o", or -1 if the object
711+ * can't be represented in the given RDB version (only for older RDB). */
712+ int rdbGetObjectType (robj * o , int rdbver ) {
712713 switch (o -> type ) {
713- case OBJ_STRING : return rdbSaveType ( rdb , RDB_TYPE_STRING ) ;
714+ case OBJ_STRING : return RDB_TYPE_STRING ;
714715 case OBJ_LIST :
715716 if (o -> encoding == OBJ_ENCODING_QUICKLIST || o -> encoding == OBJ_ENCODING_LISTPACK )
716- return rdbSaveType ( rdb , RDB_TYPE_LIST_QUICKLIST_2 ) ;
717+ return RDB_TYPE_LIST_QUICKLIST_2 ;
717718 else
718719 serverPanic ("Unknown list encoding" );
719720 case OBJ_SET :
720721 if (o -> encoding == OBJ_ENCODING_INTSET )
721- return rdbSaveType ( rdb , RDB_TYPE_SET_INTSET ) ;
722+ return RDB_TYPE_SET_INTSET ;
722723 else if (o -> encoding == OBJ_ENCODING_HASHTABLE )
723- return rdbSaveType ( rdb , RDB_TYPE_SET ) ;
724+ return RDB_TYPE_SET ;
724725 else if (o -> encoding == OBJ_ENCODING_LISTPACK )
725- return rdbSaveType ( rdb , RDB_TYPE_SET_LISTPACK ) ;
726+ return RDB_TYPE_SET_LISTPACK ;
726727 else
727728 serverPanic ("Unknown set encoding" );
728729 case OBJ_ZSET :
729730 if (o -> encoding == OBJ_ENCODING_LISTPACK )
730- return rdbSaveType ( rdb , RDB_TYPE_ZSET_LISTPACK ) ;
731+ return RDB_TYPE_ZSET_LISTPACK ;
731732 else if (o -> encoding == OBJ_ENCODING_SKIPLIST )
732- return rdbSaveType ( rdb , RDB_TYPE_ZSET_2 ) ;
733+ return RDB_TYPE_ZSET_2 ;
733734 else
734735 serverPanic ("Unknown sorted set encoding" );
735736 case OBJ_HASH :
736737 if (o -> encoding == OBJ_ENCODING_LISTPACK )
737- return rdbSaveType ( rdb , RDB_TYPE_HASH_LISTPACK ) ;
738+ return RDB_TYPE_HASH_LISTPACK ;
738739 else if (o -> encoding == OBJ_ENCODING_HASHTABLE )
739740 if (hashTypeHasVolatileFields (o ))
740- return rdbSaveType (rdb , RDB_TYPE_HASH_2 );
741+ if (rdbver >= 80 )
742+ return RDB_TYPE_HASH_2 ;
743+ else
744+ return -1 ; /* skip the key; can't be sent over old RDB */
741745 else
742- return rdbSaveType ( rdb , RDB_TYPE_HASH ) ;
746+ return RDB_TYPE_HASH ;
743747 else
744748 serverPanic ("Unknown hash encoding" );
745- case OBJ_STREAM : return rdbSaveType ( rdb , RDB_TYPE_STREAM_LISTPACKS_3 ) ;
746- case OBJ_MODULE : return rdbSaveType ( rdb , RDB_TYPE_MODULE_2 ) ;
749+ case OBJ_STREAM : return RDB_TYPE_STREAM_LISTPACKS_3 ;
750+ case OBJ_MODULE : return RDB_TYPE_MODULE_2 ;
747751 default : serverPanic ("Unknown object type" );
748752 }
749753 return -1 ; /* avoid warning */
@@ -860,7 +864,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
860864
861865/* Save an Object.
862866 * Returns -1 on error, number of bytes written on success. */
863- ssize_t rdbSaveObject (rio * rdb , robj * o , robj * key , int dbid ) {
867+ ssize_t rdbSaveObject (rio * rdb , robj * o , robj * key , int dbid , unsigned char rdbtype ) {
864868 ssize_t n = 0 , nwritten = 0 ;
865869 if (o -> type == OBJ_STRING ) {
866870 /* Save a string value */
@@ -978,14 +982,15 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
978982 if ((n = rdbSaveRawString (rdb , o -> ptr , l )) == -1 ) return -1 ;
979983 nwritten += n ;
980984 } else if (o -> encoding == OBJ_ENCODING_HASHTABLE ) {
985+ serverAssert (rdbtype == RDB_TYPE_HASH || rdbtype == RDB_TYPE_HASH_2 );
981986 hashtable * ht = o -> ptr ;
982987
983988 if ((n = rdbSaveLen (rdb , hashtableSize (ht ))) == -1 ) {
984989 return -1 ;
985990 }
986991 nwritten += n ;
987992 /* check if need to add expired time for the hash fields */
988- bool add_expiry = hashTypeHasVolatileFields ( o );
993+ bool add_expiry = ( rdbtype == RDB_TYPE_HASH_2 );
989994 hashtableIterator iter ;
990995 hashtableInitIterator (& iter , ht , HASHTABLE_ITER_SKIP_VALIDATION );
991996 void * next ;
@@ -1164,15 +1169,17 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
11641169 * this length with very little changes to the code. In the future
11651170 * we could switch to a faster solution. */
11661171size_t rdbSavedObjectLen (robj * o , robj * key , int dbid ) {
1167- ssize_t len = rdbSaveObject (NULL , o , key , dbid );
1172+ int rdbtype = rdbGetObjectType (o , RDB_VERSION );
1173+ serverAssert (rdbtype != -1 );
1174+ ssize_t len = rdbSaveObject (NULL , o , key , dbid , rdbtype );
11681175 serverAssertWithInfo (NULL , o , len != -1 );
11691176 return len ;
11701177}
11711178
11721179/* Save a key-value pair, with expire time, type, key, value.
11731180 * On error -1 is returned.
11741181 * On success if the key was actually saved 1 is returned. */
1175- int rdbSaveKeyValuePair (rio * rdb , robj * key , robj * val , long long expiretime , int dbid ) {
1182+ int rdbSaveKeyValuePair (rio * rdb , robj * key , robj * val , long long expiretime , int dbid , int rdbver ) {
11761183 int savelru = server .maxmemory_policy & MAXMEMORY_FLAG_LRU ;
11771184 int savelfu = server .maxmemory_policy & MAXMEMORY_FLAG_LFU ;
11781185
@@ -1203,9 +1210,15 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, in
12031210 }
12041211
12051212 /* Save type, key, value */
1206- if (rdbSaveObjectType (rdb , val ) == -1 ) return -1 ;
1213+ int rdbtype = rdbGetObjectType (val , rdbver );
1214+ if (rdbtype == -1 ) {
1215+ serverLog (LL_WARNING , "Can't store key '%s' (db %d) in RDB version %d" ,
1216+ (char * )key -> ptr , dbid , rdbver );
1217+ return -1 ;
1218+ }
1219+ if (rdbSaveType (rdb , rdbtype ) == -1 ) return -1 ;
12071220 if (rdbSaveStringObject (rdb , key ) == -1 ) return -1 ;
1208- if (rdbSaveObject (rdb , val , key , dbid ) == -1 ) return -1 ;
1221+ if (rdbSaveObject (rdb , val , key , dbid , rdbtype ) == -1 ) return -1 ;
12091222
12101223 /* Delay return if required (for testing) */
12111224 if (server .rdb_key_save_delay ) debugDelay (server .rdb_key_save_delay );
@@ -1364,7 +1377,7 @@ ssize_t rdbSaveFunctions(rio *rdb) {
13641377 return -1 ;
13651378}
13661379
1367- ssize_t rdbSaveDb (rio * rdb , int dbid , int rdbflags , long * key_counter ) {
1380+ ssize_t rdbSaveDb (rio * rdb , int dbid , int rdbflags , int rdbver , long * key_counter ) {
13681381 ssize_t written = 0 ;
13691382 ssize_t res ;
13701383 kvstoreIterator * kvs_it = NULL ;
@@ -1419,7 +1432,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
14191432
14201433 initStaticStringObject (key , keystr );
14211434 expire = objectGetExpire (o );
1422- if ((res = rdbSaveKeyValuePair (rdb , & key , o , expire , dbid )) < 0 ) goto werr ;
1435+ if ((res = rdbSaveKeyValuePair (rdb , & key , o , expire , dbid , rdbver )) < 0 ) goto werr ;
14231436 written += res ;
14241437
14251438 /* In fork child process, we can try to release memory back to the
@@ -1455,14 +1468,16 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
14551468 * When the function returns C_ERR and if 'error' is not NULL, the
14561469 * integer pointed by 'error' is set to the value of errno just after the I/O
14571470 * error. */
1458- int rdbSaveRio (int req , rio * rdb , int * error , int rdbflags , rdbSaveInfo * rsi ) {
1471+ int rdbSaveRio (int req , int rdbver , rio * rdb , int * error , int rdbflags , rdbSaveInfo * rsi ) {
14591472 char magic [10 ];
14601473 uint64_t cksum ;
14611474 long key_counter = 0 ;
14621475 int j ;
14631476
14641477 if (server .rdb_checksum ) rdb -> update_cksum = rioGenericUpdateChecksum ;
1465- snprintf (magic , sizeof (magic ), "VALKEY%03d" , RDB_VERSION );
1478+ const char * magic_prefix = rdbUseValkeyMagic (rdbver ) ? "VALKEY" : "REDIS0" ;
1479+ serverAssert (rdbver >= 0 && rdbver <= RDB_VERSION );
1480+ snprintf (magic , sizeof (magic ), "%s%03d" , magic_prefix , rdbver );
14661481 if (rdbWriteRaw (rdb , magic , 9 ) == -1 ) goto werr ;
14671482 if (rdbSaveInfoAuxFields (rdb , rdbflags , rsi ) == -1 ) goto werr ;
14681483 if (!(req & REPLICA_REQ_RDB_EXCLUDE_DATA ) && rdbSaveModulesAux (rdb , VALKEYMODULE_AUX_BEFORE_RDB ) == -1 ) goto werr ;
@@ -1474,9 +1489,9 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
14741489 if (!(req & REPLICA_REQ_RDB_EXCLUDE_DATA )) {
14751490 /* RDB slot import info is encoded in a required opcode since exposing
14761491 * importing slots is a consistency problem. */
1477- if (clusterRDBSaveSlotImports (rdb ) == C_ERR ) goto werr ;
1492+ if (clusterRDBSaveSlotImports (rdb , rdbver ) == C_ERR ) goto werr ;
14781493 for (j = 0 ; j < server .dbnum ; j ++ ) {
1479- if (rdbSaveDb (rdb , j , rdbflags , & key_counter ) == -1 ) goto werr ;
1494+ if (rdbSaveDb (rdb , j , rdbflags , rdbver , & key_counter ) == -1 ) goto werr ;
14801495 }
14811496 }
14821497
@@ -1506,7 +1521,7 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
15061521 * While the suffix is the 40 bytes hex string we announced in the prefix.
15071522 * This way processes receiving the payload can understand when it ends
15081523 * without doing any processing of the content. */
1509- int rdbSaveRioWithEOFMark (int req , rio * rdb , int * error , rdbSaveInfo * rsi ) {
1524+ int rdbSaveRioWithEOFMark (int req , int rdbver , rio * rdb , int * error , rdbSaveInfo * rsi ) {
15101525 char eofmark [RDB_EOF_MARK_SIZE ];
15111526
15121527 startSaving (RDBFLAGS_REPLICATION );
@@ -1515,7 +1530,7 @@ int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) {
15151530 if (rioWrite (rdb , "$EOF:" , 5 ) == 0 ) goto werr ;
15161531 if (rioWrite (rdb , eofmark , RDB_EOF_MARK_SIZE ) == 0 ) goto werr ;
15171532 if (rioWrite (rdb , "\r\n" , 2 ) == 0 ) goto werr ;
1518- if (rdbSaveRio (req , rdb , error , RDBFLAGS_REPLICATION , rsi ) == C_ERR ) goto werr ;
1533+ if (rdbSaveRio (req , rdbver , rdb , error , RDBFLAGS_REPLICATION , rsi ) == C_ERR ) goto werr ;
15191534 if (rioWrite (rdb , eofmark , RDB_EOF_MARK_SIZE ) == 0 ) goto werr ;
15201535 stopSaving (1 );
15211536 return C_OK ;
@@ -1554,7 +1569,7 @@ static int rdbSaveInternal(int req, const char *filename, rdbSaveInfo *rsi, int
15541569 if (!(rdbflags & RDBFLAGS_KEEP_CACHE )) rioSetReclaimCache (& rdb , 1 );
15551570 }
15561571
1557- if (rdbSaveRio (req , & rdb , & error , rdbflags , rsi ) == C_ERR ) {
1572+ if (rdbSaveRio (req , RDB_VERSION , & rdb , & error , rdbflags , rsi ) == C_ERR ) {
15581573 errno = error ;
15591574 err_op = "rdbSaveRio" ;
15601575 goto werr ;
@@ -3655,7 +3670,7 @@ void killRDBChild(void) {
36553670
36563671/* Spawn an RDB child that writes the RDB to the sockets of the replicas
36573672 * that are currently in REPLICA_STATE_WAIT_BGSAVE_START state. */
3658- int rdbSaveToReplicasSockets (int req , rdbSaveInfo * rsi ) {
3673+ int rdbSaveToReplicasSockets (int req , int rdbver , rdbSaveInfo * rsi ) {
36593674 listNode * ln ;
36603675 listIter li ;
36613676 pid_t childpid ;
@@ -3711,6 +3726,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
37113726 if (replica -> repl_data -> repl_state == REPLICA_STATE_WAIT_BGSAVE_START ) {
37123727 /* Check replica has the exact requirements */
37133728 if (replica -> repl_data -> replica_req != req ) continue ;
3729+ if (replicaRdbVersion (replica ) != rdbver ) continue ;
37143730
37153731 conns [connsnum ++ ] = replica -> conn ;
37163732 if (dual_channel ) {
@@ -3756,7 +3772,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
37563772
37573773 if (skip_rdb_checksum ) rdb .flags |= RIO_FLAG_SKIP_RDB_CHECKSUM ;
37583774
3759- retval = rdbSaveRioWithEOFMark (req , & rdb , NULL , rsi );
3775+ retval = rdbSaveRioWithEOFMark (req , rdbver , & rdb , NULL , rsi );
37603776 if (retval == C_OK && rioFlush (& rdb ) == 0 ) retval = C_ERR ;
37613777
37623778 if (retval == C_OK ) {
0 commit comments