Skip to content

Conversation

@kiarashtandem
Copy link

@kiarashtandem kiarashtandem commented Nov 21, 2025

Thread-Safe Refactor of NativeFlowSubscription

Overview

This refactor addresses critical thread-safety issues in NativeFlowSubscription that were causing crashes when publishers were deallocated on the iOS side while new values were being published from Kotlin Native Flow. The improvements ensure proper synchronization across all state mutations and prevent race conditions that could lead to crashes.

This refactor fixes Issue #228.

The Crash Scenario

Reported Issue

The crash occurred when a publisher was deallocated on the iOS side while a new value was being published from Kotlin Native Flow. The crash manifested with the following error:

thunk for @escaping @callee_guaranteed (@in_guaranteed OurKMPSdkState, @guaranteed @escaping @callee_guaranteed @substituted <A> () -> (@out A) for <OurKMPSdkKotlinUnit>, @in_guaranteed OurKMPSdkKotlinUnit) -> (@out OurKMPSdkKotlinUnit)

Stack Trace Analysis

The crash stack trace reveals the execution flow:

0  Combine                        0xb0620 Publishers.TryMap.Inner.receive(_:) + 44
1  Combine                        0xb0d7c protocol witness for Subscriber.receive(_:) in conformance Publishers.TryMap<A, B>.Inner<A1> + 20
2  KMPNativeCoroutinesCombine     0x6708 $s26KMPNativeCoroutinesCombine22NativeFlowSubscriptionC7requestyy0C011SubscribersO6DemandVFq0_x_q0_ycq0_tcfU_ + 332
3  OurApp                         0x185468 thunk for @escaping @callee_guaranteed...

The crash occurs in the Combine framework when NativeFlowSubscription attempts to deliver a value to a subscriber that has been deallocated or is in an invalid state. The key issue is at frame 2, where the subscription's callback is executing concurrently with deallocation.

Root Cause

The crash happens due to a race condition between:

  1. Kotlin Native Flow callback thread (from com.apple.root.default-qos or any background thread):

    • Executing onItem callback to deliver a new value
    • Accessing self.subscriber to call receive(_:)
  2. Main/UI thread (or any other thread):

    • Calling cancel() on the subscription
    • Setting subscriber = nil
    • Deallocating the subscription object

When these operations happen concurrently:

  • The callback captures a reference to subscriber (line 109 in old code: guard let subscriber = self.subscriber)
  • Meanwhile, cancel() sets subscriber = nil (line 69 in old code)
  • The callback then tries to call subscriber.receive(item) on a deallocated or nil subscriber
  • This causes a crash in Combine's internal state machine

Problem Statement

The original implementation had several critical thread-safety vulnerabilities:

1. Unprotected State Access in cancel()

Old Implementation:

func cancel() {
    subscriber = nil
    nativeFlow = nil
    _ = nativeCancellable?()
    nativeCancellable = nil
}

Problems:

  • No synchronization mechanism - could execute concurrently with callbacks
  • Direct mutation of subscriber without checking if callbacks are using it
  • Could set subscriber = nil while a callback is reading it
  • No protection against multiple concurrent cancel() calls

Race Condition Example:

Thread 1 (Kotlin Flow callback):          Thread 2 (UI/main thread):
--------------------------------          ----------------------------
1. guard let subscriber = self.subscriber
   [subscriber is valid]                   
                                          2. subscriber = nil
                                          3. [subscriber deallocated]
3. subscriber.receive(item)  ❌ CRASH!
   [accessing deallocated object]

2. Unprotected Subscriber Access in Callbacks

Old Implementation:

nativeCancellable = nativeFlow({ item, next, unit in
    guard let subscriber = self.subscriber else { return unit }
    let demand = subscriber.receive(item)  // ❌ Unsafe!
    self.semaphore.wait()
    defer { self.semaphore.signal() }
    self.demand -= 1
    self.demand += demand
    // ...
}, ...)

Problems:

  • subscriber is accessed before acquiring the lock
  • Between the guard check and receive() call, cancel() could set subscriber = nil
  • The subscriber could be deallocated while receive() is executing
  • State mutations (demand) are protected, but subscriber access is not

Race Condition Example:

Thread 1 (Kotlin Flow callback):          Thread 2 (cancel on any thread):
--------------------------------          ----------------------------
1. guard let subscriber = self.subscriber
   [subscriber is valid]
2. [lock not acquired yet]
                                          3. lock.lock()
                                          4. subscriber = nil
                                          5. lock.unlock()
6. subscriber.receive(item)  ❌ CRASH!
   [subscriber was set to nil]

3. Incomplete Lock Coverage

