1717package com.google.firebase.ai.type
1818
1919import android.Manifest.permission.RECORD_AUDIO
20+ import android.annotation.SuppressLint
2021import android.content.pm.PackageManager
2122import android.media.AudioFormat
2223import android.media.AudioTrack
24+ import android.os.Process
25+ import android.os.StrictMode
26+ import android.os.StrictMode.ThreadPolicy
2327import android.util.Log
2428import androidx.annotation.RequiresPermission
2529import androidx.core.content.ContextCompat
30+ import com.google.firebase.BuildConfig
2631import com.google.firebase.FirebaseApp
2732import com.google.firebase.ai.common.JSON
2833import com.google.firebase.ai.common.util.CancelledCoroutineScope
@@ -34,21 +39,27 @@ import io.ktor.websocket.Frame
3439import io.ktor.websocket.close
3540import io.ktor.websocket.readBytes
3641import java.util.concurrent.ConcurrentLinkedQueue
42+ import java.util.concurrent.Executors
43+ import java.util.concurrent.ThreadFactory
3744import java.util.concurrent.atomic.AtomicBoolean
45+ import java.util.concurrent.atomic.AtomicLong
3846import kotlin.coroutines.CoroutineContext
47+ import kotlinx.coroutines.CoroutineName
3948import kotlinx.coroutines.CoroutineScope
49+ import kotlinx.coroutines.asCoroutineDispatcher
4050import kotlinx.coroutines.cancel
4151import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
52+ import kotlinx.coroutines.delay
4253import kotlinx.coroutines.flow.Flow
4354import kotlinx.coroutines.flow.buffer
4455import kotlinx.coroutines.flow.catch
4556import kotlinx.coroutines.flow.flow
57+ import kotlinx.coroutines.flow.flowOn
4658import kotlinx.coroutines.flow.launchIn
4759import kotlinx.coroutines.flow.onCompletion
4860import kotlinx.coroutines.flow.onEach
4961import kotlinx.coroutines.isActive
5062import kotlinx.coroutines.launch
51- import kotlinx.coroutines.yield
5263import kotlinx.serialization.ExperimentalSerializationApi
5364import kotlinx.serialization.Serializable
5465import kotlinx.serialization.encodeToString
@@ -65,11 +76,21 @@ internal constructor(
6576 private val firebaseApp: FirebaseApp ,
6677) {
6778 /* *
68- * Coroutine scope that we batch data on for [startAudioConversation] .
79+ * Coroutine scope that we batch data on for network related behavior .
6980 *
7081 * Makes it easy to stop all the work with [stopAudioConversation] by just cancelling the scope.
7182 */
72- private var scope = CancelledCoroutineScope
83+ private var networkScope = CancelledCoroutineScope
84+
85+ /* *
86+ * Coroutine scope that we batch data on for audio recording and playback.
87+ *
88+ * Separate from [networkScope] to ensure interchanging of dispatchers doesn't cause any deadlocks
89+ * or issues.
90+ *
91+ * Makes it easy to stop all the work with [stopAudioConversation] by just cancelling the scope.
92+ */
93+ private var audioScope = CancelledCoroutineScope
7394
7495 /* *
7596 * Playback audio data sent from the model.
@@ -159,16 +180,17 @@ internal constructor(
159180 }
160181
161182 FirebaseAIException .catchAsync {
162- if (scope .isActive) {
183+ if (networkScope.isActive || audioScope .isActive) {
163184 Log .w(
164185 TAG ,
165186 " startAudioConversation called after the recording has already started. " +
166187 " Call stopAudioConversation to close the previous connection."
167188 )
168189 return @catchAsync
169190 }
170-
171- scope = CoroutineScope (blockingDispatcher + childJob())
191+ networkScope =
192+ CoroutineScope (blockingDispatcher + childJob() + CoroutineName (" LiveSession Network" ))
193+ audioScope = CoroutineScope (audioDispatcher + childJob() + CoroutineName (" LiveSession Audio" ))
172194 audioHelper = AudioHelper .build()
173195
174196 recordUserAudio()
@@ -188,7 +210,8 @@ internal constructor(
188210 FirebaseAIException .catch {
189211 if (! startedReceiving.getAndSet(false )) return @catch
190212
191- scope.cancel()
213+ networkScope.cancel()
214+ audioScope.cancel()
192215 playBackQueue.clear()
193216
194217 audioHelper?.release()
@@ -231,7 +254,9 @@ internal constructor(
231254 )
232255 }
233256 ?.let { emit(it.toPublic()) }
234- yield ()
257+ // delay uses a different scheduler in the backend, so it's "stickier" in its
258+ // enforcement when compared to yield.
259+ delay(0 )
235260 }
236261 }
237262 .onCompletion { stopAudioConversation() }
@@ -258,7 +283,8 @@ internal constructor(
258283 FirebaseAIException .catch {
259284 if (! startedReceiving.getAndSet(false )) return @catch
260285
261- scope.cancel()
286+ networkScope.cancel()
287+ audioScope.cancel()
262288 playBackQueue.clear()
263289
264290 audioHelper?.release()
@@ -403,18 +429,24 @@ internal constructor(
403429 audioHelper
404430 ?.listenToRecording()
405431 ?.buffer(UNLIMITED )
432+ ?.flowOn(audioDispatcher)
406433 ?.accumulateUntil(MIN_BUFFER_SIZE )
407- ?.onEach { sendAudioRealtime(InlineData (it, " audio/pcm" )) }
434+ ?.onEach {
435+ sendAudioRealtime(InlineData (it, " audio/pcm" ))
436+ // delay uses a different scheduler in the backend, so it's "stickier" in its enforcement
437+ // when compared to yield.
438+ delay(0 )
439+ }
408440 ?.catch { throw FirebaseAIException .from(it) }
409- ?.launchIn(scope )
441+ ?.launchIn(networkScope )
410442 }
411443
412444 /* *
413445 * Processes responses from the model during an audio conversation.
414446 *
415447 * Audio messages are added to [playBackQueue].
416448 *
417- * Launched asynchronously on [scope ].
449+ * Launched asynchronously on [networkScope ].
418450 *
419451 * @param functionCallHandler A callback function that is invoked whenever the server receives a
420452 * function call.
@@ -471,18 +503,18 @@ internal constructor(
471503 }
472504 }
473505 }
474- .launchIn(scope )
506+ .launchIn(networkScope )
475507 }
476508
477509 /* *
478510 * Listens for playback data from the model and plays the audio.
479511 *
480512 * Polls [playBackQueue] for data, and calls [AudioHelper.playAudio] when data is received.
481513 *
482- * Launched asynchronously on [scope ].
514+ * Launched asynchronously on [networkScope ].
483515 */
484516 private fun listenForModelPlayback (enableInterruptions : Boolean = false) {
485- scope .launch {
517+ audioScope .launch {
486518 while (isActive) {
487519 val playbackData = playBackQueue.poll()
488520 if (playbackData == null ) {
@@ -491,14 +523,16 @@ internal constructor(
491523 if (! enableInterruptions) {
492524 audioHelper?.resumeRecording()
493525 }
494- yield ()
526+ // delay uses a different scheduler in the backend, so it's "stickier" in its enforcement
527+ // when compared to yield.
528+ delay(0 )
495529 } else {
496530 /* *
497531 * We pause the recording while the model is speaking to avoid interrupting it because of
498532 * no echo cancellation
499533 */
500534 // TODO(b/408223520): Conditionally pause when param is added
501- if (enableInterruptions != true ) {
535+ if (! enableInterruptions ) {
502536 audioHelper?.pauseRecording()
503537 }
504538 audioHelper?.playAudio(playbackData)
@@ -583,5 +617,38 @@ internal constructor(
583617 AudioFormat .CHANNEL_OUT_MONO ,
584618 AudioFormat .ENCODING_PCM_16BIT
585619 )
620+ @SuppressLint(" ThreadPoolCreation" )
621+ val audioDispatcher =
622+ Executors .newCachedThreadPool(AudioThreadFactory ()).asCoroutineDispatcher()
623+ }
624+ }
625+
/**
 * [ThreadFactory] that produces threads configured for low-latency audio work.
 *
 * Every thread created by this factory:
 * - Elevates itself to [Process.THREAD_PRIORITY_AUDIO] before running its task, so audio
 *   recording/playback is less likely to be starved by other work.
 * - Installs a [StrictMode] thread policy that detects network access; performing network I/O on
 *   an audio thread would cause audible glitches, so violations are always logged and, in debug
 *   builds, crash the process to surface the bug early.
 */
internal class AudioThreadFactory : ThreadFactory {
  // Monotonic counter used solely to give each thread a distinct, debuggable name.
  private val threadCount = AtomicLong()

  // Built once per factory; StrictMode policies are immutable and safe to share across threads.
  private val policy: ThreadPolicy = audioPolicy()

  /**
   * Creates a new audio-priority thread wrapping [task].
   *
   * The returned thread is never null; the platform default factory always produces a thread.
   * [task] remains nullable for source compatibility, and a null task yields a thread that only
   * performs the priority/policy setup.
   */
  override fun newThread(task: Runnable?): Thread {
    val thread =
      DEFAULT.newThread {
        Process.setThreadPriority(Process.THREAD_PRIORITY_AUDIO)
        StrictMode.setThreadPolicy(policy)
        task?.run()
      }
    thread.name = "Firebase Audio Thread #${threadCount.andIncrement}"
    return thread
  }

  companion object {
    /** Platform default factory; we decorate its threads rather than subclass [Thread]. */
    val DEFAULT: ThreadFactory = Executors.defaultThreadFactory()

    /**
     * Builds the [ThreadPolicy] applied to audio threads: network access is detected and logged
     * in all builds, and additionally kills the process in debug builds.
     */
    private fun audioPolicy(): ThreadPolicy {
      val builder = ThreadPolicy.Builder().detectNetwork()

      if (BuildConfig.DEBUG) {
        builder.penaltyDeath()
      }

      return builder.penaltyLog().build()
    }
  }
}
0 commit comments