Skip to content

Commit c17f66a

Browse files
committed
Revert "CQ: No longer limit reads to a single segment"
This reverts commit b586ea8.
1 parent a3b103c commit c17f66a

File tree

3 files changed

+69
-49
lines changed

3 files changed

+69
-49
lines changed

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
-module(rabbit_classic_queue_index_v2).
99

1010
-export([erase/1, init/1, reset_state/1, recover/4,
11-
bounds/2, info/1,
11+
bounds/2, next_segment_boundary/1, info/1,
1212
terminate/3, delete_and_terminate/1,
1313
publish/7, ack/2, read/3,
1414
sync/1, needs_sync/1]).
@@ -757,6 +757,11 @@ ack_delete_fold_fun(SeqId, Write, {Buffer, Updates, Deletes, SegmentEntryCount})
757757
Deletes, SegmentEntryCount}
758758
end.
759759

760+
%% A better interface for read/3 would be to request a maximum
761+
%% of N messages, rather than first call next_segment_boundary/3
762+
%% and then read from S1 to S2. This function could then return
763+
%% either N messages or less depending on the current state.
764+
760765
-spec read(rabbit_variable_queue:seq_id(),
761766
rabbit_variable_queue:seq_id(),
762767
State) ->
@@ -1059,6 +1064,16 @@ bounds(State = #qi{ segments = Segments }, NextSeqIdHint) ->
10591064
State}
10601065
end.
10611066

1067+
%% The next_segment_boundary/1 function is used internally when
1068+
%% reading. It should not be called from rabbit_variable_queue.
1069+
1070+
-spec next_segment_boundary(SeqId) -> SeqId when SeqId::rabbit_variable_queue:seq_id().
1071+
1072+
next_segment_boundary(SeqId) ->
1073+
?DEBUG("~0p", [SeqId]),
1074+
SegmentEntryCount = segment_entry_count(),
1075+
(1 + (SeqId div SegmentEntryCount)) * SegmentEntryCount.
1076+
10621077
%% ----
10631078
%%
10641079
%% Internal.
@@ -1069,10 +1084,6 @@ segment_entry_count() ->
10691084
%% producer produces.
10701085
persistent_term:get(classic_queue_index_v2_segment_entry_count, 4096).
10711086

1072-
next_segment_boundary(SeqId) ->
1073-
SegmentEntryCount = segment_entry_count(),
1074-
(1 + (SeqId div SegmentEntryCount)) * SegmentEntryCount.
1075-
10761087
%% Note that store files will also be removed if there are any in this directory.
10771088
%% Currently the v2 per-queue store expects this function to remove its own files.
10781089

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1864,8 +1864,11 @@ read_from_q_tail(DelsAndAcksFun,
18641864
%% For v2 we want to limit the number of messages read at once to lower
18651865
%% the memory footprint. We use the consume rate to determine how many
18661866
%% messages we read.
1867+
%% @todo Simply ask for N messages instead of low/high bounds.
18671868
QTailSeqLimit = QTailSeqId + MemoryLimit,
1868-
QTailSeqId1 = min(QTailSeqLimit, QTailSeqIdEnd),
1869+
QTailSeqId1 =
1870+
lists:min([rabbit_classic_queue_index_v2:next_segment_boundary(QTailSeqId),
1871+
QTailSeqLimit, QTailSeqIdEnd]),
18691872
{List0, IndexState1} = rabbit_classic_queue_index_v2:read(QTailSeqId, QTailSeqId1, IndexState),
18701873
{List, StoreState3, MCStateP3, MCStateT3} = case WhatToRead of
18711874
messages ->

deps/rabbit/test/backing_queue_SUITE.erl

Lines changed: 49 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -796,12 +796,9 @@ bq_queue_index(Config) ->
796796
index_mod() ->
797797
rabbit_classic_queue_index_v2.
798798

799-
segment_size() ->
800-
persistent_term:get(classic_queue_index_v2_segment_entry_count, 4096).
801-
802799
bq_queue_index1(_Config) ->
803800
IndexMod = index_mod(),
804-
SegmentSize = segment_size(),
801+
SegmentSize = IndexMod:next_segment_boundary(0),
805802
TwoSegs = SegmentSize + SegmentSize,
806803
MostOfASegment = trunc(SegmentSize*0.75),
807804
SeqIdsA = lists:seq(0, MostOfASegment-1),
@@ -995,8 +992,8 @@ v2_delete_segment_file_completely_acked(Config) ->
995992
?MODULE, v2_delete_segment_file_completely_acked1, [Config]).
996993

