@@ -93,11 +93,11 @@ function open_subscription(fn::Function,
9393 " payload" => payload
9494 )
9595 message_str = JSON3. write (message)
96-
96+ throw_if_assigned = Ref {GraphQLError} ()
9797 HTTP. WebSockets. open (client. ws_endpoint; retry= retry, headers= client. headers) do ws
9898 # Start sub
9999 output_info (verbose) && println (" Starting $(get_name (subscription_name)) subscription with ID $sub_id " )
100- write (ws, message_str)
100+ HTTP . send (ws, message_str)
101101 subscription_tracker[][sub_id] = " open"
102102
103103 # Init function
@@ -120,11 +120,12 @@ function open_subscription(fn::Function,
120120 output_info (verbose) && println (" Subscription $sub_id stopped by the stop function supplied" )
121121 break
122122 end
123- response = JSON3. read (data:: Vector{UInt8} , GQLSubscriptionResponse{output_type})
123+ response = JSON3. read (data, GQLSubscriptionResponse{output_type})
124124 payload = response. payload
125125 if ! isnothing (payload. errors) && ! isempty (payload. errors) && throw_on_execution_error
126126 subscription_tracker[][sub_id] = " errored"
127- throw (GraphQLError (" Error during subscription." , payload))
127+ throw_if_assigned[] = GraphQLError (" Error during subscription." , payload)
128+ break
128129 end
129130 # Handle multiple subs, do we need this?
130131 if response. id == string (sub_id)
@@ -137,6 +138,8 @@ function open_subscription(fn::Function,
137138 end
138139 end
139140 end
141+ # We can't throw errors from the ws handle function in HTTP.jl 1.0, as they get digested.
142+ isassigned (throw_if_assigned) && throw (throw_if_assigned[])
140143 output_debug (verbose) && println (" Finished. Closing subscription" )
141144 subscription_tracker[][sub_id] = " closed"
142145 return
@@ -155,7 +158,7 @@ function clear_subscriptions()
155158 end
156159end
157160
158- function async_reader_with_timeout (io :: IO , subtimeout):: Channel
161+ function async_reader_with_timeout (ws :: HTTP.WebSockets.WebSocket , subtimeout):: Channel
159162 ch = Channel (1 )
160163 task = @async begin
161164 reader_task = current_task ()
@@ -164,15 +167,15 @@ function async_reader_with_timeout(io::IO, subtimeout)::Channel
164167 Base. throwto (reader_task, InterruptException ())
165168 end
166169 timeout = Timer (timeout_cb, subtimeout)
167- data = readavailable (io )
170+ data = HTTP . receive (ws )
168171 subtimeout > 0 && close (timeout) # Cancel the timeout
169172 put! (ch, data)
170173 end
171174 bind (ch, task)
172175 return ch
173176end
174177
175- function async_reader_with_stopfn (io :: IO , stopfn, checktime):: Channel
178+ function async_reader_with_stopfn (ws :: HTTP.WebSockets.WebSocket , stopfn, checktime):: Channel
176179 ch = Channel (1 ) # Could we make this channel concretely typed?
177180 task = @async begin
178181 reader_task = current_task ()
@@ -185,7 +188,7 @@ function async_reader_with_stopfn(io::IO, stopfn, checktime)::Channel
185188 end
186189 end
187190 timeout = Timer (timeout_cb, checktime)
188- data = readavailable (io )
191+ data = HTTP . WebSockets . receive (ws )
189192 close (timeout) # Cancel the timeout
190193 put! (ch, data)
191194 end
@@ -209,7 +212,7 @@ A channel is returned with the data. If `stopfn` stops the websocket,
209212the data will be `:stopfn`. If the timeout stops the websocket,
210213the data will be `:timeout`
211214"""
212- function readfromwebsocket (ws:: IO , stopfn, subtimeout)
215+ function readfromwebsocket (ws:: HTTP.WebSockets.WebSocket , stopfn, subtimeout)
213216 if isnothing (stopfn) && subtimeout > 0
214217 ch_out = async_reader_with_timeout (ws, subtimeout)
215218 data = take! (ch_out)
@@ -218,7 +221,7 @@ function readfromwebsocket(ws::IO, stopfn, subtimeout)
218221 ch_out = async_reader_with_stopfn (ws, stopfn, checktime)
219222 data = take! (ch_out)
220223 else
221- data = readavailable (ws)
224+ data = HTTP . receive (ws)
222225 end
223226 return data
224- end
227+ end
0 commit comments