Old Implementation:

  • Only request() method was protected by DispatchSemaphore
  • cancel() had no synchronization at all
  • Callbacks had partial protection (only for demand updates, not subscriber access)

Problems:

  • Critical sections were not fully protected
  • Multiple entry points could modify shared state concurrently
  • No guarantee of atomic state transitions

4. Missing Lifecycle State Tracking

Old Implementation:

  • No explicit cancelled flag
  • No explicit finished flag
  • Could not prevent operations after termination

Problems:

  • Callbacks could execute after cancellation
  • Completion could be delivered multiple times
  • No way to check if subscription is still valid before operations

5. Unsafe Completion Handling

Old Implementation:

{ error, unit in
    if let error = error {
        self.subscriber?.receive(completion: .failure(error))  // ❌ Unsafe!
    } else {
        self.subscriber?.receive(completion: .finished)  // ❌ Unsafe!
    }
    return unit
}

Problems:

  • Subscriber accessed without lock
  • Could deliver completion after cancellation
  • Could deliver completion multiple times if called concurrently
  • No check for cancelled or finished state

Threading Model

Kotlin Native Flow Threading Behavior

Kotlin Native Flow callbacks can be invoked from any thread, including:

  • Background coroutine dispatchers (Dispatchers.Default, Dispatchers.IO)
  • Main thread (when using Dispatchers.Main)
  • Worker threads (from thread pools)
  • Any thread managed by the Kotlin/Native runtime

The specific thread depends on:

  • The coroutine context where the Flow is collected
  • The dispatcher used in the Flow chain (e.g., .flowOn(Dispatchers.Default))
  • The Kotlin/Native runtime's thread scheduling

Swift Combine Threading Requirements

Combine's Subscriber protocol has specific threading requirements:

  • receive(_:) can be called from any thread
  • However, the subscriber must remain valid for the duration of the call
  • If the subscription is cancelled, the subscriber should not receive further values
  • Completion should be delivered exactly once

The Concurrency Problem

The fundamental issue is that:

  1. Kotlin Native Flow callbacks (onItem, onComplete, onCancelled) execute on arbitrary threads
  2. Swift Combine operations (request(), cancel()) can execute on any thread
  3. These operations can execute concurrently, creating race conditions

Example Timeline:

Time    Thread A (Kotlin Flow)              Thread B (Swift/Combine)
----    ---------------------                ------------------------
T1      Flow emits value
T2      onItem callback starts
T3      guard let subscriber = self.subscriber
T4                                          cancel() called
T5                                          subscriber = nil
T6      subscriber.receive(item)  ❌ CRASH!

Solution: Comprehensive Thread Safety

Key Improvements

1. Comprehensive Locking with NSLock

Why NSLock over DispatchSemaphore?

  • NSLock is optimized for short critical sections (which we have)
  • DispatchSemaphore is better for waiting/signaling between threads
  • NSLock provides better performance for frequent lock/unlock operations

New Implementation:

private let lock = NSLock()

Lock Coverage:

  • request() method - state mutations protected, callbacks executed outside lock
  • cancel() method - fully protected
  • onItem callback - fully protected
  • onComplete callback - fully protected
  • onCancelled callback - fully protected

Important: While callbacks are protected by locks for state access, the actual callback invocation (e.g., start(flow:)) happens outside the lock to prevent deadlocks when callbacks synchronously invoke completion or error handlers.

2. State Flags for Lifecycle Management

New Implementation:

private var finished = false
private var cancelled = false

Benefits:

  • Explicit tracking of subscription lifecycle
  • Prevents operations after termination
  • Enables idempotent cancel() calls
  • Prevents duplicate completion delivery

Usage Pattern:

lock.lock()
if self.cancelled || self.finished || self.subscriber == nil {
    lock.unlock()
    return unit  // Early return - subscription is terminated
}
// ... perform operation
lock.unlock()

3. Thread-Safe Cancel Method

New Implementation:

func cancel() {
    var toCancel: NativeCancellable<Unit>?
    
    lock.lock()
    if cancelled {
        lock.unlock()
        return  // Idempotent - already cancelled
    }
    cancelled = true
    toCancel = nativeCancellable
    nativeCancellable = nil
    subscriber = nil
    pendingNext = nil
    nativeFlow = nil
    lock.unlock()
    
    _ = toCancel?()  // Execute outside lock to prevent deadlock
}

Key Improvements:

  1. Fully synchronized - all state mutations protected by lock
  2. Idempotent - multiple calls are safe
  3. Atomic state transition - cancelled flag set atomically with state cleanup
  4. Cancellation callback outside lock - prevents potential deadlocks
  5. Early return - prevents unnecessary work if already cancelled