997994
v2_delete_segment_file_completely_acked1(_Config) ->
998-
IndexMod = index_mod(),
999-
SegmentSize = segment_size(),
995+
IndexMod = rabbit_classic_queue_index_v2,
996+
SegmentSize = IndexMod:next_segment_boundary(0),
1000997
SeqIds = lists:seq(0, SegmentSize - 1),
1001998

1002999
with_empty_test_queue(
@@ -1022,8 +1019,8 @@ v2_delete_segment_file_partially_acked(Config) ->
10221019
?MODULE, v2_delete_segment_file_partially_acked1, [Config]).
10231020

10241021
v2_delete_segment_file_partially_acked1(_Config) ->
1025-
IndexMod = index_mod(),
1026-
SegmentSize = segment_size(),
1022+
IndexMod = rabbit_classic_queue_index_v2,
1023+
SegmentSize = IndexMod:next_segment_boundary(0),
10271024
SeqIds = lists:seq(0, SegmentSize div 2),
10281025
SeqIdsLen = length(SeqIds),
10291026

@@ -1050,8 +1047,8 @@ v2_delete_segment_file_partially_acked_with_holes(Config) ->
10501047
?MODULE, v2_delete_segment_file_partially_acked_with_holes1, [Config]).
10511048

10521049
v2_delete_segment_file_partially_acked_with_holes1(_Config) ->
1053-
IndexMod = index_mod(),
1054-
SegmentSize = segment_size(),
1050+
IndexMod = rabbit_classic_queue_index_v2,
1051+
SegmentSize = IndexMod:next_segment_boundary(0),
10551052
SeqIdsA = lists:seq(0, SegmentSize div 2),
10561053
SeqIdsB = lists:seq(11 + SegmentSize div 2, SegmentSize - 1),
10571054
SeqIdsLen = length(SeqIdsA) + length(SeqIdsB),
@@ -1110,7 +1107,8 @@ bq_queue_recover(Config) ->
11101107
?MODULE, bq_queue_recover1, [Config]).
11111108

11121109
bq_queue_recover1(Config) ->
1113-
Count = 2 * segment_size(),
1110+
IndexMod = index_mod(),
1111+
Count = 2 * IndexMod:next_segment_boundary(0),
11141112
QName0 = queue_name(Config, <<"bq_queue_recover-q">>),
11151113
{new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
11161114
QName = amqqueue:get_name(Q),
@@ -1166,13 +1164,14 @@ get_queue_sup_pid([], _QueuePid) ->
11661164

11671165
variable_queue_partial_segments_q_tail_thing(Config) ->
11681166
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1169-
?MODULE, variable_queue_partial_segments_q_tail_thing1, []).
1167+
?MODULE, variable_queue_partial_segments_q_tail_thing1, [Config]).
11701168

1171-
variable_queue_partial_segments_q_tail_thing1() ->
1169+
variable_queue_partial_segments_q_tail_thing1(Config) ->
11721170
with_fresh_variable_queue(fun variable_queue_partial_segments_q_tail_thing2/2).
11731171

11741172
variable_queue_partial_segments_q_tail_thing2(VQ0, _QName) ->
1175-
SegmentSize = segment_size(),
1173+
IndexMod = index_mod(),
1174+
SegmentSize = IndexMod:next_segment_boundary(0),
11761175
HalfSegment = SegmentSize div 2,
11771176
OneAndAHalfSegment = SegmentSize + HalfSegment,
11781177
VQ1 = variable_queue_publish(true, OneAndAHalfSegment, VQ0),
@@ -1195,8 +1194,12 @@ variable_queue_partial_segments_q_tail_thing2(VQ0, _QName) ->
11951194
SegmentSize + HalfSegment + 1, VQ5),
11961195
VQ7 = check_variable_queue_status(
11971196
VQ6,
1198-
%% We can't make assumptions about how many messages are in memory.
1199-
[{len, HalfSegment + 1}]),
1197+
%% We only read from q_tail up to the end of the segment, so
1198+
%% after fetching exactly one segment, we should have no
1199+
%% messages in memory.
1200+
[{q_head, 0},
1201+
{q_tail, {q_tail, SegmentSize, HalfSegment + 1, OneAndAHalfSegment + 1}},
1202+
{len, HalfSegment + 1}]),
12001203
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
12011204
HalfSegment + 1, VQ7),
12021205
{_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
@@ -1206,13 +1209,14 @@ variable_queue_partial_segments_q_tail_thing2(VQ0, _QName) ->
12061209

12071210
variable_queue_all_the_bits_not_covered_elsewhere_A(Config) ->
12081211
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1209-
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_A1, []).
1212+
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_A1, [Config]).
12101213

