Skip to content

Commit 4b51151

Browse files
committed
electric: add experimental support for streaming SSE in live mode.
1 parent e086815 commit 4b51151

File tree

6 files changed

+311
-42
lines changed

6 files changed

+311
-42
lines changed

packages/sync-service/lib/electric/plug/serve_shape_plug.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ defmodule Electric.Plug.ServeShapePlug do
3131
all_params =
3232
Map.merge(conn.query_params, conn.path_params)
3333
|> Map.update("live", "false", &(&1 != "false"))
34+
|> Map.update("experimental_live_sse", "false", &(&1 != "false"))
3435

3536
case Api.validate(api, all_params) do
3637
{:ok, request} ->

packages/sync-service/lib/electric/shapes/api.ex

Lines changed: 124 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ defmodule Electric.Shapes.Api do
5656
stale_age: 300,
5757
send_cache_headers?: true,
5858
encoder: Electric.Shapes.Api.Encoder.JSON,
59+
sse_encoder: Electric.Shapes.Api.Encoder.SSE,
5960
configured: false
6061
]
6162

@@ -484,7 +485,7 @@ defmodule Electric.Shapes.Api do
484485
if live? && Enum.take(log, 1) == [] do
485486
request
486487
|> update_attrs(%{ot_is_immediate_response: false})
487-
|> hold_until_change()
488+
|> handle_live_request()
488489
else
489490
up_to_date_lsn =
490491
if live? do
@@ -497,9 +498,9 @@ defmodule Electric.Shapes.Api do
497498
max(global_last_seen_lsn, chunk_end_offset.tx_offset)
498499
end
499500

500-
body = Stream.concat([log, maybe_up_to_date(request, up_to_date_lsn)])
501+
log_stream = Stream.concat(log, maybe_up_to_date(request, up_to_date_lsn))
501502

502-
%{response | chunked: true, body: encode_log(request, body)}
503+
%{response | chunked: true, body: encode_log(request, log_stream)}
503504
end
504505

505506
{:error, error} ->
@@ -513,6 +514,13 @@ defmodule Electric.Shapes.Api do
513514
end
514515
end
515516

