Skip to content

Commit de313ea

Browse files
committed
feat(pubsub): add thread safety to PubSub handler access in BaseClient
- Add volatile modifier to _pubSubHandler field for memory barrier guarantees
- Add _pubSubLock object for coordinating thread-safe access
- Implement lock-based handler access in HandlePubSubMessage() to prevent race conditions
- Update InitializePubSubHandler() with thread-safe initialization
- Enhance CleanupPubSubResources() with proper synchronization and timeout-based cleanup
- Add thread-safe access to PubSubQueue property

Add comprehensive thread safety tests:
- Test concurrent message processing from 100+ threads
- Test disposal during active message processing
- Add stress test with 100 iterations of concurrent operations
- Test concurrent access to PubSubQueue and HasPubSubSubscriptions properties
- Test rapid create/dispose cycles for memory leak detection
- Test disposal timeout handling

All 1,771 tests pass (250 unit + 1,521 integration tests).
Addresses requirements 2.1-2.6 from pubsub-critical-fixes spec.

Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>
1 parent 785292a commit de313ea

File tree

3 files changed

+448
-30
lines changed

3 files changed

+448
-30
lines changed

sources/Valkey.Glide/BaseClient.cs

Lines changed: 69 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -45,11 +45,22 @@ public void Dispose()
4545
/// <summary>
4646
/// Get the PubSub message queue for manual message retrieval.
4747
/// Returns null if no PubSub subscriptions are configured.
48+
/// Uses thread-safe access to prevent race conditions.
4849
/// </summary>
49-
public PubSubMessageQueue? PubSubQueue => _pubSubHandler?.GetQueue();
50+
public PubSubMessageQueue? PubSubQueue
51+
{
52+
get
53+
{
54+
lock (_pubSubLock)
55+
{
56+
return _pubSubHandler?.GetQueue();
57+
}
58+
}
59+
}
5060

5161
/// <summary>
5262
/// Indicates whether this client has PubSub subscriptions configured.
63+
/// Uses volatile read for thread-safe access without locking.
5364
/// </summary>
5465
public bool HasPubSubSubscriptions => _pubSubHandler != null;
5566

@@ -259,6 +270,7 @@ private static PubSubMessage MarshalPubSubMessage(
259270

260271
/// <summary>
261272
/// Initializes PubSub message handling if PubSub subscriptions are configured.
273+
/// Uses thread-safe initialization to ensure proper visibility across threads.
262274
/// </summary>
263275
/// <param name="config">The PubSub subscription configuration.</param>
264276
private void InitializePubSubHandler(BasePubSubSubscriptionConfig? config)
@@ -268,45 +280,82 @@ private void InitializePubSubHandler(BasePubSubSubscriptionConfig? config)
268280
return;
269281
}
270282

271-
// Create the PubSub message handler
272-
_pubSubHandler = new PubSubMessageHandler(config.Callback, config.Context);
283+
// Create the PubSub message handler with thread-safe initialization
284+
lock (_pubSubLock)
285+
{
286+
_pubSubHandler = new PubSubMessageHandler(config.Callback, config.Context);
287+
}
273288
}
274289

275290
/// <summary>
276291
/// Handles incoming PubSub messages from the FFI layer.
277-
/// This method is called directly by the FFI callback.
292+
/// This method is called directly by the FFI callback and uses thread-safe access to the handler.
278293
/// </summary>
279294
/// <param name="message">The PubSub message to handle.</param>
280295
internal virtual void HandlePubSubMessage(PubSubMessage message)
281296
{
282-
try
297+
// Thread-safe access to handler - use local copy to avoid race conditions
298+
PubSubMessageHandler? handler;
299+
lock (_pubSubLock)
283300
{
284-
_pubSubHandler?.HandleMessage(message);
301+
handler = _pubSubHandler;
285302
}
286-
catch (Exception ex)
303+
304+
if (handler != null)
287305
{
288-
// Log the error but don't let exceptions escape
289-
Logger.Log(Level.Error, "BaseClient", $"Error handling PubSub message: {ex.Message}", ex);
306+
try
307+
{
308+
handler.HandleMessage(message);
309+
}
310+
catch (Exception ex)
311+
{
312+
// Log the error but don't let exceptions escape
313+
Logger.Log(Level.Error, "BaseClient", $"Error handling PubSub message: {ex.Message}", ex);
314+
}
290315
}
291316
}
292317