How This Prevents the Crash:

Thread 1 (Kotlin Flow callback):          Thread 2 (cancel on any thread):
--------------------------------          ----------------------------
1. self.lock.lock()
2. if self.cancelled || ...              [waiting for lock]
   [checking state]
                                          3. lock.lock() [acquired]
                                          4. cancelled = true
                                          5. subscriber = nil
                                          6. lock.unlock()
7. [lock acquired - but cancelled=true]
8. if self.cancelled || ... ✅
9. return unit  ✅ SAFE - early return

4. Protected Subscriber Access in Callbacks

New Implementation:

{ [weak self] item, next, unit in
    guard let self else { return unit }
    
    var deliverTo: S?
    var callNext: (() -> Unit)?
    
    self.lock.lock()
    if self.cancelled || self.finished || self.subscriber == nil {
        self.lock.unlock()
        return unit  // ✅ Safe early return
    }
    
    if self.demand == .none {
        self.pendingNext = next
        self.lock.unlock()
        return unit
    }
    
    self.demand -= 1
    deliverTo = self.subscriber  // ✅ Capture inside lock
    self.lock.unlock()
    
    // ✅ Use captured reference outside lock
    let more = deliverTo?.receive(item) ?? .none
    
    self.lock.lock()
    self.demand += more
    if self.demand > .none {
        callNext = next
    } else {
        self.pendingNext = next
    }
    self.lock.unlock()
    
    if let callNext = callNext {
        return callNext()
    } else {
        return unit
    }
}

Key Improvements:

  1. Subscriber captured inside lock - ensures atomic check and capture
  2. State checked before access - cancelled, finished, and subscriber == nil all checked
  3. Subscriber used outside lock - prevents lock contention during receive() call
  4. Demand management protected - all demand mutations are synchronized
  5. Early returns - multiple exit points prevent invalid operations

How This Prevents the Crash:

Thread 1 (Kotlin Flow callback):          Thread 2 (cancel on any thread):
--------------------------------          ----------------------------
1. self.lock.lock() [acquired]
2. if self.cancelled || ...               [waiting for lock]
   [checking state - all valid]
3. deliverTo = self.subscriber
   [captured reference]
4. self.lock.unlock()
                                          5. lock.lock() [acquired]
                                          6. cancelled = true
                                          7. subscriber = nil
                                          8. lock.unlock()
9. deliverTo?.receive(item)  ✅ SAFE!
   [using captured reference, not self.subscriber]

5. Weak Self References

New Implementation:

{ [weak self] item, next, unit in
    guard let self else { return unit }
    // ...
}

Benefits:

  • Prevents retain cycles
  • Allows subscription to be deallocated if owner is released
  • Early return if subscription is deallocated
  • Memory safety

7. Thread-Safe Completion Handling

New Implementation:

{ [weak self] maybeError, unit in
    guard let self else { return unit }
    
    var finishWith: Subscribers.Completion<Failure>?
    var toNotify: S?
    
    self.lock.lock()
    if !self.cancelled && !self.finished {
        toNotify = self.subscriber
        self.subscriber = nil  // ✅ Clear to prevent further access
        self.finished = true   // ✅ Mark as finished
        finishWith = (maybeError != nil) ? .failure(maybeError!) : .finished
    }
    self.lock.unlock()
    
    if let sub = toNotify, let completion = finishWith {
        sub.receive(completion: completion)  // ✅ Deliver outside lock
    }
    return unit
}

Key Improvements:

  1. Protected by lock - all state access synchronized
  2. Checks cancellation/completion state - prevents duplicate delivery
  3. Subscriber cleared - prevents further access after completion
  4. Completion delivered outside lock - prevents lock contention
  5. Idempotent - multiple calls are safe (only first one delivers)

How This Prevents Issues:

Thread 1 (onComplete callback):           Thread 2 (cancel):
--------------------------------          ----------------------------
1. self.lock.lock() [acquired]
2. if !self.cancelled && !self.finished   [waiting for lock]
   [state is valid]
3. toNotify = self.subscriber
4. self.subscriber = nil
5. self.finished = true
6. self.lock.unlock()
                                          7. lock.lock() [acquired]
                                          8. if cancelled { return }
                                          9. [but finished=true, so early return]
                                          10. lock.unlock()
11. toNotify?.receive(completion)  ✅ SAFE!

Detailed Crash Prevention Analysis

Scenario 1: Publisher Deallocated During Value Emission

Old Code Behavior:

