From 2cfc1a4e8a825736e2a09a3a51638937b032ad85 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sun, 17 Jul 2022 16:06:21 +0300 Subject: [PATCH 01/11] Update compat --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 1068bf4..acf675c 100644 --- a/Project.toml +++ b/Project.toml @@ -10,7 +10,7 @@ StructTypes = "856f2bd8-1eba-4b0a-8007-ebc267875bd4" [compat] GraphQLParser = "0.1.1" -HTTP = "0.8.17, 0.9" +HTTP = "0.8.17, 0.9, 1" JSON3 = "1.1.2" StructTypes = "1.5" julia = "1.6" From cc991e622f359e97595182e7e468a5c951eb6711 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sun, 17 Jul 2022 17:02:56 +0300 Subject: [PATCH 02/11] Adjust to new StatusCode handler --- test/http_execution.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/http_execution.jl b/test/http_execution.jl index 69f1670..b57157a 100644 --- a/test/http_execution.jl +++ b/test/http_execution.jl @@ -17,8 +17,8 @@ end # handle_error @test_throws ArgumentError test_error_handler(GraphQLClient.handle_error, ArgumentError("msg")) - @test_throws HTTP.StatusError test_error_handler(GraphQLClient.handle_error, HTTP.StatusError(404, HTTP.Response(404;request=HTTP.Request(), body="{}"))) - @test_throws GraphQLClient.GraphQLError test_error_handler(GraphQLClient.handle_error, HTTP.StatusError(400, HTTP.Response(400;request=HTTP.Request(), body="{}"))) + @test_throws HTTP.StatusError test_error_handler(GraphQLClient.handle_error, HTTP.StatusError(404, "POST", "", HTTP.Response(404;request=HTTP.Request(), body="{}"))) + @test_throws GraphQLClient.GraphQLError test_error_handler(GraphQLClient.handle_error, HTTP.StatusError(400, "POST", "", HTTP.Response(400;request=HTTP.Request(), body="{}"))) # handle_deserialisation_error @test_throws MethodError test_error_handler(GraphQLClient.handle_deserialisation_error, MethodError(""), "", "") From a0d7f152a5b82103ce2f648bc42c537899c5efc2 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sun, 17 Jul 2022 17:10:33 +0300 Subject: [PATCH 03/11] Fix subscriptions and limit compat to 1.0 --- Project.toml | 2 +- src/subscriptions.jl | 69 ++++++++++++++++++++++++++++++++++++++++++- test/subscriptions.jl | 36 +++++++++++----------- 3 files changed, 86 insertions(+), 21 deletions(-) diff --git a/Project.toml b/Project.toml index acf675c..85d2270 100644 --- a/Project.toml +++ b/Project.toml @@ -10,7 +10,7 @@ StructTypes = "856f2bd8-1eba-4b0a-8007-ebc267875bd4" [compat] GraphQLParser = "0.1.1" -HTTP = "0.8.17, 0.9, 1" +HTTP = "1" JSON3 = "1.1.2" StructTypes = "1.5" julia = "1.6" diff --git a/src/subscriptions.jl b/src/subscriptions.jl index 87e1d90..a94995d 100644 --- a/src/subscriptions.jl +++ b/src/subscriptions.jl @@ -1,5 +1,17 @@ const subscription_tracker = Ref{Dict}(Dict()) +function writews(ws::HTTP.WebSockets.WebSocket, msg) + if isdefined(HTTP, :send) + HTTP.send(ws, msg) + else + write(ws, msg) + end +end + +function writews(ws::IO, msg) + write(ws, msg) +end + """ open_subscription(fn::Function, [client::Client], @@ -97,7 +109,7 @@ function open_subscription(fn::Function, HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=client.headers) do ws # Start sub output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id") - write(ws, message_str) + writews(ws, message_str) subscription_tracker[][sub_id] = "open" # Init function @@ -172,6 +184,25 @@ function async_reader_with_timeout(io::IO, subtimeout)::Channel return ch end + +function async_reader_with_timeout(ws::HTTP.WebSocket, subtimeout)::Channel + ch = Channel(1) + task = @async begin + reader_task = current_task() + function timeout_cb(timer) + put!(ch, :timeout) + Base.throwto(reader_task, InterruptException()) + end + timeout = Timer(timeout_cb, subtimeout) + data = HTTP.receive(ws) + subtimeout > 0 && close(timeout) # Cancel the timeout + put!(ch, data) + end + bind(ch, task) + return ch +end + + function async_reader_with_stopfn(io::IO, stopfn, checktime)::Channel ch = Channel(1) # Could we make this channel concretely typed? task = @async begin @@ -193,6 +224,28 @@ function async_reader_with_stopfn(io::IO, stopfn, checktime)::Channel return ch end +function async_reader_with_stopfn(ws::HTTP.WebSockets.WebSocket, stopfn, checktime)::Channel + ch = Channel(1) # Could we make this channel concretely typed? + task = @async begin + reader_task = current_task() + function timeout_cb(timer) + if stopfn() + put!(ch, :stopfn) + Base.throwto(reader_task, InterruptException()) + else + timeout = Timer(timeout_cb, checktime) + end + end + timeout = Timer(timeout_cb, checktime) + data = HTTP.WebSockets.receive(ws) + close(timeout) # Cancel the timeout + put!(ch, data) + end + bind(ch, task) + return ch +end + + """ readfromwebsocket(ws::IO, stopfn, subtimeout) @@ -221,4 +274,18 @@ function readfromwebsocket(ws::IO, stopfn, subtimeout) data = readavailable(ws) end return data +end + +function readfromwebsocket(ws::HTTP.WebSockets.WebSocket, stopfn, subtimeout) + if isnothing(stopfn) && subtimeout > 0 + ch_out = async_reader_with_timeout(ws, subtimeout) + data = take!(ch_out) + elseif !isnothing(stopfn) + checktime = subtimeout > 0 ? subtimeout : 2 + ch_out = async_reader_with_stopfn(ws, stopfn, checktime) + data = take!(ch_out) + else + data = HTTP.receive(ws) + end + return data end \ No newline at end of file diff --git a/test/subscriptions.jl b/test/subscriptions.jl index 7db8f5a..a844c3d 100644 --- a/test/subscriptions.jl +++ b/test/subscriptions.jl @@ -1,10 +1,9 @@ function listen_localhost() @async HTTP.listen(HTTP.Sockets.localhost, 8080) do http - if HTTP.WebSockets.is_upgrade(http.message) + if HTTP.WebSockets.isupgrade(http.message) HTTP.WebSockets.upgrade(http) do ws - while !eof(ws) - data = readavailable(ws) - write(ws, data) + for data in ws + GraphQLClient.writews(ws, data) end end end @@ -13,10 +12,10 @@ end function do_nothing_localhost() @async HTTP.listen(HTTP.Sockets.localhost, 8081) do http - if HTTP.WebSockets.is_upgrade(http.message) + if HTTP.WebSockets.isupgrade(http.message) HTTP.WebSockets.upgrade(http) do ws - while !eof(ws) - data = readavailable(ws) + for data in ws + nothing; end end end @@ -31,7 +30,7 @@ end @test take!(ch) == :timeout ch = GraphQLClient.async_reader_with_timeout(ws, 5) - write(ws, "Data") + GraphQLClient.writews(ws, "Data") @test String(take!(ch)) == "Data" # stopfn @@ -44,11 +43,11 @@ end @test take!(ch) == :stopfn stop[] = false ch = GraphQLClient.async_reader_with_stopfn(ws, stopfn, 0.5) - write(ws, "Data") + GraphQLClient.writews(ws, "Data") @test String(take!(ch)) == "Data" # readfromwebsocket - no timeout or stopfn - write(ws, "Data") + GraphQLClient.writews(ws, "Data") @test String(GraphQLClient.readfromwebsocket(ws, nothing, 0)) == "Data" # readfromwebsocket - timeout @@ -70,10 +69,9 @@ end function send_error_localhost(message, port) @async HTTP.listen(HTTP.Sockets.localhost, port) do http - if HTTP.WebSockets.is_upgrade(http.message) + if HTTP.WebSockets.isupgrade(http.message) HTTP.WebSockets.upgrade(http) do ws - while !eof(ws) - data = readavailable(ws) + for data in ws isempty(data) && continue query = JSON3.read(data) error_payload = """ @@ -92,7 +90,7 @@ function send_error_localhost(message, port) } } """ - write(ws, error_payload) + GraphQLClient.writews(ws, error_payload) end end end @@ -101,10 +99,9 @@ end function send_data_localhost(sub_name, port) @async HTTP.listen(HTTP.Sockets.localhost, port) do http - if HTTP.WebSockets.is_upgrade(http.message) + if HTTP.WebSockets.isupgrade(http.message) HTTP.WebSockets.upgrade(http) do ws - while !eof(ws) - data = readavailable(ws) + for data in ws isempty(data) && continue query = JSON3.read(data) data_payload = """ @@ -119,7 +116,7 @@ function send_data_localhost(sub_name, port) } } """ - write(ws, data_payload) + GraphQLClient.writews(ws, data_payload) end end end @@ -148,7 +145,7 @@ end output_fields="field", subtimeout=0.1) @test val[] == 2 - +#= # Error response - not throwing port = 8093 send_error_localhost("This failed", port) @@ -208,4 +205,5 @@ end @test results[1] isa GraphQLClient.GQLResponse{Response} @test isnothing(results[1].errors) @test !isnothing(results[1].data) # No point testing content as we've coded it in the test function + =# end \ No newline at end of file From 51592fbaa2e5dfc2627445cb2cfcb0e8cddfa308 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sun, 17 Jul 2022 17:56:47 +0300 Subject: [PATCH 04/11] Fix tests --- src/subscriptions.jl | 12 ++++++++---- test/subscriptions.jl | 4 ++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/subscriptions.jl b/src/subscriptions.jl index a94995d..834e6bc 100644 --- a/src/subscriptions.jl +++ b/src/subscriptions.jl @@ -105,8 +105,8 @@ function open_subscription(fn::Function, "payload" => payload ) message_str = JSON3.write(message) - - HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=client.headers) do ws + throw_if_assigned = Ref{GraphQLClientException}() + HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=client.headers, suppress_close_error=false) do ws # Start sub output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id") writews(ws, message_str) @@ -132,11 +132,13 @@ function open_subscription(fn::Function, output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied") break end - response = JSON3.read(data::Vector{UInt8}, GQLSubscriptionResponse{output_type}) + # Dropping typeassert as this may be string + response = JSON3.read(data, GQLSubscriptionResponse{output_type}) payload = response.payload if !isnothing(payload.errors) && !isempty(payload.errors) && throw_on_execution_error subscription_tracker[][sub_id] = "errored" - throw(GraphQLError("Error during subscription.", payload)) + throw_if_assigned[] = GraphQLError("Error during subscription.", payload) + break end # Handle multiple subs, do we need this? if response.id == string(sub_id) @@ -149,6 +151,8 @@ function open_subscription(fn::Function, end end end + # We can't throw errors from the ws handle function in HTTP 1.0, as they get digested. + isassigned(throw_if_assigned) && throw(throw_if_assigned[]) output_debug(verbose) && println("Finished. Closing subscription") subscription_tracker[][sub_id] = "closed" return diff --git a/test/subscriptions.jl b/test/subscriptions.jl index a844c3d..8af32a4 100644 --- a/test/subscriptions.jl +++ b/test/subscriptions.jl @@ -145,7 +145,7 @@ end output_fields="field", subtimeout=0.1) @test val[] == 2 -#= + # Error response - not throwing port = 8093 send_error_localhost("This failed", port) @@ -205,5 +205,5 @@ end @test results[1] isa GraphQLClient.GQLResponse{Response} @test isnothing(results[1].errors) @test !isnothing(results[1].data) # No point testing content as we've coded it in the test function - =# + end \ No newline at end of file From c46c45d8b02cfec23c583a0b08a137d8064097aa Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sun, 17 Jul 2022 17:58:16 +0300 Subject: [PATCH 05/11] Ooops --- src/subscriptions.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/subscriptions.jl b/src/subscriptions.jl index 834e6bc..772cc7a 100644 --- a/src/subscriptions.jl +++ b/src/subscriptions.jl @@ -105,7 +105,7 @@ function open_subscription(fn::Function, "payload" => payload ) message_str = JSON3.write(message) - throw_if_assigned = Ref{GraphQLClientException}() + throw_if_assigned = Ref{GraphQLError}() HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=client.headers, suppress_close_error=false) do ws # Start sub output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id") From ef88af4ca6b6ef632f2219c63fa059478aaeb02c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=CE=A0=CE=B1=CE=BD=CE=B1=CE=B3=CE=B9=CF=8E=CF=84=CE=B7?= =?UTF-8?q?=CF=82=20=CE=93=CE=B5=CF=89=CF=81=CE=B3=CE=B1=CE=BA=CF=8C=CF=80?= =?UTF-8?q?=CE=BF=CF=85=CE=BB=CE=BF=CF=82?= Date: Sun, 17 Jul 2022 19:17:29 +0300 Subject: [PATCH 06/11] Update src/subscriptions.jl Co-authored-by: Mal Miller <59854849+mmiller-max@users.noreply.github.com> --- src/subscriptions.jl | 1 - 1 file changed, 1 deletion(-) diff --git a/src/subscriptions.jl b/src/subscriptions.jl index 772cc7a..33932fc 100644 --- a/src/subscriptions.jl +++ b/src/subscriptions.jl @@ -132,7 +132,6 @@ function open_subscription(fn::Function, output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied") break end - # Dropping typeassert as this may be string response = JSON3.read(data, GQLSubscriptionResponse{output_type}) payload = response.payload if !isnothing(payload.errors) && !isempty(payload.errors) && throw_on_execution_error From 8d128b51b957a68e2bef1563292c061386471a40 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sun, 28 Aug 2022 19:15:07 +0300 Subject: [PATCH 07/11] Simplify PR --- src/subscriptions.jl | 70 ++----------------------------------------- test/subscriptions.jl | 14 ++++----- 2 files changed, 9 insertions(+), 75 deletions(-) diff --git a/src/subscriptions.jl b/src/subscriptions.jl index 33932fc..d658629 100644 --- a/src/subscriptions.jl +++ b/src/subscriptions.jl @@ -1,17 +1,5 @@ const subscription_tracker = Ref{Dict}(Dict()) -function writews(ws::HTTP.WebSockets.WebSocket, msg) - if isdefined(HTTP, :send) - HTTP.send(ws, msg) - else - write(ws, msg) - end -end - -function writews(ws::IO, msg) - write(ws, msg) -end - """ open_subscription(fn::Function, [client::Client], @@ -109,7 +97,7 @@ function open_subscription(fn::Function, HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=client.headers, suppress_close_error=false) do ws # Start sub output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id") - writews(ws, message_str) + HTTP.send(ws, message_str) subscription_tracker[][sub_id] = "open" # Init function @@ -170,24 +158,6 @@ function clear_subscriptions() end end -function async_reader_with_timeout(io::IO, subtimeout)::Channel - ch = Channel(1) - task = @async begin - reader_task = current_task() - function timeout_cb(timer) - put!(ch, :timeout) - Base.throwto(reader_task, InterruptException()) - end - timeout = Timer(timeout_cb, subtimeout) - data = readavailable(io) - subtimeout > 0 && close(timeout) # Cancel the timeout - put!(ch, data) - end - bind(ch, task) - return ch -end - - function async_reader_with_timeout(ws::HTTP.WebSocket, subtimeout)::Channel ch = Channel(1) task = @async begin @@ -205,28 +175,6 @@ function async_reader_with_timeout(ws::HTTP.WebSocket, subtimeout)::Channel return ch end - -function async_reader_with_stopfn(io::IO, stopfn, checktime)::Channel - ch = Channel(1) # Could we make this channel concretely typed? - task = @async begin - reader_task = current_task() - function timeout_cb(timer) - if stopfn() - put!(ch, :stopfn) - Base.throwto(reader_task, InterruptException()) - else - timeout = Timer(timeout_cb, checktime) - end - end - timeout = Timer(timeout_cb, checktime) - data = readavailable(io) - close(timeout) # Cancel the timeout - put!(ch, data) - end - bind(ch, task) - return ch -end - function async_reader_with_stopfn(ws::HTTP.WebSockets.WebSocket, stopfn, checktime)::Channel ch = Channel(1) # Could we make this channel concretely typed? task = @async begin @@ -265,20 +213,6 @@ A channel is returned with the data. If `stopfn` stops the websocket, the data will be `:stopfn`. If the timeout stops the websocket, the data will be `:timeout` """ -function readfromwebsocket(ws::IO, stopfn, subtimeout) - if isnothing(stopfn) && subtimeout > 0 - ch_out = async_reader_with_timeout(ws, subtimeout) - data = take!(ch_out) - elseif !isnothing(stopfn) - checktime = subtimeout > 0 ? subtimeout : 2 - ch_out = async_reader_with_stopfn(ws, stopfn, checktime) - data = take!(ch_out) - else - data = readavailable(ws) - end - return data -end - function readfromwebsocket(ws::HTTP.WebSockets.WebSocket, stopfn, subtimeout) if isnothing(stopfn) && subtimeout > 0 ch_out = async_reader_with_timeout(ws, subtimeout) @@ -291,4 +225,4 @@ function readfromwebsocket(ws::HTTP.WebSockets.WebSocket, stopfn, subtimeout) data = HTTP.receive(ws) end return data -end \ No newline at end of file +end diff --git a/test/subscriptions.jl b/test/subscriptions.jl index 8af32a4..db34679 100644 --- a/test/subscriptions.jl +++ b/test/subscriptions.jl @@ -3,7 +3,7 @@ function listen_localhost() if HTTP.WebSockets.isupgrade(http.message) HTTP.WebSockets.upgrade(http) do ws for data in ws - GraphQLClient.writews(ws, data) + HTTP.send(ws, data) end end end @@ -30,7 +30,7 @@ end @test take!(ch) == :timeout ch = GraphQLClient.async_reader_with_timeout(ws, 5) - GraphQLClient.writews(ws, "Data") + HTTP.send(ws, "Data") @test String(take!(ch)) == "Data" # stopfn @@ -43,11 +43,11 @@ end @test take!(ch) == :stopfn stop[] = false ch = GraphQLClient.async_reader_with_stopfn(ws, stopfn, 0.5) - GraphQLClient.writews(ws, "Data") + HTTP.send(ws, "Data") @test String(take!(ch)) == "Data" # readfromwebsocket - no timeout or stopfn - GraphQLClient.writews(ws, "Data") + HTTP.send(ws, "Data") @test String(GraphQLClient.readfromwebsocket(ws, nothing, 0)) == "Data" # readfromwebsocket - timeout @@ -90,7 +90,7 @@ function send_error_localhost(message, port) } } """ - GraphQLClient.writews(ws, error_payload) + HTTP.send(ws, error_payload) end end end @@ -116,7 +116,7 @@ function send_data_localhost(sub_name, port) } } """ - GraphQLClient.writews(ws, data_payload) + HTTP.send(ws, data_payload) end end end @@ -206,4 +206,4 @@ end @test isnothing(results[1].errors) @test !isnothing(results[1].data) # No point testing content as we've coded it in the test function -end \ No newline at end of file +end From 4415375fcf65deef0e282a10ae0cf9b2ef389c3c Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sun, 28 Aug 2022 19:20:00 +0300 Subject: [PATCH 08/11] remove ws --- src/subscriptions.jl | 1 - test/subscriptions.jl | 1 - 2 files changed, 2 deletions(-) diff --git a/src/subscriptions.jl b/src/subscriptions.jl index d658629..650809c 100644 --- a/src/subscriptions.jl +++ b/src/subscriptions.jl @@ -196,7 +196,6 @@ function async_reader_with_stopfn(ws::HTTP.WebSockets.WebSocket, stopfn, checkti return ch end - """ readfromwebsocket(ws::IO, stopfn, subtimeout) diff --git a/test/subscriptions.jl b/test/subscriptions.jl index db34679..eb71cdf 100644 --- a/test/subscriptions.jl +++ b/test/subscriptions.jl @@ -205,5 +205,4 @@ end @test results[1] isa GraphQLClient.GQLResponse{Response} @test isnothing(results[1].errors) @test !isnothing(results[1].data) # No point testing content as we've coded it in the test function - end From 02057a6c914fe242faddedb5149c320167984c02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=CE=A0=CE=B1=CE=BD=CE=B1=CE=B3=CE=B9=CF=8E=CF=84=CE=B7?= =?UTF-8?q?=CF=82=20=CE=93=CE=B5=CF=89=CF=81=CE=B3=CE=B1=CE=BA=CF=8C=CF=80?= =?UTF-8?q?=CE=BF=CF=85=CE=BB=CE=BF=CF=82?= Date: Fri, 14 Oct 2022 10:34:41 +0300 Subject: [PATCH 09/11] docs: clarify HTTP refers to HTTP.jl Co-authored-by: Mal Miller <59854849+mmiller-max@users.noreply.github.com> --- src/subscriptions.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/subscriptions.jl b/src/subscriptions.jl index 650809c..0686916 100644 --- a/src/subscriptions.jl +++ b/src/subscriptions.jl @@ -138,7 +138,7 @@ function open_subscription(fn::Function, end end end - # We can't throw errors from the ws handle function in HTTP 1.0, as they get digested. + # We can't throw errors from the ws handle function in HTTP.jl 1.0, as they get digested. isassigned(throw_if_assigned) && throw(throw_if_assigned[]) output_debug(verbose) && println("Finished. Closing subscription") subscription_tracker[][sub_id] = "closed" From d9af5c5d93f71dc73bf965c6df20ad22aa264a6c Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Fri, 14 Oct 2022 11:37:24 +0000 Subject: [PATCH 10/11] feat: implement apollographql/subscriptions-transport-ws protocol --- src/GraphQLClient.jl | 1 + src/constants.jl | 19 ++++++++ src/gqlresponse.jl | 4 +- src/subscriptions.jl | 109 +++++++++++++++++++++++++++++++++---------- 4 files changed, 106 insertions(+), 27 deletions(-) create mode 100644 src/constants.jl diff --git a/src/GraphQLClient.jl b/src/GraphQLClient.jl index d8592fe..041df35 100644 --- a/src/GraphQLClient.jl +++ b/src/GraphQLClient.jl @@ -11,6 +11,7 @@ export query, mutate, open_subscription, Client, GQLEnum, Alias, create_introspected_struct, list_all_introspected_objects, global_graphql_client, @gql_str +include("constants.jl") # Types include("client.jl") include("types.jl") diff --git a/src/constants.jl b/src/constants.jl new file mode 100644 index 0000000..f4144bb --- /dev/null +++ b/src/constants.jl @@ -0,0 +1,19 @@ +# GQL over WS Protocol constants +# https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md + +const GQL_CLIENT_CONNECTION_INIT = "connection_init" +const GQL_SERVER_CONNECTION_ACK = "connection_ack" +const GQL_SERVER_CONNECTION_ERROR = "connection_error" +const GQL_SERVER_CONNECTION_KEEP_ALIVE = "ka" +const GQL_CLIENT_START = "start" +const GQL_CLIENT_STOP = "stop" +const GQL_CLIENT_CONNECTION_TERMINATE = "connection_terminate" +const GQL_SERVER_DATA = "data" +const GQL_SERVER_ERROR = "error" +const GQL_SERVER_COMPLETE = "complete" + +# Subscription tracker +const SUBSCRIPTION_STATUS_OPEN = "open" +const SUBSCRIPTION_STATUS_ERROR = "errored" +const SUBSCRIPTION_STATUS_CLOSED = "closed" + diff --git a/src/gqlresponse.jl b/src/gqlresponse.jl index 0a4a858..91b577d 100644 --- a/src/gqlresponse.jl +++ b/src/gqlresponse.jl @@ -99,9 +99,9 @@ end Struct for subsriptions that wraps a `GQLReponse{T}` alongside various metadata. """ struct GQLSubscriptionResponse{T} - id::String + id::Union{String, Nothing} type::String - payload::GQLResponse{T} + payload::Union{GQLResponse{T}, Nothing} end StructTypes.StructType(::Type{<:GQLSubscriptionResponse}) = StructTypes.Struct() diff --git a/src/subscriptions.jl b/src/subscriptions.jl index 0686916..74368f0 100644 --- a/src/subscriptions.jl +++ b/src/subscriptions.jl @@ -26,7 +26,7 @@ The subscription uses the `ws_endpoint` field of the `client.` This function is designed to be used with the `do` keyword. # Arguments -- `fn::Function`: function to be run on each result, recieves the response from the +- `fn::Function`: function to be run on each result, receives the response from the subscription`. Must return a boolean to indicate whether or not to close the subscription, with `true` closing the subscription. - `client::Client`: GraphQL client (optional). If not supplied, [`global_graphql_client`](@ref) is used. @@ -87,61 +87,108 @@ function open_subscription(fn::Function, sub_id = string(length(keys(subscription_tracker[])) + 1) sub_id *= "-" * string(Threads.threadid()) - message = Dict( - "id" => string(sub_id), - "type" => "start", - "payload" => payload - ) - message_str = JSON3.write(message) + throw_if_assigned = Ref{GraphQLError}() - HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=client.headers, suppress_close_error=false) do ws + headers = Dict(client.headers) + # We currently implement the `apollographql/subscriptions-transport-ws` which is default in hasura and others + # Defined here https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md + # TODO: Add support for the newer `graphql-transport-ws` from the graphql-ws library. + # () + headers["Sec-WebSocket-Protocol"] = "apollographql/subscriptions-transport-ws" + + HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=headers) do ws # Start sub output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id") - HTTP.send(ws, message_str) - subscription_tracker[][sub_id] = "open" - + HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_CONNECTION_INIT))) # Init function if !isnothing(initfn) output_debug(verbose) && println("Running subscription initialisation function") initfn() end + data = readfromwebsocket(ws, stopfn, subtimeout) + try checkreturn(data, verbose, sub_id) + catch e + e isa Interrupt && return + end + response = JSON3.read(data, GQLSubscriptionResponse{output_type}) + while response.type == GQL_SERVER_CONNECTION_KEEP_ALIVE + data = readfromwebsocket(ws, stopfn, subtimeout) + try checkreturn(data, verbose, sub_id) + catch e + e isa Interrupt && return + end + response = JSON3.read(data, GQLSubscriptionResponse{output_type}) + end + if response.type == GQL_SERVER_CONNECTION_ERROR && throw_on_execution_error + subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR + throw(GraphQLError("Error while establishing connection.", payload)) + end + + start_message = Dict( + "id" => string(sub_id), + "type" => GQL_CLIENT_START, + "payload" => payload + ) + message_str = JSON3.write(start_message) + HTTP.send(ws, message_str) + subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_OPEN + # Get listening output_debug(verbose) && println("Listening to $(get_name(subscription_name)) with ID $sub_id...") # Run function - finish = false - while !finish + while true data = readfromwebsocket(ws, stopfn, subtimeout) - if data === :timeout - output_info(verbose) && println("Subscription $sub_id timed out") + try checkreturn(data, verbose, sub_id) + catch e + e isa Interrupt && break + end + # data = String(data) + # println(data) + response = JSON3.read(data, GQLSubscriptionResponse{output_type}) + + response.type == GQL_SERVER_CONNECTION_KEEP_ALIVE && continue + response.type == GQL_SERVER_COMPLETE && break + response.type == GQL_SERVER_CONNECTION_ERROR && begin + throw_if_assigned[] = GraphQLError("Error during subscription. Server reporeted connection error") break - elseif data === :stopfn - output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied") + end + response.type == GQL_SERVER_ERROR && begin + throw_if_assigned[] = GraphQLError("Error during subscription - GQL_SERVER_ERROR.", response.payload) break end - response = JSON3.read(data, GQLSubscriptionResponse{output_type}) + # response.type == GQL_SERVER_DATA payload = response.payload if !isnothing(payload.errors) && !isempty(payload.errors) && throw_on_execution_error - subscription_tracker[][sub_id] = "errored" + subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR throw_if_assigned[] = GraphQLError("Error during subscription.", payload) break end # Handle multiple subs, do we need this? if response.id == string(sub_id) - output_debug(verbose) && println("Result recieved on subscription with ID $sub_id") + output_debug(verbose) && println("Result received on subscription with ID $sub_id") finish = fn(payload) if !isa(finish, Bool) - subscription_tracker[][sub_id] = "errored" - error("Subscription function must return a boolean") + subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR + throw_if_assigned[] = ErrorException("Subscription function must return a boolean") + break + end + if finish + # Protocol says we need to let the server know we're unsubscribing + output_debug(verbose) && println("Finished. Closing subscription") + HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_STOP))) + HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_CONNECTION_TERMINATE))) + # close(ws) + break end end end end - # We can't throw errors from the ws handle function in HTTP.jl 1.0, as they get digested. + # We can't throw errors from the ws handle function in HTTP.jl 1.#, as they get digested. isassigned(throw_if_assigned) && throw(throw_if_assigned[]) output_debug(verbose) && println("Finished. Closing subscription") - subscription_tracker[][sub_id] = "closed" + subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_CLOSED return end @@ -158,7 +205,7 @@ function clear_subscriptions() end end -function async_reader_with_timeout(ws::HTTP.WebSocket, subtimeout)::Channel +function async_reader_with_timeout(ws::HTTP.WebSockets.WebSocket, subtimeout)::Channel ch = Channel(1) task = @async begin reader_task = current_task() @@ -225,3 +272,15 @@ function readfromwebsocket(ws::HTTP.WebSockets.WebSocket, stopfn, subtimeout) end return data end +struct Interrupt <: Exception +end + +function checkreturn(data, verbose, sub_id) + if data === :timeout + output_info(verbose) && println("Subscription $sub_id timed out") + throw(Interrupt()) + elseif data === :stopfn + output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied") + throw(Interrupt()) + end +end \ No newline at end of file From ad669c8630ba4bae1b4cc703dde4e47e20f6cf66 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sun, 16 Oct 2022 21:25:15 +0000 Subject: [PATCH 11/11] feat: Add support for graphql-transport-ws (newer) protocol --- src/GraphQLClient.jl | 1 + src/constants.jl | 13 ++ src/subscriptions.jl | 136 +++++++------------ src/ws_subscription_protocols.jl | 220 +++++++++++++++++++++++++++++++ 4 files changed, 281 insertions(+), 89 deletions(-) create mode 100644 src/ws_subscription_protocols.jl diff --git a/src/GraphQLClient.jl b/src/GraphQLClient.jl index 041df35..ec23259 100644 --- a/src/GraphQLClient.jl +++ b/src/GraphQLClient.jl @@ -27,6 +27,7 @@ include("type_construction.jl") include("http_execution.jl") include("queries.jl") include("mutations.jl") +include("ws_subscription_protocols.jl") include("subscriptions.jl") include("introspection.jl") include("gql_string.jl") diff --git a/src/constants.jl b/src/constants.jl index f4144bb..e9ee032 100644 --- a/src/constants.jl +++ b/src/constants.jl @@ -17,3 +17,16 @@ const SUBSCRIPTION_STATUS_OPEN = "open" const SUBSCRIPTION_STATUS_ERROR = "errored" const SUBSCRIPTION_STATUS_CLOSED = "closed" +# New GQL over WS constanst +# https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md + +const GQLWS_CLIENT_INIT = "connection_init" +const GQLWS_SERVER_CONNECTION_ACK = "connection_ack" +const GQLWS_BI_PING = "ping" +const GQLWS_BI_PONG = "pong" +const GQLWS_CLIENT_SUBSCRIBE = "subscribe" +const GQLWS_SERVER_NEXT = "next" +const GQLWS_SERVER_ERROR = "error" +const GQLWS_BI_COMPLETE = "complete" + + diff --git a/src/subscriptions.jl b/src/subscriptions.jl index 74368f0..6ff32a1 100644 --- a/src/subscriptions.jl +++ b/src/subscriptions.jl @@ -11,7 +11,8 @@ const subscription_tracker = Ref{Dict}(Dict()) retry=true, subtimeout=0, stopfn=nothing, - throw_on_execution_error=false) + throw_on_execution_error=false, + websocket_protocol=join(", ", GQS_WS_PROTOCOLS)) Subscribe to `subscription_name`, running `fn` on each received result and ending the subcription when `fn` returns `true`. @@ -51,7 +52,12 @@ This function is designed to be used with the `do` keyword. - `throw_on_execution_error=false`: set to `true` to stop an error being thrown if the GraphQL server response contains errors that occurred during execution. - `verbose=0`: set to 1, 2 for extra logging. - +- `websocket_protocol=join(", ", GQL_WS_PROTOCOLS)`: Will try to communicate with [apollographql's + subcription-transport-protocol](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md) + or with the newer [graphql-ws](https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md) protocol. + With the default setup, the protocol that is actually used, is selected by the server. If you want to + enforce the subprotocol, you can adjust this accordingly. The string constants `PROTOCOL_APOLLO_OLD`, `PROTOCOL_GRAPHQL_WS` + contain the names of the sub-protocols, respectively. # Examples ```julia julia> open_subscription("subSaveUser", sub_args=Dict("role" => "SYSTEM_ADMIN")) do result @@ -78,12 +84,13 @@ function open_subscription(fn::Function, subtimeout=0, stopfn=nothing, throw_on_execution_error=false, - verbose=0) + verbose=0, + websocket_protocol=join(", ", PROTOCOL_GRAPHQL_WS)) !in(get_name(subscription_name), get_subscriptions(client)) && throw(GraphQLError("$(get_name(subscription_name)) is not an existing subscription")) output_str = get_output_str(output_fields) - payload = get_generic_query_payload(client, "subscription", subscription_name, sub_args, output_str) + subscription_payload = get_generic_query_payload(client, "subscription", subscription_name, sub_args, output_str) sub_id = string(length(keys(subscription_tracker[])) + 1) sub_id *= "-" * string(Threads.threadid()) @@ -94,95 +101,46 @@ function open_subscription(fn::Function, # Defined here https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md # TODO: Add support for the newer `graphql-transport-ws` from the graphql-ws library. # () - headers["Sec-WebSocket-Protocol"] = "apollographql/subscriptions-transport-ws" + headers["Sec-WebSocket-Protocol"] = websocket_protocol HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=headers) do ws # Start sub output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id") - HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_CONNECTION_INIT))) - # Init function - if !isnothing(initfn) - output_debug(verbose) && println("Running subscription initialisation function") - initfn() - end - - data = readfromwebsocket(ws, stopfn, subtimeout) - try checkreturn(data, verbose, sub_id) - catch e - e isa Interrupt && return - end - response = JSON3.read(data, GQLSubscriptionResponse{output_type}) - while response.type == GQL_SERVER_CONNECTION_KEEP_ALIVE - data = readfromwebsocket(ws, stopfn, subtimeout) - try checkreturn(data, verbose, sub_id) - catch e - e isa Interrupt && return - end - response = JSON3.read(data, GQLSubscriptionResponse{output_type}) - end - if response.type == GQL_SERVER_CONNECTION_ERROR && throw_on_execution_error - subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR - throw(GraphQLError("Error while establishing connection.", payload)) - end - - start_message = Dict( - "id" => string(sub_id), - "type" => GQL_CLIENT_START, - "payload" => payload - ) - message_str = JSON3.write(start_message) - HTTP.send(ws, message_str) - subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_OPEN - - # Get listening - output_debug(verbose) && println("Listening to $(get_name(subscription_name)) with ID $sub_id...") - - # Run function - while true - data = readfromwebsocket(ws, stopfn, subtimeout) - try checkreturn(data, verbose, sub_id) - catch e - e isa Interrupt && break - end - # data = String(data) - # println(data) - response = JSON3.read(data, GQLSubscriptionResponse{output_type}) - - response.type == GQL_SERVER_CONNECTION_KEEP_ALIVE && continue - response.type == GQL_SERVER_COMPLETE && break - response.type == GQL_SERVER_CONNECTION_ERROR && begin - throw_if_assigned[] = GraphQLError("Error during subscription. Server reporeted connection error") - break - end - response.type == GQL_SERVER_ERROR && begin - throw_if_assigned[] = GraphQLError("Error during subscription - GQL_SERVER_ERROR.", response.payload) - break - end - # response.type == GQL_SERVER_DATA - payload = response.payload - if !isnothing(payload.errors) && !isempty(payload.errors) && throw_on_execution_error - subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR - throw_if_assigned[] = GraphQLError("Error during subscription.", payload) - break - end - # Handle multiple subs, do we need this? - if response.id == string(sub_id) - output_debug(verbose) && println("Result received on subscription with ID $sub_id") - finish = fn(payload) - if !isa(finish, Bool) - subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR - throw_if_assigned[] = ErrorException("Subscription function must return a boolean") - break - end - if finish - # Protocol says we need to let the server know we're unsubscribing - output_debug(verbose) && println("Finished. Closing subscription") - HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_STOP))) - HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_CONNECTION_TERMINATE))) - # close(ws) - break - end - end + selected_protocol = HTTP.header(ws.response, "Sec-WebSocket-Protocol") + output_info(verbose) && println("Headers - $selected_protocol, $(join(' ', HTTP.headers(ws.response)))") + if selected_protocol == PROTOCOL_APOLLO_OLD + handle_apollo_old( + fn, + ws, + subscription_name, + subscription_payload, + sub_id, + output_type; + initfn=initfn, + subtimeout=subtimeout, + stopfn=stopfn, + throw_on_execution_error=throw_on_execution_error, + verbose=verbose, + throw_if_assigned_ref=throw_if_assigned + ) + else + if selected_protocol != PROTOCOL_GRAPHQL_WS + @warn("None of the implemented protocols match - trying to use \"$(PROTOCOL_GRAPHQL_WS)\"") + end + handle_graphql_ws( + fn, + ws, + subscription_name, + subscription_payload, + sub_id, + output_type; + initfn=initfn, + subtimeout=subtimeout, + stopfn=stopfn, + throw_on_execution_error=throw_on_execution_error, + verbose=verbose, + throw_if_assigned_ref=throw_if_assigned + ) end end # We can't throw errors from the ws handle function in HTTP.jl 1.#, as they get digested. diff --git a/src/ws_subscription_protocols.jl b/src/ws_subscription_protocols.jl new file mode 100644 index 0000000..1e818b6 --- /dev/null +++ b/src/ws_subscription_protocols.jl @@ -0,0 +1,220 @@ +import HTTP +import Base.string + +supported_protocols = [] + +PROTOCOL_APOLLO_OLD = "graphql-ws" +PROTOCOL_GRAPHQL_WS = "graphql-transport-ws" + +GQL_WS_PROTOCOLS = [PROTOCOL_APOLLO_OLD, PROTOCOL_GRAPHQL_WS] + +function handle_apollo_old( + fn::Function, + ws::HTTP.WebSockets.WebSocket, + subscription_name::Union{Alias, AbstractString}, + subscription_payload, + sub_id::AbstractString, + output_type::Type=Any; + initfn=nothing, + subtimeout=0, + stopfn=nothing, + throw_on_execution_error=false, + verbose=0, + throw_if_assigned_ref=nothing) + + output_debug(verbose) && println("Communicating with unmaintained apollo ws protocol ($PROTOCOL_APOLLO_OLD)") + HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_CONNECTION_INIT))) + # Init function + if !isnothing(initfn) + output_debug(verbose) && println("Running subscription initialisation function") + initfn() + end + + data = readfromwebsocket(ws, stopfn, subtimeout) + try checkreturn(data, verbose, sub_id) + catch e + e isa Interrupt && return + end + response = JSON3.read(data, GQLSubscriptionResponse{output_type}) + while response.type == GQL_SERVER_CONNECTION_KEEP_ALIVE + data = readfromwebsocket(ws, stopfn, subtimeout) + try checkreturn(data, verbose, sub_id) + catch e + e isa Interrupt && return + end + response = JSON3.read(data, GQLSubscriptionResponse{output_type}) + end + if response.type == GQL_SERVER_CONNECTION_ERROR && throw_on_execution_error + subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR + throw(GraphQLError("Error while establishing connection.", response.payload)) + end + + start_message = Dict( + "id" => string(sub_id), + "type" => GQL_CLIENT_START, + "payload" => subscription_payload, + ) + message_str = JSON3.write(start_message) + HTTP.send(ws, message_str) + subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_OPEN + + # Get listening + output_debug(verbose) && println("Listening to $(get_name(subscription_name)) with ID $sub_id...") + + # Run function + while true + data = readfromwebsocket(ws, stopfn, subtimeout) + try checkreturn(data, verbose, sub_id) + catch e + e isa Interrupt && break + end + # data = String(data) + # println(data) + response = JSON3.read(data, GQLSubscriptionResponse{output_type}) + + response.type == GQL_SERVER_CONNECTION_KEEP_ALIVE && continue + response.type == GQL_SERVER_COMPLETE && break + response.type == GQL_SERVER_CONNECTION_ERROR && begin + throw_if_assigned_ref[] = GraphQLError("Error during subscription. Server reporeted connection error") + break + end + response.type == GQL_SERVER_ERROR && begin + throw_if_assigned_ref[] = GraphQLError("Error during subscription - GQL_SERVER_ERROR.", response.payload) + break + end + # response.type == GQL_SERVER_DATA + payload = response.payload + if !isnothing(payload.errors) && !isempty(payload.errors) && throw_on_execution_error + subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR + throw_if_assigned_ref[] = GraphQLError("Error during subscription.", payload) + break + end + # Handle multiple subs, do we need this? + if response.id == string(sub_id) + output_debug(verbose) && println("Result received on subscription with ID $sub_id") + finish = fn(payload) + if !isa(finish, Bool) + subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR + throw_if_assigned_ref[] = ErrorException("Subscription function must return a boolean") + break + end + if finish + # Protocol says we need to let the server know we're unsubscribing + output_debug(verbose) && println("Finished. Closing subscription") + HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_STOP))) + HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_CONNECTION_TERMINATE))) + # close(ws) + break + end + end + end +end + +function send_pong(ws::HTTP.WebSockets.WebSocket) + return HTTP.send(ws, JSON3.write(Dict("type" => GQLWS_BI_PONG))) +end + +function handle_graphql_ws( + fn::Function, + ws::HTTP.WebSockets.WebSocket, + subscription_name::Union{Alias, AbstractString}, + subscription_payload, + sub_id::AbstractString, + output_type::Type=Any; + initfn=nothing, + subtimeout=0, + stopfn=nothing, + throw_on_execution_error=false, + verbose=0, + throw_if_assigned_ref=nothing) + + # TODO: Lock this; each client must only have *one* request open in this stage + output_debug(verbose) && println("Communicating with GraphQL-WS protocol ($PROTOCOL_GRAPHQL_WS)") + HTTP.send(ws, JSON3.write(Dict( + "id" => sub_id, + "type" => GQLWS_CLIENT_INIT))) + # Init function + if !isnothing(initfn) + output_debug(verbose) && println("Running subscription initialisation function") + initfn() + end + + data = readfromwebsocket(ws, stopfn, subtimeout) + response = JSON3.read(data, GQLSubscriptionResponse{output_type}) + while response.type == GQLWS_BI_PING + send_pong(ws) + data = readfromwebsocket(ws, stopfn, subtimeout) + try checkreturn(data, verbose, sub_id) + catch e + e isa Interrupt && return + end + response = JSON3.read(data, GQLSubscriptionResponse{output_type}) + end + + if response.type == GQLWS_SERVER_ERROR && throw_on_execution_error + subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR + throw(GraphQLError("Error while establishing connection.", response.payload)) + end + + if response.type != GQLWS_SERVER_CONNECTION_ACK + error("Connection could not be established; Server did not ACK the request to initialize.") + end + + start_message = Dict( + "id" => string(sub_id), + "type" => GQLWS_CLIENT_SUBSCRIBE, + "payload" => subscription_payload + ) + message_str = JSON3.write(start_message) + HTTP.send(ws, message_str) + subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_OPEN + + # Get listening + output_debug(verbose) && println("Listening to $(get_name(subscription_name)) with ID $sub_id...") + + # Run function + while true + data = readfromwebsocket(ws, stopfn, subtimeout) + try checkreturn(data, verbose, sub_id) + catch e + e isa Interrupt && break + end + # data = String(data) + # println(data) + response = JSON3.read(data, GQLSubscriptionResponse{output_type}) + + response.type == GQLWS_BI_PING && begin + send_pong(ws) + continue + end + response.type == GQLWS_BI_COMPLETE && break + response.type == GQLWS_SERVER_ERROR && begin + throw_if_assigned_ref[] = GraphQLError("Error during subscription. Server reporeted connection error") + break + end + # response.type == GQLWS_SERVER_NEXT + payload = response.payload + if !isnothing(payload.errors) && !isempty(payload.errors) && throw_on_execution_error + subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR + throw_if_assigned_ref[] = GraphQLError("Error during subscription.", payload) + break + end + # Handle multiple subs, do we need this? + if response.id == string(sub_id) + output_debug(verbose) && println("Result received on subscription with ID $sub_id") + finish = fn(payload) + if !isa(finish, Bool) + subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR + throw_if_assigned_ref[] = ErrorException("Subscription function must return a boolean") + break + end + if finish + # Protocol says we need to let the server know we're unsubscribing + output_debug(verbose) && println("Finished. Closing subscription") + HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQLWS_BI_COMPLETE))) + # close(ws) + break + end + end + end +end \ No newline at end of file