Skip to content

Commit 785292a

Browse files
committed
fix: resolve critical memory leak in PubSub FFI message processing
This commit addresses a critical memory leak in the FFI layer where Rust-allocated memory was not properly freed after C# marshaling during PubSub message processing. Key Changes: Rust FFI Layer (rust/src/lib.rs): - Replaced std::mem::forget() with scoped lifetime management in process_push_notification - Vec<u8> instances now remain alive during callback execution and auto-cleanup on exit - Added comprehensive message structure validation based on PushKind type - Implemented proper error logging for invalid message formats and unexpected value types - Enhanced validation ensures message structure matches expected format for each PushKind Memory Leak Detection Tests: - Added PubSubFFIMemoryLeakTests.cs with comprehensive memory leak detection - Tests process 100,000+ messages and verify memory usage remains bounded - Includes tests for various message sizes, GC pressure, concurrent access, and extended duration - Added PubSubMemoryLeakFixValidationTests.cs for simple validation scenarios Test Cleanup: - Removed tests dependent on deleted PubSubCallbackManager and ClientRegistry classes - Deleted PubSubCallbackIntegrationTests.cs (tested removed PubSubCallbackManager) - Deleted PubSubFFIWorkflowTests.cs (tested removed PubSubCallbackManager) - Deleted ClientRegistryTests.cs (tested removed ClientRegistry) - Updated PubSubFFIIntegrationTests.cs to remove tests using removed infrastructure - Updated PubSubFFICallbackIntegrationTests.cs to remove ClientRegistry tests - Fixed Lock type to object for .NET 8 compatibility - Changed explicit type declarations to var to avoid type resolution issues All unit tests (242) now pass successfully. Addresses requirements 1.1-1.6 and 9.1 from pubsub-critical-fixes spec. Signed-off-by: Joe Brinkman <joe.brinkman@improving.com>
1 parent 9075a4a commit 785292a

File tree

8 files changed

+660
-737
lines changed

8 files changed

+660
-737
lines changed

rust/src/lib.rs

