Skip to content

Commit f1648a6

Browse files
committed
sse: avoid materializing inner log.
1 parent 4b51151 commit f1648a6

File tree

2 files changed

+133
-28
lines changed

2 files changed

+133
-28
lines changed

packages/sync-service/lib/electric/shapes/api.ex

Lines changed: 52 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -561,33 +561,35 @@ defmodule Electric.Shapes.Api do
561561
%{
562562
new_changes_ref: ref,
563563
handle: shape_handle,
564-
api: %{sse_timeout: sse_timeout}
564+
api: %{sse_timeout: sse_timeout},
565+
params: %{offset: since_offset}
565566
} = request
566567

567-
Logger.debug("Client #{inspect(self())} is streaming SSE for changes to #{shape_handle}")
568+
Logger.debug(
569+
"Client #{inspect(self())} is streaming SSE for changes to #{shape_handle} since #{inspect(since_offset)}"
570+
)
568571

569572
# Set up timer for SSE timeout
570573
timer_ref = Process.send_after(self(), {:sse_timeout, ref}, sse_timeout)
571574

572575
# Stream changes as SSE events for the duration of the timer.
573-
sse_event_stream = Stream.resource(
574-
fn ->
575-
request
576-
end,
577-
&next_sse_event/1,
578-
fn _ ->
579-
Process.cancel_timer(timer_ref)
580-
end
581-
)
576+
sse_event_stream =
577+
Stream.resource(
578+
fn ->
579+
{request, since_offset}
580+
end,
581+
&next_sse_event/1,
582+
fn _ ->
583+
Process.cancel_timer(timer_ref)
584+
end
585+
)
582586

583587
response = %{request.response | chunked: true, body: sse_event_stream}
584588

585589
%{response | trace_attrs: Map.put(response.trace_attrs || %{}, :ot_is_sse_response, true)}
586590
end
587591

