@@ -34,7 +34,7 @@ public BulkAllApiTests(IntrusiveOperationCluster cluster, EndpointUsage usage)
3434 this . _client = cluster . Client ;
3535 }
3636
37- [ I ]
37+ [ I ]
3838 public void ReturnsExpectedResponse ( )
3939 {
4040 var index = CreateIndexName ( ) ;
@@ -44,24 +44,24 @@ public void ReturnsExpectedResponse()
4444 var pages = 100 ;
4545 var seenPages = 0 ;
4646 var numberOfDocuments = size * pages ;
47- var documents = this . CreateLazyStreamOfDocuments ( numberOfDocuments ) ;
48-
49- //first we setup our cold observable
47+ var documents = this . CreateLazyStreamOfDocuments ( numberOfDocuments ) ;
48+
49+ //first we setup our cold observable
5050 var observableBulk = this . _client . BulkAll ( documents , f => f
5151 . MaxDegreeOfParallelism ( 8 )
5252 . BackOffTime ( TimeSpan . FromSeconds ( 10 ) )
5353 . BackOffRetries ( 2 )
5454 . Size ( size )
5555 . RefreshOnCompleted ( )
5656 . Index ( index )
57- ) ;
58- //we set up an observer
57+ ) ;
58+ //we set up an observer
5959 var bulkObserver = new BulkAllObserver (
6060 onError : ( e ) => { throw e ; } ,
6161 onCompleted : ( ) => handle . Set ( ) ,
6262 onNext : ( b ) => Interlocked . Increment ( ref seenPages )
63- ) ;
64- //when we subscribe the observable becomes hot
63+ ) ;
64+ //when we subscribe the observable becomes hot
6565 observableBulk . Subscribe ( bulkObserver ) ;
6666
6767 handle . WaitOne ( TimeSpan . FromMinutes ( 5 ) ) ;
@@ -73,7 +73,7 @@ public void ReturnsExpectedResponse()
7373 bulkObserver . TotalNumberOfRetries . Should ( ) . Be ( 0 ) ;
7474 }
7575
76- [ I ]
76+ [ I ]
7777 public void DisposingObservableCancelsBulkAll ( )
7878 {
7979 var index = CreateIndexName ( ) ;
@@ -83,30 +83,30 @@ public void DisposingObservableCancelsBulkAll()
8383 var pages = 100 ;
8484 var seenPages = 0 ;
8585 var numberOfDocuments = size * pages ;
86- var documents = this . CreateLazyStreamOfDocuments ( numberOfDocuments ) ;
87-
88- //first we setup our cold observable
86+ var documents = this . CreateLazyStreamOfDocuments ( numberOfDocuments ) ;
87+
88+ //first we setup our cold observable
8989 var observableBulk = this . _client . BulkAll ( documents , f => f
9090 . MaxDegreeOfParallelism ( 8 )
9191 . BackOffTime ( TimeSpan . FromSeconds ( 10 ) )
9292 . BackOffRetries ( 2 )
9393 . Size ( size )
9494 . RefreshOnCompleted ( )
9595 . Index ( index )
96- ) ;
97- //we set up an observer
96+ ) ;
97+ //we set up an observer
9898 var bulkObserver = new BulkAllObserver (
9999 onError : ( e ) => { throw e ; } ,
100100 onCompleted : ( ) => handle . Set ( ) ,
101101 onNext : ( b ) => Interlocked . Increment ( ref seenPages )
102- ) ;
103- //when we subscribe the observable becomes hot
104- observableBulk . Subscribe ( bulkObserver ) ;
105-
106- //we wait N seconds to see some bulks
107- handle . WaitOne ( TimeSpan . FromSeconds ( 3 ) ) ;
108- observableBulk . Dispose ( ) ;
109- //we wait N seconds to give in flight request a chance to cancel
102+ ) ;
103+ //when we subscribe the observable becomes hot
104+ observableBulk . Subscribe ( bulkObserver ) ;
105+
106+ //we wait N seconds to see some bulks
107+ handle . WaitOne ( TimeSpan . FromSeconds ( 1 ) ) ;
108+ observableBulk . Dispose ( ) ;
109+ //we wait N seconds to give in flight request a chance to cancel
110110 handle . WaitOne ( TimeSpan . FromSeconds ( 3 ) ) ;
111111
112112 seenPages . Should ( ) . BeLessThan ( pages ) . And . BeGreaterThan ( 0 ) ;
@@ -116,7 +116,7 @@ public void DisposingObservableCancelsBulkAll()
116116 bulkObserver . TotalNumberOfRetries . Should ( ) . Be ( 0 ) ;
117117 }
118118
119- [ I ]
119+ [ I ]
120120 public void CancelBulkAll ( )
121121 {
122122 var index = CreateIndexName ( ) ;
@@ -126,9 +126,9 @@ public void CancelBulkAll()
126126 var pages = 100 ;
127127 var seenPages = 0 ;
128128 var numberOfDocuments = size * pages ;
129- var documents = this . CreateLazyStreamOfDocuments ( numberOfDocuments ) ;
130-
131- //first we setup our cold observable
129+ var documents = this . CreateLazyStreamOfDocuments ( numberOfDocuments ) ;
130+
131+ //first we setup our cold observable
132132 var tokenSource = new CancellationTokenSource ( ) ;
133133 var observableBulk = this . _client . BulkAll ( documents , f => f
134134 . MaxDegreeOfParallelism ( 8 )
@@ -137,21 +137,21 @@ public void CancelBulkAll()
137137 . Size ( size )
138138 . RefreshOnCompleted ( )
139139 . Index ( index )
140- , tokenSource . Token ) ;
141-
142- //we set up an observer
140+ , tokenSource . Token ) ;
141+
142+ //we set up an observer
143143 var bulkObserver = new BulkAllObserver (
144144 onError : ( e ) => { throw e ; } ,
145145 onCompleted : ( ) => handle . Set ( ) ,
146146 onNext : ( b ) => Interlocked . Increment ( ref seenPages )
147- ) ;
148- //when we subscribe the observable becomes hot
149- observableBulk . Subscribe ( bulkObserver ) ;
150-
151- //we wait Nseconds to see some bulks
152- handle . WaitOne ( TimeSpan . FromSeconds ( 3 ) ) ;
153- tokenSource . Cancel ( ) ;
154- //we wait Nseconds to give in flight request a chance to cancel
147+ ) ;
148+ //when we subscribe the observable becomes hot
149+ observableBulk . Subscribe ( bulkObserver ) ;
150+
151+ //we wait Nseconds to see some bulks
152+ handle . WaitOne ( TimeSpan . FromSeconds ( 1 ) ) ;
153+ tokenSource . Cancel ( ) ;
154+ //we wait Nseconds to give in flight request a chance to cancel
155155 handle . WaitOne ( TimeSpan . FromSeconds ( 3 ) ) ;
156156
157157 seenPages . Should ( ) . BeLessThan ( pages ) . And . BeGreaterThan ( 0 ) ;
@@ -161,7 +161,7 @@ public void CancelBulkAll()
161161 bulkObserver . TotalNumberOfRetries . Should ( ) . Be ( 0 ) ;
162162 }
163163
164- [ I ]
164+ [ I ]
165165 public async Task AwaitBulkAll ( )
166166 {
167167 var index = CreateIndexName ( ) ;
0 commit comments