Skip to content

Commit cd25051

Browse files
committed
feat(pubsub): replace Task.Run with channel-based message processing
- Implement bounded channel configuration with PubSubPerformanceConfig - Add configurable capacity (default 1000), backpressure strategies, and shutdown timeout - Replace per-message Task.Run() calls with single dedicated background processing task - Use System.Threading.Channels.Channel<PubSubMessage> for bounded message queuing - Implement non-blocking TryWrite() in PubSubCallback for backpressure handling - Add graceful shutdown with cancellation token support - Create comprehensive performance validation tests covering: - High throughput (10,000+ msg/sec) - Allocation pressure and GC impact - Concurrent message handling - Burst traffic patterns - Long-running stability Benefits: - Single dedicated thread instead of thousands of Task.Run calls - Reduced allocation pressure and GC impact - Predictable performance characteristics - Better resource utilization without thread pool starvation - Configurable performance options for different scenarios All tests pass: 255 unit tests, 1520 integration tests Addresses requirements 3.1-3.6 and 6.1-6.4 from pubsub-critical-fixes spec Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>
1 parent de313ea commit cd25051

File tree

5 files changed

+616
-61
lines changed

5 files changed

+616
-61
lines changed

sources/Valkey.Glide/BaseClient.cs

Lines changed: 127 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
22

33
using System.Runtime.InteropServices;
4+
using System.Threading.Channels;
45

56
using Valkey.Glide.Internals;
67
using Valkey.Glide.Pipeline;
@@ -187,36 +188,41 @@ private void PubSubCallback(
187188
IntPtr patternPtr,
188189
long patternLen)
189190
{
190-
// Work needs to be offloaded from the calling thread, because otherwise we might starve the client's thread pool.
191-
_ = Task.Run(() =>
191+
try
192192
{
193-
try
193+
// Only process actual message notifications, ignore subscription confirmations
194+
if (!IsMessageNotification((PushKind)pushKind))
194195
{
195-
// Only process actual message notifications, ignore subscription confirmations
196-
if (!IsMessageNotification((PushKind)pushKind))
197-
{
198-
Logger.Log(Level.Debug, "PubSubCallback", $"PubSub notification received: {(PushKind)pushKind}");
199-
return;
200-
}
201-
202-
// Marshal the message from FFI callback parameters
203-
PubSubMessage message = MarshalPubSubMessage(
204-
(PushKind)pushKind,
205-
messagePtr,
206-
messageLen,
207-
channelPtr,
208-
channelLen,
209-
patternPtr,
210-
patternLen);
211-
212-
// Process the message through the handler
213-
HandlePubSubMessage(message);
196+
Logger.Log(Level.Debug, "PubSubCallback", $"PubSub notification received: {(PushKind)pushKind}");
197+
return;
214198
}
215-
catch (Exception ex)
199+
200+
// Marshal the message from FFI callback parameters
201+
PubSubMessage message = MarshalPubSubMessage(
202+
(PushKind)pushKind,
203+
messagePtr,
204+
messageLen,
205+
channelPtr,
206+
channelLen,
207+
patternPtr,
208+
patternLen);
209+
210+
// Write to channel (non-blocking with backpressure)
211+
Channel<PubSubMessage>? channel = _messageChannel;
212+
if (channel != null)
216213
{
217-
Logger.Log(Level.Error, "PubSubCallback", $"Error in PubSub callback: {ex.Message}", ex);
214+
if (!channel.Writer.TryWrite(message))
215+
{
216+
Logger.Log(Level.Warn, "PubSubCallback",
217+
$"PubSub message channel full, message dropped for channel {message.Channel}");
218+
}
218219
}
219-
});
220+
}
221+
catch (Exception ex)
222+
{
223+
Logger.Log(Level.Error, "PubSubCallback",
224+
$"Error in PubSub callback: {ex.Message}", ex);
225+
}
220226
}
221227

222228
private static bool IsMessageNotification(PushKind pushKind) =>
@@ -280,10 +286,58 @@ private void InitializePubSubHandler(BasePubSubSubscriptionConfig? config)
280286
return;
281287
}
282288

