Skip to content

Commit 68b3dcd

Browse files
kardymondsCopilot
authored andcommitted
YQ-4848 Rebalance partitions after new nodes connected (ydb-platform#28397)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent a48c1dd commit 68b3dcd

File tree

12 files changed

+285
-36
lines changed

12 files changed

+285
-36
lines changed

ydb/core/fq/libs/actors/nodes_manager.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
297297
}
298298

299299
void Handle(NFq::TEvNodesManager::TEvGetNodesRequest::TPtr& ev) {
300-
LOG_D("TNodesManagerActor::TEvGetNodesRequest");
300+
LOG_T("Received TNodesManagerActor::TEvGetNodesRequest");
301301
auto response = MakeHolder<NFq::TEvNodesManager::TEvGetNodesResponse>();
302302
response->NodeIds.reserve(Peers.size());
303303
for (const auto& info : Peers) {

ydb/core/fq/libs/config/protos/row_dispatcher.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ message TRowDispatcherCoordinatorConfig {
1616
// Topic partitions will be distributed uniformly up to TopicPartitionsLimitPerNode
1717
// if (number nodes) * TopicPartitionsLimitPerNode < (number topic partitions)
1818
// Request will hang up infinitely, disabled by default
19-
uint64 TopicPartitionsLimitPerNode = 4;
19+
uint64 TopicPartitionsLimitPerNode = 4; // deprecated
20+
uint64 RebalancingTimeoutSec = 5; // Automatic rebalancing partition after new nodes connected / nodes disconnected.
2021
}
2122

2223
message TJsonParserConfig {

ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ TRowDispatcherSettings::TCoordinatorSettings::TCoordinatorSettings(const NConfig
2828
: LocalMode(config.GetLocalMode())
2929
, Database(config.GetDatabase())
3030
, CoordinationNodePath(config.GetCoordinationNodePath())
31+
, RebalancingTimeout(TDuration::Seconds(config.GetRebalancingTimeoutSec()))
3132
{}
3233

3334
TRowDispatcherSettings::TCoordinatorSettings::TCoordinatorSettings(const NKikimrConfig::TStreamingQueriesConfig::TExternalStorageConfig& config)

ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class TRowDispatcherSettings {
5454
YDB_ACCESSOR(bool, LocalMode, false);
5555
YDB_ACCESSOR_MUTABLE(TExternalStorageSettings, Database, {});
5656
YDB_ACCESSOR_DEF(TString, CoordinationNodePath);
57+
YDB_ACCESSOR(TDuration, RebalancingTimeout, TDuration::Seconds(120));
5758
};
5859

5960
enum class EConsumerMode {

0 commit comments

Comments
 (0)