add flower monitoring #58431
base: master
Conversation
Signed-off-by: harshit <harshit@anyscale.com>
        logger.info("Queue monitor stopped successfully")
    except Exception as e:
        logger.error(f"Error stopping queue monitor: {e}")
Bug: Executor Leaks Resources After Timeout
The stop() method fails to shut down the executor when _loop_thread.result(timeout=20) times out. The shutdown(wait=True) call at line 135 is inside the try block, so it's skipped when a TimeoutError is raised. This leaves the executor running with a potentially stuck thread, causing a resource leak. The executor shutdown should be in the finally block to ensure cleanup regardless of timeout.
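A minimal sketch of the suggested fix, assuming a class shape inferred from the diff (the `QueueMonitor` name and attributes here are hypothetical): moving the executor shutdown into a `finally` block guarantees cleanup even when `result()` raises `TimeoutError`.

```python
import concurrent.futures
import logging

logger = logging.getLogger(__name__)


class QueueMonitor:
    def __init__(self):
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        self._loop_thread = None

    def stop(self):
        try:
            if self._loop_thread is not None:
                # May raise TimeoutError if the loop thread is stuck.
                self._loop_thread.result(timeout=20)
            logger.info("Queue monitor stopped successfully")
        except Exception as e:
            logger.error(f"Error stopping queue monitor: {e}")
        finally:
            # Runs even on timeout; wait=False avoids blocking forever
            # on a stuck thread while still rejecting new submissions.
            self._executor.shutdown(wait=False, cancel_futures=True)
```

Note one trade-off: `shutdown(wait=True)` in the `finally` block would block indefinitely if the loop thread really is stuck, so `wait=False` may be the safer choice here.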
fixed it
Signed-off-by: harshit <harshit@anyscale.com>
        self._loop.close()

    # Start event loop in background thread
    self._loop_thread = self._executor.submit(_run_event_loop)
Bug: Concurrent Initialization Race Condition
The start() method has a race condition where self._loop is checked in the main thread but assigned in the background thread. Multiple concurrent calls to start() can pass the if self._loop is not None check before any thread assigns the value, causing multiple event loops and threads to be created. The check should use a thread-safe flag or lock to prevent concurrent initialization.
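One way to close this window, sketched under assumed names: do the check-then-assign under a `threading.Lock` in the calling thread, so only one caller ever creates the loop, and signal readiness from inside the running loop so the ready flag implies the loop is actually running.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


class QueueMonitor:
    def __init__(self):
        self._lock = threading.Lock()
        self._loop = None
        self._loop_ready = threading.Event()
        self._loop_thread = None
        self._executor = ThreadPoolExecutor(max_workers=1)

    def start(self):
        # Guard the check-then-act: concurrent start() calls can no
        # longer both pass the "not started" check.
        with self._lock:
            if self._loop is not None:
                return
            self._loop = asyncio.new_event_loop()

        def _run_event_loop():
            asyncio.set_event_loop(self._loop)
            # Set the flag from inside the running loop, so that
            # _loop_ready being set implies the loop is running.
            self._loop.call_soon(self._loop_ready.set)
            self._loop.run_forever()

        self._loop_thread = self._executor.submit(_run_event_loop)
        self._loop_ready.wait(timeout=5)
```

Because `self._loop` is assigned in the caller's thread (under the lock) rather than in the background thread, losers of the race see a non-`None` loop and return immediately.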
    finally:
        self._loop = None
        self._loop_thread = None
        self._loop_ready.clear()
Bug: Loop Shutdown Race Condition
The stop() method has a race condition where get_queue_lengths() can be called after the event loop is closed but before _loop is set to None. After line 132 waits for the thread to complete, the loop is closed, but _loop remains non-None and _loop_ready remains set until the finally block executes. A concurrent call to get_queue_lengths() during this window will pass the check on line 150 and attempt to schedule a coroutine on the closed loop, causing an error.
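A hedged sketch of one possible fix (names and the `_fetch` placeholder are hypothetical): unpublish the loop under a lock *before* tearing it down, and read it under the same lock in `get_queue_lengths()`, so readers can never observe a closed-but-still-set loop.

```python
import asyncio
import threading


class QueueMonitor:
    def __init__(self):
        self._lock = threading.Lock()
        self._loop = None
        self._loop_ready = threading.Event()

    def stop(self):
        # Atomically swap the loop out and clear the ready flag first,
        # so concurrent readers see None instead of a dying loop.
        with self._lock:
            loop, self._loop = self._loop, None
            self._loop_ready.clear()
        if loop is not None:
            loop.call_soon_threadsafe(loop.stop)

    def get_queue_lengths(self):
        with self._lock:
            loop = self._loop
        if loop is None or not self._loop_ready.is_set():
            return {}
        fut = asyncio.run_coroutine_threadsafe(self._fetch(), loop)
        return fut.result(timeout=5)

    async def _fetch(self):
        # Placeholder for the real queue-length query.
        return {"celery": 0}
```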
    setup_spec.extras["serve"]
    + [
        "celery",
        "flower",
Please update requirements_compiled.txt and/or the depset lock files.
We recently added asynchronous inference support to Ray Serve, but it currently lacks the ability to autoscale based on the number of tasks in the queues used by async inference.
This PR adds code that deduces the number of pending tasks in the Celery queue, so that the number of replicas can be decided on that basis.
Celery itself doesn't provide an API that reports the number of tasks in a queue, so we have to choose between alternative approaches.
This PR implements and proposes approach 2, since Celery itself recommends Flower as its monitoring and observability tool. Link - https://docs.celeryq.dev/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor
We spin up Flower in a separate thread within the same Ray Serve process. The thread runs an event loop; whenever someone asks for the queue lengths, we schedule a coroutine on this event loop to fetch them.
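The scheme described here can be sketched roughly as follows. `FlowerMonitor` and `_fetch_queue_lengths` are hypothetical names, and the stub simply returns a fixed value where the real code would query Flower:

```python
import asyncio
import threading


class FlowerMonitor:
    """Background thread running an event loop; callers post
    queue-length requests onto it from any thread."""

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._ready = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        asyncio.set_event_loop(self._loop)
        # Signal readiness from inside the running loop.
        self._loop.call_soon(self._ready.set)
        self._loop.run_forever()

    def start(self):
        self._thread.start()
        self._ready.wait(timeout=5)

    async def _fetch_queue_lengths(self):
        # Hypothetical stand-in: the real code would query Flower's
        # HTTP API here instead of returning a stub.
        return {"celery": 3}

    def get_queue_lengths(self, timeout=5):
        # Schedule the coroutine on the background loop from the
        # caller's thread and block for the result.
        fut = asyncio.run_coroutine_threadsafe(
            self._fetch_queue_lengths(), self._loop)
        return fut.result(timeout=timeout)

    def stop(self):
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join(timeout=5)
```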
When the replica is stopped, the stop_consumer() function in CeleryTaskProcessorAdapter stops the Flower thread as well.