Skip to content

Commit 510bbe0

Browse files
committed
fix: serialize web socket client APIs
1 parent 6878467 commit 510bbe0

File tree

4 files changed

+69
-25
lines changed

4 files changed

+69
-25
lines changed

Sources/AWSAppSyncApolloExtensions/Websocket/AppSyncWebSocketClient.swift

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -79,36 +79,46 @@ public class AppSyncWebSocketClient: NSObject, ApolloWebSocket.WebSocketClient,
7979
}
8080

8181
public func connect() {
82-
AppSyncApolloLogger.debug("Calling Connect")
83-
guard connection?.state != .running else {
84-
AppSyncApolloLogger.debug("[AppSyncWebSocketClient] WebSocket is already in connecting state")
85-
return
86-
}
87-
88-
subscribeToAppSyncResponse()
89-
90-
Task {
82+
taskQueue.async { [weak self] in
83+
guard let self else { return }
84+
AppSyncApolloLogger.debug("Calling Connect")
85+
guard connection?.state != .running else {
86+
AppSyncApolloLogger.debug("[AppSyncWebSocketClient] WebSocket is already in connecting state")
87+
return
88+
}
89+
90+
subscribeToAppSyncResponse()
91+
9192
AppSyncApolloLogger.debug("[AppSyncWebSocketClient] Creating new connection and starting read")
92-
self.connection = try await createWebSocketConnection()
93+
self.connection = try await self.createWebSocketConnection()
94+
9395
// Perform reading from a WebSocket in a separate task recursively to avoid blocking the execution.
94-
Task { await self.startReadMessage() }
96+
Task {
97+
await self.startReadMessage()
98+
}
99+
95100
self.connection?.resume()
96101
}
97102
}
98103

99104
public func disconnect(forceTimeout: TimeInterval?) {
100-
AppSyncApolloLogger.debug("Calling Disconnect")
101-
heartBeatMonitorCancellable?.cancel()
102-
guard connection?.state == .running else {
103-
AppSyncApolloLogger.debug("[AppSyncWebSocketClient] client should be in connected state to trigger disconnect")
104-
return
105+
taskQueue.async { [weak self] in
106+
guard let self else { return }
107+
AppSyncApolloLogger.debug("Calling Disconnect")
108+
heartBeatMonitorCancellable?.cancel()
109+
guard connection?.state == .running else {
110+
AppSyncApolloLogger.debug("[AppSyncWebSocketClient] client should be in connected state to trigger disconnect")
111+
return
112+
}
113+
114+
connection?.cancel(with: .goingAway, reason: nil)
105115
}
106-
107-
connection?.cancel(with: .goingAway, reason: nil)
108116
}
109117

110118
public func write(ping: Data, completion: (() -> Void)?) {
111-
AppSyncApolloLogger.debug("Not called, not implemented.")
119+
taskQueue.async {
120+
AppSyncApolloLogger.debug("Not called, not implemented.")
121+
}
112122
}
113123

114124
public func write(string: String) {

Tests/AWSAppSyncApolloExtensionsTests/Websocket/AppSyncWebSocketClientTests.swift

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,31 @@ final class AppSyncWebSocketClientTests: XCTestCase {
3939
let webSocketClient = AppSyncWebSocketClient(endpointURL: endpoint, authorizer: MockAppSyncAuthorizer())
4040
await verifyConnected(webSocketClient)
4141
}
42+
43+
func testConnect_ConcurrentInvoke() async throws {
44+
guard let endpoint = try localWebSocketServer?.start() else {
45+
XCTFail("Local WebSocket server failed to start")
46+
return
47+
}
48+
let webSocketClient = AppSyncWebSocketClient(endpointURL: endpoint, authorizer: MockAppSyncAuthorizer())
49+
let connectedExpectation = expectation(description: "WebSocket should received connected event only once")
50+
connectedExpectation.expectedFulfillmentCount = 1
51+
let sink = webSocketClient.publisher.sink { event in
52+
switch event {
53+
case .connected:
54+
connectedExpectation.fulfill()
55+
default:
56+
XCTFail("No other type of event should be received")
57+
}
58+
}
59+
60+
for _ in 1...100 {
61+
let task = Task {
62+
webSocketClient.connect()
63+
}
64+
}
65+
await fulfillment(of: [connectedExpectation], timeout: 5)
66+
}
4267

4368
func testDisconnect_didDisconnectFromRemote() async throws {
4469
var cancellables = Set<AnyCancellable>()

Tests/IntegrationTestApp/IntegrationTestAppTests/APIKeyTests.swift

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,13 @@ final class APIKeyTests: IntegrationTestBase {
148148
let websocket = AppSyncWebSocketClient(endpointURL: configuration.endpoint,
149149
authorizer: authorizer)
150150
let receivedConnection = expectation(description: "received connection")
151+
receivedConnection.expectedFulfillmentCount = 200
152+
151153
let receivedMaxSubscriptionsReachedError = expectation(description: "received MaxSubscriptionsReachedError")
152-
receivedConnection.expectedFulfillmentCount = 100
154+
receivedMaxSubscriptionsReachedError.expectedFulfillmentCount = 5
155+
153156
let sink = websocket.publisher.sink { event in
157+
print("Received event: \(event)")
154158
if case .string(let message) = event {
155159
if message.contains("start_ack") {
156160
receivedConnection.fulfill()
@@ -168,12 +172,15 @@ final class APIKeyTests: IntegrationTestBase {
168172
)
169173
let client = ApolloClient(networkTransport: splitTransport, store: store)
170174

171-
for _ in 1...101 {
172-
_ = client.subscribe(subscription: OnCreateSubscription()) { _ in
173-
}
175+
var cancellables = [Cancellable]()
176+
for _ in 1...205 {
177+
cancellables.append(client.subscribe(subscription: OnCreateSubscription()) { _ in })
174178
}
175179

176-
await fulfillment(of: [receivedConnection, receivedMaxSubscriptionsReachedError], timeout: 10)
180+
await fulfillment(of: [receivedConnection, receivedMaxSubscriptionsReachedError], timeout: 15)
181+
for cancellable in cancellables {
182+
cancellable.cancel()
183+
}
177184
}
178185

179186
}

Tests/IntegrationTestApp/IntegrationTestAppTests/AuthTokenTests.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ final class AuthTokenTests: IntegrationTestBase {
9090
let receivedDisconnectError = expectation(description: "received disconnect")
9191
receivedDisconnectError.assertForOverFulfill = false
9292
let sink = websocket.publisher.sink { event in
93-
if case .error(let error) = event, error.localizedDescription.contains("Socket is not connected") {
93+
if case .disconnected(_, _) = event {
9494
receivedDisconnectError.fulfill()
9595
}
9696
}
@@ -99,8 +99,10 @@ final class AuthTokenTests: IntegrationTestBase {
9999
uploadingNetworkTransport: transport,
100100
webSocketNetworkTransport: webSocketTransport
101101
)
102+
102103
let apolloCUPInvalidToken = ApolloClient(networkTransport: splitTransport, store: store)
103104

105+
try await Task.sleep(nanoseconds: 5 * 1_000_000_000) // 5 seconds
104106
await fulfillment(of: [receivedDisconnectError], timeout: 10)
105107
}
106108

0 commit comments

Comments
 (0)