Skip to content

Commit f0a04ef

Browse files
committed
fix: schema version handling
1 parent dff4634 commit f0a04ef

File tree

4 files changed

+293
-53
lines changed

4 files changed

+293
-53
lines changed

ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3012,6 +3012,89 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
30123012
UNIT_ASSERT_C(!indexBackupExists, "Index backup should NOT exist when OmitIndexes flag is set");
30133013
}
30143014

3015+
Y_UNIT_TEST(CdcVersionSync) {
3016+
TPortManager portManager;
3017+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
3018+
.SetUseRealThreads(false)
3019+
.SetDomainName("Root")
3020+
.SetEnableChangefeedInitialScan(true)
3021+
.SetEnableBackupService(true)
3022+
.SetEnableRealSystemViewPaths(false)
3023+
);
3024+
3025+
auto& runtime = *server->GetRuntime();
3026+
const auto edgeActor = runtime.AllocateEdgeActor();
3027+
3028+
SetupLogging(runtime);
3029+
InitRoot(server, edgeActor);
3030+
3031+
// Create first table with index
3032+
CreateShardedTable(server, edgeActor, "/Root", "Table1",
3033+
TShardedTableOptions()
3034+
.Columns({
3035+
{"key", "Uint32", true, false},
3036+
{"val1", "Uint32", false, false}
3037+
})
3038+
.Indexes({
3039+
{"idx1", {"val1"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}
3040+
}));
3041+
3042+
// Create second table with different index
3043+
CreateShardedTable(server, edgeActor, "/Root", "Table2",
3044+
TShardedTableOptions()
3045+
.Columns({
3046+
{"key", "Uint32", true, false},
3047+
{"val2", "Uint32", false, false}
3048+
})
3049+
.Indexes({
3050+
{"idx2", {"val2"}, {}, NKikimrSchemeOp::EIndexTypeGlobal}
3051+
}));
3052+
3053+
// Insert data into both tables
3054+
ExecSQL(server, edgeActor, R"(
3055+
UPSERT INTO `/Root/Table1` (key, val1) VALUES (1, 100), (2, 200);
3056+
UPSERT INTO `/Root/Table2` (key, val2) VALUES (1, 1000), (2, 2000);
3057+
)");
3058+
3059+
// Create backup collection with both tables
3060+
ExecSQL(server, edgeActor, R"(
3061+
CREATE BACKUP COLLECTION `MultiTableCollection`
3062+
( TABLE `/Root/Table1`
3063+
, TABLE `/Root/Table2`
3064+
)
3065+
WITH
3066+
( STORAGE = 'cluster'
3067+
, INCREMENTAL_BACKUP_ENABLED = 'true'
3068+
);
3069+
)", false);
3070+
3071+
// Full backup
3072+
ExecSQL(server, edgeActor, R"(BACKUP `MultiTableCollection`;)", false);
3073+
SimulateSleep(server, TDuration::Seconds(1));
3074+
3075+
// Modify both tables
3076+
ExecSQL(server, edgeActor, R"(
3077+
UPSERT INTO `/Root/Table1` (key, val1) VALUES (3, 300);
3078+
UPSERT INTO `/Root/Table2` (key, val2) VALUES (3, 3000);
3079+
)");
3080+
3081+
// Incremental backup
3082+
ExecSQL(server, edgeActor, R"(BACKUP `MultiTableCollection` INCREMENTAL;)", false);
3083+
SimulateSleep(server, TDuration::Seconds(5));
3084+
3085+
// Capture expected states
3086+
ExecSQL(server, edgeActor, R"(
3087+
SELECT key, val1 FROM `/Root/Table1` ORDER BY key
3088+
)");
3089+
3090+
ExecSQL(server, edgeActor, R"(
3091+
SELECT key, val2 FROM `/Root/Table2` ORDER BY key
3092+
)");
3093+
3094+
// Drop both tables
3095+
ExecSQL(server, edgeActor, R"(DROP TABLE `/Root/Table1`;)", false);
3096+
}
3097+
30153098
} // Y_UNIT_TEST_SUITE(IncrementalBackup)
30163099

30173100
} // NKikimr

ydb/core/tx/schemeshard/schemeshard__operation_common_cdc_stream.cpp

