Skip to content

Commit a52a353

Browse files
committed
sse: proper keep-alives and refactor SSE loop state.
1 parent e6b6c3e commit a52a353

File tree

2 files changed

+91
-22
lines changed

2 files changed

+91
-22
lines changed

packages/sync-service/lib/electric/shapes/api.ex

Lines changed: 69 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ defmodule Electric.Shapes.Api do
77
alias __MODULE__
88
alias __MODULE__.Request
99
alias __MODULE__.Response
10+
alias __MODULE__.SseState
1011

1112
import Electric.Replication.LogOffset, only: [is_log_offset_lt: 2]
1213

@@ -25,6 +26,7 @@ defmodule Electric.Shapes.Api do
2526
required: true
2627
],
2728
allow_shape_deletion: [type: :boolean],
29+
keepalive_interval: [type: :integer],
2830
long_poll_timeout: [type: :integer],
2931
sse_timeout: [type: :integer],
3032
max_age: [type: :integer],
@@ -49,6 +51,7 @@ defmodule Electric.Shapes.Api do
4951
:stack_id,
5052
:storage,
5153
allow_shape_deletion: false,
54+
keepalive_interval: 21_000,
5255
long_poll_timeout: 20_000,
5356
sse_timeout: 60_000,
5457
max_age: 60,
@@ -562,26 +565,37 @@ defmodule Electric.Shapes.Api do
562565
%{
563566
new_changes_ref: ref,
564567
handle: shape_handle,
565-
api: %{sse_timeout: sse_timeout},
568+
api: %{keepalive_interval: keepalive_interval, sse_timeout: sse_timeout},
566569
params: %{offset: since_offset}
567570
} = request
568571

569572
Logger.debug(
570573
"Client #{inspect(self())} is streaming SSE for changes to #{shape_handle} since #{inspect(since_offset)}"
571574
)
572575

576+
# Set up timer for SSE comment as keep-alive
577+
keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)
578+
573579
# Set up timer for SSE timeout
574-
timer_ref = Process.send_after(self(), {:sse_timeout, ref}, sse_timeout)
580+
timeout_ref = Process.send_after(self(), {:sse_timeout, ref}, sse_timeout)
575581

576582
# Stream changes as SSE events for the duration of the timer.
577583
sse_event_stream =
578584
Stream.resource(
579585
fn ->
580-
{request, since_offset}
586+
%SseState{
587+
mode: :receive,
588+
request: request,
589+
stream: nil,
590+
since_offset: since_offset,
591+
last_message_time: System.monotonic_time(:millisecond),
592+
keepalive_ref: keepalive_ref
593+
}
581594
end,
582595
&next_sse_event/1,
583-
fn _ ->
584-
Process.cancel_timer(timer_ref)
596+
fn %SseState{keepalive_ref: latest_keepalive_ref} ->
597+
Process.cancel_timer(latest_keepalive_ref)
598+
Process.cancel_timer(timeout_ref)
585599
end
586600
)
587601

@@ -590,12 +604,19 @@ defmodule Electric.Shapes.Api do
590604
%{response | trace_attrs: Map.put(response.trace_attrs || %{}, :ot_is_sse_response, true)}
591605
end
592606

593-
defp next_sse_event({%Request{} = request, since_offset}) do
607+
defp next_sse_event(%SseState{mode: :receive} = state) do
594608
%{
595-
api: api,
596-
handle: shape_handle,
597-
new_changes_ref: ref
598-
} = request
609+
keepalive_ref: keepalive_ref,
610+
last_message_time: last_message_time,
611+
request: %{
612+
api: %{
613+
keepalive_interval: keepalive_interval,
614+
} = api,
615+
handle: shape_handle,
616+
new_changes_ref: ref
617+
} = request,
618+
since_offset: since_offset
619+
} = state
599620

