
Conversation

@MasterPtato
Contributor

No description provided.

@vercel

vercel bot commented Nov 11, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Preview Comments Updated (UTC)
rivet-inspector Error Error Nov 13, 2025 10:48pm
3 Skipped Deployments
Project Deployment Preview Comments Updated (UTC)
rivet-cloud Ignored Ignored Nov 13, 2025 10:48pm
rivet-site Ignored Ignored Preview Nov 13, 2025 10:48pm
rivetkit-serverless Skipped Skipped Nov 13, 2025 10:48pm

Contributor Author

MasterPtato commented Nov 11, 2025


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude

claude bot commented Nov 11, 2025

PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR introduces a TaskGroup abstraction to properly track and wait for WebSocket and analytics tasks during guard shutdown. This is an important fix for ensuring graceful shutdown behavior.

Code Quality & Best Practices ✅

Positives:

  • Well-structured TaskGroup implementation with clear separation of concerns
  • Follows Rust idioms with proper use of Arc, atomics, and async patterns
  • Good use of tracing instrumentation throughout
  • Consistent with project conventions (hard tabs, structured logging)
  • Clean integration into existing codebase

Minor Issues:

  1. Missing Default trait (task_group.rs:13-19)
    The TaskGroup type could benefit from implementing Default (returning Self; new() can keep returning Arc<Self>):

    impl Default for TaskGroup {
        fn default() -> Self {
            Self {
                running_count: AtomicUsize::new(0),
                notify: Notify::new(),
            }
        }
    }
  2. Comment consistency (config/runtime.rs:11-16)
    Good fix changing // to /// for doc comments, but these should be complete sentences:

    /// Time (in seconds) to allow guard to wait for pending requests after receiving SIGTERM.
    /// Defaults to 1 hour.
    

Potential Bugs or Issues ⚠️

  1. Race condition in wait_idle (task_group.rs:42-55)
    There's a potential race condition between lines 44 and 50. A task could complete between the initial check and calling notified(), causing the function to wait unnecessarily for the next notification. Consider this pattern:

    pub async fn wait_idle(&self) {
        loop {
            if self.running_count.load(Ordering::Acquire) == 0 {
                return;
            }
            self.notify.notified().await;
        }
    }

    This eliminates the "fast path" but ensures correctness.

  2. Missing spawn result handling (task_group.rs:28-38)
    The tokio::spawn result (a JoinHandle) is being dropped. While this is intentional for fire-and-forget tasks, consider documenting this behavior or tracking panics:

    let handle = tokio::spawn(/* ... */);
    // Explicitly drop to make intent clear
    drop(handle);
  3. Moved async task (proxy_service.rs:814-819)
    The release_in_flight task was moved from a defer-like macro to after the response. This changes timing - the counter is now released after the response is sent rather than when the function scope ends. This could affect rate limiting accuracy during high load. Was this intentional? Consider documenting the reasoning.

Performance Considerations 🚀

  1. Memory ordering (task_group.rs:25,33)

    • Line 25 uses Ordering::Relaxed for increment, which is fine
    • Line 33 uses Ordering::AcqRel for decrement, which is appropriate
    • Consider using Ordering::Release on the increment for symmetry with the Acquire/AcqRel operations elsewhere, though the current implementation is correct
  2. Notify overhead (task_group.rs:34)
    notify_waiters() is called on every task completion when count hits zero. This is efficient, but in high-churn scenarios (many tasks starting/stopping), consider if this could cause contention. Current implementation is likely fine for guard's use case.

  3. TaskGroup allocation (proxy_service.rs:382)
    TaskGroup::new() returns an Arc directly, which is ergonomic but means ProxyState holds an Arc<TaskGroup> (line 354). This is fine but adds one extra indirection. Could consider:

    tasks: TaskGroup,  // and adjust new() to not wrap in Arc

    But current design is more flexible for sharing.

Security Concerns 🔒

No security issues identified. The changes improve security by ensuring proper cleanup during shutdown, preventing potential resource leaks.

Test Coverage 📊

Missing:

  • No unit tests for TaskGroup
  • No integration tests verifying shutdown behavior with active WebSocket connections
  • No tests for the race condition scenarios

Recommendations:

  1. Add unit tests for TaskGroup:

    #[tokio::test]
    async fn test_wait_idle_with_no_tasks() {
        let tg = TaskGroup::new();
        tg.wait_idle().await; // Should return immediately
    }
    
    #[tokio::test]
    async fn test_wait_idle_with_tasks() {
        let tg = TaskGroup::new();
        tg.spawn(async { tokio::time::sleep(Duration::from_millis(100)).await });
        tg.wait_idle().await; // Should wait
    }
  2. Add integration test verifying shutdown waits for WebSocket tasks to complete

Additional Notes

  1. Frontend changes (frontend/src/routeTree.gen.ts)
    This appears to be auto-generated code with only formatting changes. Ensure the generator is deterministic to avoid noise in future PRs.

  2. Version bump (openapi.json)
    Version bumped to 2.0.24-rc.1 - appropriate for a bug fix.

  3. Unused variable cleanup (service-manager/src/lib.rs:163, gasoline/src/worker.rs:326)
    Good cleanup of unused variables and improved logging.

Verdict

Overall: Approve with suggestions

This is a solid fix for an important shutdown issue. The TaskGroup abstraction is well-designed and the integration is clean. The main concerns are:

  1. The potential race condition in wait_idle (should be addressed)
  2. Missing test coverage (should be added before merge)
  3. Verify the timing change for release_in_flight is intentional

The code quality is good and follows project conventions. With tests added and the race condition addressed, this will be a strong contribution.

@pkg-pr-new

pkg-pr-new bot commented Nov 11, 2025

More templates

@rivetkit/actor

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/actor@3448

@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@3448

@rivetkit/core

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/core@3448

@rivetkit/db

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/db@3448

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@3448

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@3448

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@3448

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@3448

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@3448

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@3448

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@3448

commit: 380ccdd

@claude

claude bot commented Nov 11, 2025

PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR introduces a TaskGroup abstraction to track WebSocket and other background tasks during guard shutdown, ensuring they complete gracefully before the service terminates. The implementation is solid and addresses a real shutdown race condition.

Positive Aspects

  • Clean abstraction: The TaskGroup implementation is well-designed and focused
  • Proper shutdown sequencing: The server now waits for both requests (graceful.shutdown()) and background tasks (wait_idle()); see the sketch after this list
  • Atomic operations: Uses appropriate memory orderings for the counter
  • Tracing improvements: Better structured logging (e.g., ?remaining_workflows instead of inline formatting)
  • Code cleanup: Removed the complex defer! macro in favor of simpler explicit spawning
  • Documentation fixes: Fixed comment syntax (// → ///) in runtime.rs:12-13
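
A minimal sketch of what that shutdown sequence might look like in server.rs, assuming a hyper graceful-shutdown handle, the TaskGroup::wait_idle() described in this PR, and the configured guard_shutdown_duration (names are illustrative, not the exact code from the diff):

match tokio::time::timeout(shutdown_duration, async {
	// Let hyper drain in-flight HTTP requests first...
	graceful.shutdown().await;
	// ...then wait for the background websocket/analytics tasks tracked by the TaskGroup.
	tasks.wait_idle().await;
})
.await
{
	Ok(()) => tracing::info!("guard shut down gracefully"),
	Err(_) => tracing::warn!("guard shutdown timed out with background tasks still running"),
}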

Issues & Recommendations

🔴 Critical: Panic Handling (task_group.rs:27)

The TODO comment indicates panics aren't handled. If a spawned task panics, the counter will never decrement, causing wait_idle() to hang indefinitely during shutdown.

Recommendation: Wrap the future execution to catch panics:

pub fn spawn<F, O>(self: &Arc<Self>, fut: F)
where
	F: Future<Output = O> + Send + 'static,
{
	self.running_count.fetch_add(1, Ordering::Relaxed);

	let self2 = self.clone();
	tokio::spawn(
		async move {
			let _guard = scopeguard::guard((), |_| {
				if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
					self2.notify.notify_waiters();
				}
			});
			fut.await;
		}
		.in_current_span(),
	);
}

Or use AssertUnwindSafe + catch_unwind if you need panic details.

⚠️ Medium: Potential Race in wait_idle() (task_group.rs:43-56)

There's a subtle race condition between lines 51-52:

self.notify.notified().await;
if self.running_count.load(Ordering::Acquire) == 0 {

If the last task completes and notifies between a count check and the next notified().await subscription, the notification is missed and we may wait for another notification that never comes.

Recommendation: Use a more robust pattern:

pub async fn wait_idle(&self) {
	loop {
		if self.running_count.load(Ordering::Acquire) == 0 {
			return;
		}
		self.notify.notified().await;
	}
}

This checks the count before waiting, eliminating the gap.

🟡 Minor: Memory Ordering Inconsistency (task_group.rs:25,34)

  • Line 25: fetch_add(1, Ordering::Relaxed)
  • Line 34: fetch_sub(1, Ordering::AcqRel)

While likely correct, the asymmetry is unusual. Since you're using Acquire in wait_idle(), consider using Ordering::Release for the increment to establish proper happens-before relationships.

Recommendation:

self.running_count.fetch_add(1, Ordering::Release);

🟡 Minor: Untracked release_in_flight Task (proxy_service.rs:814-819)

The spawned task for release_in_flight is not tracked by the TaskGroup. This is probably fine since it's a cleanup operation, but consider documenting this decision or adding it to the task group for consistency.

🟡 Minor: Default Implementation

TaskGroup::new() could implement Default for consistency with Rust conventions.

Testing Concerns

⚠️ The existing test (simple_websocket_test2.rs) doesn't validate the shutdown behavior. Consider adding a test that:

  1. Spawns WebSocket tasks via TaskGroup
  2. Initiates shutdown
  3. Verifies all tasks complete before wait_idle() returns
  4. Tests timeout behavior with the configured guard_shutdown_duration

Security & Performance

✅ No security concerns identified
✅ The atomic counter approach is efficient and lock-free
✅ Proper use of tokio::spawn with tracing context preservation

Code Quality

The code follows the repository's conventions well:

  • Hard tabs for indentation ✅
  • Proper error handling patterns ✅
  • Structured logging ✅
  • Workspace dependencies ✅

Verdict

Approve with recommendations - This is a valuable fix for graceful shutdown. The critical panic handling issue should be addressed before merging, but the overall approach is sound.


Review generated with assistance from Claude Code

@claude

claude bot commented Nov 11, 2025

PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR improves the guard service's shutdown handling by properly tracking and waiting for background WebSocket and analytics tasks to complete. The implementation adds a new TaskGroup primitive to coordinate graceful shutdown.


Code Quality & Architecture

Strengths:

  • Clean, focused implementation with clear separation of concerns
  • The TaskGroup abstraction is well-designed and reusable
  • Proper use of atomic operations for lock-free task counting
  • Follows the project's error handling and logging patterns

Issues Found:

1. Race Condition in TaskGroup (Critical)

engine/packages/guard-core/src/task_group.rs:50-54

There's a potential race condition in wait_idle():

loop {
    self.notify.notified().await;
    if self.running_count.load(Ordering::Acquire) == 0 {
        break;
    }
}

Problem: If all tasks complete between the initial fast-path check (line 45) and calling notified() (line 51), the notification will be missed, causing wait_idle() to hang indefinitely.

Solution: Subscribe to notifications BEFORE checking the count:

pub async fn wait_idle(&self) {
    loop {
        let notified = self.notify.notified();
        if self.running_count.load(Ordering::Acquire) == 0 {
            return;
        }
        notified.await;
    }
}

This ensures we're listening for notifications before checking the count, preventing the race.

2. Missing Panic Handling (High Priority)

engine/packages/guard-core/src/task_group.rs:27

The TODO comment indicates panic handling is missing. If a spawned task panics:

  • The task counter will never decrement
  • wait_idle() will hang forever during shutdown
  • The guard service will not shut down gracefully

Recommendation: Wrap the future in a panic-catching wrapper or use AssertUnwindSafe with catch_unwind:

pub fn spawn<F, O>(self: &Arc<Self>, fut: F)
where
    F: Future<Output = O> + Send + 'static,
{
    self.running_count.fetch_add(1, Ordering::Relaxed);
    
    let self2 = self.clone();
    tokio::spawn(
        async move {
            let _guard = scopeguard::guard((), |_| {
                if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
                    self2.notify.notify_waiters();
                }
            });
            fut.await;
        }
        .in_current_span(),
    );
}

Or use the scopeguard crate to ensure the counter always decrements.

3. Ordering Consistency (Low Priority)

engine/packages/guard-core/src/task_group.rs:25,34

The increment uses Relaxed ordering while decrement uses AcqRel. For consistency and to ensure visibility of the count changes, consider using AcqRel for both operations, or at minimum Release for increment and Acquire for decrement.

4. Removed defer! Macro Without Cleanup

engine/packages/guard-core/src/proxy_service.rs:2407-2429

The PR removes the defer! macro entirely but doesn't clean up its definition location. The macro was defined in this file but is no longer needed after moving the cleanup to a post-request spawn.

Good: The change to spawn cleanup after the request completes is correct - it prevents blocking the request response.

Action needed: The old macro definition should be removed if it's no longer used anywhere.


Performance Considerations

Positive:

  • Lock-free atomic operations minimize contention
  • The fast-path check in wait_idle() avoids unnecessary async operations
  • Moving release_in_flight after response is correct - prevents blocking the response

Concern:

  • Each spawned task allocates a new tokio task. For high-frequency WebSocket connections, this could add overhead. However, this is acceptable for the guard service's use case.

Security Concerns

No security issues identified. The changes don't introduce new attack surfaces or bypass existing security measures.


Test Coverage

Missing tests for:

  1. TaskGroup race conditions (especially the one identified above)
  2. Panic handling in spawned tasks
  3. Shutdown with pending WebSocket tasks
  4. Timeout behavior during shutdown with hanging tasks

Recommendation: Add integration tests in engine/packages/guard-core/tests/ to verify:

  • Multiple concurrent tasks complete before wait_idle() returns
  • Tasks spawned during wait_idle() are handled correctly
  • Shutdown completes within the configured timeout
  • Panicking tasks don't prevent shutdown

Minor Issues

  1. Comment formatting (engine/packages/config/src/config/runtime.rs:12,14) - Good catch fixing the comment syntax from // to ///
  2. Logging consistency (engine/packages/gasoline/src/worker.rs:326) - Good improvement using structured logging
  3. Frontend route file (frontend/src/routeTree.gen.ts) - This is auto-generated; changes are expected

Recommendations

Must Fix:

  1. ✅ Fix the race condition in TaskGroup::wait_idle()
  2. ✅ Implement panic handling for spawned tasks

Should Fix:
3. Add test coverage for TaskGroup
4. Consider adding a timeout mechanism inside TaskGroup itself
5. Document the memory ordering guarantees in comments

Optional:
6. Add metrics for tracked task count (useful for debugging)
7. Consider adding task naming/identification for better observability
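
A minimal sketch of what items 6 and 7 could look like on the TaskGroup described above (the running() accessor is a hypothetical addition, not part of this PR):

impl TaskGroup {
    /// Number of tasks currently tracked; cheap to expose as a gauge metric.
    pub fn running(&self) -> usize {
        self.running_count.load(Ordering::Acquire)
    }
}

Task naming (item 7) could be as small as an extra &'static str label passed to spawn() and attached to the task's tracing span, so hung tasks are identifiable during a slow shutdown.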


Verdict

This is a solid improvement to shutdown handling, but the race condition and missing panic handling are blockers that must be addressed before merging. Once those critical issues are fixed, this will significantly improve the reliability of the guard service's shutdown process.

The overall architecture is sound and follows best practices. Great work on identifying and solving the WebSocket task tracking problem!

@claude

claude bot commented Nov 11, 2025

PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR addresses a critical issue where WebSocket and other spawned tasks were not being properly tracked during guard shutdown, potentially leading to abrupt termination of in-flight operations. The solution introduces a TaskGroup utility to track spawned tasks and ensure graceful shutdown.


Code Quality & Best Practices ✅

Strengths:

  • Clean abstraction: The TaskGroup implementation is well-designed and reusable
  • Proper separation of concerns: Task tracking logic is isolated in its own module
  • Consistent with project style: Uses hard tabs, follows Rust naming conventions
  • Good tracing integration: Uses .in_current_span() to preserve trace context across spawned tasks
  • Documentation fixes: Corrected comment syntax from // to /// in runtime.rs

Minor improvements:

  1. Comment formatting: config/runtime.rs:12,14 - Good catch fixing the doc comment syntax
  2. Structured logging: gasoline/worker.rs:326 - Correctly uses ?remaining_workflows instead of the verbose form
  3. Import organization: server.rs:7-16 - Imports now properly grouped with stdlib first, then external crates, then internal modules

Potential Issues & Bugs 🔍

1. Race condition in TaskGroup::wait_idle() - CRITICAL

Location: task_group.rs:43-56

There's a potential race condition between lines 45-46 (fast path check) and lines 50-54 (notification loop). If the last running task completes and notifies between the fast-path check and the notified().await call, the notification is missed and we may wait indefinitely.

Suggested fix:

pub async fn wait_idle(&self) {
    loop {
        // Subscribe to notifications *before* checking the count
        let notified = self.notify.notified();
        
        if self.running_count.load(Ordering::Acquire) == 0 {
            return;
        }
        
        notified.await;
    }
}

2. Memory ordering inconsistency

Location: task_group.rs:25,34

Using Relaxed for increment but AcqRel for decrement is asymmetric. Consider using Release for the increment as well:

self.running_count.fetch_add(1, Ordering::Release);

3. Panic handling TODO - CRITICAL

Location: task_group.rs:27

If a spawned future panics, the counter won't be decremented, causing wait_idle() to hang indefinitely. Consider using scopeguard or similar pattern to ensure cleanup.

4. Defer macro removal

Location: proxy_service.rs:2420-2432 (removed)

Please verify no other files use this macro before removing it.


Performance Considerations ⚡

Positive:

  • Minimal overhead using atomic operations
  • Smart fast path avoids unnecessary notification setup
  • Moved cleanup task spawn after metrics update (more logical)

Potential concerns:

  • Consider adding impl Drop for TaskGroup that warns if tasks are still running when dropped
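
A minimal sketch of such a warning, assuming the running_count field shown in task_group.rs (illustrative only; dropping the group would not cancel the tasks, it would only surface the leak):

impl Drop for TaskGroup {
    fn drop(&mut self) {
        let remaining = self.running_count.load(Ordering::Acquire);
        if remaining != 0 {
            // Tasks keep running detached; this just makes the leak visible in logs.
            tracing::warn!(remaining, "TaskGroup dropped with tasks still running");
        }
    }
}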

Security Concerns 🔒

No critical security issues identified

The existing shutdown timeout mechanism properly prevents indefinite hangs. The changes ensure WebSocket connections are properly tracked during shutdown.


Test Coverage 🧪

Missing: No unit tests for TaskGroup or integration tests for graceful shutdown behavior.

Recommended:

  • Test wait_idle() with multiple concurrent tasks
  • Test panic handling (will currently fail)
  • Test race conditions

Additional Observations

  1. Frontend changes: routeTree.gen.ts (+471/-470) appears to be auto-generated reformatting
  2. Service manager cleanup: Removed unused res variable - good cleanup
  3. Newline fix: Added missing EOF newline in Cargo.toml

Recommendation

Overall assessment: Well-implemented fix for a real issue. Core logic is sound.

Action items before merge:

  1. ✅ Fix race condition in wait_idle() (critical)
  2. ✅ Implement panic handling (critical)
  3. ✅ Consider stronger memory ordering (recommended)
  4. ⚠️ Add unit tests for TaskGroup (recommended)
  5. ⚠️ Verify no other code uses removed defer! macro (recommended)

Great work on identifying and addressing this shutdown issue! 🎉

@MasterPtato MasterPtato force-pushed the 11-10-fix_improve_sigterm_handling_for_the_entire_runtime branch from 9f71238 to 09cec26 on November 11, 2025 22:35
@MasterPtato MasterPtato force-pushed the 11-10-fix_guard_handle_websocket_tasks_during_shutdown branch from a353c1b to 380ccdd on November 11, 2025 22:35
@MasterPtato MasterPtato marked this pull request as ready for review November 11, 2025 22:35
@NathanFlurry NathanFlurry force-pushed the 11-10-fix_improve_sigterm_handling_for_the_entire_runtime branch from 09cec26 to f6632bf on November 13, 2025 17:20
@NathanFlurry NathanFlurry force-pushed the 11-10-fix_guard_handle_websocket_tasks_during_shutdown branch from 380ccdd to 4c115b8 on November 13, 2025 17:20
@claude

claude bot commented Nov 13, 2025

Code Review

Summary

This PR improves WebSocket task handling during guard shutdown by introducing a TaskGroup to track spawned tasks. The changes ensure that background WebSocket tasks are properly waited for during graceful shutdown, preventing premature termination.


Strengths

1. Well-designed TaskGroup implementation

  • Clean, minimal API with proper atomic operations
  • Correct use of memory ordering (Relaxed for increment, AcqRel for decrement)
  • Efficient fast path in wait_idle() to avoid unnecessary waiting

2. Improved shutdown sequence

  • The server now waits for both HTTP requests AND background tasks to complete
  • Proper ordering: waits for graceful HTTP shutdown first, then background tasks
  • Maintains timeout behavior for safety

3. Better code organization

  • Removal of the defer! macro in favor of explicit spawning improves readability
  • WebSocket tasks now properly tracked via self.state.tasks.spawn()

4. Minor quality improvements

  • Fixed comment syntax in runtime.rs:12-14
  • Better structured logging (?remaining_workflows in worker.rs:326)
  • Added span instrumentation to analytics event insertion

Issues & Recommendations

🔴 Critical: Panic handling in TaskGroup::spawn()

Location: engine/packages/guard-core/src/task_group.rs:27-39

The TODO comment highlights a critical issue - if a spawned task panics, the counter is never decremented, potentially causing wait_idle() to hang indefinitely.

Recommended fix:
Add panic catching using futures::FutureExt::catch_unwind() to ensure the counter is always decremented, even when tasks panic. This prevents indefinite hangs during shutdown.

🟡 Potential race condition in wait_idle()

Location: engine/packages/guard-core/src/task_group.rs:43-56

There is a subtle TOCTOU race condition. If a task completes between the fast path check and calling notified(), we might miss the notification.

Recommended fix:
Subscribe to notifications BEFORE checking the counter to prevent missed wakeups. Call self.notify.notified() first, then check the count.

🟡 Inconsistent task tracking

Location: engine/packages/guard-core/src/proxy_service.rs:814-819

The release_in_flight_task is spawned with regular tokio::spawn() instead of self.state.tasks.spawn(). If the goal is to wait for all background work during shutdown, this should probably use the TaskGroup.

Similarly, the analytics event insertion at line 2344 uses plain tokio::spawn().

Question: Should these tasks also be tracked by the TaskGroup?
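
If the answer is yes, the change would be small. A sketch of the release_in_flight cleanup routed through the group, based on the snippet quoted in the other reviews in this thread (treat the exact names as approximations):

// Before (untracked): a bare tokio::spawn that shutdown will not wait for.
// After (tracked, so wait_idle() covers it):
let state_clone = self.state.clone();
self.state.tasks.spawn(
    async move {
        state_clone.release_in_flight(client_ip, &actor_id).await;
    }
    .instrument(tracing::info_span!("release_in_flight_task")),
);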

🔵 Minor: Missing Default trait

Consider implementing Default for TaskGroup for better Rust idioms, though the current API is fine.


Additional Observations

Frontend changes: The routeTree.gen.ts changes appear to be auto-generated formatting differences (quote style, import ordering). These are noise in the diff but harmless.

Testing: It would be valuable to add tests for:

  • TaskGroup behavior under concurrent spawn/wait operations
  • Panic handling in spawned tasks
  • Shutdown timeout behavior with slow WebSocket tasks

Verdict

The core implementation is solid and significantly improves shutdown handling. However, the panic handling issue is critical and should be addressed before merging, as it could cause shutdown hangs in production. The race condition fix is recommended for correctness.

Overall, great work on improving the shutdown reliability!

@claude

claude bot commented Nov 13, 2025

Review: Fix guard shutdown handling for WebSocket tasks

This PR addresses an important shutdown issue where WebSocket and other background tasks weren't being properly tracked during guard shutdown. The implementation looks solid overall, with a few areas for improvement.

✅ Strengths

  1. Well-designed TaskGroup implementation - Clean abstraction using atomic counters and Notify for task tracking
  2. Proper integration - Tasks are now tracked across WebSocket handlers, error proxies, and analytics insertion
  3. Correct shutdown sequence - Waits for both graceful HTTP shutdown AND background tasks to complete
  4. Minor fixes - Fixed comment syntax (// → ///) and improved logging consistency

🔍 Issues & Recommendations

🔴 Critical: Race Condition in TaskGroup

Location: engine/packages/guard-core/src/task_group.rs:25-39

There's a race condition between incrementing the counter and spawning the task:

self.running_count.fetch_add(1, Ordering::Relaxed);  // [1]

let self2 = self.clone();
tokio::spawn(  // [2] - Task could complete before this returns
    async move {
        fut.await;
        if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            self2.notify.notify_waiters();
        }
    }
    .in_current_span(),
);

Problem: If wait_idle() is called between [1] and [2], and the task completes very quickly, the counter could hit zero and notify waiters before the task is even spawned, potentially causing wait_idle() to return prematurely.

Recommendation:

pub fn spawn<F, O>(self: &Arc<Self>, fut: F)
where
    F: Future<Output = O> + Send + 'static,
{
    let self2 = self.clone();
    self.running_count.fetch_add(1, Ordering::Release);  // Use Release ordering
    
    tokio::spawn(
        async move {
            let _guard = TaskGuard(self2.clone());  // Use guard to ensure decrement
            fut.await;
        }
        .in_current_span(),
    );
}

struct TaskGuard(Arc<TaskGroup>);
impl Drop for TaskGuard {
    fn drop(&mut self) {
        if self.0.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            self.0.notify.notify_waiters();
        }
    }
}

This ensures proper memory ordering and guarantees cleanup even on panics.

🟡 Medium: Unhandled Panic Recovery

Location: engine/packages/guard-core/src/task_group.rs:27

The TODO comment acknowledges panic handling isn't implemented. If a task panics:

  • The counter is never decremented
  • wait_idle() will hang indefinitely

Recommendation: Use a guard pattern (shown above) to ensure the counter is always decremented, or wrap the future in AssertUnwindSafe and catch panics.

🟡 Medium: Potential Notification Miss in wait_idle

Location: engine/packages/guard-core/src/task_group.rs:50-55

loop {
    self.notify.notified().await;
    if self.running_count.load(Ordering::Acquire) == 0 {
        break;
    }
}

Problem: There's a subtle race where:

  1. Thread checks count, sees it's > 0
  2. All tasks complete and notify
  3. Thread calls notified() and waits forever for the next notification that may never come

Recommendation:

let mut notified = std::pin::pin!(self.notify.notified());
loop {
    // Check count first
    if self.running_count.load(Ordering::Acquire) == 0 {
        break;
    }
    // Wait for notification
    notified.as_mut().await;
    // Create new notified future for next iteration
    notified.set(self.notify.notified());
}

🟡 Medium: Behavioral Change in release_in_flight

Location: engine/packages/guard-core/src/proxy_service.rs:788-814

The PR changes when release_in_flight is called:

  • Before: Using defer! macro, it was called when the request handler dropped (after response sent)
  • After: Called immediately after handling completes but potentially before response is fully sent

Question: Is this intentional? The defer pattern ensured cleanup happened at scope exit. The new pattern spawns the cleanup task earlier, which might be fine, but worth verifying this doesn't affect rate limiting accuracy.

🟢 Minor: Inconsistent Memory Ordering

Location: engine/packages/guard-core/src/task_group.rs:25,34,45,52

  • Line 25: Relaxed ordering on increment
  • Line 34: AcqRel ordering on decrement
  • Lines 45, 52: Acquire ordering on load

Recommendation: Use stronger ordering for increment (Release) to ensure happens-before relationship with the decrement.

🟢 Minor: Missing Default Implementation

TaskGroup::new() returns Arc<Self> which is unusual. Consider implementing Default:

impl Default for TaskGroup {
    fn default() -> Self {
        Self {
            running_count: AtomicUsize::new(0),
            notify: Notify::new(),
        }
    }
}

Then users can do Arc::new(TaskGroup::default()) which is more idiomatic.

📋 Additional Notes

  1. Frontend changes (routeTree.gen.ts): This appears to be auto-generated and unrelated to the guard fix. Consider removing from this PR.

  2. Testing: Consider adding integration tests that verify:

    • Shutdown waits for WebSocket tasks
    • Rapid task spawning/completion doesn't race
    • Panic in tasks doesn't hang shutdown

Summary

The core approach is sound and solves the shutdown problem effectively. The main concerns are:

  1. Race condition in task tracking (critical)
  2. Panic handling (blocks shutdown)
  3. Potential notification miss (rare but possible)

Fixing the critical race condition is essential before merging. The other issues are lower priority but should be addressed for production robustness.

Great work on identifying and addressing this shutdown issue! 🎯

@claude

claude bot commented Nov 13, 2025

PR Review: fix(guard): handle websocket tasks during shutdown

Thanks for this PR! The implementation looks solid overall and addresses an important gap in shutdown handling. Here's my detailed review:

✅ Strengths

  1. Well-designed TaskGroup abstraction - The new TaskGroup struct in task_group.rs provides a clean interface for tracking background tasks during shutdown
  2. Proper instrumentation - Good use of tracing spans (in_current_span()) to maintain observability across task boundaries
  3. Code cleanup - Fixed comment syntax (// → ///) in config/runtime.rs:12-13 and improved structured logging in gasoline/worker.rs:326
  4. Comprehensive shutdown - Now waits for both HTTP requests AND background tasks before completing shutdown

🔍 Potential Issues

1. Race condition in TaskGroup (task_group.rs:25-39)

There's a subtle race condition between incrementing the counter and spawning the task:

self.running_count.fetch_add(1, Ordering::Relaxed);  // Line 25

let self2 = self.clone();
tokio::spawn(
    async move {
        fut.await;
        
        if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {  // Line 34
            self2.notify.notify_waiters();
        }
    }
    .in_current_span(),
);

Issue: If a task completes extremely quickly (or if there's a scheduling delay), the following sequence could occur:

  1. Thread A: Increments count to 1
  2. Thread A: Spawns task
  3. Spawned task: Runs and decrements count to 0, notifies waiters
  4. Thread B: Calls wait_idle(), sees count == 0, returns immediately
  5. Thread A: Still has code to execute in the spawned closure

Recommendation: Use AcqRel ordering for the increment to ensure proper synchronization:

self.running_count.fetch_add(1, Ordering::AcqRel);

2. Missing panic handling (task_group.rs:27)

The TODO comment acknowledges this, but panicking tasks won't decrement the counter, causing wait_idle() to hang indefinitely.

Recommendation: Wrap the future execution:

tokio::spawn(
    async move {
        let _guard = scopeguard::guard((), |_| {
            if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
                self2.notify.notify_waiters();
            }
        });
        fut.await;
        // Guard automatically decrements even if panic occurs
    }
    .in_current_span(),
);

Or use AssertUnwindSafe and catch_unwind if you prefer to stay in async context.

3. Notify starvation in wait_idle (task_group.rs:50-55)

The wait_idle implementation has a subtle issue:

loop {
    self.notify.notified().await;
    if self.running_count.load(Ordering::Acquire) == 0 {
        break;
    }
}

Issue: Between checking the count and calling notified().await again, tasks could complete and notify, but we'll miss that notification because we weren't subscribed yet.

Recommendation: Subscribe before checking:

loop {
    let notified = self.notify.notified();  // Subscribe first
    if self.running_count.load(Ordering::Acquire) == 0 {
        return;
    }
    notified.await;  // Then wait
}

4. Task spawn timing change (proxy_service.rs:812-820)

The in-flight counter release was moved from a deferred cleanup (using the now-removed defer! macro) to happening AFTER the request completes:

// Old: cleanup deferred at start of request
// New: cleanup spawned after request completes

metrics::PROXY_REQUEST_PENDING.add(-1, &[]);

// Release in-flight counter when done
let state_clone = self.state.clone();
tokio::spawn(
    async move {
        state_clone.release_in_flight(client_ip, &actor_id).await;
    }
    .instrument(tracing::info_span!("release_in_flight_task")),
);

Concern: This is no longer tracked by TaskGroup, so these cleanup tasks won't be awaited during shutdown. If shutdown occurs, in-flight counters might not be properly released.

Recommendation: Use self.state.tasks.spawn() instead of bare tokio::spawn() for consistency.

5. Analytics event spawning (proxy_service.rs:2344-2350)

Similarly, analytics insertion uses bare tokio::spawn:

tokio::spawn(
    async move {
        if let Err(error) = context_clone.insert_event().await {
            tracing::warn!(?error, "failed to insert guard analytics event");
        }
    }
    .instrument(tracing::info_span!("insert_event_task")),
);

This won't be tracked either. Consider whether analytics events should block shutdown.

📊 Performance Considerations

  1. Atomic operations: The current mix of Relaxed, Acquire, and AcqRel orderings is mostly appropriate, though the increment should use AcqRel as noted
  2. Lock-free design: Excellent use of atomics instead of mutexes for the counter
  3. Notification overhead: Notify::notify_waiters() is efficient for this use case

🔒 Security Concerns

No security issues identified. The changes are internal infrastructure improvements.

🧪 Test Coverage

The package has good existing test coverage with WebSocket-specific tests:

  • tests/websocket.rs
  • tests/simple_websocket.rs
  • tests/simple_websocket_echo.rs

Recommendation: Add a test specifically for shutdown behavior:

  • Test that wait_idle() properly waits for all spawned tasks
  • Test that shutdown completes within the configured timeout
  • Test the panic handling once implemented
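
A sketch of the first of these, assuming the TaskGroup API from this PR (new() returning Arc<Self>, spawn(), wait_idle()); timings are arbitrary:

#[tokio::test]
async fn wait_idle_waits_for_spawned_tasks() {
    use std::time::{Duration, Instant};

    let tasks = TaskGroup::new();
    let started = Instant::now();

    tasks.spawn(async {
        tokio::time::sleep(Duration::from_millis(50)).await;
    });

    // Should block until the spawned task finishes, but comfortably within the timeout.
    tokio::time::timeout(Duration::from_secs(1), tasks.wait_idle())
        .await
        .expect("wait_idle should not hang");

    assert!(started.elapsed() >= Duration::from_millis(50));
}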

📝 Minor Items

  1. Frontend changes (frontend/src/routeTree.gen.ts): This is an auto-generated file with 471 additions/470 deletions, likely just a formatting change. Should verify the generator is working correctly.

  2. Trailing newline (service-manager/Cargo.toml): Good catch adding the missing newline at EOF.

  3. Unused variable cleanup (service-manager/src/lib.rs:163): Nice cleanup changing Result::Ok(res) to Result::Ok(_).

🎯 Summary

This is a solid improvement to shutdown handling. The main concerns are:

  1. Critical: Fix the race condition in TaskGroup::spawn() with proper atomic ordering
  2. Critical: Implement panic handling to prevent shutdown hangs
  3. Important: Fix the notification starvation issue in wait_idle()
  4. Recommended: Track all spawned tasks (including cleanup and analytics) consistently

Great work on improving the shutdown reliability! 🚀

@claude

claude bot commented Nov 13, 2025

PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR introduces a TaskGroup utility to properly track and wait for background WebSocket tasks during guard shutdown. This is a solid improvement to the shutdown handling that addresses a real issue where WebSocket tasks could be orphaned during graceful shutdown.

Code Quality & Best Practices

✅ Strengths

  1. Clean abstraction: The TaskGroup implementation is well-designed and focused, providing a clear API for tracking background tasks.

  2. Good documentation fixes: Fixed comment syntax from // to /// in engine/packages/config/src/config/runtime.rs:12-13.

  3. Improved logging: Better structured logging with ?remaining_workflows instead of ?self.running_workflows.len() in engine/packages/gasoline/src/worker.rs:326.

  4. Proper tracing: Good use of .in_current_span() and .instrument() for maintaining trace context across task boundaries.

🔍 Issues & Concerns

1. Critical: Race condition in TaskGroup::wait_idle (task_group.rs:43-56)

There's a potential race condition in the wait_idle implementation:

pub async fn wait_idle(&self) {
    // Fast path
    if self.running_count.load(Ordering::Acquire) == 0 {
        return;
    }

    // Wait for notifications until the count reaches zero
    loop {
        self.notify.notified().await;
        if self.running_count.load(Ordering::Acquire) == 0 {
            break;
        }
    }
}

Problem: Between the initial check and calling notified().await, tasks could complete and notify, causing a missed notification. The code could then wait forever on a notification that already happened.

Fix: Subscribe to notifications before checking the count:

pub async fn wait_idle(&self) {
    loop {
        let notified = self.notify.notified();
        if self.running_count.load(Ordering::Acquire) == 0 {
            return;
        }
        notified.await;
    }
}

2. Memory ordering mismatch (task_group.rs:25, 34)

The increment uses Ordering::Relaxed (line 25) but the decrement uses Ordering::AcqRel (line 34). This asymmetry is questionable:

self.running_count.fetch_add(1, Ordering::Relaxed);  // Line 25
// ...
if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {  // Line 34

Recommendation: Use Ordering::AcqRel for both operations to ensure proper synchronization, or document why Relaxed is sufficient for increments.

3. Unhandled panics (task_group.rs:27)

The TODO comment acknowledges panic handling is missing:

// TODO: Handle panics
let self2 = self.clone();
tokio::spawn(...)

Problem: If a spawned task panics, the counter will never be decremented, causing wait_idle() to hang forever during shutdown.

Fix: Wrap the future in AssertUnwindSafe and use catch_unwind:

use std::panic::AssertUnwindSafe;
use futures::FutureExt;

tokio::spawn(
    async move {
        let result = AssertUnwindSafe(fut).catch_unwind().await;
        if result.is_err() {
            tracing::error!("task panicked in TaskGroup");
        }
        
        if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            self2.notify.notify_waiters();
        }
    }
    .in_current_span(),
);

4. Potential resource leak (proxy_service.rs:814-819)

The release_in_flight task is spawned with tokio::spawn (not tracked by TaskGroup):

// Release in-flight counter when done
let state_clone = self.state.clone();
tokio::spawn(
    async move {
        state_clone.release_in_flight(client_ip, &actor_id).await;
    }
    .instrument(tracing::info_span!("release_in_flight_task")),
);

Problem: This task is spawned after the request completes but is not tracked by the TaskGroup. During shutdown, these tasks may not complete, potentially leaving in-flight counters incorrectly incremented.

Fix: Either:

  1. Track this with the TaskGroup: self.state.tasks.spawn(...)
  2. Or make it synchronous since it happens after the response is sent

5. Missing error handling (proxy_service.rs:2346)

The analytics insertion spawns a task that could fail silently:

tokio::spawn(
    async move {
        if let Err(error) = context_clone.insert_event().await {
            tracing::warn!(?error, "failed to insert guard analytics event");
        }
    }
    .instrument(tracing::info_span!("insert_event_task")),
);

Question: Should this also be tracked by TaskGroup to ensure analytics are flushed before shutdown?

Performance Considerations

  1. Atomic operations: The use of atomics is appropriate for this use case. The lock-free design is good for performance.

  2. Notification overhead: Using notify_waiters() is efficient since it only notifies when count reaches zero.

  3. No blocking: All operations are async-friendly, which is good.

Security Concerns

No significant security issues identified. The shutdown handling improvement actually enhances security by ensuring proper cleanup.

Test Coverage

Missing: I don't see any tests for the TaskGroup implementation. Consider adding:

  1. Unit tests for TaskGroup:

    • Basic spawn and wait_idle
    • Multiple concurrent tasks
    • Rapid spawn/complete cycles
    • Edge case: wait_idle when no tasks
  2. Integration tests:

    • Shutdown behavior with active WebSocket connections
    • Verify all tasks complete during graceful shutdown
    • Timeout behavior

Recommendations Summary

Must Fix (Blocking)

  1. Fix race condition in wait_idle() - Could cause shutdown hangs
  2. Handle panics in spawned tasks - Could cause shutdown hangs

Should Fix

  1. Fix memory ordering consistency (use AcqRel for increment too)
  2. Track release_in_flight task in TaskGroup or make it synchronous

Nice to Have

  1. Add test coverage for TaskGroup
  2. Consider tracking analytics tasks in TaskGroup for complete flush on shutdown
  3. Add documentation to TaskGroup explaining its purpose and usage

Overall Assessment

This is a solid improvement that addresses a real problem with WebSocket task handling during shutdown. The TaskGroup abstraction is clean and well-focused. However, there are critical concurrency issues (race condition, panic handling) that must be addressed before merging, as they could cause shutdown hangs in production.

The changes follow Rivet's code style and logging patterns well. Once the concurrency issues are resolved, this will be a valuable addition to the codebase.


Generated with Claude Code

@graphite-app
Contributor

graphite-app bot commented Nov 13, 2025

Merge activity

@claude

claude bot commented Nov 13, 2025

Pull Request Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR introduces a TaskGroup abstraction to properly track and wait for WebSocket and other background tasks during guard shutdown. This addresses a gap where previously only HTTP requests were gracefully handled during shutdown, but spawned background tasks could be orphaned.

Code Quality & Best Practices

✅ Strengths

  1. Well-designed TaskGroup pattern (engine/packages/guard-core/src/task_group.rs)

    • Clean abstraction for tracking spawned tasks
    • Proper use of atomic operations with appropriate memory ordering
    • Good separation of concerns
  2. Improved shutdown flow (engine/packages/guard-core/src/server.rs:253-264)

    • Now waits for both graceful HTTP connection shutdown AND background tasks
    • Sequential waiting ensures all work completes before shutdown
  3. Code cleanup

    • Fixed comment markers from // to /// for proper doc comments (engine/packages/config/src/config/runtime.rs:12,14)
    • Improved logging consistency using structured logging (engine/packages/gasoline/src/worker.rs:326)
    • Removed unused defer! macro that was replaced by direct task spawning
  4. Consistent task spawning

    • All WebSocket-related tasks now tracked via self.state.tasks.spawn()
    • Added tracing spans to background tasks for better observability

Potential Issues & Concerns

🔴 Critical: Race Condition in TaskGroup::wait_idle()

Location: engine/packages/guard-core/src/task_group.rs:43-56

There's a race condition between the fast path check and the notification loop:

pub async fn wait_idle(&self) {
    // Fast path
    if self.running_count.load(Ordering::Acquire) == 0 {
        return;
    }

    // Wait for notifications until the count reaches zero
    loop {
        self.notify.notified().await;  // ⚠️ PROBLEM: Could miss notification
        if self.running_count.load(Ordering::Acquire) == 0 {
            break;
        }
    }
}

The Problem:

  1. Thread A calls wait_idle(), sees running_count = 1, passes fast path check
  2. Thread B (spawned task) completes, decrements to 0, calls notify_waiters()
  3. Thread A now calls notified().await - but the notification already happened!
  4. Thread A waits forever (or until another task completes)

Recommended Fix:

pub async fn wait_idle(&self) {
    loop {
        let notified = self.notify.notified();  // Register interest FIRST
        if self.running_count.load(Ordering::Acquire) == 0 {
            return;
        }
        notified.await;  // Then wait
    }
}

This ensures we register for notifications before checking the count, preventing the race.

🟡 Medium: Missing Panic Handling

Location: engine/packages/guard-core/src/task_group.rs:27

The TODO comment indicates panic handling is missing:

// TODO: Handle panics
let self2 = self.clone();
tokio::spawn(
    async move {
        fut.await;
        // ...
    }
    .in_current_span(),
);

Issue: If a spawned task panics, the counter is never decremented, causing wait_idle() to hang forever during shutdown.

Recommended Fix:

tokio::spawn(
    async move {
        let _guard = scopeguard::guard((), |_| {
            if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
                self2.notify.notify_waiters();
            }
        });
        
        fut.await;
        
        // Cancel the guard - we'll decrement normally
        std::mem::forget(_guard);
        
        if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            self2.notify.notify_waiters();
        }
    }
    .in_current_span(),
);

Or use AssertUnwindSafe with catch_unwind if you want to handle panics explicitly.

🟡 Medium: Memory Ordering Inconsistency

Location: engine/packages/guard-core/src/task_group.rs:25

self.running_count.fetch_add(1, Ordering::Relaxed);  // ⚠️ Relaxed

But in spawn() cleanup:

if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {  // AcqRel

Issue: The increment uses Relaxed ordering, but the decrement uses AcqRel. While this might be safe (increments don't need synchronization), it's inconsistent and potentially confusing.

Recommendation: Use Ordering::Release for the increment to match the acquire-release semantics of the decrement, or document why Relaxed is sufficient.

🟢 Minor: Release Timing Change

Location: engine/packages/guard-core/src/proxy_service.rs:812-818

The in-flight counter release was moved from a deferred cleanup (using defer!) to after the request completes:

// OLD: Released via defer at start of function
// NEW: Released after request completes
let state_clone = self.state.clone();
tokio::spawn(
    async move {
        state_clone.release_in_flight(client_ip, &actor_id).await;
    }
    .instrument(tracing::info_span!("release_in_flight_task")),
);

Question: Was the defer! approach intentionally releasing the counter early (as soon as the request started being processed), or was this a bug? The new approach seems more correct (release after completion), but this behavioral change should be verified.

Performance Considerations

✅ Good

  • Atomic operations use appropriate ordering (mostly)
  • Fast path in wait_idle() avoids unnecessary waiting
  • Notify is efficient for waking waiters

⚠️ Consideration

  • Each spawn() call increments an atomic counter - overhead is minimal but worth noting for high-throughput scenarios
  • The notification mechanism wakes ALL waiters on each task completion - with only one waiter (shutdown), this is fine

Security Concerns

No security issues identified. The changes are focused on lifecycle management and don't touch authentication, authorization, or data handling.

Test Coverage

Recommendation: Add tests for TaskGroup:

  1. Basic spawn and wait functionality
  2. Race condition scenario (spawn multiple tasks, call wait_idle() concurrently)
  3. Panic handling once implemented
  4. Behavior when wait_idle() is called with no tasks

Example test structure:

#[tokio::test]
async fn test_task_group_wait_idle_race() {
    let group = TaskGroup::new();
    
    // Spawn a task that completes quickly
    group.spawn(async {
        tokio::time::sleep(Duration::from_millis(1)).await;
    });
    
    // Immediately wait - should not hang
    tokio::time::timeout(
        Duration::from_secs(1),
        group.wait_idle()
    ).await.expect("wait_idle should not hang");
}

Additional Notes

Style & Formatting

  • Comment fixes are good (doc comments now properly marked with ///)
  • Logging improvements follow the project's structured logging patterns
  • Frontend auto-generated file changes (routeTree.gen.ts) are expected

Documentation

Consider adding doc comments to TaskGroup:

/// A group of spawned tasks that can be waited on for completion.
/// 
/// Useful for ensuring all background work completes during graceful shutdown.
pub struct TaskGroup { ... }

Verdict

Status: ⚠️ Needs Changes - Critical race condition must be fixed

The concept and implementation are solid, but the race condition in wait_idle() and missing panic handling could cause shutdown hangs in production. These issues should be addressed before merging.

Required Changes:

  1. Fix the race condition in TaskGroup::wait_idle() (critical)
  2. Add panic handling to prevent counter leaks (high priority)

Recommended Changes:

  1. Add tests for TaskGroup
  2. Document memory ordering choices
  3. Verify the in-flight counter release timing change is intentional
  4. Add doc comments to public API

Great work on identifying and addressing the shutdown handling gap! Once the race condition is fixed, this will be a solid improvement to the system's reliability. 🚀

@claude

claude bot commented Nov 13, 2025

Pull Request Review: Fix guard websocket tasks during shutdown

Summary

This PR addresses a critical shutdown issue by introducing proper tracking of background websocket tasks during guard shutdown. The implementation adds a TaskGroup to manage spawned tasks and ensures graceful shutdown waits for all tasks to complete.

Code Quality & Best Practices

Strengths ✓

  1. Clean abstraction: The TaskGroup implementation is well-designed with a simple, focused API
  2. Proper atomic operations: Uses appropriate memory orderings (Relaxed for increment, AcqRel for decrement, Acquire for loads)
  3. Tracing integration: Good use of in_current_span() and instrumentation
  4. Documentation: Clear inline comments explaining the shutdown flow
  5. Style compliance: Code follows Rust conventions and the codebase's hard-tab formatting

Improvements Needed

1. Critical: Race condition in wait_idle() ⚠️

Location: engine/packages/guard-core/src/task_group.rs:43-56

There's a potential race condition where tasks could complete between the fast-path check and the notification wait:

// Current code
if self.running_count.load(Ordering::Acquire) == 0 {
    return;  // Fast path
}

loop {
    self.notify.notified().await;  // If count became 0 before this, we'll wait forever
    if self.running_count.load(Ordering::Acquire) == 0 {
        break;
    }
}

Recommendation: Use a more robust pattern:

loop {
    let notified = self.notify.notified();
    if self.running_count.load(Ordering::Acquire) == 0 {
        return;
    }
    notified.await;
}

This ensures the notification is registered before checking the count, preventing missed notifications.

2. Unhandled panic issue ⚠️

Location: engine/packages/guard-core/src/task_group.rs:27

The TODO comment acknowledges this, but spawned tasks that panic will leave the counter in an incorrect state, potentially causing wait_idle() to hang indefinitely.

Recommendation: Wrap the future execution:

tokio::spawn(
    async move {
        let _guard = scopeguard::guard((), |_| {
            if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
                self2.notify.notify_waiters();
            }
        });
        fut.await;
        // Guard decrements the counter on drop, even if fut panics.
    }
    .in_current_span(),
);

Or use AssertUnwindSafe with catch_unwind if panics should be caught.

3. Memory ordering consideration

Location: engine/packages/guard-core/src/task_group.rs:25

The increment uses Relaxed ordering, which is fine for incrementing, but consider documenting why this is safe (because the decrement synchronizes).
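
One way to capture that, as comments on the two operations (the wording is a suggestion for documentation, not text from the PR):

// Relaxed is enough for the increment: wait_idle() only promises to wait for
// tasks whose spawn() call happened-before it, and spawn() performs this
// increment before handing the future to tokio.
self.running_count.fetch_add(1, Ordering::Relaxed);

// AcqRel on the decrement: the Release half pairs with the Acquire load in
// wait_idle(), so a waiter that observes zero also sees the finished tasks' writes.
if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
    self2.notify.notify_waiters();
}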

4. Removed defer macro safety ⚠️

Location: engine/packages/guard-core/src/proxy_service.rs:814-819

The defer macro ensured release_in_flight was called even if the request handling panicked. The new code spawns the cleanup asynchronously after the request completes, but doesn't guarantee execution on panic.

Impact: If handle_websocket_upgrade or handle_http_request panics, in-flight counters may not be released properly.

Recommendation: Either:

  • Keep the defer pattern for safety, or
  • Document that panic=abort is expected, or
  • Ensure panic safety through other means

5. Frontend changes unrelated to PR

Location: frontend/src/routeTree.gen.ts

This file contains 941 lines of changes that appear to be auto-generated formatting changes unrelated to the guard shutdown fix. While the diff shows it's mostly whitespace/formatting (spaces to tabs), this makes the PR harder to review.

Recommendation: These should be in a separate PR or excluded if they're truly just generated artifacts with no functional changes.

Security Concerns

  1. Resource exhaustion: If tasks never complete, wait_idle() could block shutdown indefinitely. The timeout in server.rs mitigates this, but the root cause should be addressed.
  2. No task limit: The TaskGroup has no upper bound on spawned tasks. Consider adding a semaphore-based limit to prevent resource exhaustion.
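
A sketch of what a bound could look like using tokio::sync::Semaphore (illustrative only; the TaskGroup in this PR has no such limit, and this trades unbounded spawning for backpressure on callers):

use std::sync::Arc;
use tokio::sync::Semaphore;

pub struct BoundedTaskGroup {
    permits: Arc<Semaphore>,
}

impl BoundedTaskGroup {
    pub fn new(max_tasks: usize) -> Self {
        Self { permits: Arc::new(Semaphore::new(max_tasks)) }
    }

    /// Waits for a free slot before spawning, so at most max_tasks run at once.
    pub async fn spawn<F>(&self, fut: F)
    where
        F: std::future::Future<Output = ()> + Send + 'static,
    {
        let permit = self.permits.clone().acquire_owned().await.expect("semaphore closed");
        tokio::spawn(async move {
            let _permit = permit; // slot is released when the task finishes
            fut.await;
        });
    }
}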

Performance Considerations

Good

  • Fast-path optimization in wait_idle() avoids unnecessary waits
  • Atomic operations are efficient
  • Minimal overhead per spawned task

Potential issues

  • Every websocket spawn now has additional atomic operations overhead (minor)
  • If many tasks complete simultaneously, notify_waiters() could have contention (unlikely in practice)

Test Coverage

Missing: No tests found for the TaskGroup implementation. This is critical infrastructure that should be tested.

Recommendations:

  1. Add unit tests for TaskGroup:
    • Spawn multiple tasks and verify wait_idle() completes
    • Test fast-path optimization
    • Test concurrent spawns during wait_idle()
    • Test the race condition scenario
  2. Add integration tests for guard shutdown with active websockets

Additional Notes

Minor issues

  1. Line 323 in gasoline/worker.rs: Good fix to use structured logging consistently
  2. Line 160 in service-manager/lib.rs: Removing unused variable is good cleanup
  3. Comment fixes in config/runtime.rs: Good attention to documentation

Questions

  1. Is there a maximum expected number of concurrent websocket tasks? This could inform whether a bounded task group would be beneficial.
  2. What's the expected behavior if websocket tasks are still running when the shutdown timeout expires?

Recommendation

Conditionally approve with required changes:

  1. Fix the race condition in wait_idle() (critical)
  2. Address panic handling (either implement or document expectations)
  3. Consider separating frontend formatting changes
  4. Add basic tests for TaskGroup

The core solution is sound and addresses the shutdown issue effectively. With the race condition fix and panic handling, this will be production-ready.

@NathanFlurry NathanFlurry changed the base branch from 11-10-fix_improve_sigterm_handling_for_the_entire_runtime to graphite-base/3448 November 13, 2025 22:46
@NathanFlurry NathanFlurry force-pushed the 11-10-fix_guard_handle_websocket_tasks_during_shutdown branch from 6546966 to 7bde763 Compare November 13, 2025 22:47
@NathanFlurry NathanFlurry changed the base branch from graphite-base/3448 to main November 13, 2025 22:47
@vercel vercel bot temporarily deployed to Preview – rivetkit-serverless November 13, 2025 22:47 Inactive
@NathanFlurry NathanFlurry merged commit da324bd into main Nov 13, 2025
7 of 14 checks passed
@NathanFlurry NathanFlurry deleted the 11-10-fix_guard_handle_websocket_tasks_during_shutdown branch November 13, 2025 22:48
@claude
Copy link

claude bot commented Nov 13, 2025

Code Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR introduces a TaskGroup mechanism to properly track and wait for WebSocket tasks during graceful shutdown. The implementation is generally solid and addresses a real issue where background tasks spawned for WebSocket connections weren't being awaited during shutdown.


Strengths

Clear architectural improvement: The TaskGroup abstraction provides a clean way to track background tasks and wait for them during shutdown.

Proper shutdown sequencing: The server now waits for both hyper's graceful shutdown AND the custom task group, ensuring all work completes.

Good use of atomics: The TaskGroup uses appropriate memory ordering (Relaxed for increment, AcqRel for decrement with notification check).

Documentation improvements: Fixed doc comment syntax (// → ///) in runtime.rs.

Consistent logging patterns: Fixed structured logging in gasoline/worker.rs to use ?remaining_workflows instead of inline formatting.


Issues & Concerns

🔴 CRITICAL: Race Condition in TaskGroup

Location: engine/packages/guard-core/src/task_group.rs:43-56

The wait_idle() implementation has a TOCTOU (time-of-check-time-of-use) race condition:

pub async fn wait_idle(&self) {
    // Fast path
    if self.running_count.load(Ordering::Acquire) == 0 {  // ← Check here
        return;
    }

    // Wait for notifications until the count reaches zero
    loop {
        self.notify.notified().await;  // ← Could miss notification if count became 0 between check and wait
        if self.running_count.load(Ordering::Acquire) == 0 {
            break;
        }
    }
}

Problem: If the last task completes between the initial check (line 45) and calling notified().await (line 51), the notification is lost and wait_idle() will hang forever.

Solution: Subscribe to notifications BEFORE checking the count:

pub async fn wait_idle(&self) {
    loop {
        let notified = self.notify.notified();  // Subscribe first
        if self.running_count.load(Ordering::Acquire) == 0 {
            return;
        }
        notified.await;  // Now safe to wait
    }
}

This ensures we either see count == 0 OR we're already subscribed when the notification fires.


🟡 ISSUE: Panic Handling Not Implemented

Location: engine/packages/guard-core/src/task_group.rs:27

// TODO: Handle panics
let self2 = self.clone();
tokio::spawn(
    async move {
        fut.await;  // ← If fut panics, count never decrements!

Problem: If a spawned task panics, the running count is never decremented, causing wait_idle() to hang indefinitely during shutdown.

Impact: Any panic in WebSocket handling, analytics insertion, or in-flight counter release will break graceful shutdown.

Solution: Wrap the future in AssertUnwindSafe and catch panics:

use std::panic::AssertUnwindSafe;
use futures::FutureExt;

tokio::spawn(
    async move {
        let _ = AssertUnwindSafe(fut).catch_unwind().await;
        
        if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
            self2.notify.notify_waiters();
        }
    }
    .in_current_span(),
);

🟡 ISSUE: In-Flight Counter Release Semantics Changed

Location: engine/packages/guard-core/src/proxy_service.rs:812-819

Before: The defer macro ensured release_in_flight was spawned even on early returns or panics.

After: The release is spawned AFTER the request completes normally.

Problem: This is a subtle behavioral change. The old code would spawn the release task regardless of how handle_request exited (return, panic, cancel). The new code only spawns if the function returns normally.

Questions:

  • Is this intentional?
  • What happens if handle_http_request or handle_websocket_upgrade panics?
  • Should the release task be spawned in a defer-like manner (Drop guard) or is the current approach acceptable?

Recommendation: If the old behavior was correct, consider adding a Drop guard or moving the spawn to ensure it always happens:

struct ReleaseOnDrop {
    state: Arc<ProxyState>,
    client_ip: std::net::IpAddr,
    actor_id: Option<Id>,
}

impl Drop for ReleaseOnDrop {
    fn drop(&mut self) {
        let state = self.state.clone();
        let client_ip = self.client_ip;
        let actor_id = self.actor_id.take(); // take() avoids requiring Id to be Copy
        tokio::spawn(async move {
            state.release_in_flight(client_ip, &actor_id).await;
        }.instrument(tracing::info_span!("release_in_flight_task")));
    }
}

🟢 Minor: Missing Default trait

Location: engine/packages/guard-core/src/task_group.rs:13

The new() constructor returns Arc<Self>, which is unusual. Consider:

impl Default for TaskGroup {
    fn default() -> Self {
        Self {
            running_count: AtomicUsize::new(0),
            notify: Notify::new(),
        }
    }
}

pub fn new() -> Arc<Self> {
    Arc::new(Self::default())
}

This allows users to create non-Arc'd instances if needed in tests.


🟢 Minor: Unused output in service_manager

Location: engine/packages/service-manager/src/lib.rs:163

Good cleanup changing Result::Ok(res) to Result::Ok(_) since the value wasn't used.


Performance Considerations

  1. Atomic operations: The use of Relaxed ordering for increment and AcqRel for decrement is appropriate and efficient.

  2. Notification strategy: notify_waiters() is called on every task completion when count reaches 0. This is fine since it's a rare event (only when transitioning to idle).

  3. No additional overhead during normal operation: The task tracking adds minimal overhead (atomic increment/decrement per task).


Security Considerations

No direct security issues identified. The shutdown handling ensures tasks complete before process exit, which prevents data loss.


Test Coverage

⚠️ No new tests added for the TaskGroup functionality. Recommended tests:

  1. Basic functionality: Spawn tasks, verify wait_idle() completes when all finish
  2. Race condition test: Rapidly spawn/complete tasks while calling wait_idle() (sketched after this list)
  3. Panic handling: Verify count decrements even if tasks panic (after implementing panic handling)
  4. Shutdown integration test: Verify guard waits for WebSocket tasks during SIGTERM
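
A rough sketch of the race-condition test, under the same API assumptions as above (iteration counts are arbitrary):

#[tokio::test(flavor = "multi_thread")]
async fn wait_idle_does_not_miss_fast_completions() {
    for _ in 0..1_000 {
        let group = TaskGroup::new();
        // Tasks that finish almost immediately maximize the chance of hitting
        // the window between the count check and notified().await
        for _ in 0..4 {
            group.spawn(async { tokio::task::yield_now().await });
        }
        tokio::time::timeout(std::time::Duration::from_secs(1), group.wait_idle())
            .await
            .expect("wait_idle hung; likely a missed notification");
    }
}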

Recommendations

  1. MUST FIX: Address the race condition in wait_idle() before merging
  2. SHOULD FIX: Implement panic handling to prevent shutdown hangs
  3. SHOULD REVIEW: Verify the in-flight counter release semantics change is intentional
  4. NICE TO HAVE: Add unit tests for TaskGroup
  5. NICE TO HAVE: Add integration test for shutdown behavior

Style & Convention Compliance

✅ Follows Rust workspace conventions from CLAUDE.md
✅ Uses hard tabs for indentation
✅ Proper structured logging with tracing
✅ Consistent error handling patterns

The frontend routeTree.gen.ts changes appear to be auto-generated formatting (tabs → spaces conversion) and don't require review.


Overall Assessment

This PR addresses a real production issue (orphaned WebSocket tasks during shutdown), but the implementation has a critical race condition that must be fixed before merging. The panic handling TODO should also be addressed to prevent shutdown hangs.

Recommendation: Request changes for the race condition fix, strongly encourage panic handling implementation.

@claude
Copy link

claude bot commented Nov 13, 2025

PR Review: fix(guard): handle websocket tasks during shutdown

Summary

This PR addresses a shutdown handling issue in the guard service by introducing a TaskGroup mechanism to track spawned WebSocket tasks and ensure they complete gracefully before shutdown completes.

Code Quality & Best Practices

Strengths:

  • Clean implementation of the TaskGroup pattern with proper atomic operations
  • Good use of structured logging throughout (tracing::info_span!, ? formatting)
  • Proper instrumentation added to spawned tasks
  • Comment fixes (converting // to /// for doc comments) in engine/packages/config/src/config/runtime.rs:12-14
  • Consistent with codebase patterns for error handling and logging

Observations:

  1. Comment style improvements - Good catches on the doc comment formatting
  2. Logging improvement in gasoline/src/worker.rs:326 - Changed from remaining_workflows=?self.running_workflows.len() to ?remaining_workflows which is cleaner
  3. Removed unused macro - The defer! macro was removed as it's been replaced by explicit cleanup logic

Potential Issues & Concerns

1. Race Condition in TaskGroup (Critical)

Location: engine/packages/guard-core/src/task_group.rs:25-39

There's a subtle race condition in the TaskGroup implementation:

pub fn spawn<F, O>(self: &Arc<Self>, fut: F) {
    self.running_count.fetch_add(1, Ordering::Relaxed);  // Line 25
    
    let self2 = self.clone();
    tokio::spawn(
        async move {
            fut.await;
            
            if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
                self2.notify.notify_waiters();
            }
        }
        .in_current_span(),
    );
}

Issue: The increment uses Ordering::Relaxed, but in wait_idle() we use Ordering::Acquire. This creates a potential race where:

  1. Thread A calls spawn() and increments with Relaxed ordering
  2. Thread B calls wait_idle() and loads with Acquire ordering
  3. Thread B might not see the increment and incorrectly think count is 0

Recommendation: Change line 25 to use Ordering::Release or Ordering::AcqRel:

self.running_count.fetch_add(1, Ordering::Release);

2. Missing Panic Handling (Medium Priority)

Location: engine/packages/guard-core/src/task_group.rs:27

The TODO comment mentions this, but it's worth emphasizing:

// TODO: Handle panics

Issue: If a spawned task panics, the running_count will never be decremented, causing wait_idle() to hang indefinitely during shutdown.

Recommendation: Wrap the future in a panic-catching mechanism:

async move {
    let result = std::panic::AssertUnwindSafe(fut).catch_unwind().await;
    
    // Decrement regardless of panic
    if self2.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
        self2.notify.notify_waiters();
    }
    
    if let Err(e) = result {
        tracing::error!(?e, "task panicked");
    }
}

3. Notification Lost Wakeup (Low Priority)

Location: engine/packages/guard-core/src/task_group.rs:43-56

In wait_idle(), there's a theoretical edge case:

loop {
    self.notify.notified().await;  // Line 51
    if self.running_count.load(Ordering::Acquire) == 0 {  // Line 52
        break;
    }
}

Issue: Between the notification and the count check, new tasks could be spawned. While this is unlikely to cause issues in practice (shutdown scenario), it could lead to unnecessary loop iterations.

Recommendation: This is acceptable for the shutdown use case, but document this behavior. Alternatively, consider using a shutdown flag to prevent new spawns after wait_idle() is called.
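
For illustration, a shutdown flag could look roughly like the following sketch. The shutting_down field and the method shapes are assumptions based on the snippets quoted in this review, not the actual code:

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::Notify;

pub struct TaskGroup {
    running_count: AtomicUsize,
    notify: Notify,
    shutting_down: AtomicBool,
}

impl TaskGroup {
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            running_count: AtomicUsize::new(0),
            notify: Notify::new(),
            shutting_down: AtomicBool::new(false),
        })
    }

    pub fn spawn<F>(self: &Arc<Self>, fut: F)
    where
        F: std::future::Future<Output = ()> + Send + 'static,
    {
        // Refuse new work once wait_idle() has started
        if self.shutting_down.load(Ordering::Acquire) {
            return;
        }
        self.running_count.fetch_add(1, Ordering::Release);
        let this = self.clone();
        tokio::spawn(async move {
            fut.await;
            if this.running_count.fetch_sub(1, Ordering::AcqRel) == 1 {
                this.notify.notify_waiters();
            }
        });
    }

    pub async fn wait_idle(&self) {
        // Block further spawns, then wait for in-flight tasks to drain
        self.shutting_down.store(true, Ordering::Release);
        loop {
            let notified = self.notify.notified();
            if self.running_count.load(Ordering::Acquire) == 0 {
                return;
            }
            notified.await;
        }
    }
}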

4. Change in Release Timing (Medium Priority)

Location: engine/packages/guard-core/src/proxy_service.rs:812-819

The in-flight counter release logic was moved from a deferred cleanup (before the response) to after the response:

Before:

// Prepare to release in-flight counter when done
let state_clone = self.state.clone();
crate::defer! {
    tokio::spawn(async move {
        state_clone.release_in_flight(client_ip, &actor_id).await;
    }.instrument(tracing::info_span!("release_in_flight_task")));
}

After:

// Release in-flight counter when done
let state_clone = self.state.clone();
tokio::spawn(
    async move {
        state_clone.release_in_flight(client_ip, &actor_id).await;
    }
    .instrument(tracing::info_span!("release_in_flight_task")),
);

Question: Does this change the timing semantics? The old code would spawn the cleanup task before returning the response, while the new code spawns it after. This might affect:

  • When the in-flight counter is decremented
  • Rate limiting behavior during high load

Recommendation: Verify this doesn't change the intended behavior. If the timing matters, consider spawning before the response is returned.

Performance Considerations

  1. Atomic Operations: The use of atomics is appropriate and efficient for this use case
  2. Notify Overhead: tokio::sync::Notify is efficient, but note that notify_waiters() on line 35 wakes all waiters. In typical shutdown scenarios, there should only be one waiter, so this is fine
  3. Additional Spawns: The new task tracking doesn't add significant overhead since tasks were already being spawned

Security Concerns

No significant security concerns identified. The shutdown handling improvement actually enhances reliability, which is security-adjacent.

Test Coverage

Observation: The existing test simple_websocket_test2.rs doesn't test shutdown behavior.

Recommendation: Consider adding a test that:

  1. Spawns multiple WebSocket tasks
  2. Initiates shutdown
  3. Verifies all tasks complete before shutdown finishes
  4. Verifies the shutdown timeout works correctly

Example test structure:

#[tokio::test]
async fn test_graceful_shutdown_with_websockets() {
    // Start server with multiple WebSocket connections
    // Trigger shutdown signal
    // Verify all WebSocket tasks complete
    // Verify shutdown completes within timeout
}

Missing Newline

Location: engine/packages/service-manager/Cargo.toml:19

Good catch adding the trailing newline.

Frontend Changes

The changes to frontend/src/routeTree.gen.ts appear to be auto-generated code reorganization (import reordering, formatting changes). These look fine.

Overall Assessment

This is a solid fix for handling WebSocket tasks during shutdown. The core implementation is good, but there are some concurrency concerns that should be addressed:

Must Fix:

  • Memory ordering issue in TaskGroup::spawn() (use Ordering::Release instead of Relaxed)
  • Panic handling in spawned tasks (prevents shutdown hangs)

Should Consider:

  • Verify the timing change for release_in_flight doesn't affect rate limiting
  • Add shutdown-specific tests

Nice to Have:

  • Document the TaskGroup behavior more thoroughly
  • Add metrics for tracked task counts

The PR moves in the right direction by properly tracking background tasks during shutdown. With the memory ordering fix and panic handling, this should be production-ready.
