fix(guard): handle websocket tasks during shutdown #3448
Conversation
PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR introduces a TaskGroup for tracking spawned tasks during guard shutdown.

Code Quality & Best Practices ✅

Positives:

Minor Issues:

Potential Bugs or Issues
@rivetkit/actor
@rivetkit/cloudflare-workers
@rivetkit/core
@rivetkit/db
@rivetkit/framework-base
@rivetkit/next-js
@rivetkit/react
rivetkit
@rivetkit/sql-loader
@rivetkit/engine-runner
@rivetkit/engine-runner-protocol
commit:
Force-pushed from bc86aa3 to b7ad73c.
PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR introduces a TaskGroup abstraction for tracking spawned tasks during shutdown.

Positive Aspects

✅ Clean abstraction: The TaskGroup keeps the task-tracking logic in one place.

Issues & Recommendations

🔴 Critical: Panic Handling (task_group.rs:27)

The TODO comment indicates panics aren't handled. If a spawned task panics, the counter will never decrement, causing wait_idle() to hang.

Recommendation: Wrap the future execution to catch panics:

pub fn spawn<F, O>(self: &Arc<Self>, fut: F)
where
F: Future<Output = O> + Send + 'static,
{
self.running_count.fetch_add(1, Ordering::Relaxed);
let self2 = self.clone();
tokio::spawn(
async move {
let _guard = scopeguard::guard((), |_| {
if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self2.notify.notify_waiters();
}
});
fut.await;
}
.in_current_span(),
);
}

Or use a ready-made task-tracking utility (one option is sketched below).
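For illustration, the sketch below shows the same spawn-and-wait pattern with tokio-util's TaskTracker. This is an assumption about one possible alternative, not necessarily what the review meant; a TaskTracker task stops being tracked as soon as its future is dropped, so panicking tasks do not wedge the wait.

```rust
// Sketch only: assumes the tokio-util crate with its "rt" feature enabled.
use std::time::Duration;
use tokio_util::task::TaskTracker;

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();

    // Spawn background work through the tracker instead of bare tokio::spawn.
    for i in 0..3 {
        tracker.spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            println!("background task {i} finished");
        });
    }

    // During shutdown: stop accepting new tasks, then wait for the rest.
    tracker.close();
    tracker.wait().await;
    println!("all tracked tasks have completed");
}
```

Unlike the hand-rolled counter, wait() only returns once the tracker is both closed and empty, so the close() call is a required part of the shutdown sequence.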
Force-pushed from b7ad73c to a353c1b.
Force-pushed from 7aff577 to 9f71238.
PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR improves the guard service's shutdown handling by properly tracking and waiting for background WebSocket and analytics tasks to complete. The implementation adds a new TaskGroup utility.

Code Quality & Architecture

Strengths:
Issues Found

1. Race Condition in TaskGroup (Critical)
There's a potential race condition in wait_idle():

loop {
self.notify.notified().await;
if self.running_count.load(Ordering::Acquire) == 0 {
break;
}
}

Problem: If all tasks complete between the initial fast-path check (line 45) and calling notified(), the notification is missed and wait_idle() can hang.

Solution: Subscribe to notifications BEFORE checking the count:

pub async fn wait_idle(&self) {
loop {
let notified = self.notify.notified();
if self.running_count.load(Ordering::Acquire) == 0 {
return;
}
notified.await;
}
}

This ensures we're listening for notifications before checking the count, preventing the race.

2. Missing Panic Handling (High Priority)
The TODO comment indicates panic handling is missing. If a spawned task panics, the counter is never decremented and wait_idle() hangs.

Recommendation: Wrap the future in a panic-catching wrapper or use a drop guard:

pub fn spawn<F, O>(self: &Arc<Self>, fut: F)
where
F: Future<Output = O> + Send + 'static,
{
self.running_count.fetch_add(1, Ordering::Relaxed);
let self2 = self.clone();
tokio::spawn(
async move {
let _guard = scopeguard::guard((), |_| {
if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self2.notify.notify_waiters();
}
});
fut.await;
}
.in_current_span(),
);
}

Or use a ready-made task-tracking utility.

3. Ordering Consistency (Low Priority)

The increment uses Ordering::Relaxed while the decrement uses Ordering::AcqRel; prefer a matching (or stronger) ordering for the increment.

4. Removed defer! Macro Without Cleanup

The PR removes the crate::defer! usage around the in-flight counter release.

Good: The change to spawn cleanup after the request completes is correct - it prevents blocking the request response.

Action needed: The old macro definition should be removed if it's no longer used anywhere.

Performance Considerations

Positive:
Concern:
Security Concerns

No security issues identified. The changes don't introduce new attack surfaces or bypass existing security measures.

Test Coverage

Missing tests for:
Recommendation: Add integration tests in
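As an illustration of what such a test could look like - a sketch that assumes TaskGroup is importable (the path guard_core::task_group is a guess) and exposes the new()/spawn()/wait_idle() API quoted in this thread:

```rust
// Hypothetical test; the import path and API are assumptions based on the
// TaskGroup described elsewhere in this review thread.
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use std::time::Duration;

use guard_core::task_group::TaskGroup;

#[tokio::test]
async fn wait_idle_waits_for_all_spawned_tasks() {
    let tasks = TaskGroup::new();
    let completed = Arc::new(AtomicUsize::new(0));

    for _ in 0..10 {
        let completed = completed.clone();
        tasks.spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            completed.fetch_add(1, Ordering::SeqCst);
        });
    }

    // wait_idle() should only return after every spawned task has finished.
    tokio::time::timeout(Duration::from_secs(1), tasks.wait_idle())
        .await
        .expect("wait_idle did not complete in time");

    assert_eq!(completed.load(Ordering::SeqCst), 10);
}
```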
Minor Issues
Recommendations

Must Fix:
Should Fix:

Optional:

Verdict

This is a solid improvement to shutdown handling, but the race condition and missing panic handling are blockers that must be addressed before merging. Once those critical issues are fixed, this will significantly improve the reliability of the guard service's shutdown process. The overall architecture is sound and follows best practices. Great work on identifying and solving the WebSocket task tracking problem!
PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR addresses a critical issue where WebSocket and other spawned tasks were not being properly tracked during guard shutdown, potentially leading to abrupt termination of in-flight operations. The solution introduces a TaskGroup to track those tasks.

Code Quality & Best Practices ✅

Strengths:
Minor improvements:
Potential Issues & Bugs 🔍

1. Race condition in TaskGroup::wait_idle() - CRITICAL

Location: task_group.rs

There's a potential race condition between lines 45-46 (fast path check) and lines 50-54 (notification loop). If the running count changes between the fast-path check and the call to notified(), a wake-up can be missed.

Suggested fix:

pub async fn wait_idle(&self) {
loop {
// Subscribe to notifications *before* checking the count
let notified = self.notify.notified();
if self.running_count.load(Ordering::Acquire) == 0 {
return;
}
notified.await;
}
}

2. Memory ordering inconsistency

Location: task_group.rs

Using Relaxed for the increment but AcqRel for the decrement is inconsistent; consider:

self.running_count.fetch_add(1, Ordering::Release);

3. Panic handling TODO - CRITICAL

Location: task_group.rs:27

If a spawned future panics, the counter won't be decremented, causing wait_idle() to hang.

4. Defer macro removal

Location: proxy_service.rs

Please verify no other files use this macro before removing it.

Performance Considerations ⚡

Positive:
Potential concerns:
Security Concerns 🔒

No critical security issues identified. The existing shutdown timeout mechanism properly prevents indefinite hangs. The changes ensure WebSocket connections are properly tracked during shutdown.

Test Coverage 🧪

Missing: No unit tests for TaskGroup.

Recommended:
Additional Observations
Recommendation

Overall assessment: Well-implemented fix for a real issue. Core logic is sound.

Action items before merge:
Great work on identifying and addressing this shutdown issue! 🎉
Force-pushed from 9f71238 to 09cec26.
Force-pushed from a353c1b to 380ccdd.
Force-pushed from 09cec26 to f6632bf.
Force-pushed from 380ccdd to 4c115b8.
Force-pushed from f53a47c to 2990334.
Code Review

Summary

This PR improves WebSocket task handling during guard shutdown by introducing a TaskGroup to track spawned tasks. The changes ensure that background WebSocket tasks are properly waited for during graceful shutdown, preventing premature termination.

Strengths

1. Well-designed TaskGroup implementation
2. Improved shutdown sequence
3. Better code organization
4. Minor quality improvements
Issues & Recommendations

🔴 Critical: Panic handling in TaskGroup::spawn()

Location: engine/packages/guard-core/src/task_group.rs:27-39

The TODO comment highlights a critical issue - if a spawned task panics, the counter is never decremented, potentially causing wait_idle() to hang indefinitely.

Recommended fix: decrement via a drop guard (see the combined sketch below).

🟡 Potential race condition in wait_idle()

Location: engine/packages/guard-core/src/task_group.rs:43-56

There is a subtle TOCTOU race condition. If a task completes between the fast path check and calling notified(), we might miss the notification.

Recommended fix: subscribe to the notification before checking the count (see the combined sketch below).

🟡 Inconsistent task tracking

Location: engine/packages/guard-core/src/proxy_service.rs:814-819

The release_in_flight_task is spawned with regular tokio::spawn() instead of self.state.tasks.spawn(). If the goal is to wait for all background work during shutdown, this should probably use the TaskGroup. Similarly, the analytics event insertion at line 2344 uses plain tokio::spawn().

Question: Should these tasks also be tracked by the TaskGroup?

🔵 Minor: Missing Default trait

Consider implementing Default for TaskGroup for better Rust idioms, though the current API is fine.

Additional Observations

Frontend changes: The routeTree.gen.ts changes appear to be auto-generated formatting differences (quote style, import ordering). These are noise in the diff but harmless.

Testing: It would be valuable to add tests for:
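A minimal sketch of both recommended fixes together - a drop guard so the counter is decremented even when the task panics, plus subscribing to the Notify before re-checking the count. It assumes the field names and signatures quoted in the snippets elsewhere in this thread, not necessarily the exact shape of the PR's code:

```rust
// Sketch only; mirrors the TaskGroup shape quoted elsewhere in this thread.
use std::future::Future;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

use tokio::sync::Notify;
use tracing::Instrument;

pub struct TaskGroup {
    running_count: AtomicUsize,
    notify: Notify,
}

impl TaskGroup {
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            running_count: AtomicUsize::new(0),
            notify: Notify::new(),
        })
    }

    pub fn spawn<F, O>(self: &Arc<Self>, fut: F)
    where
        F: Future<Output = O> + Send + 'static,
    {
        self.running_count.fetch_add(1, Ordering::Release);
        let group = self.clone();
        tokio::spawn(
            async move {
                // Decrements in Drop, so it runs even if `fut` panics.
                let _guard = DecrementGuard(group);
                fut.await;
            }
            .in_current_span(),
        );
    }

    pub async fn wait_idle(&self) {
        loop {
            // Subscribe before checking the count so a concurrent
            // "last task finished" notification cannot be missed.
            let notified = self.notify.notified();
            if self.running_count.load(Ordering::Acquire) == 0 {
                return;
            }
            notified.await;
        }
    }
}

struct DecrementGuard(Arc<TaskGroup>);

impl Drop for DecrementGuard {
    fn drop(&mut self) {
        if self.0.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            self.0.notify.notify_waiters();
        }
    }
}
```

The Release/AcqRel pairing on the counter also covers the ordering inconsistency pointed out in other reviews on this PR.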
Verdict

The core implementation is solid and significantly improves shutdown handling. However, the panic handling issue is critical and should be addressed before merging, as it could cause shutdown hangs in production. The race condition fix is recommended for correctness. Overall, great work on improving the shutdown reliability!
Review: Fix guard shutdown handling for WebSocket tasks

This PR addresses an important shutdown issue where WebSocket and other background tasks weren't being properly tracked during guard shutdown. The implementation looks solid overall, with a few areas for improvement.

✅ Strengths
🔍 Issues & Recommendations

🔴 Critical: Race Condition in TaskGroup

Location: task_group.rs (spawn)

There's a race condition between incrementing the counter and spawning the task:

self.running_count.fetch_add(1, Ordering::Relaxed); // [1]
let self2 = self.clone();
tokio::spawn( // [2] - Task could complete before this returns
async move {
fut.await;
if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self2.notify.notify_waiters();
}
}
.in_current_span(),
);

Problem: If the task completes very quickly, the decrement in the spawned task can race with the Relaxed increment in spawn().

Recommendation:

pub fn spawn<F, O>(self: &Arc<Self>, fut: F)
where
F: Future<Output = O> + Send + 'static,
{
let self2 = self.clone();
self.running_count.fetch_add(1, Ordering::Release); // Use Release ordering
tokio::spawn(
async move {
let _guard = TaskGuard(self2.clone()); // Use guard to ensure decrement
fut.await;
}
.in_current_span(),
);
}
struct TaskGuard(Arc<TaskGroup>);
impl Drop for TaskGuard {
fn drop(&mut self) {
if self.0.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.0.notify.notify_waiters();
}
}
}

This ensures proper memory ordering and guarantees cleanup even on panics.

🟡 Medium: Unhandled Panic Recovery

Location: task_group.rs:27

The TODO comment acknowledges panic handling isn't implemented. If a task panics:
Recommendation: Use a guard pattern (shown above) to ensure the counter is always decremented, or wrap the future in catch_unwind().

🟡 Medium: Potential Notification Miss in wait_idle

Location: task_group.rs (wait_idle)

loop {
self.notify.notified().await;
if self.running_count.load(Ordering::Acquire) == 0 {
break;
}
}

Problem: There's a subtle race where:
Recommendation:

let mut notified = std::pin::pin!(self.notify.notified());
loop {
// Check count first
if self.running_count.load(Ordering::Acquire) == 0 {
break;
}
// Wait for notification
notified.as_mut().await;
// Create new notified future for next iteration
notified.set(self.notify.notified());
}

🟡 Medium: Behavioral Change in release_in_flight

Location: proxy_service.rs

The PR changes when the release_in_flight cleanup task is spawned.
Question: Is this intentional? The defer pattern ensured cleanup happened at scope exit. The new pattern spawns the cleanup task earlier, which might be fine, but worth verifying this doesn't affect rate limiting accuracy.

🟢 Minor: Inconsistent Memory Ordering

Location: task_group.rs
Recommendation: Use a stronger ordering for the increment (e.g. Ordering::Release).

🟢 Minor: Missing Default Implementation
impl Default for TaskGroup {
fn default() -> Self {
Self {
running_count: AtomicUsize::new(0),
notify: Notify::new(),
}
}
}

Then users can do TaskGroup::default() where an Arc isn't needed (e.g. in tests).

📋 Additional Notes
Summary

The core approach is sound and solves the shutdown problem effectively. The main concerns are:
Fixing the critical race condition is essential before merging. The other issues are lower priority but should be addressed for production robustness. Great work on identifying and addressing this shutdown issue! 🎯
Force-pushed from 2990334 to 0688959.
Force-pushed from 67f0a65 to 4039392.
PR Review: fix(guard): handle websocket tasks during shutdown

Thanks for this PR! The implementation looks solid overall and addresses an important gap in shutdown handling. Here's my detailed review:

✅ Strengths
🔍 Potential Issues

1. Race condition in TaskGroup (task_group.rs)
PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR introduces a TaskGroup utility for tracking background tasks during shutdown.

Code Quality & Best Practices

✅ Strengths
🔍 Issues & Concerns

1. Critical: Race condition in TaskGroup::wait_idle (task_group.rs:43-56)

There's a potential race condition in the wait_idle() implementation:

pub async fn wait_idle(&self) {
// Fast path
if self.running_count.load(Ordering::Acquire) == 0 {
return;
}
// Wait for notifications until the count reaches zero
loop {
self.notify.notified().await;
if self.running_count.load(Ordering::Acquire) == 0 {
break;
}
}
}

Problem: Between the initial check and calling notified(), all remaining tasks may complete, so the notification is missed.

Fix: Subscribe to notifications before checking the count:

pub async fn wait_idle(&self) {
loop {
let notified = self.notify.notified();
if self.running_count.load(Ordering::Acquire) == 0 {
return;
}
notified.await;
}
}

2. Memory ordering mismatch (task_group.rs:25, 34)

The increment uses Relaxed while the decrement uses AcqRel:

self.running_count.fetch_add(1, Ordering::Relaxed); // Line 25
// ...
if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 { // Line 34

Recommendation: Use Ordering::Release (or AcqRel) for the increment so it pairs with the decrement.

3. Unhandled panics (task_group.rs:27)

The TODO comment acknowledges panic handling is missing:

// TODO: Handle panics
let self2 = self.clone();
tokio::spawn(...)

Problem: If a spawned task panics, the counter will never be decremented, causing wait_idle() to hang.

Fix: Wrap the future in catch_unwind:

use std::panic::AssertUnwindSafe;
use futures::FutureExt;
tokio::spawn(
async move {
let result = AssertUnwindSafe(fut).catch_unwind().await;
if result.is_err() {
tracing::error!("task panicked in TaskGroup");
}
if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self2.notify.notify_waiters();
}
}
.in_current_span(),
);

4. Potential resource leak (proxy_service.rs:814-819)

The release_in_flight cleanup task is spawned outside the TaskGroup:

// Release in-flight counter when done
let state_clone = self.state.clone();
tokio::spawn(
async move {
state_clone.release_in_flight(client_ip, &actor_id).await;
}
.instrument(tracing::info_span!("release_in_flight_task")),
);

Problem: This task is spawned after the request completes but is not tracked by the TaskGroup. During shutdown, these tasks may not complete, potentially leaving in-flight counters incorrectly incremented.

Fix: Either spawn it through the TaskGroup (e.g. self.state.tasks.spawn(...)) or document why it is acceptable for it to be dropped during shutdown.
5. Missing error handling (proxy_service.rs:2346)

The analytics insertion spawns a task that could fail silently:

tokio::spawn(
async move {
if let Err(error) = context_clone.insert_event().await {
tracing::warn!(?error, "failed to insert guard analytics event");
}
}
.instrument(tracing::info_span!("insert_event_task")),
);

Question: Should this also be tracked by TaskGroup to ensure analytics are flushed before shutdown?

Performance Considerations
Security Concerns

No significant security issues identified. The shutdown handling improvement actually enhances security by ensuring proper cleanup.

Test Coverage

Missing: I don't see any tests for the new TaskGroup.
Recommendations Summary

Must Fix (Blocking)
Should Fix
Nice to Have
Overall Assessment

This is a solid improvement that addresses a real problem with WebSocket task handling during shutdown. The changes follow Rivet's code style and logging patterns well. Once the concurrency issues are resolved, this will be a valuable addition to the codebase.

Generated with Claude Code
Force-pushed from 0688959 to 4fa0007.
Force-pushed from 4039392 to 6546966.
Merge activity
Pull Request Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR introduces a TaskGroup to track background tasks during shutdown.

Code Quality & Best Practices

✅ Strengths
Potential Issues & Concerns

🔴 Critical: Race Condition in wait_idle()
Pull Request Review: Fix guard websocket tasks during shutdown

Summary

This PR addresses a critical shutdown issue by introducing proper tracking of background websocket tasks during guard shutdown. The implementation adds a TaskGroup utility.

Code Quality & Best Practices

Strengths ✓
Improvements Needed

1. Critical: Race condition in wait_idle()
Force-pushed from 4fa0007 to 7f53a33.
Force-pushed from 6546966 to 7bde763.
Code Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR introduces a TaskGroup for tracking spawned background tasks during guard shutdown.

Strengths

✅ Clear architectural improvement: The TaskGroup centralizes tracking of spawned background work.

✅ Proper shutdown sequencing: The server now waits for both hyper's graceful shutdown AND the custom task group, ensuring all work completes.

✅ Good use of atomics: The running count is an AtomicUsize paired with a Notify.

✅ Documentation improvements: Fixed doc comment syntax.

✅ Consistent logging patterns: Fixed structured logging in the touched call sites.

Issues & Concerns

🔴 CRITICAL: Race Condition in TaskGroup

Location: task_group.rs (wait_idle)

The current implementation:

pub async fn wait_idle(&self) {
// Fast path
if self.running_count.load(Ordering::Acquire) == 0 { // ← Check here
return;
}
// Wait for notifications until the count reaches zero
loop {
self.notify.notified().await; // ← Could miss notification if count became 0 between check and wait
if self.running_count.load(Ordering::Acquire) == 0 {
break;
}
}
}

Problem: If the last task completes between the initial check (line 45) and calling notified(), the notification is missed and wait_idle() hangs.

Solution: Subscribe to notifications BEFORE checking the count:

pub async fn wait_idle(&self) {
loop {
let notified = self.notify.notified(); // Subscribe first
if self.running_count.load(Ordering::Acquire) == 0 {
return;
}
notified.await; // Now safe to wait
}
}

This ensures we either see count == 0 OR we're already subscribed when the notification fires.

🟡 ISSUE: Panic Handling Not Implemented

Location: task_group.rs:27

// TODO: Handle panics
let self2 = self.clone();
tokio::spawn(
async move {
fut.await; // ← If fut panics, count never decrements!

Problem: If a spawned task panics, the running count is never decremented, causing wait_idle() to hang.

Impact: Any panic in WebSocket handling, analytics insertion, or in-flight counter release will break graceful shutdown.

Solution: Wrap the future in catch_unwind:

use std::panic::AssertUnwindSafe;
use futures::FutureExt;
tokio::spawn(
async move {
let _ = AssertUnwindSafe(fut).catch_unwind().await;
if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self2.notify.notify_waiters();
}
}
.in_current_span(),
);

🟡 ISSUE: In-Flight Counter Release Semantics Changed

Location: proxy_service.rs

Before: The defer! macro ensured the release task was spawned at scope exit.

After: The release is spawned AFTER the request completes normally.

Problem: This is a subtle behavioral change. The old code would spawn the release task regardless of how the handler exited.

Questions:
Recommendation: If the old behavior was correct, consider adding a Drop guard or moving the spawn to ensure it always happens:

struct ReleaseOnDrop {
state: Arc<ProxyState>,
client_ip: std::net::IpAddr,
actor_id: Option<Id>,
}
impl Drop for ReleaseOnDrop {
fn drop(&mut self) {
let state = self.state.clone();
let client_ip = self.client_ip;
let actor_id = self.actor_id;
tokio::spawn(async move {
state.release_in_flight(client_ip, &actor_id).await;
}.instrument(tracing::info_span!("release_in_flight_task")));
}
}

🟢 Minor: Missing Default trait

Location: task_group.rs

The TaskGroup could also implement Default:

impl Default for TaskGroup {
fn default() -> Self {
Self {
running_count: AtomicUsize::new(0),
notify: Notify::new(),
}
}
}
pub fn new() -> Arc<Self> {
Arc::new(Self::default())
}

This allows users to create non-Arc'd instances if needed in tests.

🟢 Minor: Unused output in service_manager

Location: service_manager

Good cleanup of the unused output handling here.

Performance Considerations
Security Considerations

No direct security issues identified. The shutdown handling ensures tasks complete before process exit, which prevents data loss.

Test Coverage
Recommendations
Style & Convention Compliance

✅ Follows Rust workspace conventions from CLAUDE.md. The frontend changes look auto-generated.

Overall Assessment

This PR addresses a real production issue (orphaned WebSocket tasks during shutdown), but the implementation has a critical race condition that must be fixed before merging. The panic handling TODO should also be addressed to prevent shutdown hangs.

Recommendation: Request changes for the race condition fix, strongly encourage panic handling implementation.
PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR addresses a shutdown handling issue in the guard service by introducing a TaskGroup to track background tasks.

Code Quality & Best Practices

Strengths:
Observations:
Potential Issues & Concerns

1. Race Condition in TaskGroup (Critical)

Location: task_group.rs

There's a subtle race condition in the spawn() implementation:

pub fn spawn<F, O>(self: &Arc<Self>, fut: F) {
self.running_count.fetch_add(1, Ordering::Relaxed); // Line 25
let self2 = self.clone();
tokio::spawn(
async move {
fut.await;
if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self2.notify.notify_waiters();
}
}
.in_current_span(),
);
}

Issue: The increment uses Ordering::Relaxed while the decrement uses Ordering::AcqRel.

Recommendation: Change line 25 to use:

self.running_count.fetch_add(1, Ordering::Release);

2. Missing Panic Handling (Medium Priority)

Location: task_group.rs:27

The TODO comment mentions this, but it's worth emphasizing:

// TODO: Handle panics

Issue: If a spawned task panics, the counter is never decremented and wait_idle() hangs.

Recommendation: Wrap the future in a panic-catching mechanism:

async move {
let result = std::panic::AssertUnwindSafe(fut).catch_unwind().await;
// Decrement regardless of panic
if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self2.notify.notify_waiters();
}
if let Err(e) = result {
tracing::error!("task panicked: {:?}", e);
}
}

3. Notification Lost Wakeup (Low Priority)

Location: task_group.rs (wait_idle)

In the notification loop:

loop {
self.notify.notified().await; // Line 51
if self.running_count.load(Ordering::Acquire) == 0 { // Line 52
break;
}
}

Issue: Between the notification and the count check, new tasks could be spawned. While this is unlikely to cause issues in practice (shutdown scenario), it could lead to unnecessary loop iterations.

Recommendation: This is acceptable for the shutdown use case, but document this behavior. Alternatively, consider using a shutdown flag to prevent new spawns after shutdown begins.

4. Change in Release Timing (Medium Priority)

Location: proxy_service.rs

The in-flight counter release logic was moved from a deferred cleanup (before the response) to after the response:

Before:

// Prepare to release in-flight counter when done
let state_clone = self.state.clone();
crate::defer! {
tokio::spawn(async move {
state_clone.release_in_flight(client_ip, &actor_id).await;
}.instrument(tracing::info_span!("release_in_flight_task")));
}

After:

// Release in-flight counter when done
let state_clone = self.state.clone();
tokio::spawn(
async move {
state_clone.release_in_flight(client_ip, &actor_id).await;
}
.instrument(tracing::info_span!("release_in_flight_task")),
);

Question: Does this change the timing semantics? The old code would spawn the cleanup task before returning the response, while the new code spawns it after. This might affect:
Recommendation: Verify this doesn't change the intended behavior. If the timing matters, consider spawning before the response is returned.

Performance Considerations
Security Concerns

No significant security concerns identified. The shutdown handling improvement actually enhances reliability, which is security-adjacent.

Test Coverage

Observation: The existing tests don't cover the new shutdown path.

Recommendation: Consider adding a test that:
Example test structure:

#[tokio::test]
async fn test_graceful_shutdown_with_websockets() {
// Start server with multiple WebSocket connections
// Trigger shutdown signal
// Verify all WebSocket tasks complete
// Verify shutdown completes within timeout
}

Missing Newline

Good catch adding the trailing newline.

Frontend Changes

The changes to routeTree.gen.ts look auto-generated.

Overall Assessment

This is a solid fix for handling WebSocket tasks during shutdown. The core implementation is good, but there are some concurrency concerns that should be addressed:

Must Fix:
Should Consider:
Nice to Have:
The PR moves in the right direction by properly tracking background tasks during shutdown. With the memory ordering fix and panic handling, this should be production-ready.
No description provided.