600621
receive do
601622
{^ref, :new_changes, latest_log_offset} ->
@@ -614,47 +635,73 @@ defmodule Electric.Shapes.Api do
614635
up_to: end_offset
615636
) do
616637
{:ok, log} ->
638+
Process.cancel_timer(keepalive_ref)
639+
617640
control_messages = maybe_up_to_date(updated_request, end_offset.tx_offset)
618641
message_stream = Stream.concat(log, control_messages)
619642
encoded_stream = encode_log(updated_request, message_stream)
620643

621-
{[], {:emit, encoded_stream, updated_request, end_offset}}
644+
current_time = System.monotonic_time(:millisecond)
645+
new_keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)
646+
647+
{[], %{state |
648+
mode: :emit,
649+
stream: encoded_stream,
650+
since_offset: end_offset,
651+
last_message_time: current_time,
652+
keepalive_ref: new_keepalive_ref
653+
}}
622654

623655
{:error, _error} ->
624-
{[], {request, since_offset}}
656+
{[], state}
625657
end
626658

627659
{^ref, :shape_rotation} ->
628660
must_refetch = %{headers: %{control: "must-refetch"}}
629661
message = encode_message(api, must_refetch)
630662

631-
{message, :done}
663+
{message, %{state | mode: :done}}
664+
665+
{:sse_keepalive, ^ref} ->
666+
current_time = System.monotonic_time(:millisecond)
667+
time_since_last_message = current_time - last_message_time
668+
669+
if time_since_last_message >= keepalive_interval do
670+
new_keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, keepalive_interval)
671+
672+
{[": keep-alive\n\n"], %{state | last_message_time: current_time, keepalive_ref: new_keepalive_ref}}
673+
else
674+
# Not time to send a keep-alive yet, schedule for the remaining time
675+
remaining_time = keepalive_interval - time_since_last_message
676+
new_keepalive_ref = Process.send_after(self(), {:sse_keepalive, ref}, remaining_time)
677+
678+
{[], %{state | keepalive_ref: new_keepalive_ref}}
679+
end
632680

633681
{:sse_timeout, ^ref} ->
634-
{[], :done}
682+
{[], %{state | mode: :done}}
635683
end
636684
end
637685

638-
defp next_sse_event({:emit, stream, %Request{} = request, since_offset}) do
686+
defp next_sse_event(%SseState{mode: :emit} = state) do
687+
%{stream: stream} = state
688+
639689
# Can change the number taken to adjust the grouping. Currently three
640690
# because there's typically 3 elements per SSE -- the actual message
641691
# and the "data: " and "\n\n" delimiters around it.
642692
#
643693
# The JSON encoder groups stream elements by 500. So perhaps this
644694
# could be a larger number for more efficiency?
645695
case Electric.Utils.take_and_drop(stream, 3) do
646-
{[], []} ->
647-
{[], {request, since_offset}}
648-
649-
{head, []} ->
650-
{head, {request, since_offset}}
696+
{[], _tail} ->
697+
{[], %{state | mode: :receive, stream: nil}}
651698

652699
{head, tail} ->
653-
{head, {:emit, tail, request, since_offset}}
700+
{head, %{state | stream: tail}}
654701
end
655702
end
656703

657-
defp next_sse_event(:done), do: {:halt, :done}
704+
defp next_sse_event(%SseState{mode: :done} = state), do: {:halt, state}
658705

659706
defp clean_up_change_listener(%Request{handle: shape_handle} = request)
660707
when not is_nil(shape_handle) do
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
defmodule Electric.Shapes.Api.SseState do
2+
alias Electric.Shapes.Api
3+
alias Electric.Replication.LogOffset
4+
5+
defstruct [
6+
:mode,
7+
:request,
8+
:stream,
9+
:since_offset,
10+
:last_message_time,
11+
:keepalive_ref
12+
]
13+
14+
@type t() :: %__MODULE__{
15+
mode: :receive | :emit | :done,
16+
request: Api.Request.t(),
17+
stream: Enumerable.t() | nil,
18+
since_offset: LogOffset.t(),
19+
last_message_time: pos_integer(),
20+
keepalive_ref: reference()
21+
}
22+
end

0 commit comments

Comments
 (0)