517+
defp handle_live_request(%Request{params: %{experimental_live_sse: true}} = request) do
518+
stream_sse_events(request)
519+
end
520+
defp handle_live_request(%Request{} = request) do
521+
hold_until_change(request)
522+
end
523+
516524
defp hold_until_change(%Request{} = request) do
517525
%{
518526
new_changes_ref: ref,
@@ -549,10 +557,107 @@ defmodule Electric.Shapes.Api do
549557
end
550558
end
551559

552-
defp clean_up_change_listener(%Request{handle: shape_handle} = request)
553-
when not is_nil(shape_handle) do
554-
%{api: %{registry: registry}} = request
555-
Registry.unregister(registry, shape_handle)
560+
defp stream_sse_events(%Request{} = request) do
561+
%{
562+
new_changes_ref: ref,
563+
handle: shape_handle,
564+
api: %{sse_timeout: sse_timeout}
565+
} = request
566+
567+
Logger.debug("Client #{inspect(self())} is streaming SSE for changes to #{shape_handle}")
568+
569+
# Set up timer for SSE timeout
570+
timer_ref = Process.send_after(self(), {:sse_timeout, ref}, sse_timeout)
571+
572+
# Stream changes as SSE events for the duration of the timer.
573+
sse_event_stream = Stream.resource(
574+
fn ->
575+
request
576+
end,
577+
&next_sse_event/1,
578+
fn _ ->
579+
Process.cancel_timer(timer_ref)
580+
end
581+
)
582+
583+
response = %{request.response | chunked: true, body: sse_event_stream}
584+
585+
%{response | trace_attrs: Map.put(response.trace_attrs || %{}, :ot_is_sse_response, true)}
586+
end
587+
588+
defp next_sse_event(:done), do: {:halt, :done}
589+
590+
defp next_sse_event(%Request{} = request) do
591+
%{
592+
api: api,
593+
handle: shape_handle,
594+
new_changes_ref: ref
595+
} = request
596+
597+
receive do
598+
{^ref, :new_changes, latest_log_offset} ->
599+
updated_request =
600+
%{request | last_offset: latest_log_offset}
601+
|> determine_global_last_seen_lsn()
602+
|> determine_log_chunk_offset()
603+
|> determine_up_to_date()
604+
605+
case Shapes.get_merged_log_stream(
606+
updated_request.api,
607+
shape_handle,
608+
since: updated_request.params.offset,
609+
up_to: updated_request.chunk_end_offset
610+
) do
611+
{:ok, log} ->
612+
up_to_date_lsn = updated_request.chunk_end_offset.tx_offset
613+
up_to_date_messages = maybe_up_to_date(updated_request, up_to_date_lsn)
614+
615+
message_stream = Stream.concat(log, up_to_date_messages)
616+
messages = Enum.to_list(encode_log(updated_request, message_stream))
617+
618+
{messages, updated_request}
619+
620+
{:error, _error} ->
621+
{[], request}
622+
end
623+
624+
{^ref, :shape_rotation} ->
625+
must_refetch = %{headers: %{control: "must-refetch"}}
626+
message = encode_message(api, must_refetch)
627+
628+
{message, :done}
629+
630+
{:sse_timeout, ^ref} ->
631+
{[], :done}
632+
end
633+
end
634+
635+
defp clean_up_change_listener(%Request{handle: shape_handle} = request) when not is_nil(shape_handle) do
636+
%{
637+
api: %{
638+
registry: registry,
639+
sse_timeout: sse_timeout
640+
},
641+
params: %{
642+
live: live?,
643+
experimental_live_sse: live_sse?
644+
}
645+
} = request
646+
647+
# When handling SSE requests, the response body is a stream that listens for
648+
# :new_changes events. If we unregister the shape_handle event listener immediately,
649+
# we don't receive the events. So, in this case, we unregister the shape_handle
650+
# listener after the sse_timeout, when we can be sure that the request is over.
651+
if live? and live_sse? do
652+
spawn(fn ->
653+
:timer.sleep(sse_timeout)
654+
655+
Registry.unregister(registry, shape_handle)
656+
end)
657+
else
658+
Registry.unregister(registry, shape_handle)
659+
end
660+
556661
request
557662
end
558663

@@ -600,6 +705,10 @@ defmodule Electric.Shapes.Api do
600705
def stack_id(%Api{stack_id: stack_id}), do: stack_id
601706
def stack_id(%{api: %{stack_id: stack_id}}), do: stack_id
602707

708+
defp encode_log(%Request{api: api, params: %{live: true, experimental_live_sse: true}}, stream) do
709+
encode_sse(api, :log, stream)
710+
end
711+
603712
defp encode_log(%Request{api: api}, stream) do
604713
encode(api, :log, stream)
605714
end
@@ -609,6 +718,10 @@ defmodule Electric.Shapes.Api do
609718
encode(api, :message, message)
610719
end
611720

721+
def encode_message(%Request{api: api, params: %{live: true, experimental_live_sse: true}}, message) do
722+
encode_sse(api, :message, message)
723+
end
724+
612725
def encode_message(%Request{api: api}, message) do
613726
encode(api, :message, message)
614727
end
@@ -617,6 +730,10 @@ defmodule Electric.Shapes.Api do
617730
apply(encoder, type, [message])
618731
end
619732

733+
defp encode_sse(%Api{sse_encoder: sse_encoder}, type, message) when type in [:message, :log] do
734+
apply(sse_encoder, type, [message])
735+
end
736+
620737
def schema(%Response{
621738
api: %Api{inspector: inspector},
622739
shape_definition: %Shapes.Shape{} = shape

packages/sync-service/lib/electric/shapes/api/encoder.ex

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,36 @@ defmodule Electric.Shapes.Api.Encoder.JSON do
5656
end
5757
end
5858

59+
defmodule Electric.Shapes.Api.Encoder.SSE do
60+
@behaviour Electric.Shapes.Api.Encoder
61+
62+
@impl Electric.Shapes.Api.Encoder
63+
def log(item_stream) do
64+
# Note that, unlike the JSON log encoder, this doesn't currently use
65+
# `Stream.chunk_every/1`.
66+
#
67+
# This is because it's only handling live events and is usually used
68+
# for small updates (the point of enabling SSE mode is to avoid request
69+
# overhead when consuming small changes).
70+
71+
item_stream
72+
|> Stream.flat_map(&message/1)
73+
end
74+
75+
@impl Electric.Shapes.Api.Encoder
76+
def message(message) do
77+
["data: ", ensure_json(message), "\n\n"]
78+
end
79+
80+
defp ensure_json(json) when is_binary(json) do
81+
json
82+
end
83+
84+
defp ensure_json(term) do
85+
Jason.encode_to_iodata!(term)
86+
end
87+
end
88+
5989
defmodule Electric.Shapes.Api.Encoder.Term do
6090
@behaviour Electric.Shapes.Api.Encoder
6191

packages/sync-service/lib/electric/shapes/api/params.ex

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ defmodule Electric.Shapes.Api.Params do
88
import Ecto.Changeset
99

1010
@tmp_compaction_flag :experimental_compaction
11+
@tmp_sse_flag :experimental_live_sse
1112

1213
@primary_key false
1314
defmodule ColumnList do
@@ -44,13 +45,13 @@ defmodule Electric.Shapes.Api.Params do
4445
field(:offset, :string)
4546
field(:handle, :string)
4647
field(:live, :boolean, default: false)
47-
field(:sse, :boolean, default: false)
4848
field(:where, :string)
4949
field(:columns, ColumnList)
5050
field(:shape_definition, :string)
5151
field(:replica, Ecto.Enum, values: [:default, :full], default: :default)
5252
field(:params, {:map, :string}, default: %{})
5353
field(@tmp_compaction_flag, :boolean, default: false)
54+
field(@tmp_sse_flag, :boolean, default: false)
5455
end
5556

5657
@type t() :: %__MODULE__{}
@@ -62,7 +63,7 @@ defmodule Electric.Shapes.Api.Params do
6263
|> cast_offset()
6364
|> validate_handle_with_offset()
6465
|> validate_live_with_offset()
65-
|> validate_sse_with_live()
66+
|> validate_live_sse()
6667
|> cast_root_table(api)
6768
|> apply_action(:validate)
6869
|> convert_error(api)
@@ -152,15 +153,15 @@ defmodule Electric.Shapes.Api.Params do
152153
end
153154
end
154155

155-
def validate_sse_with_live(%Ecto.Changeset{valid?: false} = changeset), do: changeset
156+
def validate_live_sse(%Ecto.Changeset{valid?: false} = changeset), do: changeset
156157

157-
def validate_sse_with_live(%Ecto.Changeset{} = changeset) do
158+
def validate_live_sse(%Ecto.Changeset{} = changeset) do
158159
live = get_field(changeset, :live)
159160

160161
if live do
161162
changeset
162163
else
163-
validate_exclusion(changeset, :sse, [true], message: "can't be true unless live is also true")
164+
validate_exclusion(changeset, @tmp_sse_flag, [true], message: "can't be true unless live is also true")
164165
end
165166
end
166167

packages/sync-service/lib/electric/shapes/api/response.ex

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ defmodule Electric.Shapes.Api.Response do
112112
|> put_schema_header(response)
113113
|> put_up_to_date_header(response)
114114
|> put_offset_header(response)
115+
|> put_sse_headers(response)
115116
end
116117

117118
defp put_location_header(conn, %__MODULE__{status: 409} = response) do
@@ -180,37 +181,32 @@ defmodule Electric.Shapes.Api.Response do
180181
end
181182

182183
defp put_cache_headers(conn, %__MODULE__{api: api} = response) do
183-
case response do
184-
# If the offset is -1, set a 1 week max-age, 1 hour s-maxage (shared cache)
185-
# and 1 month stale-while-revalidate We want private caches to cache the
186-
# initial offset for a long time but for shared caches to frequently
187-
# revalidate so they're serving a fairly fresh copy of the initials shape
188-
# log.
189-
%{params: %{offset: @before_all_offset}} ->
190-
conn
191-
|> put_cache_header(
192-
"cache-control",
193-
"public, max-age=604800, s-maxage=3600, stale-while-revalidate=2629746",
194-
api
195-
)
196-
197-
# For live requests we want short cache lifetimes and to update the live cursor
198-
%{params: %{live: true}, api: api} ->
199-
conn
200-
|> put_cache_header(
201-
"cache-control",
202-
"public, max-age=5, stale-while-revalidate=5",
203-
api
204-
)
184+
header_value =
185+
case response do
186+
# If the offset is -1, set a 1 week max-age, 1 hour s-maxage (shared cache)
187+
# and 1 month stale-while-revalidate We want private caches to cache the
188+
# initial offset for a long time but for shared caches to frequently
189+
# revalidate so they're serving a fairly fresh copy of the initials shape
190+
# log.
191+
%{params: %{offset: @before_all_offset}} ->
192+
"public, max-age=604800, s-maxage=3600, stale-while-revalidate=2629746"
193+
194+
# For live SSE requests we want to cache for just under the
195+
# sse_timeout, in order to enable request collapsing.
196+
%{params: %{live: true, experimental_live_sse: true}} ->
197+
"public, max-age=#{max(1, div(api.sse_timeout, 1000) - 1)}"
198+
199+
# For normal live requests we want short cache lifetimes.
200+
%{params: %{live: true}} ->
201+
"public, max-age=5, stale-while-revalidate=5"
202+
203+
# Non-live requests have the default cache headers.
204+
%{params: %{live: false}} ->
205+
"public, max-age=#{api.max_age}, stale-while-revalidate=#{api.stale_age}"
206+
end
205207

206-
%{params: %{live: false}, api: api} ->
207-
conn
208-
|> put_cache_header(
209-
"cache-control",
210-
"public, max-age=#{api.max_age}, stale-while-revalidate=#{api.stale_age}",
211-
api
212-
)
213-
end
208+
conn
209+
|> put_cache_header("cache-control", header_value, api)
214210
end
215211

216212
defp put_cache_header(conn, header, value, %{send_cache_headers?: true}) do
@@ -263,6 +259,16 @@ defmodule Electric.Shapes.Api.Response do
263259
Plug.Conn.put_resp_header(conn, "electric-offset", "#{offset}")
264260
end
265261

262+
defp put_sse_headers(conn, %__MODULE__{params: %{live: true, experimental_live_sse: true}}) do
263+
conn
264+
|> Plug.Conn.put_resp_header("content-type", "text/event-stream")
265+
|> Plug.Conn.put_resp_header("connection", "keep-alive")
266+
end
267+
268+
defp put_sse_headers(conn, _response) do
269+
conn
270+
end
271+
266272
defp send_stream(%Plug.Conn{} = conn, %__MODULE__{body: stream, status: status} = response) do
267273
stack_id = Api.stack_id(response)
268274
conn = Plug.Conn.send_chunked(conn, status)

0 commit comments

Comments
 (0)