Skip to content

Commit d761bb9

Browse files
committed
feat: implement apollographql/subscriptions-transport-ws protocol
1 parent 02057a6 commit d761bb9

File tree

4 files changed

+103
-28
lines changed

4 files changed

+103
-28
lines changed

src/GraphQLClient.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export query, mutate, open_subscription, Client, GQLEnum, Alias,
1111
create_introspected_struct, list_all_introspected_objects, global_graphql_client,
1212
@gql_str
1313

14+
include("constants.jl")
1415
# Types
1516
include("client.jl")
1617
include("types.jl")

src/constants.jl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# GQL over WS Protocol constants
2+
# https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
3+
4+
const GQL_CLIENT_CONNECTION_INIT = "connection_init"
5+
const GQL_SERVER_CONNECTION_ACK = "connection_ack"
6+
const GQL_SERVER_CONNECTION_ERROR = "connection_error"
7+
const GQL_SERVER_CONNECTION_KEEP_ALIVE = "ka"
8+
const GQL_CLIENT_START = "start"
9+
const GQL_CLIENT_STOP = "stop"
10+
const GQL_CLIENT_CONNECTION_TERMINATE = "connection_terminate"
11+
const GQL_SERVER_DATA = "data"
12+
const GQL_SERVER_ERROR = "error"
13+
const GQL_SERVER_COMPLETE = "complete"
14+
15+
# Subscription tracker
16+
const SUBSCRIPTION_STATUS_OPEN = "open"
17+
const SUBSCRIPTION_STATUS_ERROR = "errored"
18+
const SUBSCRIPTION_STATUS_CLOSED = "closed"
19+