293318
/// <summary>
294-
/// Cleans up PubSub resources during client disposal.
319+
/// Cleans up PubSub resources during client disposal with proper synchronization.
320+
/// Uses locking to coordinate safe disposal and prevent conflicts with concurrent message processing.
295321
/// </summary>
296322
private void CleanupPubSubResources()
297323
{
298-
if (_pubSubHandler != null)
324+
PubSubMessageHandler? handler = null;
325+
326+
// Acquire lock and capture handler reference, then set to null
327+
lock (_pubSubLock)
328+
{
329+
handler = _pubSubHandler;
330+
_pubSubHandler = null;
331+
}
332+
333+
// Dispose outside of lock to prevent deadlocks
334+
if (handler != null)
299335
{
300336
try
301337
{
302-
// Dispose the message handler
303-
_pubSubHandler.Dispose();
304-
_pubSubHandler = null;
338+
// Create a task to dispose the handler with timeout
339+
var disposeTask = Task.Run(() => handler.Dispose());
340+
341+
// Wait for disposal with timeout (5 seconds)
342+
if (!disposeTask.Wait(TimeSpan.FromSeconds(5)))
343+
{
344+
Logger.Log(Level.Warn, "BaseClient",
345+
"PubSub handler disposal did not complete within timeout (5 seconds)");
346+
}
347+
}
348+
catch (AggregateException ex)
349+
{
350+
// Log the error but continue with disposal
351+
Logger.Log(Level.Warn, "BaseClient",
352+
$"Error cleaning up PubSub resources: {ex.InnerException?.Message ?? ex.Message}", ex);
305353
}
306354
catch (Exception ex)
307355
{
308356
// Log the error but continue with disposal
309-
Logger.Log(Level.Warn, "BaseClient", $"Error cleaning up PubSub resources: {ex.Message}", ex);
357+
Logger.Log(Level.Warn, "BaseClient",
358+
$"Error cleaning up PubSub resources: {ex.Message}", ex);
310359
}
311360
}
312361
}
@@ -348,7 +397,11 @@ private delegate void PubSubAction(
348397
protected Version? _serverVersion; // cached server version
349398

350399
/// PubSub message handler for routing messages to callbacks or queues.
351-
private PubSubMessageHandler? _pubSubHandler;
400+
/// Declared volatile so that lock-free reads (such as HasPubSubSubscriptions) observe the latest value; writes are made while holding _pubSubLock.
401+
private volatile PubSubMessageHandler? _pubSubHandler;
402+
403+
/// Lock object for coordinating PubSub handler access and disposal.
404+
private readonly object _pubSubLock = new();
352405

353406
#endregion private fields
354407
}

tests/Valkey.Glide.UnitTests/PubSubFFIMemoryLeakTests.cs

Lines changed: 7 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -1,14 +1,7 @@
11
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
22

3-
using System;
43
using System.Diagnostics;
54
using System.Runtime.InteropServices;
6-
using System.Threading;
7-
using System.Threading.Tasks;
8-
9-
using Valkey.Glide.Internals;
10-
11-
using Xunit;
125

136
namespace Valkey.Glide.UnitTests;
147

@@ -88,7 +81,7 @@ public void ProcessVariousMessageSizes_NoMemoryLeak_ConsistentBehavior()
8881
{
8982
// Arrange
9083
const int iterationsPerSize = 1_000;
91-
var messageSizes = new[] { 10, 100, 1_000, 10_000, 100_000 }; // Various sizes
84+
int[] messageSizes = [10, 100, 1_000, 10_000, 100_000]; // Various sizes
9285

9386
GC.Collect();
9487
GC.WaitForPendingFinalizers();
@@ -102,7 +95,7 @@ public void ProcessVariousMessageSizes_NoMemoryLeak_ConsistentBehavior()
10295
{
10396
Console.WriteLine($"Testing message size: {messageSize} bytes");
10497

105-
string largeMessage = new string('X', messageSize);
98+
string largeMessage = new('X', messageSize);
10699
string channel = "test-channel";
107100

108101
long beforeSizeTest = GC.GetTotalMemory(true);
@@ -164,7 +157,7 @@ public void ProcessMessagesUnderGCPressure_NoMemoryLeak_StableUnderPressure()
164157
if (i % gcInterval == 0)
165158
{
166159
// Create some temporary objects to increase GC pressure
167-
var tempObjects = new object[1000];
160+
object[] tempObjects = new object[1000];
168161
for (int j = 0; j < tempObjects.Length; j++)
169162
{
170163
tempObjects[j] = new byte[1024]; // 1KB objects
@@ -209,8 +202,8 @@ public void ProcessConcurrentMessages_NoMemoryLeak_ThreadSafeMemoryManagement()
209202
Console.WriteLine($"Initial memory for concurrent test: {initialMemory:N0} bytes");
210203

211204
// Act: Process messages concurrently from multiple threads
212-
var tasks = new Task[threadsCount];
213-
var exceptions = new Exception?[threadsCount];
205+
Task[] tasks = new Task[threadsCount];
206+
Exception?[] exceptions = new Exception?[threadsCount];
214207

215208
for (int threadIndex = 0; threadIndex < threadsCount; threadIndex++)
216209
{
@@ -278,9 +271,9 @@ public void ProcessExtendedDuration_NoMemoryLeak_StableOverTime()
278271
Console.WriteLine($"Starting extended duration test for {durationSeconds} seconds");
279272
Console.WriteLine($"Initial memory: {initialMemory:N0} bytes");
280273

281-
var stopwatch = Stopwatch.StartNew();
274+
Stopwatch stopwatch = Stopwatch.StartNew();
282275
int messageCount = 0;
283-
var memorySnapshots = new List<(TimeSpan Time, long Memory)>();
276+
List<(TimeSpan Time, long Memory)> memorySnapshots = [];
284277

285278
// Act: Process messages for extended duration
286279
while (stopwatch.Elapsed.TotalSeconds < durationSeconds)

0 commit comments

Comments
 (0)