Skip to content

Commit b56fec4

Browse files
author
Elad Zelingher
committed
Fixing #155
1 parent e8de13f commit b56fec4

File tree

3 files changed

+27
-22
lines changed

3 files changed

+27
-22
lines changed

src/net45/WampSharp/WAMP2/V2/Api/CalleeProxy/Callbacks/Async/AsyncOperationCallback.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public void Error<TMessage>(IWampFormatter<TMessage> formatter, TMessage details
8585

8686
public void SetException(Exception exception)
8787
{
88-
mTask.SetException(exception);
88+
mTask.TrySetException(exception);
8989
}
9090
}
9191
}

src/net45/WampSharp/WAMP2/V2/Api/CalleeProxy/ClientInvocationHandler.cs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ internal class ClientInvocationHandler : WampCalleeProxyInvocationHandler
2121
private readonly IWampRpcOperationCatalogProxy mCatalogProxy;
2222
private readonly IWampClientConnectionMonitor mMonitor;
2323

24-
private TaskCompletionSource<object> mDisconnectionTaskCompletionSource;
24+
private TaskCompletionSource<Exception> mDisconnectionTaskCompletionSource;
2525

2626
private ManualResetEvent mDisconnectionWaitHandle;
2727

@@ -37,10 +37,10 @@ public ClientInvocationHandler(IWampRpcOperationCatalogProxy catalogProxy,
3737
{
3838
mCatalogProxy = catalogProxy;
3939
mMonitor = monitor;
40-
mDisconnectionTaskCompletionSource = new TaskCompletionSource<object>();
40+
41+
mDisconnectionTaskCompletionSource = new TaskCompletionSource<Exception>();
4142
mDisconnectionWaitHandle = new ManualResetEvent(false);
4243

43-
mMonitor.ConnectionEstablished += OnConnectionEstablished;
4444
mMonitor.ConnectionError += OnConnectionError;
4545
mMonitor.ConnectionBroken += OnConnectionBroken;
4646
}
@@ -54,16 +54,14 @@ public ClientInvocationHandler(IWampRealmProxy realmProxy) :
5454

5555
#region Private Methods
5656

57-
private void OnConnectionEstablished(object sender, WampSessionCreatedEventArgs e)
58-
{
59-
mDisconnectionTaskCompletionSource = new TaskCompletionSource<object>();
60-
mDisconnectionWaitHandle = new ManualResetEvent(false);
61-
}
6257

6358
private void OnConnectionBroken(object sender, WampSessionCloseEventArgs e)
6459
{
6560
Exception exception = new WampConnectionBrokenException(e);
6661
SetException(exception);
62+
63+
mDisconnectionTaskCompletionSource = new TaskCompletionSource<Exception>();
64+
mDisconnectionWaitHandle = new ManualResetEvent(false);
6765
}
6866

6967
private void OnConnectionError(object sender, WampConnectionErrorEventArgs e)
@@ -75,7 +73,7 @@ private void OnConnectionError(object sender, WampConnectionErrorEventArgs e)
7573
private void SetException(Exception exception)
7674
{
7775
mDisconnectionException = exception;
78-
mDisconnectionTaskCompletionSource.TrySetException(exception);
76+
mDisconnectionTaskCompletionSource.TrySetResult(exception);
7977
mDisconnectionWaitHandle.Set();
8078
}
8179

@@ -92,17 +90,29 @@ protected override Task<T> AwaitForResult<T>(AsyncOperationCallback<T> asyncOper
9290
#if ASYNC
9391
Task<T> operationTask = asyncOperationCallback.Task;
9492

93+
Task<Exception> disconnectionTask = mDisconnectionTaskCompletionSource.Task;
94+
9595
Task task = await Task.WhenAny(operationTask,
96-
mDisconnectionTaskCompletionSource.Task)
96+
disconnectionTask)
9797
.ConfigureAwait(false);
9898

99+
if (!operationTask.IsCompleted)
100+
{
101+
Exception exception = await disconnectionTask.ConfigureAwait(false);
102+
103+
asyncOperationCallback.SetException(exception);
104+
}
105+
99106
T result = await operationTask.ConfigureAwait(false);
100107

101108
return result;
109+
102110
#else
103111
IObservable<T> merged =
104-
Observable.Amb(asyncOperationCallback.Task.ToObservable(),
105-
mDisconnectionTaskCompletionSource.Task.ToObservable().Cast<T>());
112+
Observable.Amb
113+
(asyncOperationCallback.Task.ToObservable(),
114+
mDisconnectionTaskCompletionSource.Task.ToObservable()
115+
.SelectMany(x => Observable.Throw<T>(x)));
106116

107117
Task<T> task = merged.ToTask();
108118

@@ -114,7 +124,7 @@ protected override Task<T> AwaitForResult<T>(AsyncOperationCallback<T> asyncOper
114124
protected override void WaitForResult<T>(SyncCallback<T> callback)
115125
{
116126
int signaledIndex =
117-
WaitHandle.WaitAny(new[] {mDisconnectionWaitHandle, callback.WaitHandle},
127+
WaitHandle.WaitAny(new[] { mDisconnectionWaitHandle, callback.WaitHandle },
118128
Timeout.Infinite);
119129

120130

@@ -137,6 +147,6 @@ protected override void Invoke(ICalleeProxyInterceptor interceptor, IWampRawRpcO
137147
arguments);
138148
}
139149

140-
#endregion
150+
#endregion
141151
}
142152
}

src/net45/WampSharp/WAMP2/V2/Client/Session/WampSessionClient.cs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ public void OnConnectionClosed()
194194
}
195195

196196
mConnectionBrokenRaised = false;
197+
mOpenTask = new TaskCompletionSource<bool>();
197198
}
198199

199200
public void OnConnectionError(Exception exception)
@@ -207,13 +208,7 @@ public void OnConnectionError(Exception exception)
207208

208209
private void SetOpenTaskErrorIfNeeded(Exception exception)
209210
{
210-
TaskCompletionSource<bool> openTask;
211-
212-
lock (mLock)
213-
{
214-
openTask = mOpenTask;
215-
mOpenTask = new TaskCompletionSource<bool>();
216-
}
211+
TaskCompletionSource<bool> openTask = mOpenTask;
217212

218213
if (openTask != null)
219214
{

0 commit comments

Comments
 (0)