588-
defp next_sse_event(:done), do: {:halt, :done}
589-
590-
defp next_sse_event(%Request{} = request) do
592+
defp next_sse_event({%Request{} = request, since_offset}) do
591593
%{
592594
api: api,
593595
handle: shape_handle,
@@ -602,23 +604,23 @@ defmodule Electric.Shapes.Api do
602604
|> determine_log_chunk_offset()
603605
|> determine_up_to_date()
604606

605-
case Shapes.get_merged_log_stream(
606-
updated_request.api,
607-
shape_handle,
608-
since: updated_request.params.offset,
609-
up_to: updated_request.chunk_end_offset
610-
) do
611-
{:ok, log} ->
612-
up_to_date_lsn = updated_request.chunk_end_offset.tx_offset
613-
up_to_date_messages = maybe_up_to_date(updated_request, up_to_date_lsn)
607+
# This is usually but not always the `latest_log_offset`
608+
# as per `determine_log_chunk_offset/1`.
609+
end_offset = updated_request.chunk_end_offset
614610

615-
message_stream = Stream.concat(log, up_to_date_messages)
616-
messages = Enum.to_list(encode_log(updated_request, message_stream))
611+
case Shapes.get_merged_log_stream(updated_request.api, shape_handle,
612+
since: since_offset,
613+
up_to: end_offset
614+
) do
615+
{:ok, log} ->
616+
control_messages = maybe_up_to_date(updated_request, end_offset.tx_offset)
617+
message_stream = Stream.concat(log, control_messages)
618+
encoded_stream = encode_log(updated_request, message_stream)
617619

618-
{messages, updated_request}
620+
{[], {:emit, encoded_stream, updated_request, end_offset}}
619621

620622
{:error, _error} ->
621-
{[], request}
623+
{[], {request, since_offset}}
622624
end
623625

624626
{^ref, :shape_rotation} ->
@@ -632,7 +634,29 @@ defmodule Electric.Shapes.Api do
632634
end
633635
end
634636

635-
defp clean_up_change_listener(%Request{handle: shape_handle} = request) when not is_nil(shape_handle) do
637+
defp next_sse_event({:emit, stream, %Request{} = request, since_offset}) do
638+
# Can change the number taken to adjust the grouping. Currently three
639+
# because there's typically 3 elements per SSE -- the actual message
640+
# and the "data: " and "\n\n" delimiters around it.
641+
#
642+
# The JSON encoder groups stream elements by 500. So perhaps this
643+
# could be a larger number for more efficiency?
644+
case Electric.Utils.take_and_drop(stream, 3) do
645+
{[], []} ->
646+
{[], {request, since_offset}}
647+
648+
{head, []} ->
649+
{head, {request, since_offset}}
650+
651+
{head, tail} ->
652+
{head, {:emit, tail, request, since_offset}}
653+
end
654+
end
655+
656+
defp next_sse_event(:done), do: {:halt, :done}
657+
658+
defp clean_up_change_listener(%Request{handle: shape_handle} = request)
659+
when not is_nil(shape_handle) do
636660
%{
637661
api: %{
638662
registry: registry,

packages/sync-service/lib/electric/utils.ex

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -717,4 +717,85 @@ defmodule Electric.Utils do
717717
after_fun
718718
)
719719
end
720+
721+
@doc """
722+
This function is a combination of `Enum.take/2` and `Enum.drop/2` returning
723+
first `n` dropped elements and the rest of the enum as a stream.
724+
725+
The important difference is that the enumerable is only iterated once, and
726+
only for the required `n` items. The rest of the enumerable may be iterated
727+
lazily later from the returned stream.
728+
729+
## Examples
730+
iex> {head, tail} = take_and_drop(Stream.cycle(1..3), 4)
731+
iex> head
732+
[1, 2, 3, 1]
733+
iex> Enum.take(tail, 7)
734+
[2, 3, 1, 2, 3, 1, 2]
735+
736+
Cribbed from https://github.com/tallakt/stream_split
737+
"""
738+
@spec take_and_drop(Enumerable.t(), pos_integer) :: {List.t(), Enumerable.t()}
739+
def take_and_drop(enum, n) when n > 0 do
740+
case apply_reduce(enum, n) do
741+
{:done, {_, list}} ->
742+
{:lists.reverse(list), []}
743+
744+
{:suspended, {_, list}, cont} ->
745+
stream_split = %{continuation: cont, stream: continuation_to_stream(cont)}
746+
{:lists.reverse(list), stream_split}
747+
748+
{:halted, {_, list}} ->
749+
{:lists.reverse(list), []}
750+
end
751+
end
752+
753+
def take_and_drop(enum, 0) do
754+
{[], enum}
755+
end
756+
757+
defp apply_reduce(%{continuation: cont}, n) do
758+
cont.({:cont, {n, []}})
759+
end
760+
761+
defp apply_reduce(enum, n) do
762+
Enumerable.reduce(enum, {:cont, {n, []}}, &reducer_helper/2)
763+
end
764+
765+
defp reducer_helper(item, :tail) do
766+
{:suspend, item}
767+
end
768+
769+
defp reducer_helper(item, {c, list}) when c > 1 do
770+
{:cont, {c - 1, [item | list]}}
771+
end
772+
773+
defp reducer_helper(item, {_, list}) do
774+
{:suspend, {0, [item | list]}}
775+
end
776+
777+
defp continuation_to_stream(cont) do
778+
wrapped = fn {_, _, acc_cont} ->
779+
case acc_cont.({:cont, :tail}) do
780+
acc = {:suspended, item, _cont} ->
781+
{[item], acc}
782+
783+
{:halted, acc} ->
784+
{:halt, acc}
785+
786+
{:done, acc} ->
787+
{:halt, acc}
788+
end
789+
end
790+
791+
cleanup = fn
792+
{:suspended, _, acc_cont} ->
793+
acc_cont.({:halt, nil})
794+
795+
_ ->
796+
nil
797+
end
798+
799+
Stream.resource(fn -> {:suspended, nil, cont} end, wrapped, cleanup)
800+
end
720801
end

0 commit comments

Comments
 (0)