Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ydb/core/fq/libs/actors/nodes_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
}

void Handle(NFq::TEvNodesManager::TEvGetNodesRequest::TPtr& ev) {
LOG_D("TNodesManagerActor::TEvGetNodesRequest");
LOG_T("Received TNodesManagerActor::TEvGetNodesRequest");
auto response = MakeHolder<NFq::TEvNodesManager::TEvGetNodesResponse>();
response->NodeIds.reserve(Peers.size());
for (const auto& info : Peers) {
Expand Down Expand Up @@ -347,6 +347,8 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
}

void HandleResponse(NFq::TEvInternalService::TEvHealthCheckResponse::TPtr& ev) {
LOG_T("Received TEvHealthCheckResponse");

try {
const auto& status = ev->Get()->Status.GetStatus();
if (!ev->Get()->Status.IsSuccess()) {
Expand All @@ -356,6 +358,8 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor

auto nodesInfo = MakeIntrusive<TIntrusiveVector<TEvInterconnect::TNodeInfo>>();
nodesInfo->reserve(res.nodes().size());
LOG_T("Nodes count " << res.nodes().size());


Peers.clear();
std::set<ui32> nodeIds; // may be not unique
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/fq/libs/config/protos/row_dispatcher.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ message TRowDispatcherCoordinatorConfig {
// Topic partitions will be distributed uniformly up to TopicPartitionsLimitPerNode
// if (number nodes) * TopicPartitionsLimitPerNode < (number topic partitions)
// Request will hang up infinitely, disabled by default
uint64 TopicPartitionsLimitPerNode = 4;
uint64 TopicPartitionsLimitPerNode = 4; // deprecated
uint64 RebalancingTimeoutSec = 5; // Automatic rebalancing partition after new nodes connected / nodes disconnected.
}

message TJsonParserConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ TRowDispatcherSettings::TCoordinatorSettings::TCoordinatorSettings(const NConfig
: LocalMode(config.GetLocalMode())
, Database(config.GetDatabase())
, CoordinationNodePath(config.GetCoordinationNodePath())
, RebalancingTimeout(TDuration::Seconds(config.GetRebalancingTimeoutSec()))
{}

TRowDispatcherSettings::TCoordinatorSettings::TCoordinatorSettings(const NKikimrConfig::TStreamingQueriesConfig::TExternalStorageConfig& config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class TRowDispatcherSettings {
YDB_ACCESSOR(bool, LocalMode, false);
YDB_ACCESSOR_MUTABLE(TExternalStorageSettings, Database, {});
YDB_ACCESSOR_DEF(TString, CoordinationNodePath);
YDB_ACCESSOR(TDuration, RebalancingTimeout, TDuration::Seconds(120));
};

enum class EConsumerMode {
Expand Down
Loading
Loading