
Commit 7c310fb

kardymonds and Copilot committed
YQ-4848 Rebalance partitions after new nodes connected (ydb-platform#28397)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent e21ecb0 commit 7c310fb

File tree

10 files changed: +329 -45 lines changed


ydb/core/fq/libs/actors/nodes_manager.cpp

Lines changed: 1 addition & 1 deletion
@@ -297,7 +297,7 @@ class TNodesManagerActor : public NActors::TActorBootstrapped<TNodesManagerActor
     }

     void Handle(NFq::TEvNodesManager::TEvGetNodesRequest::TPtr& ev) {
-        LOG_D("TNodesManagerActor::TEvGetNodesRequest");
+        LOG_T("Received TNodesManagerActor::TEvGetNodesRequest");
         auto response = MakeHolder<NFq::TEvNodesManager::TEvGetNodesResponse>();
         response->NodeIds.reserve(Peers.size());
         for (const auto& info : Peers) {
ydb/core/fq/libs/config/protos/row_dispatcher.proto

Lines changed: 2 additions & 1 deletion
@@ -16,7 +16,8 @@ message TRowDispatcherCoordinatorConfig {
     // Topic partitions will be distributed uniformly up to TopicPartitionsLimitPerNode
     // if (number nodes) * TopicPartitionsLimitPerNode < (number topic partitions)
     // Request will hang up infinitely, disabled by default
-    uint64 TopicPartitionsLimitPerNode = 4;
+    uint64 TopicPartitionsLimitPerNode = 4; // deprecated
+    uint64 RebalancingTimeoutSec = 5; // Automatic partition rebalancing after nodes connect or disconnect.
 }

 message TJsonParserConfig {
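
The new field is a plain count of seconds. A minimal sketch of how a consumer of this config might turn it into an actor timeout, assuming the CamelCase Get/Set accessors that YDB's protobuf plugin generates (the matching SetRebalancingTimeoutSec call appears in the coordinator unit test below) and the NFq::NConfig namespace:

```cpp
#include <util/datetime/base.h>

// Sketch, not the commit's code: convert the proto field to a TDuration once,
// so the coordinator can pass it straight to Schedule().
TDuration GetRebalancingTimeout(const NFq::NConfig::TRowDispatcherCoordinatorConfig& config) {
    return TDuration::Seconds(config.GetRebalancingTimeoutSec());
}
```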

ydb/core/fq/libs/row_dispatcher/coordinator.cpp

Lines changed: 192 additions & 31 deletions
Large diffs are not rendered by default.
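
The coordinator diff carries the core of the change but is not rendered here. The unit tests below pin down its observable behaviour: after the set of connected row dispatcher nodes changes, the coordinator waits RebalancingTimeoutSec and then tells every read actor that previously requested partitions to drop its assignment. A minimal sketch of that flow, reconstructed from the tests rather than the unrendered diff; the member names (PendingRebalancing, PartitionsByNode, ReadActors) are assumptions, not the commit's identifiers:

```cpp
// Sketch only: reconstructed from observable behaviour in the unit tests.
void TActorCoordinator::OnNodesSetChanged() {
    if (!PendingRebalancing) {
        PendingRebalancing = true;
        // Debounce: a burst of connect/disconnect events yields one rebalancing pass.
        Schedule(RebalancingTimeout, new NActors::TEvents::TEvWakeup());
    }
}

void TActorCoordinator::Handle(NActors::TEvents::TEvWakeup::TPtr&) {
    PendingRebalancing = false;
    PartitionsByNode.clear();  // forget the old distribution
    for (const auto& readActor : ReadActors) {
        Send(readActor, new TEvRowDispatcher::TEvCoordinatorDistributionReset());
    }
}
```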

ydb/core/fq/libs/row_dispatcher/events/data_plane.h

Lines changed: 6 additions & 0 deletions
@@ -59,6 +59,7 @@ struct TEvRowDispatcher {
         EvPurecalcCompileRequest,
         EvPurecalcCompileResponse,
         EvPurecalcCompileAbort,
+        EvCoordinatorDistributionReset,
         EvEnd,
     };

@@ -91,6 +92,11 @@ struct TEvRowDispatcher {
         TEvCoordinatorResult() = default;
     };

+    struct TEvCoordinatorDistributionReset : public NActors::TEventPB<TEvCoordinatorDistributionReset,
+        NFq::NRowDispatcherProto::TEvCoordinatorDistributionReset, EEv::EvCoordinatorDistributionReset> {
+        TEvCoordinatorDistributionReset() = default;
+    };
+
     // Session events (with seqNo checks)

     struct TEvStartSession : public NActors::TEventPB<TEvStartSession,
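
The event deliberately carries no payload; receiving it from the active coordinator is the entire signal. Sending it is a one-liner from inside any actor, sketched here with readActorId standing in for a destination the coordinator tracks:

```cpp
// Inside an actor's handler; readActorId is a placeholder, not a name from the commit.
Send(readActorId, new NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset());
```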

ydb/core/fq/libs/row_dispatcher/protos/events.proto

Lines changed: 3 additions & 0 deletions
@@ -26,6 +26,9 @@ message TEvGetAddressResponse {
     repeated Ydb.Issue.IssueMessage Issues = 2;
 }

+message TEvCoordinatorDistributionReset {
+}
+
 message TPartitionOffset {
     uint32 PartitionId = 1;
     uint64 Offset = 2;

ydb/core/fq/libs/row_dispatcher/ut/coordinator_ut.cpp

Lines changed: 53 additions & 1 deletion
@@ -43,6 +43,7 @@ class TFixture : public NUnitTest::TBaseFixture {
         database.SetEndpoint("YDB_ENDPOINT");
         database.SetDatabase("YDB_DATABASE");
         database.SetToken("");
+        config.SetRebalancingTimeoutSec(1);

         Coordinator = Runtime.Register(NewCoordinator(
             LocalRowDispatcherId,
@@ -109,6 +110,11 @@ class TFixture : public NUnitTest::TBaseFixture {
         return result;
     }

+    void ExpectDistributionReset(NActors::TActorId readActorId) {
+        auto eventPtr = Runtime.GrabEdgeEvent<NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset>(readActorId, TDuration::Seconds(5));
+        UNIT_ASSERT(eventPtr.Get() != nullptr);
+    }
+
     void ProcessNodesManagerRequest(ui64 nodesCount) {
         auto eventHolder = Runtime.GrabEdgeEvent<NFq::TEvNodesManager::TEvGetNodesRequest>(NodesManager, TDuration::Seconds(5));
         UNIT_ASSERT(eventHolder.Get() != nullptr);
@@ -200,7 +206,7 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) {

     Y_UNIT_TEST_F(WaitNodesConnected, TFixture) {
         ExpectCoordinatorChangesSubscribe();
-        ProcessNodesManagerRequest(4);
+        ProcessNodesManagerRequest(3);
         Ping(RowDispatcher1Id);

         MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0});
@@ -229,6 +235,52 @@ Y_UNIT_TEST_SUITE(CoordinatorTests) {
         actorId = ActorIdFromProto(result2.GetPartitions(0).GetActorId());
         UNIT_ASSERT_VALUES_EQUAL(actorId.NodeId(), RowDispatcher2Id.NodeId());
     }
+
+    Y_UNIT_TEST_F(RebalanceAfterNewNodeConnected, TFixture) {
+        ExpectCoordinatorChangesSubscribe();
+        ProcessNodesManagerRequest(1);
+        TSet<NActors::TActorId> rowDispatcherIds{LocalRowDispatcherId};
+        for (auto id : rowDispatcherIds) {
+            Ping(id);
+        }
+        MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0});
+        auto rdActor1 = ActorIdFromProto(ExpectResult(ReadActor1).GetPartitions(0).GetActorId());
+        MockRequest(ReadActor2, "endpoint", "read_group", "topic1", {1});
+        auto rdActor2 = ActorIdFromProto(ExpectResult(ReadActor2).GetPartitions(0).GetActorId());
+        UNIT_ASSERT_VALUES_EQUAL(rdActor1, rdActor2);
+
+        Ping(RowDispatcher1Id);
+        ExpectDistributionReset(ReadActor1);
+        ExpectDistributionReset(ReadActor2);
+
+        MockRequest(ReadActor1, "endpoint", "read_group", "topic1", {0});
+        rdActor1 = ActorIdFromProto(ExpectResult(ReadActor1).GetPartitions(0).GetActorId());
+        MockRequest(ReadActor2, "endpoint", "read_group", "topic1", {1});
+        rdActor2 = ActorIdFromProto(ExpectResult(ReadActor2).GetPartitions(0).GetActorId());
+        UNIT_ASSERT(rdActor1 != rdActor2);
+    }
+
+    Y_UNIT_TEST_F(RebalanceAfterNodeDisconnected, TFixture) {
+        ExpectCoordinatorChangesSubscribe();
+        ProcessNodesManagerRequest(3);
+        TSet<NActors::TActorId> rowDispatcherIds{RowDispatcher1Id, RowDispatcher2Id, LocalRowDispatcherId};
+        for (auto id : rowDispatcherIds) {
+            Ping(id);
+        }
+
+        MockRequest(ReadActor1, "endpoint1", "read_group", "topic1", {0, 1, 2});
+        auto result1 = ExpectResult(ReadActor1);
+        UNIT_ASSERT(result1.PartitionsSize() == 3);
+
+        auto event = new NActors::TEvInterconnect::TEvNodeDisconnected(RowDispatcher2Id.NodeId());
+        Runtime.Send(new NActors::IEventHandle(Coordinator, RowDispatcher2Id, event));
+
+        ExpectDistributionReset(ReadActor1);
+
+        MockRequest(ReadActor1, "endpoint1", "read_group", "topic1", {0, 1, 2});
+        result1 = ExpectResult(ReadActor1);
+        UNIT_ASSERT(result1.PartitionsSize() == 2);
+    }
 }

 }

ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp

Lines changed: 13 additions & 0 deletions
@@ -351,6 +351,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
     void Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev);
     void Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev);
     void Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest::TPtr& ev);
+    void Handle(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset::TPtr& ev);

     void HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev);
     void HandleConnected(TEvInterconnect::TEvNodeConnected::TPtr& ev);
@@ -376,6 +377,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
         hFunc(NFq::TEvRowDispatcher::TEvSessionError, Handle);
         hFunc(NFq::TEvRowDispatcher::TEvStatistics, Handle);
         hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, Handle);
+        hFunc(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset, Handle);

         hFunc(NActors::TEvents::TEvPong, Handle);
         hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected);
@@ -403,6 +405,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
         hFunc(NFq::TEvRowDispatcher::TEvSessionError, ReplyNoSession);
         hFunc(NFq::TEvRowDispatcher::TEvStatistics, ReplyNoSession);
         hFunc(NFq::TEvRowDispatcher::TEvGetInternalStateRequest, ReplyNoSession);
+        hFunc(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset, Handle);

         hFunc(NActors::TEvents::TEvPong, Handle);
         hFunc(TEvInterconnect::TEvNodeConnected, HandleConnected);
@@ -909,6 +912,16 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateRequest:
     Send(ev->Sender, response.release(), 0, ev->Cookie);
 }

+void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset::TPtr& ev) {
+    if (CoordinatorActorId != ev->Sender) {
+        SRC_LOG_I("Ignore TEvCoordinatorDistributionReset, sender is not active coordinator (sender " << ev->Sender << ", current coordinator " << CoordinatorActorId << ")");
+        return;
+    }
+    SRC_LOG_I("Received TEvCoordinatorDistributionReset from " << ev->Sender);
+    ReInit("Distribution changed");
+    ScheduleProcessState();
+}
+
 void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev) {
     auto partitionId = ev->Get()->Record.GetPartitionId();
     const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
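
The sender check guards against a stale coordinator: around a leadership change a read actor can still receive events from the previous coordinator, and only the active one may reset the distribution. ReInit drops the current row dispatcher sessions, and ScheduleProcessState re-drives the actor's state machine, which ends in a fresh partition request; that round trip is what the RebalanceAfterDistributionReset test below exercises. A compressed sketch of the assumed follow-up step (ProcessState is a name guessed for illustration, and the request's constructor arguments are omitted):

```cpp
// Sketch, not the commit's code: the scheduled step asks the coordinator
// for a fresh partition distribution.
void TDqPqRdReadActor::ProcessState() {
    // Real code fills in source parameters and the partition ids to read.
    Send(CoordinatorActorId, new NFq::TEvRowDispatcher::TEvCoordinatorRequest(/*...*/));
}
```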

ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp

Lines changed: 17 additions & 0 deletions
@@ -351,6 +351,13 @@ class TFixture : public TPqIoTestFixture {
         AssertDataWithWatermarks(expected, actual);
     }

+    void MockCoordinatorDistributionReset(NActors::TActorId coordinatorId) const {
+        CaSetup->Execute([&](TFakeActor& actor) {
+            auto event = new NFq::TEvRowDispatcher::TEvCoordinatorDistributionReset();
+            CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, coordinatorId, event, 0));
+        });
+    }
+
 public:
     NYql::NPq::NProto::TDqPqTopicSource Settings = BuildPqTopicSourceSettings(
         "topic",
@@ -889,6 +896,16 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
             f.ReadMessages(expected);
         }
     }
+
+    Y_UNIT_TEST_F(RebalanceAfterDistributionReset, TFixture) {
+        StartSession(Settings);
+        MockCoordinatorDistributionReset(CoordinatorId1);
+
+        auto req = ExpectCoordinatorRequest(CoordinatorId1);
+        MockCoordinatorResult(CoordinatorId1, {{RowDispatcherId2, PartitionId1}}, req->Cookie);
+        ExpectStartSession({}, RowDispatcherId2, 2);
+        MockAck(RowDispatcherId2, 2, PartitionId1);
+    }
 }

 } // namespace NYql::NDq

ydb/tests/fq/yds/test_row_dispatcher.py

Lines changed: 38 additions & 8 deletions
@@ -61,11 +61,10 @@ def wait_actor_count(kikimr, activity, expected_count):
         count = 0
         for node_index in kikimr.compute_plane.kikimr_cluster.nodes:
             count = count + kikimr.compute_plane.get_actor_count(node_index, activity)
-        if count == expected_count:
-            break
+        if count == expected_count:
+            return node_index  # return any node
         assert time.time() < deadline, f"Waiting actor {activity} count failed, current count {count}"
         time.sleep(1)
-    pass


@@ -617,20 +616,20 @@ def test_start_new_query(self, kikimr, client):

     @yq_v1
     def test_stop_start(self, kikimr, client):
-        self.init(client, "test_stop_start")
+        self.init(client, "test_stop_start", 10)

         sql1 = Rf'''
            INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
            SELECT Cast(time as String) FROM {YDS_CONNECTION}.`{self.input_topic}`
            WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL));'''

         query_id = start_yds_query(kikimr, client, sql1)
-        wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
+        wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 10)

         data = ['{"time": 101}', '{"time": 102}']
         self.write_stream(data)
         expected = ['101', '102']
-        assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
+        assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected)

         kikimr.compute_plane.wait_completed_checkpoints(
             query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2
@@ -652,7 +651,7 @@ def test_stop_start(self, kikimr, client):

         self.write_stream(data)
         expected = ['103', '104']
-        assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
+        assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected)

         stop_yds_query(client, query_id)
         wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 0)
@@ -667,7 +666,7 @@ def test_stop_start2(self, kikimr, client):
         wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", 1)
         self.write_stream(['{"time": 101}', '{"time": 102}'])
         expected = ['101', '102']
-        assert self.read_stream(len(expected), topic_path=self.output_topic) == expected
+        assert sorted(self.read_stream(len(expected), topic_path=self.output_topic)) == sorted(expected)

         kikimr.compute_plane.wait_completed_checkpoints(query_id1, kikimr.compute_plane.get_completed_checkpoints(query_id1) + 2)
         stop_yds_query(client, query_id1)
@@ -1233,3 +1232,34 @@ def test_json_errors(self, kikimr, client, use_binding):
             assert time.time() < deadline, f"Waiting sensor ParsingErrors value failed, current count {count}"
             time.sleep(1)
         stop_yds_query(client, query_id)
+
+    @yq_v1
+    def test_redistribute_partition_after_timeout(self, kikimr, client):
+        partitions_count = 3
+        self.init(client, "redistribute", partitions=partitions_count)
+        wait_row_dispatcher_sensor_value(kikimr, "KnownRowDispatchers", 2 * COMPUTE_NODE_COUNT - 1)
+
+        sql = Rf'''
+            PRAGMA dq.Scheduler=@@{{"type": "single_node"}}@@;
+            INSERT INTO {YDS_CONNECTION}.`{self.output_topic}`
+            SELECT data FROM {YDS_CONNECTION}.`{self.input_topic}`
+            WITH (format=json_each_row, SCHEMA (time Int32 NOT NULL, data String NOT NULL));'''
+
+        query_id = start_yds_query(kikimr, client, sql)
+        session_node_index = wait_actor_count(kikimr, "FQ_ROW_DISPATCHER_SESSION", partitions_count)
+        kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2)
+
+        message_count = 10
+        expected = "hello"
+        for i in range(message_count):
+            self.write_stream(['{"time": 100, "data": "hello"}'], topic_path=None, partition_key=str(i))
+        assert self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count
+        kikimr.compute_plane.wait_completed_checkpoints(query_id, kikimr.compute_plane.get_completed_checkpoints(query_id) + 2)
+
+        logging.debug(f"Stopping node: {session_node_index}")
+        kikimr.compute_plane.kikimr_cluster.nodes[session_node_index].stop()
+
+        expected = "Relativitätstheorie"
+        for i in range(message_count):
+            self.write_stream(['{"time": 101, "data": "Relativitätstheorie"}'], topic_path=None, partition_key=str(i))
+        assert self.read_stream(message_count, topic_path=self.output_topic) == [expected] * message_count

ydb/tests/tools/fq_runner/kikimr_runner.py

Lines changed: 4 additions & 3 deletions
@@ -363,15 +363,15 @@ def get_completed_checkpoints(self, query_id, expect_counters_exist=False):
         return self.get_checkpoint_coordinator_metric(query_id, "CompletedCheckpoints",
                                                       expect_counters_exist=expect_counters_exist)

-    def wait_completed_checkpoints(self, query_id, checkpoints_count,
+    def wait_completed_checkpoints(self, query_id, expected,
                                    timeout=plain_or_under_sanitizer_wrapper(30, 150),
                                    expect_counters_exist=False):
         deadline = time.time() + timeout
         while True:
             completed = self.get_completed_checkpoints(query_id, expect_counters_exist=expect_counters_exist)
-            if completed >= checkpoints_count:
+            if completed >= expected:
                 break
-            assert time.time() < deadline, "Wait zero checkpoint failed, actual completed: " + str(completed)
+            assert time.time() < deadline, "Wait checkpoint failed, actual current: " + str(completed) + ", expected " + str(expected)
             time.sleep(plain_or_under_sanitizer_wrapper(0.5, 2))

     def wait_zero_checkpoint(self, query_id, timeout=plain_or_under_sanitizer_wrapper(30, 150),
@@ -537,6 +537,7 @@ def fill_config(self, control_plane):
                                               'max_session_used_memory': 1000000,
                                               'without_consumer': True}
         fq_config['row_dispatcher']['coordinator'] = {'coordination_node_path': "row_dispatcher"}
+        fq_config['row_dispatcher']['coordinator'] = {'rebalancing_timeout_sec': "5"}
         fq_config['row_dispatcher']['coordinator']['database'] = {}
         self.fill_storage_config(fq_config['row_dispatcher']['coordinator']['database'],
                                  "RowDispatcher_" + self.uuid)
