11using System ;
2- using System . Collections . Generic ;
3- using System . Collections . Specialized ;
42using System . IO ;
53using System . Net ;
64using System . Reflection ;
75using System . Text ;
8- using System . Threading ;
96using System . Threading . Tasks ;
107using Nest . Domain . Connection ;
118
@@ -16,7 +13,6 @@ public class Connection : IConnection
1613 const int BUFFER_SIZE = 1024 ;
1714
1815 private IConnectionSettings _ConnectionSettings { get ; set ; }
19- private Semaphore _ResourceLock ;
2016 private readonly bool _enableTrace ;
2117
2218 public Connection ( IConnectionSettings settings )
@@ -25,7 +21,6 @@ public Connection(IConnectionSettings settings)
2521 throw new ArgumentNullException ( "settings" ) ;
2622
2723 this . _ConnectionSettings = settings ;
28- this . _ResourceLock = new Semaphore ( settings . MaximumAsyncConnections , settings . MaximumAsyncConnections ) ;
2924 this . _enableTrace = settings . TraceEnabled ;
3025 }
3126
@@ -205,126 +200,17 @@ protected virtual ConnectionStatus DoSynchronousRequest(HttpWebRequest request,
205200
206201 return cs ;
207202 }
208- }
209-
203+ }
210204 }
211205
212206 protected virtual Task < ConnectionStatus > DoAsyncRequest ( HttpWebRequest request , string data = null )
213207 {
214- var tcs = new TaskCompletionSource < ConnectionStatus > ( ) ;
215- var timeout = this . _ConnectionSettings . Timeout ;
216- if ( ! this . _ResourceLock . WaitOne ( timeout ) )
217- {
218- using ( var tracer = new ConnectionStatusTracer ( this . _ConnectionSettings . TraceEnabled ) )
219- {
220- var m = "Could not start the operation before the timeout of " + timeout +
221- "ms completed while waiting for the semaphore" ;
222- var cs = new ConnectionStatus ( new TimeoutException ( m ) ) ;
223- tcs . SetResult ( cs ) ;
224- tracer . SetResult ( cs ) ;
225- return tcs . Task ;
226- }
227- }
228- try
229- {
230- return Task . Factory . StartNew ( ( ) =>
231- {
232- using ( var tracer = new ConnectionStatusTracer ( this . _ConnectionSettings . TraceEnabled ) )
233- {
234- this . Iterate ( this . _AsyncSteps ( request , tcs , data ) , tcs ) ;
235- var cs = tcs . Task . Result ;
236- tracer . SetResult ( cs ) ;
237- _ConnectionSettings . ConnectionStatusHandler ( cs ) ;
238- return cs ;
239- }
240- } , TaskCreationOptions . LongRunning ) ;
241- }
242- finally
243- {
244- this . _ResourceLock . Release ( ) ;
245- }
246- }
247-
248- private IEnumerable < Task > _AsyncSteps ( HttpWebRequest request , TaskCompletionSource < ConnectionStatus > tcs , string data = null )
249- {
250- var timeout = this . _ConnectionSettings . Timeout ;
251-
252- var state = new ConnectionState { Connection = request } ;
253-
254- if ( data != null )
255- {
256- var getRequestStream = Task . Factory . FromAsync < Stream > ( request . BeginGetRequestStream , request . EndGetRequestStream , null ) ;
257- ThreadPool . RegisterWaitForSingleObject ( ( getRequestStream as IAsyncResult ) . AsyncWaitHandle , ThreadTimeoutCallback , request , timeout , true ) ;
258- yield return getRequestStream ;
259-
260- var requestStream = getRequestStream . Result ;
261- try
262- {
263- byte [ ] buffer = Encoding . UTF8 . GetBytes ( data ) ;
264- var writeToRequestStream = Task . Factory . FromAsync ( requestStream . BeginWrite , requestStream . EndWrite , buffer , 0 , buffer . Length , state ) ;
265- yield return writeToRequestStream ;
266- }
267- finally
268- {
269- requestStream . Close ( ) ;
270- }
271- }
272-
273- // Get the response
274- var getResponse = Task . Factory . FromAsync < WebResponse > ( request . BeginGetResponse , request . EndGetResponse , null ) ;
275- ThreadPool . RegisterWaitForSingleObject ( ( getResponse as IAsyncResult ) . AsyncWaitHandle , ThreadTimeoutCallback , request , timeout , true ) ;
276- yield return getResponse ;
277-
278- // Get the response stream
279- using ( var response = ( HttpWebResponse ) getResponse . Result )
280- using ( var responseStream = response . GetResponseStream ( ) )
281- {
282- // Copy all data from the response stream
283- var output = new MemoryStream ( ) ;
284- var buffer = new byte [ BUFFER_SIZE ] ;
285- while ( responseStream != null )
286- {
287- var read = Task < int > . Factory . FromAsync ( responseStream . BeginRead , responseStream . EndRead , buffer , 0 , BUFFER_SIZE , null ) ;
288- yield return read ;
289- if ( read . Result == 0 ) break ;
290- output . Write ( buffer , 0 , read . Result ) ;
291- }
292-
293- // Decode the data and store the result
294- var result = Encoding . UTF8 . GetString ( output . ToArray ( ) ) ;
295- var cs = new ConnectionStatus ( result ) { Request = data , RequestUrl = request . RequestUri . ToString ( ) , RequestMethod = request . Method } ;
296- tcs . TrySetResult ( cs ) ;
297- }
298- yield break ;
299-
300- }
301-
302- public void Iterate ( IEnumerable < Task > asyncIterator , TaskCompletionSource < ConnectionStatus > tcs )
303- {
304- var enumerator = asyncIterator . GetEnumerator ( ) ;
305- Action < Task > recursiveBody = null ;
306- recursiveBody = completedTask =>
307- {
308- if ( completedTask != null && completedTask . IsFaulted )
309- {
310- //none of the individual steps in _AsyncSteps run in parallel for 1 request
311- //as this would be impossible we can assume Aggregate Exception.InnerException
312- var exception = completedTask . Exception . InnerException ;
313-
314- //cleanly exit from exceptions in stages if the exception is a webexception
315- if ( exception is WebException )
316- tcs . SetResult ( new ConnectionStatus ( exception ) ) ;
317- else
318- tcs . TrySetException ( exception ) ;
319- enumerator . Dispose ( ) ;
320- }
321- else if ( enumerator . MoveNext ( ) )
322- {
323- enumerator . Current . ContinueWith ( recursiveBody , TaskContinuationOptions . ExecuteSynchronously ) ;
324- }
325- else enumerator . Dispose ( ) ;
326- } ;
327- recursiveBody ( null ) ;
208+ var operation = new AsyncRequestOperation (
209+ request ,
210+ data ,
211+ _ConnectionSettings ,
212+ new ConnectionStatusTracer ( this . _ConnectionSettings . TraceEnabled ) ) ;
213+ return operation . Task ;
328214 }
329215
330216 private Uri _CreateUriString ( string path )
@@ -386,4 +272,4 @@ public static void LeaveDotsAndSlashesEscaped(Uri uri)
386272 fieldInfo . SetValue ( uriParser , uriSyntaxFlags ) ;
387273 }
388274 }
389- }
275+ }
0 commit comments