Skip to content
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ StructTypes = "856f2bd8-1eba-4b0a-8007-ebc267875bd4"

[compat]
GraphQLParser = "0.1.1"
HTTP = "0.8.17, 0.9"
HTTP = "1"
JSON3 = "1.1.2"
StructTypes = "1.5"
julia = "1.6"
2 changes: 2 additions & 0 deletions src/GraphQLClient.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export query, mutate, open_subscription, Client, GQLEnum, Alias,
create_introspected_struct, list_all_introspected_objects, global_graphql_client,
@gql_str

include("constants.jl")
# Types
include("client.jl")
include("types.jl")
Expand All @@ -26,6 +27,7 @@ include("type_construction.jl")
include("http_execution.jl")
include("queries.jl")
include("mutations.jl")
include("ws_subscription_protocols.jl")
include("subscriptions.jl")
include("introspection.jl")
include("gql_string.jl")
Expand Down
32 changes: 32 additions & 0 deletions src/constants.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# GQL over WS Protocol constants
# https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md

const GQL_CLIENT_CONNECTION_INIT = "connection_init"
const GQL_SERVER_CONNECTION_ACK = "connection_ack"
const GQL_SERVER_CONNECTION_ERROR = "connection_error"
const GQL_SERVER_CONNECTION_KEEP_ALIVE = "ka"
const GQL_CLIENT_START = "start"
const GQL_CLIENT_STOP = "stop"
const GQL_CLIENT_CONNECTION_TERMINATE = "connection_terminate"
const GQL_SERVER_DATA = "data"
const GQL_SERVER_ERROR = "error"
const GQL_SERVER_COMPLETE = "complete"

# Subscription tracker
const SUBSCRIPTION_STATUS_OPEN = "open"
const SUBSCRIPTION_STATUS_ERROR = "errored"
const SUBSCRIPTION_STATUS_CLOSED = "closed"

# New GQL over WS constanst
# https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md

const GQLWS_CLIENT_INIT = "connection_init"
const GQLWS_SERVER_CONNECTION_ACK = "connection_ack"
const GQLWS_BI_PING = "ping"
const GQLWS_BI_PONG = "pong"
const GQLWS_CLIENT_SUBSCRIBE = "subscribe"
const GQLWS_SERVER_NEXT = "next"
const GQLWS_SERVER_ERROR = "error"
const GQLWS_BI_COMPLETE = "complete"


