3030import org .apache .kafka .common .protocol .Errors ;
3131import org .apache .kafka .common .requests .ListOffsetsRequest ;
3232import org .apache .kafka .common .requests .ListOffsetsResponse ;
33+ import org .apache .kafka .common .utils .ExponentialBackoffManager ;
3334import org .apache .kafka .common .utils .Time ;
35+ import org .apache .kafka .common .utils .Utils ;
3436import org .apache .kafka .metadata .MetadataCache ;
3537import org .apache .kafka .server .util .InterBrokerSendThread ;
3638import org .apache .kafka .server .util .RequestAndCompletionHandler ;
39+ import org .apache .kafka .server .util .timer .Timer ;
40+ import org .apache .kafka .server .util .timer .TimerTask ;
3741
3842import org .slf4j .Logger ;
3943import org .slf4j .LoggerFactory ;
@@ -54,20 +58,42 @@ public class NetworkPartitionMetadataClient implements PartitionMetadataClient {
5458
5559 private static final Logger log = LoggerFactory .getLogger (NetworkPartitionMetadataClient .class );
5660
// Retry/backoff settings handed to the ExponentialBackoffManager that PendingRequest builds in its
// three-argument constructor.
// NOTE(review): presumably the initial backoff delay in milliseconds - confirm against the
// ExponentialBackoffManager constructor's parameter order.
61+ private static final long REQUEST_BACKOFF_MS = 1_000L ;
// NOTE(review): presumably the upper bound on the backoff delay in milliseconds - confirm as above.
62+ private static final long REQUEST_BACKOFF_MAX_MS = 30_000L ;
// Attempt limit passed to the backoff manager; also reported in the retry/exhaustion log messages.
63+ private static final int MAX_RETRY_ATTEMPTS = 5 ;
64+
5765 private final MetadataCache metadataCache ;
5866 private final Supplier <KafkaClient > networkClientSupplier ;
5967 private final Time time ;
6068 private final ListenerName listenerName ;
6169 private final AtomicBoolean initialized = new AtomicBoolean (false );
6270 private volatile SendThread sendThread ;
71+ private final Timer timer ;
6372
/**
 * Creates the client from its collaborators. Every argument is mandatory.
 *
 * @param metadataCache         the metadata cache used by this client
 * @param networkClientSupplier supplier of the {@link KafkaClient} used by this client
 * @param time                  the time source
 * @param listenerName          the listener name used by this client
 * @param timer                 the timer on which retry tasks are scheduled; closed by {@code close()}
 * @throws IllegalArgumentException if any argument is {@code null}
 */
public NetworkPartitionMetadataClient(MetadataCache metadataCache,
                                      Supplier<KafkaClient> networkClientSupplier,
                                      Time time, ListenerName listenerName, Timer timer) {
    // Arguments are validated in declaration order, so the first null argument decides the message.
    this.metadataCache = requireArgument(metadataCache, "MetadataCache must not be null.");
    this.networkClientSupplier = requireArgument(networkClientSupplier, "NetworkClientSupplier must not be null.");
    this.time = requireArgument(time, "Time must not be null.");
    this.listenerName = requireArgument(listenerName, "ListenerName must not be null.");
    this.timer = requireArgument(timer, "Timer must not be null.");
}

/**
 * Returns {@code value} unchanged, or throws an {@link IllegalArgumentException} carrying
 * {@code message} when it is {@code null}.
 */
private static <T> T requireArgument(T value, String message) {
    if (value == null) {
        throw new IllegalArgumentException(message);
    }
    return value;
}
7298
7399 @ Override
@@ -125,6 +151,7 @@ public Map<TopicPartition, CompletableFuture<OffsetResponse>> listLatestOffsets(
125151 public void close () {
126152 // Only close sendThread if it was initialized. Note, close is called only during broker shutdown, so no need
127153 // for further synchronization here.
154+ Utils .closeQuietly (timer , "NetworkPartitionMetadataClient timer" );
128155 if (!initialized .get ()) {
129156 return ;
130157 }
@@ -186,14 +213,18 @@ private ListOffsetsRequest.Builder createListOffsetsRequest(List<TopicPartition>
186213 * Handles the response from a ListOffsets request.
187214 */
188215 // Visible for Testing.
189- void handleResponse (Map < TopicPartition , CompletableFuture < OffsetResponse >> partitionFutures , ClientResponse clientResponse ) {
216+ void handleResponse (PendingRequest pendingRequest , ClientResponse clientResponse ) {
190217 // Handle error responses first
191- if (maybeHandleErrorResponse (partitionFutures , clientResponse )) {
218+ if (maybeHandleErrorResponse (pendingRequest , clientResponse )) {
192219 return ;
193220 }
194221
195222 log .debug ("ListOffsets response received successfully - {}" , clientResponse );
223+ // Reset retry attempts on success
224+ pendingRequest .backoffManager ().resetAttempts ();
225+
196226 ListOffsetsResponse response = (ListOffsetsResponse ) clientResponse .responseBody ();
227+ Map <TopicPartition , CompletableFuture <OffsetResponse >> partitionFutures = pendingRequest .futures ();
197228
198229 for (ListOffsetsTopicResponse topicResponse : response .topics ()) {
199230 String topicName = topicResponse .name ();
@@ -216,11 +247,14 @@ void handleResponse(Map<TopicPartition, CompletableFuture<OffsetResponse>> parti
216247 }
217248
218249 /**
219- * Handles error responses by completing all associated futures with an error. Returns true if an error was
220- * handled. Otherwise, returns false.
250+ * Handles error responses by completing all associated futures with an error or retrying the request.
251+ * Returns true if an error was handled. Otherwise, returns false.
221252 */
222- private boolean maybeHandleErrorResponse (Map <TopicPartition , CompletableFuture <OffsetResponse >> partitionFutures , ClientResponse clientResponse ) {
253+ private boolean maybeHandleErrorResponse (PendingRequest pendingRequest , ClientResponse clientResponse ) {
254+ Map <TopicPartition , CompletableFuture <OffsetResponse >> partitionFutures = pendingRequest .futures ();
223255 Errors error ;
256+ boolean shouldRetry = false ;
257+
224258 if (clientResponse == null ) {
225259 log .error ("Response for ListOffsets for topicPartitions: {} is null" , partitionFutures .keySet ());
226260 error = Errors .UNKNOWN_SERVER_ERROR ;
@@ -231,11 +265,13 @@ private boolean maybeHandleErrorResponse(Map<TopicPartition, CompletableFuture<O
231265 log .error ("Version mismatch exception" , clientResponse .versionMismatch ());
232266 error = Errors .UNKNOWN_SERVER_ERROR ;
233267 } else if (clientResponse .wasDisconnected ()) {
234- log .error ("Response for ListOffsets for TopicPartitions: {} was disconnected - {}." , partitionFutures .keySet (), clientResponse );
268+ log .debug ("Response for ListOffsets for TopicPartitions: {} was disconnected - {}." , partitionFutures .keySet (), clientResponse );
235269 error = Errors .NETWORK_EXCEPTION ;
270+ shouldRetry = true ;
236271 } else if (clientResponse .wasTimedOut ()) {
237- log .error ("Response for ListOffsets for TopicPartitions: {} timed out - {}." , partitionFutures .keySet (), clientResponse );
272+ log .debug ("Response for ListOffsets for TopicPartitions: {} timed out - {}." , partitionFutures .keySet (), clientResponse );
238273 error = Errors .REQUEST_TIMED_OUT ;
274+ shouldRetry = true ;
239275 } else if (!clientResponse .hasResponse ()) {
240276 log .error ("Response for ListOffsets for TopicPartitions: {} has no response - {}." , partitionFutures .keySet (), clientResponse );
241277 error = Errors .UNKNOWN_SERVER_ERROR ;
@@ -244,16 +280,63 @@ private boolean maybeHandleErrorResponse(Map<TopicPartition, CompletableFuture<O
244280 return false ;
245281 }
246282
283+ // For retriable errors (disconnected or timed out), attempt retry if possible
284+ if (shouldRetry ) {
285+ ExponentialBackoffManager backoffManager = pendingRequest .backoffManager ();
286+ if (backoffManager .canAttempt ()) {
287+ backoffManager .incrementAttempt ();
288+ long backoffMs = backoffManager .backOff ();
289+ log .debug ("Retrying ListOffsets request for TopicPartitions: {} after {} ms (attempt {}/{})" ,
290+ partitionFutures .keySet (), backoffMs , backoffManager .attempts (), MAX_RETRY_ATTEMPTS );
291+ timer .add (new RetryTimerTask (backoffMs , pendingRequest ));
292+ return true ;
293+ } else {
294+ log .error ("Exhausted max retries ({}) for ListOffsets request for TopicPartitions: {}" ,
295+ MAX_RETRY_ATTEMPTS , partitionFutures .keySet ());
296+ }
297+ }
298+
299+ // Complete all futures with error (either non-retriable error or exhausted retries)
247300 partitionFutures .forEach ((tp , future ) -> future .complete (new OffsetResponse (-1 , error )));
248301 return true ;
249302 }
250303
251304 /**
252305 * Tracks a pending ListOffsets request and its associated futures.
253306 */
254- private record PendingRequest (Node node ,
255- Map <TopicPartition , CompletableFuture <OffsetResponse >> futures ,
256- ListOffsetsRequest .Builder requestBuilder ) {
307+ // Visible for testing.
308+ record PendingRequest (Node node ,
309+ Map <TopicPartition , CompletableFuture <OffsetResponse >> futures ,
310+ ListOffsetsRequest .Builder requestBuilder ,
311+ ExponentialBackoffManager backoffManager ) {
312+ PendingRequest (Node node ,
313+ Map <TopicPartition , CompletableFuture <OffsetResponse >> futures ,
314+ ListOffsetsRequest .Builder requestBuilder ) {
315+ this (node , futures , requestBuilder , new ExponentialBackoffManager (
316+ MAX_RETRY_ATTEMPTS ,
317+ REQUEST_BACKOFF_MS ,
318+ CommonClientConfigs .RETRY_BACKOFF_EXP_BASE ,
319+ REQUEST_BACKOFF_MAX_MS ,
320+ CommonClientConfigs .RETRY_BACKOFF_JITTER ));
321+ }
322+ }
323+
324+ /**
325+ * Timer task for retrying failed requests after backoff.
326+ */
327+ private final class RetryTimerTask extends TimerTask {
328+ private final PendingRequest pendingRequest ;
329+
330+ RetryTimerTask (long delayMs , PendingRequest pendingRequest ) {
331+ super (delayMs );
332+ this .pendingRequest = pendingRequest ;
333+ }
334+
335+ @ Override
336+ public void run () {
337+ sendThread .enqueue (pendingRequest );
338+ sendThread .wakeup ();
339+ }
257340 }
258341
259342 private class SendThread extends InterBrokerSendThread {
@@ -286,8 +369,7 @@ public Collection<RequestAndCompletionHandler> generateRequests() {
286369 time .hiResClockMs (),
287370 current .node ,
288371 requestBuilder ,
289- response -> handleResponse (current .futures , response )
290- );
372+ response -> handleResponse (current , response ));
291373
292374 requests .add (requestHandler );
293375 }
0 commit comments