1. Kotlin Flow emits value on background thread
2. onItem callback starts executing
3. guard let subscriber = self.subscriber  [subscriber is valid]
4. [No lock - subscriber could be set to nil here]
5. Main thread calls cancel()
6. subscriber = nil  [no synchronization]
7. onItem callback continues: subscriber.receive(item)  ❌ CRASH

New Code Behavior:

1. Kotlin Flow emits value on background thread
2. onItem callback starts executing
3. self.lock.lock()  [acquire lock]
4. if self.cancelled || self.finished || self.subscriber == nil
   [check state - all valid]
5. deliverTo = self.subscriber  [capture reference inside lock]
6. self.lock.unlock()
7. Main thread calls cancel()
8. lock.lock()  [wait for lock]
9. [waiting...]
10. [lock acquired after callback releases it]
11. cancelled = true
12. subscriber = nil
13. lock.unlock()
14. deliverTo?.receive(item)  ✅ SAFE - using captured reference

Scenario 2: Concurrent Cancel Calls

Old Code Behavior:

Thread 1: cancel() -> subscriber = nil
Thread 2: cancel() -> subscriber = nil  [already nil, but no check]
         -> nativeCancellable?()  [could be called twice]

New Code Behavior:

Thread 1: cancel() -> lock.lock() -> cancelled = true -> lock.unlock()
Thread 2: cancel() -> lock.lock() -> if cancelled { return }  ✅ SAFE

Scenario 3: Completion After Cancellation

Old Code Behavior:

1. cancel() called -> subscriber = nil
2. onComplete callback executes
3. self.subscriber?.receive(completion)  [nil, but no explicit check]
4. [Could deliver completion to deallocated subscriber]

New Code Behavior:

1. cancel() called -> lock.lock() -> cancelled = true -> lock.unlock()
2. onComplete callback executes
3. lock.lock() -> if !self.cancelled && !self.finished
   [cancelled=true, so early return]  ✅ SAFE

Scenario 4: Multiple Completion Attempts

Old Code Behavior:

1. onComplete callback 1: subscriber?.receive(completion: .finished)
2. onComplete callback 2: subscriber?.receive(completion: .finished)
   [Could deliver completion twice]

New Code Behavior:

1. onComplete callback 1: lock.lock() -> finished = true -> deliver completion
2. onComplete callback 2: lock.lock() -> if !finished { ... }
   [finished=true, so early return]  ✅ SAFE - only one delivery

Performance Considerations

Lock Granularity

The implementation uses fine-grained locking:

  • Lock is held only for state mutations
  • Subscriber method calls (receive()) happen outside the lock
  • This minimizes lock contention and improves performance

Lock-Free Operations

Some operations are intentionally lock-free:

  • Reading captured references (e.g., deliverTo?.receive(item))
  • Executing cancellation callbacks
  • Delivering completion to captured subscriber

This design balances safety (all state mutations protected) with performance (minimal lock hold time).

Migration Notes

This is a non-breaking change in terms of public API. The public interface remains unchanged:

public func createPublisher<Output, Failure: Error, Unit>(
    for nativeFlow: @escaping NativeFlow<Output, Failure, Unit>
) -> AnyPublisher<Output, Failure>

Existing code using createPublisher(for:) will automatically benefit from the improved thread safety without requiring any changes.

Related Issues

Testing

All tests are passing for this PR, including:

  • Thread-safety tests - Verifying concurrent access scenarios
  • Completion tests - Including testCompletionWithError which validates synchronous completion callbacks
  • Cancellation tests - Ensuring proper cleanup and idempotent cancellation
  • Integration tests - Real-world usage scenarios with Kotlin Native Flow

Summary

The thread-safety refactor transforms NativeFlowSubscription from a vulnerable implementation with multiple race conditions into a robust, thread-safe component that:

  1. Eliminates race conditions through comprehensive locking
  2. Prevents crashes by ensuring subscribers are never accessed after deallocation
  3. Prevents deadlocks by releasing locks before invoking callbacks
  4. Provides correct semantics with proper lifecycle management
  5. Maintains performance through fine-grained locking
  6. Ensures memory safety with weak references and proper cleanup

The key insight is that Kotlin Native Flow callbacks can execute on any thread, and they can execute concurrently with Swift Combine operations. By protecting all state mutations with a single lock and using proper lifecycle flags, we ensure that the subscription remains in a consistent state regardless of thread interleaving. Additionally, by releasing the lock before invoking callbacks, we prevent deadlocks that could occur when callbacks synchronously invoke completion or error handlers.

@kiarashtandem kiarashtandem changed the title Refactor NativeFlowSubscription To Become Thread Safe Thread-Safe Refactor of NativeFlowSubscription Nov 21, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Crash "thunk for @escaping @callee_guaranteed (@in_guaranteed"

1 participant