Skip to content

Commit 17b51e8

Browse files
committed
WIP feat(client): experimental support for streaming SSE in live mode
1 parent 4d2c23a commit 17b51e8

File tree

5 files changed

+159
-5
lines changed

5 files changed

+159
-5
lines changed

packages/typescript-client/src/client.ts

Lines changed: 146 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import {
3939
TABLE_QUERY_PARAM,
4040
REPLICA_PARAM,
4141
FORCE_DISCONNECT_AND_REFRESH,
42+
EXPERIMENTAL_LIVE_SSE_QUERY_PARAM,
4243
} from './constants'
4344

4445
const RESERVED_PARAMS: Set<ReservedParamKeys> = new Set([
@@ -244,6 +245,11 @@ export interface ShapeStreamOptions<T = never> {
244245
*/
245246
subscribe?: boolean
246247

248+
/**
249+
* Experimental support for Server-Sent Events (SSE) for live updates.
250+
*/
251+
experimentalLiveSse?: boolean
252+
247253
signal?: AbortSignal
248254
fetchClient?: typeof fetch
249255
backoffOptions?: BackoffOptions
@@ -281,8 +287,9 @@ export interface ShapeStreamInterface<T extends Row<unknown> = Row> {
281287
}
282288

283289
/**
284-
* Reads updates to a shape from Electric using HTTP requests and long polling. Notifies subscribers
285-
* when new messages come in. Doesn't maintain any history of the
290+
* Reads updates to a shape from Electric using HTTP requests and long polling or
291+
* Server-Sent Events (SSE).
292+
* Notifies subscribers when new messages come in. Doesn't maintain any history of the
286293
* log but does keep track of the offset position and is the best way
287294
* to consume the HTTP `GET /v1/shape` api.
288295
*
@@ -297,6 +304,14 @@ export interface ShapeStreamInterface<T extends Row<unknown> = Row> {
297304
* })
298305
* ```
299306
*
307+
* To use Server-Sent Events (SSE) for real-time updates:
308+
* ```
309+
* const stream = new ShapeStream({
310+
* url: `http://localhost:3000/v1/shape`,
311+
* liveMode: 'sse'
312+
* })
313+
* ```
314+
*
300315
* To abort the stream, abort the `signal`
301316
* passed in via the `ShapeStreamOptions`.
302317
* ```
@@ -484,6 +499,26 @@ export class ShapeStream<T extends Row<unknown> = Row>
484499
}
485500
}
486501

502+
// If using SSE mode we handle the connection differently using the
503+
// this.#connectSSE method which wraps the EventSource API.
504+
if (this.#isUpToDate && this.options.experimentalLiveSse) {
505+
fetchUrl.searchParams.set(EXPERIMENTAL_LIVE_SSE_QUERY_PARAM, `true`)
506+
try {
507+
await this.#connectSSE(fetchUrl.toString())
508+
} catch (error) {
509+
if (error instanceof SSEConnectionAborted) {
510+
break
511+
}
512+
this.#sendErrorToSubscribers(
513+
error instanceof Error ? error : new Error(String(error))
514+
)
515+
throw error
516+
}
517+
// TODO: What should we do here? Is this the behaviour we want?
518+
// Skip the regular fetch and continue the loop to reconnect if needed
519+
continue
520+
}
521+
487522
let response!: Response
488523
try {
489524
response = await this.#fetchClient(fetchUrl.toString(), {
@@ -714,6 +749,108 @@ export class ShapeStream<T extends Row<unknown> = Row>
714749
this.#connected = false
715750
this.#schema = undefined
716751
}
752+
753+
/**
754+
* Connects to the server using Server-Sent Events.
755+
* Returns a promise that resolves when the connection is closed.
756+
*/
757+
async #connectSSE(url: string): Promise<void> {
758+
return new Promise<void>((resolve, reject) => {
759+
try {
760+
if (!this.#requestAbortController) {
761+
reject(
762+
new Error(
763+
`Request abort controller is not set - this should never happen`
764+
)
765+
)
766+
return
767+
}
768+
769+
if (this.#requestAbortController.signal.aborted) {
770+
reject(
771+
new SSEConnectionAborted(
772+
`Connection aborted before SSE connection established`
773+
)
774+
)
775+
return
776+
}
777+
778+
// Create an EventSource instance
779+
const eventSource = new EventSource(url)
780+
781+
// Set up event handlers
782+
eventSource.onopen = () => {
783+
this.#connected = true
784+
}
785+
786+
eventSource.onmessage = async (event: MessageEvent) => {
787+
try {
788+
if (event.data) {
789+
// Process the SSE message
790+
// Provide an empty schema object if schema is undefined, which it
791+
// should not be as we only get to SSE mode after being in normal mode
792+
// and getting a schema from a header then.
793+
// The event.data is a single JSON object, so we wrap it in an array
794+
// to be consistent with the way we parse the response from the HTTP API.
795+
// TODO: Is this needed?
796+
const batch = this.#messageParser.parse(
797+
`[${event.data}]`,
798+
this.#schema || {}
799+
)
800+
801+
if (batch.length > 0) {
802+
const lastMessage = batch[batch.length - 1]
803+
if (isUpToDateMessage(lastMessage)) {
804+
const upToDateMsg = lastMessage as typeof lastMessage & {
805+
headers: { global_last_seen_lsn: string }
806+
}
807+
this.#lastSyncedAt = Date.now()
808+
this.#isUpToDate = true
809+
this.#lastOffset =
810+
`${upToDateMsg.headers.global_last_seen_lsn}_0` as Offset
811+
// TODO: we also need the cache buster `cursor` value
812+
}
813+
814+
await this.#publish(batch)
815+
}
816+
}
817+
} catch (error) {
818+
// Handle parsing errors
819+
this.#sendErrorToSubscribers(
820+
error instanceof Error ? error : new Error(String(error))
821+
)
822+
}
823+
}
824+
825+
eventSource.onerror = (_error: Event) => {
826+
// Connection was closed or errored
827+
// EventSource would normally automatically reconnect but want to close the
828+
// connection and reconnect on the next outer loop iteration with the new
829+
// url and offset.
830+
// TODO: It may be that some errors we should elevate to the user
831+
eventSource.close()
832+
resolve()
833+
}
834+
835+
// Listen for abort signals
836+
const abortHandler = () => {
837+
eventSource.close()
838+
reject(new SSEConnectionAborted(`SSE connection aborted`))
839+
}
840+
841+
this.#requestAbortController.signal.addEventListener(
842+
`abort`,
843+
abortHandler,
844+
{ once: true }
845+
)
846+
} catch (error) {
847+
this.#sendErrorToSubscribers(
848+
error instanceof Error ? error : new Error(String(error))
849+
)
850+
reject(error)
851+
}
852+
})
853+
}
717854
}
718855

