11using System ;
22using System . Collections . Generic ;
33using System . Linq ;
4+ using System . Reactive . Disposables ;
5+ using System . Reactive . Linq ;
46using System . Threading . Tasks ;
57using NUnit . Framework ;
68using WampSharp . Core . Serialization ;
@@ -64,6 +66,42 @@ public async Task ProgressiveCallsCalleeProxyProgress()
6466 Assert . That ( result . Result , Is . EqualTo ( 10 ) ) ;
6567 }
6668
69+ [ Test ]
70+ public async Task ProgressiveCallsCalleeProxyObservable ( )
71+ {
72+ WampPlayground playground = new WampPlayground ( ) ;
73+
74+ CallerCallee dualChannel = await playground . GetCallerCalleeDualChannel ( ) ;
75+ IWampChannel calleeChannel = dualChannel . CalleeChannel ;
76+ IWampChannel callerChannel = dualChannel . CallerChannel ;
77+
78+ MyOperation myOperation = new MyOperation ( ) ;
79+
80+ await calleeChannel . RealmProxy . RpcCatalog . Register ( myOperation , new RegisterOptions ( ) ) ;
81+ ILongOpObsService proxy = callerChannel . RealmProxy . Services . GetCalleeProxy < ILongOpObsService > ( ) ;
82+
83+ IEnumerable < int > results = proxy . LongOp ( 9 , false ) . ToEnumerable ( ) ; // it will emit one more than asked
84+
85+ CollectionAssert . AreEquivalent ( Enumerable . Range ( 0 , 10 ) , results ) ;
86+ }
87+
88+ [ Test ]
89+ public async Task ProgressiveCallsCalleeProxyObservableError ( )
90+ {
91+ WampPlayground playground = new WampPlayground ( ) ;
92+
93+ CallerCallee dualChannel = await playground . GetCallerCalleeDualChannel ( ) ;
94+ IWampChannel calleeChannel = dualChannel . CalleeChannel ;
95+ IWampChannel callerChannel = dualChannel . CallerChannel ;
96+
97+ MyOperation myOperation = new MyOperation ( ) ;
98+
99+ await calleeChannel . RealmProxy . RpcCatalog . Register ( myOperation , new RegisterOptions ( ) ) ;
100+ ILongOpObsService proxy = callerChannel . RealmProxy . Services . GetCalleeProxy < ILongOpObsService > ( ) ;
101+
102+ Assert . Throws ( typeof ( WampException ) , ( ) => proxy . LongOp ( 9 , true ) . ToEnumerable ( ) . Count ( ) ) ;
103+ }
104+
67105 public class MyOperation : IWampRpcOperation
68106 {
69107 public string Procedure => "com.myapp.longop" ;
@@ -80,16 +118,27 @@ public IWampCancellableInvocation Invoke<TMessage>(IWampRawRpcOperationRouterCal
80118 TMessage number = arguments [ 0 ] ;
81119 int n = formatter . Deserialize < int > ( number ) ;
82120
121+ bool endWithError = arguments . Length > 0 && formatter . Deserialize < bool > ( arguments [ 1 ] ) ;
122+
83123 for ( int i = 0 ; i < n ; i ++ )
84124 {
85125 caller . Result ( WampObjectFormatter . Value ,
86126 new YieldOptions { Progress = true } ,
87127 new object [ ] { i } ) ;
88128 }
89129
90- caller . Result ( WampObjectFormatter . Value ,
91- new YieldOptions ( ) ,
92- new object [ ] { n } ) ;
130+ if ( endWithError )
131+ {
132+ caller . Error ( WampObjectFormatter . Value ,
133+ new Dictionary < string , string > ( ) ,
134+ "Something bad happened" ) ;
135+ }
136+ else
137+ {
138+ caller . Result ( WampObjectFormatter . Value ,
139+ new YieldOptions ( ) ,
140+ new object [ ] { n } ) ;
141+ }
93142
94143 return null ;
95144 }
@@ -122,6 +171,31 @@ public async Task<int> LongOp(int n, IProgress<int> progress)
122171 }
123172 }
124173
174+ public interface ILongOpObsService
175+ {
176+ [ WampProcedure ( "com.myapp.longop" ) ]
177+ [ WampProgressiveResultProcedure ]
178+ IObservable < int > LongOp ( int n , bool endWithError ) ;
179+ }
180+
181+ public class LongOpObsService : ILongOpObsService
182+ {
183+ public IObservable < int > LongOp ( int n , bool endWithError ) => Observable . Create < int > ( async obs =>
184+ {
185+ for ( int i = 0 ; i < n ; i ++ )
186+ {
187+ obs . OnNext ( i ) ;
188+ await Task . Delay ( 100 ) ;
189+ }
190+ if ( endWithError )
191+ obs . OnError ( new WampException ( "wamp.error" , "Something bad happened" ) ) ;
192+ else
193+ obs . OnCompleted ( ) ;
194+
195+ return Disposable . Empty ;
196+ } ) ;
197+ }
198+
125199 public class MyCallback : IWampRawRpcOperationClientCallback
126200 {
127201 private readonly TaskCompletionSource < int > mTask = new TaskCompletionSource < int > ( ) ;
@@ -187,4 +261,4 @@ public void Report(T value)
187261 mAction ( value ) ;
188262 }
189263 }
190- }
264+ }
0 commit comments