@@ -1898,9 +1898,32 @@ func (b *Backuper) restoreDataRegular(ctx context.Context, backupName string, ba
18981898 }
18991899 // https://github.com/Altinity/clickhouse-backup/issues/937
19001900 if len (b .cfg .General .RestoreTableMapping ) > 0 {
1901- if targetTable , isMapped := b .cfg .General .RestoreTableMapping [table .Table ]; isMapped {
1902- dstTableName = targetTable
1903- tablesForRestore [i ].Table = targetTable
1901+ // Check full qualified name first (db.table), then table name only
1902+ fullName := table .Database + "." + table .Table
1903+ if targetValue , isMapped := b .cfg .General .RestoreTableMapping [fullName ]; isMapped {
1904+ // Target may contain database (e.g., target_db.new_table)
1905+ if strings .Contains (targetValue , "." ) {
1906+ parts := strings .SplitN (targetValue , "." , 2 )
1907+ dstDatabase = parts [0 ]
1908+ dstTableName = parts [1 ]
1909+ tablesForRestore [i ].Database = parts [0 ]
1910+ tablesForRestore [i ].Table = parts [1 ]
1911+ } else {
1912+ dstTableName = targetValue
1913+ tablesForRestore [i ].Table = targetValue
1914+ }
1915+ } else if targetTable , isMapped := b .cfg .General .RestoreTableMapping [table .Table ]; isMapped {
1916+ // Handle target with database prefix
1917+ if strings .Contains (targetTable , "." ) {
1918+ parts := strings .SplitN (targetTable , "." , 2 )
1919+ dstDatabase = parts [0 ]
1920+ dstTableName = parts [1 ]
1921+ tablesForRestore [i ].Database = parts [0 ]
1922+ tablesForRestore [i ].Table = parts [1 ]
1923+ } else {
1924+ dstTableName = targetTable
1925+ tablesForRestore [i ].Table = targetTable
1926+ }
19041927 }
19051928 }
19061929 logger := log .With ().Str ("table" , fmt .Sprintf ("%s.%s" , dstDatabase , dstTableName )).Logger ()
@@ -2255,8 +2278,24 @@ func (b *Backuper) checkMissingTables(tablesForRestore ListOfTables, chTables []
22552278 }
22562279 }
22572280 if len (b .cfg .General .RestoreTableMapping ) > 0 {
2258- if targetTable , isMapped := b .cfg .General .RestoreTableMapping [table .Table ]; isMapped {
2259- dstTable = targetTable
2281+ // Check full qualified name first (db.table), then table name only
2282+ fullName := table .Database + "." + table .Table
2283+ if targetValue , isMapped := b .cfg .General .RestoreTableMapping [fullName ]; isMapped {
2284+ if strings .Contains (targetValue , "." ) {
2285+ parts := strings .SplitN (targetValue , "." , 2 )
2286+ dstDatabase = parts [0 ]
2287+ dstTable = parts [1 ]
2288+ } else {
2289+ dstTable = targetValue
2290+ }
2291+ } else if targetTable , isMapped := b .cfg .General .RestoreTableMapping [table .Table ]; isMapped {
2292+ if strings .Contains (targetTable , "." ) {
2293+ parts := strings .SplitN (targetTable , "." , 2 )
2294+ dstDatabase = parts [0 ]
2295+ dstTable = parts [1 ]
2296+ } else {
2297+ dstTable = targetTable
2298+ }
22602299 }
22612300 }
22622301 found := false
@@ -2267,7 +2306,7 @@ func (b *Backuper) checkMissingTables(tablesForRestore ListOfTables, chTables []
22672306 }
22682307 }
22692308 if ! found {
2270- missingTables = append (missingTables , fmt .Sprintf ("'%s.%s'" , dstDatabase , table . Table ))
2309+ missingTables = append (missingTables , fmt .Sprintf ("'%s.%s'" , dstDatabase , dstTable ))
22712310 }
22722311 }
22732312 return missingTables
@@ -2290,25 +2329,80 @@ func (b *Backuper) changeTablePatternFromRestoreMapping(tablePattern, objType st
22902329 case "database" :
22912330 mapping = b .cfg .General .RestoreDatabaseMapping
22922331 case "table" :
2293- mapping = b .cfg .General .RestoreDatabaseMapping
2332+ mapping = b .cfg .General .RestoreTableMapping
22942333 default :
2295- return ""
2334+ return tablePattern
22962335 }
2336+ isDatabase := objType == "database"
22972337 for sourceObj , targetObj := range mapping {
22982338 if tablePattern != "" {
2299- sourceObjRE := regexp .MustCompile (fmt .Sprintf ("(^%s.*)|(,%s.*)" , sourceObj , sourceObj ))
2339+ var sourceObjRE * regexp.Regexp
2340+ if isDatabase {
2341+ sourceObjRE = regexp .MustCompile (fmt .Sprintf ("(^%s\\ .[^,]*)|(,%s\\ .[^,]*)" , sourceObj , sourceObj ))
2342+ } else {
2343+ // Check if sourceObj is a full qualified name (db.table)
2344+ if strings .Contains (sourceObj , "." ) {
2345+ // Full qualified mapping: source_db.table -> target_db.new_table
2346+ escapedSource := regexp .QuoteMeta (sourceObj )
2347+ sourceObjRE = regexp .MustCompile (fmt .Sprintf ("(^%s)|(,%s)" , escapedSource , escapedSource ))
2348+ } else {
2349+ sourceObjRE = regexp .MustCompile (fmt .Sprintf ("(^([^\\ .]+)\\ .%s)|(,([^\\ .]+)\\ .%s)" , sourceObj , sourceObj ))
2350+ }
2351+ }
2352+
23002353 if sourceObjRE .MatchString (tablePattern ) {
23012354 matches := sourceObjRE .FindAllStringSubmatch (tablePattern , - 1 )
2302- substitution := targetObj + ".*"
2303- if strings .HasPrefix (matches [0 ][1 ], "," ) {
2355+ var substitution string
2356+ if isDatabase {
2357+ substitution = targetObj + ".*"
2358+ } else {
2359+ // Check if sourceObj is full qualified
2360+ if strings .Contains (sourceObj , "." ) {
2361+ // Use targetObj as-is (may contain database)
2362+ substitution = targetObj
2363+ } else {
2364+ // matches[0][2] has database name when first alternative matches (^...)
2365+ // matches[0][4] has database name when second alternative matches (,...)
2366+ dbName := matches [0 ][2 ]
2367+ if dbName == "" && len (matches [0 ]) > 4 {
2368+ dbName = matches [0 ][4 ]
2369+ }
2370+ // Check if targetObj contains database
2371+ if strings .Contains (targetObj , "." ) {
2372+ substitution = targetObj
2373+ } else {
2374+ substitution = dbName + "." + targetObj
2375+ }
2376+ }
2377+ }
2378+ if strings .HasPrefix (matches [0 ][0 ], "," ) {
23042379 substitution = "," + substitution
23052380 }
2381+
23062382 tablePattern = sourceObjRE .ReplaceAllString (tablePattern , substitution )
23072383 } else {
2308- tablePattern += "," + targetObj + ".*"
2384+ if isDatabase {
2385+ tablePattern += "," + targetObj + ".*"
2386+ } else {
2387+ // Check if targetObj contains database
2388+ if strings .Contains (targetObj , "." ) {
2389+ tablePattern += "," + targetObj
2390+ } else {
2391+ tablePattern += ",*." + targetObj
2392+ }
2393+ }
23092394 }
23102395 } else {
2311- tablePattern += targetObj + ".*"
2396+ if isDatabase {
2397+ tablePattern += targetObj + ".*"
2398+ } else {
2399+ // Check if targetObj contains database
2400+ if strings .Contains (targetObj , "." ) {
2401+ tablePattern += targetObj
2402+ } else {
2403+ tablePattern += "*." + targetObj
2404+ }
2405+ }
23122406 }
23132407 }
23142408 return tablePattern
0 commit comments