Skip to content

Commit b474bbf

Browse files
authored
chore(zerozone2): schedule logging proxy mapping (#2958)
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
1 parent 2d88b92 commit b474bbf

File tree

2 files changed

+32
-6
lines changed

2 files changed

+32
-6
lines changed

core/src/main/java/kafka/automq/zerozone/ProxyNodeMapping.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.kafka.image.MetadataImage;
2929
import org.apache.kafka.metadata.BrokerRegistration;
3030

31+
import com.automq.stream.utils.Threads;
32+
3133
import org.slf4j.Logger;
3234
import org.slf4j.LoggerFactory;
3335

@@ -39,6 +41,7 @@
3941
import java.util.Objects;
4042
import java.util.Optional;
4143
import java.util.concurrent.CopyOnWriteArrayList;
44+
import java.util.concurrent.TimeUnit;
4245
import java.util.stream.Collectors;
4346

4447
import software.amazon.awssdk.annotations.NotNull;
@@ -57,7 +60,7 @@ class ProxyNodeMapping {
5760
private final MetadataCache metadataCache;
5861
private final List<ProxyTopologyChangeListener> listeners = new CopyOnWriteArrayList<>();
5962

60-
volatile Map<String, Map<Integer, BrokerRegistration>> main2proxyByRack = new HashMap<>();
63+
volatile Map<String /* proxy rack */, Map<Integer /* main nodeId */, BrokerRegistration /* proxy */>> main2proxyByRack = new HashMap<>();
6164
volatile boolean inited = false;
6265

6366
public ProxyNodeMapping(Node currentNode, String currentRack, String interBrokerListenerName,
@@ -66,6 +69,7 @@ public ProxyNodeMapping(Node currentNode, String currentRack, String interBroker
6669
this.currentNode = currentNode;
6770
this.currentRack = currentRack;
6871
this.metadataCache = metadataCache;
72+
Threads.COMMON_SCHEDULER.scheduleWithFixedDelay(() -> logMapping(main2proxyByRack), 1, 1, TimeUnit.MINUTES);
6973
}
7074

7175
/**
@@ -215,6 +219,7 @@ public void onChange(MetadataDelta delta, MetadataImage image) {
215219
});
216220
});
217221
this.main2proxyByRack = calMain2proxyByRack(rack2brokers);
222+
logMapping(main2proxyByRack);
218223
notifyListeners(this.main2proxyByRack);
219224
}
220225

@@ -354,6 +359,18 @@ static void tryFreeController(List<ProxyNode> proxyNodes, double avg) {
354359
}
355360
}
356361

362+
363+
364+
static void logMapping(Map<String, Map<Integer, BrokerRegistration>> main2proxyByRack) {
365+
StringBuilder sb = new StringBuilder();
366+
main2proxyByRack.forEach((rack, main2proxy) ->
367+
main2proxy.forEach((mainNodeId, proxyNode) ->
368+
sb.append(" Main ").append(mainNodeId).append(" => Proxy ").append(proxyNode.id()).append("(").append(rack).append(")\n")
369+
)
370+
);
371+
LOGGER.info("ProxyNodeMapping:\n{}", sb);
372+
}
373+
357374
static class ProxyNode implements thirdparty.com.github.jaskey.consistenthash.Node, Comparable<ProxyNode> {
358375
final BrokerRegistration node;
359376
final List<Integer> mainNodeIds = new ArrayList<>();

s3stream/src/main/java/com/automq/stream/utils/threads/S3StreamThreadPoolMonitor.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.concurrent.ThreadFactory;
3535
import java.util.concurrent.ThreadPoolExecutor;
3636
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicReference;
3738
import java.util.function.Function;
3839

3940
public class S3StreamThreadPoolMonitor {
@@ -97,6 +98,7 @@ public static ThreadPoolExecutor createAndMonitor(int corePoolSize,
9798
ThreadUtils.createFastThreadLocalThreadFactory(name + "-%d", isDaemon) :
9899
ThreadUtils.createThreadFactory(name + "-%d", isDaemon);
99100

101+
AtomicReference<ThreadPoolWrapper> wrapperRef = new AtomicReference<>();
100102
ThreadPoolExecutor executor = new ThreadPoolExecutor(
101103
corePoolSize,
102104
maximumPoolSize,
@@ -110,16 +112,23 @@ protected void afterExecute(Runnable r, Throwable t) {
110112
super.afterExecute(r, t);
111113
afterExecutionHook.apply(t);
112114
}
115+
116+
@Override
117+
protected void terminated() {
118+
super.terminated();
119+
ThreadPoolWrapper wrapper = wrapperRef.get();
120+
if (wrapper != null) {
121+
MONITOR_EXECUTOR.remove(wrapper);
122+
}
123+
}
113124
};
114125
List<ThreadPoolStatusMonitor> printers = new ArrayList<>();
115126
printers.add(new ThreadPoolQueueSizeMonitor(queueCapacity));
116127
printers.addAll(threadPoolStatusMonitors);
117128

118-
MONITOR_EXECUTOR.add(ThreadPoolWrapper.builder()
119-
.name(name)
120-
.threadPoolExecutor(executor)
121-
.statusPrinters(printers)
122-
.build());
129+
ThreadPoolWrapper wrapper = ThreadPoolWrapper.builder().name(name).threadPoolExecutor(executor).statusPrinters(printers).build();
130+
wrapperRef.set(wrapper);
131+
MONITOR_EXECUTOR.add(wrapper);
123132
return executor;
124133
}
125134

0 commit comments

Comments
 (0)