feat(gas): implement smart pulling for horizontally scaled workers #3417
PR Review: Smart Pulling for Horizontally Scaled Workers

Overview

This PR implements intelligent workflow distribution across horizontally scaled workers in the Gasoline workflow engine. The key innovation is a hash-based consistent assignment algorithm with load shedding based on CPU usage.

Key Changes

1. Worker Identification Refactor

2. Smart Workflow Distribution Algorithm

a) Active Worker Discovery

b) Workflow Assignment via Consistent Hashing

```rust
let wf_hash = hash(wf.earliest_wake_condition_ts);
let wf_worker_idx = wf_hash % active_worker_count;

// Assign to current worker OR next worker (for redundancy)
wf_worker_idx == current_worker_idx || wf_worker_idx == next_worker_idx
```

c) CPU-Based Load Shedding

```rust
let cpu_usage = system.cpu_usage();
let load_shed_ratio_x1000 = calc_pull_ratio(
    (cpu_usage.max(100.0) * 10.0) as u64,
    500, 1000, // At 50% CPU: pull all (1000/1000)
    800, 100   // At 80% CPU: pull 10% (100/1000)
);
```

3. New System Monitoring

4. Smarter Bump Subscriptions

Code Quality Assessment

Strengths ✅

Concerns and Issues
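The assignment rule quoted above can be sketched as a self-contained program. This is an illustrative reconstruction, not the PR's actual API: the function names, the timestamp value, and the use of `DefaultHasher` directly are assumptions.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Map a workflow's earliest wake timestamp onto the ring of active workers.
fn assigned_worker(earliest_wake_condition_ts: i64, active_worker_count: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    earliest_wake_condition_ts.hash(&mut hasher);
    hasher.finish() % active_worker_count
}

// A worker pulls a workflow if it is the assigned worker or the next one on
// the ring (the redundancy described above).
fn should_pull(wf_ts: i64, current_worker_idx: u64, active_worker_count: u64) -> bool {
    let wf_worker_idx = assigned_worker(wf_ts, active_worker_count);
    let next_worker_idx = (current_worker_idx + 1) % active_worker_count;
    wf_worker_idx == current_worker_idx || wf_worker_idx == next_worker_idx
}

fn main() {
    // With 4 active workers, every workflow is pulled by exactly 2 of them:
    // the assigned worker and its successor on the ring.
    let pullers = (0..4u64).filter(|&i| should_pull(42, i, 4)).count();
    println!("pullers = {pullers}");
}
```

Because the condition matches exactly the assigned index and its predecessor's "next", each workflow always has exactly two pullers regardless of which index the hash lands on.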
PR Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements smart workflow pulling for horizontally scaled workers in the Gasoline workflow engine. The changes introduce CPU-based load shedding, worker coordination via active worker tracking, and more targeted pub/sub notifications.

Architecture & Design

Strengths:

Concerns:

Code Quality & Best Practices

Positive:

Issues:

Potential Bugs & Edge Cases

Performance Considerations

Improvements:

Concerns:

Security Concerns

No major security issues identified. The changes are primarily internal orchestration logic.

Test Coverage

The PR doesn't include test changes. Given the complexity of the worker coordination logic, consider adding:

Recommendations

Must fix:

Should fix:

Nice to have:

Conclusion

This is a solid implementation that addresses horizontal scaling concerns with thoughtful coordination mechanisms. The main issues are:

The architecture is sound and follows project conventions well. The targeted pub/sub approach is particularly nice for reducing unnecessary worker wake-ups.
PR Review: Smart Pulling for Horizontally Scaled Workers

This is a well-architected feature that implements intelligent workflow distribution across horizontally scaled workers. The implementation shows good attention to performance and scalability.

Strengths

✅ Excellent architecture: The hash-based workflow distribution algorithm ensures even load balancing across workers while maintaining consistency

Code Quality Issues

1. Potential Logic Error in Load Shedding (line 1016 in db/kv/mod.rs)

```rust
let load_shed_ratio_x1000 = calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

Issue: `max(100.0)` raises the reading to at least 100% instead of capping it, so load shedding always behaves as if CPU were saturated.

Fix:

```rust
let load_shed_ratio_x1000 = calc_pull_ratio((cpu_usage.min(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

2. Unnecessary Mutex Lock Duration (line 1014 in db/kv/mod.rs)

```rust
let cpu_usage = { self.system.lock().await.cpu_usage() };
```

The block scoping is good, but consider whether the mutex needs to be held at all for a quick read.

3. Import Organization (line 3 in db/kv/mod.rs)

```rust
use std::{
    collections::{HashMap, HashSet},
    hash::{DefaultHasher, Hash, Hasher},
    sync::Arc,
    time::Instant,
};
```

Per CLAUDE.md guidelines: "Always add imports at the top of the file instead of inline within the function." These imports are correctly at the top, which is good.

4. Missing Unit Tests

The new `calc_pull_ratio` function has no unit tests. For example:

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_calc_pull_ratio() {
        // Test at minimum CPU (50%)
        assert_eq!(calc_pull_ratio(500, 500, 1000, 800, 100), 1000);
        // Test at maximum CPU (80%)
        assert_eq!(calc_pull_ratio(800, 500, 1000, 800, 100), 100);
        // Test at midpoint CPU (65%)
        assert_eq!(calc_pull_ratio(650, 500, 1000, 800, 100), 550);
    }
}
```

Performance Considerations

Security Concerns

No security issues identified. The implementation properly handles:

Test Coverage

Gap: No integration tests specifically for the smart pulling algorithm. Consider adding tests that:

Minor Issues

Documentation

📝 The PR description is empty. Consider adding:

Recommendation

Approve with minor fixes. The critical issue is the `max`/`min` mix-up in the load-shedding CPU calculation.

Great work on this complex distributed systems feature! 🎉
PR Review: Smart Pulling for Horizontally Scaled Workers

Overview

This PR implements smart workflow pulling for horizontally scaled Gasoline workers, enabling efficient distribution of workflow execution across multiple worker instances. The implementation includes CPU-based load shedding and consistent hashing for workflow distribution.

Positive Aspects

Architecture & Design

Code Quality

Issues & Concerns

1. Critical: Load Shedding Logic Bug

```rust
// line 1015-1016 in db/kv/mod.rs
let cpu_usage = { self.system.lock().await.cpu_usage() };
let load_shed_ratio_x1000 = calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

Problem: `cpu_usage.max(100.0)` raises the reading up to at least 100 instead of capping it at 100.

Impact: Load shedding will not work as intended for normal CPU loads (0-99%), potentially causing performance issues.

Fix: Change to `cpu_usage.min(100.0)`.

2. Missing
Pull Request Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements an intelligent workflow distribution system for horizontally scaled workers in the Gasoline workflow engine. The changes introduce CPU-based load shedding and hash-based workflow assignment to evenly distribute work across active workers.

Code Quality & Architecture ⭐

Strengths:

Concerns:

Potential Bugs & Edge Cases 🐛

Critical:

Medium: `cpu_usage.max(100.0) * 10.0` floors the reading at 100% instead of capping it; `min(100.0)` matches the stated intent.

Minor:

Performance Considerations ⚡

Improvements:

Concerns:

Security Considerations 🔒

Good practices:

No major security concerns identified.

Test Coverage 🧪

Missing:

Recommendation: Add integration tests for the multi-worker distribution logic.

Additional Notes

Naming Consistency

The Grafana dashboards were correctly updated for the renamed metrics.

Dependency Addition

Breaking Changes

The refactoring from per-instance tracking to worker-based tracking may affect external consumers of the old identifiers.

Recommendations

Conclusion

This is a well-architected feature that significantly improves the scalability of the Gasoline workflow engine. The core logic is sound, but there are a few bugs and edge cases that need attention before merging. Once the critical issues are addressed and tests are added, this will be a solid improvement to the system.

Overall Assessment: Approve with changes requested for the critical bugs.
Code Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements intelligent workflow distribution across horizontally scaled gasoline workers using consistent hashing and CPU-based load shedding. The implementation introduces a worker ping index and sophisticated pull logic to distribute workflows evenly while handling worker failures gracefully.

Strengths

Architecture & Design

Data Model

Issues & Concerns

Critical Issues

1. CPU Usage Calculation Precision Issue

Location: engine/packages/gasoline/src/db/kv/mod.rs:1016-1018

```rust
let cpu_usage = { self.system.lock().await.cpu_usage() };
let load_shed_ratio_x1000 =
    calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

Problem: `max(100.0)` raises every reading to at least 100% instead of capping it at 100%.

Impact: Load shedding won't activate until CPU is exactly at or above 100%, defeating its purpose.

Fix:

```rust
let load_shed_ratio_x1000 =
    calc_pull_ratio((cpu_usage.min(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

2. Race Condition in Worker Ping Index

Location: engine/packages/gasoline/src/db/kv/mod.rs:777-789

```rust
if let Some(last_last_ping_ts) =
    tx.read_opt(&last_ping_ts_key, Serializable).await?
{
    let active_worker_idx_key =
        keys::worker::ActiveWorkerIdxKey::new(last_last_ping_ts, worker_id);
    tx.delete(&active_worker_idx_key);
}

tx.write(&last_ping_ts_key, last_ping_ts)?;

let active_worker_idx_key =
    keys::worker::ActiveWorkerIdxKey::new(last_ping_ts, worker_id);
tx.write(&active_worker_idx_key, ())?;
```

Problem: If a worker restarts with the same ID between transactions, you could have multiple `ActiveWorkerIdxKey` entries for the same worker.

Potential Impact: Duplicate worker entries in the active index could skew load distribution calculations.

Mitigation: Consider using a range delete or adding a secondary index to clean up all entries for a worker_id.

3. Integer Overflow Risk in calc_pull_ratio

Location: engine/packages/gasoline/src/db/kv/mod.rs:3034-3044

```rust
fn calc_pull_ratio(x: u64, ax: u64, ay: u64, bx: u64, by: u64) -> u64 {
    let neg_dy = ay - by;
    let dx = bx - ax;
    let neg_b = ay * neg_dy / dx; // Potential overflow
    return neg_b.saturating_sub(x * neg_dy / dx); // Also potential overflow
}
```

Problem: Multiplication before division can overflow with large u64 values. While the current call site uses small values (500, 1000, 800, 100), this function lacks safety guarantees.

Fix: Add overflow checks or use checked arithmetic:

```rust
fn calc_pull_ratio(x: u64, ax: u64, ay: u64, bx: u64, by: u64) -> u64 {
    let neg_dy = ay - by;
    let dx = bx - ax;
    // Use checked math or restructure to avoid overflow
    let neg_b = ay.checked_mul(neg_dy)
        .and_then(|v| v.checked_div(dx))
        .unwrap_or(0);
    let slope_term = x.checked_mul(neg_dy)
        .and_then(|v| v.checked_div(dx))
        .unwrap_or(u64::MAX);
    neg_b.saturating_sub(slope_term)
}
```

High Priority Issues

4. MINIMUM_CPU_UPDATE_INTERVAL Not Documented

Location: engine/packages/gasoline/src/db/kv/system.rs:3, 22

The code references `MINIMUM_CPU_UPDATE_INTERVAL` without explaining why the interval exists.

Suggestion: Add a comment explaining that sysinfo requires waiting between CPU usage reads for accurate measurements.

5. Missing Error Handling for CPU Usage

Location: engine/packages/gasoline/src/db/kv/system.rs:21-32

The `cpu_usage` method does not guard against an empty CPU list.

Fix:

```rust
pub fn cpu_usage(&mut self) -> f32 {
    if self.last_cpu_usage_read.elapsed() > MINIMUM_CPU_UPDATE_INTERVAL {
        self.system.refresh_cpu_usage();
        self.last_cpu_usage_read = Instant::now();
    }

    let cpu_count = self.system.cpus().len();
    if cpu_count == 0 {
        tracing::warn!("no CPUs detected, returning 0% usage");
        return 0.0;
    }

    self.system
        .cpus()
        .iter()
        .fold(0.0, |s, cpu| s + cpu.cpu_usage())
        / cpu_count as f32
}
```

6. Snapshot Isolation May Miss Active Workers

Location: engine/packages/gasoline/src/db/kv/mod.rs:1032-1044

The active worker query uses Snapshot isolation, so very recent pings may be missed.

Trade-off: This is mentioned as intentional for reducing contention, but the comment should explicitly call out this staleness and its implications.

Suggestion: Add a comment:

```rust
// This is Snapshot to reduce contention. Note: worker list may be slightly
// stale (missing very recent pings), but this is acceptable since we use
// redundant assignment (current + next worker) and leases prevent conflicts.
Snapshot,
```

Medium Priority Issues

7. Magic Numbers in Load Shedding

Location: engine/packages/gasoline/src/db/kv/mod.rs:1018

```rust
calc_pull_ratio((cpu_usage.min(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

These parameters (500, 1000, 800, 100) lack documentation. What do they represent?

Suggestion: Add constants with clear names:

```rust
const LOAD_SHED_START_CPU_X10: u64 = 500;  // Start load shedding at 50% CPU
const LOAD_SHED_END_CPU_X10: u64 = 1000;   // Max load shedding at 100% CPU
const LOAD_SHED_START_RATIO: u64 = 800;    // Pull 80% of workflows at start
const LOAD_SHED_END_RATIO: u64 = 100;      // Pull 10% of workflows at 100% CPU

let load_shed_ratio_x1000 = calc_pull_ratio(
    (cpu_usage.min(100.0) * 10.0) as u64,
    LOAD_SHED_START_CPU_X10,
    LOAD_SHED_END_CPU_X10,
    LOAD_SHED_START_RATIO,
    LOAD_SHED_END_RATIO,
);
```

8. Metrics Renamed But Not Documented

Location: All Grafana dashboard files

The metrics were renamed.

Impact: Existing alerts or dashboards outside of the repo may break.

Suggestion: Add a note in the PR description or CHANGELOG about this metric label rename.

9. Pseudorandom Hash Enhancement Has Magic Number

Location: engine/packages/gasoline/src/db/kv/mod.rs:1138-1141

```rust
// Add a little pizazz to the hash so its a different number than wf_hash but
// still consistent
1234i32.hash(&mut hasher);
hasher.finish() % 1000 // 0-1000
```

The comment "pizazz" is too informal for production code. Also, why specifically 1234?

Suggestion:

```rust
// Hash an additional constant to derive a distinct but consistent
// pseudorandom value from the same workflow hash seed
const LOAD_SHED_HASH_SALT: i32 = 1234;
LOAD_SHED_HASH_SALT.hash(&mut hasher);
hasher.finish() % 1000 // Range: 0-999 (representing 0.0%-99.9%)
```

10. Glob Import Violation

Location: engine/packages/gasoline/src/db/kv/debug.rs:18

According to CLAUDE.md, glob imports from anyhow should be avoided, but I see the pattern is used elsewhere. If this is intentional, consider documenting the exception.

Low Priority / Style Issues

11. .gitattributes Changes

Location: .gitattributes:18-23

The docker-related linguist-generated markers are good, but per CLAUDE.md, changes to docker configs should be made in

12. Comment Typo

Location: engine/packages/gasoline/src/db/kv/mod.rs:1032

"Check" is an incomplete comment. It should probably be removed or completed.

13. TODO Comments Remain

Location: engine/packages/gasoline/src/db/kv/mod.rs:1189, 1246

```rust
// TODO: How to get rid of this buffer?
```

These TODOs suggest potential performance optimization opportunities. Consider creating follow-up issues if the buffer_unordered(1024) is a known concern.

Performance Considerations

Positive

Concerns

Security

No security concerns identified. The changes are internal to workflow orchestration.

Testing

Missing Coverage

Recommendations

Add integration tests covering:

Summary Recommendations

Must Fix Before Merge

Should Fix Before Merge

Nice to Have

Overall, this is a well-architected solution to a complex distributed systems problem. The consistent hashing approach is sound, and the redundancy pattern provides good fault tolerance. The main concerns are around edge cases and the critical CPU usage calculation bug. With the suggested fixes, this will be a solid feature.
PR Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements intelligent workflow distribution across horizontally scaled workers in the Gasoline workflow engine. The key innovation is a hash-based consistent assignment strategy combined with CPU-based load shedding.

Code Quality - Strengths

Critical Issue: CPU Usage Bug

Location: engine/packages/gasoline/src/db/kv/mod.rs:1018

The code uses `cpu_usage.max(100.0)`, which raises every reading to at least 100% instead of capping it.

Fix needed:

```rust
let load_shed_ratio_x1000 =
    calc_pull_ratio((cpu_usage.min(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

Issue: Hash Consistency

Location: engine/packages/gasoline/src/db/kv/mod.rs:1129-1141

After calling `finish()`, the same hasher is reused to derive the pseudorandom value.

Suggested fix: Create a new hasher for the pseudorandom value generation.

Issue: Missing Error Context

Location: engine/packages/gasoline/src/db/kv/mod.rs:1087-1091

When the current worker is not found in active workers, add more diagnostic info to the error log (active_worker_ids, active_count).

Performance Concerns

Missing Test Coverage

No tests for:

Observations

Recommendations Summary

Must Fix:

Should Fix:

Consider:

Conclusion

Well-architected PR with high code quality. The CPU usage bug is critical and should be fixed before merging. Other issues are minor improvements.

Overall: Approve with changes requested (primarily the CPU usage bug fix)
Code Review: Smart Pulling for Horizontally Scaled Workers

This PR implements smart workflow distribution across horizontally scaled workers in Gasoline. Overall, this is a well-architected improvement that adds intelligent load balancing. Here are my findings:

Strengths

Issues & Concerns

1. Missing error handling for CPU measurement (engine/packages/gasoline/src/db/kv/mod.rs:1016-1018)

```rust
let cpu_usage = { self.system.lock().await.cpu_usage() };
let load_shed_ratio_x1000 =
    calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

Issue: If the CPU measurement fails or returns a nonsensical value, there is no fallback.

Suggestion: Add error handling or use a default value on failure:

```rust
let cpu_usage = self.system.lock().await.cpu_usage().unwrap_or(50.0);
```

2. Potential race condition in worker index lookup (engine/packages/gasoline/src/db/kv/mod.rs:1077-1091)

```rust
let current_worker_idx = if let Some(current_worker_idx) = active_worker_ids
    .iter()
    .enumerate()
    .find_map(|(i, other_worker_id)| {
        (&worker_id == other_worker_id).then_some(i)
    }) {
    current_worker_idx as u64
} else {
    tracing::error!(
        ?worker_id,
        "current worker should have valid ping, defaulting to worker index 0"
    );

    0
};
```

Issue: The fallback to index 0 could cause multiple workers to pull the same workflows if they all miss their pings. This could lead to increased transaction conflicts.

Suggestion: Consider using a hash of the worker ID as the fallback instead of a fixed index.

3. Magic numbers in load shedding (engine/packages/gasoline/src/db/kv/mod.rs:1018)

```rust
let load_shed_ratio_x1000 =
    calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

Issue: The parameters (500, 1000, 800, 100) are not explained. From context, this appears to map CPU usage (50-100%) to pull ratio (80-10%), but it's unclear.

Suggestion: Add constants with descriptive names:

```rust
const CPU_LOAD_SHED_START: u64 = 500; // 50% CPU
const CPU_LOAD_SHED_END: u64 = 1000;  // 100% CPU
const PULL_RATIO_MAX: u64 = 800;      // 80% of workflows
const PULL_RATIO_MIN: u64 = 100;      // 10% of workflows
```

4. Redundant pulling may increase contention (engine/packages/gasoline/src/db/kv/mod.rs:1149-1154)

```rust
// Every worker pulls workflows that hash to the current worker as well as the next
// worker for redundancy. this results in increased txn conflicts but less chance of
// orphaned workflows
let next_worker_idx = (current_worker_idx + 1) % active_worker_count;

wf_worker_idx == current_worker_idx || wf_worker_idx == next_worker_idx
```

Issue: While the comment acknowledges increased conflicts, this doubles the conflict rate. With many workers, this could significantly impact performance.

Suggestion: Consider making this configurable or measuring the actual benefit vs. cost.

5. Hardcoded worker ID generation (engine/packages/gasoline/src/worker.rs:53)

```rust
worker_id: Id::new_v1(config.dc_label()),
```

Concern: After a worker restart, it gets a new ID, so the consistent hash assignment changes. This could cause workflow reassignment churn.

Performance Considerations

Security Concerns

No security issues identified. The changes maintain the existing security model.

Testing

Missing: No new tests were added for:

Recommendation: Add unit tests for the distribution algorithm and integration tests for multi-worker scenarios.

Minor Issues

Recommendations

Verdict

This is a solid improvement to Gasoline's horizontal scaling capabilities. The core algorithm is sound, but it would benefit from additional testing, better error handling, and improved documentation. The performance trade-offs (redundant pulling for reliability) should be monitored in production.

Recommendation: Approve with suggested improvements, particularly around error handling and testing.
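The magic-number concern above can be made concrete. Below is a hedged sketch of the linear interpolation those four parameters appear to drive, with the endpoints named as suggested; the constant names, clamping at the endpoints, and function name are assumptions, not the PR's code:

```rust
const CPU_LOAD_SHED_START_X10: u64 = 500; // 50.0% CPU: pull everything
const CPU_LOAD_SHED_END_X10: u64 = 800;   // 80.0% CPU: pull only 10%
const PULL_RATIO_MAX_X1000: u64 = 1000;   // 100% of workflows
const PULL_RATIO_MIN_X1000: u64 = 100;    // 10% of workflows

fn pull_ratio_x1000(cpu_x10: u64) -> u64 {
    if cpu_x10 <= CPU_LOAD_SHED_START_X10 {
        return PULL_RATIO_MAX_X1000;
    }
    if cpu_x10 >= CPU_LOAD_SHED_END_X10 {
        return PULL_RATIO_MIN_X1000;
    }
    // Linear interpolation between the endpoints. All operands are <= 1000,
    // so plain u64 arithmetic cannot overflow here.
    let span = CPU_LOAD_SHED_END_X10 - CPU_LOAD_SHED_START_X10;
    let drop = (cpu_x10 - CPU_LOAD_SHED_START_X10)
        * (PULL_RATIO_MAX_X1000 - PULL_RATIO_MIN_X1000)
        / span;
    PULL_RATIO_MAX_X1000 - drop
}

fn main() {
    // Midpoint (65% CPU) lands halfway between the two ratios.
    println!("{}", pull_ratio_x1000(650)); // 550
}
```

This matches the unit-test expectations proposed earlier in the thread (1000 at 50% CPU, 550 at 65%, 100 at 80%).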
Code Review: Smart Pulling for Horizontally Scaled Workers

This is a well-designed feature that implements intelligent workflow distribution across horizontally scaled workers. Here's my comprehensive review:

Architecture & Design

Strengths:

Observations:

Code Quality

Positive:

Areas for Improvement:

Performance Considerations

Positive:

Potential Issues:

Security & Correctness

Positive:

Observations:

Testing

Concern:

Recommendation: Add tests for:

Documentation

Needed:

Minor Issues

Recommendations

Summary

This is a solid implementation of a non-trivial distributed systems feature. The core algorithm is sound, but there are a few potential bugs and the lack of tests for the new logic is concerning. The performance optimizations show good understanding of database transaction patterns.

Recommendation: Request changes to address the CPU capping issue and add tests before merging.

Questions for Author
Code Review - PR #3417: Smart Pulling for Horizontally Scaled Workers

This is an excellent enhancement to the workflow engine's scalability. The implementation introduces intelligent workflow distribution across multiple workers with load shedding capabilities. Here's my detailed review:

Strengths

Issues & Concerns

1. Potential Division by Zero (High Priority)

```rust
let wf_worker_idx = wf_hash % active_worker_count;
```

If `active_worker_count` is zero, this modulo panics.

Recommendation: Add a check before the filter:

```rust
if active_worker_count == 0 {
    tracing::warn!("no active workers found, skipping workflow assignment");
    return Ok(Vec::new());
}
```

2. Integer Overflow Risk (Medium Priority)

```rust
let neg_b = ay * neg_dy / dx;
```

This multiplication could overflow for large values.

Recommendation:

```rust
let neg_b = ay.saturating_mul(neg_dy) / dx;
```

3. Magic Number in Load Shedding (Low Priority)

```rust
let load_shed_ratio_x1000 = calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

The parameters would be clearer as named constants:

```rust
const LOAD_SHED_CPU_START: u64 = 500;    // 50% CPU
const LOAD_SHED_RATIO_START: u64 = 1000; // Pull 100% at 50% CPU
const LOAD_SHED_CPU_END: u64 = 800;      // 80% CPU
const LOAD_SHED_RATIO_END: u64 = 100;    // Pull 10% at 80% CPU
```

4. Unclear CPU Usage Capping (Low Priority)

```rust
let cpu_usage = { self.system.lock().await.cpu_usage() };
let load_shed_ratio_x1000 = calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, ...);
```

Using `max(100.0)` raises the value to at least 100 rather than capping it; `min(100.0)` looks like the intent.

5. Unbounded Buffer Size (Low Priority)

```rust
.buffer_unordered(1024)
```

The comment "// TODO: How to get rid of this buffer?" suggests this is known. A buffer of 1024 concurrent operations could cause memory issues if workflows are large. Consider making this configurable or using a smaller value.

Performance Considerations

Security Concerns

No security issues identified. The changes don't introduce new external inputs or privilege boundaries.

Test Coverage

The PR doesn't include new tests. Consider adding:

Code Quality

```rust
// Add a little pizazz to the hash
```

"pizazz" → consider more technical language for production code, e.g., "Add entropy to the hash for distribution"

Summary

This is a solid implementation of a complex distributed systems feature. The main concerns are the potential division by zero bug (which could cause production outages) and some arithmetic overflow risks. Once those are addressed, this should significantly improve the workflow engine's horizontal scalability.

Recommended Actions:

Great work on this feature! The architecture is sound and the implementation is well-structured.
PR Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements intelligent workflow distribution across horizontally scaled workers in the Gasoline workflow engine. The main innovation is a hash-based work distribution algorithm with CPU-based load shedding and redundancy.

Code Quality & Best Practices

Strengths

Issues & Concerns

1. CPU Usage Calculation Issue (engine/packages/gasoline/src/db/kv/mod.rs:1018)

```rust
let cpu_usage = { self.system.lock().await.cpu_usage() };
let load_shed_ratio_x1000 =
    calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

Problem: `max(100.0)` raises every reading to at least 100% instead of capping it.

Impact: Critical - load shedding will never activate properly, defeating one of the main features of this PR.

Fix:

```rust
let load_shed_ratio_x1000 =
    calc_pull_ratio((cpu_usage.min(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

2. Potential Race Condition (engine/packages/gasoline/src/db/kv/mod.rs:1012)

```rust
let active_workers_after = now - i64::try_from(PING_INTERVAL.as_millis() * 2)?;
```

Concern: Using a fixed multiple of the ping interval as the liveness cutoff is fragile when pings are delayed.

Recommendation: Use a more generous threshold or make it configurable.

3. Missing Error Context (engine/packages/gasoline/src/db/kv/mod.rs:1085-1090)

```rust
} else {
    tracing::error!(
        ?worker_id,
        "current worker should have valid ping, defaulting to worker index 0"
    );

    0
};
```

Issue: When a worker can't find itself in the active list, it defaults to index 0 silently. This could lead to multiple workers competing for the same workflows.

Recommendation: Return an error or retry the ping before continuing, rather than silently degrading behavior.

4. Lock Held Across Async Boundary (engine/packages/gasoline/src/db/kv/mod.rs:1016)

```rust
let cpu_usage = { self.system.lock().await.cpu_usage() };
```

Good Practice: The lock is properly scoped and dropped immediately.

5. Magic Numbers (engine/packages/gasoline/src/db/kv/mod.rs:1018, 3034)

```rust
calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

The parameters would read better as named constants:

```rust
const CPU_THRESHOLD_LOW: u64 = 500;       // 50% CPU
const CPU_THRESHOLD_HIGH: u64 = 1000;     // 100% CPU
const PULL_RATIO_AT_LOW_CPU: u64 = 800;   // 80% of workflows
const PULL_RATIO_AT_HIGH_CPU: u64 = 100;  // 10% of workflows
```

6. Unnecessary Comment (engine/packages/gasoline/src/db/kv/mod.rs:3035)

```rust
fn calc_pull_ratio(x: u64, ax: u64, ay: u64, bx: u64, by: u64) -> u64 {
    // must have neg slope, inversely proportional
```

Suggestion: The function name and parameters are not intuitive. Consider renaming or adding comprehensive documentation:

```rust
/// Calculates a linear interpolation for load shedding ratio.
/// Maps CPU usage (x) from range [ax, bx] to pull ratio [ay, by].
/// Higher CPU usage results in lower pull ratio (inverse relationship).
fn calc_pull_ratio(cpu_usage: u64, min_cpu: u64, max_cpu: u64, max_ratio: u64, min_ratio: u64) -> u64
```

Performance Considerations

Positive

Concerns

Security Concerns

No significant security issues identified. The changes are primarily internal optimization and don't expose new attack surfaces.

Test Coverage

Major Gap: No tests found for the new smart pulling algorithm. Critical functionality like:

Recommendation: Add unit tests for `calc_pull_ratio` and the distribution logic.

Architecture & Design

Positive

Concerns

Documentation

The PR lacks a description explaining:

Generated template dashboards: Good catch on regenerating Grafana dashboards from template.

Recommendations

Must Fix

Should Fix

Nice to Have

Verdict

The PR implements a well-designed algorithm for work distribution, but has a critical bug in CPU load shedding that must be fixed before merging. The lack of tests for such critical functionality is also concerning. Overall architecture is sound, but needs polish and validation.

🤖 Generated with Claude Code
PR Review: Smart Pulling for Horizontally Scaled Workers

Thank you for this comprehensive implementation! This PR introduces intelligent workflow distribution across horizontally scaled workers using consistent hashing and load shedding. Here's my detailed review:

Code Quality & Architecture

Strengths:

Areas for improvement:

1. Potential Race Condition in Worker Discovery

```rust
// Lines 1030-1044 in db/kv/mod.rs
let (mut active_worker_ids, wake_keys) = tokio::try_join!(
    // Get active workers
    tx.get_ranges_keyvalues(..., Snapshot, ...),
    // Get wake conditions
    ...
)
```

Issue: Using Snapshot isolation means the worker list can be slightly stale relative to the wake conditions.

Suggestion: Consider whether stronger reads are needed here, or document the staleness trade-off.

2. Magic Numbers Need Documentation

```rust
// Line 1018
let load_shed_ratio_x1000 = calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

Issue: The parameters (500, 1000, 800, 100) are unexplained.

Suggestion: Add inline documentation or constants:

```rust
// CPU usage thresholds for load shedding (in tenths of a percent)
const CPU_THRESHOLD_MIN: u64 = 500;  // 50% CPU
const CPU_THRESHOLD_MAX: u64 = 1000; // 100% CPU
const PULL_RATIO_MAX: u64 = 800;     // Pull 80% at low CPU
const PULL_RATIO_MIN: u64 = 100;     // Pull 10% at high CPU
```

3. Hash Collision Handling

```rust
// Lines 1136-1141
let pseudorandom_value_x1000 = {
    1234i32.hash(&mut hasher);
    hasher.finish() % 1000
};
```

Issue: The comment says "Add a little pizazz" but this is actually implementing load shedding. The magic constant deserves a name.

Suggestion: Make this more explicit:

```rust
// Generate a pseudorandom value for load shedding (0-999)
let pseudorandom_value_x1000 = {
    const LOAD_SHED_SALT: i32 = 1234;
    LOAD_SHED_SALT.hash(&mut hasher);
    hasher.finish() % 1000
};
```

4. Redundancy Trade-offs Need Tuning Consideration

```rust
// Lines 1149-1154
// Every worker pulls workflows that hash to the current worker as well as the next
// worker for redundancy.
```

Concern: This doubles transaction conflicts. With many workers, this could become a bottleneck.

Suggestion:
5. Missing
PR Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements smart pulling for horizontally scaled workers in the Gasoline workflow engine. The main improvement is an intelligent work distribution system that spreads workflow execution across multiple workers while implementing load shedding based on CPU usage. Overall, this is a well-designed feature with good architecture, but there are several areas that need attention.

Architecture & Implementation

✅ Strengths

🔧 Areas for Improvement

Issues & Concerns

🔴 Critical Issues
Pull Request Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements intelligent workflow distribution across horizontally scaled workers using consistent hashing and CPU-based load shedding. The changes refactor the worker architecture from per-instance tracking to worker-based tracking with smart assignment logic.

Code Quality & Architecture

Strengths

Concerns

1. Potential Integer Overflow in CPU Calculation (engine/packages/gasoline/src/db/kv/mod.rs:1018)

```rust
let load_shed_ratio_x1000 = calc_pull_ratio((cpu_usage.max(100.0) * 10.0) as u64, 500, 1000, 800, 100);
```

2. Hash Collision Behavior (engine/packages/gasoline/src/db/kv/mod.rs:1129-1155)

```rust
let mut hasher = DefaultHasher::new();
wf.earliest_wake_condition_ts.hash(&mut hasher);
let wf_hash = hasher.finish();

1234i32.hash(&mut hasher);
hasher.finish() % 1000 // 0-1000
```

3. Missing Error Handling in Bump Task (engine/packages/gasoline/src/db/kv/mod.rs:67-84)

```rust
let spawn_res = tokio::task::Builder::new().name("bump").spawn(...)

if let Err(err) = spawn_res {
    tracing::error!(?err, "failed to spawn bump task");
}
```

4. Shutdown Race Condition (engine/packages/gasoline/src/worker.rs:282-284)

```rust
if let Err(err) = self.db.mark_worker_inactive(worker_id).await {
    tracing::error!(?err, worker_id=?self.worker_id, "failed to mark worker as inactive");
}
```

Performance Considerations

Positive

Concerns

1. CPU Usage Calculation Frequency (engine/packages/gasoline/src/db/kv/system.rs:22-25)

```rust
if self.last_cpu_usage_read.elapsed() > MINIMUM_CPU_UPDATE_INTERVAL {
    self.system.refresh_cpu_usage();
```

2. Active Worker Query on Every Pull (engine/packages/gasoline/src/db/kv/mod.rs:1030-1044)

3. Load Shedding Calculation Missing

Potential Bugs

1. Off-by-One in Pseudorandom Range (engine/packages/gasoline/src/db/kv/mod.rs:1140)

```rust
hasher.finish() % 1000 // 0-1000
```

The comment says the range is 0-1000, but `% 1000` yields 0-999.

2. Worker Index Default to 0 (engine/packages/gasoline/src/db/kv/mod.rs:1084-1091)

```rust
} else {
    tracing::error!(?worker_id, "current worker should have valid ping, defaulting to worker index 0");

    0
}
```

3. Division by Zero Risk (engine/packages/gasoline/src/db/kv/system.rs:30-31)

```rust
/ self.system.cpus().len() as f32
```

Security Considerations

Low Risk

Notes

Testing

Missing Test Coverage

Recommendation: Add integration tests for the smart pulling logic, particularly around worker failover scenarios.

Documentation

Missing

Recommendation: Add module-level documentation explaining the consistent hashing approach and load shedding algorithm.

Minor Issues

Recommendations Summary

Critical

Important

Nice to Have

Conclusion

This is a solid implementation of smart workflow distribution. The architecture is sound and the approach is well-reasoned. However, there are several edge cases and potential bugs that should be addressed before merging. The lack of visible tests for the core smart pulling logic is concerning.

Recommendation: Request changes to address critical items, particularly around error handling and the CPU calculation logic.
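The division-by-zero risk noted above (dividing a usage sum by `cpus().len()` unconditionally) can be illustrated with a guard over a possibly-empty CPU list. A plain `f32` slice stands in for sysinfo's CPU list; the function name is made up for the sketch:

```rust
// Average per-CPU usage, guarding against an empty CPU list. Dividing a
// float by a zero-length count would otherwise produce NaN.
fn average_cpu_usage(per_cpu: &[f32]) -> f32 {
    if per_cpu.is_empty() {
        return 0.0;
    }
    per_cpu.iter().sum::<f32>() / per_cpu.len() as f32
}

fn main() {
    println!("{}", average_cpu_usage(&[30.0, 50.0])); // 40
    println!("{}", average_cpu_usage(&[]));           // 0, not NaN
}
```

Returning 0.0 for the empty case is one choice of fallback (it disables load shedding); logging a warning at that point, as an earlier comment in this thread suggests, makes the degraded state visible.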
Pull Request Review

This PR implements smart workflow pulling for horizontally scaled workers in the Gasoline workflow engine. The changes introduce an intelligent load distribution system that spreads workflows evenly across workers while implementing CPU-based load shedding. Here's my detailed review:

🎯 Positive Aspects

Architecture & Design

Implementation Quality
PR Review: Smart Pulling for Horizontally Scaled Workers

Summary

This PR implements an intelligent workflow distribution system for Gasoline's horizontally scaled workers. The implementation enables efficient load balancing across multiple workers while preventing duplicate work and adapting to system load.

Key Changes Overview

1. Worker-Based Architecture Refactoring

2. Smart Workflow Distribution Algorithm

The core innovation is in the sharding and load-shedding logic:

Hash-Based Sharding:

Load Shedding:

3. Targeted PubSub Bumping

Code Quality Review

Strengths ✅

Concerns & Recommendations
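The load-shedding step described in the reviews above can be sketched as a stable per-workflow coin flip. The `1234` salt and the `% 1000` range come from the reviewed code; the function name, the timestamp seed, and the comparison direction are assumptions for illustration:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Derive a stable pseudorandom value in 0..1000 from the workflow's hash
// seed, then keep the workflow only if it falls under the current ratio.
fn load_shed_keep(wf_ts: i64, load_shed_ratio_x1000: u64) -> bool {
    let mut hasher = DefaultHasher::new();
    wf_ts.hash(&mut hasher);
    // Salting yields a value distinct from the assignment hash computed
    // from the same seed, but just as deterministic.
    1234i32.hash(&mut hasher);
    let pseudorandom_x1000 = hasher.finish() % 1000;
    pseudorandom_x1000 < load_shed_ratio_x1000
}

fn main() {
    // Ratio 1000/1000 keeps every workflow; ratio 0 sheds every workflow.
    println!("{} {}", load_shed_keep(42, 1000), load_shed_keep(42, 0));
}
```

Because the value is derived from the workflow itself rather than from a random number generator, every worker makes the same keep/shed decision for a given workflow at a given ratio, which is what makes load shedding compatible with the consistent assignment scheme.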

No description provided.