
Conversation

Contributor

@harshit-anyscale harshit-anyscale commented Nov 6, 2025

We've recently added asynchronous inference support in Ray Serve, but it currently lacks the ability to autoscale based on the number of tasks in the queues used by async inference.

This PR adds code that determines the number of pending tasks in the queue (the Celery queue) so that the number of replicas can be decided based on it.

Celery itself doesn't provide an API that reports the number of tasks in a queue, so we have to either:

  1. write our own logic for each broker separately (a minimal Redis sketch follows this list), or
  2. use the flower library https://github.com/mher/flower
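
For comparison, here is a minimal sketch of what approach 1 would look like for a Redis broker only. The helper below is hypothetical and not part of this PR; with Redis, Celery stores each queue as a list keyed by the queue name, so LLEN gives the number of messages still waiting (reserved/unacknowledged tasks are not counted), and every other broker (RabbitMQ, SQS, ...) would need its own equivalent.

```python
import redis


def redis_queue_length(broker_url: str, queue_name: str = "celery") -> int:
    # Hypothetical helper illustrating approach 1 for a Redis broker only.
    # Celery keeps a Redis-backed queue as a list under the queue name, so
    # LLEN counts the messages waiting to be consumed.
    client = redis.Redis.from_url(broker_url)
    return client.llen(queue_name)
```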

This PR implements approach 2 and proposes to go with it, since the Celery documentation itself points to Flower as its monitoring and observability tool. Link: https://docs.celeryq.dev/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor

We spin up Flower in a separate thread within the same Ray Serve process. That thread runs an event loop; whenever someone asks for the queue length, we schedule a coroutine on this event loop to query the queue lengths.

Whenever the replica is stopped, the stop_consumer() function in CeleryTaskProcessorAdapter stops the Flower thread as well.
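
To make the threading model concrete, here is a minimal, self-contained sketch of the pattern described above. Class and method names are illustrative rather than the actual ones in this PR, and the Flower-backed query is stubbed out: a dedicated thread owns an asyncio event loop, and callers schedule a coroutine onto it with run_coroutine_threadsafe and block on the returned future.

```python
import asyncio
import threading
from typing import Dict, List


class QueueMonitor:
    """Illustrative sketch: a background thread that owns an asyncio event loop."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        # Ask the loop to stop from its own thread, then join and close it.
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=20)
        self._loop.close()

    async def _fetch_queue_lengths(self, queues: List[str]) -> Dict[str, int]:
        # Placeholder: the PR delegates this to Flower's broker inspection;
        # returning zeros keeps the sketch self-contained.
        return {name: 0 for name in queues}

    def get_queue_lengths(self, queues: List[str]) -> Dict[str, int]:
        # Callable from any thread: schedule the coroutine on the monitor's
        # loop and block on the cross-thread future for the result.
        future = asyncio.run_coroutine_threadsafe(
            self._fetch_queue_lengths(queues), self._loop
        )
        return future.result(timeout=10)
```

Under this sketch, the replica would call start() once, get_queue_lengths(["celery"]) whenever the autoscaler asks, and stop() from stop_consumer().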

Signed-off-by: harshit <harshit@anyscale.com>
@harshit-anyscale harshit-anyscale self-assigned this Nov 6, 2025
@harshit-anyscale harshit-anyscale marked this pull request as ready for review November 6, 2025 18:14
@harshit-anyscale harshit-anyscale requested a review from a team as a code owner November 6, 2025 18:14
@harshit-anyscale harshit-anyscale added the go (add ONLY when ready to merge, run all tests) label Nov 6, 2025
@ray-gardener ray-gardener bot added the serve (Ray Serve Related Issue) and observability (Issues related to the Ray Dashboard, Logging, Metrics, Tracing, and/or Profiling) labels Nov 6, 2025
Signed-off-by: harshit <harshit@anyscale.com>
Signed-off-by: harshit <harshit@anyscale.com>
        logger.info("Queue monitor stopped successfully")

    except Exception as e:
        logger.error(f"Error stopping queue monitor: {e}")

Bug: Executor Leaks Resources After Timeout

The stop() method fails to shut down the executor when _loop_thread.result(timeout=20) times out. The shutdown(wait=True) call at line 135 is inside the try block, so it's skipped when a TimeoutError is raised. This leaves the executor running with a potentially stuck thread, causing a resource leak. The executor shutdown should be in the finally block to ensure cleanup regardless of timeout.
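
A simplified, standalone sketch of the suggested structure (a free function rather than the PR's actual method): the executor shutdown sits in finally so it runs on both the success and the timeout path.

```python
import concurrent.futures
import logging

logger = logging.getLogger(__name__)


def stop_monitor(
    executor: concurrent.futures.ThreadPoolExecutor,
    loop_thread: concurrent.futures.Future,
) -> None:
    try:
        # Raises concurrent.futures.TimeoutError if the loop thread is stuck.
        loop_thread.result(timeout=20)
        logger.info("Queue monitor stopped successfully")
    except Exception as e:
        logger.error(f"Error stopping queue monitor: {e}")
    finally:
        # Runs on every path, so the executor is never leaked; wait=False
        # avoids blocking forever on a worker that is already stuck.
        executor.shutdown(wait=False)
```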


Contributor Author

fixed it

Signed-off-by: harshit <harshit@anyscale.com>
        self._loop.close()

    # Start event loop in background thread
    self._loop_thread = self._executor.submit(_run_event_loop)

Bug: Concurrent Initialization Race Condition

The start() method has a race condition where self._loop is checked in the main thread but assigned in the background thread. Multiple concurrent calls to start() can pass the if self._loop is not None check before any thread assigns the value, causing multiple event loops and threads to be created. The check should use a thread-safe flag or lock to prevent concurrent initialization.
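
One possible shape for the fix, sketched with illustrative names rather than the PR's actual code: the check and the assignment happen atomically under a lock, so concurrent start() calls cannot each create a loop and thread.

```python
import asyncio
import concurrent.futures
import threading


class LoopOwner:
    """Illustrative sketch: thread-safe, idempotent start()."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._loop = None
        self._loop_thread = None
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    def start(self) -> None:
        with self._lock:
            # Check and assignment are atomic with respect to other callers,
            # so only the first start() creates the loop and worker thread.
            if self._loop is not None:
                return
            self._loop = asyncio.new_event_loop()
            self._loop_thread = self._executor.submit(self._run_event_loop)

    def _run_event_loop(self) -> None:
        asyncio.set_event_loop(self._loop)
        self._loop.run_forever()
```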


finally:
    self._loop = None
    self._loop_thread = None
    self._loop_ready.clear()

Bug: Race Condition: Loop Shutdown Inconsistency

The stop() method has a race condition where get_queue_lengths() can be called after the event loop is closed but before _loop is set to None. After line 132 waits for the thread to complete, the loop is closed, but _loop remains non-None and _loop_ready remains set until the finally block executes. A concurrent call to get_queue_lengths() during this window will pass the check on line 150 and attempt to schedule a coroutine on the closed loop, causing an error.
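
One way to close this window, again with illustrative names: stop() and get_queue_lengths() share a lock and hold it for their whole body, so a concurrent caller either finishes against the live loop before shutdown proceeds or observes that _loop is None; it can never see a closed-but-non-None loop.

```python
import asyncio
import threading
from typing import Dict


class LoopOwner:
    """Illustrative sketch: shutdown and queries are serialized by one lock."""

    def __init__(self, loop: asyncio.AbstractEventLoop, loop_thread) -> None:
        self._lock = threading.Lock()
        self._loop = loop
        self._loop_thread = loop_thread  # e.g. a concurrent.futures.Future

    def stop(self) -> None:
        with self._lock:
            if self._loop is None:
                return
            self._loop.call_soon_threadsafe(self._loop.stop)
            self._loop_thread.result(timeout=20)
            self._loop.close()
            self._loop = None
            self._loop_thread = None

    def get_queue_lengths(self) -> Dict[str, int]:
        with self._lock:
            if self._loop is None:
                return {}
            future = asyncio.run_coroutine_threadsafe(self._query(), self._loop)
            return future.result(timeout=10)

    async def _query(self) -> Dict[str, int]:
        # Placeholder for the Flower-backed broker inspection.
        return {}
```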


setup_spec.extras["serve"]
+ [
    "celery",
    "flower",
Collaborator

please update requirements_compiled.txt and/or depset lock files.