src/gqlresponse.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,9 @@ end
9999
Struct for subsriptions that wraps a `GQLReponse{T}` alongside various metadata.
100100
"""
101101
struct GQLSubscriptionResponse{T}
102-
id::String
102+
id::Union{String, Nothing}
103103
type::String
104-
payload::GQLResponse{T}
104+
payload::Union{GQLResponse{T}, Nothing}
105105
end
106106
StructTypes.StructType(::Type{<:GQLSubscriptionResponse}) = StructTypes.Struct()
107107

src/subscriptions.jl

Lines changed: 81 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ The subscription uses the `ws_endpoint` field of the `client.`
2626
This function is designed to be used with the `do` keyword.
2727
2828
# Arguments
29-
- `fn::Function`: function to be run on each result, recieves the response from the
29+
- `fn::Function`: function to be run on each result, receives the response from the
3030
subscription`. Must return a boolean to indicate whether or not to close the subscription,
3131
with `true` closing the subscription.
3232
- `client::Client`: GraphQL client (optional). If not supplied, [`global_graphql_client`](@ref) is used.
@@ -87,61 +87,104 @@ function open_subscription(fn::Function,
8787

8888
sub_id = string(length(keys(subscription_tracker[])) + 1)
8989
sub_id *= "-" * string(Threads.threadid())
90-
message = Dict(
91-
"id" => string(sub_id),
92-
"type" => "start",
93-
"payload" => payload
94-
)
95-
message_str = JSON3.write(message)
90+
9691
throw_if_assigned = Ref{GraphQLError}()
97-
HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=client.headers, suppress_close_error=false) do ws
92+
headers = Dict(client.headers)
93+
# We currently implement the `apollographql/subscriptions-transport-ws` which is default in hasura and others
94+
# Defined here https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
95+
# TODO: Add support for the newer `graphql-transport-ws` from the graphql-ws library.
96+
# ()
97+
headers["Sec-WebSocket-Protocol"] = "apollographql/subscriptions-transport-ws"
98+
99+
HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=headers) do ws
98100
# Start sub
99101
output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id")
100-
HTTP.send(ws, message_str)
101-
subscription_tracker[][sub_id] = "open"
102-
102+
HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_CONNECTION_INIT)))
103103
# Init function
104104
if !isnothing(initfn)
105105
output_debug(verbose) && println("Running subscription initialisation function")
106106
initfn()
107107
end
108108

109+
data = readfromwebsocket(ws, stopfn, subtimeout)
110+
try checkreturn(data, verbose, sub_id)
111+
catch e
112+
e isa Interrupt && return
113+
end
114+
response = JSON3.read(data, GQLSubscriptionResponse{output_type})
115+
while response.type == GQL_SERVER_CONNECTION_KEEP_ALIVE
116+
response = JSON3.read(readfromwebsocket(ws, stopfn, subtimeout), GQLSubscriptionResponse{output_type})
117+
try checkreturn(data, verbose, sub_id)
118+
catch e
119+
e isa Interrupt && return
120+
end
121+
end
122+
if response.type == GQL_SERVER_CONNECTION_ERROR && throw_on_execution_error
123+
subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR
124+
throw(GraphQLError("Error while establishing connection.", payload))
125+
end
126+
127+
start_message = Dict(
128+
"id" => string(sub_id),
129+
"type" => GQL_CLIENT_START,
130+
"payload" => payload
131+
)
132+
message_str = JSON3.write(start_message)
133+
HTTP.send(ws, message_str)
134+
subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_OPEN
135+
109136
# Get listening
110137
output_debug(verbose) && println("Listening to $(get_name(subscription_name)) with ID $sub_id...")
111138

112139
# Run function
113-
finish = false
114-
while !finish
140+
while true
115141
data = readfromwebsocket(ws, stopfn, subtimeout)
116-
if data === :timeout
117-
output_info(verbose) && println("Subscription $sub_id timed out")
118-
break
119-
elseif data === :stopfn
120-
output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied")
121-
break
142+
try checkreturn(data, verbose, sub_id)
143+
catch e
144+
e isa Interrupt && break
122145
end
146+
# data = String(data)
147+
# println(data)
123148
response = JSON3.read(data, GQLSubscriptionResponse{output_type})
149+
150+
response.type == GQL_SERVER_CONNECTION_KEEP_ALIVE && continue
151+
response.type == GQL_SERVER_COMPLETE && break
152+
response.type == GQL_SERVER_CONNECTION_ERROR && begin
153+
throw_if_assigned[] = GraphQLError("Error during subscription. Server reporeted connection error")
154+
end
155+
response.type == GQL_SERVER_ERROR && begin
156+
throw_if_assigned[] = GraphQLError("Error during subscription - GQL_SERVER_ERROR.", response.payload)
157+
end
158+
response.type == GQL_SERVER_DATA #
124159
payload = response.payload
125160
if !isnothing(payload.errors) && !isempty(payload.errors) && throw_on_execution_error
126-
subscription_tracker[][sub_id] = "errored"
161+
subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR
127162
throw_if_assigned[] = GraphQLError("Error during subscription.", payload)
128163
break
129164
end
130165
# Handle multiple subs, do we need this?
131166
if response.id == string(sub_id)
132-
output_debug(verbose) && println("Result recieved on subscription with ID $sub_id")
167+
output_debug(verbose) && println("Result received on subscription with ID $sub_id")
133168
finish = fn(payload)
134169
if !isa(finish, Bool)
135-
subscription_tracker[][sub_id] = "errored"
136-
error("Subscription function must return a boolean")
170+
subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR
171+
throw_if_assigned[] = ErrorException("Subscription function must return a boolean")
172+
end
173+
if finish
174+
# Protocol says we need to let the server know we're unsubscribing
175+
output_debug(verbose) && println("Finished. Closing subscription")
176+
HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_STOP)))
177+
HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_CONNECTION_TERMINATE)))
178+
close(ws)
179+
break
137180
end
138181
end
139182
end
140183
end
141-
# We can't throw errors from the ws handle function in HTTP.jl 1.0, as they get digested.
184+
# We can't throw errors from the ws handle function in HTTP.jl 1.#, as they get digested.
142185
isassigned(throw_if_assigned) && throw(throw_if_assigned[])
143186
output_debug(verbose) && println("Finished. Closing subscription")
144-
subscription_tracker[][sub_id] = "closed"
187+
subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_CLOSED
145188
return
146189
end
147190

@@ -158,7 +201,7 @@ function clear_subscriptions()
158201
end
159202
end
160203

161-
function async_reader_with_timeout(ws::HTTP.WebSocket, subtimeout)::Channel
204+
function async_reader_with_timeout(ws::HTTP.WebSockets.WebSocket, subtimeout)::Channel
162205
ch = Channel(1)
163206
task = @async begin
164207
reader_task = current_task()
@@ -225,3 +268,15 @@ function readfromwebsocket(ws::HTTP.WebSockets.WebSocket, stopfn, subtimeout)
225268
end
226269
return data
227270
end
271+
struct Interrupt <: Exception
272+
end
273+
274+
function checkreturn(data, verbose, sub_id)
275+
if data === :timeout
276+
output_info(verbose) && println("Subscription $sub_id timed out")
277+
throw(Interrupt())
278+
elseif data === :stopfn
279+
output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied")
280+
throw(Interrupt())
281+
end
282+
end

0 commit comments

Comments
 (0)