Skip to content

Commit 4d2c23a

Browse files
committed
sse: use stream_split dep rather than half-assed vendoring.
1 parent 10d84c9 commit 4d2c23a

File tree

4 files changed

+3
-82
lines changed

4 files changed

+3
-82
lines changed

packages/sync-service/lib/electric/shapes/api.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,7 @@ defmodule Electric.Shapes.Api do
700700
#
701701
# The JSON encoder groups stream elements by 500. So perhaps this
702702
# could be a larger number for more efficiency?
703-
case Electric.Utils.take_and_drop(stream, 3) do
703+
case StreamSplit.take_and_drop(stream, 3) do
704704
{[], _tail} ->
705705
{[], %{state | mode: :receive, stream: nil}}
706706

packages/sync-service/lib/electric/utils.ex

Lines changed: 0 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -717,85 +717,4 @@ defmodule Electric.Utils do
717717
after_fun
718718
)
719719
end
720-
721-
@doc """
722-
This function is a combination of `Enum.take/2` and `Enum.drop/2` returning
723-
first `n` dropped elements and the rest of the enum as a stream.
724-
725-
The important difference is that the enumerable is only iterated once, and
726-
only for the required `n` items. The rest of the enumerable may be iterated
727-
lazily later from the returned stream.
728-
729-
## Examples
730-
iex> {head, tail} = take_and_drop(Stream.cycle(1..3), 4)
731-
iex> head
732-
[1, 2, 3, 1]
733-
iex> Enum.take(tail, 7)
734-
[2, 3, 1, 2, 3, 1, 2]
735-
736-
Cribbed from https://github.com/tallakt/stream_split
737-
"""
738-
@spec take_and_drop(Enumerable.t(), pos_integer) :: {List.t(), Enumerable.t()}
739-
def take_and_drop(enum, n) when n > 0 do
740-
case apply_reduce(enum, n) do
741-
{:done, {_, list}} ->
742-
{:lists.reverse(list), []}
743-
744-
{:suspended, {_, list}, cont} ->
745-
stream_split = %{continuation: cont, stream: continuation_to_stream(cont)}
746-
{:lists.reverse(list), stream_split}
747-
748-
{:halted, {_, list}} ->
749-
{:lists.reverse(list), []}
750-
end
751-
end
752-
753-
def take_and_drop(enum, 0) do
754-
{[], enum}
755-
end
756-
757-
defp apply_reduce(%{continuation: cont}, n) do
758-
cont.({:cont, {n, []}})
759-
end
760-
761-
defp apply_reduce(enum, n) do
762-
Enumerable.reduce(enum, {:cont, {n, []}}, &reducer_helper/2)
763-
end
764-
765-
defp reducer_helper(item, :tail) do
766-
{:suspend, item}
767-
end
768-
769-
defp reducer_helper(item, {c, list}) when c > 1 do
770-
{:cont, {c - 1, [item | list]}}
771-
end
772-
773-
defp reducer_helper(item, {_, list}) do
774-
{:suspend, {0, [item | list]}}
775-
end
776-
777-
defp continuation_to_stream(cont) do
778-
wrapped = fn {_, _, acc_cont} ->
779-
case acc_cont.({:cont, :tail}) do
780-
acc = {:suspended, item, _cont} ->
781-
{[item], acc}
782-
783-
{:halted, acc} ->
784-
{:halt, acc}
785-
786-
{:done, acc} ->
787-
{:halt, acc}
788-
end
789-
end
790-
791-
cleanup = fn
792-
{:suspended, _, acc_cont} ->
793-
acc_cont.({:halt, nil})
794-
795-
_ ->
796-
nil
797-
end
798-
799-
Stream.resource(fn -> {:suspended, nil, cont} end, wrapped, cleanup)
800-
end
801720
end

packages/sync-service/mix.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ defmodule Electric.MixProject do
103103
{:retry, "~> 0.18"},
104104
{:remote_ip, "~> 1.2"},
105105
{:req, "~> 0.5"},
106+
{:stream_split, "~> 0.1"},
106107
{:telemetry_poller, "~> 1.1"},
107108
{:tls_certificate_check, "~> 1.23"},
108109
{:tz, "~> 0.27"}

packages/sync-service/mix.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
"sentry": {:hex, :sentry, "10.8.0", "1e8cc0ef21401e5914e6fc2f37489d6c685d31a0556dbd8ab4709cc1587a7232", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_ownership, "~> 0.3.0 or ~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.6", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, "~> 0.20", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "92549e7ba776b7ccfed4e74d58987272d37d99606b130e4141bc015a1a8e4235"},
5353
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
5454
"stream_data": {:hex, :stream_data, "1.1.3", "15fdb14c64e84437901258bb56fc7d80aaf6ceaf85b9324f359e219241353bfb", [:mix], [], "hexpm", "859eb2be72d74be26c1c4f272905667672a52e44f743839c57c7ee73a1a66420"},
55+
"stream_split": {:hex, :stream_split, "0.1.7", "2d3fd1fd21697da7f91926768d65f79409086052c9ec7ae593987388f52425f8", [:mix], [], "hexpm", "1dc072ff507a64404a0ad7af90df97096183fee8eeac7b300320cea7c4679147"},
5556
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
5657
"telemetry_metrics": {:hex, :telemetry_metrics, "1.1.0", "5bd5f3b5637e0abea0426b947e3ce5dd304f8b3bc6617039e2b5a008adc02f8f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e7b79e8ddfde70adb6db8a6623d1778ec66401f366e9a8f5dd0955c56bc8ce67"},
5758
"telemetry_metrics_prometheus_core": {:hex, :telemetry_metrics_prometheus_core, "1.2.1", "c9755987d7b959b557084e6990990cb96a50d6482c683fb9622a63837f3cd3d8", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "5e2c599da4983c4f88a33e9571f1458bf98b0cf6ba930f1dc3a6e8cf45d5afb6"},

0 commit comments

Comments
 (0)