11// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
22
3- using System ;
43using System . Collections . Concurrent ;
54using System . Diagnostics ;
6- using System . Runtime . InteropServices ;
7- using System . Threading ;
8- using System . Threading . Tasks ;
9-
10- using Valkey . Glide . Internals ;
11-
12- using Xunit ;
13-
14- using static Valkey . Glide . ConnectionConfiguration ;
155
166namespace Valkey . Glide . IntegrationTests ;
177
188/// <summary>
199/// Integration tests for FFI PubSub callback flow infrastructure.
2010/// These tests verify end-to-end message processing, client registry operations,
2111/// error handling, and async message processing using simulated FFI callbacks.
22- /// Note: Tests use simulated FFI callbacks since PubSub commands are not yet implemented (tasks 8-10).
12+ /// Note: Uses simulated FFI callbacks since full PubSub server integration requires additional infrastructure.
13+ /// Future enhancement: Replace with real PUBLISH commands via CustomCommand when PubSub infrastructure is complete.
2314/// </summary>
2415[ Collection ( "ClientRegistry" ) ]
2516public class PubSubFFICallbackIntegrationTests : IDisposable
2617{
27- private readonly List < BaseClient > _testClients = new ( ) ;
28- private readonly ConcurrentBag < Exception > _callbackExceptions = new ( ) ;
29- private readonly ConcurrentBag < PubSubMessage > _receivedMessages = new ( ) ;
18+ private readonly List < BaseClient > _testClients = [ ] ;
19+ private readonly ConcurrentBag < Exception > _callbackExceptions = [ ] ;
20+ private readonly ConcurrentBag < PubSubMessage > _receivedMessages = [ ] ;
3021 private readonly ManualResetEventSlim _messageReceivedEvent = new ( false ) ;
31- private readonly object _lockObject = new ( ) ;
22+ private readonly Lock _lockObject = new ( ) ;
3223
3324 public void Dispose ( )
3425 {
@@ -50,7 +41,8 @@ public void Dispose()
5041
5142 /// <summary>
5243 /// Simulates an FFI callback by directly invoking the client's message handler.
53- /// This allows testing the callback infrastructure without requiring actual server PubSub.
44+ /// This allows testing the callback infrastructure without requiring full server PubSub integration.
45+ /// Future enhancement: Replace with real PUBLISH commands via CustomCommand when PubSub infrastructure is complete.
5446 /// </summary>
5547 private async Task SimulateFFICallback ( BaseClient client , string channel , string message , string ? pattern )
5648 {
@@ -89,7 +81,7 @@ public async Task EndToEndMessageFlow_WithStandaloneClient_ProcessesMessagesCorr
8981 GlideClient subscriberClient = await GlideClient . CreateClient ( config ) ;
9082 _testClients . Add ( subscriberClient ) ;
9183
92- // Simulate FFI callback invocation (since PubSub commands not yet implemented)
84+ // Simulate FFI callback invocation - tests the callback infrastructure
9385 await SimulateFFICallback ( subscriberClient , testChannel , testMessage , null ) ;
9486
9587 // Wait for message to be received
@@ -136,7 +128,7 @@ public async Task EndToEndMessageFlow_WithClusterClient_ProcessesMessagesCorrect
136128 GlideClusterClient subscriberClient = await GlideClusterClient . CreateClient ( config ) ;
137129 _testClients . Add ( subscriberClient ) ;
138130
139- // Simulate FFI callback invocation (since PubSub commands not yet implemented)
131+ // Simulate FFI callback invocation - tests the callback infrastructure
140132 await SimulateFFICallback ( subscriberClient , testChannel , testMessage , null ) ;
141133
142134 // Wait for message to be received
@@ -199,7 +191,7 @@ public async Task ClientRegistryOperations_UnderConcurrentAccess_WorksCorrectly(
199191 // Arrange
200192 const int clientCount = 10 ;
201193 const int messagesPerClient = 5 ;
202- List < Task > clientTasks = new ( ) ;
194+ List < Task > clientTasks = [ ] ;
203195 ConcurrentDictionary < string , int > messageCountsByChannel = new ( ) ;
204196
205197 // Act - Create multiple clients concurrently
@@ -215,8 +207,8 @@ public async Task ClientRegistryOperations_UnderConcurrentAccess_WorksCorrectly(
215207 . WithChannel ( clientChannel )
216208 . WithCallback < StandalonePubSubSubscriptionConfig > ( ( message , context ) =>
217209 {
218- Interlocked . Increment ( ref messagesReceived ) ;
219- messageCountsByChannel . AddOrUpdate ( clientChannel , 1 , ( key , value ) => value + 1 ) ;
210+ _ = Interlocked . Increment ( ref messagesReceived ) ;
211+ _ = messageCountsByChannel . AddOrUpdate ( clientChannel , 1 , ( key , value ) => value + 1 ) ;
220212 } ) ;
221213
222214 StandaloneClientConfiguration config = TestConfiguration . DefaultClientConfig ( )
@@ -229,8 +221,6 @@ public async Task ClientRegistryOperations_UnderConcurrentAccess_WorksCorrectly(
229221 _testClients . Add ( subscriberClient ) ;
230222 }
231223
232- // No need for publisher client in simulation mode
233-
234224 // Simulate multiple messages via FFI callbacks
235225 for ( int j = 0 ; j < messagesPerClient ; j ++ )
236226 {
@@ -286,7 +276,7 @@ public async Task CallbackErrorHandling_WithExceptionInCallback_IsolatesErrorsAn
286276 throw new InvalidOperationException ( "Test exception in callback" ) ;
287277 }
288278
289- Interlocked . Increment ( ref successfulMessages ) ;
279+ _ = Interlocked . Increment ( ref successfulMessages ) ;
290280 if ( successfulMessages >= 2 )
291281 {
292282 _messageReceivedEvent . Set ( ) ;
@@ -322,8 +312,8 @@ public async Task AsyncMessageProcessing_WithRealClients_CompletesQuicklyWithout
322312 {
323313 // Arrange
324314 string testChannel = $ "async-test-{ Guid . NewGuid ( ) } ";
325- List < TimeSpan > callbackDurations = new ( ) ;
326- List < TimeSpan > processingDurations = new ( ) ;
315+ List < TimeSpan > callbackDurations = [ ] ;
316+ List < TimeSpan > processingDurations = [ ] ;
327317 int messagesProcessed = 0 ;
328318
329319 StandalonePubSubSubscriptionConfig pubsubConfig = new StandalonePubSubSubscriptionConfig ( )
@@ -460,7 +450,7 @@ public async Task MemoryManagement_WithMarshaledData_HandlesCleanupCorrectly()
460450 {
461451 // Arrange
462452 string testChannel = $ "memory-test-{ Guid . NewGuid ( ) } ";
463- List < string > receivedMessages = new ( ) ;
453+ List < string > receivedMessages = [ ] ;
464454 int messageCount = 0 ;
465455
466456 StandalonePubSubSubscriptionConfig pubsubConfig = new StandalonePubSubSubscriptionConfig ( )
@@ -488,7 +478,7 @@ public async Task MemoryManagement_WithMarshaledData_HandlesCleanupCorrectly()
488478 _testClients . Add ( subscriberClient ) ;
489479
490480 // Simulate messages with various content to test marshaling
491- string [ ] testMessages = {
481+ string [ ] testMessages = [
492482 "Simple message" ,
493483 "Message with special chars: !@#$%^&*()" ,
494484 "Unicode message: 你好世界 🌍" ,
@@ -499,7 +489,7 @@ public async Task MemoryManagement_WithMarshaledData_HandlesCleanupCorrectly()
499489 "XML: <root><item>value</item></root>" ,
500490 "Base64: SGVsbG8gV29ybGQ=" ,
501491 "Final message"
502- } ;
492+ ] ;
503493
504494 foreach ( string message in testMessages )
505495 {
@@ -623,6 +613,6 @@ public async Task ErrorIsolation_WithMessageHandlerExceptions_DoesNotCrashProces
623613 Assert . True ( callbackCount >= 3 , "All callbacks should have been invoked despite exceptions" ) ;
624614
625615 // Process should still be running (not crashed)
626- Assert . True ( Environment . HasShutdownStarted == false , "Process should not have initiated shutdown" ) ;
616+ Assert . True ( ! Environment . HasShutdownStarted , "Process should not have initiated shutdown" ) ;
627617 }
628618}
0 commit comments