Lines changed: 203 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ namespace NKikimr::NSchemeShard::NCdcStreamState {
99

1010
namespace {
1111

12+
constexpr const char* CONTINUOUS_BACKUP_SUFFIX = "_continuousBackupImpl";
13+
1214
bool IsExpectedTxType(TTxState::ETxType txType) {
1315
switch (txType) {
1416
case TTxState::TxCreateCdcStreamAtTable:
@@ -24,11 +26,206 @@ bool IsExpectedTxType(TTxState::ETxType txType) {
2426
}
2527
}
2628

29+
bool IsContinuousBackupStream(const TString& streamName) {
30+
return streamName.EndsWith(CONTINUOUS_BACKUP_SUFFIX);
31+
}
32+
33+
struct TTableVersionContext {
34+
TPathId PathId;
35+
TPathId ParentPathId;
36+
TPathId GrandParentPathId;
37+
bool IsIndexImplTable = false;
38+
bool IsContinuousBackupStream = false;
39+
bool IsPartOfContinuousBackup = false;
40+
};
41+
42+
bool DetectContinuousBackupStream(const TTxState& txState, TOperationContext& context) {
43+
if (!txState.CdcPathId || !context.SS->PathsById.contains(txState.CdcPathId)) {
44+
return false;
45+
}
46+
47+
auto cdcPath = context.SS->PathsById.at(txState.CdcPathId);
48+
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
49+
"Checking CDC stream name"
50+
<< ", cdcPathId: " << txState.CdcPathId
51+
<< ", streamName: " << cdcPath->Name
52+
<< ", at schemeshard: " << context.SS->SelfTabletId());
53+
54+
return IsContinuousBackupStream(cdcPath->Name);
55+
}
56+
57+
bool DetectIndexImplTable(TPathElement::TPtr path, TOperationContext& context, TPathId& outGrandParentPathId) {
58+
const TPathId& parentPathId = path->ParentPathId;
59+
if (!parentPathId || !context.SS->PathsById.contains(parentPathId)) {
60+
return false;
61+
}
62+
63+
auto parentPath = context.SS->PathsById.at(parentPathId);
64+
if (parentPath->IsTableIndex()) {
65+
outGrandParentPathId = parentPath->ParentPathId;
66+
return true;
67+
}
68+
69+
return false;
70+
}
71+
72+
bool HasParentContinuousBackup(const TPathId& grandParentPathId, TOperationContext& context) {
73+
if (!grandParentPathId || !context.SS->PathsById.contains(grandParentPathId)) {
74+
return false;
75+
}
76+
77+
auto grandParentPath = context.SS->PathsById.at(grandParentPathId);
78+
for (const auto& [childName, childPathId] : grandParentPath->GetChildren()) {
79+
auto childPath = context.SS->PathsById.at(childPathId);
80+
if (childPath->IsCdcStream() && IsContinuousBackupStream(childName)) {
81+
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
82+
"Detected continuous backup via parent table CDC stream"
83+
<< ", parentTablePathId: " << grandParentPathId
84+
<< ", cdcStreamName: " << childName
85+
<< ", at schemeshard: " << context.SS->SelfTabletId());
86+
return true;
87+
}
88+
}
89+
90+
return false;
91+
}
92+
93+
TTableVersionContext BuildTableVersionContext(
94+
const TTxState& txState,
95+
TPathElement::TPtr path,
96+
TOperationContext& context)
97+
{
98+
TTableVersionContext ctx;
99+
ctx.PathId = txState.TargetPathId;
100+
ctx.ParentPathId = path->ParentPathId;
101+
ctx.IsContinuousBackupStream = DetectContinuousBackupStream(txState, context);
102+
ctx.IsIndexImplTable = DetectIndexImplTable(path, context, ctx.GrandParentPathId);
103+
104+
// Check if impl table is part of continuous backup
105+
if (ctx.IsIndexImplTable) {
106+
ctx.IsPartOfContinuousBackup = HasParentContinuousBackup(ctx.GrandParentPathId, context);
107+
} else {
108+
ctx.IsPartOfContinuousBackup = ctx.IsContinuousBackupStream;
109+
}
110+
111+
return ctx;
112+
}
113+
114+
void SyncImplTableVersion(
115+
const TTableVersionContext& versionCtx,
116+
TTableInfo::TPtr& table,
117+
TOperationContext& context)
118+
{
119+
Y_ABORT_UNLESS(context.SS->Tables.contains(versionCtx.GrandParentPathId));
120+
auto parentTable = context.SS->Tables.at(versionCtx.GrandParentPathId);
121+
122+
ui64 currentImplVersion = table->AlterVersion;
123+
ui64 currentParentVersion = parentTable->AlterVersion;
124+
125+
if (currentImplVersion <= currentParentVersion) {
126+
table->AlterVersion = currentParentVersion;
127+
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
128+
"Synchronized index impl table version to parent table"
129+
<< ", implTablePathId: " << versionCtx.PathId
130+
<< ", parentTablePathId: " << versionCtx.GrandParentPathId
131+
<< ", oldImplVersion: " << currentImplVersion
132+
<< ", parentVersion: " << currentParentVersion
133+
<< ", newImplVersion: " << table->AlterVersion
134+
<< ", at schemeshard: " << context.SS->SelfTabletId());
135+
} else {
136+
table->AlterVersion += 1;
137+
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
138+
"WARNING: Impl table version ahead of parent, incrementing"
139+
<< ", implTablePathId: " << versionCtx.PathId
140+
<< ", implVersion: " << currentImplVersion
141+
<< ", parentVersion: " << currentParentVersion
142+
<< ", newImplVersion: " << table->AlterVersion
143+
<< ", at schemeshard: " << context.SS->SelfTabletId());
144+
}
145+
}
146+
147+
void SyncIndexEntityVersion(
148+
const TPathId& indexPathId,
149+
ui64 targetVersion,
150+
TOperationId operationId,
151+
TOperationContext& context,
152+
NIceDb::TNiceDb& db)
153+
{
154+
if (!context.SS->Indexes.contains(indexPathId)) {
155+
return;
156+
}
157+
158+
auto index = context.SS->Indexes.at(indexPathId);
159+
index->AlterVersion = targetVersion;
160+
161+
context.SS->PersistTableIndexAlterVersion(db, indexPathId, index);
162+
163+
auto indexPath = context.SS->PathsById.at(indexPathId);
164+
context.SS->ClearDescribePathCaches(indexPath);
165+
context.OnComplete.PublishToSchemeBoard(operationId, indexPathId);
166+
167+
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
168+
"Synced index entity version"
169+
<< ", indexPathId: " << indexPathId
170+
<< ", newVersion: " << index->AlterVersion
171+
<< ", at schemeshard: " << context.SS->SelfTabletId());
172+
}
173+
174+
void SyncChildIndexes(
175+
TPathElement::TPtr parentPath,
176+
ui64 targetVersion,
177+
TOperationId operationId,
178+
TOperationContext& context,
179+
NIceDb::TNiceDb& db)
180+
{
181+
for (const auto& [childName, childPathId] : parentPath->GetChildren()) {
182+
auto childPath = context.SS->PathsById.at(childPathId);
183+
184+
// Skip non-index children and deleted indexes
185+
if (!childPath->IsTableIndex() || childPath->Dropped()) {
186+
continue;
187+
}
188+
189+
SyncIndexEntityVersion(childPathId, targetVersion, operationId, context, db);
190+
191+
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
192+
"Synced parent index version with parent table"
193+
<< ", parentTable: " << parentPath->Name
194+
<< ", indexName: " << childName
195+
<< ", indexPathId: " << childPathId
196+
<< ", newVersion: " << targetVersion
197+
<< ", at schemeshard: " << context.SS->SelfTabletId());
198+
}
199+
}
200+
201+
void UpdateTableVersion(
202+
const TTableVersionContext& versionCtx,
203+
TTableInfo::TPtr& table,
204+
TOperationId operationId,
205+
TOperationContext& context,
206+
NIceDb::TNiceDb& db)
207+
{
208+
if (versionCtx.IsPartOfContinuousBackup && versionCtx.IsIndexImplTable &&
209+
versionCtx.GrandParentPathId && context.SS->Tables.contains(versionCtx.GrandParentPathId)) {
210+
211+
SyncImplTableVersion(versionCtx, table, context);
212+
213+
SyncIndexEntityVersion(versionCtx.ParentPathId, table->AlterVersion, operationId, context, db);
214+
} else {
215+
table->AlterVersion += 1;
216+
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
217+
"Incremented table version"
218+
<< ", pathId: " << versionCtx.PathId
219+
<< ", newVersion: " << table->AlterVersion
220+
<< ", isIndexImpl: " << (versionCtx.IsIndexImplTable ? "yes" : "no")
221+
<< ", isContinuousBackup: " << (versionCtx.IsPartOfContinuousBackup ? "yes" : "no")
222+
<< ", at schemeshard: " << context.SS->SelfTabletId());
223+
}
224+
}
225+
27226
} // namespace anonymous
28227

