Skip to content

Commit 35e409b

Browse files
authored
Ensure detaching threads when switching to a new one (#709)
* implement detachRun (tests failing) * clean up and add unit tests * v0.0.42-alpha.0 * ensure this.activeRunDetach$ is not undefined * v0.0.42-alpha.1
1 parent d400078 commit 35e409b

File tree

7 files changed

+212
-10
lines changed

7 files changed

+212
-10
lines changed

sdks/typescript/packages/cli/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "create-ag-ui-app",
33
"author": "Markus Ecker <markus.ecker@gmail.com>",
4-
"version": "0.0.41",
4+
"version": "0.0.42-alpha.1",
55
"private": false,
66
"publishConfig": {
77
"access": "public"

sdks/typescript/packages/client/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@ag-ui/client",
33
"author": "Markus Ecker <markus.ecker@gmail.com>",
4-
"version": "0.0.41",
4+
"version": "0.0.42-alpha.1",
55
"private": false,
66
"publishConfig": {
77
"access": "public"

sdks/typescript/packages/client/src/agent/__tests__/agent-result.test.ts

Lines changed: 165 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@ import {
1010
MessagesSnapshotEvent,
1111
RunFinishedEvent,
1212
RunStartedEvent,
13+
TextMessageStartEvent,
14+
TextMessageContentEvent,
15+
TextMessageEndEvent,
1316
} from "@ag-ui/core";
14-
import { Observable, of } from "rxjs";
17+
import { Observable, of, Subject } from "rxjs";
1518

1619
// Mock uuid module
1720
jest.mock("uuid", () => ({
@@ -59,6 +62,21 @@ class TestAgent extends AbstractAgent {
5962
}
6063
}
6164

65+
class StreamingTestAgent extends AbstractAgent {
66+
private eventSubject?: Subject<BaseEvent>;
67+
68+
setEventSubject(subject: Subject<BaseEvent>) {
69+
this.eventSubject = subject;
70+
}
71+
72+
run(input: RunAgentInput): Observable<BaseEvent> {
73+
if (!this.eventSubject) {
74+
throw new Error("eventSubject not set");
75+
}
76+
return this.eventSubject.asObservable();
77+
}
78+
}
79+
6280
describe("Agent Result", () => {
6381
let agent: TestAgent;
6482

@@ -582,4 +600,150 @@ describe("Agent Result", () => {
582600
expect(result.result).toMatchObject(complexResult);
583601
});
584602
});
603+
604+
describe("run detachment", () => {
605+
let streamingAgent: StreamingTestAgent;
606+
607+
beforeEach(() => {
608+
streamingAgent = new StreamingTestAgent({ threadId: "thread-detach" });
609+
});
610+
611+
it("finalizes immediately when detached", async () => {
612+
const subject = new Subject<BaseEvent>();
613+
streamingAgent.setEventSubject(subject);
614+
const onRunFinalized = jest.fn();
615+
616+
const runPromise = streamingAgent.runAgent({}, { onRunFinalized });
617+
await waitForAsyncNotifications();
618+
619+
subject.next({
620+
type: EventType.RUN_STARTED,
621+
threadId: "thread-detach",
622+
runId: "run-detach",
623+
} as RunStartedEvent);
624+
625+
await streamingAgent.detachActiveRun();
626+
await runPromise;
627+
subject.complete();
628+
629+
expect(onRunFinalized).toHaveBeenCalledTimes(1);
630+
});
631+
632+
it("ignores events emitted after detaching", async () => {
633+
const subject = new Subject<BaseEvent>();
634+
streamingAgent.setEventSubject(subject);
635+
const onMessagesChanged = jest.fn();
636+
637+
const runPromise = streamingAgent.runAgent({}, { onMessagesChanged });
638+
await waitForAsyncNotifications();
639+
const initialMessageCount = streamingAgent.messages.length;
640+
641+
subject.next({
642+
type: EventType.RUN_STARTED,
643+
threadId: "thread-detach",
644+
runId: "run-detach",
645+
} as RunStartedEvent);
646+
647+
const detachPromise = streamingAgent.detachActiveRun();
648+
649+
subject.next({
650+
type: EventType.TEXT_MESSAGE_START,
651+
messageId: "msg-after-detach",
652+
role: "assistant",
653+
} as TextMessageStartEvent);
654+
subject.next({
655+
type: EventType.TEXT_MESSAGE_CONTENT,
656+
messageId: "msg-after-detach",
657+
delta: "Should be ignored",
658+
} as TextMessageContentEvent);
659+
subject.next({
660+
type: EventType.TEXT_MESSAGE_END,
661+
messageId: "msg-after-detach",
662+
} as TextMessageEndEvent);
663+
664+
subject.complete();
665+
await Promise.all([detachPromise, runPromise]);
666+
667+
expect(streamingAgent.messages.length).toBe(initialMessageCount);
668+
expect(onMessagesChanged).not.toHaveBeenCalled();
669+
});
670+
671+
it("can start a new run on another thread after detaching", async () => {
672+
const firstSubject = new Subject<BaseEvent>();
673+
streamingAgent.setEventSubject(firstSubject);
674+
675+
const firstRunPromise = streamingAgent.runAgent();
676+
await waitForAsyncNotifications();
677+
678+
firstSubject.next({
679+
type: EventType.RUN_STARTED,
680+
threadId: "thread-detach",
681+
runId: "run-1",
682+
} as RunStartedEvent);
683+
684+
await streamingAgent.detachActiveRun();
685+
firstSubject.complete();
686+
await firstRunPromise;
687+
688+
streamingAgent.threadId = "thread-detach-2";
689+
const secondSubject = new Subject<BaseEvent>();
690+
streamingAgent.setEventSubject(secondSubject);
691+
692+
const secondRunPromise = streamingAgent.runAgent();
693+
await waitForAsyncNotifications();
694+
695+
secondSubject.next({
696+
type: EventType.RUN_STARTED,
697+
threadId: "thread-detach-2",
698+
runId: "run-2",
699+
} as RunStartedEvent);
700+
secondSubject.next({
701+
type: EventType.TEXT_MESSAGE_START,
702+
messageId: "msg-new",
703+
role: "assistant",
704+
} as TextMessageStartEvent);
705+
secondSubject.next({
706+
type: EventType.TEXT_MESSAGE_CONTENT,
707+
messageId: "msg-new",
708+
delta: "hello",
709+
} as TextMessageContentEvent);
710+
secondSubject.next({
711+
type: EventType.TEXT_MESSAGE_END,
712+
messageId: "msg-new",
713+
} as TextMessageEndEvent);
714+
secondSubject.next({
715+
type: EventType.RUN_FINISHED,
716+
threadId: "thread-detach-2",
717+
runId: "run-2",
718+
} as RunFinishedEvent);
719+
secondSubject.complete();
720+
721+
await secondRunPromise;
722+
await waitForAsyncNotifications();
723+
724+
expect(streamingAgent.messages.some((message) => message.id === "msg-new")).toBe(true);
725+
});
726+
727+
it("resolve order: detachActiveRun waits for finalize", async () => {
728+
const subject = new Subject<BaseEvent>();
729+
streamingAgent.setEventSubject(subject);
730+
const order: string[] = [];
731+
732+
const runPromise = streamingAgent.runAgent({}, { onRunFinalized: () => order.push("finalized") });
733+
await waitForAsyncNotifications();
734+
735+
subject.next({
736+
type: EventType.RUN_STARTED,
737+
threadId: "thread-detach",
738+
runId: "run-order",
739+
} as RunStartedEvent);
740+
741+
const detachPromise = streamingAgent.detachActiveRun().then(() => order.push("awaited"));
742+
subject.complete();
743+
744+
await Promise.all([runPromise, detachPromise]);
745+
746+
expect(order).toEqual(["finalized", "awaited"]);
747+
});
748+
});
585749
});

sdks/typescript/packages/client/src/agent/agent.ts

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ import { structuredClone_ } from "@/utils";
77
import { compareVersions } from "compare-versions";
88
import { catchError, map, tap } from "rxjs/operators";
99
import { finalize } from "rxjs/operators";
10-
import { pipe, Observable, from, of, EMPTY } from "rxjs";
10+
import { takeUntil } from "rxjs/operators";
11+
import { pipe, Observable, from, of, EMPTY, Subject } from "rxjs";
1112
import { verifyEvents } from "@/verify";
1213
import { convertToLegacyEvents } from "@/legacy/convert";
1314
import { LegacyRuntimeProtocolEvent } from "@/legacy/types";
@@ -38,6 +39,9 @@ export abstract class AbstractAgent {
3839
public subscribers: AgentSubscriber[] = [];
3940
public isRunning: boolean = false;
4041
private middlewares: Middleware[] = [];
42+
// Emits to immediately detach from the active run (stop processing its stream)
43+
private activeRunDetach$?: Subject<void>;
44+
private activeRunCompletionPromise?: Promise<void>;
4145

4246
get maxVersion() {
4347
return packageJson.version;
@@ -105,6 +109,13 @@ export abstract class AbstractAgent {
105109

106110
await this.onInitialize(input, subscribers);
107111

112+
// Per-run detachment signal + completion promise
113+
this.activeRunDetach$ = new Subject<void>();
114+
let resolveActiveRunCompletion: (() => void) | undefined;
115+
this.activeRunCompletionPromise = new Promise<void>((resolve) => {
116+
resolveActiveRunCompletion = resolve;
117+
});
118+
108119
const pipeline = pipe(
109120
() => {
110121
// Build middleware chain using reduceRight so middlewares can intercept runs.
@@ -124,6 +135,8 @@ export abstract class AbstractAgent {
124135
},
125136
transformChunks(this.debug),
126137
verifyEvents(this.debug),
138+
// Stop processing immediately when this run is detached
139+
(source$) => source$.pipe(takeUntil(this.activeRunDetach$!)),
127140
(source$) => this.apply(input, source$, subscribers),
128141
(source$) => this.processApplyEvents(input, source$, subscribers),
129142
catchError((error) => {
@@ -133,6 +146,10 @@ export abstract class AbstractAgent {
133146
finalize(() => {
134147
this.isRunning = false;
135148
void this.onFinalize(input, subscribers);
149+
resolveActiveRunCompletion?.();
150+
resolveActiveRunCompletion = undefined;
151+
this.activeRunCompletionPromise = undefined;
152+
this.activeRunDetach$ = undefined;
136153
}),
137154
);
138155

@@ -172,10 +189,19 @@ export abstract class AbstractAgent {
172189

173190
await this.onInitialize(input, subscribers);
174191

192+
// Per-run detachment signal + completion promise
193+
this.activeRunDetach$ = new Subject<void>();
194+
let resolveActiveRunCompletion: (() => void) | undefined;
195+
this.activeRunCompletionPromise = new Promise<void>((resolve) => {
196+
resolveActiveRunCompletion = resolve;
197+
});
198+
175199
const pipeline = pipe(
176200
() => this.connect(input),
177201
transformChunks(this.debug),
178202
verifyEvents(this.debug),
203+
// Stop processing immediately when this run is detached
204+
(source$) => source$.pipe(takeUntil(this.activeRunDetach$!)),
179205
(source$) => this.apply(input, source$, subscribers),
180206
(source$) => this.processApplyEvents(input, source$, subscribers),
181207
catchError((error) => {
@@ -188,6 +214,10 @@ export abstract class AbstractAgent {
188214
finalize(() => {
189215
this.isRunning = false;
190216
void this.onFinalize(input, subscribers);
217+
resolveActiveRunCompletion?.();
218+
resolveActiveRunCompletion = undefined;
219+
this.activeRunCompletionPromise = undefined;
220+
this.activeRunDetach$ = undefined;
191221
}),
192222
);
193223

@@ -203,6 +233,16 @@ export abstract class AbstractAgent {
203233

204234
public abortRun() {}
205235

236+
public async detachActiveRun(): Promise<void> {
237+
if (!this.activeRunDetach$) {
238+
return;
239+
}
240+
const completion = this.activeRunCompletionPromise ?? Promise.resolve();
241+
this.activeRunDetach$.next();
242+
this.activeRunDetach$?.complete();
243+
await completion;
244+
}
245+
206246
protected apply(
207247
input: RunAgentInput,
208248
events$: Observable<BaseEvent>,
@@ -246,9 +286,7 @@ export abstract class AbstractAgent {
246286

247287
protected prepareRunAgentInput(parameters?: RunAgentParameters): RunAgentInput {
248288
const clonedMessages = structuredClone_(this.messages) as Message[];
249-
const messagesWithoutActivity = clonedMessages.filter(
250-
(message) => message.role !== "activity",
251-
);
289+
const messagesWithoutActivity = clonedMessages.filter((message) => message.role !== "activity");
252290

253291
return {
254292
threadId: this.threadId,

sdks/typescript/packages/core/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@ag-ui/core",
33
"author": "Markus Ecker <markus.ecker@gmail.com>",
4-
"version": "0.0.41",
4+
"version": "0.0.42-alpha.1",
55
"private": false,
66
"publishConfig": {
77
"access": "public"

sdks/typescript/packages/encoder/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@ag-ui/encoder",
33
"author": "Markus Ecker <markus.ecker@gmail.com>",
4-
"version": "0.0.41",
4+
"version": "0.0.42-alpha.1",
55
"private": false,
66
"publishConfig": {
77
"access": "public"

sdks/typescript/packages/proto/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@ag-ui/proto",
33
"author": "Markus Ecker <markus.ecker@gmail.com>",
4-
"version": "0.0.41",
4+
"version": "0.0.42-alpha.1",
55
"private": false,
66
"publishConfig": {
77
"access": "public"

0 commit comments

Comments
 (0)