Skip to content

Commit cab7526

Browse files
committed
fix: schema version handling
1 parent dff4634 commit cab7526

File tree

2 files changed

+180
-12
lines changed

2 files changed

+180
-12
lines changed

ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3012,6 +3012,89 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
30123012
UNIT_ASSERT_C(!indexBackupExists, "Index backup should NOT exist when OmitIndexes flag is set");
30133013
}
30143014

3015+
Y_UNIT_TEST(Test) {
3016+
TPortManager portManager;
3017+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
3018+
.SetUseRealThreads(false)
3019+
.SetDomainName("Root")
3020+
.SetEnableChangefeedInitialScan(true)
3021+
.SetEnableBackupService(true)
3022+
.SetEnableRealSystemViewPaths(false)
3023+
);
3024+
3025+
auto& runtime = *server->GetRuntime();
3026+
const auto edgeActor = runtime.AllocateEdgeActor();
3027+
3028+
SetupLogging(runtime);
3029+
InitRoot(server, edgeActor);
3030+
3031+
// Create first table with index
3032+
CreateShardedTable(server, edgeActor, "/Root", "Table1",
3033+
TShardedTableOptions()
3034+
.Columns({
3035+
{"key", "Uint32", true, false},
3036+
{"val1", "Uint32", false, false}
3037+
})
3038+
.Indexes({
3039+
{"idx1", {"val1"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}
3040+
}));
3041+
3042+
// Create second table with different index
3043+
CreateShardedTable(server, edgeActor, "/Root", "Table2",
3044+
TShardedTableOptions()
3045+
.Columns({
3046+
{"key", "Uint32", true, false},
3047+
{"val2", "Uint32", false, false}
3048+
})
3049+
.Indexes({
3050+
{"idx2", {"val2"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}
3051+
}));
3052+
3053+
// Insert data into both tables
3054+
ExecSQL(server, edgeActor, R"(
3055+
UPSERT INTO `/Root/Table1` (key, val1) VALUES (1, 100), (2, 200);
3056+
UPSERT INTO `/Root/Table2` (key, val2) VALUES (1, 1000), (2, 2000);
3057+
)");
3058+
3059+
// Create backup collection with both tables
3060+
ExecSQL(server, edgeActor, R"(
3061+
CREATE BACKUP COLLECTION `MultiTableCollection`
3062+
( TABLE `/Root/Table1`
3063+
, TABLE `/Root/Table2`
3064+
)
3065+
WITH
3066+
( STORAGE = 'cluster'
3067+
, INCREMENTAL_BACKUP_ENABLED = 'true'
3068+
);
3069+
)", false);
3070+
3071+
// Full backup
3072+
ExecSQL(server, edgeActor, R"(BACKUP `MultiTableCollection`;)", false);
3073+
SimulateSleep(server, TDuration::Seconds(1));
3074+
3075+
// Modify both tables
3076+
ExecSQL(server, edgeActor, R"(
3077+
UPSERT INTO `/Root/Table1` (key, val1) VALUES (3, 300);
3078+
UPSERT INTO `/Root/Table2` (key, val2) VALUES (3, 3000);
3079+
)");
3080+
3081+
// Incremental backup
3082+
ExecSQL(server, edgeActor, R"(BACKUP `MultiTableCollection` INCREMENTAL;)", false);
3083+
SimulateSleep(server, TDuration::Seconds(5));
3084+
3085+
// Capture expected states
3086+
ExecSQL(server, edgeActor, R"(
3087+
SELECT key, val1 FROM `/Root/Table1` ORDER BY key
3088+
)");
3089+
3090+
ExecSQL(server, edgeActor, R"(
3091+
SELECT key, val2 FROM `/Root/Table2` ORDER BY key
3092+
)");
3093+
3094+
// Drop both tables
3095+
ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/Table1`;)", false);
3096+
}
3097+
30153098
} // Y_UNIT_TEST_SUITE(IncrementalBackup)
30163099

30173100
} // NKikimr

ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp

Lines changed: 97 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,6 @@ bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOpera
124124
Y_ABORT_UNLESS(context.SS->Tables.contains(pathId));
125125
auto table = context.SS->Tables.at(pathId);
126126

127-
table->AlterVersion += 1;
128-
129127
NIceDb::TNiceDb db(context.GetDB());
130128

131129
bool isContinuousBackupStream = false;
@@ -147,32 +145,119 @@ bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOpera
147145
}
148146

149147
// Check if this is an index implementation table
150-
// If so, we need to sync the parent index version to match the impl table version
151-
// Do this ONLY for continuous backup operations
148+
bool isIndexImplTable = false;
152149
TPathId parentPathId = path->ParentPathId;
153-
if (parentPathId && context.SS->PathsById.contains(parentPathId) && isContinuousBackupStream) {
150+
TPathId grandParentPathId;
151+
if (parentPathId && context.SS->PathsById.contains(parentPathId)) {
154152
auto parentPath = context.SS->PathsById.at(parentPathId);
155153
if (parentPath->IsTableIndex()) {
156-
Y_ABORT_UNLESS(context.SS->Indexes.contains(parentPathId));
154+
isIndexImplTable = true;
155+
grandParentPathId = parentPath->ParentPathId;
156+
}
157+
}
158+
159+
// For index impl tables: synchronize with parent table version
160+
// This applies to ALL operations on impl tables, not just continuous backup
161+
if (isIndexImplTable && grandParentPathId && context.SS->Tables.contains(grandParentPathId)) {
162+
auto parentTable = context.SS->Tables.at(grandParentPathId);
163+
ui64 currentImplVersion = table->AlterVersion;
164+
ui64 currentParentVersion = parentTable->AlterVersion;
165+
166+
// Impl table should stay synchronized with parent table
167+
// If behind or equal, sync to parent version
168+
if (currentImplVersion <= currentParentVersion) {
169+
table->AlterVersion = currentParentVersion;
170+
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
171+
DebugHint() << " Synchronized index impl table version to parent table"
172+
<< ", implTablePathId: " << pathId
173+
<< ", parentTablePathId: " << grandParentPathId
174+
<< ", oldImplVersion: " << currentImplVersion
175+
<< ", parentVersion: " << currentParentVersion
176+
<< ", newImplVersion: " << table->AlterVersion
177+
<< ", continuous backup: " << (isContinuousBackupStream ? "yes" : "no")
178+
<< ", at schemeshard: " << context.SS->SelfTabletId());
179+
} else {
180+
// Impl version is ahead (shouldn't happen, but handle gracefully)
181+
table->AlterVersion += 1;
182+
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
183+
DebugHint() << " WARNING: Impl table version ahead of parent, incrementing"
184+
<< ", implTablePathId: " << pathId
185+
<< ", implVersion: " << currentImplVersion
186+
<< ", parentVersion: " << currentParentVersion
187+
<< ", newImplVersion: " << table->AlterVersion
188+
<< ", at schemeshard: " << context.SS->SelfTabletId());
189+
}
190+
191+
// ALWAYS sync parent index entity version to match impl table
192+
// This is critical to avoid version mismatches
193+
if (context.SS->Indexes.contains(parentPathId)) {
157194
auto index = context.SS->Indexes.at(parentPathId);
158-
159195
index->AlterVersion = table->AlterVersion;
160-
161-
// Persist the index version update directly to database
196+
197+
// Persist the index version update
162198
db.Table<Schema::TableIndex>().Key(parentPathId.LocalPathId).Update(
163199
NIceDb::TUpdate<Schema::TableIndex::AlterVersion>(index->AlterVersion)
164200
);
165-
201+
202+
auto parentPath = context.SS->PathsById.at(parentPathId);
166203
context.SS->ClearDescribePathCaches(parentPath);
167204
context.OnComplete.PublishToSchemeBoard(OperationId, parentPathId);
168205

169206
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
170-
DebugHint() << " Synced parent index version with impl table"
207+
DebugHint() << " Synced parent index entity version to match impl table"
171208
<< ", indexPathId: " << parentPathId
172-
<< ", indexName: " << parentPath->Name
173209
<< ", newVersion: " << index->AlterVersion
174210
<< ", at schemeshard: " << context.SS->SelfTabletId());
175211
}
212+
} else {
213+
table->AlterVersion += 1;
214+
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
215+
DebugHint() << " Incremented table version"
216+
<< ", pathId: " << pathId
217+
<< ", newVersion: " << table->AlterVersion
218+
<< ", isIndexImpl: " << (isIndexImplTable ? "yes" : "no")
219+
<< ", at schemeshard: " << context.SS->SelfTabletId());
220+
}
221+
222+
// For parent tables with indexes in continuous backup:
223+
// Sync the index entity version to match the parent table
224+
// Note: impl table version is synced during impl table's own CDC operation
225+
if (isContinuousBackupStream && !isIndexImplTable) {
226+
for (const auto& [childName, childPathId] : path->GetChildren()) {
227+
auto childPath = context.SS->PathsById.at(childPathId);
228+
229+
// Skip non-index children (CDC streams, etc.)
230+
if (!childPath->IsTableIndex()) {
231+
continue;
232+
}
233+
234+
// Skip deleted indexes
235+
if (childPath->Dropped()) {
236+
continue;
237+
}
238+
239+
// Sync parent index version with parent table version
240+
if (context.SS->Indexes.contains(childPathId)) {
241+
auto index = context.SS->Indexes.at(childPathId);
242+
index->AlterVersion = table->AlterVersion;
243+
244+
// Persist the index version update
245+
db.Table<Schema::TableIndex>().Key(childPathId.LocalPathId).Update(
246+
NIceDb::TUpdate<Schema::TableIndex::AlterVersion>(index->AlterVersion)
247+
);
248+
249+
context.SS->ClearDescribePathCaches(childPath);
250+
context.OnComplete.PublishToSchemeBoard(OperationId, childPathId);
251+
252+
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
253+
DebugHint() << " Synced parent index version with parent table"
254+
<< ", parentTable: " << path->Name
255+
<< ", indexName: " << childName
256+
<< ", indexPathId: " << childPathId
257+
<< ", newVersion: " << index->AlterVersion
258+
<< ", at schemeshard: " << context.SS->SelfTabletId());
259+
}
260+
}
176261
}
177262

178263
context.SS->PersistTableAlterVersion(db, pathId, table);

0 commit comments

Comments
 (0)