Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,12 @@ outstanding unconfirmed messages timed out.
|Time before enqueueing of a message fail when the maximum number of unconfirmed
is reached. The callback of the message will be called with a negative status.
Set the value to `Duration.ZERO` if there should be no timeout.
|10 seconds.
|10 seconds

|`retryOnRecovery`
|Re-publish unconfirmed messages when restoring a connection.
Set to false if do not want to re-publish unconfirmed messages when restoring a connection.
|true
|===

==== Sending Messages
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/rabbitmq/stream/ProducerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,18 @@ public interface ProducerBuilder {
*/
ProducerBuilder enqueueTimeout(Duration timeout);

/**
* Re-publish unconfirmed messages when restoring a connection.
*
* <p>Default is true.</p>
*
* <p>Set to false if do not want to re-publish unconfirmed messages when restoring a connection.</p>
*
* @param retryOnRecovery
* @return this builder instance
*/
ProducerBuilder retryOnRecovery(boolean retryOnRecovery);

/**
* Logic to extract a filter value from a message.
*
Expand Down
82 changes: 50 additions & 32 deletions src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class StreamProducer implements Producer {
entity -> ((AccumulatedEntity) entity).publishingId();
private final long enqueueTimeoutMs;
private final boolean blockOnMaxUnconfirmed;
private final boolean retryOnRecovery;
private volatile Client client;
private volatile byte publisherId;
private volatile Status status;
Expand All @@ -95,6 +96,7 @@ class StreamProducer implements Producer {
int maxUnconfirmedMessages,
Duration confirmTimeout,
Duration enqueueTimeout,
boolean retryOnRecovery,
Function<Message, String> filterValueExtractor,
StreamEnvironment environment) {
if (filterValueExtractor != null && !environment.filteringSupported()) {
Expand All @@ -107,6 +109,7 @@ class StreamProducer implements Producer {
this.name = name;
this.stream = stream;
this.enqueueTimeoutMs = enqueueTimeout.toMillis();
this.retryOnRecovery = retryOnRecovery;
this.blockOnMaxUnconfirmed = enqueueTimeout.isZero();
this.closingCallback = environment.registerProducer(this, name, this.stream);
final Client.OutboundEntityWriteCallback delegateWriteCallback;
Expand Down Expand Up @@ -504,43 +507,58 @@ void unavailable() {

void running() {
synchronized (this) {
LOGGER.debug(
"Re-publishing {} unconfirmed message(s) and {} accumulated message(s)",
this.unconfirmedMessages.size(),
this.accumulator.size());
if (!this.unconfirmedMessages.isEmpty()) {
Map<Long, AccumulatedEntity> messagesToResend = new TreeMap<>(this.unconfirmedMessages);
if (!this.retryOnRecovery) {
LOGGER.debug(
"Skip to republish {} unconfirmed message(s) and re-publishing {} accumulated message(s)",
this.unconfirmedMessages.size(),
this.accumulator.size());

this.unconfirmedMessages.clear();
Iterator<Entry<Long, AccumulatedEntity>> resendIterator =
int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
if (toRelease > 0) {
unconfirmedMessagesSemaphore.release(toRelease);
}

publishBatch(false);
} else {
LOGGER.debug(
"Re-publishing {} unconfirmed message(s) and {} accumulated message(s)",
this.unconfirmedMessages.size(),
this.accumulator.size());
if (!this.unconfirmedMessages.isEmpty()) {
Map<Long, AccumulatedEntity> messagesToResend = new TreeMap<>(this.unconfirmedMessages);
this.unconfirmedMessages.clear();
Iterator<Entry<Long, AccumulatedEntity>> resendIterator =
messagesToResend.entrySet().iterator();
while (resendIterator.hasNext()) {
List<Object> messages = new ArrayList<>(this.batchSize);
int batchCount = 0;
while (batchCount != this.batchSize) {
Object accMessage = resendIterator.hasNext() ? resendIterator.next().getValue() : null;
if (accMessage == null) {
break;
while (resendIterator.hasNext()) {
List<Object> messages = new ArrayList<>(this.batchSize);
int batchCount = 0;
while (batchCount != this.batchSize) {
Object accMessage = resendIterator.hasNext() ? resendIterator.next().getValue() : null;
if (accMessage == null) {
break;
}
messages.add(accMessage);
batchCount++;
}
messages.add(accMessage);
batchCount++;
client.publishInternal(
this.publishVersion,
this.publisherId,
messages,
this.writeCallback,
this.publishSequenceFunction);
}
client.publishInternal(
this.publishVersion,
this.publisherId,
messages,
this.writeCallback,
this.publishSequenceFunction);
}
}
publishBatch(false);

int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
if (toRelease > 0) {
unconfirmedMessagesSemaphore.release(toRelease);
if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) {
LOGGER.debug(
"Could not acquire {} permit(s) for message republishing",
this.unconfirmedMessages.size());
publishBatch(false);

int toRelease = maxUnconfirmedMessages - unconfirmedMessagesSemaphore.availablePermits();
if (toRelease > 0) {
unconfirmedMessagesSemaphore.release(toRelease);
if (!unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size())) {
LOGGER.debug(
"Could not acquire {} permit(s) for message republishing",
this.unconfirmedMessages.size());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class StreamProducerBuilder implements ProducerBuilder {

private Duration enqueueTimeout = Duration.ofSeconds(10);

private boolean retryOnRecovery = true;

private DefaultRoutingConfiguration routingConfiguration;

private Function<Message, String> filterValueExtractor;
Expand Down Expand Up @@ -131,6 +133,12 @@ public ProducerBuilder enqueueTimeout(Duration timeout) {
return this;
}

@Override
public ProducerBuilder retryOnRecovery(boolean retryOnRecovery) {
this.retryOnRecovery = retryOnRecovery;
return this;
}

@Override
public ProducerBuilder filterValue(Function<Message, String> filterValueExtractor) {
this.filterValueExtractor = filterValueExtractor;
Expand Down Expand Up @@ -195,6 +203,7 @@ public Producer build() {
maxUnconfirmedMessages,
confirmTimeout,
enqueueTimeout,
retryOnRecovery,
filterValueExtractor,
environment);
this.environment.addProducer((StreamProducer) producer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ void sendToNonExistingStreamShouldReturnUnconfirmedStatus() throws Exception {
@TestUtils.DisabledIfRabbitMqCtlNotSet
void shouldRecoverAfterConnectionIsKilled(int subEntrySize) throws Exception {
Producer producer =
environment.producerBuilder().subEntrySize(subEntrySize).stream(stream).build();
environment.producerBuilder().subEntrySize(subEntrySize).retryOnRecovery(true).stream(stream).build();

AtomicInteger published = new AtomicInteger(0);
AtomicInteger confirmed = new AtomicInteger(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout(
messageCount * 10,
confirmTimeout,
Duration.ofSeconds(10),
true,
null,
env);

Expand Down Expand Up @@ -219,6 +220,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry
2,
Duration.ofMinutes(1),
enqueueTimeout,
true,
null,
env);

Expand Down Expand Up @@ -258,6 +260,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize)
2,
Duration.ofMinutes(1),
enqueueTimeout,
true,
null,
env);

Expand Down