Skip to content

Commit 3b5983e

Browse files
authored
Merge pull request #881 from rabbitmq/chunk-id-in-flow-strategy-context
Add chunk ID to flow strategy context
2 parents 0e33442 + 04de5a5 commit 3b5983e

File tree

4 files changed

+30
-4
lines changed

4 files changed

+30
-4
lines changed

src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ interface Context {
6767
/**
6868
* Provide credits for the subscription.
6969
*
70-
* <p>{@link ConsumerFlowStrategy} implementation should always provide 1 credit a given chunk.
70+
* <p>{@link ConsumerFlowStrategy} implementation should always provide 1 credit for a given
71+
* chunk.
7172
*
7273
* @param credits the number of credits provided, usually 1
7374
*/
@@ -79,6 +80,13 @@ interface Context {
7980
* @return number of messages in the chunk
8081
*/
8182
long messageCount();
83+
84+
/**
85+
* The offset of the first message in the chunk, aka chunk ID.
86+
*
87+
* @return offset of the first message in the chunk (chunk ID)
88+
*/
89+
long chunkId();
8290
}
8391

8492
/** Behavior for {@link MessageHandler.Context#processed()} calls. */

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,8 @@ private ClientSubscriptionsManager(
665665
if (subscriptionTracker != null && subscriptionTracker.consumer.isOpen()) {
666666
processCallback =
667667
subscriptionTracker.flowStrategy.start(
668-
new DefaultConsumerFlowStrategyContext(subscriptionId, client, messageCount));
668+
new DefaultConsumerFlowStrategyContext(
669+
subscriptionId, client, messageCount, offset));
669670
} else {
670671
LOGGER.debug(
671672
"Could not find stream subscription {} or subscription closing, not providing credits",
@@ -1391,12 +1392,14 @@ private static class DefaultConsumerFlowStrategyContext implements ConsumerFlowS
13911392
private final byte subscriptionId;
13921393
private final Client client;
13931394
private final long messageCount;
1395+
private final long chunkId;
13941396

13951397
private DefaultConsumerFlowStrategyContext(
1396-
byte subscriptionId, Client client, long messageCount) {
1398+
byte subscriptionId, Client client, long messageCount, long chunkId) {
13971399
this.subscriptionId = subscriptionId;
13981400
this.client = client;
13991401
this.messageCount = messageCount;
1402+
this.chunkId = chunkId;
14001403
}
14011404

14021405
@Override
@@ -1414,7 +1417,12 @@ public void credits(int credits) {
14141417

14151418
@Override
14161419
public long messageCount() {
1417-
return messageCount;
1420+
return this.messageCount;
1421+
}
1422+
1423+
@Override
1424+
public long chunkId() {
1425+
return this.chunkId;
14181426
}
14191427
}
14201428

src/test/java/com/rabbitmq/stream/impl/CreditEveryNthChunkConsumerFlowStrategyTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public void credits(int credits) {
6666
public long messageCount() {
6767
throw new UnsupportedOperationException();
6868
}
69+
70+
@Override
71+
public long chunkId() {
72+
throw new UnsupportedOperationException();
73+
}
6974
};
7075
}
7176
}

src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ public void credits(int credits) {
7070
public long messageCount() {
7171
return messageCount;
7272
}
73+
74+
@Override
75+
public long chunkId() {
76+
return 0;
77+
}
7378
};
7479
}
7580
}

0 commit comments

Comments
 (0)