Skip to content

Commit 8d128b5

Browse files
committed
Simplify PR
1 parent ef88af4 commit 8d128b5

File tree

2 files changed

+9
-75
lines changed

2 files changed

+9
-75
lines changed

src/subscriptions.jl

Lines changed: 2 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,5 @@
11
const subscription_tracker = Ref{Dict}(Dict())
22

3-
function writews(ws::HTTP.WebSockets.WebSocket, msg)
4-
if isdefined(HTTP, :send)
5-
HTTP.send(ws, msg)
6-
else
7-
write(ws, msg)
8-
end
9-
end
10-
11-
function writews(ws::IO, msg)
12-
write(ws, msg)
13-
end
14-
153
"""
164
open_subscription(fn::Function,
175
[client::Client],
@@ -109,7 +97,7 @@ function open_subscription(fn::Function,
10997
HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=client.headers, suppress_close_error=false) do ws
11098
# Start sub
11199
output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id")
112-
writews(ws, message_str)
100+
HTTP.send(ws, message_str)
113101
subscription_tracker[][sub_id] = "open"
114102

115103
# Init function
@@ -170,24 +158,6 @@ function clear_subscriptions()
170158
end
171159
end
172160

173-
function async_reader_with_timeout(io::IO, subtimeout)::Channel
174-
ch = Channel(1)
175-
task = @async begin
176-
reader_task = current_task()
177-
function timeout_cb(timer)
178-
put!(ch, :timeout)
179-
Base.throwto(reader_task, InterruptException())
180-
end
181-
timeout = Timer(timeout_cb, subtimeout)
182-
data = readavailable(io)
183-
subtimeout > 0 && close(timeout) # Cancel the timeout
184-
put!(ch, data)
185-
end
186-
bind(ch, task)
187-
return ch
188-
end
189-
190-
191161
function async_reader_with_timeout(ws::HTTP.WebSocket, subtimeout)::Channel
192162
ch = Channel(1)
193163
task = @async begin
@@ -205,28 +175,6 @@ function async_reader_with_timeout(ws::HTTP.WebSocket, subtimeout)::Channel
205175
return ch
206176
end
207177

208-
209-
function async_reader_with_stopfn(io::IO, stopfn, checktime)::Channel
210-
ch = Channel(1) # Could we make this channel concretely typed?
211-
task = @async begin
212-
reader_task = current_task()
213-
function timeout_cb(timer)
214-
if stopfn()
215-
put!(ch, :stopfn)
216-
Base.throwto(reader_task, InterruptException())
217-
else
218-
timeout = Timer(timeout_cb, checktime)
219-
end
220-
end
221-
timeout = Timer(timeout_cb, checktime)
222-
data = readavailable(io)
223-
close(timeout) # Cancel the timeout
224-
put!(ch, data)
225-
end
226-
bind(ch, task)
227-
return ch
228-
end
229-
230178
function async_reader_with_stopfn(ws::HTTP.WebSockets.WebSocket, stopfn, checktime)::Channel
231179
ch = Channel(1) # Could we make this channel concretely typed?
232180
task = @async begin
@@ -265,20 +213,6 @@ A channel is returned with the data. If `stopfn` stops the websocket,
265213
the data will be `:stopfn`. If the timeout stops the websocket,
266214
the data will be `:timeout`
267215
"""
268-
function readfromwebsocket(ws::IO, stopfn, subtimeout)
269-
if isnothing(stopfn) && subtimeout > 0
270-
ch_out = async_reader_with_timeout(ws, subtimeout)
271-
data = take!(ch_out)
272-
elseif !isnothing(stopfn)
273-
checktime = subtimeout > 0 ? subtimeout : 2
274-
ch_out = async_reader_with_stopfn(ws, stopfn, checktime)
275-
data = take!(ch_out)
276-
else
277-
data = readavailable(ws)
278-
end
279-
return data
280-
end
281-
282216
function readfromwebsocket(ws::HTTP.WebSockets.WebSocket, stopfn, subtimeout)
283217
if isnothing(stopfn) && subtimeout > 0
284218
ch_out = async_reader_with_timeout(ws, subtimeout)
@@ -291,4 +225,4 @@ function readfromwebsocket(ws::HTTP.WebSockets.WebSocket, stopfn, subtimeout)
291225
data = HTTP.receive(ws)
292226
end
293227
return data
294-
end
228+
end

test/subscriptions.jl

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ function listen_localhost()
33
if HTTP.WebSockets.isupgrade(http.message)
44
HTTP.WebSockets.upgrade(http) do ws
55
for data in ws
6-
GraphQLClient.writews(ws, data)
6+
HTTP.send(ws, data)
77
end
88
end
99
end
@@ -30,7 +30,7 @@ end
3030
@test take!(ch) == :timeout
3131

3232
ch = GraphQLClient.async_reader_with_timeout(ws, 5)
33-
GraphQLClient.writews(ws, "Data")
33+
HTTP.send(ws, "Data")
3434
@test String(take!(ch)) == "Data"
3535

3636
# stopfn
@@ -43,11 +43,11 @@ end
4343
@test take!(ch) == :stopfn
4444
stop[] = false
4545
ch = GraphQLClient.async_reader_with_stopfn(ws, stopfn, 0.5)
46-
GraphQLClient.writews(ws, "Data")
46+
HTTP.send(ws, "Data")
4747
@test String(take!(ch)) == "Data"
4848

4949
# readfromwebsocket - no timeout or stopfn
50-
GraphQLClient.writews(ws, "Data")
50+
HTTP.send(ws, "Data")
5151
@test String(GraphQLClient.readfromwebsocket(ws, nothing, 0)) == "Data"
5252

5353
# readfromwebsocket - timeout
@@ -90,7 +90,7 @@ function send_error_localhost(message, port)
9090
}
9191
}
9292
"""
93-
GraphQLClient.writews(ws, error_payload)
93+
HTTP.send(ws, error_payload)
9494
end
9595
end
9696
end
@@ -116,7 +116,7 @@ function send_data_localhost(sub_name, port)
116116
}
117117
}
118118
"""
119-
GraphQLClient.writews(ws, data_payload)
119+
HTTP.send(ws, data_payload)
120120
end
121121
end
122122
end
@@ -206,4 +206,4 @@ end
206206
@test isnothing(results[1].errors)
207207
@test !isnothing(results[1].data) # No point testing content as we've coded it in the test function
208208

209-
end
209+
end

0 commit comments

Comments
 (0)