Skip to content

Commit f7d223c

Browse files
authored
feat(zerozone2): fast acks=0 (#2964)
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
1 parent 0e8b088 commit f7d223c

File tree

2 files changed

+23
-11
lines changed

2 files changed

+23
-11
lines changed

core/src/main/java/kafka/automq/zerozone/RouterInV2.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,8 @@ private CompletableFuture<AutomqZoneRouterResponseData.Response> append0(
196196
ProduceRequestArgs.builder()
197197
.clientId(buildClientId(realEntriesPerPartition))
198198
.timeout(data.timeoutMs())
199-
.requiredAcks(data.acks())
199+
// The CommittedEpochManager requires the data to be persisted prior to bumping the committed epoch.
200+
.requiredAcks((short) -1)
200201
.internalTopicsAllowed(flag.internalTopicsAllowed())
201202
.transactionId(data.transactionalId())
202203
.entriesPerPartition(realEntriesPerPartition)

core/src/main/java/kafka/automq/zerozone/RouterOutV2.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public void handleProduceAppendProxy(ProduceRequestArgs args) {
8585
Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new ConcurrentHashMap<>();
8686
List<CompletableFuture<Void>> cfList = new ArrayList<>(args.entriesPerPartition().size());
8787
long startNanos = time.nanoseconds();
88+
boolean acks0 = args.requiredAcks() == (short) 0;
8889
for (Map.Entry<TopicPartition, MemoryRecords> entry : args.entriesPerPartition().entrySet()) {
8990
TopicPartition tp = entry.getKey();
9091
MemoryRecords records = entry.getValue();
@@ -103,18 +104,27 @@ public void handleProduceAppendProxy(ProduceRequestArgs args) {
103104
ProxyRequest proxyRequest = new ProxyRequest(tp, channelRst.epoch(), channelRst.channelOffset(), zoneRouterProduceRequest, recordSize, timeoutMillis);
104105
sendProxyRequest(node, proxyRequest);
105106
return proxyRequest.cf.thenAccept(response -> {
106-
responseMap.put(tp, response);
107+
if (!acks0) {
108+
responseMap.put(tp, response);
109+
}
107110
ZeroZoneMetricsManager.PROXY_REQUEST_LATENCY.record(time.nanoseconds() - startNanos);
108111
});
109112
});
110113
cfList.add(proxyCf);
111114
}
112115
Consumer<Map<TopicPartition, ProduceResponse.PartitionResponse>> responseCallback = args.responseCallback();
113-
CompletableFuture<Void> cf = CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0]));
114-
cf.thenAccept(nil -> responseCallback.accept(responseMap)).exceptionally(ex -> {
115-
LOGGER.error("[UNEXPECTED],[ROUTE_FAIL]", ex);
116-
return null;
117-
});
116+
if (acks0) {
117+
// When acks=0 is set, invoke the callback directly without waiting for data persistence to complete.
118+
args.entriesPerPartition().forEach((tp, records) ->
119+
responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE)));
120+
responseCallback.accept(responseMap);
121+
} else {
122+
CompletableFuture<Void> cf = CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0]));
123+
cf.thenAccept(nil -> responseCallback.accept(responseMap)).exceptionally(ex -> {
124+
LOGGER.error("[UNEXPECTED],[ROUTE_FAIL]", ex);
125+
return null;
126+
});
127+
}
118128
}
119129

120130
private static short orderHint(TopicPartition tp, String connectionId) {
@@ -160,6 +170,7 @@ public void send(ProxyRequest request) {
160170

161171
class RemoteProxy implements Proxy {
162172
private static final int MAX_INFLIGHT_SIZE = 64;
173+
private static final long LINGER_NANOS = TimeUnit.MICROSECONDS.toNanos(100);
163174
private final Node node;
164175
private final Semaphore inflightLimiter = new Semaphore(MAX_INFLIGHT_SIZE);
165176
private final Queue<RequestBatch> requestBatchQueue = new ConcurrentLinkedQueue<>();
@@ -173,8 +184,8 @@ public synchronized void send(ProxyRequest request) {
173184
ZeroZoneMetricsManager.recordRouterOutBytes(node.id(), request.recordSize);
174185
synchronized (this) {
175186
if (requestBatch == null) {
176-
requestBatch = new RequestBatch(time, 1, 8192);
177-
Threads.COMMON_SCHEDULER.schedule(() -> trySendRequestBatch(requestBatch), 1, TimeUnit.MILLISECONDS);
187+
requestBatch = new RequestBatch(time, LINGER_NANOS, 8192);
188+
Threads.COMMON_SCHEDULER.schedule(() -> trySendRequestBatch(requestBatch), LINGER_NANOS, TimeUnit.NANOSECONDS);
178189
}
179190
if (requestBatch.add(request)) {
180191
requestBatchQueue.add(requestBatch);
@@ -279,9 +290,9 @@ static class RequestBatch {
279290
private final long batchSize;
280291
private final Map<Long, List<ProxyRequest>> requests = new TreeMap<>();
281292

282-
public RequestBatch(Time time, long lingerMs, int batchSize) {
293+
public RequestBatch(Time time, long lingerNanos, int batchSize) {
283294
this.time = time;
284-
this.lingerNanos = TimeUnit.MILLISECONDS.toNanos(lingerMs);
295+
this.lingerNanos = lingerNanos;
285296
this.batchSize = batchSize;
286297
this.batchStartNanos = System.nanoTime();
287298
}

0 commit comments

Comments
 (0)