Skip to content

Commit 9ced615

Browse files
kirktruelianetm
authored andcommitted
KAFKA-19589: Check for ability to skip position validation in application thread when collecting buffered data (#20324)
Introduces `PositionsValidator` which queries the assignment data from the `SubscriptionState` from the application thread, allowing `AsyncKafkaConsumer` to potentially avoid the need to wait. The impetus for this change is the observation of these two points: 1. Waiting for the completion of `OffsetsRequestManager.updateFetchPositions()` on the application thread means either busy-waiting or blocking, either of which add significant (~60%) CPU load to the `AsyncKafkaConsumer` compared to the `ClassicKafkaConsumer` 2. In testing, data shows that 99.99+% of the time that `OffsetsRequestManager.updateFetchPositions()` is called, the partitions are up-to-date and there is no need to fetch offsets. This patch allows the check for stable partitions to be made in the application thread, resulting in far less waiting in the critical path of `AsyncKafkaConsumer.poll()`. Reviewers: Lianet Magrans <lmagrans@confluent.io>, Andrew Schofield <aschofield@confluent.io>
1 parent b514023 commit 9ced615

File tree

13 files changed

+267
-86
lines changed

13 files changed

+267
-86
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,7 @@ private StreamsRebalanceListenerInvoker streamsRebalanceListenerInvoker() {
325325
// Init value is needed to avoid NPE in case of exception raised in the constructor
326326
private Optional<ClientTelemetryReporter> clientTelemetryReporter = Optional.empty();
327327

328+
private final PositionsValidator positionsValidator;
328329
private AsyncPollEvent inflightPoll;
329330
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
330331
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
@@ -429,6 +430,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
429430

430431
// This FetchBuffer is shared between the application and network threads.
431432
this.fetchBuffer = new FetchBuffer(logContext);
433+
this.positionsValidator = new PositionsValidator(logContext, time, subscriptions, metadata);
432434
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier = NetworkClientDelegate.supplier(time,
433435
logContext,
434436
metadata,
@@ -458,7 +460,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
458460
metrics,
459461
offsetCommitCallbackInvoker,
460462
memberStateListener,
461-
streamsRebalanceData
463+
streamsRebalanceData,
464+
positionsValidator
462465
);
463466
final Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(logContext,
464467
metadata,
@@ -533,7 +536,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
533536
int requestTimeoutMs,
534537
int defaultApiTimeoutMs,
535538
String groupId,
536-
boolean autoCommitEnabled) {
539+
boolean autoCommitEnabled,
540+
PositionsValidator positionsValidator) {
537541
this.log = logContext.logger(getClass());
538542
this.subscriptions = subscriptions;
539543
this.clientId = clientId;
@@ -565,6 +569,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
565569
time,
566570
asyncConsumerMetrics
567571
);
572+
this.positionsValidator = positionsValidator;
568573
}
569574

570575
AsyncKafkaConsumer(LogContext logContext,
@@ -624,6 +629,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
624629
new RebalanceCallbackMetricsManager(metrics)
625630
);
626631
ApiVersions apiVersions = new ApiVersions();
632+
this.positionsValidator = new PositionsValidator(logContext, time, subscriptions, metadata);
627633
Supplier<NetworkClientDelegate> networkClientDelegateSupplier = NetworkClientDelegate.supplier(
628634
time,
629635
config,
@@ -651,7 +657,8 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
651657
metrics,
652658
offsetCommitCallbackInvoker,
653659
memberStateListener,
654-
Optional.empty()
660+
Optional.empty(),
661+
positionsValidator
655662
);
656663
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier = ApplicationEventProcessor.supplier(
657664
logContext,
@@ -1930,6 +1937,10 @@ private Fetch<K, V> collectFetch() {
19301937
// thread has not completed that stage for the inflight event, don't attempt to collect data from the fetch
19311938
// buffer. If the inflight event was nulled out by checkInflightPoll(), that implies that it is safe to
19321939
// attempt to collect data from the fetch buffer.
1940+
if (positionsValidator.canSkipUpdateFetchPositions()) {
1941+
return fetchCollector.collectFetch(fetchBuffer);
1942+
}
1943+
19331944
if (inflightPoll != null && !inflightPoll.isValidatePositionsComplete()) {
19341945
return Fetch.empty();
19351946
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public void resetPositionsIfNeeded() {
115115
*/
116116
public void validatePositionsIfNeeded() {
117117
Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate =
118-
offsetFetcherUtils.getPartitionsToValidate();
118+
offsetFetcherUtils.refreshAndGetPartitionsToValidate();
119119

120120
validatePositionsAsync(partitionsToValidate);
121121
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java

Lines changed: 19 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,8 @@
4747
import java.util.Map;
4848
import java.util.Optional;
4949
import java.util.Set;
50-
import java.util.concurrent.atomic.AtomicInteger;
5150
import java.util.concurrent.atomic.AtomicReference;
5251
import java.util.function.BiFunction;
53-
import java.util.function.Function;
5452
import java.util.stream.Collectors;
5553

5654
/**
@@ -62,35 +60,41 @@ class OffsetFetcherUtils {
6260
private final Time time;
6361
private final long retryBackoffMs;
6462
private final ApiVersions apiVersions;
63+
private final PositionsValidator positionsValidator;
6564
private final Logger log;
6665

67-
/**
68-
* Exception that occurred while validating positions, that will be propagated on the next
69-
* call to validate positions. This could be an error received in the
70-
* OffsetsForLeaderEpoch response, or a LogTruncationException detected when using a
71-
* successful response to validate the positions. It will be cleared when thrown.
72-
*/
73-
private final AtomicReference<RuntimeException> cachedValidatePositionsException = new AtomicReference<>();
7466
/**
7567
* Exception that occurred while resetting positions, that will be propagated on the next
7668
* call to reset positions. This will have the error received in the response to the
7769
* ListOffsets request. It will be cleared when thrown on the next call to reset.
7870
*/
7971
private final AtomicReference<RuntimeException> cachedResetPositionsException = new AtomicReference<>();
80-
private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
8172

8273
OffsetFetcherUtils(LogContext logContext,
8374
ConsumerMetadata metadata,
8475
SubscriptionState subscriptionState,
8576
Time time,
8677
long retryBackoffMs,
8778
ApiVersions apiVersions) {
79+
this(logContext, metadata, subscriptionState,
80+
time, retryBackoffMs, apiVersions,
81+
new PositionsValidator(logContext, time, subscriptionState, metadata));
82+
}
83+
84+
OffsetFetcherUtils(LogContext logContext,
85+
ConsumerMetadata metadata,
86+
SubscriptionState subscriptionState,
87+
Time time,
88+
long retryBackoffMs,
89+
ApiVersions apiVersions,
90+
PositionsValidator positionsValidator) {
8891
this.log = logContext.logger(getClass());
8992
this.metadata = metadata;
9093
this.subscriptionState = subscriptionState;
9194
this.time = time;
9295
this.retryBackoffMs = retryBackoffMs;
9396
this.apiVersions = apiVersions;
97+
this.positionsValidator = positionsValidator;
9498
}
9599

96100
/**
@@ -168,41 +172,16 @@ <T> Map<Node, Map<TopicPartition, T>> regroupPartitionMapByNode(Map<TopicPartiti
168172
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
169173
}
170174

171-
Map<TopicPartition, SubscriptionState.FetchPosition> getPartitionsToValidate() {
172-
RuntimeException exception = cachedValidatePositionsException.getAndSet(null);
173-
if (exception != null)
174-
throw exception;
175-
176-
// Validate each partition against the current leader and epoch
177-
// If we see a new metadata version, check all partitions
178-
validatePositionsOnMetadataChange();
179-
180-
// Collect positions needing validation, with backoff
181-
return subscriptionState
182-
.partitionsNeedingValidation(time.milliseconds())
183-
.stream()
184-
.filter(tp -> subscriptionState.position(tp) != null)
185-
.collect(Collectors.toMap(Function.identity(), subscriptionState::position));
186-
}
187-
188-
void maybeSetValidatePositionsException(RuntimeException e) {
189-
if (!cachedValidatePositionsException.compareAndSet(null, e)) {
190-
log.error("Discarding error validating positions because another error is pending", e);
191-
}
175+
Map<TopicPartition, SubscriptionState.FetchPosition> refreshAndGetPartitionsToValidate() {
176+
return positionsValidator.refreshAndGetPartitionsToValidate(apiVersions);
192177
}
193178

194179
/**
195180
* If we have seen new metadata (as tracked by {@link org.apache.kafka.clients.Metadata#updateVersion()}), then
196181
* we should check that all the assignments have a valid position.
197182
*/
198183
void validatePositionsOnMetadataChange() {
199-
int newMetadataUpdateVersion = metadata.updateVersion();
200-
if (metadataUpdateVersion.getAndSet(newMetadataUpdateVersion) != newMetadataUpdateVersion) {
201-
subscriptionState.assignedPartitions().forEach(topicPartition -> {
202-
ConsumerMetadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(topicPartition);
203-
subscriptionState.maybeValidatePositionForCurrentLeader(apiVersions, topicPartition, leaderAndEpoch);
204-
});
205-
}
184+
positionsValidator.validatePositionsOnMetadataChange(apiVersions);
206185
}
207186

208187
/**
@@ -357,7 +336,7 @@ void onSuccessfulResponseForValidatingPositions(
357336
});
358337

359338
if (!truncations.isEmpty()) {
360-
maybeSetValidatePositionsException(buildLogTruncationException(truncations));
339+
positionsValidator.maybeSetError(buildLogTruncationException(truncations));
361340
}
362341
}
363342

@@ -367,7 +346,7 @@ void onFailedResponseForValidatingPositions(final Map<TopicPartition, Subscripti
367346
metadata.requestUpdate(false);
368347

369348
if (!(error instanceof RetriableException)) {
370-
maybeSetValidatePositionsException(error);
349+
positionsValidator.maybeSetError(error);
371350
}
372351
}
373352

clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState,
119119
final ApiVersions apiVersions,
120120
final NetworkClientDelegate networkClientDelegate,
121121
final CommitRequestManager commitRequestManager,
122+
final PositionsValidator positionsValidator,
122123
final LogContext logContext) {
123124
requireNonNull(subscriptionState);
124125
requireNonNull(metadata);
@@ -140,7 +141,7 @@ public OffsetsRequestManager(final SubscriptionState subscriptionState,
140141
this.apiVersions = apiVersions;
141142
this.networkClientDelegate = networkClientDelegate;
142143
this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, metadata, subscriptionState,
143-
time, retryBackoffMs, apiVersions);
144+
time, retryBackoffMs, apiVersions, positionsValidator);
144145
// Register the cluster metadata update callback. Note this only relies on the
145146
// requestsToRetry initialized above, and won't be invoked until all managers are
146147
// initialized and the network thread started.
@@ -231,8 +232,8 @@ public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchO
231232
* on {@link SubscriptionState#hasAllFetchPositions()}). It will complete immediately, with true, if all positions
232233
* are already available. If some positions are missing, the future will complete once the offsets are retrieved and positions are updated.
233234
*/
234-
public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
235-
CompletableFuture<Boolean> result = new CompletableFuture<>();
235+
public CompletableFuture<Void> updateFetchPositions(long deadlineMs) {
236+
CompletableFuture<Void> result = new CompletableFuture<>();
236237

237238
try {
238239
if (maybeCompleteWithPreviousException(result)) {
@@ -243,7 +244,7 @@ public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
243244

244245
if (subscriptionState.hasAllFetchPositions()) {
245246
// All positions are already available
246-
result.complete(true);
247+
result.complete(null);
247248
return result;
248249
}
249250

@@ -252,7 +253,7 @@ public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
252253
if (error != null) {
253254
result.completeExceptionally(error);
254255
} else {
255-
result.complete(subscriptionState.hasAllFetchPositions());
256+
result.complete(null);
256257
}
257258
});
258259

@@ -262,7 +263,7 @@ public CompletableFuture<Boolean> updateFetchPositions(long deadlineMs) {
262263
return result;
263264
}
264265

265-
private boolean maybeCompleteWithPreviousException(CompletableFuture<Boolean> result) {
266+
private boolean maybeCompleteWithPreviousException(CompletableFuture<Void> result) {
266267
Throwable cachedException = cachedUpdatePositionsException.getAndSet(null);
267268
if (cachedException != null) {
268269
result.completeExceptionally(cachedException);
@@ -501,7 +502,7 @@ CompletableFuture<Void> resetPositionsIfNeeded() {
501502
* next call to this function.
502503
*/
503504
void validatePositionsIfNeeded() {
504-
Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = offsetFetcherUtils.getPartitionsToValidate();
505+
Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = offsetFetcherUtils.refreshAndGetPartitionsToValidate();
505506
if (partitionsToValidate.isEmpty()) {
506507
return;
507508
}

0 commit comments

Comments
 (0)