1211-
variable_queue_all_the_bits_not_covered_elsewhere_A1() ->
1214+
variable_queue_all_the_bits_not_covered_elsewhere_A1(Config) ->
12121215
with_fresh_variable_queue(fun variable_queue_all_the_bits_not_covered_elsewhere_A2/2).
12131216

12141217
variable_queue_all_the_bits_not_covered_elsewhere_A2(VQ0, QName) ->
1215-
Count = 2 * segment_size(),
1218+
IndexMod = index_mod(),
1219+
Count = 2 * IndexMod:next_segment_boundary(0),
12161220
VQ1 = variable_queue_publish(true, Count, VQ0),
12171221
VQ2 = variable_queue_publish(false, Count, VQ1),
12181222
{VQ4, _AckTags} = variable_queue_fetch(Count, true, false,
@@ -1230,9 +1234,9 @@ variable_queue_all_the_bits_not_covered_elsewhere_A2(VQ0, QName) ->
12301234

12311235
variable_queue_all_the_bits_not_covered_elsewhere_B(Config) ->
12321236
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1233-
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_B1, []).
1237+
?MODULE, variable_queue_all_the_bits_not_covered_elsewhere_B1, [Config]).
12341238

1235-
variable_queue_all_the_bits_not_covered_elsewhere_B1() ->
1239+
variable_queue_all_the_bits_not_covered_elsewhere_B1(Config) ->
12361240
with_fresh_variable_queue(fun variable_queue_all_the_bits_not_covered_elsewhere_B2/2).
12371241

12381242
variable_queue_all_the_bits_not_covered_elsewhere_B2(VQ1, QName) ->
@@ -1248,9 +1252,9 @@ variable_queue_all_the_bits_not_covered_elsewhere_B2(VQ1, QName) ->
12481252

12491253
variable_queue_drop(Config) ->
12501254
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1251-
?MODULE, variable_queue_drop1, []).
1255+
?MODULE, variable_queue_drop1, [Config]).
12521256

1253-
variable_queue_drop1() ->
1257+
variable_queue_drop1(Config) ->
12541258
with_fresh_variable_queue(fun variable_queue_drop2/2).
12551259

12561260
variable_queue_drop2(VQ0, _QName) ->
@@ -1271,9 +1275,9 @@ variable_queue_drop2(VQ0, _QName) ->
12711275

12721276
variable_queue_fold_msg_on_disk(Config) ->
12731277
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1274-
?MODULE, variable_queue_fold_msg_on_disk1, []).
1278+
?MODULE, variable_queue_fold_msg_on_disk1, [Config]).
12751279

1276-
variable_queue_fold_msg_on_disk1() ->
1280+
variable_queue_fold_msg_on_disk1(Config) ->
12771281
with_fresh_variable_queue(fun variable_queue_fold_msg_on_disk2/2).
12781282

12791283
variable_queue_fold_msg_on_disk2(VQ0, _QName) ->
@@ -1285,9 +1289,9 @@ variable_queue_fold_msg_on_disk2(VQ0, _QName) ->
12851289

12861290
variable_queue_dropfetchwhile(Config) ->
12871291
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1288-
?MODULE, variable_queue_dropfetchwhile1, []).
1292+
?MODULE, variable_queue_dropfetchwhile1, [Config]).
12891293

1290-
variable_queue_dropfetchwhile1() ->
1294+
variable_queue_dropfetchwhile1(Config) ->
12911295
with_fresh_variable_queue(fun variable_queue_dropfetchwhile2/2).
12921296

12931297
variable_queue_dropfetchwhile2(VQ0, _QName) ->
@@ -1331,9 +1335,9 @@ variable_queue_dropfetchwhile2(VQ0, _QName) ->
13311335

13321336
variable_queue_dropwhile_restart(Config) ->
13331337
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1334-
?MODULE, variable_queue_dropwhile_restart1, []).
1338+
?MODULE, variable_queue_dropwhile_restart1, [Config]).
13351339

1336-
variable_queue_dropwhile_restart1() ->
1340+
variable_queue_dropwhile_restart1(Config) ->
13371341
with_fresh_variable_queue(fun variable_queue_dropwhile_restart2/2).
13381342

13391343
variable_queue_dropwhile_restart2(VQ0, QName) ->
@@ -1368,9 +1372,9 @@ variable_queue_dropwhile_restart2(VQ0, QName) ->
13681372

13691373
variable_queue_dropwhile_sync_restart(Config) ->
13701374
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1371-
?MODULE, variable_queue_dropwhile_sync_restart1, []).
1375+
?MODULE, variable_queue_dropwhile_sync_restart1, [Config]).
13721376