4 changes: 2 additions & 2 deletions src/gqlresponse.jl
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ end
Struct for subsriptions that wraps a `GQLReponse{T}` alongside various metadata.
"""
struct GQLSubscriptionResponse{T}
id::String
id::Union{String, Nothing}
type::String
payload::GQLResponse{T}
payload::Union{GQLResponse{T}, Nothing}
end
StructTypes.StructType(::Type{<:GQLSubscriptionResponse}) = StructTypes.Struct()

Expand Down
136 changes: 78 additions & 58 deletions src/subscriptions.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ const subscription_tracker = Ref{Dict}(Dict())
retry=true,
subtimeout=0,
stopfn=nothing,
throw_on_execution_error=false)
throw_on_execution_error=false,
websocket_protocol=join(", ", GQS_WS_PROTOCOLS))

Subscribe to `subscription_name`, running `fn` on each received result and ending the
subcription when `fn` returns `true`.
Expand All @@ -26,7 +27,7 @@ The subscription uses the `ws_endpoint` field of the `client.`
This function is designed to be used with the `do` keyword.

# Arguments
- `fn::Function`: function to be run on each result, recieves the response from the
- `fn::Function`: function to be run on each result, receives the response from the
subscription`. Must return a boolean to indicate whether or not to close the subscription,
with `true` closing the subscription.
- `client::Client`: GraphQL client (optional). If not supplied, [`global_graphql_client`](@ref) is used.
Expand All @@ -51,7 +52,12 @@ This function is designed to be used with the `do` keyword.
- `throw_on_execution_error=false`: set to `true` to stop an error being thrown if the GraphQL server
response contains errors that occurred during execution.
- `verbose=0`: set to 1, 2 for extra logging.

- `websocket_protocol=join(", ", GQL_WS_PROTOCOLS)`: Will try to communicate with [apollographql's
subcription-transport-protocol](https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md)
or with the newer [graphql-ws](https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md) protocol.
With the default setup, the protocol that is actually used, is selected by the server. If you want to
enforce the subprotocol, you can adjust this accordingly. The string constants `PROTOCOL_APOLLO_OLD`, `PROTOCOL_GRAPHQL_WS`
contain the names of the sub-protocols, respectively.
# Examples
```julia
julia> open_subscription("subSaveUser", sub_args=Dict("role" => "SYSTEM_ADMIN")) do result
Expand All @@ -78,67 +84,69 @@ function open_subscription(fn::Function,
subtimeout=0,
stopfn=nothing,
throw_on_execution_error=false,
verbose=0)
verbose=0,
websocket_protocol=join(", ", PROTOCOL_GRAPHQL_WS))

!in(get_name(subscription_name), get_subscriptions(client)) && throw(GraphQLError("$(get_name(subscription_name)) is not an existing subscription"))

output_str = get_output_str(output_fields)
payload = get_generic_query_payload(client, "subscription", subscription_name, sub_args, output_str)
subscription_payload = get_generic_query_payload(client, "subscription", subscription_name, sub_args, output_str)

sub_id = string(length(keys(subscription_tracker[])) + 1)
sub_id *= "-" * string(Threads.threadid())
message = Dict(
"id" => string(sub_id),
"type" => "start",
"payload" => payload
)
message_str = JSON3.write(message)

HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=client.headers) do ws
# Start sub
output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id")
write(ws, message_str)
subscription_tracker[][sub_id] = "open"

# Init function
if !isnothing(initfn)
output_debug(verbose) && println("Running subscription initialisation function")
initfn()
end
throw_if_assigned = Ref{GraphQLError}()
headers = Dict(client.headers)
# We currently implement the `apollographql/subscriptions-transport-ws` which is default in hasura and others
# Defined here https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
# TODO: Add support for the newer `graphql-transport-ws` from the graphql-ws library.
# ()
headers["Sec-WebSocket-Protocol"] = websocket_protocol

# Get listening
output_debug(verbose) && println("Listening to $(get_name(subscription_name)) with ID $sub_id...")

# Run function
finish = false
while !finish
data = readfromwebsocket(ws, stopfn, subtimeout)
if data === :timeout
output_info(verbose) && println("Subscription $sub_id timed out")
break
elseif data === :stopfn
output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied")
break
end
response = JSON3.read(data::Vector{UInt8}, GQLSubscriptionResponse{output_type})
payload = response.payload
if !isnothing(payload.errors) && !isempty(payload.errors) && throw_on_execution_error
subscription_tracker[][sub_id] = "errored"
throw(GraphQLError("Error during subscription.", payload))
end
# Handle multiple subs, do we need this?
if response.id == string(sub_id)
output_debug(verbose) && println("Result recieved on subscription with ID $sub_id")
finish = fn(payload)
if !isa(finish, Bool)
subscription_tracker[][sub_id] = "errored"
error("Subscription function must return a boolean")
end
end
HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=headers) do ws
# Start sub
output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id")
selected_protocol = HTTP.header(ws.response, "Sec-WebSocket-Protocol")
output_info(verbose) && println("Headers - $selected_protocol, $(join(' ', HTTP.headers(ws.response)))")
if selected_protocol == PROTOCOL_APOLLO_OLD
handle_apollo_old(
fn,
ws,
subscription_name,
subscription_payload,
sub_id,
output_type;
initfn=initfn,
subtimeout=subtimeout,
stopfn=stopfn,
throw_on_execution_error=throw_on_execution_error,
verbose=verbose,
throw_if_assigned_ref=throw_if_assigned
)
else
if selected_protocol != PROTOCOL_GRAPHQL_WS
@warn("None of the implemented protocols match - trying to use \"$(PROTOCOL_GRAPHQL_WS)\"")
end
handle_graphql_ws(
fn,
ws,
subscription_name,
subscription_payload,
sub_id,
output_type;
initfn=initfn,
subtimeout=subtimeout,
stopfn=stopfn,
throw_on_execution_error=throw_on_execution_error,
verbose=verbose,
throw_if_assigned_ref=throw_if_assigned
)
end
end
# We can't throw errors from the ws handle function in HTTP.jl 1.#, as they get digested.
isassigned(throw_if_assigned) && throw(throw_if_assigned[])
output_debug(verbose) && println("Finished. Closing subscription")
subscription_tracker[][sub_id] = "closed"
subscription_tracker[][sub_id] = SUBSCRIPTION_STATUS_CLOSED
return
end

Expand All @@ -155,7 +163,7 @@ function clear_subscriptions()
end
end

function async_reader_with_timeout(io::IO, subtimeout)::Channel
function async_reader_with_timeout(ws::HTTP.WebSockets.WebSocket, subtimeout)::Channel
ch = Channel(1)
task = @async begin
reader_task = current_task()
Expand All @@ -164,15 +172,15 @@ function async_reader_with_timeout(io::IO, subtimeout)::Channel
Base.throwto(reader_task, InterruptException())
end
timeout = Timer(timeout_cb, subtimeout)
data = readavailable(io)
data = HTTP.receive(ws)
subtimeout > 0 && close(timeout) # Cancel the timeout
put!(ch, data)
end
bind(ch, task)
return ch
end

function async_reader_with_stopfn(io::IO, stopfn, checktime)::Channel
function async_reader_with_stopfn(ws::HTTP.WebSockets.WebSocket, stopfn, checktime)::Channel
ch = Channel(1) # Could we make this channel concretely typed?
task = @async begin
reader_task = current_task()
Expand All @@ -185,7 +193,7 @@ function async_reader_with_stopfn(io::IO, stopfn, checktime)::Channel
end
end
timeout = Timer(timeout_cb, checktime)
data = readavailable(io)
data = HTTP.WebSockets.receive(ws)
close(timeout) # Cancel the timeout
put!(ch, data)
end
Expand All @@ -209,7 +217,7 @@ A channel is returned with the data. If `stopfn` stops the websocket,
the data will be `:stopfn`. If the timeout stops the websocket,
the data will be `:timeout`
"""
function readfromwebsocket(ws::IO, stopfn, subtimeout)
function readfromwebsocket(ws::HTTP.WebSockets.WebSocket, stopfn, subtimeout)
if isnothing(stopfn) && subtimeout > 0
ch_out = async_reader_with_timeout(ws, subtimeout)
data = take!(ch_out)
Expand All @@ -218,7 +226,19 @@ function readfromwebsocket(ws::IO, stopfn, subtimeout)
ch_out = async_reader_with_stopfn(ws, stopfn, checktime)
data = take!(ch_out)
else
data = readavailable(ws)
data = HTTP.receive(ws)
end
return data
end
struct Interrupt <: Exception
end

function checkreturn(data, verbose, sub_id)
if data === :timeout
output_info(verbose) && println("Subscription $sub_id timed out")
throw(Interrupt())
elseif data === :stopfn
output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied")
throw(Interrupt())
end
end
Loading