@@ -26,7 +26,7 @@ The subscription uses the `ws_endpoint` field of the `client.`
2626This function is designed to be used with the `do` keyword.
2727
2828# Arguments
29- - `fn::Function`: function to be run on each result, recieves the response from the
29+ - `fn::Function`: function to be run on each result, receives the response from the
3030 subscription`. Must return a boolean to indicate whether or not to close the subscription,
3131 with `true` closing the subscription.
3232- `client::Client`: GraphQL client (optional). If not supplied, [`global_graphql_client`](@ref) is used.
@@ -87,61 +87,105 @@ function open_subscription(fn::Function,
8787
8888 sub_id = string (length (keys (subscription_tracker[])) + 1 )
8989 sub_id *= " -" * string (Threads. threadid ())
90- message = Dict (
91- " id" => string (sub_id),
92- " type" => " start" ,
93- " payload" => payload
94- )
95- message_str = JSON3. write (message)
90+
9691 throw_if_assigned = Ref {GraphQLError} ()
97- HTTP. WebSockets. open (client. ws_endpoint; retry= retry, headers= client. headers, suppress_close_error= false ) do ws
92+ headers = Dict (client. headers)
93+ # We currently implement the `apollographql/subscriptions-transport-ws` which is default in hasura and others
94+ # Defined here https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
95+ # TODO : Add support for the newer `graphql-transport-ws` from the graphql-ws library.
96+ # ()
97+ headers[" Sec-WebSocket-Protocol" ] = " apollographql/subscriptions-transport-ws"
98+
99+ HTTP. WebSockets. open (client. ws_endpoint; retry= retry, headers= headers) do ws
98100 # Start sub
99101 output_info (verbose) && println (" Starting $(get_name (subscription_name)) subscription with ID $sub_id " )
100- HTTP. send (ws, message_str)
101- subscription_tracker[][sub_id] = " open"
102-
102+ HTTP. send (ws, JSON3. write (Dict (" id" => sub_id, " type" => GQL_CLIENT_CONNECTION_INIT)))
103103 # Init function
104104 if ! isnothing (initfn)
105105 output_debug (verbose) && println (" Running subscription initialisation function" )
106106 initfn ()
107107 end
108108
109+ data = readfromwebsocket (ws, stopfn, subtimeout)
110+ try checkreturn (data, verbose, sub_id)
111+ catch e
112+ e isa Interrupt && return
113+ end
114+ response = JSON3. read (data, GQLSubscriptionResponse{output_type})
115+ while response. type == GQL_SERVER_CONNECTION_KEEP_ALIVE
116+ data = readfromwebsocket (ws, stopfn, subtimeout)
117+ try checkreturn (data, verbose, sub_id)
118+ catch e
119+ e isa Interrupt && return
120+ end
121+ response = JSON3. read (data, GQLSubscriptionResponse{output_type})
122+ end
123+ if response. type == GQL_SERVER_CONNECTION_ERROR && throw_on_execution_error
124+ subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR
125+ throw (GraphQLError (" Error while establishing connection." , payload))
126+ end
127+
128+ start_message = Dict (
129+ " id" => string (sub_id),
130+ " type" => GQL_CLIENT_START,
131+ " payload" => payload
132+ )
133+ message_str = JSON3. write (start_message)
134+ HTTP. send (ws, message_str)
135+ subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_OPEN
136+
109137 # Get listening
110138 output_debug (verbose) && println (" Listening to $(get_name (subscription_name)) with ID $sub_id ..." )
111139
112140 # Run function
113- finish = false
114- while ! finish
141+ while true
115142 data = readfromwebsocket (ws, stopfn, subtimeout)
116- if data === :timeout
117- output_info (verbose) && println (" Subscription $sub_id timed out" )
118- break
119- elseif data === :stopfn
120- output_info (verbose) && println (" Subscription $sub_id stopped by the stop function supplied" )
121- break
143+ try checkreturn (data, verbose, sub_id)
144+ catch e
145+ e isa Interrupt && break
122146 end
147+ # data = String(data)
148+ # println(data)
123149 response = JSON3. read (data, GQLSubscriptionResponse{output_type})
150+
151+ response. type == GQL_SERVER_CONNECTION_KEEP_ALIVE && continue
152+ response. type == GQL_SERVER_COMPLETE && break
153+ response. type == GQL_SERVER_CONNECTION_ERROR && begin
154+ throw_if_assigned[] = GraphQLError (" Error during subscription. Server reporeted connection error" )
155+ end
156+ response. type == GQL_SERVER_ERROR && begin
157+ throw_if_assigned[] = GraphQLError (" Error during subscription - GQL_SERVER_ERROR." , response. payload)
158+ end
159+ # response.type == GQL_SERVER_DATA
124160 payload = response. payload
125161 if ! isnothing (payload. errors) && ! isempty (payload. errors) && throw_on_execution_error
126- subscription_tracker[][sub_id] = " errored "
162+ subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR
127163 throw_if_assigned[] = GraphQLError (" Error during subscription." , payload)
128164 break
129165 end
130166 # Handle multiple subs, do we need this?
131167 if response. id == string (sub_id)
132- output_debug (verbose) && println (" Result recieved on subscription with ID $sub_id " )
168+ output_debug (verbose) && println (" Result received on subscription with ID $sub_id " )
133169 finish = fn (payload)
134170 if ! isa (finish, Bool)
135- subscription_tracker[][sub_id] = " errored"
136- error (" Subscription function must return a boolean" )
171+ subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_ERROR
172+ throw_if_assigned[] = ErrorException (" Subscription function must return a boolean" )
173+ end
174+ if finish
175+ # Protocol says we need to let the server know we're unsubscribing
176+ output_debug (verbose) && println (" Finished. Closing subscription" )
177+ HTTP. send (ws, JSON3. write (Dict (" id" => sub_id, " type" => GQL_CLIENT_STOP)))
178+ HTTP. send (ws, JSON3. write (Dict (" id" => sub_id, " type" => GQL_CLIENT_CONNECTION_TERMINATE)))
179+ close (ws)
180+ break
137181 end
138182 end
139183 end
140184 end
141- # We can't throw errors from the ws handle function in HTTP.jl 1.0 , as they get digested.
185+ # We can't throw errors from the ws handle function in HTTP.jl 1.# , as they get digested.
142186 isassigned (throw_if_assigned) && throw (throw_if_assigned[])
143187 output_debug (verbose) && println (" Finished. Closing subscription" )
144- subscription_tracker[][sub_id] = " closed "
188+ subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_CLOSED
145189 return
146190end
147191
@@ -158,7 +202,7 @@ function clear_subscriptions()
158202 end
159203end
160204
161- function async_reader_with_timeout (ws:: HTTP.WebSocket , subtimeout):: Channel
205+ function async_reader_with_timeout (ws:: HTTP.WebSockets. WebSocket , subtimeout):: Channel
162206 ch = Channel (1 )
163207 task = @async begin
164208 reader_task = current_task ()
@@ -225,3 +269,15 @@ function readfromwebsocket(ws::HTTP.WebSockets.WebSocket, stopfn, subtimeout)
225269 end
226270 return data
227271end
272+ struct Interrupt <: Exception
273+ end
274+
275+ function checkreturn (data, verbose, sub_id)
276+ if data === :timeout
277+ output_info (verbose) && println (" Subscription $sub_id timed out" )
278+ throw (Interrupt ())
279+ elseif data === :stopfn
280+ output_info (verbose) && println (" Subscription $sub_id stopped by the stop function supplied" )
281+ throw (Interrupt ())
282+ end
283+ end
0 commit comments