Skip to content

Commit d9af5c5

Browse files
committed
feat: implement apollographql/subscriptions-transport-ws protocol
1 parent 02057a6 commit d9af5c5

File tree

4 files changed

+106
-27
lines changed

4 files changed

+106
-27
lines changed

src/GraphQLClient.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export query, mutate, open_subscription, Client, GQLEnum, Alias,
1111
create_introspected_struct, list_all_introspected_objects, global_graphql_client,
1212
@gql_str
1313

14+
include("constants.jl")
1415
# Types
1516
include("client.jl")
1617
include("types.jl")

src/constants.jl

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# GQL over WS Protocol constants
2+
# https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
3+
4+
const GQL_CLIENT_CONNECTION_INIT = "connection_init"
5+
const GQL_SERVER_CONNECTION_ACK = "connection_ack"
6+
const GQL_SERVER_CONNECTION_ERROR = "connection_error"
7+
const GQL_SERVER_CONNECTION_KEEP_ALIVE = "ka"
8+
const GQL_CLIENT_START = "start"
9+
const GQL_CLIENT_STOP = "stop"
10+
const GQL_CLIENT_CONNECTION_TERMINATE = "connection_terminate"
11+
const GQL_SERVER_DATA = "data"
12+
const GQL_SERVER_ERROR = "error"
13+
const GQL_SERVER_COMPLETE = "complete"
14+
15+
# Subscription tracker
16+
const SUBSCRIPTION_STATUS_OPEN = "open"
17+
const SUBSCRIPTION_STATUS_ERROR = "errored"
18+
const SUBSCRIPTION_STATUS_CLOSED = "closed"
19+

