Skip to content

Commit 7f3bc9a

Browse files
claudeEnjection
authored andcommitted
Improve continuous backup detection for impl table operations
The previous approach applied version synchronization to ALL impl table CDC operations, which broke normal CDC stream creation on index tables. The issue: Normal CDC stream creation (with initial scan) on index tables was triggering version synchronization, syncing impl table to a parent table version that hadn't been propagated to datashard yet, causing: "Schema version mismatch: got# 1, expected# 2" Solution: Enhanced continuous backup detection for impl tables: 1. Check current operation's CDC stream name (existing logic) 2. ALSO check if parent table has ANY CDC streams with "_continuousBackupImpl" suffix (new logic) This ensures: - Continuous backup operations ARE detected (even sub-operations without explicit stream name matching) - Normal CDC operations are NOT detected (no false positives) - Version synchronization only applies to continuous backup Example: - Parent table has "20250101000000Z_continuousBackupImpl" stream - Impl table CDC operation checks parent's children - Finds the continuous backup stream → triggers synchronization - Normal CDC stream "my_cdc_stream" on index table → no sync
1 parent cab7526 commit 7f3bc9a

File tree

1 file changed

+27
-8
lines changed

1 file changed

+27
-8
lines changed

ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -156,15 +156,35 @@ bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOpera
156156
}
157157
}
158158

159-
// For index impl tables: synchronize with parent table version
160-
// This applies to ALL operations on impl tables, not just continuous backup
161-
if (isIndexImplTable && grandParentPathId && context.SS->Tables.contains(grandParentPathId)) {
159+
// Enhanced continuous backup detection for impl tables
160+
// Check if the parent table has any continuous backup CDC streams
161+
bool isPartOfContinuousBackup = isContinuousBackupStream;
162+
if (isIndexImplTable && grandParentPathId && context.SS->PathsById.contains(grandParentPathId)) {
163+
auto grandParentPath = context.SS->PathsById.at(grandParentPathId);
164+
for (const auto& [childName, childPathId] : grandParentPath->GetChildren()) {
165+
auto childPath = context.SS->PathsById.at(childPathId);
166+
// Check if this is a CDC stream with continuous backup suffix
167+
if (childPath->IsCdcStream() && childName.EndsWith("_continuousBackupImpl")) {
168+
isPartOfContinuousBackup = true;
169+
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
170+
DebugHint() << " Detected continuous backup via parent table CDC stream"
171+
<< ", implTablePathId: " << pathId
172+
<< ", parentTablePathId: " << grandParentPathId
173+
<< ", cdcStreamName: " << childName
174+
<< ", at schemeshard: " << context.SS->SelfTabletId());
175+
break;
176+
}
177+
}
178+
}
179+
180+
// For index impl tables in continuous backup: synchronize with parent table version
181+
// For regular CDC operations: increment normally
182+
if (isPartOfContinuousBackup && isIndexImplTable && grandParentPathId && context.SS->Tables.contains(grandParentPathId)) {
162183
auto parentTable = context.SS->Tables.at(grandParentPathId);
163184
ui64 currentImplVersion = table->AlterVersion;
164185
ui64 currentParentVersion = parentTable->AlterVersion;
165186

166-
// Impl table should stay synchronized with parent table
167-
// If behind or equal, sync to parent version
187+
// Impl table should stay synchronized with parent table during continuous backup
168188
if (currentImplVersion <= currentParentVersion) {
169189
table->AlterVersion = currentParentVersion;
170190
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
@@ -174,10 +194,9 @@ bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOpera
174194
<< ", oldImplVersion: " << currentImplVersion
175195
<< ", parentVersion: " << currentParentVersion
176196
<< ", newImplVersion: " << table->AlterVersion
177-
<< ", continuous backup: " << (isContinuousBackupStream ? "yes" : "no")
178197
<< ", at schemeshard: " << context.SS->SelfTabletId());
179198
} else {
180-
// Impl version is ahead (shouldn't happen, but handle gracefully)
199+
// Impl version is ahead
181200
table->AlterVersion += 1;
182201
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
183202
DebugHint() << " WARNING: Impl table version ahead of parent, incrementing"
@@ -189,7 +208,6 @@ bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOpera
189208
}
190209

191210
// ALWAYS sync parent index entity version to match impl table
192-
// This is critical to avoid version mismatches
193211
if (context.SS->Indexes.contains(parentPathId)) {
194212
auto index = context.SS->Indexes.at(parentPathId);
195213
index->AlterVersion = table->AlterVersion;
@@ -216,6 +234,7 @@ bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOpera
216234
<< ", pathId: " << pathId
217235
<< ", newVersion: " << table->AlterVersion
218236
<< ", isIndexImpl: " << (isIndexImplTable ? "yes" : "no")
237+
<< ", isContinuousBackup: " << (isPartOfContinuousBackup ? "yes" : "no")
219238
<< ", at schemeshard: " << context.SS->SelfTabletId());
220239
}
221240

0 commit comments

Comments
 (0)