283-
// Create the PubSub message handler with thread-safe initialization
284289
lock (_pubSubLock)
285290
{
291+
// Get performance configuration or use defaults
292+
PubSubPerformanceConfig perfConfig = config.PerformanceConfig ?? new();
293+
294+
// Create bounded channel with configurable capacity and backpressure strategy
295+
BoundedChannelOptions channelOptions = new(perfConfig.ChannelCapacity)
296+
{
297+
FullMode = perfConfig.FullMode,
298+
SingleReader = true, // Optimization: only one processor task
299+
SingleWriter = false // Multiple FFI callbacks may write
300+
};
301+
302+
_messageChannel = Channel.CreateBounded<PubSubMessage>(channelOptions);
303+
_processingCancellation = new CancellationTokenSource();
304+
305+
// Create message handler
286306
_pubSubHandler = new PubSubMessageHandler(config.Callback, config.Context);
307+
308+
// Start dedicated processing task
309+
_messageProcessingTask = Task.Run(async () =>
310+
{
311+
try
312+
{
313+
await foreach (PubSubMessage message in _messageChannel.Reader.ReadAllAsync(_processingCancellation.Token))
314+
{
315+
try
316+
{
317+
// Thread-safe access to handler
318+
PubSubMessageHandler? handler = _pubSubHandler;
319+
if (handler != null && !_processingCancellation.Token.IsCancellationRequested)
320+
{
321+
handler.HandleMessage(message);
322+
}
323+
}
324+
catch (Exception ex)
325+
{
326+
Logger.Log(Level.Error, "BaseClient",
327+
$"Error processing PubSub message: {ex.Message}", ex);
328+
}
329+
}
330+
}
331+
catch (OperationCanceledException)
332+
{
333+
Logger.Log(Level.Info, "BaseClient", "PubSub processing cancelled");
334+
}
335+
catch (Exception ex)
336+
{
337+
Logger.Log(Level.Error, "BaseClient",
338+
$"PubSub processing task failed: {ex.Message}", ex);
339+
}
340+
}, _processingCancellation.Token);
287341
}
288342
}
289343

@@ -322,41 +376,57 @@ internal virtual void HandlePubSubMessage(PubSubMessage message)
322376
private void CleanupPubSubResources()
323377
{
324378
PubSubMessageHandler? handler = null;
379+
Channel<PubSubMessage>? channel = null;
380+
Task? processingTask = null;
381+
CancellationTokenSource? cancellation = null;
382+
TimeSpan shutdownTimeout = TimeSpan.FromSeconds(PubSubPerformanceConfig.DefaultShutdownTimeoutSeconds);
325383

326-
// Acquire lock and capture handler reference, then set to null
384+
// Acquire lock and capture references, then set to null
327385
lock (_pubSubLock)
328386
{
329387
handler = _pubSubHandler;
388+
channel = _messageChannel;
389+
processingTask = _messageProcessingTask;
390+
cancellation = _processingCancellation;
391+
330392
_pubSubHandler = null;
393+
_messageChannel = null;
394+
_messageProcessingTask = null;
395+
_processingCancellation = null;
331396
}
332397

333-
// Dispose outside of lock to prevent deadlocks
334-
if (handler != null)
398+
// Cleanup outside of lock to prevent deadlocks
399+
try
335400
{
336-
try
337-
{
338-
// Create a task to dispose the handler with timeout
339-
var disposeTask = Task.Run(() => handler.Dispose());
401+
// Signal shutdown
402+
cancellation?.Cancel();
403+
404+
// Complete channel to stop message processing
405+
channel?.Writer.Complete();
340406

341-
// Wait for disposal with timeout (5 seconds)
342-
if (!disposeTask.Wait(TimeSpan.FromSeconds(5)))
407+
// Wait for processing task to complete (with timeout)
408+
if (processingTask != null)
409+
{
410+
if (!processingTask.Wait(shutdownTimeout))
343411
{
344412
Logger.Log(Level.Warn, "BaseClient",
345-
"PubSub handler disposal did not complete within timeout (5 seconds)");
413+
$"PubSub processing task did not complete within timeout ({shutdownTimeout.TotalSeconds}s)");
346414
}
347415
}
348-
catch (AggregateException ex)
349-
{
350-
// Log the error but continue with disposal
351-
Logger.Log(Level.Warn, "BaseClient",
352-
$"Error cleaning up PubSub resources: {ex.InnerException?.Message ?? ex.Message}", ex);
353-
}
354-
catch (Exception ex)
355-
{
356-
// Log the error but continue with disposal
357-
Logger.Log(Level.Warn, "BaseClient",
358-
$"Error cleaning up PubSub resources: {ex.Message}", ex);
359-
}
416+
417+
// Dispose resources
418+
handler?.Dispose();
419+
cancellation?.Dispose();
420+
}
421+
catch (AggregateException ex)
422+
{
423+
Logger.Log(Level.Warn, "BaseClient",
424+
$"Error during PubSub cleanup: {ex.InnerException?.Message ?? ex.Message}", ex);
425+
}
426+
catch (Exception ex)
427+
{
428+
Logger.Log(Level.Warn, "BaseClient",
429+
$"Error during PubSub cleanup: {ex.Message}", ex);
360430
}
361431
}
362432

@@ -403,5 +473,14 @@ private delegate void PubSubAction(
403473
/// Lock object for coordinating PubSub handler access and disposal.
404474
private readonly object _pubSubLock = new();
405475

476+
/// Channel for bounded message queuing with backpressure support.
477+
private Channel<PubSubMessage>? _messageChannel;
478+
479+
/// Dedicated background task for processing PubSub messages.
480+
private Task? _messageProcessingTask;
481+
482+
/// Cancellation token source for graceful shutdown of message processing.
483+
private CancellationTokenSource? _processingCancellation;
484+
406485
#endregion private fields
407486
}

0 commit comments

Comments
 (0)