Lines changed: 87 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ pub unsafe extern "C-unwind" fn create_client(
198198
/// Processes a push notification message and calls the provided callback function.
199199
///
200200
/// This function extracts the message data from the PushInfo and invokes the C# callback
201-
/// with the appropriate parameters.
201+
/// with the appropriate parameters using scoped lifetime management to prevent memory leaks.
202202
///
203203
/// # Parameters
204204
/// - `push_msg`: The push notification message to process.
@@ -211,50 +211,103 @@ pub unsafe extern "C-unwind" fn create_client(
211211
///
212212
/// The caller must ensure:
213213
/// - `pubsub_callback` is a valid function pointer to a properly implemented callback
214-
/// - Memory allocated during conversion is properly handled by C#
214+
/// - The callback copies data synchronously before returning
215+
///
216+
/// # Memory Safety
217+
/// This implementation uses scoped lifetime management instead of `std::mem::forget()`.
218+
/// Vec<u8> instances are kept alive during callback execution and automatically cleaned up
219+
/// when the function exits, preventing memory leaks.
215220
unsafe fn process_push_notification(push_msg: redis::PushInfo, pubsub_callback: PubSubCallback) {
216221
use redis::Value;
217222

218-
// Convert push_msg.data to extract message components
219-
let strings: Vec<(*const u8, i64)> = push_msg
223+
// Keep Vec<u8> instances alive for the duration of the callback
224+
let strings: Vec<Vec<u8>> = push_msg
220225
.data
221226
.into_iter()
222227
.filter_map(|value| match value {
223-
Value::BulkString(bytes) => {
224-
let len = bytes.len() as i64;
225-
let ptr = bytes.as_ptr();
226-
std::mem::forget(bytes); // Prevent deallocation - C# will handle it
227-
Some((ptr, len))
228+
Value::BulkString(bytes) => Some(bytes),
229+
_ => {
230+
logger_core::log(
231+
logger_core::Level::Warn,
232+
"pubsub",
233+
&format!("Unexpected value type in PubSub message: {:?}", value),
234+
);
235+
None
228236
}
229-
_ => None,
230237
})
231238
.collect();
232239

233-
// Extract pattern, channel, and message based on the push kind
234-
let ((pattern_ptr, pattern_len), (channel_ptr, channel_len), (message_ptr, message_len)) = {
235-
match strings.len() {
236-
2 => ((std::ptr::null(), 0), strings[0], strings[1]), // No pattern (exact subscription)
237-
3 => (strings[0], strings[1], strings[2]), // With pattern
238-
_ => return, // Invalid message format
240+
// Store the kind to avoid move issues
241+
let push_kind = push_msg.kind.clone();
242+
243+
// Validate message structure based on PushKind and convert to FFI kind
244+
let (pattern, channel, message, kind) = match (push_kind.clone(), strings.len()) {
245+
(redis::PushKind::Message, 2) => {
246+
// Regular message: [channel, message]
247+
(None, &strings[0], &strings[1], 0u32)
248+
}
249+
(redis::PushKind::PMessage, 3) => {
250+
// Pattern message: [pattern, channel, message]
251+
(Some(&strings[0]), &strings[1], &strings[2], 1u32)
252+
}
253+
(redis::PushKind::SMessage, 2) => {
254+
// Sharded message: [channel, message]
255+
(None, &strings[0], &strings[1], 2u32)
256+
}
257+
(redis::PushKind::Subscribe, 2) => {
258+
// Subscribe confirmation: [channel, count]
259+
(None, &strings[0], &strings[1], 3u32)
260+
}
261+
(redis::PushKind::PSubscribe, 3) => {
262+
// Pattern subscribe confirmation: [pattern, channel, count]
263+
(Some(&strings[0]), &strings[1], &strings[2], 4u32)
264+
}
265+
(redis::PushKind::SSubscribe, 2) => {
266+
// Sharded subscribe confirmation: [channel, count]
267+
(None, &strings[0], &strings[1], 5u32)
268+
}
269+
(redis::PushKind::Unsubscribe, 2) => {
270+
// Unsubscribe confirmation: [channel, count]
271+
(None, &strings[0], &strings[1], 6u32)
272+
}
273+
(redis::PushKind::PUnsubscribe, 3) => {
274+
// Pattern unsubscribe confirmation: [pattern, channel, count]
275+
(Some(&strings[0]), &strings[1], &strings[2], 7u32)
276+
}
277+
(redis::PushKind::SUnsubscribe, 2) => {
278+
// Sharded unsubscribe confirmation: [channel, count]
279+
(None, &strings[0], &strings[1], 8u32)
280+
}
281+
(redis::PushKind::Disconnection, _) => {
282+
logger_core::log(
283+
logger_core::Level::Info,
284+
"pubsub",
285+
"PubSub disconnection received",
286+
);
287+
return;
288+
}
289+
(kind, len) => {
290+
logger_core::log(
291+
logger_core::Level::Error,
292+
"pubsub",
293+
&format!(
294+
"Invalid PubSub message structure: kind={:?}, len={}",
295+
kind, len
296+
),
297+
);
298+
return;
239299
}
240300
};
241301

242-
// Convert PushKind to the FFI-safe enum
243-
let kind = match push_msg.kind {
244-
redis::PushKind::Disconnection => return, // Don't send disconnection to callback
245-
redis::PushKind::Message => 0u32, // PushMessage
246-
redis::PushKind::PMessage => 1u32, // PushPMessage
247-
redis::PushKind::SMessage => 2u32, // PushSMessage
248-
redis::PushKind::Subscribe => 3u32, // Subscription confirmation
249-
redis::PushKind::PSubscribe => 4u32, // Pattern subscription confirmation
250-
redis::PushKind::SSubscribe => 5u32, // Sharded subscription confirmation
251-
redis::PushKind::Unsubscribe => 6u32, // Unsubscription confirmation
252-
redis::PushKind::PUnsubscribe => 7u32, // Pattern unsubscription confirmation
253-
redis::PushKind::SUnsubscribe => 8u32, // Sharded unsubscription confirmation
254-
_ => return, // Unknown/unsupported kind
255-
};
302+
// Prepare pointers while keeping strings alive
303+
let pattern_ptr = pattern.map(|p| p.as_ptr()).unwrap_or(std::ptr::null());
304+
let pattern_len = pattern.map(|p| p.len() as i64).unwrap_or(0);
305+
let channel_ptr = channel.as_ptr();
306+
let channel_len = channel.len() as i64;
307+
let message_ptr = message.as_ptr();
308+
let message_len = message.len() as i64;
256309

257-
// Call the C# callback with the push notification data
310+
// Call callback while strings are still alive
258311
unsafe {
259312
pubsub_callback(
260313
kind,
@@ -266,6 +319,9 @@ unsafe fn process_push_notification(push_msg: redis::PushInfo, pubsub_callback:
266319
pattern_len,
267320
);
268321
}
322+
323+
// Vec<u8> instances are automatically cleaned up here
324+
// No memory leak, no use-after-free
269325
}
270326

271327
/// Closes the given client, deallocating it from the heap.

0 commit comments

Comments
 (0)