
Commit 0e8b088

feat(zerozone2): async start RouterChannel and ConfirmWAL (#2960)
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
1 parent b474bbf commit 0e8b088

File tree: 8 files changed (+70 -32 lines)


core/src/main/java/kafka/automq/zerozone/ConfirmWALProvider.java

Lines changed: 3 additions & 1 deletion
@@ -21,8 +21,10 @@
 
 import com.automq.stream.s3.wal.WriteAheadLog;
 
+import java.util.concurrent.CompletableFuture;
+
 public interface ConfirmWALProvider {
 
-    WriteAheadLog readOnly(String walConfig, int nodeId);
+    CompletableFuture<WriteAheadLog> readOnly(String walConfig, int nodeId);
 
 }
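With readOnly now returning CompletableFuture<WriteAheadLog>, callers chain on the future instead of blocking while the WAL starts. A minimal usage sketch, assuming a ConfirmWALProvider named provider plus hypothetical walConfig, nodeId, replayer and offset values (none of these names are part of the patch):

// Sketch only: `provider`, `replayer`, `startOffset`, `endOffset` are assumed to exist in the caller.
CompletableFuture<WriteAheadLog> walCf = provider.readOnly(walConfig, nodeId);
// The downstream work runs only after the WAL's asynchronous start has completed.
CompletableFuture<?> replayCf = walCf.thenCompose(wal -> replayer.replay(wal, startOffset, endOffset));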

core/src/main/java/kafka/automq/zerozone/DefaultConfirmWALProvider.java

Lines changed: 12 additions & 2 deletions
@@ -31,9 +31,12 @@
 import com.automq.stream.utils.Time;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 
 public class DefaultConfirmWALProvider implements ConfirmWALProvider {
+    private static final ExecutorService START_EXECUTOR = java.util.concurrent.Executors.newCachedThreadPool();
     private final Map<Short, ObjectStorage> objectStorages = new ConcurrentHashMap<>();
     private final String clusterId;
     private final Time time = Time.SYSTEM;
@@ -43,7 +46,7 @@ public DefaultConfirmWALProvider(String clusterId) {
     }
 
     @Override
-    public WriteAheadLog readOnly(String walConfig, int nodeId) {
+    public CompletableFuture<WriteAheadLog> readOnly(String walConfig, int nodeId) {
         BucketURI bucketURI = BucketURI.parse(walConfig);
         ObjectStorage objectStorage = objectStorages.computeIfAbsent(bucketURI.bucketId(), id -> {
             try {
@@ -64,6 +67,13 @@ public WriteAheadLog readOnly(String walConfig, int nodeId) {
             .withNodeId(nodeId)
             .withOpenMode(OpenMode.READ_ONLY)
             .build();
-        return new ObjectWALService(time, objectStorage, objectWALConfig);
+        ObjectWALService wal = new ObjectWALService(time, objectStorage, objectWALConfig);
+        return CompletableFuture.runAsync(() -> {
+            try {
+                wal.start();
+            } catch (Throwable e) {
+                throw new RuntimeException(e);
+            }
+        }, START_EXECUTOR).thenApply(v -> wal);
     }
 }

core/src/main/java/kafka/automq/zerozone/DefaultRouterChannelProvider.java

Lines changed: 2 additions & 13 deletions
@@ -37,7 +37,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
@@ -84,11 +83,6 @@ public RouterChannel channel() {
             .withType(WAL_TYPE)
             .build();
         ObjectWALService wal = new ObjectWALService(Time.SYSTEM, objectStorage(), config);
-        try {
-            wal.start();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
         RouterChannel routerChannel = new ObjectRouterChannel(this.nodeId, channelId, wal);
         routerChannel.nextEpoch(epoch.getCurrent());
         routerChannel.trim(epoch.getCommitted());
@@ -106,11 +100,6 @@ public RouterChannel readOnlyChannel(int node) {
         return routerChannels.computeIfAbsent(node, nodeId -> {
             ObjectWALConfig config = ObjectWALConfig.builder().withClusterId(clusterId).withNodeId(node).withOpenMode(OpenMode.READ_ONLY).withType(WAL_TYPE).build();
             ObjectWALService wal = new ObjectWALService(Time.SYSTEM, objectStorage(), config);
-            try {
-                wal.start();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
             return new ObjectRouterChannel(nodeId, channelId, wal);
         });
     }
@@ -127,8 +116,8 @@ public void addEpochListener(EpochListener listener) {
 
     @Override
     public void close() {
-        FutureUtil.suppress(() -> routerChannel.close(), LOGGER);
-        routerChannels.forEach((nodeId, channel) -> FutureUtil.suppress(channel::close, LOGGER));
+        FutureUtil.suppress(() -> routerChannel.close().get(), LOGGER);
+        routerChannels.forEach((nodeId, channel) -> FutureUtil.suppress(() -> channel.close().get(), LOGGER));
     }
 
     @Override

core/src/main/java/kafka/automq/zerozone/ObjectRouterChannel.java

Lines changed: 25 additions & 3 deletions
@@ -25,6 +25,7 @@
 import com.automq.stream.s3.wal.exception.OverCapacityException;
 import com.automq.stream.s3.wal.impl.DefaultRecordOffset;
 import com.automq.stream.s3.wal.impl.object.ObjectWALService;
+import com.automq.stream.utils.FutureUtil;
 import com.automq.stream.utils.LogContext;
 
 import org.slf4j.Logger;
@@ -34,12 +35,15 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import io.netty.buffer.ByteBuf;
 
 public class ObjectRouterChannel implements RouterChannel {
+    private static final ExecutorService ASYNC_EXECUTOR = Executors.newCachedThreadPool();
     private final Logger logger;
     private final AtomicLong mockOffset = new AtomicLong(0);
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -54,15 +58,29 @@ public class ObjectRouterChannel implements RouterChannel {
     private final Queue<Long> channelEpochQueue = new LinkedList<>();
     private final Map<Long, RecordOffset> channelEpoch2LastRecordOffset = new HashMap<>();
 
+    private final CompletableFuture<Void> startCf;
+
     public ObjectRouterChannel(int nodeId, short channelId, ObjectWALService wal) {
         this.logger = new LogContext(String.format("[OBJECT_ROUTER_CHANNEL-%s-%s] ", channelId, nodeId)).logger(ObjectRouterChannel.class);
         this.nodeId = nodeId;
         this.channelId = channelId;
         this.wal = wal;
+        this.startCf = CompletableFuture.runAsync(() -> {
+            try {
+                wal.start();
+            } catch (Throwable e) {
+                logger.error("start object router channel failed.", e);
+                throw new RuntimeException(e);
+            }
+        }, ASYNC_EXECUTOR);
     }
 
     @Override
     public CompletableFuture<AppendResult> append(int targetNodeId, short orderHint, ByteBuf data) {
+        return startCf.thenCompose(nil -> append0(targetNodeId, orderHint, data));
+    }
+
+    CompletableFuture<AppendResult> append0(int targetNodeId, short orderHint, ByteBuf data) {
         StreamRecordBatch record = new StreamRecordBatch(targetNodeId, 0, mockOffset.incrementAndGet(), 1, data);
         try {
             return wal.append(TraceContext.DEFAULT, record).thenApply(walRst -> {
@@ -83,6 +101,10 @@ public CompletableFuture<AppendResult> append(int targetNodeId, short orderHint,
 
     @Override
     public CompletableFuture<ByteBuf> get(ByteBuf channelOffset) {
+        return startCf.thenCompose(nil -> get0(channelOffset));
+    }
+
+    CompletableFuture<ByteBuf> get0(ByteBuf channelOffset) {
         return wal.get(DefaultRecordOffset.of(ChannelOffset.of(channelOffset).walRecordOffset())).thenApply(streamRecordBatch -> {
             ByteBuf payload = streamRecordBatch.getPayload().retainedSlice();
             streamRecordBatch.release();
@@ -108,7 +130,7 @@ public void trim(long epoch) {
         writeLock.lock();
         try {
             RecordOffset recordOffset = null;
-            for (;;) {
+            for (; ; ) {
                 Long channelEpoch = channelEpochQueue.peek();
                 if (channelEpoch == null || channelEpoch > epoch) {
                     break;
@@ -129,7 +151,7 @@ public void trim(long epoch) {
     }
 
     @Override
-    public void close() {
-        wal.shutdownGracefully();
+    public CompletableFuture<Void> close() {
+        return startCf.thenAcceptAsync(nil -> FutureUtil.suppress(wal::shutdownGracefully, logger), ASYNC_EXECUTOR);
     }
 }
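The change above is the core async-start pattern this commit applies: run wal.start() on a background executor from the constructor, gate every public operation on the resulting startCf, and chain close() on the same future so shutdown is ordered after startup. A distilled, self-contained sketch of the pattern (class and method names are illustrative, not taken from the patch):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Minimal sketch of the async-start gating used by ObjectRouterChannel.
class AsyncStartedResource {
    private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();
    private final CompletableFuture<Void> startCf;

    AsyncStartedResource(Runnable blockingStart) {
        // Kick off the potentially slow start without blocking the caller.
        this.startCf = CompletableFuture.runAsync(blockingStart, EXECUTOR);
    }

    CompletableFuture<String> read() {
        // Every public operation waits for startup through the same future.
        return startCf.thenCompose(nil -> doRead());
    }

    CompletableFuture<Void> close() {
        // Chaining on startCf orders shutdown after startup, mirroring close() in the patch.
        return startCf.thenRunAsync(this::doClose, EXECUTOR);
    }

    private CompletableFuture<String> doRead() {
        return CompletableFuture.completedFuture("data");
    }

    private void doClose() {
        // Release underlying resources here.
    }
}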

core/src/main/java/kafka/automq/zerozone/RouterChannel.java

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ public interface RouterChannel {
 
     void trim(long epoch);
 
-    void close();
+    CompletableFuture<Void> close();
 
     class AppendResult {
         private final long epoch;

core/src/main/java/kafka/automq/zerozone/SnapshotReadPartitionsManager.java

Lines changed: 7 additions & 4 deletions
@@ -40,6 +40,7 @@
 
 import com.automq.stream.s3.cache.SnapshotReadCache;
 import com.automq.stream.s3.wal.RecordOffset;
+import com.automq.stream.utils.FutureUtil;
 import com.automq.stream.utils.Threads;
 import com.automq.stream.utils.threads.EventLoop;
 
@@ -262,10 +263,12 @@ class Subscriber {
         public Subscriber(Node node, AutoMQVersion version) {
             this.node = node;
             this.version = version;
-            this.requester = new SubscriberRequester(this, node, version, asyncSender, SnapshotReadPartitionsManager.this::getTopicName, eventLoop, time);
             this.replayer = new SubscriberReplayer(confirmWALProvider, SnapshotReadPartitionsManager.this.replayer, node, metadataCache);
-            LOGGER.info("[SNAPSHOT_READ_SUBSCRIBE],node={}", node);
+            this.requester = new SubscriberRequester(this, node, version, asyncSender, SnapshotReadPartitionsManager.this::getTopicName, eventLoop, time);
+            // start the tasks after initialized.
+            this.requester.start();
             run();
+            LOGGER.info("[SNAPSHOT_READ_SUBSCRIBE],node={}", node);
         }
 
         // only for test
@@ -314,9 +317,9 @@ public CompletableFuture<Void> close() {
                 partitions.forEach(SnapshotReadPartitionsManager.this::removePartition);
                 partitions.clear();
                 snapshotWithOperations.clear();
-                replayer.close();
+                CompletableFuture<Void> replayerCloseCf = replayer.close();
                 requester.nextSnapshotCf().complete(null);
-                cf.complete(null);
+                FutureUtil.propagate(replayerCloseCf, cf);
             } catch (Throwable e) {
                 cf.completeExceptionally(e);
             }
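Subscriber.close() now completes its future only once the replayer has shut down: FutureUtil.propagate forwards the outcome of replayerCloseCf (normal or exceptional) into cf. With plain CompletableFuture, the same forwarding could be written roughly as follows (a sketch of the presumed semantics, not the actual FutureUtil implementation):

// Hedged sketch: forward completion of `replayerCloseCf` to `cf`.
replayerCloseCf.whenComplete((v, ex) -> {
    if (ex != null) {
        cf.completeExceptionally(ex);
    } else {
        cf.complete(v);
    }
});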

core/src/main/java/kafka/automq/zerozone/SubscriberReplayer.java

Lines changed: 12 additions & 7 deletions
@@ -38,22 +38,26 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.LongConsumer;
 import java.util.stream.Collectors;
 
 class SubscriberReplayer {
     private static final Logger LOGGER = LoggerFactory.getLogger(SubscriberReplayer.class);
+    private static final ExecutorService CLOSE_EXECUTOR = Executors.newCachedThreadPool();
     private long loadedObjectOrderId = -1L;
     private CompletableFuture<Void> lastDataLoadCf = CompletableFuture.completedFuture(null);
-    private WriteAheadLog wal;
+    private CompletableFuture<WriteAheadLog> wal;
     private RecordOffset loadedEndOffset = null;
 
     private final Replayer replayer;
     private final Node node;
     private final MetadataCache metadataCache;
     private final ConfirmWALProvider confirmWALProvider;
 
-    public SubscriberReplayer(ConfirmWALProvider confirmWALProvider, Replayer replayer, Node node, MetadataCache metadataCache) {
+    public SubscriberReplayer(ConfirmWALProvider confirmWALProvider, Replayer replayer, Node node,
+        MetadataCache metadataCache) {
         this.confirmWALProvider = confirmWALProvider;
         this.replayer = replayer;
         this.node = node;
@@ -73,11 +77,11 @@ public void onNewWalEndOffset(String walConfig, RecordOffset endOffset) {
             return;
         }
         // The replayer will ensure the order of replay
-        this.lastDataLoadCf = replayer.replay(wal, startOffset, endOffset).thenAccept(nil -> {
+        this.lastDataLoadCf = wal.thenCompose(w -> replayer.replay(w, startOffset, endOffset).thenAccept(nil -> {
             if (LOGGER.isTraceEnabled()) {
                 LOGGER.trace("replay {} confirm wal [{}, {})", node, startOffset, endOffset);
             }
-        });
+        }));
     }
 
     public CompletableFuture<Void> relayObject() {
@@ -104,11 +108,12 @@ public CompletableFuture<Void> replayWal() {
         return lastDataLoadCf;
     }
 
-    public void close() {
-        WriteAheadLog wal = this.wal;
+    public CompletableFuture<Void> close() {
+        CompletableFuture<WriteAheadLog> wal = this.wal;
         if (wal != null) {
-            FutureUtil.suppress(wal::shutdownGracefully, LOGGER);
+            return CompletableFuture.runAsync(() -> FutureUtil.suppress(() -> wal.get().shutdownGracefully(), LOGGER), CLOSE_EXECUTOR);
        }
+        return CompletableFuture.completedFuture(null);
     }
 
     public void reset() {

core/src/main/java/kafka/automq/zerozone/SubscriberRequester.java

Lines changed: 8 additions & 1 deletion
@@ -81,6 +81,9 @@ public SubscriberRequester(SnapshotReadPartitionsManager.Subscriber subscriber,
         this.topicNameGetter = topicNameGetter;
         this.eventLoop = eventLoop;
         this.time = time;
+    }
+
+    public void start() {
         request();
     }
 
@@ -126,7 +129,11 @@ private void request0() {
         AutomqGetPartitionSnapshotRequest.Builder builder = new AutomqGetPartitionSnapshotRequest.Builder(data);
         asyncSender.sendRequest(node, builder)
             .thenAcceptAsync(rst -> {
-                handleResponse(rst, snapshotCf);
+                try {
+                    handleResponse(rst, snapshotCf);
+                } catch (Exception e) {
+                    subscriber.reset("Exception when handle snapshot response: " + e.getMessage());
+                }
                 subscriber.unsafeRun();
             }, eventLoop)
             .exceptionally(ex -> {
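Moving request() out of the constructor into an explicit start() gives two-phase initialization: in the old code the Subscriber created its requester, which immediately issued a request, before the replayer field was assigned, so a fast callback could in principle observe a partially initialized Subscriber. The new ordering wires all fields first, then calls requester.start(). A schematic of the safe ordering with illustrative classes (not the patch itself):

// Illustrative two-phase initialization: construction stores collaborators, start() begins I/O.
final class Owner {
    private final Object dependency;
    private final Requester requester;

    Owner() {
        this.dependency = new Object();
        this.requester = new Requester(this::onResponse); // no request issued yet
        this.requester.start();                           // safe: Owner is fully wired
    }

    private void onResponse() {
        // May safely use `dependency`; it is guaranteed to be assigned.
        dependency.toString();
    }
}

final class Requester {
    private final Runnable callback;

    Requester(Runnable callback) {
        this.callback = callback;
    }

    void start() {
        callback.run(); // stands in for issuing the first snapshot request
    }
}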
