@@ -278,6 +278,7 @@ private class InternalPoolImpl : IDisposable
278278 private MemcachedNode ownerNode ;
279279 private readonly EndPoint _endPoint ;
280280 private readonly TimeSpan queueTimeout ;
281+ private readonly TimeSpan _receiveTimeout ;
281282 private SemaphoreSlim _semaphore ;
282283
283284 private readonly object initLock = new Object ( ) ;
@@ -293,11 +294,14 @@ internal InternalPoolImpl(
293294 throw new InvalidOperationException ( "maxItems must be larger than minItems" , null ) ;
294295 if ( config . QueueTimeout < TimeSpan . Zero )
295296 throw new InvalidOperationException ( "queueTimeout must be >= TimeSpan.Zero" , null ) ;
297+ if ( config . ReceiveTimeout < TimeSpan . Zero )
298+ throw new InvalidOperationException ( "ReceiveTimeout must be >= TimeSpan.Zero" , null ) ;
296299
297300 this . ownerNode = ownerNode ;
298301 this . isAlive = true ;
299302 _endPoint = ownerNode . EndPoint ;
300303 this . queueTimeout = config . QueueTimeout ;
304+ _receiveTimeout = config . ReceiveTimeout ;
301305
302306 this . minItems = config . MinPoolSize ;
303307 this . maxItems = config . MaxPoolSize ;
@@ -356,7 +360,7 @@ internal async Task InitPoolAsync()
356360 {
357361 _freeItems . Push ( await CreateSocketAsync ( ) ) ;
358362 }
359- catch ( Exception ex )
363+ catch ( Exception ex )
360364 {
361365 _logger . LogError ( ex , $ "Failed to put { nameof ( PooledSocket ) } { i } in Pool") ;
362366 }
@@ -561,9 +565,22 @@ public async Task<IPooledSocketResult> AcquireAsync()
561565
562566 try
563567 {
564- retval . Reset ( ) ;
568+ var resetTask = retval . ResetAsync ( ) ;
565569
566- message = "Socket was reset. " + retval . InstanceId ;
570+ if ( await Task . WhenAny ( resetTask , Task . Delay ( _receiveTimeout ) ) == resetTask )
571+ {
572+ await resetTask ;
573+ }
574+ else
575+ {
576+ message = "Timeout to reset an acquired socket. InstanceId " + retval . InstanceId ;
577+ _logger . LogError ( message ) ;
578+ MarkAsDead ( ) ;
579+ result . Fail ( message ) ;
580+ return result ;
581+ }
582+
583+ message = "Socket was reset. InstanceId " + retval . InstanceId ;
567584 if ( _isDebugEnabled ) _logger . LogDebug ( message ) ;
568585
569586 result . Pass ( message ) ;
@@ -796,7 +813,7 @@ protected internal virtual async Task<PooledSocket> CreateSocketAsync()
796813 }
797814 catch ( Exception ex )
798815 {
799- var endPointStr = _endPoint . ToString ( ) . Replace ( "Unspecified/" , string . Empty ) ;
816+ var endPointStr = _endPoint . ToString ( ) . Replace ( "Unspecified/" , string . Empty ) ;
800817 _logger . LogError ( ex , $ "Failed to { nameof ( CreateSocketAsync ) } to { endPointStr } ") ;
801818 throw ;
802819 }
@@ -874,7 +891,7 @@ protected virtual async Task<IPooledSocketResult> ExecuteOperationAsync(IOperati
874891 _logger . LogDebug ( "pooledSocket.WriteAsync..." ) ;
875892
876893 var writeSocketTask = pooledSocket . WriteAsync ( b ) ;
877- if ( await Task . WhenAny ( writeSocketTask , Task . Delay ( _config . ConnectionTimeout ) ) != writeSocketTask )
894+ if ( await Task . WhenAny ( writeSocketTask , Task . Delay ( _config . ConnectionTimeout ) ) != writeSocketTask )
878895 {
879896 result . Fail ( "Timeout to pooledSocket.WriteAsync" ) ;
880897 return result ;
0 commit comments