719856
/**
@@ -782,3 +919,10 @@ function convertWhereParamsToObj(
782919
}
783920
return allPgParams
784921
}
922+
923+
class SSEConnectionAborted extends Error {
924+
constructor(message: string) {
925+
super(message)
926+
this.name = `SSEConnectionAborted`
927+
}
928+
}

packages/typescript-client/src/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ export const TABLE_QUERY_PARAM = `table`
1212
export const WHERE_QUERY_PARAM = `where`
1313
export const REPLICA_PARAM = `replica`
1414
export const WHERE_PARAMS_PARAM = `params`
15+
export const EXPERIMENTAL_LIVE_SSE_QUERY_PARAM = `experimental_live_sse`
1516
export const FORCE_DISCONNECT_AND_REFRESH = `force-disconnect-and-refresh`

packages/typescript-client/test/client.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ describe(`Shape`, () => {
391391
todo: `fail`,
392392
},
393393
fetchClient: async (input, _init) => {
394-
const url = new URL(input)
394+
const url = new URL(input instanceof Request ? input.url : input)
395395
if (url.searchParams.get(`todo`) === `fail`) {
396396
return new Response(undefined, {
397397
status: 401,
Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
{
22
"extends": "../../tsconfig.build.json",
33
"include": ["src/**/*"],
4-
"exclude": ["node_modules", "tests", "dist"]
4+
"exclude": ["node_modules", "tests", "dist"],
5+
"compilerOptions": {
6+
"lib": [
7+
"ESNext",
8+
"DOM",
9+
"dom.iterable"
10+
]
11+
}
512
}

packages/typescript-client/tsconfig.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
/* Language and Environment */
1515
"target": "es2016" /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */,
1616
"lib": [
17-
"ESNext"
17+
"ESNext",
18+
"DOM",
19+
"dom.iterable"
1820
] /* Specify a set of bundled library declaration files that describe the target runtime environment. */,
1921
"jsx": "preserve" /* Specify what JSX code is generated. */,
2022
// "experimentalDecorators": true, /* Enable experimental support for TC39 stage 2 draft decorators. */

0 commit comments

Comments
 (0)