

@MasterPtato
Contributor

No description provided.

@vercel

vercel bot commented Nov 8, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Preview Comments Updated (UTC)
rivetkit-serverless Error Error Nov 13, 2025 10:46pm
3 Skipped Deployments
Project Deployment Preview Comments Updated (UTC)
rivet-cloud Ignored Ignored Nov 13, 2025 10:46pm
rivet-inspector Ignored Ignored Preview Nov 13, 2025 10:46pm
rivet-site Ignored Ignored Preview Nov 13, 2025 10:46pm

Contributor Author

MasterPtato commented Nov 8, 2025


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude

claude bot commented Nov 8, 2025

PR Review: Smart Pulling for Horizontally Scaled Workers

Overview

This PR implements intelligent workflow distribution across horizontally scaled workers in the Gasoline workflow engine. The key innovation is a hash-based consistent assignment algorithm with load shedding based on CPU usage.

Key Changes

1. Worker Identification Refactor

  • Renamed: worker_instance_id → worker_id throughout the codebase
  • Impact: Affects metrics, database keys, and Grafana dashboards
  • Assessment: ✅ Good - More accurate terminology since workers are the primary entity

2. Smart Workflow Distribution Algorithm

The core algorithm in pull_workflows (engine/packages/gasoline/src/db/kv/mod.rs:985-1264):

a) Active Worker Discovery

  • Queries workers that have pinged within 2 * PING_INTERVAL (20 seconds)
  • Sorts worker IDs for global consistency across all workers
  • Determines current worker's index in the sorted list

b) Workflow Assignment via Consistent Hashing

let wf_hash = hash(wf.earliest_wake_condition_ts);
let wf_worker_idx = wf_hash % active_worker_count;

// Assign to current worker OR next worker (for redundancy)
wf_worker_idx == current_worker_idx || wf_worker_idx == next_worker_idx
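
As a self-contained illustration of the rule above, here is a hedged sketch (not the PR's code; plain integers stand in for the engine's Id and index types) of how the hash and the wrap-around redundancy check combine:

use std::hash::{DefaultHasher, Hash, Hasher};

// Illustrative only: decide whether this worker should consider a workflow,
// given the hash input and the redundancy rule quoted above.
fn should_consider(
    earliest_wake_condition_ts: i64,
    current_worker_idx: u64,
    active_worker_count: u64,
) -> bool {
    let mut hasher = DefaultHasher::new();
    earliest_wake_condition_ts.hash(&mut hasher);
    let wf_worker_idx = hasher.finish() % active_worker_count;

    // Redundancy: the next worker (wrapping around) also pulls this workflow.
    let next_worker_idx = (current_worker_idx + 1) % active_worker_count;

    wf_worker_idx == current_worker_idx || wf_worker_idx == next_worker_idx
}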

c) CPU-Based Load Shedding

let cpu_usage = system.cpu_usage();
let load_shed_ratio_x1000 = calc_pull_ratio(
    (cpu_usage.max(100.0) * 10.0) as u64,
    500, 1000,  // At 50% CPU: pull all (1000/1000)
    800, 100    // At 80% CPU: pull 10% (100/1000)
);
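
Going by the inline annotations above (50% CPU → 1000, 80% CPU → 100), calc_pull_ratio appears to be a linear ramp between two points. Below is a minimal sketch of that mapping, with clamping outside the range added as an assumption; the PR's actual implementation is quoted in a later review and may differ.

// Illustrative linear interpolation between (ax, ay) and (bx, by); not the PR's code.
fn calc_pull_ratio_sketch(x: u64, ax: u64, ay: u64, bx: u64, by: u64) -> u64 {
    if x <= ax {
        return ay;
    }
    if x >= bx {
        return by;
    }
    // Falling segment: the ratio drops from ay toward by as x grows.
    ay - (x - ax) * (ay - by) / (bx - ax)
}

fn main() {
    assert_eq!(calc_pull_ratio_sketch(500, 500, 1000, 800, 100), 1000); // 50% CPU
    assert_eq!(calc_pull_ratio_sketch(650, 500, 1000, 800, 100), 550); // 65% CPU
    assert_eq!(calc_pull_ratio_sketch(800, 500, 1000, 800, 100), 100); // 80% CPU
}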

3. New System Monitoring

  • Added sysinfo dependency for CPU usage tracking
  • New system::SystemInfo struct (engine/packages/gasoline/src/db/kv/system.rs)
  • Respects MINIMUM_CPU_UPDATE_INTERVAL for efficiency

4. Smarter Bump Subscriptions

  • New BumpSubSubject enum variants for targeted notifications:
    • Worker - Generic worker bump
    • WorkflowComplete { workflow_id } - Specific workflow completion
    • SignalPublish { to_workflow_id } - Signal for specific workflow
  • Reduces unnecessary wake-ups by targeting specific workflows
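
For reference, a hedged sketch of the enum shape these variant names imply (the real definition lives in the gasoline db module and uses the engine's Id type, stood in for by u128 here):

#[derive(Debug, Clone)]
enum BumpSubSubject {
    /// Generic bump delivered to all workers.
    Worker,
    /// Targeted at subscribers waiting on a specific workflow to complete.
    WorkflowComplete { workflow_id: u128 },
    /// Targeted at the workflow a signal was published to.
    SignalPublish { to_workflow_id: u128 },
}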

Code Quality Assessment

Strengths ✅

  1. Excellent Algorithm Design: The combination of consistent hashing + redundancy + load shedding is sophisticated and production-ready

  2. Reduced Contention: Smart use of snapshot isolation and splitting into two transactions minimizes conflicts

  3. Observability: Proper metrics with worker_id attribute for monitoring distribution

  4. Graceful Degradation: Falls back to worker index 0 if current worker not found in active list

  5. Documentation: Good inline comments explaining the algorithm

Concerns and Issues ⚠️

1. CRITICAL: CPU Usage Bug (engine/packages/gasoline/src/db/kv/mod.rs:1016)

let load_shed_ratio_x1000 = calc_pull_ratio(
    (cpu_usage.max(100.0) * 10.0) as u64,  // ❌ BUG HERE
    500, 1000, 800, 100
);

Issue: cpu_usage.max(100.0) clamps any reading below 100% up to 100%; multiplied by 10 that gives 1000, which is always past the 80% CPU (800) threshold.

Should be: cpu_usage.min(100.0) to clamp values above 100%

Impact: Load shedding is likely not working as intended. All workers are probably operating at minimum pull ratio (100/1000 = 10%).

2. Load Shedding Logic Inversion (engine/packages/gasoline/src/db/kv/mod.rs:1141)

if pseudorandom_value_x1000 < load_shed_ratio_x1000 {
    return false;  // Skip this workflow
}

Issue: The comparison seems inverted. If load_shed_ratio_x1000 = 1000 (pull everything), then pseudorandom_value_x1000 < 1000 is always true (the modulo yields 0-999), causing every workflow to be skipped.

Expected behavior:

  • High CPU → small ratio → skip most workflows ✅
  • Low CPU → large ratio → process most workflows ❌ (currently skips them)

Should probably be: pseudorandom_value_x1000 >= load_shed_ratio_x1000 OR rename the ratio variable to load_shed_threshold_x1000
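
For concreteness, a minimal sketch of the corrected check this implies, treating load_shed_ratio_x1000 as a per-mille keep probability (illustrative helper, not the PR's code):

// Returns true when the workflow should be pulled: a ratio of 1000 keeps every
// workflow, a ratio of 100 keeps roughly 10%.
fn should_pull(pseudorandom_value_x1000: u64, load_shed_ratio_x1000: u64) -> bool {
    // Equivalently, skip when pseudorandom_value_x1000 >= load_shed_ratio_x1000.
    pseudorandom_value_x1000 < load_shed_ratio_x1000
}

fn main() {
    assert!(should_pull(999, 1000)); // no shedding: every 0-999 draw is kept
    assert!(!should_pull(500, 100)); // heavy shedding: only draws 0-99 are kept
}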

3. Redundancy May Cause Double Processing

// Every worker pulls workflows assigned to current worker AND next worker
wf_worker_idx == current_worker_idx || wf_worker_idx == next_worker_idx

Concern: With 2 workers attempting to lease the same workflow, this increases transaction conflicts. The lease mechanism prevents double execution, but causes wasted work.

Question: Has this redundancy overhead been measured? Is it necessary?

4. Hash Stability Issue (engine/packages/gasoline/src/db/kv/mod.rs:1131)

wf.earliest_wake_condition_ts.hash(&mut hasher);

Issue: The comment says "consistent for hashing purposes because when it changes it means a worker has leased it", but what happens during that transition?

  • Worker A and B both see earliest_wake_condition_ts = T1
  • Worker A leases, changing it to T2
  • Worker B's next poll sees T2 and rehashes to a different worker

This could cause workflow assignment to "bounce" between workers.

Suggestion: Consider hashing workflow_id instead for true consistency.
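
A small sketch of that suggestion, hashing a stable identifier instead of the wake timestamp (the engine's Id type is stood in for by u128; illustrative only):

use std::hash::{DefaultHasher, Hash, Hasher};

// The assignment stays fixed for the workflow's lifetime because the hash input
// never changes, unlike earliest_wake_condition_ts which changes on lease.
fn assigned_worker_idx(workflow_id: u128, active_worker_count: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    workflow_id.hash(&mut hasher);
    hasher.finish() % active_worker_count
}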

5. Worker Key Writes (engine/packages/gasoline/src/db/kv/keys/worker.rs)

The new file adds worker tracking keys. However, I notice:

tx.write(&keys::workflow::WorkerIdKey::new(wf.workflow_id), worker_id)?;

Question: Is this WorkerIdKey being read anywhere? If it's only for debugging/metrics, consider documenting that.

6. Minor: Magic Number (engine/packages/gasoline/src/db/kv/mod.rs:1137)

1234i32.hash(&mut hasher);  // "Add a little pizazz"

While the comment is amusing, consider using a named constant like HASH_SALT for clarity.

Performance Considerations

Positive ➕

  • Two-transaction approach minimizes lease contention window
  • Snapshot isolation reduces conflicts during wake condition reads
  • Buffered async operations (.buffer_unordered(1024))

Questions ❓

  1. CPU polling overhead: How often is cpu_usage() called? With MINIMUM_CPU_UPDATE_INTERVAL, should be fine, but worth monitoring.

  2. Active worker query cost: Querying all active workers on every pull. With dozens/hundreds of workers, is this acceptable?

  3. Hash distribution: Has the actual distribution been verified with real workflow patterns?

Security Concerns

No major security issues identified. The CPU-based load shedding is a good defense against overload.

Test Coverage

⚠️ No test changes in this PR

Critical test gaps:

  1. Unit tests for calc_pull_ratio function
  2. Tests verifying hash distribution is uniform
  3. Tests for load shedding behavior at different CPU levels
  4. Integration tests for multi-worker scenarios
  5. Tests for edge cases (1 worker, worker dropout, etc.)

Recommendations

Must Fix 🔴

  1. Fix CPU usage bug: Change max(100.0) to min(100.0)
  2. Fix/verify load shedding comparison: Either invert the comparison or rename the variable
  3. Add tests: At minimum, unit tests for the pull ratio calculation

Should Consider 🟡

  1. Use workflow_id for hash stability instead of earliest_wake_condition_ts
  2. Add metrics for:
    • Workflows assigned vs actually leased per worker
    • Lease conflicts per worker
    • CPU-based pull ratio in effect
  3. Document the WorkerIdKey usage and purpose
  4. Consider making redundancy factor configurable (currently hardcoded to 1)

Nice to Have 🟢

  1. Add a constant for the hash salt (1234)
  2. Consider adding a feature flag to disable load shedding for testing
  3. Add dashboard panels showing worker load distribution

Conclusion

This is a well-architected feature with sophisticated load balancing, but has critical bugs in the load shedding logic that need to be fixed before merge. The core consistent hashing algorithm is sound, but the CPU-based filtering appears broken.

Recommendation: Request changes for the CPU bug and load shedding logic. With those fixes + tests, this should be good to merge.


Great work on the overall design! The hash-based distribution with redundancy is a smart approach for horizontal scaling. 🎯

@pkg-pr-new

pkg-pr-new bot commented Nov 8, 2025

More templates

@rivetkit/actor

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/actor@3417

@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@3417

@rivetkit/core

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/core@3417

@rivetkit/db

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/db@3417

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@3417

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@3417

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@3417

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@3417

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@3417

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@3417

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@3417

commit: 09a68bc

@MasterPtato MasterPtato force-pushed the 11-07-feat_gas_implement_smart_pulling_for_horizontally_scaled_workers branch from d4a6b38 to 6f27fe7 Compare November 8, 2025 02:22
@claude

claude bot commented Nov 8, 2025

PR Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements smart workflow pulling for horizontally scaled workers in the Gasoline workflow engine. The changes introduce CPU-based load shedding, worker coordination via active worker tracking, and more targeted pub/sub notifications.


Architecture & Design

Strengths:

  • Worker coordination: The new ActiveWorkerIdxKey index allows workers to discover each other and coordinate workflow distribution using consistent hashing
  • Load shedding: CPU-based load shedding via calc_pull_ratio helps prevent system overload
  • Targeted notifications: BumpSubSubject enum now supports workflow-specific and signal-specific subscriptions, reducing unnecessary wake-ups
  • System monitoring: Integration of sysinfo crate for CPU usage tracking provides real-time system awareness

Concerns:

  1. Worker instance → worker rename: The rename from worker_instance_id to worker_id is good for clarity, but ensure all metrics/dashboards are updated (I see Grafana dashboards were updated ✓)
  2. Subspace migration: The removal of worker_instance.rs and creation of worker.rs suggests a schema change. Ensure this is backward compatible or has a migration path.

Code Quality & Best Practices

Positive:

  • Follows Rust conventions and project patterns
  • Good use of structured logging with tracing (e.g., line 1016 uses ?worker_id)
  • Proper error handling throughout
  • Clean separation of concerns (system info in separate module)

Issues:

  1. Math error in calc_pull_ratio (line 3039):

    let neg_b = ay * neg_dy / dx;

    The intercept term doesn't match the standard derivation for a line through (ax, ay) and (bx, by), so the linear interpolation the function is meant to perform comes out wrong. Expected:

    // For line through (ax, ay) and (bx, by): y = mx + b
    // where m = (by - ay) / (bx - ax) = -neg_dy / dx
    // b = ay - m * ax = ay + (neg_dy / dx) * ax
    let b = ay + (neg_dy * ax) / dx;
    return b.saturating_sub(x * neg_dy / dx);
  2. CPU usage calculation edge case (system.rs:22-23):

    if self.last_cpu_usage_read.elapsed() > MINIMUM_CPU_UPDATE_INTERVAL {

    On first call, this may return stale/zero values. Consider forcing an initial refresh in new() (a sketch appears after this list).

  3. Load shedding parameters hardcoded (line 1016):

    calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);

    These magic numbers (500, 1000, 800, 100) should be constants with documentation explaining the threshold values.

  4. Database transaction size (line 1188):

    .buffer_unordered(1024)

    The TODO comment acknowledges this, but buffering 1024 concurrent operations could be memory-intensive. Consider making this configurable or using a smaller default.
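
Returning to item 2 above, a minimal sketch of priming the CPU reading in new(), assuming the struct shape implied by the quoted snippet (field names are guesses) and the sysinfo crate's System API:

use std::time::Instant;
use sysinfo::{System, MINIMUM_CPU_UPDATE_INTERVAL};

pub struct SystemInfo {
    system: System,
    last_cpu_usage_read: Instant,
}

impl SystemInfo {
    pub fn new() -> Self {
        let mut system = System::new();
        // Prime the CPU counters so the first cpu_usage() call is not stale/zero.
        system.refresh_cpu_usage();

        SystemInfo {
            system,
            last_cpu_usage_read: Instant::now(),
        }
    }

    pub fn cpu_usage(&mut self) -> f32 {
        if self.last_cpu_usage_read.elapsed() > MINIMUM_CPU_UPDATE_INTERVAL {
            self.system.refresh_cpu_usage();
            self.last_cpu_usage_read = Instant::now();
        }

        let cpus = self.system.cpus();
        if cpus.is_empty() {
            return 0.0;
        }
        cpus.iter().map(|cpu| cpu.cpu_usage()).sum::<f32>() / cpus.len() as f32
    }
}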


Potential Bugs & Edge Cases

  1. Worker ping race condition (worker.rs:84-87):
    The worker updates its ping before starting work. If the worker crashes between ping update and starting the main loop, it might appear active but isn't pulling workflows.

  2. Active worker index cleanup (mod.rs:809-811):
    When marking a worker inactive, the code only removes the index entry but doesn't handle cleanup of leases. This should be okay since clear_expired_leases handles it, but the timing window could leave orphaned leases.

  3. Hash collision in workflow distribution (line 1145):

    let wf_worker_idx = wf_hash % active_worker_count;

    With worker count changes, workflow distribution will shift. This is acceptable but could cause temporary duplicate pulls during worker scaling events.

  4. Redundant workflow pulling (lines 1147-1152):
    The code pulls workflows for current worker AND next worker for redundancy. This increases transaction conflicts. Consider adding metrics to track conflict rates and potentially making this configurable.

  5. Pseudorandom value calculation (lines 1134-1139):
    The hasher is reused which means it builds on the previous state. This might be intentional for determinism, but it's subtle and could be fragile.


Performance Considerations

Improvements:

  • Snapshot isolation for active worker reads (line 1036) reduces contention ✓
  • Smart filtering reduces unnecessary workflow pulls ✓
  • Targeted pub/sub subjects reduce broadcast load ✓

Concerns:

  1. Active worker scan on every pull (lines 1028-1042):
    Every pull_workflows call scans all active workers. With many workers, this could be expensive. Consider caching the worker list with a TTL.

  2. Wake condition deduplication (lines 1094-1122):
    This O(n²) loop could be optimized with a HashMap for workflows with many wake conditions.

  3. CPU usage tracking overhead (system.rs):
    The sysinfo crate can be expensive on some platforms. The MINIMUM_CPU_UPDATE_INTERVAL helps, but consider monitoring the overhead.

  4. Lease checking (lines 1156-1192):
    Buffering 1024 concurrent lease checks could create a transaction conflict storm. Consider rate-limiting or batching differently.


Security Concerns

No major security issues identified. The changes are primarily internal orchestration logic.

Minor note: The worker_id is generated from the datacenter label (worker.rs:53). Ensure datacenter labels are validated to prevent injection issues (likely already handled elsewhere).


Test Coverage

The PR doesn't include test changes. Given the complexity of the worker coordination logic, consider adding:

  1. Unit tests for calc_pull_ratio with various inputs
  2. Integration tests for worker coordination and load distribution
  3. Tests for edge cases (single worker, worker joins/leaves during pull)

Recommendations

Must fix:

  1. ✅ Verify the math in calc_pull_ratio function (line 3039)

Should fix:
2. Extract magic numbers in load shedding to named constants
3. Initialize CPU usage in SystemInfo::new() to avoid stale first reads
4. Add unit tests for the new logic

Nice to have:
5. Add metrics for transaction conflict rates
6. Consider caching active worker list with TTL
7. Make redundancy level (current + next worker) configurable
8. Optimize wake condition deduplication with HashMap


Conclusion

This is a solid implementation that addresses horizontal scaling concerns with thoughtful coordination mechanisms. The main issues are:

  • Math error in calc_pull_ratio that needs verification
  • Some performance optimizations for high-scale scenarios
  • Missing test coverage

The architecture is sound and follows project conventions well. The targeted pub/sub approach is particularly nice for reducing unnecessary worker wake-ups.

@MasterPtato MasterPtato force-pushed the 11-07-feat_gas_implement_smart_pulling_for_horizontally_scaled_workers branch from 6f27fe7 to e5c1006 Compare November 8, 2025 02:24
@claude

claude bot commented Nov 8, 2025

PR Review: Smart Pulling for Horizontally Scaled Workers

This is a well-architected feature that implements intelligent workflow distribution across horizontally scaled workers. The implementation shows good attention to performance and scalability.

Strengths

Excellent architecture: The hash-based workflow distribution algorithm ensures even load balancing across workers while maintaining consistency
Load shedding: CPU-based load shedding via calc_pull_ratio prevents overload during high CPU usage
Proper refactoring: Clean rename from worker_instance_id to worker_id with consistent updates across metrics and Grafana dashboards
Redundancy: Each worker pulls workflows for itself + next worker, providing fault tolerance
Smarter pub/sub: Subject-specific bump subscriptions (BumpSubSubject) reduce unnecessary wake-ups

Code Quality Issues

1. Potential Logic Error in Load Shedding (line 1016 in db/kv/mod.rs)

let load_shed_ratio_x1000 = calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);

Issue: cpu_usage.max(100.0) should likely be cpu_usage.min(100.0). The current code will use CPU usage or 100.0, whichever is larger, which doesn't make sense for capping. If CPU usage is over 100%, this will use that higher value.

Fix:

let load_shed_ratio_x1000 = calc_pull_ratio((cpu_usage.min(100.0) * 10.0) as u64, 500, 1000, 800, 100);

2. Unnecessary Mutex Lock Duration (line 1014 in db/kv/mod.rs)

let cpu_usage = { self.system.lock().await.cpu_usage() };

The block scoping is good, but cpu_usage() internally refreshes CPU stats which can be expensive. Consider caching this value or moving CPU monitoring to the background gc task.
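
One hedged sketch of the background-sampling idea: a dedicated tokio task (rather than the existing gc task) refreshes the reading periodically and publishes it through an atomic, so the pull path only does a load. The interval, the tenths-of-a-percent encoding, and the function name are all assumptions.

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use sysinfo::System;

// Spawns a sampler that keeps a shared CPU usage value (in tenths of a percent)
// up to date; pull_workflows would read it with a single atomic load.
fn spawn_cpu_sampler() -> Arc<AtomicU32> {
    let cached = Arc::new(AtomicU32::new(0));
    let writer = cached.clone();
    tokio::spawn(async move {
        let mut system = System::new();
        loop {
            system.refresh_cpu_usage();
            let cpus = system.cpus();
            let usage = if cpus.is_empty() {
                0.0
            } else {
                cpus.iter().map(|c| c.cpu_usage()).sum::<f32>() / cpus.len() as f32
            };
            // Store as an integer to avoid needing an atomic float.
            writer.store((usage * 10.0) as u32, Ordering::Relaxed);
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
    });
    cached
}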

3. Import Organization (line 3 in db/kv/mod.rs)

use std::{
    collections::{HashMap, HashSet},
    hash::{DefaultHasher, Hash, Hasher},
    sync::Arc,
    time::Instant,
};

Per CLAUDE.md guidelines: "Always add imports at the top of the file instead of inline within the function." These imports are correctly at the top, which is good.

4. Missing Unit Tests

The calc_pull_ratio function implements critical load shedding logic but lacks unit tests. Consider adding:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_calc_pull_ratio() {
        // Test at minimum CPU (50%)
        assert_eq!(calc_pull_ratio(500, 500, 1000, 800, 100), 1000);
        
        // Test at maximum CPU (80%)
        assert_eq!(calc_pull_ratio(800, 500, 1000, 800, 100), 100);
        
        // Test at midpoint CPU (65%)
        assert_eq!(calc_pull_ratio(650, 500, 1000, 800, 100), 550);
    }
}

Performance Considerations

⚠️ Snapshot Isolation for Active Workers (line 1036): The query for active workers uses Snapshot isolation level which is good for reducing contention, but could lead to slightly stale worker lists. This is likely acceptable given the 2x PING_INTERVAL window.

⚠️ Buffer Size (lines 1188, 1245): There are TODOs about buffer_unordered(1024). This is a reasonable default, but consider making it configurable if you expect extremely high workflow counts.

Security Concerns

No security issues identified. The implementation properly handles:

  • Lease validation to prevent duplicate processing
  • Workflow state checks before operations
  • Appropriate error handling

Test Coverage

Gap: No integration tests specifically for the smart pulling algorithm. Consider adding tests that:

  • Verify workflow distribution across multiple workers
  • Test failover when workers become inactive
  • Validate load shedding under high CPU

Minor Issues

  1. Line 1016: Comment says "We will gradually pull less workflows" - this should be "fewer workflows" (countable noun)

  2. Line 1137: Magic number 1234i32 for hash "pizazz" - consider a named constant like HASH_SALT for clarity

  3. Logging: Consider adding a debug log showing the calculated load_shed_ratio_x1000 and active_worker_count to help diagnose load distribution issues

Documentation

📝 The PR description is empty. Consider adding:

  • Overview of the smart pulling algorithm
  • Explanation of the load shedding formula
  • Migration notes (if any)
  • Performance impact expectations

Recommendation

Approve with minor fixes. The critical issue is the .max(100.0) which should be .min(100.0). The other suggestions are nice-to-haves that can be addressed in follow-up PRs.

Great work on this complex distributed systems feature! 🎉

@MasterPtato MasterPtato force-pushed the 11-07-fix_pb_dont_immediately_reallocate_actors_on_runner_stopping branch from 79bbca9 to 87f1c1a Compare November 10, 2025 22:49
@MasterPtato MasterPtato force-pushed the 11-07-feat_gas_implement_smart_pulling_for_horizontally_scaled_workers branch from e5c1006 to ff49fad Compare November 10, 2025 22:49
@MasterPtato MasterPtato marked this pull request as ready for review November 10, 2025 22:50
@claude

claude bot commented Nov 10, 2025

PR Review: Smart Pulling for Horizontally Scaled Workers

Overview

This PR implements smart workflow pulling for horizontally scaled Gasoline workers, enabling efficient distribution of workflow execution across multiple worker instances. The implementation includes CPU-based load shedding and consistent hashing for workflow distribution.

Positive Aspects

Architecture & Design

  • Smart load distribution: The consistent hashing approach (lines 1127-1153 in db/kv/mod.rs) is well-designed, using earliest_wake_condition_ts for stable hashing while workflows are leased
  • Redundancy strategy: Workers pull workflows assigned to themselves plus the next worker (lines 1147-1152), providing fault tolerance with minimal overhead
  • Load shedding: CPU-based adaptive pulling (lines 1014-1016) helps prevent overload
  • Targeted subscriptions: The new BumpSubSubject enum enables more precise pub/sub notifications, reducing unnecessary wake-ups

Code Quality

  • Consistent naming: Renamed worker_instance_id to worker_id throughout (Grafana dashboards, metrics, etc.)
  • Good separation of concerns: New system.rs module cleanly encapsulates system metrics collection
  • Proper instrumentation: Worker ID tracking in metrics and logs

Issues & Concerns

1. Critical: Load Shedding Logic Bug

// line 1015-1016 in db/kv/mod.rs
let cpu_usage = { self.system.lock().await.cpu_usage() };
let load_shed_ratio_x1000 = calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);

Problem: cpu_usage.max(100.0) should be cpu_usage.min(100.0). The max function will always return at least 100.0, making the calculation incorrect for CPU usage below 100%.

Impact: Load shedding will not work as intended for normal CPU loads (0-99%), potentially causing performance issues.

Fix: Change to cpu_usage.min(100.0) or remove the clamping if cpu_usage() is already bounded.

2. Missing calc_pull_ratio Function

The PR references calc_pull_ratio but doesn't show its implementation. Need to verify:

  • Is it defined somewhere in the codebase?
  • What's its expected behavior?
  • Are the parameters (500, 1000, 800, 100) documented?

3. Race Condition in Worker Ping

// lines 1009-1010 in db/kv/mod.rs
let active_workers_after = now - i64::try_from(PING_INTERVAL.as_millis() * 2)?;

This allows workers to be considered active for 2 ping intervals, but combined with the update_worker_ping call in tick() (line 84 in worker.rs), there's a window where a worker might:

  1. Not be in the active worker list yet (first ping)
  2. Try to pull workflows
  3. Default to worker index 0 (lines 1082-1089)

Impact: Multiple workers starting simultaneously might all default to index 0, causing contention.

Suggestion: Ensure worker ping is updated and confirmed before attempting to pull workflows, or handle this case more gracefully.

4. Hash Collision Handling

// lines 1134-1139 in db/kv/mod.rs
let pseudorandom_value_x1000 = {
    1234i32.hash(&mut hasher);
    hasher.finish() % 1000
};

Concern: Reusing the same hasher after calling finish() is unusual. The hasher state continues after finish(), which may lead to unexpected behavior.

Suggestion: Create a new hasher for the pseudorandom value or use a different approach:

let mut pseudo_hasher = DefaultHasher::new();
wf.earliest_wake_condition_ts.hash(&mut pseudo_hasher);
1234i32.hash(&mut pseudo_hasher);
let pseudorandom_value_x1000 = pseudo_hasher.finish() % 1000;

5. Load Shedding Logic Inverted

// line 1141-1143
if pseudorandom_value_x1000 < load_shed_ratio_x1000 {
    return false;
}

This appears inverted. If load_shed_ratio_x1000 is high (low load), we want to pull more workflows. But this code rejects workflows when the random value is less than the ratio.

Expected behavior: When ratio is 1000 (no shedding), pull all workflows. When ratio is 100 (heavy shedding), pull only 10%.

Fix needed: Either invert the logic or clarify the semantics of calc_pull_ratio.

6. Error Handling: Silent Default

// lines 1082-1089
} else {
    tracing::error!(?worker_id, "current worker should have valid ping, defaulting to 0");
    0
};

While logging the error is good, defaulting to 0 silently continues execution in an error state. Consider:

  • Returning an error to trigger retry
  • Adding a metric for this condition
  • Documenting why defaulting to 0 is safe

7. Subscription Subject Specificity

The new targeted subscriptions are great, but there's inconsistency:

  • BumpSubSubject::Worker - broadcasts to all workers
  • BumpSubSubject::WorkflowComplete { workflow_id } - targeted to specific workflow
  • BumpSubSubject::SignalPublish { to_workflow_id } - targeted to specific workflow

Question: Should workers subscribe to all three types? The code shows workers subscribing to BumpSubSubject::Worker (line 75 in worker.rs), but how do they receive workflow-specific notifications?

8. System Metrics Thread Safety

// db/kv/mod.rs line 1014
let cpu_usage = { self.system.lock().await.cpu_usage() };

The mutex is held briefly, which is good. However, system::SystemInfo::cpu_usage() updates internal state (line 23 in system.rs). Consider:

  • Documenting that this method is not pure
  • Ensuring the 250ms MINIMUM_CPU_UPDATE_INTERVAL doesn't conflict with worker poll intervals

Performance Considerations

Positive

  • Load shedding prevents worker overload
  • Consistent hashing reduces workflow migration
  • Snapshot isolation for active worker and wake condition queries reduces contention

Concerns

  • Buffer size: buffer_unordered(1024) (line 1188) could consume significant memory. Is this limit appropriate for all deployment sizes?
  • Linear scan: The deduplication loop (lines 1094-1122) is O(n²) in worst case. For many wake conditions per workflow, consider using a HashMap.
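
A sketch of the HashMap-based deduplication suggested in the last point, keyed by workflow ID and keeping the earliest wake timestamp; the struct and the "earliest wins" rule are assumptions for illustration.

use std::collections::{hash_map::Entry, HashMap};

// Stand-in for the engine's wake condition data; real types live in db/kv/mod.rs.
struct WakeCondition {
    workflow_id: u128,
    wake_ts: i64,
}

// O(n) deduplication: one map lookup per wake condition instead of rescanning
// the accumulated list for each one.
fn dedupe_by_workflow(conditions: Vec<WakeCondition>) -> Vec<WakeCondition> {
    let mut earliest: HashMap<u128, WakeCondition> = HashMap::new();
    for cond in conditions {
        match earliest.entry(cond.workflow_id) {
            Entry::Occupied(mut slot) => {
                if cond.wake_ts < slot.get().wake_ts {
                    slot.insert(cond);
                }
            }
            Entry::Vacant(slot) => {
                slot.insert(cond);
            }
        }
    }
    earliest.into_values().collect()
}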

Security

  • No concerns identified: The changes don't introduce obvious security vulnerabilities

Testing

Missing: No test changes are included in this PR. Given the complexity of:

  • Distributed worker coordination
  • Load shedding logic
  • Hash-based workflow distribution
  • Race conditions between workers

Recommendation: Add tests covering:

  1. Multiple workers pulling workflows concurrently
  2. Worker failure scenarios
  3. Load shedding at various CPU levels
  4. Hash distribution uniformity
  5. Worker ping expiration and recovery

Documentation

Missing:

  • No comments explaining the load shedding algorithm
  • The meaning of calc_pull_ratio parameters (500, 1000, 800, 100) is unclear
  • The redundancy strategy (current + next worker) isn't documented in code

Minor Issues

  1. .gitattributes changes: Adding docker configs as linguist-generated is good, but these should ideally be in a separate commit
  2. Grafana dashboard changes: Correctly updated worker_instance_id → worker_id, but should verify these are regenerated from templates
  3. Cargo.toml ordering: Good job maintaining alphabetical order for universaldb (line 317-318 in Cargo.toml)
  4. Import style: Good adherence to CLAUDE.md style (imports at top of file, no glob imports from anyhow)

Recommendations

Must Fix

  1. Fix the cpu_usage.max(100.0) bug → cpu_usage.min(100.0)
  2. Verify load shedding logic direction (inverted conditional)
  3. Fix hasher reuse for pseudorandom value generation

Should Fix

  1. Handle worker ping race condition on startup
  2. Add test coverage for distributed worker scenarios
  3. Document the load shedding algorithm and parameters

Nice to Have

  1. Add metrics for worker defaulting to index 0 error case
  2. Consider HashMap for wake condition deduplication
  3. Add inline documentation for complex algorithms

Conclusion

This is a solid implementation of an important feature. The core distributed coordination logic is well-designed with good fault tolerance. However, there are several bugs in the load shedding logic that need to be fixed before merging. Once the critical issues are addressed and tests are added, this will be a valuable addition to the Gasoline workflow engine.

Overall Assessment: Needs work before merging due to critical bugs, but the architecture is sound.

@claude

claude bot commented Nov 10, 2025

Pull Request Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements an intelligent workflow distribution system for horizontally scaled workers in the Gasoline workflow engine. The changes introduce CPU-based load shedding and hash-based workflow assignment to evenly distribute work across active workers.


Code Quality & Architecture ⭐

Strengths:

  • Clean separation of concerns: New modules (system.rs, subjects.rs) are well-organized
  • Smart architecture: The transition from worker_instance_id to worker_id simplifies the model (engine/packages/gasoline/src/db/kv/keys/worker.rs:187)
  • Granular pubsub subjects: The new BumpSubSubject enum (engine/packages/gasoline/src/db/mod.rs:340-344) enables targeted notifications:
    • Worker - global worker notifications
    • WorkflowComplete - workflow-specific completion events
    • SignalPublish - signal-specific notifications
  • Consistent hashing: Using earliest_wake_condition_ts for workflow assignment ensures stability (engine/packages/gasoline/src/db/kv/mod.rs:1131)

Concerns:

  1. Missing documentation: The calc_pull_ratio function (engine/packages/gasoline/src/db/kv/mod.rs:3032-3042) lacks inline comments explaining the linear interpolation logic. Consider adding:

    /// Calculates pull ratio using linear interpolation for load shedding.
    /// Returns a value 0-1000 where lower values mean more aggressive shedding.
    /// - (ax, ay) = point A (e.g., 50% CPU = 80% pull ratio)
    /// - (bx, by) = point B (e.g., 100% CPU = 10% pull ratio)
    
  2. Magic numbers: The load shedding parameters are hardcoded (engine/packages/gasoline/src/db/kv/mod.rs:1016):

    calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100)

    Consider extracting these as named constants:

    const CPU_THRESHOLD_LOW: u64 = 500;  // 50% CPU
    const PULL_RATIO_HIGH: u64 = 1000;   // Pull 100% of assigned workflows
    const CPU_THRESHOLD_HIGH: u64 = 800; // 80% CPU  
    const PULL_RATIO_LOW: u64 = 100;     // Pull 10% of assigned workflows

Potential Bugs & Edge Cases 🐛

Critical:

  1. Integer overflow in hash calculation (engine/packages/gasoline/src/db/kv/mod.rs:1138):

    1234i32.hash(&mut hasher);
    hasher.finish() % 1000 // 0-1000

    This creates values 0-999, but you're comparing with load_shed_ratio_x1000 which is also scaled by 1000. The comment says "0-1000" but modulo 1000 gives 0-999. Should be % 1001 or the comparison should be <=.

  2. Worker fallback logic (engine/packages/gasoline/src/db/kv/mod.rs:1083-1088):

    } else {
        tracing::error!(?worker_id, "current worker should have valid ping, defaulting to 0");
        0
    };

    Defaulting to index 0 could cause uneven distribution if multiple workers fail to find themselves. Consider returning an error or using a deterministic fallback based on worker_id hash.

Medium:
3. CPU usage clamping (engine/packages/gasoline/src/db/kv/mod.rs:1016):

cpu_usage.max(100.0) * 10.0

Using max(100.0) means any CPU usage below 100% gets clamped to 100%, which seems backwards. Should this be min(100.0) to cap at 100%?

  4. Race condition on metrics lock (engine/packages/gasoline/src/db/kv/mod.rs:613-620): While the UDB transaction provides atomicity, if multiple workers have expired locks simultaneously, they may all try to acquire it, causing unnecessary transaction conflicts. Consider adding exponential backoff.

Minor:
5. Redundant worker assignment (engine/packages/gasoline/src/db/kv/mod.rs:1148-1152): Each workflow is assigned to current worker AND next worker. While this provides redundancy, it doubles the lease contention. Document why 2x redundancy was chosen over other approaches (e.g., primary/secondary with backoff).


Performance Considerations ⚡

Improvements:

  • Reduced query scope: Using active_workers_after timestamp filter (engine/packages/gasoline/src/db/kv/mod.rs:1010) prevents scanning stale workers
  • Snapshot isolation: Smart use of Snapshot isolation level for active workers and wake conditions (engine/packages/gasoline/src/db/kv/mod.rs:1036, 1063) reduces transaction conflicts
  • Early lease checking: Filtering workflows before lease acquisition reduces unnecessary database operations

Concerns:

  1. O(n²) workflow deduplication (engine/packages/gasoline/src/db/kv/mod.rs:1094-1122): The nested loop to find and update workflows is O(n²). With many wake conditions per workflow, this could be slow. Consider using a HashMap<Id, MinimalPulledWorkflow> instead.

  2. CPU sampling frequency: The MINIMUM_CPU_UPDATE_INTERVAL from sysinfo crate (engine/packages/gasoline/src/db/kv/system.rs:22) may not reflect sudden load spikes. The system only refreshes CPU usage if enough time has elapsed, which could delay load shedding response.

  3. Sorting overhead (engine/packages/gasoline/src/db/kv/mod.rs:1072): Sorting all active worker IDs on every pull could be expensive with many workers. Consider if this sort is necessary or if the database already provides consistent ordering.


Security Considerations 🔒

Good practices:

  • No sensitive data in logs
  • Proper use of conflict ranges to prevent race conditions (engine/packages/gasoline/src/db/kv/mod.rs:541-545)
  • Isolation levels appropriately chosen to balance consistency and performance

No major security concerns identified.


Test Coverage 🧪

Missing:

  • No test files in the diff
  • Critical scenarios needing tests:
    1. Load shedding behavior at various CPU thresholds (0%, 50%, 80%, 100%+)
    2. Workflow distribution with 1, 2, 3, N workers
    3. Worker failover when workers become inactive
    4. Hash collision handling
    5. Edge case: single workflow with multiple wake conditions
    6. Edge case: worker_id not found in active workers list

Recommendation: Add integration tests in engine/packages/gasoline/tests/ covering the new smart pulling logic.


Additional Notes

Naming Consistency

The Grafana dashboards correctly updated worker_instance_id → worker_id (6 dashboard files), which is good for consistency.

Dependency Addition

The sysinfo crate is appropriately added for CPU monitoring. Version 0.37.2 is recent and well-maintained.

Breaking Changes

The refactoring from worker_instance.rs to worker.rs appears to be a clean rename without breaking external APIs.


Recommendations

  1. High Priority:

    • Fix the integer overflow issue in modulo calculation
    • Fix the CPU clamping logic (max vs min)
    • Add comprehensive tests for the new smart pulling algorithm
  2. Medium Priority:

    • Document the calc_pull_ratio function
    • Extract magic numbers to named constants
    • Optimize workflow deduplication to O(n) using HashMap
    • Add fallback strategy documentation for worker not found scenario
  3. Low Priority:

    • Consider adding metrics for load shedding decisions
    • Add logging for the calculated pull ratio and CPU usage for debugging

Conclusion

This is a well-architected feature that significantly improves the scalability of the Gasoline workflow engine. The core logic is sound, but there are a few bugs and edge cases that need attention before merging. Once the critical issues are addressed and tests are added, this will be a solid improvement to the system.

Overall Assessment: Approve with changes requested for the critical bugs.

@claude

claude bot commented Nov 11, 2025

Code Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements intelligent workflow distribution across horizontally scaled gasoline workers using consistent hashing and CPU-based load shedding. The implementation introduces a worker ping index and sophisticated pull logic to distribute workflows evenly while handling worker failures gracefully.

Strengths

Architecture & Design

  • Consistent hashing approach: Using earliest_wake_condition_ts for hashing ensures consistent worker assignment while preventing reassignment due to lease acquisition (engine/packages/gasoline/src/db/kv/mod.rs:1131-1134)
  • Redundancy pattern: Each worker pulls workflows for itself + next worker, providing failover redundancy while accepting increased transaction conflicts as a trade-off (engine/packages/gasoline/src/db/kv/mod.rs:1149-1154)
  • Load shedding: CPU-based throttling with linear mapping (50-100% CPU → 80-10% pull ratio) prevents cascade failures (engine/packages/gasoline/src/db/kv/mod.rs:1015-1018)
  • Targeted bump subjects: Replacing global bump_sub() with specific subjects (Worker, WorkflowComplete, SignalPublish) significantly reduces unnecessary worker wakeups (engine/packages/gasoline/src/db/mod.rs:340-344, engine/packages/gasoline/src/db/kv/subjects.rs:1-13)

Data Model

  • Active worker index: The ActiveWorkerIdxKey with timestamp-based ordering enables efficient active worker discovery (engine/packages/gasoline/src/db/kv/keys/worker.rs:53-73)
  • Proper cleanup: mark_worker_inactive ensures workers are removed from the active set during shutdown (engine/packages/gasoline/src/db/kv/mod.rs:801-823)

Issues & Concerns

Critical Issues

1. CPU Usage Calculation Precision Issue

Location: engine/packages/gasoline/src/db/kv/mod.rs:1016-1018

let cpu_usage = { self.system.lock().await.cpu_usage() };
let load_shed_ratio_x1000 = 
    calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);

Problem: cpu_usage.max(100.0) should be cpu_usage.min(100.0) - you want to cap the maximum value at 100%, not set a minimum floor. Current code means any CPU usage below 100% gets treated as 100%.

Impact: Every reading below 100% is treated as 100% CPU, so the pull ratio is always driven to its minimum regardless of actual load, defeating the load shedding feature's purpose.

Fix:

let load_shed_ratio_x1000 = 
    calc_pull_ratio((cpu_usage.min(100.0) * 10.0) as u64, 500, 1000, 800, 100);

2. Race Condition in Worker Ping Index

Location: engine/packages/gasoline/src/db/kv/mod.rs:777-789

if let Some(last_last_ping_ts) =
    tx.read_opt(&last_ping_ts_key, Serializable).await?
{
    let active_worker_idx_key =
        keys::worker::ActiveWorkerIdxKey::new(last_last_ping_ts, worker_id);
    tx.delete(&active_worker_idx_key);
}

tx.write(&last_ping_ts_key, last_ping_ts)?;

let active_worker_idx_key =
    keys::worker::ActiveWorkerIdxKey::new(last_ping_ts, worker_id);
tx.write(&active_worker_idx_key, ())?;

Problem: If a worker restarts with the same ID between transactions, you could have multiple ActiveWorkerIdxKey entries for the same worker with different timestamps. The old entry deletion relies on reading the exact previous timestamp, but if the worker crashed and restarted, that read might get stale data.

Potential Impact: Duplicate worker entries in the active index could skew load distribution calculations.

Mitigation: Consider using a range delete or adding a secondary index to clean up all entries for a worker_id.

3. Integer Overflow Risk in calc_pull_ratio

Location: engine/packages/gasoline/src/db/kv/mod.rs:3034-3044

fn calc_pull_ratio(x: u64, ax: u64, ay: u64, bx: u64, by: u64) -> u64 {
    let neg_dy = ay - by;
    let dx = bx - ax;
    let neg_b = ay * neg_dy / dx;  // Potential overflow
    
    return neg_b.saturating_sub(x * neg_dy / dx);  // Also potential overflow
}

Problem: Multiplication before division can overflow with large u64 values. While current call site uses small values (500, 1000, 800, 100), this function lacks safety guarantees.

Fix: Add overflow checks or use checked arithmetic:

fn calc_pull_ratio(x: u64, ax: u64, ay: u64, bx: u64, by: u64) -> u64 {
    let neg_dy = ay - by;
    let dx = bx - ax;
    
    // Use checked math or restructure to avoid overflow
    let neg_b = ay.checked_mul(neg_dy)
        .and_then(|v| v.checked_div(dx))
        .unwrap_or(0);
    
    let slope_term = x.checked_mul(neg_dy)
        .and_then(|v| v.checked_div(dx))
        .unwrap_or(u64::MAX);
    
    neg_b.saturating_sub(slope_term)
}

High Priority Issues

4. MINIMUM_CPU_UPDATE_INTERVAL Not Documented

Location: engine/packages/gasoline/src/db/kv/system.rs:3, 22

The code references MINIMUM_CPU_UPDATE_INTERVAL from the sysinfo crate but doesn't document why it's important or what value it has. This is critical for understanding the refresh behavior.

Suggestion: Add a comment explaining that sysinfo requires waiting between CPU usage reads for accurate measurements.

5. Missing Error Handling for CPU Usage

Location: engine/packages/gasoline/src/db/kv/system.rs:21-32

The cpu_usage() function doesn't handle the case where no CPUs are detected (self.system.cpus().len() could theoretically be 0), which would cause division by zero.

Fix:

pub fn cpu_usage(&mut self) -> f32 {
    if self.last_cpu_usage_read.elapsed() > MINIMUM_CPU_UPDATE_INTERVAL {
        self.system.refresh_cpu_usage();
        self.last_cpu_usage_read = Instant::now();
    }

    let cpu_count = self.system.cpus().len();
    if cpu_count == 0 {
        tracing::warn!("no CPUs detected, returning 0% usage");
        return 0.0;
    }

    self.system
        .cpus()
        .iter()
        .fold(0.0, |s, cpu| s + cpu.cpu_usage())
        / cpu_count as f32
}

6. Snapshot Isolation May Miss Active Workers

Location: engine/packages/gasoline/src/db/kv/mod.rs:1032-1044

The active worker query uses Snapshot isolation to reduce contention. However, this means the worker list could be stale - a worker that just pinged might not be included, causing workflows to be incorrectly distributed.

Trade-off: This is mentioned as intentional for reducing contention, but the comment should explicitly call out this staleness and its implications.

Suggestion: Add a comment:

// This is Snapshot to reduce contention. Note: worker list may be slightly 
// stale (missing very recent pings), but this is acceptable since we use 
// redundant assignment (current + next worker) and leases prevent conflicts.
Snapshot,

Medium Priority Issues

7. Magic Numbers in Load Shedding

Location: engine/packages/gasoline/src/db/kv/mod.rs:1018

calc_pull_ratio((cpu_usage.min(100.0) * 10.0) as u64, 500, 1000, 800, 100);

These parameters (500, 1000, 800, 100) lack documentation. What do they represent?

Suggestion: Add constants with clear names:

const LOAD_SHED_START_CPU_X10: u64 = 500;  // Start load shedding at 50% CPU
const LOAD_SHED_END_CPU_X10: u64 = 1000;   // Max load shedding at 100% CPU  
const LOAD_SHED_START_RATIO: u64 = 800;    // Pull 80% of workflows at start
const LOAD_SHED_END_RATIO: u64 = 100;      // Pull 10% of workflows at 100% CPU

let load_shed_ratio_x1000 = calc_pull_ratio(
    (cpu_usage.min(100.0) * 10.0) as u64,
    LOAD_SHED_START_CPU_X10,
    LOAD_SHED_END_CPU_X10, 
    LOAD_SHED_START_RATIO,
    LOAD_SHED_END_RATIO,
);

8. Metrics Renamed But Not Documented

Location: All Grafana dashboard files

The metrics were renamed from worker_instance_id to worker_id, but there's no migration note or documentation about this breaking change in observability.

Impact: Existing alerts or dashboards outside of the repo may break.

Suggestion: Add a note in the PR description or CHANGELOG about this metric label rename.

9. Pseudorandom Hash Enhancement Has Magic Number

Location: engine/packages/gasoline/src/db/kv/mod.rs:1138-1141

// Add a little pizazz to the hash so its a different number than wf_hash but
// still consistent
1234i32.hash(&mut hasher);
hasher.finish() % 1000 // 0-1000

The comment "pizazz" is too informal for production code. Also, why specifically 1234?

Suggestion:

// Hash an additional constant to derive a distinct but consistent 
// pseudorandom value from the same workflow hash seed
const LOAD_SHED_HASH_SALT: i32 = 1234;
LOAD_SHED_HASH_SALT.hash(&mut hasher);
hasher.finish() % 1000 // Range: 0-999 (representing 0.0%-99.9%)

10. Glob Import Violation

Location: engine/packages/gasoline/src/db/kv/debug.rs:18

According to CLAUDE.md, glob imports from anyhow should be avoided, but I see the pattern is used elsewhere. If this is intentional, consider documenting the exception.

Low Priority / Style Issues

11. .gitattributes Changes

Location: .gitattributes:18-23

The docker-related linguist-generated markers are good, but per CLAUDE.md, changes to docker configs should be made in docker/template/ and regenerated. Verify these were generated correctly.

12. Comment Typo

Location: engine/packages/gasoline/src/db/kv/mod.rs:1032

"Check" is an incomplete comment. Should probably be removed or completed.

13. TODO Comments Remain

Location: engine/packages/gasoline/src/db/kv/mod.rs:1189, 1246

// TODO: How to get rid of this buffer?

These TODOs suggest potential performance optimization opportunities. Consider creating follow-up issues if the buffer_unordered(1024) is a known concern.

Performance Considerations

Positive

  • Parallel lease acquisition with buffer_unordered(1024) is excellent for throughput
  • Two-transaction approach (lease acquisition separate from data read) minimizes contention window
  • Snapshot isolation for active workers reduces unnecessary serialization

Concerns

  • The redundant assignment (pulling for current + next worker) intentionally increases transaction conflicts. While this provides good failover, it doubles the conflict rate. Monitor conflict metrics in production.
  • System-wide CPU usage might not be the best metric if gasoline runs in a container with CPU limits. Consider using cgroup-aware CPU metrics or container-specific limits.

Security

No security concerns identified. The changes are internal to workflow orchestration.

Testing

Missing Coverage

  • No tests found for the new smart pulling logic (hash-based distribution, load shedding)
  • No tests for calc_pull_ratio function edge cases
  • No tests for worker ping index cleanup scenarios

Recommendations

Add integration tests covering:

  1. Multiple workers with consistent hash distribution
  2. Worker failures and failover behavior
  3. Load shedding activation at various CPU levels
  4. Worker ping index cleanup on shutdown
  5. Edge cases in calc_pull_ratio (boundary values, overflow scenarios)

Summary Recommendations

Must Fix Before Merge

  1. Fix cpu_usage.max(100.0) → cpu_usage.min(100.0) bug
  2. Add division-by-zero protection in cpu_usage()
  3. Use checked arithmetic in calc_pull_ratio()

Should Fix Before Merge

  1. Document or add constants for load shedding magic numbers
  2. Add comments explaining isolation level trade-offs
  3. Handle potential race conditions in worker ping index

Nice to Have

  1. Add tests for core smart pulling logic
  2. Improve comment quality (remove "pizazz", complete "Check")
  3. Document metrics rename for existing monitoring setups

Overall, this is a well-architected solution to a complex distributed systems problem. The consistent hashing approach is sound, and the redundancy pattern provides good fault tolerance. The main concerns are around edge cases and the critical CPU usage calculation bug. With the suggested fixes, this will be a solid feature.

@claude

claude bot commented Nov 13, 2025

PR Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements intelligent workflow distribution across horizontally scaled workers in the Gasoline workflow engine. The key innovation is a hash-based consistent assignment strategy combined with CPU-based load shedding.

Code Quality - Strengths

  1. Excellent architectural alignment with repository patterns
  2. Proper use of tracing with structured logging
  3. Consistent naming conventions
  4. Good transaction safety practices
  5. Updated Grafana dashboards correctly

Critical Issue: CPU Usage Bug

Location: engine/packages/gasoline/src/db/kv/mod.rs:1018

The code uses cpu_usage.max(100.0) which should be cpu_usage.min(100.0). This bug means if CPU is 50%, it uses 100%, defeating the load shedding feature.

Fix needed:

let load_shed_ratio_x1000 =
    calc_pull_ratio((cpu_usage.min(100.0) * 10.0) as u64, 500, 1000, 800, 100);

Issue: Hash Consistency

Location: engine/packages/gasoline/src/db/kv/mod.rs:1129-1141

After calling hasher.finish(), subsequent writes continue from the existing hasher state, so hashing more data and calling finish() again may not produce the independent value the code expects.

Suggested fix: Create a new hasher for the pseudorandom value generation.

Issue: Missing Error Context

Location: engine/packages/gasoline/src/db/kv/mod.rs:1087-1091

When current worker is not found in active workers, add more diagnostic info to the error log (active_worker_ids, active_count).

Performance Concerns

  1. Load shedding threshold (50%-80%) might be too narrow - could cause oscillation under sustained load
  2. Lock contention on lease acquisition - monitor transaction conflict rates in production
  3. Consider making load shedding thresholds configurable

Missing Test Coverage

No tests for:

  • Smart pulling algorithm with multiple workers
  • Load shedding behavior at different CPU levels
  • Worker failover scenarios
  • Hash distribution uniformity

Observations

  • The pull_workflows method is 300+ lines - consider refactoring into smaller functions
  • Grafana dashboard updates look correct (generated from template as expected)

Recommendations Summary

Must Fix:

  1. Fix cpu_usage.max(100.0) → cpu_usage.min(100.0) bug

Should Fix:
2. Fix hasher usage after finish() for pseudorandom value
3. Add more context to worker not found error log

Consider:
4. Add integration tests for smart pulling
5. Make load shedding thresholds configurable
6. Monitor transaction conflict rates
7. Refactor pull_workflows into smaller functions

Conclusion

Well-architected PR with high code quality. The CPU usage bug is critical and should be fixed before merging. Other issues are minor improvements.

Overall: Approve with changes requested (primarily the CPU usage bug fix)

@NathanFlurry NathanFlurry force-pushed the 11-07-fix_pb_dont_immediately_reallocate_actors_on_runner_stopping branch from f5922d4 to 0bfb5d7 Compare November 13, 2025 21:39
@NathanFlurry NathanFlurry force-pushed the 11-07-feat_gas_implement_smart_pulling_for_horizontally_scaled_workers branch from 2db1ff0 to 0014809 Compare November 13, 2025 21:39
@claude

claude bot commented Nov 13, 2025

Code Review: Smart Pulling for Horizontally Scaled Workers

This PR implements smart workflow distribution across horizontally scaled workers in Gasoline. Overall, this is a well-architected improvement that adds intelligent load balancing. Here are my findings:

Strengths

  1. Well-designed distribution algorithm: The consistent hashing approach using earliest_wake_condition_ts is elegant and ensures workflows are evenly distributed across workers while maintaining deterministic assignment.

  2. Load shedding implementation: The CPU-based load shedding is a good defensive mechanism to prevent overload (calc_pull_ratio with 50-100% CPU mapping to 80-10% pull ratio).

  3. Improved observability: The rename from worker_instance_id to worker_id in metrics and Grafana dashboards improves clarity.

  4. Better pubsub granularity: The new BumpSubSubject variants (WorkflowComplete, SignalPublish) are more targeted than the previous generic approach, reducing unnecessary wake-ups.

Issues & Concerns

1. Missing error handling for CPU measurement (engine/packages/gasoline/src/db/kv/mod.rs:1016-1018)

let cpu_usage = { self.system.lock().await.cpu_usage() };
let load_shed_ratio_x1000 =
    calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);

Issue: If cpu_usage() returns a NaN or otherwise bogus reading (for example, before the first refresh has happened), the conversion and calculation could produce unexpected results.

Suggestion: Sanitize the reading before using it, falling back to a neutral value:

let cpu_usage = { self.system.lock().await.cpu_usage() };
let cpu_usage = if cpu_usage.is_finite() { cpu_usage } else { 50.0 };

2. Potential race condition in worker index lookup (engine/packages/gasoline/src/db/kv/mod.rs:1077-1091)

let current_worker_idx = if let Some(current_worker_idx) = active_worker_ids
    .iter()
    .enumerate()
    .find_map(|(i, other_worker_id)| {
        (&worker_id == other_worker_id).then_some(i)
    }) {
    current_worker_idx as u64
} else {
    tracing::error!(
        ?worker_id,
        "current worker should have valid ping, defaulting to worker index 0"
    );
    0
};

Issue: The fallback to index 0 could cause multiple workers to pull the same workflows if they all miss their pings. This could lead to increased transaction conflicts.

Suggestion: Consider using a hash of worker_id as the fallback instead of 0 to maintain better distribution even in error cases.
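
A small sketch of that fallback, deriving a deterministic index from the worker ID instead of hard-coding 0 (the engine's Id type is stood in for by u128; illustrative only):

use std::hash::{DefaultHasher, Hash, Hasher};

// Different workers that miss their ping land on different indices instead of
// all piling onto index 0.
fn fallback_worker_idx(worker_id: u128, active_worker_count: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    worker_id.hash(&mut hasher);
    hasher.finish() % active_worker_count.max(1)
}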

3. Magic numbers in load shedding (engine/packages/gasoline/src/db/kv/mod.rs:1018)

let load_shed_ratio_x1000 =
    calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);

Issue: The parameters (500, 1000, 800, 100) are not explained. From context, this appears to map CPU usage (50-100%) to pull ratio (80-10%), but it's unclear.

Suggestion: Add constants with descriptive names:

const CPU_LOAD_SHED_START: u64 = 500; // 50% CPU
const CPU_LOAD_SHED_END: u64 = 1000; // 100% CPU  
const PULL_RATIO_MAX: u64 = 800; // 80% of workflows
const PULL_RATIO_MIN: u64 = 100; // 10% of workflows

4. Redundant pulling may increase contention (engine/packages/gasoline/src/db/kv/mod.rs:1149-1154)

// Every worker pulls workflows that has to the current worker as well as the next
// worker for redundancy. this results in increased txn conflicts but less chance of
// orphaned workflows
let next_worker_idx = (current_worker_idx + 1) % active_worker_count;

wf_worker_idx == current_worker_idx || wf_worker_idx == next_worker_idx

Issue: While the comment acknowledges increased conflicts, this doubles the conflict rate. With many workers, this could significantly impact performance.

Suggestion: Consider making this configurable or measuring the actual benefit vs. cost. The failover mechanism (clear_expired_leases) already handles orphaned workflows.

5. Hardcoded worker ID generation (engine/packages/gasoline/src/worker.rs:53)

worker_id: Id::new_v1(config.dc_label()),

Issue: Using new_v1() with datacenter label means workers in the same DC will generate different IDs each time they restart (v1 UUIDs include timestamps). This is probably intentional but worth noting.

Concern: After a worker restart, it gets a new ID, so the consistent hash assignment changes. This could cause workflow reassignment churn.

Performance Considerations

  1. CPU monitoring overhead: The sysinfo crate refreshes system CPU stats. While there's a MINIMUM_CPU_UPDATE_INTERVAL check, ensure this doesn't add significant overhead at scale.

  2. Transaction buffer size: The .buffer_unordered(1024) in the workflow leasing loop (line 1190) could cause memory pressure if many workflows are pending.

Security Concerns

No security issues identified. The changes maintain the existing security model.

Testing

Missing: No new tests were added for:

  • The smart pulling algorithm
  • Load shedding behavior
  • Worker index assignment
  • Consistent hashing distribution

Recommendation: Add unit tests for the distribution algorithm and integration tests for multi-worker scenarios.
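
For example, a hedged sketch of a distribution test; it exercises only the per-timestamp DefaultHasher step described above, ignores the load-shed salt, and uses illustrative thresholds:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

#[test]
fn wake_ts_hash_distributes_roughly_evenly() {
    let worker_count = 5u64;
    let mut counts = vec![0u64; worker_count as usize];
    for ts in 0i64..10_000 {
        let mut hasher = DefaultHasher::new();
        ts.hash(&mut hasher);
        counts[(hasher.finish() % worker_count) as usize] += 1;
    }
    let expected = 10_000 / worker_count;
    for count in counts {
        // Allow 20% skew around the ideal share per worker index.
        assert!(count > expected * 8 / 10 && count < expected * 12 / 10);
    }
}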

Minor Issues

  1. Typo in comment (engine/packages/gasoline/src/db/kv/mod.rs:1149): "has to the current worker" should be "hashes to the current worker"

  2. Inconsistent terminology: The PR title uses "smart pulling" but the code doesn't have comments explaining what makes it "smart". Consider adding a module-level doc comment explaining the algorithm.

  3. .gitattributes change: The addition of linguist-generated=true for docker configs is good housekeeping but unrelated to the PR's core purpose.

Recommendations

  1. Add constants for magic numbers in load shedding calculations
  2. Add unit tests for the distribution algorithm
  3. Consider making redundant pulling configurable
  4. Add a fallback hash-based worker index instead of defaulting to 0
  5. Add documentation explaining the smart pulling algorithm
  6. Monitor transaction conflict rates after deployment to validate the redundancy trade-off

Verdict

This is a solid improvement to Gasoline's horizontal scaling capabilities. The core algorithm is sound, but it would benefit from additional testing, better error handling, and improved documentation. The performance trade-offs (redundant pulling for reliability) should be monitored in production.

Recommendation: Approve with suggested improvements, particularly around error handling and testing.

@claude

claude bot commented Nov 13, 2025

Code Review: Smart Pulling for Horizontally Scaled Workers

This is a well-designed feature that implements intelligent workflow distribution across horizontally scaled workers. Here's my comprehensive review:

Architecture & Design

Strengths:

  • Smart worker discovery: The implementation uses an indexed approach to discover active workers based on ping timestamps, which is efficient and scalable
  • Consistent hashing: Using hash-based distribution ensures workflows are consistently assigned to the same workers, reducing contention
  • Load shedding: CPU-based load shedding (lines 1014-1018 in db/kv/mod.rs) is a smart way to prevent overload
  • Redundancy: Workers pull for both their assigned slot and the next slot (lines 1149-1154), providing fault tolerance

Observations:

  • The refactoring from worker_instance_id to worker_id is clean and improves naming consistency throughout the codebase
  • The new BumpSubSubject variants enable more targeted pub/sub notifications, reducing unnecessary wakeups

Code Quality

Positive:

  • Follows repository conventions for error handling with custom error types
  • Proper use of structured logging with tracing
  • Good separation of concerns with the new worker.rs key module

Areas for Improvement:

  1. Integer overflow risk in calc_pull_ratio (db/kv/mod.rs:3034-3044):

    let neg_b = ay * neg_dy / dx;  // Potential overflow if ay * neg_dy is large

    Consider using checked arithmetic or wider integer types:

    let neg_b = (ay as u128 * neg_dy as u128 / dx as u128) as u64;
  2. CPU usage calculation in db/kv/system.rs:20-32:
    The .max(100.0) at line 1016 seems intended to be .min(100.0) for clamping:

    let cpu_usage = { self.system.lock().await.cpu_usage() };
    let load_shed_ratio_x1000 = 
        calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);

    Using .max(100.0) forces every reading up to at least 100%, so any value below 100% is replaced by 100% and the input always sits at the top of the load-shed curve. Should this be .min(100.0) to cap at 100% instead?

  3. Magic numbers in pull_workflows (line 1018):

    calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100)

    Consider extracting these as named constants with documentation:

    const LOAD_SHED_CPU_MIN: u64 = 500; // 50% CPU
    const LOAD_SHED_RATIO_MAX: u64 = 1000; // 100% pull ratio
    const LOAD_SHED_CPU_MAX: u64 = 800; // 80% CPU  
    const LOAD_SHED_RATIO_MIN: u64 = 100; // 10% pull ratio
  4. Error handling: At line 1087-1088, when the current worker isn't found in the active worker list, it defaults to index 0. This could cause workflow distribution issues. Consider whether this should be a more serious error condition.

Performance Considerations

Positive:

  • Two-phase transaction approach (lease acquisition, then data fetch) reduces contention
  • Snapshot isolation for non-critical reads reduces conflicts
  • Stream-based processing with buffering for parallel operations

Potential Issues:

  • The buffer_unordered(1024) at lines 1190 and 1247 might cause memory spikes with many pending workflows. Consider making this configurable or documenting the trade-off.

Security & Correctness

Positive:

  • Proper conflict range handling for expired lease cleanup
  • Atomic lease acquisition prevents double-processing
  • Proper cleanup of wake conditions and indexes

Observations:

  • The hash-based worker assignment using DefaultHasher (line 1129) is fine for distribution but document that it's not cryptographically secure (which is appropriate here)

Testing

Concern:

  • No new tests were added for the smart pulling logic
  • The complex hash-based distribution logic (lines 1128-1155) should have unit tests
  • Load shedding behavior should be tested with various CPU loads
  • Worker failover scenarios should be tested

Recommendation: Add tests for:

  • Worker assignment consistency with various worker counts
  • Load shedding at different CPU thresholds
  • Edge cases (single worker, worker dropout, etc.)
  • The calc_pull_ratio function with boundary values
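
For the last item, a hedged sketch of boundary tests, assuming calc_pull_ratio(x, ax, ay, bx, by) linearly interpolates from (ax, ay) down to (bx, by) as the call site suggests; the expected values follow from that assumption only:

#[test]
fn calc_pull_ratio_boundaries() {
    // Endpoints of the production curve (500, 1000) -> (800, 100).
    assert_eq!(calc_pull_ratio(500, 500, 1000, 800, 100), 1000);
    assert_eq!(calc_pull_ratio(800, 500, 1000, 800, 100), 100);
    // The ratio must never increase as the CPU reading rises.
    let mut prev = u64::MAX;
    for x in (500..=800).step_by(50) {
        let ratio = calc_pull_ratio(x, 500, 1000, 800, 100);
        assert!(ratio <= prev);
        prev = ratio;
    }
}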

Documentation

Needed:

  • Comment explaining the redundancy strategy (why workers pull for current + next slot)
  • Document the hash stability requirement for earliest_wake_condition_ts
  • Explain the load shedding parameters and how to tune them

Minor Issues

  1. Grafana dashboards: The change from worker_instance_id to worker_id is correct but verify these queries work with existing data
  2. Dependency addition: sysinfo is properly added to workspace dependencies
  3. Code comment: Line 1137 comment says "Add a little pizazz" - consider more technical language for production code

Recommendations

  1. High Priority: Fix the potential .max(100.0) bug (should likely be .min(100.0))
  2. High Priority: Add unit tests for calc_pull_ratio and worker assignment logic
  3. Medium Priority: Extract magic numbers to named constants
  4. Medium Priority: Add overflow protection to calc_pull_ratio
  5. Low Priority: Improve inline documentation for the smart pulling algorithm

Summary

This is a solid implementation of a non-trivial distributed systems feature. The core algorithm is sound, but there are a few potential bugs and the lack of tests for the new logic is concerning. The performance optimizations show good understanding of database transaction patterns.

Recommendation: Request changes to address the CPU capping issue and add tests before merging.

Questions for Author

  1. Is the .max(100.0) intentional, or should it be .min(100.0)?
  2. Have you load-tested this with multiple workers under high CPU pressure?
  3. What happens if all workers are shedding load simultaneously?
  4. Should there be metrics for tracking the load shed ratio per worker?

@NathanFlurry NathanFlurry force-pushed the 11-07-fix_pb_dont_immediately_reallocate_actors_on_runner_stopping branch from 0bfb5d7 to 7ab4b5a Compare November 13, 2025 22:27
@NathanFlurry NathanFlurry force-pushed the 11-07-feat_gas_implement_smart_pulling_for_horizontally_scaled_workers branch from 0014809 to 4dad8e2 Compare November 13, 2025 22:27
@claude

claude bot commented Nov 13, 2025

Code Review - PR #3417: Smart Pulling for Horizontally Scaled Workers

This is an excellent enhancement to the workflow engine's scalability. The implementation introduces intelligent workflow distribution across multiple workers with load shedding capabilities. Here's my detailed review:

Strengths

  1. Well-designed distributed coordination: The smart pulling algorithm uses consistent hashing on earliest_wake_condition_ts to distribute workflows evenly across workers. The redundancy mechanism (each worker pulls for itself + next worker) is a clever way to prevent orphaned workflows.

  2. CPU-based load shedding: The integration of sysinfo for CPU monitoring and the linear load shedding ratio calculation is elegant and practical. The mapping from 50% CPU (pull 80% of workflows) to 100% CPU (pull 10%) provides good backpressure.

  3. Proper metrics updates: Changed from worker_instance_id to worker_id in Grafana dashboards is cleaner and reflects the architectural shift.

  4. Targeted pubsub subscriptions: The new BumpSubSubject enum with specific subjects (WorkflowComplete, SignalPublish) reduces unnecessary wake-ups and improves efficiency.

  5. Separation of lease acquisition and data reading: The two-transaction approach (lines 998-1258, then 1273-1335) reduces contention - excellent optimization.

Issues & Concerns

1. Potential Division by Zero (High Priority)

Location: engine/packages/gasoline/src/db/kv/mod.rs:1147

let wf_worker_idx = wf_hash % active_worker_count;

If active_worker_ids is empty, active_worker_count will be 0, causing a panic. While there's error handling at line 1085-1091 for when the current worker isn't found, there's no guard against an empty worker list.

Recommendation: Add a check before the filter:

if active_worker_count == 0 {
    tracing::warn!("no active workers found, skipping workflow assignment");
    return Ok(Vec::new());
}

2. Integer Overflow Risk (Medium Priority)

Location: engine/packages/gasoline/src/db/kv/mod.rs:3041

let neg_b = ay * neg_dy / dx;

This multiplication could overflow for large values. Consider using checked_mul() or saturating_mul().

Recommendation:

let neg_b = ay.saturating_mul(neg_dy) / dx;

3. Magic Number in Load Shedding (Low Priority)

Location: engine/packages/gasoline/src/db/kv/mod.rs:1018

let load_shed_ratio_x1000 = calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);

The parameters (500, 1000, 800, 100) are not documented. Consider adding constants:

const LOAD_SHED_CPU_START: u64 = 500; // 50% CPU
const LOAD_SHED_RATIO_START: u64 = 1000; // Pull 100% at 50% CPU
const LOAD_SHED_CPU_END: u64 = 800; // 80% CPU  
const LOAD_SHED_RATIO_END: u64 = 100; // Pull 10% at 80% CPU

4. Unclear CPU Usage Capping (Low Priority)

Location: engine/packages/gasoline/src/db/kv/mod.rs:1018

let cpu_usage = { self.system.lock().await.cpu_usage() };
let load_shed_ratio_x1000 = calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, ...);

Using .max(100.0) means any CPU reading below 100% gets raised to 100%, so the input to calc_pull_ratio is always pinned at the high end of the curve. This seems backwards - did you mean .min(100.0) to cap at 100%?

5. Unbounded Buffer Size (Low Priority)

Location: engine/packages/gasoline/src/db/kv/mod.rs:1190, 1247

.buffer_unordered(1024)

The comment "// TODO: How to get rid of this buffer?" suggests this is known. A buffer of 1024 concurrent operations could cause memory issues if workflows are large. Consider making this configurable or using a smaller value.

Performance Considerations

  1. Transaction Contention: The approach of having workers check overlapping sets (current + next worker) will increase transaction conflicts. Monitor the conflict rate - if it becomes problematic, you might need to adjust the redundancy strategy.

  2. CPU Monitoring Overhead: The sysinfo crate refreshes CPU data with a minimum interval check (line 22 in system.rs), which is good. However, consider if the mutex lock on SystemInfo could become a bottleneck under high load.

  3. Snapshot Isolation: Using Snapshot instead of Serializable for reading active workers (line 1038) is correct for reducing contention, but be aware this could lead to slightly stale data in edge cases.

Security Concerns

No security issues identified. The changes don't introduce new external inputs or privilege boundaries.

Test Coverage

The PR doesn't include new tests. Consider adding:

  1. Unit tests for calc_pull_ratio() with edge cases (zero values, overflow conditions)
  2. Integration tests for workflow distribution across multiple workers
  3. Tests for the load shedding behavior at various CPU levels
  4. Tests for the division by zero scenario mentioned above

Code Quality

  1. Consistent with CLAUDE.md: Uses structured logging correctly, follows error handling patterns, uses workspace dependencies properly.

  2. Minor: Typo in comment at line 1137:

// Add a little pizazz to the hash

"pizazz" → consider more technical language for production code, e.g., "Add entropy to the hash for distribution"

  1. Good separation of concerns: The new subjects.rs and system.rs modules are well-organized.

Summary

This is a solid implementation of a complex distributed systems feature. The main concerns are the potential division by zero bug (which could cause production outages) and some arithmetic overflow risks. Once those are addressed, this should significantly improve the workflow engine's horizontal scalability.

Recommended Actions:

  1. Fix the division by zero issue (high priority)
  2. Add overflow protection to calc_pull_ratio (medium priority)
  3. Document the load shedding parameters (low priority)
  4. Clarify the CPU capping logic (low priority)
  5. Add test coverage for the new logic

Great work on this feature! The architecture is sound and the implementation is well-structured.

@NathanFlurry NathanFlurry force-pushed the 11-07-fix_pb_dont_immediately_reallocate_actors_on_runner_stopping branch from 7ab4b5a to ca5973c Compare November 13, 2025 22:35
@NathanFlurry NathanFlurry force-pushed the 11-07-feat_gas_implement_smart_pulling_for_horizontally_scaled_workers branch from 4dad8e2 to 3df125e Compare November 13, 2025 22:35
@claude

claude bot commented Nov 13, 2025

PR Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements intelligent workflow distribution across horizontally scaled workers in the Gasoline workflow engine. The main innovation is a hash-based work distribution algorithm with CPU-based load shedding and redundancy.

Code Quality & Best Practices

Strengths

  1. Well-structured refactoring: The migration from worker_instance_id to worker_id is clean and consistent across metrics and database keys
  2. Smart distribution algorithm: The hash-based workflow assignment ensures consistent distribution across workers
  3. Redundancy design: Each workflow is pulled by the assigned worker AND the next worker (lines 1149-1154 in db/kv/mod.rs), reducing the risk of orphaned workflows
  4. CPU-aware load shedding: Dynamic adjustment based on system CPU usage (lines 1016-1018)
  5. Proper use of workspace dependencies: sysinfo correctly added to workspace dependencies in root Cargo.toml

Issues & Concerns

1. CPU Usage Calculation Issue (engine/packages/gasoline/src/db/kv/mod.rs:1018)

let cpu_usage = { self.system.lock().await.cpu_usage() };
let load_shed_ratio_x1000 = 
    calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);

Problem: cpu_usage.max(100.0) should be cpu_usage.min(100.0). Currently, if CPU is at 50%, it becomes 100% (max of 50 and 100). This completely breaks the load shedding logic.

Impact: Critical - load shedding will never activate properly, defeating one of the main features of this PR.

Fix:

let load_shed_ratio_x1000 = 
    calc_pull_ratio((cpu_usage.min(100.0) * 10.0) as u64, 500, 1000, 800, 100);

2. Potential Race Condition (engine/packages/gasoline/src/db/kv/mod.rs:1012)

let active_workers_after = now - i64::try_from(PING_INTERVAL.as_millis() * 2)?;

Concern: Using 2 * PING_INTERVAL as the active worker threshold could cause workers to disappear from the active list between pings. If a worker's ping is delayed slightly (e.g., due to GC or CPU contention), it might be excluded from active_worker_ids, causing workflow redistribution churn.

Recommendation: Use 3 * PING_INTERVAL or make it configurable. This provides better tolerance for transient delays while still being well under the WORKER_LOST_THRESHOLD_MS of 30 seconds.

3. Missing Error Context (engine/packages/gasoline/src/db/kv/mod.rs:1085-1090)

} else {
    tracing::error!(
        ?worker_id,
        "current worker should have valid ping, defaulting to worker index 0"
    );
    0
};

Issue: When a worker can't find itself in the active list, it defaults to index 0 silently. This could lead to multiple workers competing for the same workflows.

Recommendation: Return an error or retry the ping before continuing, rather than silently degrading behavior.

4. Lock Held Across Async Boundary (engine/packages/gasoline/src/db/kv/mod.rs:1016)

let cpu_usage = { self.system.lock().await.cpu_usage() };

Good Practice: The lock is properly scoped and dropped immediately. However, consider moving the CPU reading to a background task (or caching it), since sysinfo refreshes can be slow; note that tokio::sync::Mutex::blocking_lock() is not an option here because it panics when called from within an async context.

5. Magic Numbers (engine/packages/gasoline/src/db/kv/mod.rs:1018, 3034)

calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);

The parameters 500, 1000, 800, 100 are not documented. Consider adding constants with descriptive names:

const CPU_THRESHOLD_LOW: u64 = 500;  // 50% CPU
const CPU_THRESHOLD_HIGH: u64 = 1000; // 100% CPU
const PULL_RATIO_AT_LOW_CPU: u64 = 800; // 80% of workflows
const PULL_RATIO_AT_HIGH_CPU: u64 = 100; // 10% of workflows

6. Unnecessary Comment (engine/packages/gasoline/src/db/kv/mod.rs:3035)

fn calc_pull_ratio(x: u64, ax: u64, ay: u64, bx: u64, by: u64) -> u64 {
    // must have neg slope, inversely proportional

Suggestion: The function name and parameters are not intuitive. Consider renaming or adding comprehensive documentation:

/// Calculates a linear interpolation for load shedding ratio.
/// Maps CPU usage (x) from range [ax, bx] to pull ratio [ay, by].
/// Higher CPU usage results in lower pull ratio (inverse relationship).
fn calc_pull_ratio(cpu_usage: u64, min_cpu: u64, max_cpu: u64, max_ratio: u64, min_ratio: u64) -> u64

Performance Considerations

Positive

  1. Efficient hashing: Using DefaultHasher with earliest_wake_condition_ts ensures consistent assignment
  2. Snapshot isolation: Proper use of Snapshot isolation level reduces transaction conflicts (lines 1038, 1065)
  3. Parallel processing: Good use of buffer_unordered(1024) for concurrent lease checks
  4. Two-phase approach: Separating lease acquisition from data reading reduces transaction contention

Concerns

  1. Buffer size: The hardcoded buffer_unordered(1024) at lines 1190 and 1247 could cause memory issues with many workflows. Consider making this configurable or adaptive.
  2. CPU polling overhead: The sysinfo crate may have overhead. Consider caching CPU readings for a short duration (e.g., 1 second) instead of reading on every pull.

Security Concerns

No significant security issues identified. The changes are primarily internal optimization and don't expose new attack surfaces.

Test Coverage

Major Gap: No tests found for the new smart pulling algorithm. Critical functionality like:

  • Hash-based workflow distribution
  • CPU-based load shedding (calc_pull_ratio)
  • Worker redundancy (next worker assignment)
  • Active worker index calculation
  • Edge cases (no active workers, single worker, etc.)

Recommendation: Add unit tests for calc_pull_ratio and integration tests for worker distribution behavior.

Architecture & Design

Positive

  1. Clean abstraction: The BumpSubSubject enum properly models different notification types
  2. Subject routing: The subjects.rs module cleanly maps subjects to NATS topics
  3. Worker lifecycle: The mark_worker_inactive function ensures clean shutdown

Concerns

  1. System state in database struct: The Mutex<system::SystemInfo> in DatabaseKv feels like a layering violation. Consider moving CPU monitoring to the Worker struct.

Documentation

The PR lacks a description explaining:

  • The problem being solved
  • How the smart pulling algorithm works
  • Performance implications
  • Migration considerations

Generated template dashboards: Good catch on regenerating Grafana dashboards from template, but the .gitattributes changes marking them as linguist-generated should have been in the original Docker template setup PR.

Recommendations

Must Fix

  1. Fix CPU calculation bug (.max(100.0).min(100.0))

Should Fix

  1. Increase active worker threshold to 3 * PING_INTERVAL
  2. Handle missing worker index case with retry/error
  3. Add comprehensive tests
  4. Document magic numbers with named constants
  5. Add PR description

Nice to Have

  1. Consider caching CPU readings
  2. Improve calc_pull_ratio naming and documentation
  3. Make buffer sizes configurable
  4. Move SystemInfo out of DatabaseKv

Verdict

The PR implements a well-designed algorithm for work distribution, but has a critical bug in CPU load shedding that must be fixed before merging. The lack of tests for such critical functionality is also concerning. Overall architecture is sound, but needs polish and validation.


🤖 Generated with Claude Code

@claude

claude bot commented Nov 13, 2025

PR Review: Smart Pulling for Horizontally Scaled Workers

Thank you for this comprehensive implementation! This PR introduces intelligent workflow distribution across horizontally scaled workers using consistent hashing and load shedding. Here's my detailed review:

Code Quality & Architecture

Strengths:

  1. Well-designed smart pulling algorithm: The consistent hashing approach (lines 1128-1155 in db/kv/mod.rs) is solid for distributing workflows across workers while maintaining deterministic assignment
  2. Load shedding implementation: The CPU-based load shedding (lines 1014-1018) is a smart addition to prevent worker overload
  3. Proper separation of concerns: Moving from worker_instance_id to worker_id clarifies the conceptual model
  4. Good use of structured logging: Properly using tracing's structured logging throughout

Areas for improvement:

1. Potential Race Condition in Worker Discovery

// Lines 1030-1044 in db/kv/mod.rs
let (mut active_worker_ids, wake_keys) = tokio::try_join!(
    // Get active workers
    tx.get_ranges_keyvalues(..., Snapshot, ...), 
    // Get wake conditions
    ...
)

Issue: Using Snapshot isolation for reading active workers could lead to stale data. If a worker becomes inactive between when you read the active workers list and when you compute the hash assignment, workflows could be orphaned.

Suggestion: Consider using Serializable for the active workers query, or add a fallback mechanism to detect and handle orphaned workflows more aggressively.

2. Magic Numbers Need Documentation

// Line 1018
let load_shed_ratio_x1000 = calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);

Issue: The parameters (500, 1000, 800, 100) are not documented. What do these thresholds represent?

Suggestion: Add inline documentation or constants:

// CPU usage thresholds for load shedding (in tenths of a percent)
const CPU_THRESHOLD_MIN: u64 = 500;  // 50% CPU
const CPU_THRESHOLD_MAX: u64 = 1000; // 100% CPU
const PULL_RATIO_MAX: u64 = 800;     // Pull 80% at low CPU
const PULL_RATIO_MIN: u64 = 100;     // Pull 10% at high CPU

3. Hash Collision Handling

// Lines 1136-1141
let pseudorandom_value_x1000 = {
    1234i32.hash(&mut hasher);
    hasher.finish() % 1000
};

Issue: The comment says "Add a little pizazz" but this is actually implementing load shedding. The magic constant 1234 is arbitrary.

Suggestion: Make this more explicit:

// Generate a pseudorandom value for load shedding (0-999)
let pseudorandom_value_x1000 = {
    const LOAD_SHED_SALT: i32 = 1234;
    LOAD_SHED_SALT.hash(&mut hasher);
    hasher.finish() % 1000
};

4. Redundancy Trade-offs Need Tuning Consideration

// Lines 1149-1154
// Every worker pulls workflows that has to the current worker as well as the next
// worker for redundancy.

Concern: This doubles transaction conflicts. With many workers, this could become a bottleneck.

Suggestion:

  • Add metrics to track conflict rates (txn_conflicts_per_pull)
  • Consider making redundancy configurable or adaptive based on conflict rates
  • Document the trade-off in CLAUDE.md

5. Missing calc_pull_ratio Function

The function calc_pull_ratio is called but not visible in this PR. Please ensure:

  • It's properly documented
  • It handles edge cases (division by zero, overflow)
  • It has unit tests

6. System Info CPU Sampling

// db/kv/system.rs:22-25
if self.last_cpu_usage_read.elapsed() > MINIMUM_CPU_UPDATE_INTERVAL {
    self.system.refresh_cpu_usage();
}

Issue: Multiple concurrent calls to cpu_usage() within MINIMUM_CPU_UPDATE_INTERVAL will return stale data without indication.

Suggestion: Either document this behavior or return a timestamp with the CPU value so callers know how fresh the data is.
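
A small sketch of the second option, assuming the field names from the snippet above and a recent sysinfo where cpus()/cpu_usage() are inherent methods (a hypothetical variant, not the PR's code):

use std::time::Instant;
use sysinfo::{System, MINIMUM_CPU_UPDATE_INTERVAL};

pub struct SystemInfo {
    system: System,
    last_cpu_usage_read: Instant,
}

impl SystemInfo {
    // Return the averaged CPU usage together with the instant it was last
    // refreshed, so callers can tell how stale the reading is.
    pub fn cpu_usage(&mut self) -> (f32, Instant) {
        if self.last_cpu_usage_read.elapsed() > MINIMUM_CPU_UPDATE_INTERVAL {
            self.system.refresh_cpu_usage();
            self.last_cpu_usage_read = Instant::now();
        }
        let cpus = self.system.cpus();
        // max(1) guards against an empty CPU list.
        let avg = cpus.iter().map(|c| c.cpu_usage()).sum::<f32>() / cpus.len().max(1) as f32;
        (avg, self.last_cpu_usage_read)
    }
}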

Performance Considerations

  1. Buffer size: Lines 1190 and 1247 use buffer_unordered(1024) with a TODO comment. This is fine for now, but consider making it configurable based on expected workflow counts.

  2. Two-phase commit: The implementation wisely splits lease acquisition (line 1002) from data reading (line 1273) to reduce contention. Good design!

  3. Metrics overhead: Recording metrics on every pull (lines 1262-1265) is good for observability. Ensure these are async/non-blocking.

Security Concerns

No significant security issues identified. The implementation properly:

  • Uses proper isolation levels for transactions
  • Handles expired workers to prevent unauthorized workflow execution
  • Maintains lease integrity

Test Coverage

Missing tests for:

  1. Hash distribution fairness - verify workflows are evenly distributed
  2. Load shedding behavior at various CPU thresholds
  3. Worker failure scenarios - what happens when a worker dies mid-pull?
  4. Race conditions during worker scale-up/scale-down

Suggestion: Add integration tests in gasoline/tests/:

#[tokio::test]
async fn test_smart_pulling_distributes_evenly() {
    // Spawn multiple workers, dispatch many workflows
    // Assert that each worker gets approximately equal share
}

#[tokio::test]
async fn test_load_shedding_reduces_pull_count() {
    // Mock high CPU usage
    // Assert fewer workflows are pulled
}

Bug: Grafana Dashboard Template Not Updated

The PR updates generated dashboards but according to CLAUDE.md, you should edit docker/template/ and regenerate:

cd docker/template && pnpm start

The template file was updated correctly, but please verify all generated files are in sync.

Documentation Suggestions

  1. Add architecture doc explaining the smart pulling algorithm
  2. Update CLAUDE.md with tuning parameters for production deployments
  3. Document expected behavior during worker scaling events

Minor Issues

  1. Line 1086-1090: The error case defaults to worker index 0. This could cause hot-spotting. Consider distributing failed lookups randomly.

  2. Metrics naming: Changed from worker_instance_id to worker_id - ensure Grafana dashboards and alerting rules are updated in production.

  3. CLAUDE.md compliance: Good use of structured logging and proper error handling. Consider adding this PR's context to CLAUDE.md under "Performance" section.

Verdict

This is a well-engineered solution to a complex distributed systems problem. The core algorithm is sound, but needs:

  • Better documentation of magic numbers and thresholds
  • Additional test coverage for edge cases
  • Consideration of the redundancy vs. contention trade-off

Recommendation: Request changes for documentation and testing improvements before merge.

Questions for Author

  1. What testing was done to validate the hash distribution is uniform?
  2. Have you observed the transaction conflict rate in practice with the 2x redundancy?
  3. What's the expected behavior during a deployment when workers are cycling?

Great work on tackling horizontal scaling! The implementation shows good systems thinking.

@NathanFlurry NathanFlurry force-pushed the 11-07-feat_gas_implement_smart_pulling_for_horizontally_scaled_workers branch from 3df125e to ff8b85c Compare November 13, 2025 22:42
@NathanFlurry NathanFlurry force-pushed the 11-07-fix_pb_dont_immediately_reallocate_actors_on_runner_stopping branch from ca5973c to 86c0f99 Compare November 13, 2025 22:42
@claude

claude bot commented Nov 13, 2025

PR Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements smart pulling for horizontally scaled workers in the Gasoline workflow engine. The main improvement is an intelligent work distribution system that spreads workflow execution across multiple workers while implementing load shedding based on CPU usage. Overall, this is a well-designed feature with good architecture, but there are several areas that need attention.

Architecture & Implementation

✅ Strengths

  1. Smart Work Distribution: The hash-based approach to distribute workflows across workers (lines 1128-1154 in db/kv/mod.rs) is elegant and deterministic. Using the earliest wake condition timestamp for hashing ensures consistency even when conditions change.

  2. CPU-Based Load Shedding: The integration of sysinfo crate and the linear load shedding algorithm (lines 1014-1018) is a smart way to prevent worker overload.

  3. Redundant Pulling: The design where each worker pulls workflows for itself and the next worker (lines 1149-1154) is a good failover mechanism to prevent orphaned workflows.

  4. Improved PubSub Subjects: The refactoring of BumpSubSubject from a monolithic worker bump to specific subjects (WorkflowComplete, SignalPublish) is much more efficient and reduces unnecessary wake-ups.

  5. Proper Metrics Updates: Updated Grafana dashboards consistently renamed worker_instance_id to worker_id.

🔧 Areas for Improvement

Issues & Concerns

🔴 Critical Issues

  1. Integer Overflow Risk in calc_pull_ratio (db/kv/mod.rs:3034-3044)

    let neg_b = ay * neg_dy / dx;
    return neg_b.saturating_sub(x * neg_dy / dx);
    • The multiplication ay * neg_dy could overflow before the division
    • Consider using checked arithmetic or reordering operations
    • Recommendation: Use ay.saturating_mul(neg_dy) / dx or document why overflow is impossible
  2. Incorrect Error Message (worker.rs:113)

    tracing::error!(?res, "metrics task unexpectedly stopped");

    This error message is misleading - it says "metrics task" but it's actually checking the gc_handle. Should be "gc task unexpectedly stopped".

  3. Mutex Contention Under Load (db/kv/mod.rs:1016)

    let cpu_usage = { self.system.lock().await.cpu_usage() };
    • This mutex is acquired on every pull_workflows call, which happens frequently
    • Under high contention with many workers, this could become a bottleneck
    • Recommendation: Consider using an RwLock or caching the CPU value with a short TTL
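
One possible shape for the cached approach suggested above: a lock-free cache refreshed by a background task, so the pull path only performs an atomic load. All names are illustrative and a recent sysinfo API is assumed.

use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::Duration;

// f32 stored as raw bits inside an AtomicU32.
pub struct CpuCache(AtomicU32);

impl CpuCache {
    pub fn store(&self, cpu_usage: f32) {
        self.0.store(cpu_usage.to_bits(), Ordering::Relaxed);
    }

    pub fn load(&self) -> f32 {
        f32::from_bits(self.0.load(Ordering::Relaxed))
    }
}

pub fn spawn_cpu_sampler(cache: Arc<CpuCache>) {
    tokio::spawn(async move {
        let mut system = sysinfo::System::new();
        loop {
            // The first reading after startup is 0 until the second refresh.
            system.refresh_cpu_usage();
            let cpus = system.cpus();
            let avg = cpus.iter().map(|c| c.cpu_usage()).sum::<f32>() / cpus.len().max(1) as f32;
            cache.store(avg);
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    });
}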

⚠️ Major Issues

  1. Missing Worker ID Validation (db/kv/mod.rs:1077-1091)
    The code defaults to worker index 0 when the current worker isn't found in active workers:

    } else {
        tracing::error!(
            ?worker_id,
            "current worker should have valid ping, defaulting to worker index 0"
        );
        0
    }

    This could cause multiple workers to claim the same index. Better to:

    • Return an error and skip pulling for this tick
    • Or add the current worker to the list explicitly
  2. Hardcoded Magic Numbers (db/kv/mod.rs:1018)

    let load_shed_ratio_x1000 =
        calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);

    The values 500, 1000, 800, 100 lack documentation:

    • What do these numbers represent?
    • How were they chosen?
    • Recommendation: Extract to named constants with comments explaining the load shedding curve
  3. Likely Inverted Comparison (db/kv/mod.rs:1143-1145)

    if pseudorandom_value_x1000 < load_shed_ratio_x1000 {
        return false;
    }

    The comparison operator should probably be > not <. Higher CPU usage should mean more shedding (returning false more often), but with the current check a lower load_shed_ratio_x1000 (i.e. higher CPU) skips fewer workflows, which is backwards if that value represents the fraction to pull.

💡 Minor Issues

  1. Typo in Comment (db/kv/mod.rs:1136)

    // Add a little pizazz to the hash

    While humorous, consider using "variation" or "entropy" for clarity in production code.

  2. Missing Documentation

    • The SystemInfo struct and its cpu_usage() method lack doc comments
    • The calc_pull_ratio function needs documentation explaining parameters and return value
    • The smart pulling algorithm itself deserves module-level documentation
  3. MINIMUM_CPU_UPDATE_INTERVAL Not Checked Correctly (db/kv/system.rs:22-25)

    if self.last_cpu_usage_read.elapsed() > MINIMUM_CPU_UPDATE_INTERVAL {
        self.system.refresh_cpu_usage();
        self.last_cpu_usage_read = Instant::now();
    }

    The sysinfo crate's MINIMUM_CPU_UPDATE_INTERVAL is the minimum time between refreshes. The current logic refreshes exactly at this interval, which is correct, but a comment would clarify this isn't a "maximum" interval.

  4. Inconsistent Worker ID Type
    Throughout the codebase, worker_id is consistently Id (UUID v1), but the PR title mentions "horizontally scaled workers" without documenting how worker IDs are generated to be unique across nodes. The Id::new_v1(config.dc_label()) in worker.rs:53 is good but could use a comment explaining why v1 UUIDs work for this use case.

Code Quality & Best Practices

✅ Good Practices

  • Proper use of tracing with structured logging
  • Good use of saturating_sub to prevent underflow in most places
  • Transactions are well-scoped
  • Error handling is consistent with the project's patterns

📝 Suggestions

  • Add integration tests for the smart pulling algorithm with multiple workers
  • Consider adding a metric for load_shed_ratio to help operators understand load shedding behavior
  • Add a metric for active_worker_count to track cluster size over time

Performance Considerations

  1. Positive: The snapshot isolation level for reading active workers (line 1038) is good - reduces contention
  2. Concern: Sorting active worker IDs on every pull (line 1074) could be expensive with many workers. Consider if O(n log n) is acceptable or if a different data structure could help
  3. Positive: The deduplication of workflows (lines 1095-1124) prevents redundant work

Security Concerns

No significant security issues identified. The use of deterministic hashing for work distribution doesn't introduce timing attacks since workflow IDs are not secret.

Test Coverage

The PR doesn't include new tests for:

  • Smart pulling distribution algorithm
  • Load shedding behavior
  • Worker failover scenarios
  • CPU usage tracking

Recommendation: Add tests in the driver test suite to verify:

  1. Workflows are evenly distributed across N workers
  2. When a worker goes down, its workflows are picked up by others
  3. Load shedding reduces workflow pulls under high CPU

Breaking Changes

The renaming from worker_instance_id to worker_id in metrics/dashboards is technically a breaking change for anyone with custom dashboards or alerts. Consider mentioning this in the PR description or release notes.

Conclusion

This is solid work that significantly improves Gasoline's horizontal scalability. The core algorithm is well-designed and the code quality is good. However, the issues listed above—especially the integer overflow risk and the load shedding comparison operator—should be addressed before merging.

Recommendation: Request changes for the critical issues, then approve after fixes.


Note: Line numbers reference engine/packages/gasoline/src/db/kv/mod.rs unless otherwise specified.

@graphite-app
Contributor

graphite-app bot commented Nov 13, 2025

Merge activity

Base automatically changed from 11-07-fix_pb_dont_immediately_reallocate_actors_on_runner_stopping to main November 13, 2025 22:45
@claude

claude bot commented Nov 13, 2025

Pull Request Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements intelligent workflow distribution across horizontally scaled workers using consistent hashing and CPU-based load shedding. The changes refactor the worker architecture from per-instance tracking to worker-based tracking with smart assignment logic.


Code Quality & Architecture

Strengths

  1. Well-structured refactoring: The migration from worker_instance to worker is clean and consistent across the codebase
  2. Smart hashing algorithm: The use of DefaultHasher with earliest_wake_condition_ts provides consistent workflow assignment across workers
  3. Load shedding implementation: CPU-based throttling is a pragmatic approach to prevent overload
  4. Redundancy design: Assigning each workflow to the current worker plus the next worker (with modulo wraparound) provides fault tolerance

Concerns

1. Potential Integer Overflow in CPU Calculation (engine/packages/gasoline/src/db/kv/mod.rs:1018)

let load_shed_ratio_x1000 = calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);
  • The max(100.0) seems backwards - should this be min(100.0) to cap at 100%?
  • If CPU usage exceeds 100% (which can happen on multi-core systems with the averaging method used), the multiplication by 10 could lead to unexpected behavior
  • Recommendation: Add bounds checking and clarify the intended behavior

2. Hash Collision Behavior (engine/packages/gasoline/src/db/kv/mod.rs:1129-1155)

let mut hasher = DefaultHasher::new();
wf.earliest_wake_condition_ts.hash(&mut hasher);
let wf_hash = hasher.finish();

1234i32.hash(&mut hasher);
hasher.finish() % 1000 // 0-1000
  • Using the same hasher instance after finish() is called - this continues hashing from the previous state, which may be intentional but is non-obvious
  • The magic number 1234 lacks documentation
  • Recommendation: Add a comment explaining why the hasher is reused and document the magic number's purpose

3. Missing Error Handling in Bump Task (engine/packages/gasoline/src/db/kv/mod.rs:67-84)

let spawn_res = tokio::task::Builder::new().name("bump").spawn(...)
if let Err(err) = spawn_res {
    tracing::error!(?err, "failed to spawn bump task");
}
  • If spawning fails, workers won't be notified of new work, potentially causing workflows to stall
  • Recommendation: Consider whether this should be a harder failure or have retry logic

4. Shutdown Race Condition (engine/packages/gasoline/src/worker.rs:282-284)

if let Err(err) = self.db.mark_worker_inactive(worker_id).await {
    tracing::error!(?err, worker_id=?self.worker_id, "failed to mark worker as inactive");
}
  • If marking inactive fails, the worker remains in the active list and other workers may avoid pulling workflows intended for this shutting-down worker
  • Recommendation: Add retry logic or more aggressive cleanup

Performance Considerations

Positive

  1. Snapshot isolation: Using Snapshot isolation for reading active workers and wake conditions reduces contention (lines 1038, 1063)
  2. Efficient filtering: Pre-filtering by workflow name before reading all wake conditions
  3. Parallel processing: Good use of buffer_unordered(1024) for concurrent operations

Concerns

1. CPU Usage Calculation Frequency (engine/packages/gasoline/src/db/kv/system.rs:22-25)

if self.last_cpu_usage_read.elapsed() > MINIMUM_CPU_UPDATE_INTERVAL {
    self.system.refresh_cpu_usage();
  • MINIMUM_CPU_UPDATE_INTERVAL from sysinfo crate might be too frequent for every pull operation
  • Recommendation: Consider caching CPU readings for longer periods since pull happens every 90 seconds

2. Active Worker Query on Every Pull (engine/packages/gasoline/src/db/kv/mod.rs:1030-1044)

  • Every worker queries all active workers on every pull cycle
  • For large deployments, this could become a bottleneck
  • Recommendation: Consider caching the active worker list between pulls with periodic refresh

3. Load Shedding Calculation Missing

  • The PR references calc_pull_ratio function but it's not visible in the diff
  • Cannot verify the algorithm's correctness
  • Recommendation: Ensure this function has appropriate unit tests

Potential Bugs

1. Off-by-One in Pseudorandom Range (engine/packages/gasoline/src/db/kv/mod.rs:1140)

hasher.finish() % 1000 // 0-1000
  • Comment says "0-1000" but modulo 1000 gives 0-999
  • Later comparison pseudorandom_value_x1000 < load_shed_ratio_x1000 depends on this range
  • Recommendation: Fix comment to say "0-999" or adjust logic if 0-1000 was intended

2. Worker Index Default to 0 (engine/packages/gasoline/src/db/kv/mod.rs:1084-1091)

} else {
    tracing::error!(?worker_id, "current worker should have valid ping, defaulting to worker index 0");
    0
}
  • If a worker's ping expires mid-pull, defaulting to index 0 means worker 0 gets extra load
  • This could create a thundering herd if multiple workers lose their active status simultaneously
  • Recommendation: Return early with error instead of defaulting

3. Division by Zero Risk (engine/packages/gasoline/src/db/kv/system.rs:30-31)

/ self.system.cpus().len() as f32
  • If cpus() returns empty array (unlikely but possible in containers), this panics
  • Recommendation: Add defensive check or document that this is guaranteed non-empty

Security Considerations

Low Risk

  • No obvious security vulnerabilities
  • Proper use of database transactions
  • No injection points for external input

Notes

  • The consistent hashing based on timestamps could theoretically be predicted by an attacker, but this is internal infrastructure
  • Worker IDs use v1 UUIDs which include datacenter labels - ensure these don't leak sensitive info

Testing

Missing Test Coverage

  1. Load shedding behavior: No visible tests for CPU-based throttling
  2. Worker failover: Tests for workflow reassignment when workers go inactive
  3. Hash distribution: Tests ensuring workflows are evenly distributed
  4. Edge cases:
    • Single worker scenario
    • All workers at high CPU
    • Workers joining/leaving during pull

Recommendation: Add integration tests for the smart pulling logic, particularly around worker failover scenarios.


Documentation

Missing

  • No documentation for the smart pulling algorithm
  • calc_pull_ratio function signature and behavior undocumented in the diff
  • Magic numbers (1234, 500, 1000, 800, 100) lack explanation

Recommendation: Add module-level documentation explaining the consistent hashing approach and load shedding algorithm.
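
For instance, a module-level comment along these lines; the wording only restates the behaviour described in this review and should be adjusted to the actual code:

//! Smart pulling: each worker sorts the recently-pinged worker IDs, finds its
//! own index, and leases only workflows whose earliest_wake_condition_ts
//! hashes to its index or the next one (for redundancy). Under CPU pressure
//! the worker sheds a pseudorandom fraction of its share via calc_pull_ratio.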


Minor Issues

  1. Grafana dashboard changes: The change from worker_instance_id to worker_id is correct but verify existing dashboards in production won't break

  2. .gitattributes additions: Good addition to mark generated docker configs as linguist-generated

  3. Dependency management: sysinfo crate added correctly via workspace dependencies

  4. Metrics naming: Consider whether metric names should also migrate from "worker_instance" terminology for consistency


Recommendations Summary

Critical

  • Fix CPU calculation logic (min vs max)
  • Handle worker inactive marking failure more robustly
  • Add error handling for failed bump task spawns

Important

  • Add tests for smart pulling algorithm
  • Document the hashing and load shedding approach
  • Verify calc_pull_ratio implementation
  • Fix off-by-one in comment (0-999 vs 0-1000)

Nice to Have

  • Cache active worker list between pulls
  • Add defensive checks for edge cases (empty CPU list, etc.)
  • Document magic numbers

Conclusion

This is a solid implementation of smart workflow distribution. The architecture is sound and the approach is well-reasoned. However, there are several edge cases and potential bugs that should be addressed before merging. The lack of visible tests for the core smart pulling logic is concerning.

Recommendation: Request changes to address critical items, particularly around error handling and the CPU calculation logic.

@NathanFlurry NathanFlurry force-pushed the 11-07-feat_gas_implement_smart_pulling_for_horizontally_scaled_workers branch from ff8b85c to 96fc9a4 Compare November 13, 2025 22:45
@NathanFlurry NathanFlurry merged commit 47d0825 into main Nov 13, 2025
4 of 8 checks passed
@NathanFlurry NathanFlurry deleted the 11-07-feat_gas_implement_smart_pulling_for_horizontally_scaled_workers branch November 13, 2025 22:46
@claude

claude bot commented Nov 13, 2025

Pull Request Review

This PR implements smart workflow pulling for horizontally scaled workers in the Gasoline workflow engine. The changes introduce an intelligent load distribution system that spreads workflows evenly across workers while implementing CPU-based load shedding. Here's my detailed review:


🎯 Positive Aspects

Architecture & Design

  • Excellent load distribution algorithm: The hash-based workflow assignment (lines 1128-1155 in db/kv/mod.rs) ensures consistent and fair distribution across workers
  • Redundancy strategy: The n+1 worker redundancy (lines 1149-1154) is a smart approach to prevent workflow orphaning during worker failures
  • CPU-based load shedding: The linear load shedding ratio calculation (lines 1015-1018, 3034-3044) is a practical approach to prevent overload
  • Clean separation of concerns: New worker.rs module for worker-specific keys and subjects.rs for pubsub subjects
  • Targeted bump subjects: Moving from a single worker bump to specific subjects (BumpSubSubject enum) reduces unnecessary wake-ups

Implementation Quality

  • Proper use of system info: The sysinfo crate integration respects MINIMUM_CPU_UPDATE_INTERVAL to avoid excessive CPU checks
  • Transaction optimization: Splitting lease acquisition and workflow data reading into separate transactions (lines 1002-1258, 1273-1533) reduces contention
  • Grafana dashboard updates: Consistent renaming from worker_instance_id to worker_id across all monitoring dashboards

⚠️ Issues & Concerns

Critical Issues

  1. Integer overflow in calc_pull_ratio (db/kv/mod.rs:3034-3044)

    • Line 3041: let neg_b = ay * neg_dy / dx; can overflow with large values
    • Line 3043: x * neg_dy / dx can also overflow
    • Impact: Could cause panics in production under high CPU load
    • Fix: Use checked arithmetic or cast to u128 for intermediate calculations:
    fn calc_pull_ratio(x: u64, ax: u64, ay: u64, bx: u64, by: u64) -> u64 {
        assert!(ax < bx);
        assert!(ay > by);
        
        let neg_dy = ay - by;
        let dx = bx - ax;
        
        // Use u128 to prevent overflow
        let neg_b = (ay as u128 * neg_dy as u128 / dx as u128) as u64;
        let slope_term = (x as u128 * neg_dy as u128 / dx as u128) as u64;
        
        neg_b.saturating_sub(slope_term)
    }
  2. CPU usage calculation bug (db/kv/system.rs:21-32)

    • Line 1018: (cpu_usage.max(100.0) * 10.0) should be min, not max
    • Current code: If CPU is 50%, it becomes 100%, then scaled to 1000 (full load shedding)
    • Expected: CPU 50% should map to 500 in the 0-1000 range
    • Fix:
    let cpu_usage = { self.system.lock().await.cpu_usage() };
    let load_shed_ratio_x1000 = 
        calc_pull_ratio((cpu_usage.min(100.0) * 10.0) as u64, 500, 1000, 800, 100);
  3. Race condition in worker ping index (db/kv/mod.rs:762-798)

    • Lines 777-783: Reading then deleting the old active worker index is not atomic within a single database operation
    • If a crash occurs between these operations, stale entries could accumulate
    • Severity: Low - handled by the 30-second worker lost threshold, but could cause temporary worker count inaccuracies
    • Suggestion: Consider using a transaction-level atomic operation or document this as acceptable eventual consistency

Design Concerns

  1. Hard-coded magic numbers

    • Line 1139: 1234i32 as a hash seed lacks explanation
    • Line 1018: Parameters (500, 1000, 800, 100) for calc_pull_ratio are unclear
    • Recommendation: Add constants with descriptive names and comments explaining the load shedding curve
  2. Potential workflow starvation under high load

    • When load_shed_ratio_x1000 is high (80%+ CPU), many workflows are randomly rejected
    • The pseudorandom filtering (lines 1136-1145) could theoretically skip workflows consistently
    • Recommendation: Consider implementing a priority queue or age-based boosting for workflows that have been waiting too long
  3. Buffer size hardcoded (db/kv/mod.rs:1190, 1247)

    • buffer_unordered(1024) lacks justification
    • TODO comments indicate uncertainty about this value
    • Recommendation: Make this configurable or conduct performance testing to determine optimal value

Code Quality Issues

  1. Missing error context (db/kv/mod.rs:1085-1091)

    • Worker not finding itself in active list logs error but continues with index 0
    • This could cause incorrect workflow assignment
    • Recommendation: Return an error instead of silently continuing with a default value
  2. Inconsistent terminology

    • PR title says "horizontally scaled workers" but code uses "active workers"
    • Some places refer to "worker instance" vs "worker"
    • Recommendation: Standardize terminology throughout
  3. Duplicate logic in bump_sub implementations

    • Signal listening code (lines 719-793 in ctx/workflow.rs) is nearly identical across three locations
    • Recommendation: Extract into a helper function to reduce duplication

Testing & Observability

  1. No tests for critical algorithm

    • The hash-based assignment and load shedding algorithms lack unit tests
    • calc_pull_ratio should have tests for edge cases and overflow scenarios
    • Recommendation: Add tests in a #[cfg(test)] module
  2. Missing metrics

    • No metrics for load_shed_ratio_x1000 or number of workflows skipped due to load
    • No metric for worker distribution fairness
    • Recommendation: Add observability for load shedding effectiveness

📝 Minor Issues

  1. Line 1018: Parentheses calculation is confusing - should be cpu_usage.min(100.0) then multiply by 10
  2. Docker config warning: PR modifies generated files that should only be edited via templates (per CLAUDE.md), but .gitattributes correctly marks them as generated
  3. Import ordering: db/kv/mod.rs has inconsistent import grouping (system imports mixed with local)

🔒 Security Considerations

  • ✅ No security issues identified
  • ✅ Worker IDs are v1 UUIDs derived from the datacenter label, which is appropriate for internal infrastructure
  • ✅ No user input is used in the hashing algorithm

🎯 Performance Considerations

Positive

  • ✅ Snapshot isolation for active workers and wake conditions reduces transaction conflicts
  • ✅ Splitting lease acquisition from data loading is an excellent optimization
  • ✅ CPU usage caching with minimum interval prevents excessive syscalls

Concerns

  • ⚠️ Line 1190: buffer_unordered(1024) could consume significant memory with many pending workflows
  • ⚠️ Double database round-trip: Lease acquisition transaction followed by separate data-fetching transaction adds latency
  • ⚠️ Mutex on system info (db/kv/mod.rs:55, line 1016): Could become a bottleneck under high concurrency
    • Recommendation: Consider using an atomic or lock-free approach for CPU usage caching

📊 Test Coverage

  • ❌ No tests included in this PR
  • ❌ Critical algorithms (hashing, load shedding, redundancy) are untested
  • ❌ Edge cases not covered: What happens with 1 worker? With worker count changes mid-pull?

Recommendations Summary

Must Fix (Before Merge)

  1. Fix CPU usage calculation: Change max(100.0) to min(100.0) on line 1018
  2. Fix potential overflow in calc_pull_ratio using checked arithmetic
  3. Add unit tests for calc_pull_ratio and workflow assignment algorithm

Should Fix (High Priority)

  1. Add metrics for load shedding ratio and skipped workflows
  2. Document magic numbers and add named constants
  3. Handle worker-not-found case with error instead of silent default

Nice to Have

  1. Extract duplicate bump_sub signal listening code
  2. Make buffer_unordered size configurable
  3. Add integration tests for multi-worker scenarios
  4. Consider lock-free CPU usage caching

🎉 Conclusion

This is a well-designed feature that addresses a real scalability need. The core algorithm is sound and the redundancy strategy is clever. However, there are two critical bugs that must be fixed before merging:

  1. The CPU calculation using max instead of min
  2. Potential integer overflow in calc_pull_ratio

With these fixes and additional test coverage, this PR will be ready to merge.

Overall Assessment: ⚠️ Needs Changes - Fix critical bugs, add tests, then approve.


Review conducted following repository conventions in CLAUDE.md. Generated with Claude Code.

@claude

claude bot commented Nov 13, 2025

PR Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements an intelligent workflow distribution system for Gasoline's horizontally scaled workers. The implementation enables efficient load balancing across multiple workers while preventing duplicate work and adapting to system load.


Key Changes Overview

1. Worker-Based Architecture Refactoring

  • Changed from: Worker instance IDs
  • Changed to: Worker IDs based on datacenter labels
  • Impact: Workers are now identified by stable datacenter-based IDs rather than instance IDs
  • Files: worker.rs, db/kv/keys/worker.rs, Grafana dashboards

2. Smart Workflow Distribution Algorithm

The core innovation is in db/kv/mod.rs:pull_workflows():

Hash-Based Sharding:

  • Uses consistent hashing on earliest_wake_condition_ts to assign workflows to workers
  • Each worker pulls workflows assigned to itself AND the next worker (redundancy)
  • Prevents orphaned workflows while maintaining reasonable transaction conflict rates

Load Shedding:

  • Monitors CPU usage via new sysinfo dependency
  • Calculates a pull ratio using linear interpolation (a scaled CPU reading of 500-1000 maps to an 80%-10% pull rate)
  • Gradually reduces workflow pulling as CPU pressure increases

3. Targeted PubSub Bumping

  • New enum: BumpSubSubject with three variants:
    • Worker: Generic worker wake-up
    • WorkflowComplete { workflow_id }: Specific workflow completion
    • SignalPublish { to_workflow_id }: Specific signal targeting
  • Benefit: Reduces unnecessary wake-ups by targeting specific workflows/workers
  • Files: db/mod.rs, db/kv/subjects.rs, multiple context files
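
A sketch of the subject enum as described above; the exact field types (Id here) and derives in the actual PR may differ:

pub enum BumpSubSubject {
    /// Generic worker wake-up.
    Worker,
    /// A specific workflow finished; wake anything waiting on it.
    WorkflowComplete { workflow_id: Id },
    /// A signal was published for a specific workflow.
    SignalPublish { to_workflow_id: Id },
}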

Code Quality Review

Strengths ✅

  1. Consistent Hashing Algorithm: The workflow distribution logic is well-designed:

    let wf_hash = hasher.finish();
    let wf_worker_idx = wf_hash % active_worker_count;
    // Pull own + next worker for redundancy
    wf_worker_idx == current_worker_idx || wf_worker_idx == next_worker_idx
  2. Load Shedding Math: The calc_pull_ratio function correctly implements linear interpolation with proper assertions:

    • engine/packages/gasoline/src/db/kv/mod.rs:3034
  3. Transaction Separation: Smart separation of lease acquisition (first tx) from data reading (second tx) to reduce contention

    • engine/packages/gasoline/src/db/kv/mod.rs:998-1258 (lease tx)
    • engine/packages/gasoline/src/db/kv/mod.rs:1273-1340 (data tx)
  4. Proper Workspace Dependencies: sysinfo added to workspace dependencies correctly

    • Cargo.toml, engine/packages/gasoline/Cargo.toml

Concerns & Recommendations ⚠️

1. Potential Integer Overflow in Load Shedding 🔴

Location: engine/packages/gasoline/src/db/kv/mod.rs:1018

calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);

Issue: cpu_usage is a float that could theoretically be > 100.0. The .max(100.0) is backwards - it ensures the value is AT LEAST 100, not capped at 100.

Should be:

calc_pull_ratio((cpu_usage.min(100.0) * 10.0) as u64, 500, 1000, 800, 100);

Recommendation: Change .max(100.0) to .min(100.0) to properly clamp CPU usage.

2. Magic Numbers in Load Shedding 🟡

Location: engine/packages/gasoline/src/db/kv/mod.rs:1018

calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);

Issue: The constants (500, 1000, 800, 100) lack documentation.

Recommendation: Add constants with clear names:

const CPU_LOAD_SHED_START: u64 = 500; // 50% CPU
const CPU_LOAD_SHED_END: u64 = 1000;  // 100% CPU  
const PULL_RATIO_START: u64 = 800;     // 80% pull rate
const PULL_RATIO_END: u64 = 100;       // 10% pull rate

3. Worker Ping Timing Assumption 🟡

Location: engine/packages/gasoline/src/db/kv/mod.rs:1012

let active_workers_after = now - i64::try_from(PING_INTERVAL.as_millis() * 2)?;

Issue: Uses hardcoded * 2 multiplier. This creates tight coupling with PING_INTERVAL (10s → 20s window).

Recommendation: Extract to constant:

const ACTIVE_WORKER_PING_WINDOW_MULTIPLIER: u128 = 2;
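
which would make the window computation read (hypothetical, reusing the variables already present in the PR):

let active_workers_after =
    now - i64::try_from(PING_INTERVAL.as_millis() * ACTIVE_WORKER_PING_WINDOW_MULTIPLIER)?;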

4. Error Handling in Worker Index Calculation 🟡

Location: engine/packages/gasoline/src/db/kv/mod.rs:1077-1091

let current_worker_idx = if let Some(current_worker_idx) = active_worker_ids
    .iter()
    .enumerate()
    .find_map(|(i, other_worker_id)| {
        (&worker_id == other_worker_id).then_some(i)
    }) {
    current_worker_idx as u64
} else {
    tracing::error!(
        ?worker_id,
        "current worker should have valid ping, defaulting to worker index 0"
    );
    0
};

Issue: Falls back to index 0 after only logging an error. This could mask cases where update_worker_ping failed and leave the worker pulling workflows that hash to a slot it does not actually occupy.

Recommendation: Consider returning an error or using a more defensive approach (skip pulling workflows entirely if worker isn't in active list).
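
A hedged sketch of the defensive variant (the early-return value is an assumption; the real function's return type may differ):

let Some(current_worker_idx) = active_worker_ids
    .iter()
    .position(|other_worker_id| other_worker_id == &worker_id)
else {
    tracing::warn!(
        ?worker_id,
        "current worker missing from active worker list, skipping pull this cycle"
    );
    // Assumed return value; adjust to the function's actual return type.
    return Ok(Vec::new());
};
let current_worker_idx = current_worker_idx as u64;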

5. Potential Race Condition 🟡

Location: Between engine/packages/gasoline/src/db/kv/mod.rs:1030 and worker ping updates

Issue: Active workers list is read with Snapshot isolation. A worker could become inactive between reading the list and acquiring leases, leading to stale worker assignments.

Mitigation: Currently mitigated by the redundancy (each workflow assigned to 2 workers), but worth documenting.

6. TODO Comments 🟢

Locations:

  • engine/packages/gasoline/src/db/kv/mod.rs:1189 - "How to get rid of this buffer?"
  • engine/packages/gasoline/src/db/kv/mod.rs:1246 - Same question

Recommendation: The buffer_unordered(1024) is necessary for concurrency. Consider adding a comment explaining why buffering is needed or create a constant for the buffer size.
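
For example (hypothetical constant name):

// Upper bound on concurrent per-workflow reads in the data transaction.
const PULL_WORKFLOWS_READ_BUFFER: usize = 1024;

and then .buffer_unordered(PULL_WORKFLOWS_READ_BUFFER) at both call sites.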

7. Metric Attribute Rename 🟢

Grafana dashboards updated from worker_instance_id to worker_id correctly across all environments. Good consistency.


Performance Considerations

Positive Impacts ✅

  1. Reduced Wake Storm: Targeted bumping reduces unnecessary worker wake-ups
  2. Even Distribution: Hash-based sharding ensures workflows spread evenly
  3. Adaptive Load Management: CPU-based load shedding prevents overload

Potential Concerns ⚠️

  1. Increased Transaction Conflicts: Redundant pulling (2 workers per workflow) will increase conflicts on lease acquisition. Monitor pull_workflows_tx conflict rates.
  2. CPU Monitoring Overhead: sysinfo::System::refresh_cpu_usage() is not free, but it is called at most once per pull cycle and throttled by MINIMUM_CPU_UPDATE_INTERVAL, which seems reasonable (a throttling sketch follows this list).
  3. Snapshot Isolation Trade-offs: Using Snapshot for active workers and wake conditions reduces contention but may read slightly stale data.
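
A hypothetical sketch of the throttled sampling described in point 2 (sysinfo accessor names vary between crate versions, and the PR's SystemInfo wrapper likely differs):

use std::time::Instant;

use sysinfo::{System, MINIMUM_CPU_UPDATE_INTERVAL};

struct SystemInfo {
    system: System,
    last_refresh: Instant,
}

impl SystemInfo {
    fn new() -> Self {
        let mut system = System::new();
        // Prime the sampler; the first reading after a single refresh is 0%.
        system.refresh_cpu_usage();
        Self { system, last_refresh: Instant::now() }
    }

    fn cpu_usage(&mut self) -> f32 {
        // Only refresh once sysinfo's minimum interval has elapsed; otherwise
        // reuse the previous sample so per-pull overhead stays negligible.
        if self.last_refresh.elapsed() >= MINIMUM_CPU_UPDATE_INTERVAL {
            self.system.refresh_cpu_usage();
            self.last_refresh = Instant::now();
        }
        self.system.global_cpu_usage()
    }
}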

Security Considerations

No security concerns identified. The changes are internal to workflow orchestration.


Test Coverage

Missing: No new tests added for:

  • Load shedding algorithm (calc_pull_ratio)
  • Worker hash distribution
  • Redundant pulling logic
  • CPU monitoring integration

Recommendation: Add unit tests for calc_pull_ratio to verify edge cases (the expected values below assume the (input, input start, input end, output start, output end) parameter order used earlier in this review):

#[test]
fn test_calc_pull_ratio() {
    // Test at start point
    assert_eq!(calc_pull_ratio(500, 500, 1000, 800, 100), 800);
    // Test at end point
    assert_eq!(calc_pull_ratio(1000, 500, 1000, 800, 100), 100);
    // Test midpoint
    assert_eq!(calc_pull_ratio(750, 500, 1000, 800, 100), 450);
    // Input below the range: no underflow, result clamped to the 1000 maximum
    assert_eq!(calc_pull_ratio(0, 500, 1000, 800, 100), 1000);
}

Breaking Changes

Metric Names Changed:

  • Grafana queries updated from worker_instance_id to worker_id
  • Impact: Any external dashboards or alerts using these metrics will need updating
  • Mitigation: .gitattributes updated to mark generated files, good practice

Recommendations Summary

Must Fix 🔴

  1. Change cpu_usage.max(100.0) to cpu_usage.min(100.0) in load shedding calculation

Should Fix 🟡

  1. Add named constants for load shedding magic numbers
  2. Add unit tests for calc_pull_ratio
  3. Document buffer size choice or extract to constant
  4. Consider metric to track actual vs expected worker distribution

Nice to Have 🟢

  1. Add integration test demonstrating multi-worker pulling
  2. Document race condition mitigation strategy in code comments
  3. Add metric for load shed ratio to monitor effectiveness

Overall Assessment

Verdict: This is a well-architected solution to workflow distribution in a horizontally scaled environment. The consistent hashing approach is sound, and the load shedding mechanism is a smart addition. The main concern is the minor bug in CPU clamping that should be fixed before merge.

Code Quality: 8/10
Architecture: 9/10
Test Coverage: 5/10

The PR demonstrates strong understanding of distributed systems patterns and makes good use of Rust's type system for safety.
