@@ -109,14 +109,23 @@ public abstract class ChannelFlow<T>(
109109 * For non-atomic start it is possible to observe the situation,
110110 * where the pipeline after the [flowOn] call successfully executes (mostly, its `onCompletion`)
111111 * handlers, while the pipeline before does not, because it was cancelled during its dispatch.
112- * Thus `onCompletion` and `finally` blocks won't be executed and it may lead to a different kinds of memory leaks.
112+ * Thus `onCompletion` and `finally` blocks won't be executed, and it may lead to a different kind of memory leaks.
113113 */
114114 public open fun produceImpl (scope : CoroutineScope ): ReceiveChannel <T > =
115- scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart .ATOMIC , block = collectToFun)
115+ produceImplInternal(scope, CoroutineStart .ATOMIC )
116+
117+ internal open fun produceImplInternal (scope : CoroutineScope , start : CoroutineStart ): ReceiveChannel <T > = scope.produce(context, produceCapacity, onBufferOverflow, start = start, block = collectToFun)
116118
117119 override suspend fun collect (collector : FlowCollector <T >): Unit =
118120 coroutineScope {
119- collector.emitAll(produceImpl(this ))
121+ // If upstream and collect have the same dispatcher, launch the `produce` coroutine undispatched.
122+ // This allows the collector to reliably subscribe to the flow before it starts emitting.
123+ val current = currentCoroutineContext()[ContinuationInterceptor ]
124+ val desired = context[ContinuationInterceptor ]
125+ val start = if (desired == null || desired == current) {
126+ CoroutineStart .UNDISPATCHED
127+ } else CoroutineStart .ATOMIC
128+ collector.emitAll(produceImplInternal(this , start))
120129 }
121130
122131 protected open fun additionalToStringProps (): String? = null
0 commit comments