Skip to content

Commit 5d6c3f7

Browse files
authored
stable-25-3: Disable topic autopartitioning in global consistency mode (#29520)
2 parents 41d54ba + c8fdeba commit 5d6c3f7

File tree

3 files changed

+53
-6
lines changed

3 files changed

+53
-6
lines changed

ydb/core/testlib/basics/feature_flags.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ class TTestFeatureFlagsHolder {
8484
FEATURE_FLAG_SETTER(EnableStreamingQueries)
8585
FEATURE_FLAG_SETTER(EnableSecureScriptExecutions)
8686
FEATURE_FLAG_SETTER(DisableMissingDefaultColumnsInBulkUpsert)
87+
FEATURE_FLAG_SETTER(EnableTopicAutopartitioningForReplication)
8788

8889
#undef FEATURE_FLAG_SETTER
8990
};

ydb/core/tx/replication/controller/stream_creator.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -274,18 +274,19 @@ IActor* CreateStreamCreator(TReplication* replication, ui64 targetId, const TAct
274274
const auto* target = replication->FindTarget(targetId);
275275
Y_ABORT_UNLESS(target);
276276

277-
const auto& config = replication->GetConfig().GetConsistencySettings();
278-
const auto resolvedTimestamps = config.HasGlobal()
279-
? std::make_optional(TDuration::MilliSeconds(config.GetGlobal().GetCommitIntervalMilliSeconds()))
277+
const auto& config = replication->GetConfig();
278+
const auto& consistency = config.GetConsistencySettings();
279+
const auto resolvedTimestamps = consistency.HasGlobal()
280+
? std::make_optional(TDuration::MilliSeconds(consistency.GetGlobal().GetCommitIntervalMilliSeconds()))
280281
: std::nullopt;
281-
const bool needCreate = !replication->GetConfig().HasTransferSpecific() ||
282-
!replication->GetConfig().GetTransferSpecific().GetTarget().HasConsumerName();
282+
const bool needCreate = !config.HasTransferSpecific() || !config.GetTransferSpecific().GetTarget().HasConsumerName();
283+
const bool supportsTopicAutopartitioning = !consistency.HasGlobal() && AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication();
283284

284285
return CreateStreamCreator(ctx.SelfID, replication->GetYdbProxy(),
285286
replication->GetId(), target->GetId(),
286287
target->GetConfig(), target->GetStreamName(), target->GetStreamConsumerName(),
287288
TDuration::Seconds(AppData()->ReplicationConfig.GetRetentionPeriodSeconds()), resolvedTimestamps,
288-
AppData()->FeatureFlags.GetEnableTopicAutopartitioningForReplication(), needCreate);
289+
supportsTopicAutopartitioning, needCreate);
289290
}
290291

291292
IActor* CreateStreamCreator(const TActorId& parent, const TActorId& proxy, ui64 rid, ui64 tid,

ydb/core/tx/replication/controller/stream_creator_ut.cpp

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,51 @@ Y_UNIT_TEST_SUITE(StreamCreator) {
7171
Y_UNIT_TEST(WithResolvedTimestamps) {
7272
Basic(TDuration::Seconds(10));
7373
}
74+
75+
void TopicAutoPartitioning(bool enabled) {
76+
TEnv env(TFeatureFlags().SetEnableTopicAutopartitioningForCDC(true));
77+
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
78+
79+
env.CreateTable("/Root", *MakeTableDescription(TTestTableDescription{
80+
.Name = "Table",
81+
.KeyColumns = {"key"},
82+
.Columns = {
83+
{.Name = "key", .Type = "Uint32"},
84+
{.Name = "value", .Type = "Utf8"},
85+
},
86+
.ReplicationConfig = Nothing(),
87+
}));
88+
89+
env.GetRuntime().Register(CreateStreamCreator(
90+
env.GetSender(), env.GetYdbProxy(), 1 /* rid */, 1 /* tid */,
91+
std::make_shared<TTargetTable::TTableConfig>("/Root/Table", "/Root/Replica"),
92+
"Stream", "replicationConsumer", TDuration::Hours(1), std::nullopt, enabled
93+
));
94+
{
95+
auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvRequestCreateStream>(env.GetSender());
96+
env.GetRuntime().Send(ev->Sender, env.GetSender(), new TEvPrivate::TEvAllowCreateStream());
97+
}
98+
{
99+
auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateStreamResult>(env.GetSender());
100+
UNIT_ASSERT(ev->Get()->IsSuccess());
101+
}
102+
103+
auto desc = env.GetDescription("/Root/Table/Stream/streamImpl");
104+
105+
const auto& pqconfig = desc.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig();
106+
const auto& strategy = pqconfig.GetPartitionStrategy();
107+
108+
if (enabled) {
109+
UNIT_ASSERT_EQUAL(strategy.GetPartitionStrategyType(), NKikimrPQ::TPQTabletConfig::CAN_SPLIT);
110+
} else {
111+
UNIT_ASSERT_EQUAL(strategy.GetPartitionStrategyType(), NKikimrPQ::TPQTabletConfig::DISABLED);
112+
}
113+
}
114+
115+
Y_UNIT_TEST(TopicAutoPartitioning) {
116+
TopicAutoPartitioning(true);
117+
TopicAutoPartitioning(false);
118+
}
74119
}
75120

76121
}

0 commit comments

Comments
 (0)