From 1bb92b130f9c61bc6a50f88dea35dc35df7a169c Mon Sep 17 00:00:00 2001 From: Andy Reid Date: Fri, 13 Jun 2025 10:26:13 +0100 Subject: [PATCH 1/6] Update WorkerApplication.java --- .../core/WorkerApplication.java | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java b/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java index e2f836c2a..f2911a0f7 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java +++ b/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java @@ -145,23 +145,11 @@ public void start() { public void stop() { LOG.info("Worker stop requested, allowing in-progress tasks to complete."); workerQueue.shutdownIncoming(); - while (!wtp.isIdle()) { - try { - //The grace period will expire and the process killed so no need for time limit here - LOG.trace("Awaiting the Worker Thread Pool to become idle, {} tasks in the backlog.", - wtp.getBacklogSize()); - Thread.sleep(1000); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - } - LOG.trace("Worker Thread Pool is idle."); wtp.shutdown(); try { - wtp.awaitTermination(10_000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.warn("Shutdown interrupted", e); + wtp.awaitTermination(5, TimeUnit.MINUTES); + } catch (final InterruptedException e) { + LOG.error("Worker stop interrupted, in-progress tasks may not have completed.", e); Thread.currentThread().interrupt(); } workerQueue.shutdown(); From 0046993157e0fe11adce63976bb7ab0ac50270d0 Mon Sep 17 00:00:00 2001 From: Andy Reid Date: Tue, 5 Aug 2025 14:21:54 +0100 Subject: [PATCH 2/6] Improve shutdown process --- .../core/BulkWorkerThreadPool.java | 5 +++++ .../core/StreamingWorkerThreadPool.java | 5 +++++ .../core/WorkerApplication.java | 18 +++++++++++++++++- .../workerframework/core/WorkerThreadPool.java | 2 ++ .../queues/rabbit/RabbitWorkerQueue.java | 6 ++++++ 5 files changed, 35 insertions(+), 1 deletion(-) diff --git a/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java b/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java index f6fbc502e..50b3041bd 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java +++ b/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java @@ -150,6 +150,11 @@ public int getBacklogSize() { return workQueue.size() + backupThreadPool.getBacklogSize(); } + + @Override + public int getActiveCount() { + return backupThreadPool.getActiveCount(); + } @Override public void submitWorkerTask(final WorkerTaskImpl workerTask) diff --git a/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerThreadPool.java b/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerThreadPool.java index 0857f6130..e60ef1964 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerThreadPool.java +++ b/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerThreadPool.java @@ -76,6 +76,11 @@ public int getBacklogSize() { return workQueue.size(); } + + @Override + public int getActiveCount() { + return threadPoolExecutor.getActiveCount(); + } /** * Execute the specified task at some point in the future diff --git a/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java b/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java index f2911a0f7..6b51c8037 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java +++ b/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java @@ -78,6 +78,8 @@ public final class WorkerApplication extends Application { private final long startTime = System.currentTimeMillis(); private static final Logger LOG = LoggerFactory.getLogger(WorkerApplication.class); + private static final long SHUTDOWN_DURATION = 5 * 60 * 1000; // 5 minutes in milliseconds + public static final int SHUTDOWN_LOG_INTERVAL = 15_000; // 15 seconds in milliseconds /** * Entry point for the asynchronous micro-service worker framework. @@ -143,8 +145,22 @@ public void start() { @Override public void stop() { - LOG.info("Worker stop requested, allowing in-progress tasks to complete."); + LOG.info("Worker stop requested."); + workerQueue.shutdownIncoming(); + + final long startTime = System.currentTimeMillis(); + + while(wtp.getBacklogSize() > 0 && System.currentTimeMillis() - startTime < SHUTDOWN_DURATION) { + try { + LOG.info("Allowing {} backlog tasks to complete, {} currently active.", wtp.getBacklogSize(), wtp.getActiveCount()); + Thread.sleep(SHUTDOWN_LOG_INTERVAL); // 15 seconds + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + System.out.println("Logger thread interrupted, exiting..."); + break; + } + } wtp.shutdown(); try { wtp.awaitTermination(5, TimeUnit.MINUTES); diff --git a/worker-core/src/main/java/com/github/workerframework/core/WorkerThreadPool.java b/worker-core/src/main/java/com/github/workerframework/core/WorkerThreadPool.java index 2406e79f8..fcd7cc56c 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/WorkerThreadPool.java +++ b/worker-core/src/main/java/com/github/workerframework/core/WorkerThreadPool.java @@ -61,6 +61,8 @@ void awaitTermination(long timeout, TimeUnit unit) boolean isIdle(); int getBacklogSize(); + + int getActiveCount(); /** * Execute the specified task at some point in the future diff --git a/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueue.java b/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueue.java index 58a0ebfc5..edfacf009 100644 --- a/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueue.java +++ b/worker-queue-rabbit/src/main/java/com/github/workerframework/queues/rabbit/RabbitWorkerQueue.java @@ -73,6 +73,7 @@ public final class RabbitWorkerQueue implements ManagedWorkerQueue private final RabbitWorkerQueueConfiguration config; private final int maxTasks; private static final Logger LOG = LoggerFactory.getLogger(RabbitWorkerQueue.class); + private boolean incomingShutdownPermanent = false; /** * Setup a new RabbitWorkerQueue. @@ -214,6 +215,7 @@ public String getPausedQueue() * {@inheritDoc} * * The incoming queues will all be cancelled so the consumer will fall back to idle. + * This is permanent, and attempts to reconnectIncoming will fail. */ @Override public void shutdownIncoming() @@ -224,6 +226,7 @@ public void shutdownIncoming() try { incomingChannel.basicCancel(consumerTag); consumerTag = null; + incomingShutdownPermanent = true; } catch (IOException e) { metrics.incremementErrors(); LOG.warn("Failed to cancel consumer {}", consumerTag, e); @@ -277,6 +280,9 @@ public void disconnectIncoming() public void reconnectIncoming() { LOG.debug("Reconnecting incoming queues"); + if(incomingShutdownPermanent) { + throw new IllegalStateException("Queue is permanently shutdown"); + } synchronized (consumerLock) { if (consumerTag == null && incomingChannel.isOpen()) { try { From 17da26bc7a55cc4a12627cdff2cbe4d4d1d709dd Mon Sep 17 00:00:00 2001 From: Andy Reid Date: Tue, 5 Aug 2025 14:31:09 +0100 Subject: [PATCH 3/6] Update ShutdownDeveloperTest.java --- .../workerframework/workertest/ShutdownDeveloperTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker-test/src/test/java/com/github/workerframework/workertest/ShutdownDeveloperTest.java b/worker-test/src/test/java/com/github/workerframework/workertest/ShutdownDeveloperTest.java index 272bfbb6b..5e4a6166b 100644 --- a/worker-test/src/test/java/com/github/workerframework/workertest/ShutdownDeveloperTest.java +++ b/worker-test/src/test/java/com/github/workerframework/workertest/ShutdownDeveloperTest.java @@ -45,7 +45,7 @@ public class ShutdownDeveloperTest extends TestWorkerTestBase { public void shutdownTest() throws IOException, TimeoutException, CodecException { // Usage instructions - // Comment out the iages for test worker 2 and 3 in this module's pom.xml + // Comment out the images for worker-test-2 and worker-test-no-valid-cert in this module's pom.xml // Use mvn docker:start to start test worker // Remove the @Ignore and run the test to create 100 test messages // From a terminal execute docker stop -t 300 CONTAINER_ID From 99e56c2072d75cf80c2bd8674c2f4b6f7fd48b56 Mon Sep 17 00:00:00 2001 From: Andy Reid Date: Thu, 7 Aug 2025 10:57:07 +0100 Subject: [PATCH 4/6] Code review --- .../github/workerframework/core/BulkWorkerThreadPool.java | 8 ++++++-- .../workerframework/core/StreamingWorkerThreadPool.java | 2 +- .../github/workerframework/core/WorkerApplication.java | 7 +++---- .../com/github/workerframework/core/WorkerThreadPool.java | 2 +- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java b/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java index 50b3041bd..62599448e 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java +++ b/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java @@ -22,6 +22,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,7 @@ final class BulkWorkerThreadPool implements WorkerThreadPool private final StreamingWorkerThreadPool backupThreadPool; private volatile boolean isActive; + private volatile AtomicInteger activeThreads = new AtomicInteger(0); public BulkWorkerThreadPool( final WorkerFactory workerFactory, @@ -86,10 +88,12 @@ private void execute() = new BulkWorkerTaskProvider(task, workQueue); try { + activeThreads.addAndGet(1); bulkWorker.processTasks(taskProvider); } catch (final RuntimeException ex) { LOG.warn("Bulk Worker threw unhandled exception", ex); } finally { + activeThreads.decrementAndGet(); // Re-submit the first task if it has not been consumed // NB: It's really faulty Worker logic to not consume at least // the one task. @@ -152,8 +156,8 @@ public int getBacklogSize() } @Override - public int getActiveCount() { - return backupThreadPool.getActiveCount(); + public int getApproxActiveCount() { + return activeThreads.get() + backupThreadPool.getApproxActiveCount(); } @Override diff --git a/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerThreadPool.java b/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerThreadPool.java index e60ef1964..62927f73d 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerThreadPool.java +++ b/worker-core/src/main/java/com/github/workerframework/core/StreamingWorkerThreadPool.java @@ -78,7 +78,7 @@ public int getBacklogSize() } @Override - public int getActiveCount() { + public int getApproxActiveCount() { return threadPoolExecutor.getActiveCount(); } diff --git a/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java b/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java index 6b51c8037..8d3754e29 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java +++ b/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java @@ -79,7 +79,7 @@ public final class WorkerApplication extends Application private final long startTime = System.currentTimeMillis(); private static final Logger LOG = LoggerFactory.getLogger(WorkerApplication.class); private static final long SHUTDOWN_DURATION = 5 * 60 * 1000; // 5 minutes in milliseconds - public static final int SHUTDOWN_LOG_INTERVAL = 15_000; // 15 seconds in milliseconds + private static final int SHUTDOWN_LOG_INTERVAL = 15_000; // 15 seconds in milliseconds /** * Entry point for the asynchronous micro-service worker framework. @@ -153,11 +153,10 @@ public void stop() { while(wtp.getBacklogSize() > 0 && System.currentTimeMillis() - startTime < SHUTDOWN_DURATION) { try { - LOG.info("Allowing {} backlog tasks to complete, {} currently active.", wtp.getBacklogSize(), wtp.getActiveCount()); - Thread.sleep(SHUTDOWN_LOG_INTERVAL); // 15 seconds + LOG.info("Allowing {} backlog tasks to complete, {} currently active.", wtp.getBacklogSize(), wtp.getApproxActiveCount()); + Thread.sleep(SHUTDOWN_LOG_INTERVAL); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); - System.out.println("Logger thread interrupted, exiting..."); break; } } diff --git a/worker-core/src/main/java/com/github/workerframework/core/WorkerThreadPool.java b/worker-core/src/main/java/com/github/workerframework/core/WorkerThreadPool.java index fcd7cc56c..6b4c839cd 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/WorkerThreadPool.java +++ b/worker-core/src/main/java/com/github/workerframework/core/WorkerThreadPool.java @@ -62,7 +62,7 @@ void awaitTermination(long timeout, TimeUnit unit) int getBacklogSize(); - int getActiveCount(); + int getApproxActiveCount(); /** * Execute the specified task at some point in the future From 509dd13f002e82d39b0c3b2253c2a4ab87e1261f Mon Sep 17 00:00:00 2001 From: Andy Reid Date: Fri, 8 Aug 2025 10:31:17 +0100 Subject: [PATCH 5/6] Update BulkWorkerThreadPool.java --- .../com/github/workerframework/core/BulkWorkerThreadPool.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java b/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java index 62599448e..fc92d84bd 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java +++ b/worker-core/src/main/java/com/github/workerframework/core/BulkWorkerThreadPool.java @@ -37,7 +37,7 @@ final class BulkWorkerThreadPool implements WorkerThreadPool private final StreamingWorkerThreadPool backupThreadPool; private volatile boolean isActive; - private volatile AtomicInteger activeThreads = new AtomicInteger(0); + private final AtomicInteger activeThreads = new AtomicInteger(0); public BulkWorkerThreadPool( final WorkerFactory workerFactory, @@ -88,7 +88,7 @@ private void execute() = new BulkWorkerTaskProvider(task, workQueue); try { - activeThreads.addAndGet(1); + activeThreads.incrementAndGet(); bulkWorker.processTasks(taskProvider); } catch (final RuntimeException ex) { LOG.warn("Bulk Worker threw unhandled exception", ex); From 1c3eb1ed920df329bb62fd3cf16aef423cb0cfd5 Mon Sep 17 00:00:00 2001 From: Andy Reid Date: Fri, 8 Aug 2025 10:31:35 +0100 Subject: [PATCH 6/6] Update WorkerApplication.java --- .../com/github/workerframework/core/WorkerApplication.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java b/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java index 8d3754e29..22062d262 100644 --- a/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java +++ b/worker-core/src/main/java/com/github/workerframework/core/WorkerApplication.java @@ -151,10 +151,12 @@ public void stop() { final long startTime = System.currentTimeMillis(); - while(wtp.getBacklogSize() > 0 && System.currentTimeMillis() - startTime < SHUTDOWN_DURATION) { + int backlogSize = wtp.getBacklogSize(); + while(backlogSize > 0 && System.currentTimeMillis() - startTime < SHUTDOWN_DURATION) { try { - LOG.info("Allowing {} backlog tasks to complete, {} currently active.", wtp.getBacklogSize(), wtp.getApproxActiveCount()); + LOG.debug("Allowing {} backlog tasks to complete, {} currently active.", backlogSize, wtp.getApproxActiveCount()); Thread.sleep(SHUTDOWN_LOG_INTERVAL); + backlogSize = wtp.getBacklogSize(); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); break;