1373-
variable_queue_dropwhile_sync_restart1() ->
1377+
variable_queue_dropwhile_sync_restart1(Config) ->
13741378
with_fresh_variable_queue(fun variable_queue_dropwhile_sync_restart2/2).
13751379

13761380
variable_queue_dropwhile_sync_restart2(VQ0, QName) ->
@@ -1408,9 +1412,9 @@ variable_queue_dropwhile_sync_restart2(VQ0, QName) ->
14081412

14091413
variable_queue_restart_large_seq_id(Config) ->
14101414
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1411-
?MODULE, variable_queue_restart_large_seq_id1, []).
1415+
?MODULE, variable_queue_restart_large_seq_id1, [Config]).
14121416

1413-
variable_queue_restart_large_seq_id1() ->
1417+
variable_queue_restart_large_seq_id1(Config) ->
14141418
with_fresh_variable_queue(fun variable_queue_restart_large_seq_id2/2).
14151419

14161420
variable_queue_restart_large_seq_id2(VQ0, QName) ->
@@ -1445,9 +1449,9 @@ variable_queue_restart_large_seq_id2(VQ0, QName) ->
14451449

14461450
variable_queue_ack_limiting(Config) ->
14471451
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1448-
?MODULE, variable_queue_ack_limiting1, []).
1452+
?MODULE, variable_queue_ack_limiting1, [Config]).
14491453

1450-
variable_queue_ack_limiting1() ->
1454+
variable_queue_ack_limiting1(Config) ->
14511455
with_fresh_variable_queue(fun variable_queue_ack_limiting2/2).
14521456

14531457
variable_queue_ack_limiting2(VQ0, _Config) ->
@@ -1473,9 +1477,9 @@ variable_queue_ack_limiting2(VQ0, _Config) ->
14731477

14741478
variable_queue_purge(Config) ->
14751479
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1476-
?MODULE, variable_queue_purge1, []).
1480+
?MODULE, variable_queue_purge1, [Config]).
14771481

1478-
variable_queue_purge1() ->
1482+
variable_queue_purge1(Config) ->
14791483
with_fresh_variable_queue(fun variable_queue_purge2/2).
14801484

14811485
variable_queue_purge2(VQ0, _Config) ->
@@ -1495,9 +1499,9 @@ variable_queue_purge2(VQ0, _Config) ->
14951499

14961500
variable_queue_requeue(Config) ->
14971501
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1498-
?MODULE, variable_queue_requeue1, []).
1502+
?MODULE, variable_queue_requeue1, [Config]).
14991503

1500-
variable_queue_requeue1() ->
1504+
variable_queue_requeue1(Config) ->
15011505
with_fresh_variable_queue(fun variable_queue_requeue2/2).
15021506

15031507
variable_queue_requeue2(VQ0, _Config) ->
@@ -1521,13 +1525,14 @@ variable_queue_requeue2(VQ0, _Config) ->
15211525
%% requeue from ram_pending_ack into q_head, move to q_tail and then empty queue
15221526
variable_queue_requeue_ram_beta(Config) ->
15231527
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
1524-
?MODULE, variable_queue_requeue_ram_beta1, []).
1528+
?MODULE, variable_queue_requeue_ram_beta1, [Config]).
15251529

1526-
variable_queue_requeue_ram_beta1() ->
1530+
variable_queue_requeue_ram_beta1(Config) ->
15271531
with_fresh_variable_queue(fun variable_queue_requeue_ram_beta2/2).
15281532

15291533
variable_queue_requeue_ram_beta2(VQ0, _Config) ->
1530-
Count = 2 + 2 * segment_size(),
1534+
IndexMod = index_mod(),
1535+
Count = IndexMod:next_segment_boundary(0)*2 + 2,
15311536
VQ1 = variable_queue_publish(false, Count, VQ0),
15321537
{VQ2, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1),
15331538
{Back, Front} = lists:split(Count div 2, AcksR),
@@ -1803,7 +1808,8 @@ requeue_one_by_one(Acks, VQ) ->
18031808
%% internal queues. Kept for completeness.
18041809
variable_queue_with_holes(VQ0) ->
18051810
Interval = 2048, %% should match vq:IO_BATCH_SIZE
1806-
Count = 2 * Interval + 2 * segment_size(),
1811+
IndexMod = index_mod(),
1812+
Count = IndexMod:next_segment_boundary(0)*2 + 2 * Interval,
18071813
Seq = lists:seq(1, Count),
18081814
VQ1 = variable_queue_publish(
18091815
false, 1, Count,

0 commit comments

Comments
 (0)