-
Notifications
You must be signed in to change notification settings - Fork 735
YQ-4848 Rebalance partitions after new nodes connected #28397
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
⚪ |
|
🟢 |
|
⚪ |
|
⚪ ⚪ Ya make output | Test bloat | Test bloat
⚪ Ya make output | Test bloat | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪ ⚪ Ya make output | Test bloat | Test bloat
⚪ Ya make output | Test bloat | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪ ⚪ Ya make output | Test bloat | Test bloat
⚪ Ya make output | Test bloat | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪ ⚪ Ya make output | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements automatic rebalancing of row dispatcher partitions when nodes connect or disconnect from the cluster. The coordinator now tracks node states (Initializing vs Started) and schedules rebalancing operations with a configurable timeout to redistribute partitions after topology changes.
- Added
TEvCoordinatorDistributionResetevent to notify read actors when partition distribution changes - Implemented node state tracking and automatic rebalancing logic in the coordinator
- Added configurable rebalancing timeout setting
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| ydb/tests/tools/fq_runner/kikimr_runner.py | Added rebalancing timeout configuration and improved error messages for checkpoint waiting |
| ydb/tests/fq/yds/test_row_dispatcher.py | Updated test logic for actor counting and added new test for partition redistribution after timeout |
| ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp | Added unit test for rebalancing after distribution reset |
| ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp | Implemented handling of TEvCoordinatorDistributionReset event to reinitialize connections |
| ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp | Added unit tests for rebalancing scenarios when nodes connect/disconnect |
| ydb/core/fq/libs/row_dispatcher/protos/events.proto | Defined TEvCoordinatorDistributionReset protobuf message |
| ydb/core/fq/libs/row_dispatcher/events/data_plane.h | Added event type definition for coordinator distribution reset |
| ydb/core/fq/libs/row_dispatcher/coordinator.cpp | Implemented node state tracking, rebalancing scheduler, and distribution reset logic |
| ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.h | Added RebalancingTimeout setting accessor |
| ydb/core/fq/libs/row_dispatcher/common/row_dispatcher_settings.cpp | Implemented RebalancingTimeout setting initialization from config |
| ydb/core/fq/libs/config/protos/row_dispatcher.proto | Added RebalancingTimeoutSec configuration field |
| ydb/core/fq/libs/actors/nodes_manager.cpp | Changed logging level for node requests from DEBUG to TRACE |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| 'max_session_used_memory': 1000000, | ||
| 'without_consumer': True} | ||
| fq_config['row_dispatcher']['coordinator'] = {'coordination_node_path': "row_dispatcher"} | ||
| fq_config['row_dispatcher']['coordinator'] = {'rebalancing_timeout_sec': "5"} |
Copilot
AI
Nov 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The value "5" is set as a string, but the proto definition expects uint64 (numeric type). This should be an integer:
'rebalancing_timeout_sec': 5instead of:
'rebalancing_timeout_sec': "5"| fq_config['row_dispatcher']['coordinator'] = {'rebalancing_timeout_sec': "5"} | |
| fq_config['row_dispatcher']['coordinator'] = {'rebalancing_timeout_sec': 5} |
| // if (info.State == ENodeState::Connected) { | ||
| // info.State = ENodeState::Disconnected; | ||
| // } |
Copilot
AI
Nov 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This commented-out code should either be removed or uncommented if it's intended to be used. Leaving dead code in the codebase reduces maintainability and creates confusion about the intended behavior.
| // if (info.State == ENodeState::Connected) { | ||
| // info.State = ENodeState::Disconnected; | ||
| // } |
Copilot
AI
Nov 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This commented-out code should either be removed or uncommented if it's intended to be used. Leaving dead code in the codebase reduces maintainability and creates confusion about the intended behavior.
| for (auto id : rowDispatcherIds) { | ||
| Ping(id); | ||
| } | ||
|
|
Copilot
AI
Nov 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trailing whitespace should be removed.
| fq_config['row_dispatcher']['coordinator'] = {'coordination_node_path': "row_dispatcher"} | ||
| fq_config['row_dispatcher']['coordinator'] = {'rebalancing_timeout_sec': "5"} |
Copilot
AI
Nov 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 540 overwrites the dictionary created on line 539. This means the coordination_node_path setting will be lost. Both settings should be combined into a single dictionary assignment, or the second assignment should update the existing dictionary instead of replacing it.
Suggested fix:
fq_config['row_dispatcher']['coordinator'] = {
'coordination_node_path': "row_dispatcher",
'rebalancing_timeout_sec': "5"
}| fq_config['row_dispatcher']['coordinator'] = {'coordination_node_path': "row_dispatcher"} | |
| fq_config['row_dispatcher']['coordinator'] = {'rebalancing_timeout_sec': "5"} | |
| fq_config['row_dispatcher']['coordinator'] = { | |
| 'coordination_node_path': "row_dispatcher", | |
| 'rebalancing_timeout_sec': "5", | |
| } |
| if count == expected_count: | ||
| return node_index # return any node |
Copilot
AI
Nov 10, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logic error: the function now returns early when finding the expected count on any single node, but the count is cumulative across all nodes. The return should happen after the loop completes, not inside it. The condition check should remain outside the inner loop as before.
The original logic was:
for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
count = count + kikimr.compute_plane.get_actor_count(node_index, activity)
if count == expected_count:
return node_index # This returned the last node_index after checking all nodesThe new logic returns immediately when count reaches expected_count during iteration, which may be before all nodes have been counted.
| if count == expected_count: | |
| return node_index # return any node | |
| if count == expected_count: | |
| return node_index # return the last node_index after checking all nodes |
|
⚪ ⚪ Ya make output | Test bloat | Test bloat
⚪ Ya make output | Test bloat | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
| Started | ||
| }; | ||
|
|
||
| struct RowDispatcherInfo { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Предлагаю дописать 'T' в имя структуры, раз меняем координатор)
RowDispatcherInfo -> TRowDispatcherInfo
| Become(&TActorCoordinator::StateFunc); | ||
| Send(LocalRowDispatcherId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe()); | ||
| ScheduleNodeInfoRequest(); | ||
| Schedule(Config.GetRebalancingTimeout(), new TEvPrivate::TEvStartingTimeout()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Тут не нужен какой-то дефолт для v1? Там ведь по умолчанию будет таймаут 0
И наверное надо сделать RebalancingScheduled = true;
| assert time.time() < deadline, f"Waiting actor {activity} count failed, current count {count}" | ||
| time.sleep(1) | ||
| pass | ||
| return None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Кажется return None можно не писать (можно просто без return, всё равно None вернётся)
|
⚪ |
|
⚪ |
|
⚪ ⚪ Ya make output | Test bloat | Test bloat
⚪ Ya make output | Test bloat | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪ ⚪ Ya make output | Test bloat | Test bloat
⚪ Ya make output | Test bloat | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪ ⚪ Ya make output | Test bloat | Test bloat
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
|
⚪
🟢
*please be aware that the difference is based on comparing your commit and the last completed build from the post-commit, check comparation |
…28397) Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…28397) Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…able (#28874) Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…28397) Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…28397) Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Changelog entry
...
Changelog category
Description for reviewers
...