Skip to content

Commit 4119d72

Browse files
Merge pull request #3 from gophpeek/refactor/metrics-abstraction-layers
refactor!: Restructure metrics response for clear abstraction separation
2 parents f45820e + 1d2a1bf commit 4119d72

File tree

8 files changed

+451
-132
lines changed

8 files changed

+451
-132
lines changed

src/Actions/CalculateQueueMetricsAction.php

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,34 @@ public function execute(string $connection, string $queue): void
4040
return;
4141
}
4242

43-
// Aggregate metrics across all job classes
43+
// Aggregate metrics across all job classes for CURRENT window (last 60 seconds)
44+
// This ensures throughput and avg_duration are calculated from the same time window
45+
$windowSeconds = 60;
46+
$throughputPerMinute = 0;
47+
$totalDurationMs = 0.0;
48+
$totalJobsInWindow = 0;
49+
50+
// Aggregate lifetime metrics for failure rate
4451
$totalProcessed = 0;
4552
$totalFailed = 0;
46-
$totalDurationMs = 0.0;
4753
$lastProcessedAt = null;
4854

4955
foreach ($queueJobs as $job) {
5056
$jobClass = $job['jobClass'];
57+
58+
// Get current window metrics (last 60 seconds)
59+
$jobThroughput = $this->jobRepository->getThroughput($jobClass, $connection, $queue, $windowSeconds);
60+
$jobAvgDuration = $this->jobRepository->getAverageDurationInWindow($jobClass, $connection, $queue, $windowSeconds);
61+
62+
$throughputPerMinute += $jobThroughput;
63+
$totalDurationMs += ($jobAvgDuration * $jobThroughput); // Weighted by job count
64+
$totalJobsInWindow += $jobThroughput;
65+
66+
// Get lifetime metrics for failure rate and last_processed_at
5167
$metrics = $this->jobRepository->getMetrics($jobClass, $connection, $queue);
5268

5369
$totalProcessed += is_int($metrics['total_processed']) ? $metrics['total_processed'] : 0;
5470
$totalFailed += is_int($metrics['total_failed']) ? $metrics['total_failed'] : 0;
55-
$totalDurationMs += is_float($metrics['total_duration_ms']) || is_int($metrics['total_duration_ms'])
56-
? (float) $metrics['total_duration_ms']
57-
: 0.0;
5871

5972
if ($metrics['last_processed_at'] instanceof \Carbon\Carbon) {
6073
if ($lastProcessedAt === null || $metrics['last_processed_at']->greaterThan($lastProcessedAt)) {
@@ -63,24 +76,14 @@ public function execute(string $connection, string $queue): void
6376
}
6477
}
6578

66-
// Calculate aggregated metrics
67-
$avgDuration = $totalProcessed > 0 ? $totalDurationMs / $totalProcessed : 0.0;
79+
// Calculate aggregated metrics for current window
80+
$avgDuration = $totalJobsInWindow > 0 ? $totalDurationMs / $totalJobsInWindow : 0.0;
81+
82+
// Calculate failure rate from lifetime totals
6883
$failureRate = ($totalProcessed + $totalFailed) > 0
6984
? ($totalFailed / ($totalProcessed + $totalFailed)) * 100.0
7085
: 0.0;
7186

72-
// Calculate throughput per minute (jobs completed in last 60 seconds)
73-
$throughputPerMinute = 0.0;
74-
foreach ($queueJobs as $job) {
75-
$jobClass = $job['jobClass'];
76-
$throughputPerMinute += $this->jobRepository->getThroughput(
77-
$jobClass,
78-
$connection,
79-
$queue,
80-
60 // last 60 seconds
81-
);
82-
}
83-
8487
// Store aggregated metrics
8588
$this->queueRepository->recordSnapshot($connection, $queue, [
8689
'throughput_per_minute' => $throughputPerMinute,

src/Repositories/Contracts/JobMetricsRepository.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,18 @@ public function getThroughput(
114114
int $windowSeconds,
115115
): int;
116116

117+
/**
118+
* Get average duration for jobs completed within a specific time window.
119+
*
120+
* @return float Average duration in milliseconds, 0.0 if no jobs in window
121+
*/
122+
public function getAverageDurationInWindow(
123+
string $jobClass,
124+
string $connection,
125+
string $queue,
126+
int $windowSeconds,
127+
): float;
128+
117129
/**
118130
* Record when a job is queued for time-to-start tracking.
119131
*/

src/Repositories/RedisJobMetricsRepository.php

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,27 +95,33 @@ public function recordCompletion(
9595
$memoryMb,
9696
$cpuTimeMs,
9797
$completedAt,
98-
$ttl
98+
$ttl,
99+
$jobId
99100
) {
100101
$pipe->incrementHashField($metricsKey, 'total_processed', 1);
101102
$pipe->incrementHashField($metricsKey, 'total_duration_ms', $durationMs);
102103
$pipe->incrementHashField($metricsKey, 'total_memory_mb', $memoryMb);
103104
$pipe->incrementHashField($metricsKey, 'total_cpu_time_ms', $cpuTimeMs);
104105
$pipe->setHash($metricsKey, ['last_processed_at' => $completedAt->timestamp]);
105106

106-
// Store duration sample (sorted set with timestamp as score)
107+
// Store samples in sorted sets with timestamp as score
108+
// Use a unique member format: "jobId:value" to ensure each job gets a separate entry
109+
// This allows multiple jobs with the same duration/memory/cpu to be stored
110+
$durationMember = $jobId.':'.$durationMs;
107111
/** @var array<string, int> $durationSample */
108-
$durationSample = [(string) $durationMs => (int) $completedAt->timestamp];
112+
$durationSample = [$durationMember => (int) $completedAt->timestamp];
109113
$pipe->addToSortedSet($durationKey, $durationSample, $ttl);
110114

111-
// Store memory sample
115+
// Store memory sample with unique member
116+
$memoryMember = $jobId.':'.$memoryMb;
112117
/** @var array<string, int> $memorySample */
113-
$memorySample = [(string) $memoryMb => (int) $completedAt->timestamp];
118+
$memorySample = [$memoryMember => (int) $completedAt->timestamp];
114119
$pipe->addToSortedSet($memoryKey, $memorySample, $ttl);
115120

116-
// Store CPU time sample
121+
// Store CPU time sample with unique member
122+
$cpuMember = $jobId.':'.$cpuTimeMs;
117123
/** @var array<string, int> $cpuSample */
118-
$cpuSample = [(string) $cpuTimeMs => (int) $completedAt->timestamp];
124+
$cpuSample = [$cpuMember => (int) $completedAt->timestamp];
119125
$pipe->addToSortedSet($cpuKey, $cpuSample, $ttl);
120126

121127
// Refresh TTL on metrics key
@@ -406,6 +412,56 @@ public function getThroughput(
406412
return $driver->eval($script, 1, $key, $windowSeconds);
407413
}
408414

415+
public function getAverageDurationInWindow(
416+
string $jobClass,
417+
string $connection,
418+
string $queue,
419+
int $windowSeconds,
420+
): float {
421+
$key = $this->redis->key('durations', $connection, $queue, $jobClass);
422+
$driver = $this->redis->driver();
423+
424+
// Use Lua script to atomically get samples within window and calculate average
425+
// This ensures consistency between throughput and average duration calculations
426+
$script = <<<'LUA'
427+
local key = KEYS[1]
428+
local windowSeconds = tonumber(ARGV[1])
429+
local cutoff = redis.call('TIME')[1] - windowSeconds
430+
431+
-- Get all members in the window (members are "jobId:duration")
432+
local samples = redis.call('ZRANGEBYSCORE', key, cutoff, '+inf')
433+
434+
if #samples == 0 then
435+
return 0
436+
end
437+
438+
-- Parse members to extract duration values and calculate average
439+
-- Each member is formatted as "jobId:duration"
440+
local sum = 0
441+
local count = 0
442+
for i = 1, #samples do
443+
local member = samples[i]
444+
local colonPos = string.find(member, ":")
445+
if colonPos then
446+
local duration = string.sub(member, colonPos + 1)
447+
sum = sum + tonumber(duration)
448+
count = count + 1
449+
end
450+
end
451+
452+
if count == 0 then
453+
return 0
454+
end
455+
456+
return sum / count
457+
LUA;
458+
459+
/** @var float */
460+
$result = $driver->eval($script, 1, $key, $windowSeconds);
461+
462+
return (float) $result;
463+
}
464+
409465
public function recordQueuedAt(
410466
string $jobClass,
411467
string $connection,

src/Services/OverviewQueryService.php

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -134,15 +134,21 @@ public function getOverview(bool $slim = true): array
134134
private function filterQueuesForDashboard(array $queues): array
135135
{
136136
return array_map(function ($queue) {
137+
$depth = is_array($queue['depth'] ?? null) ? $queue['depth'] : [];
138+
$performance60s = is_array($queue['performance_60s'] ?? null) ? $queue['performance_60s'] : [];
139+
$lifetime = is_array($queue['lifetime'] ?? null) ? $queue['lifetime'] : [];
140+
$workers = is_array($queue['workers'] ?? null) ? $queue['workers'] : [];
141+
137142
return [
138143
'connection' => $queue['connection'] ?? '',
139144
'queue' => $queue['queue'] ?? '',
140-
'depth' => $queue['depth'] ?? 0,
141-
'pending' => $queue['pending'] ?? 0,
142-
'active_workers' => $queue['active_workers'] ?? 0,
143-
'throughput_per_minute' => $queue['throughput_per_minute'] ?? 0,
144-
'failure_rate' => $queue['failure_rate'] ?? 0,
145-
'utilization_rate' => $queue['utilization_rate'] ?? 0,
145+
'depth' => $depth['total'] ?? 0,
146+
'pending' => $depth['pending'] ?? 0,
147+
'active_workers' => $workers['active_count'] ?? 0,
148+
'throughput_per_minute' => $performance60s['throughput_per_minute'] ?? 0,
149+
'failure_rate' => $lifetime['failure_rate_percent'] ?? 0,
150+
'current_busy_percent' => $workers['current_busy_percent'] ?? 0,
151+
'lifetime_busy_percent' => $workers['lifetime_busy_percent'] ?? 0,
146152
];
147153
}, $queues);
148154
}
@@ -176,27 +182,52 @@ private function filterJobsForDashboard(array $jobs): array
176182
/**
177183
* Filter server metrics to essential dashboard fields only.
178184
*
185+
* Returns simplified server data with clear separation between:
186+
* - Worker metrics: Worker count, utilization (from queue workers)
187+
* - Job metrics: Jobs processed (from queue workers)
188+
* - System metrics: Actual server CPU/memory from SystemMetrics (physical server resources)
189+
*
190+
* Note: Worker process CPU/memory metrics are NOT included in dashboard as they're not
191+
* useful for server-level overview. Use server_resources for actual server resource usage.
192+
*
179193
* @param array<string, array<string, mixed>> $servers
180194
* @return array<string, array<string, mixed>>
181195
*/
182196
private function filterServersForDashboard(array $servers): array
183197
{
184198
return array_map(function ($server) {
185-
$workers = is_array($server['workers'] ?? null) ? $server['workers'] : [];
186-
$utilization = is_array($server['utilization'] ?? null) ? $server['utilization'] : [];
187-
$performance = is_array($server['performance'] ?? null) ? $server['performance'] : [];
188-
189-
$serverUtilization = $utilization['server_utilization'] ?? 0;
190-
$utilizationPercent = is_numeric($serverUtilization) ? round((float) $serverUtilization * 100, 2) : 0;
191-
192-
return [
199+
$queueWorkers = is_array($server['queue_workers'] ?? null) ? $server['queue_workers'] : [];
200+
$workerCount = is_array($queueWorkers['count'] ?? null) ? $queueWorkers['count'] : [];
201+
$workerUtilization = is_array($queueWorkers['utilization'] ?? null) ? $queueWorkers['utilization'] : [];
202+
$jobProcessing = is_array($server['job_processing'] ?? null) ? $server['job_processing'] : [];
203+
$jobLifetime = is_array($jobProcessing['lifetime'] ?? null) ? $jobProcessing['lifetime'] : [];
204+
$serverResources = is_array($server['server_resources'] ?? null) ? $server['server_resources'] : null;
205+
206+
$result = [
193207
'hostname' => $server['hostname'] ?? '',
194-
'workers_total' => $workers['total'] ?? 0,
195-
'workers_active' => $workers['active'] ?? 0,
196-
'workers_idle' => $workers['idle'] ?? 0,
197-
'utilization_percent' => $utilizationPercent,
198-
'jobs_processed' => $performance['total_jobs_processed'] ?? 0,
208+
// Worker-level metrics (from queue workers)
209+
'workers' => [
210+
'total' => $workerCount['total'] ?? 0,
211+
'active' => $workerCount['active'] ?? 0,
212+
'idle' => $workerCount['idle'] ?? 0,
213+
'current_busy_percent' => $workerUtilization['current_busy_percent'] ?? 0.0,
214+
'lifetime_busy_percent' => $workerUtilization['lifetime_busy_percent'] ?? 0.0,
215+
],
216+
// Job processing metrics (from queue workers)
217+
'jobs' => [
218+
'total_processed' => $jobLifetime['total_processed'] ?? 0,
219+
'total_failed' => $jobLifetime['total_failed'] ?? 0,
220+
'failure_rate_percent' => $jobLifetime['failure_rate_percent'] ?? 0.0,
221+
],
199222
];
223+
224+
// System resource metrics (actual server CPU/memory from SystemMetrics)
225+
// This is the REAL server usage, not worker process usage
226+
if ($serverResources !== null) {
227+
$result['server_resources'] = $serverResources;
228+
}
229+
230+
return $result;
200231
}, $servers);
201232
}
202233

@@ -208,11 +239,17 @@ private function filterServersForDashboard(array $servers): array
208239
*/
209240
private function filterWorkersForDashboard(array $workers): array
210241
{
242+
$count = is_array($workers['count'] ?? null) ? $workers['count'] : [];
243+
$utilization = is_array($workers['utilization'] ?? null) ? $workers['utilization'] : [];
244+
$performance = is_array($workers['performance'] ?? null) ? $workers['performance'] : [];
245+
211246
return [
212-
'total' => $workers['total'] ?? 0,
213-
'active' => $workers['active'] ?? 0,
214-
'idle' => $workers['idle'] ?? 0,
215-
'total_jobs_processed' => $workers['total_jobs_processed'] ?? 0,
247+
'total' => $count['total'] ?? 0,
248+
'active' => $count['active'] ?? 0,
249+
'idle' => $count['idle'] ?? 0,
250+
'current_busy_percent' => $utilization['current_busy_percent'] ?? 0.0,
251+
'lifetime_busy_percent' => $utilization['lifetime_busy_percent'] ?? 0.0,
252+
'total_jobs_processed' => $performance['total_jobs_processed'] ?? 0,
216253
];
217254
}
218255
}

src/Services/QueueMetricsQueryService.php

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public function getAllQueuesWithMetrics(): array
143143

144144
$activeWorkers = $workers->count();
145145

146-
// Calculate queue utilization rate from worker busy/idle time
146+
// Calculate worker utilization from busy/idle time
147147
$totalBusyTime = 0;
148148
$totalIdleTime = 0;
149149
foreach ($workers as $worker) {
@@ -152,7 +152,11 @@ public function getAllQueuesWithMetrics(): array
152152
}
153153

154154
$totalTime = $totalBusyTime + $totalIdleTime;
155-
$utilizationRate = $totalTime > 0 ? ($totalBusyTime / $totalTime) * 100 : 0;
155+
$lifetimeBusyPercent = $totalTime > 0 ? ($totalBusyTime / $totalTime) * 100 : 0;
156+
157+
// Calculate current worker state (% busy right now)
158+
$busyWorkers = $workers->filter(fn ($w) => $w->state->value === 'busy')->count();
159+
$currentBusyPercent = $activeWorkers > 0 ? ($busyWorkers / $activeWorkers) * 100 : 0;
156160

157161
// Get trend data
158162
$trends = $this->getQueueTrends($connection, $queue);
@@ -161,17 +165,31 @@ public function getAllQueuesWithMetrics(): array
161165
'connection' => $connection,
162166
'queue' => $queue,
163167
'driver' => $connection,
164-
'depth' => $depth->totalJobs(),
165-
'pending' => $depth->pendingJobs,
166-
'scheduled' => $depth->delayedJobs,
167-
'reserved' => $depth->reservedJobs,
168-
'oldest_job_age_seconds' => $depth->secondsOldestPendingJob() ?? 0,
169-
'oldest_job_age_status' => $depth->oldestPendingJobAge?->toIso8601String() ?? 'unknown',
170-
'throughput_per_minute' => $metrics->throughputPerMinute,
171-
'avg_duration_ms' => $metrics->avgDuration,
172-
'failure_rate' => $metrics->failureRate,
173-
'utilization_rate' => round($utilizationRate, 2),
174-
'active_workers' => $activeWorkers,
168+
// Instantaneous queue state (current snapshot)
169+
'depth' => [
170+
'total' => $depth->totalJobs(),
171+
'pending' => $depth->pendingJobs,
172+
'scheduled' => $depth->delayedJobs,
173+
'reserved' => $depth->reservedJobs,
174+
'oldest_job_age_seconds' => $depth->secondsOldestPendingJob() ?? 0,
175+
'oldest_job_age_status' => $depth->oldestPendingJobAge?->toIso8601String() ?? 'unknown',
176+
],
177+
// Windowed performance metrics (60-second window from CalculateQueueMetricsAction)
178+
'performance_60s' => [
179+
'throughput_per_minute' => $metrics->throughputPerMinute,
180+
'avg_duration_ms' => $metrics->avgDuration,
181+
'window_seconds' => 60,
182+
],
183+
// Lifetime metrics (since first job)
184+
'lifetime' => [
185+
'failure_rate_percent' => $metrics->failureRate,
186+
],
187+
// Worker metrics for this queue
188+
'workers' => [
189+
'active_count' => $activeWorkers,
190+
'current_busy_percent' => round($currentBusyPercent, 2),
191+
'lifetime_busy_percent' => round($lifetimeBusyPercent, 2),
192+
],
175193
'baseline' => $baseline ? $baseline->toArray() : null,
176194
'trends' => $trends,
177195
'timestamp' => now()->toIso8601String(),

0 commit comments

Comments
 (0)