29228

30-
// NCdcStreamState::TConfigurePartsAtTable
31-
//
32229
TConfigurePartsAtTable::TConfigurePartsAtTable(TOperationId id)
33230
: OperationId(id)
34231
{
@@ -80,8 +277,6 @@ bool TConfigurePartsAtTable::HandleReply(TEvDataShard::TEvProposeTransactionResu
80277
}
81278

82279

83-
// NCdcStreamState::TProposeAtTable
84-
//
85280
TProposeAtTable::TProposeAtTable(TOperationId id)
86281
: OperationId(id)
87282
{
@@ -124,59 +319,16 @@ bool TProposeAtTable::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOpera
124319
Y_ABORT_UNLESS(context.SS->Tables.contains(pathId));
125320
auto table = context.SS->Tables.at(pathId);
126321

127-
table->AlterVersion += 1;
128-
129322
NIceDb::TNiceDb db(context.GetDB());
130323

131-
bool isContinuousBackupStream = false;
132-
if (txState->CdcPathId && context.SS->PathsById.contains(txState->CdcPathId)) {
133-
auto cdcPath = context.SS->PathsById.at(txState->CdcPathId);
134-
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
135-
DebugHint() << " Checking CDC stream name"
136-
<< ", cdcPathId: " << txState->CdcPathId
137-
<< ", streamName: " << cdcPath->Name
138-
<< ", at schemeshard: " << context.SS->SelfTabletId());
139-
if (cdcPath->Name.EndsWith("_continuousBackupImpl")) {
140-
isContinuousBackupStream = true;
141-
}
142-
} else {
143-
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
144-
DebugHint() << " CdcPathId not found"
145-
<< ", cdcPathId: " << txState->CdcPathId
146-
<< ", at schemeshard: " << context.SS->SelfTabletId());
147-
}
148-
149-
// Check if this is an index implementation table
150-
// If so, we need to sync the parent index version to match the impl table version
151-
// Do this ONLY for continuous backup operations
152-
TPathId parentPathId = path->ParentPathId;
153-
if (parentPathId && context.SS->PathsById.contains(parentPathId) && isContinuousBackupStream) {
154-
auto parentPath = context.SS->PathsById.at(parentPathId);
155-
if (parentPath->IsTableIndex()) {
156-
Y_ABORT_UNLESS(context.SS->Indexes.contains(parentPathId));
157-
auto index = context.SS->Indexes.at(parentPathId);
158-
159-
index->AlterVersion = table->AlterVersion;
160-
161-
// Persist the index version update directly to database
162-
db.Table<Schema::TableIndex>().Key(parentPathId.LocalPathId).Update(
163-
NIceDb::TUpdate<Schema::TableIndex::AlterVersion>(index->AlterVersion)
164-
);
165-
166-
context.SS->ClearDescribePathCaches(parentPath);
167-
context.OnComplete.PublishToSchemeBoard(OperationId, parentPathId);
324+
auto versionCtx = BuildTableVersionContext(*txState, path, context);
325+
UpdateTableVersion(versionCtx, table, OperationId, context, db);
168326

169-
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
170-
DebugHint() << " Synced parent index version with impl table"
171-
<< ", indexPathId: " << parentPathId
172-
<< ", indexName: " << parentPath->Name
173-
<< ", newVersion: " << index->AlterVersion
174-
<< ", at schemeshard: " << context.SS->SelfTabletId());
175-
}
327+
if (versionCtx.IsContinuousBackupStream && !versionCtx.IsIndexImplTable) {
328+
SyncChildIndexes(path, table->AlterVersion, OperationId, context, db);
176329
}
177330

178331
context.SS->PersistTableAlterVersion(db, pathId, table);
179-
180332
context.SS->ClearDescribePathCaches(path);
181333
context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
182334

@@ -195,8 +347,6 @@ bool TProposeAtTable::HandleReply(TEvDataShard::TEvSchemaChanged::TPtr& ev, TOpe
195347
}
196348

197349

198-
// NCdcStreamState::TProposeAtTableDropSnapshot
199-
//
200350
bool TProposeAtTableDropSnapshot::HandleReply(TEvPrivate::TEvOperationPlan::TPtr& ev, TOperationContext& context) {
201351
TProposeAtTable::HandleReply(ev, context);
202352

0 commit comments

Comments
 (0)