src/gqlresponse.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,9 @@ end
9999
Struct for subsriptions that wraps a `GQLReponse{T}` alongside various metadata.
100100
"""
101101
struct GQLSubscriptionResponse{T}
102-
id::String
102+
id::Union{String, Nothing}
103103
type::String
104-
payload::GQLResponse{T}
104+
payload::Union{GQLResponse{T}, Nothing}
105105
end
106106
StructTypes.StructType(::Type{<:GQLSubscriptionResponse}) = StructTypes.Struct()
107107

src/subscriptions.jl

Lines changed: 84 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ The subscription uses the `ws_endpoint` field of the `client.`
2626
This function is designed to be used with the `do` keyword.
2727
2828
# Arguments
29-
- `fn::Function`: function to be run on each result, recieves the response from the
29+
- `fn::Function`: function to be run on each result, receives the response from the
3030
subscription`. Must return a boolean to indicate whether or not to close the subscription,
3131
with `true` closing the subscription.
3232
- `client::Client`: GraphQL client (optional). If not supplied, [`global_graphql_client`](@ref) is used.
@@ -87,61 +87,108 @@ function open_subscription(fn::Function,
8787

8888
sub_id = string(length(keys(subscription_tracker[])) + 1)
8989
sub_id *= "-" * string(Threads.threadid())
90-
message = Dict(
91-
"id" => string(sub_id),
92-
"type" => "start",
93-
"payload" => payload
94-
)
95-
message_str = JSON3.write(message)
90+
9691
throw_if_assigned = Ref{GraphQLError}()
97-
HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=client.headers, suppress_close_error=false) do ws
92+
headers = Dict(client.headers)
93+
# We currently implement the `apollographql/subscriptions-transport-ws` which is default in hasura and others
94+
# Defined here https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
95+
# TODO: Add support for the newer `graphql-transport-ws` from the graphql-ws library.
96+
# ()
97+
headers["Sec-WebSocket-Protocol"] = "apollographql/subscriptions-transport-ws"
98+
99+
HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=headers) do ws
98100
# Start sub
99101
output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id")
100-
HTTP.send(ws, message_str)
101-
subscription_tracker[][sub_id] = "open"
102-
102+
HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_CONNECTION_INIT)))
103103
# Init function
104104
if !isnothing(initfn)
105105
output_debug(verbose) && println("Running subscription initialisation function")
106106
initfn()
107107
end
108108

109+
data = readfromwebsocket(ws, stopfn, subtimeout)
110+
try checkreturn(data, verbose, sub_id)
111+
catch e
112+
e isa Interrupt && return
113+
end
114+
response = JSON3.read(data, GQLSubscriptionResponse{output_type})
115+
while response.type == GQL_SERVER_CONNECTION_KEEP_ALIVE
116+
data = readfromwebsocket(ws, stopfn, subtimeout)
117+
try checkreturn(data, verbose, sub_id)
118+
catch e
119+
e isa Interrupt && return
120+
end
121+
response = JSON3.read(data, GQLSubscriptionResponse{output_type})
122+
end
123+
if response.type == GQL_SERVER_CONNECTION_ERROR && throw_on_execution_error
124+
subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR
125+
throw(GraphQLError("Error while establishing connection.", payload))
126+
end
127+
128+
start_message = Dict(
129+
"id" => string(sub_id),
130+
"type" => GQL_CLIENT_START,
131+
"payload" => payload
132+
)
133+
message_str = JSON3.write(start_message)
134+
HTTP.send(ws, message_str)
135+
subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_OPEN
136+
109137
# Get listening
110138
output_debug(verbose) && println("Listening to $(get_name(subscription_name)) with ID $sub_id...")
111139

112140
# Run function
113-
finish = false
114-
while !finish
141+
while true
115142
data = readfromwebsocket(ws, stopfn, subtimeout)
116-
if data === :timeout
117-
output_info(verbose) && println("Subscription $sub_id timed out")
143+
try checkreturn(data, verbose, sub_id)
144+
catch e
145+
e isa Interrupt && break
146+
end
147+
# data = String(data)
148+
# println(data)
149+
response = JSON3.read(data, GQLSubscriptionResponse{output_type})
150+
151+
response.type == GQL_SERVER_CONNECTION_KEEP_ALIVE && continue
152+
response.type == GQL_SERVER_COMPLETE && break
153+
response.type == GQL_SERVER_CONNECTION_ERROR && begin
154+
throw_if_assigned[] = GraphQLError("Error during subscription. Server reporeted connection error")
118155
break
119-
elseif data === :stopfn
120-
output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied")
156+
end
157+
response.type == GQL_SERVER_ERROR && begin
158+
throw_if_assigned[] = GraphQLError("Error during subscription - GQL_SERVER_ERROR.", response.payload)
121159
break
122160
end
123-
response = JSON3.read(data, GQLSubscriptionResponse{output_type})
161+
# response.type == GQL_SERVER_DATA
124162
payload = response.payload
125163
if !isnothing(payload.errors) && !isempty(payload.errors) && throw_on_execution_error
126-
subscription_tracker[][sub_id] = "errored"
164+
subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR
127165
throw_if_assigned[] = GraphQLError("Error during subscription.", payload)
128166
break
129167
end
130168
# Handle multiple subs, do we need this?
131169
if response.id == string(sub_id)
132-
output_debug(verbose) && println("Result recieved on subscription with ID $sub_id")
170+
output_debug(verbose) && println("Result received on subscription with ID $sub_id")
133171
finish = fn(payload)
134172
if !isa(finish, Bool)
135-
subscription_tracker[][sub_id] = "errored"
136-
error("Subscription function must return a boolean")
173+
subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR
174+
throw_if_assigned[] = ErrorException("Subscription function must return a boolean")
175+
break
176+
end
177+
if finish
178+
# Protocol says we need to let the server know we're unsubscribing
179+
output_debug(verbose) && println("Finished. Closing subscription")
180+
HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_STOP)))
181+
HTTP.send(ws, JSON3.write(Dict("id" => sub_id, "type" => GQL_CLIENT_CONNECTION_TERMINATE)))
182+
# close(ws)
183+
break
137184
end
138185
end
139186
end
140187
end
141-
# We can't throw errors from the ws handle function in HTTP.jl 1.0, as they get digested.
188+
# We can't throw errors from the ws handle function in HTTP.jl 1.#, as they get digested.
142189
isassigned(throw_if_assigned) && throw(throw_if_assigned[])
143190
output_debug(verbose) && println("Finished. Closing subscription")
144-
subscription_tracker[][sub_id] = "closed"
191+
subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_CLOSED
145192
return
146193
end
147194

@@ -158,7 +205,7 @@ function clear_subscriptions()
158205
end
159206
end
160207

161-
function async_reader_with_timeout(ws::HTTP.WebSocket, subtimeout)::Channel
208+
function async_reader_with_timeout(ws::HTTP.WebSockets.WebSocket, subtimeout)::Channel
162209
ch = Channel(1)
163210
task = @async begin
164211
reader_task = current_task()
@@ -225,3 +272,15 @@ function readfromwebsocket(ws::HTTP.WebSockets.WebSocket, stopfn, subtimeout)
225272
end
226273
return data
227274
end
275+
struct Interrupt <: Exception
276+
end
277+
278+
function checkreturn(data, verbose, sub_id)
279+
if data === :timeout
280+
output_info(verbose) && println("Subscription $sub_id timed out")
281+
throw(Interrupt())
282+
elseif data === :stopfn
283+
output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied")
284+
throw(Interrupt())
285+
end
286+
end

0 commit comments

Comments
 (0)