Thread-Safe Refactor of NativeFlowSubscription #237
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Thread-Safe Refactor of NativeFlowSubscription
Overview
This refactor addresses critical thread-safety issues in
NativeFlowSubscriptionthat were causing crashes when publishers were deallocated on the iOS side while new values were being published from Kotlin Native Flow. The improvements ensure proper synchronization across all state mutations and prevent race conditions that could lead to crashes.This refactor fixes Issue #228.
The Crash Scenario
Reported Issue
The crash occurred when a publisher was deallocated on the iOS side while a new value was being published from Kotlin Native Flow. The crash manifested with the following error:
Stack Trace Analysis
The crash stack trace reveals the execution flow:
The crash occurs in the Combine framework when
NativeFlowSubscriptionattempts to deliver a value to a subscriber that has been deallocated or is in an invalid state. The key issue is at frame 2, where the subscription's callback is executing concurrently with deallocation.Root Cause
The crash happens due to a race condition between:
Kotlin Native Flow callback thread (from
com.apple.root.default-qosor any background thread):onItemcallback to deliver a new valueself.subscriberto callreceive(_:)Main/UI thread (or any other thread):
cancel()on the subscriptionsubscriber = nilWhen these operations happen concurrently:
subscriber(line 109 in old code:guard let subscriber = self.subscriber)cancel()setssubscriber = nil(line 69 in old code)subscriber.receive(item)on a deallocated or nil subscriberProblem Statement
The original implementation had several critical thread-safety vulnerabilities:
1. Unprotected State Access in
cancel()Old Implementation:
Problems:
subscriberwithout checking if callbacks are using itsubscriber = nilwhile a callback is reading itcancel()callsRace Condition Example:
2. Unprotected Subscriber Access in Callbacks
Old Implementation:
Problems:
subscriberis accessed before acquiring the lockguardcheck andreceive()call,cancel()could setsubscriber = nilreceive()is executingdemand) are protected, but subscriber access is notRace Condition Example:
3. Incomplete Lock Coverage
Old Implementation:
request()method was protected byDispatchSemaphorecancel()had no synchronization at alldemandupdates, not subscriber access)Problems:
4. Missing Lifecycle State Tracking
Old Implementation:
cancelledflagfinishedflagProblems:
5. Unsafe Completion Handling
Old Implementation:
Problems:
cancelledorfinishedstateThreading Model
Kotlin Native Flow Threading Behavior
Kotlin Native Flow callbacks can be invoked from any thread, including:
Dispatchers.Default,Dispatchers.IO)Dispatchers.Main)The specific thread depends on:
.flowOn(Dispatchers.Default))Swift Combine Threading Requirements
Combine's
Subscriberprotocol has specific threading requirements:receive(_:)can be called from any threadThe Concurrency Problem
The fundamental issue is that:
onItem,onComplete,onCancelled) execute on arbitrary threadsrequest(),cancel()) can execute on any threadExample Timeline:
Solution: Comprehensive Thread Safety
Key Improvements
1. Comprehensive Locking with NSLock
Why NSLock over DispatchSemaphore?
New Implementation:
Lock Coverage:
request()method - state mutations protected, callbacks executed outside lockcancel()method - fully protectedonItemcallback - fully protectedonCompletecallback - fully protectedonCancelledcallback - fully protectedImportant: While callbacks are protected by locks for state access, the actual callback invocation (e.g.,
start(flow:)) happens outside the lock to prevent deadlocks when callbacks synchronously invoke completion or error handlers.2. State Flags for Lifecycle Management
New Implementation:
Benefits:
cancel()callsUsage Pattern:
3. Thread-Safe Cancel Method
New Implementation:
Key Improvements:
cancelledflag set atomically with state cleanupHow This Prevents the Crash:
4. Protected Subscriber Access in Callbacks
New Implementation:
Key Improvements:
cancelled,finished, andsubscriber == nilall checkedreceive()calldemandmutations are synchronizedHow This Prevents the Crash:
5. Weak Self References
New Implementation:
Benefits:
7. Thread-Safe Completion Handling
New Implementation:
Key Improvements:
How This Prevents Issues:
Detailed Crash Prevention Analysis
Scenario 1: Publisher Deallocated During Value Emission
Old Code Behavior:
New Code Behavior:
Scenario 2: Concurrent Cancel Calls
Old Code Behavior:
New Code Behavior:
Scenario 3: Completion After Cancellation
Old Code Behavior:
New Code Behavior:
Scenario 4: Multiple Completion Attempts
Old Code Behavior:
New Code Behavior:
Performance Considerations
Lock Granularity
The implementation uses fine-grained locking:
receive()) happen outside the lockLock-Free Operations
Some operations are intentionally lock-free:
deliverTo?.receive(item))This design balances safety (all state mutations protected) with performance (minimal lock hold time).
Migration Notes
This is a non-breaking change in terms of public API. The public interface remains unchanged:
Existing code using
createPublisher(for:)will automatically benefit from the improved thread safety without requiring any changes.Related Issues
Testing
All tests are passing for this PR, including:
testCompletionWithErrorwhich validates synchronous completion callbacksSummary
The thread-safety refactor transforms
NativeFlowSubscriptionfrom a vulnerable implementation with multiple race conditions into a robust, thread-safe component that:The key insight is that Kotlin Native Flow callbacks can execute on any thread, and they can execute concurrently with Swift Combine operations. By protecting all state mutations with a single lock and using proper lifecycle flags, we ensure that the subscription remains in a consistent state regardless of thread interleaving. Additionally, by releasing the lock before invoking callbacks, we prevent deadlocks that could occur when callbacks synchronously invoke completion or error handlers.