Skip to content

Commit 00a6b76

Browse files
committed
CQ: Tune reads from index based on message rate
We want to reduce the number of times we cross over to the next segment when reading. We do this by computing a threshold based on read rate, then reading a little more if we are close enough to the end of the current segment, and a little less if we are not reading enough into the next segment.
1 parent c17f66a commit 00a6b76

File tree

2 files changed

+48
-4
lines changed

2 files changed

+48
-4
lines changed

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
-export([erase/1, init/1, reset_state/1, recover/4,
1111
bounds/2, next_segment_boundary/1, info/1,
1212
terminate/3, delete_and_terminate/1,
13-
publish/7, ack/2, read/3,
13+
publish/7, ack/2, tune_read/2, read/3,
1414
sync/1, needs_sync/1]).
1515

1616
%% Recovery. Unlike other functions in this module, these
@@ -1067,6 +1067,52 @@ bounds(State = #qi{ segments = Segments }, NextSeqIdHint) ->
10671067
%% The next_segment_boundary/1 function is used internally when
10681068
%% reading. It should not be called from rabbit_variable_queue.
10691069

1070+
%% @todo OK so segment boundaries are still good but we need
1071+
%% to be smarter about them. The queue can give the
1072+
%% read it is about to do and the index can decide to
1073+
%% lower or increase the limit to match segments (so
1074+
%% if we are close to a segment we increase otherwise
1075+
%% we lower, based on how many reads are requested).
1076+
1077+
-spec tune_read(rabbit_variable_queue:seq_id(), rabbit_variable_queue:seq_id())
1078+
-> rabbit_variable_queue:seq_id().
1079+
1080+
tune_read(FromSeqId, ToSeqId)
1081+
when FromSeqId =:= ToSeqId ->
1082+
%% Nothing will be read as From is inclusive but To is exclusive.
1083+
ToSeqId;
1084+
tune_read(FromSeqId, ToSeqId) ->
1085+
%% How much we are reading.
1086+
ReqCount = ToSeqId - FromSeqId,
1087+
%% How much remains in the current segment.
1088+
NextSeqId = next_segment_boundary(FromSeqId),
1089+
RemCount = NextSeqId - FromSeqId,
1090+
%% How much we are willing to accept as extra messages to read.
1091+
Threshold = max(1, ReqCount div 7),
1092+
if
1093+
%% There are messages remaining in the segment, and the number
1094+
%% of messages remaining is less than the threshold: read up
1095+
%% to the end of the segment (To is exclusive).
1096+
(RemCount >= ReqCount) andalso (RemCount - ReqCount =< Threshold) ->
1097+
NextSeqId;
1098+
%% There are messages remaining in the segment but the number
1099+
%% of messages remaining is more than we are willing to read:
1100+
%% only read what was originally requested.
1101+
(RemCount >= ReqCount) ->
1102+
ToSeqId;
1103+
%% We are requested to read past the end of the current segment.
1104+
%% This would require us to read from two different segments,
1105+
%% which we only want to do if this involves a good number of
1106+
%% messages. If this number is below the threshold, we reduce
1107+
%% the number of messages to read.
1108+
(ReqCount - RemCount =< Threshold) ->
1109+
NextSeqId;
1110+
%% Otherwise we cross over into the next segment, meaning we
1111+
%% only read what was originally requested.
1112+
true ->
1113+
ToSeqId
1114+
end.
1115+
10701116
-spec next_segment_boundary(SeqId) -> SeqId when SeqId::rabbit_variable_queue:seq_id().
10711117

10721118
next_segment_boundary(SeqId) ->

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1866,9 +1866,7 @@ read_from_q_tail(DelsAndAcksFun,
18661866
%% messages we read.
18671867
%% @todo Simply ask for N messages instead of low/high bounds.
18681868
QTailSeqLimit = QTailSeqId + MemoryLimit,
1869-
QTailSeqId1 =
1870-
lists:min([rabbit_classic_queue_index_v2:next_segment_boundary(QTailSeqId),
1871-
QTailSeqLimit, QTailSeqIdEnd]),
1869+
QTailSeqId1 = rabbit_classic_queue_index_v2:tune_read(QTailSeqId, min(QTailSeqLimit, QTailSeqIdEnd)),
18721870
{List0, IndexState1} = rabbit_classic_queue_index_v2:read(QTailSeqId, QTailSeqId1, IndexState),
18731871
{List, StoreState3, MCStateP3, MCStateT3} = case WhatToRead of
18741872
messages ->

0 commit comments

Comments
 (0)