diff --git a/src/MongoDB.Driver/Core/Compression/SnappyCompressor.cs b/src/MongoDB.Driver/Core/Compression/SnappyCompressor.cs index 4e84bbfd213..ae4517d3bdf 100644 --- a/src/MongoDB.Driver/Core/Compression/SnappyCompressor.cs +++ b/src/MongoDB.Driver/Core/Compression/SnappyCompressor.cs @@ -34,7 +34,7 @@ public void Compress(Stream input, Stream output) { var uncompressedSize = (int)(input.Length - input.Position); var uncompressedBytes = new byte[uncompressedSize]; // does not include uncompressed message headers - input.ReadBytes(OperationContext.NoTimeout, uncompressedBytes, offset: 0, count: uncompressedSize, socketTimeout: Timeout.InfiniteTimeSpan); + input.ReadBytes(uncompressedBytes, offset: 0, count: uncompressedSize); var maxCompressedSize = Snappy.GetMaxCompressedLength(uncompressedSize); var compressedBytes = new byte[maxCompressedSize]; var compressedSize = Snappy.Compress(uncompressedBytes, compressedBytes); @@ -50,7 +50,7 @@ public void Decompress(Stream input, Stream output) { var compressedSize = (int)(input.Length - input.Position); var compressedBytes = new byte[compressedSize]; - input.ReadBytes(OperationContext.NoTimeout, compressedBytes, offset: 0, count: compressedSize, socketTimeout: Timeout.InfiniteTimeSpan); + input.ReadBytes(compressedBytes, offset: 0, count: compressedSize); var uncompressedSize = Snappy.GetUncompressedLength(compressedBytes); var decompressedBytes = new byte[uncompressedSize]; var decompressedSize = Snappy.Decompress(compressedBytes, decompressedBytes); diff --git a/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs b/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs index 3359a33821e..4ea159e8182 100644 --- a/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs +++ b/src/MongoDB.Driver/Core/Connections/BinaryConnection.cs @@ -345,14 +345,14 @@ private IByteBuffer ReceiveBuffer(OperationContext operationContext) try { var messageSizeBytes = new byte[4]; - _stream.ReadBytes(operationContext, messageSizeBytes, 0, 4, _socketReadTimeout); + _stream.ReadBytes(messageSizeBytes, 0, 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken); var messageSize = BinaryPrimitives.ReadInt32LittleEndian(messageSizeBytes); EnsureMessageSizeIsValid(messageSize); var inputBufferChunkSource = new InputBufferChunkSource(BsonChunkPool.Default); var buffer = ByteBufferFactory.Create(inputBufferChunkSource, messageSize); buffer.Length = messageSize; buffer.SetBytes(0, messageSizeBytes, 0, 4); - _stream.ReadBytes(operationContext, buffer, 4, messageSize - 4, _socketReadTimeout); + _stream.ReadBytes(buffer, 4, messageSize - 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken); _lastUsedAtUtc = DateTime.UtcNow; buffer.MakeReadOnly(); return buffer; @@ -370,14 +370,14 @@ private async Task ReceiveBufferAsync(OperationContext operationCon try { var messageSizeBytes = new byte[4]; - await _stream.ReadBytesAsync(operationContext, messageSizeBytes, 0, 4, _socketReadTimeout).ConfigureAwait(false); + await _stream.ReadBytesAsync(messageSizeBytes, 0, 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken).ConfigureAwait(false); var messageSize = BinaryPrimitives.ReadInt32LittleEndian(messageSizeBytes); EnsureMessageSizeIsValid(messageSize); var inputBufferChunkSource = new InputBufferChunkSource(BsonChunkPool.Default); var buffer = ByteBufferFactory.Create(inputBufferChunkSource, messageSize); buffer.Length = messageSize; buffer.SetBytes(0, messageSizeBytes, 0, 4); - await _stream.ReadBytesAsync(operationContext, buffer, 4, messageSize - 4, _socketReadTimeout).ConfigureAwait(false); + await _stream.ReadBytesAsync(buffer, 4, messageSize - 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken).ConfigureAwait(false); _lastUsedAtUtc = DateTime.UtcNow; buffer.MakeReadOnly(); return buffer; @@ -475,7 +475,8 @@ private void SendBuffer(OperationContext operationContext, IByteBuffer buffer) try { - _stream.WriteBytes(operationContext, buffer, 0, buffer.Length, _socketWriteTimeout); + var timeout = operationContext.RemainingTimeoutOrDefault(_socketWriteTimeout); + _stream.WriteBytes(buffer, 0, buffer.Length, (int)timeout.TotalMilliseconds, operationContext.CancellationToken); _lastUsedAtUtc = DateTime.UtcNow; } catch (Exception ex) @@ -495,7 +496,8 @@ private async Task SendBufferAsync(OperationContext operationContext, IByteBuffe try { - await _stream.WriteBytesAsync(operationContext, buffer, 0, buffer.Length, _socketWriteTimeout).ConfigureAwait(false); + var timeout = operationContext.RemainingTimeoutOrDefault(_socketWriteTimeout); + await _stream.WriteBytesAsync(buffer, 0, buffer.Length, (int)timeout.TotalMilliseconds, operationContext.CancellationToken).ConfigureAwait(false); _lastUsedAtUtc = DateTime.UtcNow; } catch (Exception ex) diff --git a/src/MongoDB.Driver/Core/Connections/Socks5Helper.cs b/src/MongoDB.Driver/Core/Connections/Socks5Helper.cs index edea1055c96..cb52d8e6028 100644 --- a/src/MongoDB.Driver/Core/Connections/Socks5Helper.cs +++ b/src/MongoDB.Driver/Core/Connections/Socks5Helper.cs @@ -104,7 +104,7 @@ public static void PerformSocks5Handshake(Stream stream, EndPoint endPoint, Sock var greetingRequestLength = CreateGreetingRequest(buffer, useAuth); stream.Write(buffer, 0, greetingRequestLength); - stream.ReadBytes(buffer, 0, 2, cancellationToken); + stream.ReadBytes(buffer, 0, 2, cancellationToken: cancellationToken); var requiresAuthenticationStep = ProcessGreetingResponse(buffer, useAuth); // If we have username and password, but the proxy doesn't need them, we skip the authentication step. @@ -113,16 +113,16 @@ public static void PerformSocks5Handshake(Stream stream, EndPoint endPoint, Sock var authenticationRequestLength = CreateAuthenticationRequest(buffer, authenticationSettings); stream.Write(buffer, 0, authenticationRequestLength); - stream.ReadBytes(buffer, 0, 2, cancellationToken); + stream.ReadBytes(buffer, 0, 2, cancellationToken: cancellationToken); ProcessAuthenticationResponse(buffer); } var connectRequestLength = CreateConnectRequest(buffer, targetHost, targetPort); stream.Write(buffer, 0, connectRequestLength); - stream.ReadBytes(buffer, 0, 5, cancellationToken); + stream.ReadBytes(buffer, 0, 5, cancellationToken: cancellationToken); var skip = ProcessConnectResponse(buffer); - stream.ReadBytes(buffer, 0, skip, cancellationToken); + stream.ReadBytes(buffer, 0, skip, cancellationToken: cancellationToken); } finally { @@ -141,7 +141,7 @@ public static async Task PerformSocks5HandshakeAsync(Stream stream, EndPoint end var greetingRequestLength = CreateGreetingRequest(buffer, useAuth); await stream.WriteAsync(buffer, 0, greetingRequestLength, cancellationToken).ConfigureAwait(false); - await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken).ConfigureAwait(false); + await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken: cancellationToken).ConfigureAwait(false); var requiresAuthenticationStep = ProcessGreetingResponse(buffer, useAuth); // If we have username and password, but the proxy doesn't need them, we skip the authentication step. @@ -150,16 +150,16 @@ public static async Task PerformSocks5HandshakeAsync(Stream stream, EndPoint end var authenticationRequestLength = CreateAuthenticationRequest(buffer, authenticationSettings); await stream.WriteAsync(buffer, 0, authenticationRequestLength, cancellationToken).ConfigureAwait(false); - await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken).ConfigureAwait(false); + await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken: cancellationToken).ConfigureAwait(false); ProcessAuthenticationResponse(buffer); } var connectRequestLength = CreateConnectRequest(buffer, targetHost, targetPort); await stream.WriteAsync(buffer, 0, connectRequestLength, cancellationToken).ConfigureAwait(false); - await stream.ReadBytesAsync(buffer, 0, 5, cancellationToken).ConfigureAwait(false); + await stream.ReadBytesAsync(buffer, 0, 5, cancellationToken: cancellationToken).ConfigureAwait(false); var skip = ProcessConnectResponse(buffer); - await stream.ReadBytesAsync(buffer, 0, skip, cancellationToken).ConfigureAwait(true); + await stream.ReadBytesAsync(buffer, 0, skip, cancellationToken: cancellationToken).ConfigureAwait(true); } finally { @@ -340,4 +340,4 @@ private static void EnsureSocksSuccess(byte code, string operation) throw new IOException($"SOCKS5 {operation} failed. {message}"); } -} \ No newline at end of file +} diff --git a/src/MongoDB.Driver/Core/Misc/StreamExtensionMethods.cs b/src/MongoDB.Driver/Core/Misc/StreamExtensionMethods.cs index b552f8bf372..7a08db12dc9 100644 --- a/src/MongoDB.Driver/Core/Misc/StreamExtensionMethods.cs +++ b/src/MongoDB.Driver/Core/Misc/StreamExtensionMethods.cs @@ -1,4 +1,4 @@ -/* Copyright 2013-present MongoDB Inc. +/* Copyright 2010-present MongoDB Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,249 +36,280 @@ public static void EfficientCopyTo(this Stream input, Stream output) } } - public static int Read(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken) => + public static void ReadBytes(this Stream stream, byte[] buffer, int offset, int count, int timeoutMs = Timeout.Infinite, CancellationToken cancellationToken = default) + { + Ensure.IsNotNull(stream, nameof(stream)); + Ensure.IsNotNull(buffer, nameof(buffer)); + Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset)); + Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count)); + ExecuteOperationWithTimeout( stream, - (str, state) => str.Read(state.Buffer, state.Offset, state.Count), - buffer, - offset, - count, - timeout, + (buffer, offset, count), + (str, state) => + { + var bytesRead = 0; + while (bytesRead < state.count) + { + var readResult = str.Read(state.buffer, state.offset + bytesRead, state.count - bytesRead); + if (readResult == 0) + { + throw new EndOfStreamException(); + } + + bytesRead += readResult; + } + }, + timeoutMs, cancellationToken); + } - public static async Task ReadAsync(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken) + public static void ReadBytes(this Stream stream, IByteBuffer buffer, int offset, int count, int timeoutMs = Timeout.Infinite, CancellationToken cancellationToken = default) { - Task readTask = null; - try - { - readTask = stream.ReadAsync(buffer, offset, count); - return await readTask.WaitAsync(timeout, cancellationToken).ConfigureAwait(false); - } - catch (ObjectDisposedException) - { - // It's possible to get ObjectDisposedException when the connection pool was closed with interruptInUseConnections set to true. - throw new IOException(); - } - catch (Exception ex) when (ex is OperationCanceledException or TimeoutException) - { - // await Task.WaitAsync() throws OperationCanceledException in case of cancellation and TimeoutException in case of timeout - try + Ensure.IsNotNull(stream, nameof(stream)); + Ensure.IsNotNull(buffer, nameof(buffer)); + Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset)); + Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count)); + + ExecuteOperationWithTimeout( + stream, + (buffer, offset, count), + (str, state) => { - stream.Dispose(); - if (readTask != null) + var bytesRead = 0; + while (bytesRead < state.count) { - // Should await on the task to avoid UnobservedTaskException - await readTask.ConfigureAwait(false); + var backingBytes = state.buffer.AccessBackingBytes(bytesRead + state.offset); + var bytesToRead = Math.Min(state.count - bytesRead, backingBytes.Count); + var readResult = str.Read(backingBytes.Array, backingBytes.Offset, bytesToRead); + if (readResult == 0) + { + throw new EndOfStreamException(); + } + + bytesRead += readResult; } - } - catch - { - // Ignore any exceptions - } - - throw; - } + }, + timeoutMs, + cancellationToken); } - public static void ReadBytes(this Stream stream, OperationContext operationContext, byte[] buffer, int offset, int count, TimeSpan socketTimeout) + public static Task ReadBytesAsync(this Stream stream, byte[] buffer, int offset, int count, int timeoutMs = Timeout.Infinite, CancellationToken cancellationToken = default) { Ensure.IsNotNull(stream, nameof(stream)); Ensure.IsNotNull(buffer, nameof(buffer)); Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset)); Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count)); - while (count > 0) - { - var bytesRead = stream.Read(buffer, offset, count, operationContext.RemainingTimeoutOrDefault(socketTimeout), operationContext.CancellationToken); - if (bytesRead == 0) + return ExecuteOperationWithTimeoutAsync( + stream, + (buffer, offset, count), + async (str, state) => { - throw new EndOfStreamException(); - } - offset += bytesRead; - count -= bytesRead; - } + var bytesRead = 0; + while (bytesRead < state.count) + { + var readResult = await str.ReadAsync(state.buffer, state.offset + bytesRead, state.count - bytesRead).ConfigureAwait(false); + if (readResult == 0) + { + throw new EndOfStreamException(); + } + + bytesRead += readResult; + } + }, + timeoutMs, + cancellationToken + ); } - public static void ReadBytes(this Stream stream, OperationContext operationContext, IByteBuffer buffer, int offset, int count, TimeSpan socketTimeout) + public static Task ReadBytesAsync(this Stream stream, IByteBuffer buffer, int offset, int count, int timeoutMs = Timeout.Infinite, CancellationToken cancellationToken = default) { Ensure.IsNotNull(stream, nameof(stream)); Ensure.IsNotNull(buffer, nameof(buffer)); Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset)); Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count)); - while (count > 0) - { - var backingBytes = buffer.AccessBackingBytes(offset); - var bytesToRead = Math.Min(count, backingBytes.Count); - var bytesRead = stream.Read(backingBytes.Array, backingBytes.Offset, bytesToRead, operationContext.RemainingTimeoutOrDefault(socketTimeout), operationContext.CancellationToken); - if (bytesRead == 0) + return ExecuteOperationWithTimeoutAsync( + stream, + (buffer, offset, count), + async (str, state) => { - throw new EndOfStreamException(); - } - offset += bytesRead; - count -= bytesRead; - } + var bytesRead = 0; + while (bytesRead < state.count) + { + var backingBytes = state.buffer.AccessBackingBytes(bytesRead + state.offset); + var bytesToRead = Math.Min(state.count - bytesRead, backingBytes.Count); + var readResult = await str.ReadAsync(backingBytes.Array, backingBytes.Offset, bytesToRead).ConfigureAwait(false); + if (readResult == 0) + { + throw new EndOfStreamException(); + } + + bytesRead += readResult; + } + }, + timeoutMs, + cancellationToken); } - public static void ReadBytes(this Stream stream, byte[] destination, int offset, int count, CancellationToken cancellationToken) + public static void WriteBytes(this Stream stream, byte[] buffer, int offset, int count, int timeoutMs = Timeout.Infinite, CancellationToken cancellationToken = default) { - while (count > 0) - { - var bytesRead = stream.Read(destination, offset, count); // TODO: honor cancellationToken? - if (bytesRead == 0) - { - throw new EndOfStreamException(); - } - offset += bytesRead; - count -= bytesRead; - } + Ensure.IsNotNull(stream, nameof(stream)); + Ensure.IsNotNull(buffer, nameof(buffer)); + Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset)); + Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count)); + + ExecuteOperationWithTimeout( + stream, + (buffer, offset, count), + (str, state) => str.Write(state.buffer, state.offset, state.count), + timeoutMs, + cancellationToken); } - public static async Task ReadBytesAsync(this Stream stream, OperationContext operationContext, byte[] buffer, int offset, int count, TimeSpan socketTimeout) + public static void WriteBytes(this Stream stream, IByteBuffer buffer, int offset, int count, int timeoutMs = Timeout.Infinite, CancellationToken cancellationToken = default) { Ensure.IsNotNull(stream, nameof(stream)); Ensure.IsNotNull(buffer, nameof(buffer)); Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset)); Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count)); - while (count > 0) - { - var bytesRead = await stream.ReadAsync(buffer, offset, count, operationContext.RemainingTimeoutOrDefault(socketTimeout), operationContext.CancellationToken).ConfigureAwait(false); - if (bytesRead == 0) + ExecuteOperationWithTimeout( + stream, + (buffer, offset, count), + (str, state) => { - throw new EndOfStreamException(); - } - offset += bytesRead; - count -= bytesRead; - } + var bytesWritten = 0; + while (bytesWritten < state.count) + { + var backingBytes = state.buffer.AccessBackingBytes(state.offset + bytesWritten); + var bytesToWrite = Math.Min(state.count - bytesWritten, backingBytes.Count); + str.Write(backingBytes.Array, backingBytes.Offset, bytesToWrite); + bytesWritten += bytesToWrite; + } + }, + timeoutMs, + cancellationToken); } - public static async Task ReadBytesAsync(this Stream stream, OperationContext operationContext, IByteBuffer buffer, int offset, int count, TimeSpan socketTimeout) + public static Task WriteBytesAsync(this Stream stream, byte[] buffer, int offset, int count, int timeoutMs = Timeout.Infinite, CancellationToken cancellationToken = default) { Ensure.IsNotNull(stream, nameof(stream)); Ensure.IsNotNull(buffer, nameof(buffer)); Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset)); Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count)); - while (count > 0) - { - var backingBytes = buffer.AccessBackingBytes(offset); - var bytesToRead = Math.Min(count, backingBytes.Count); - var bytesRead = await stream.ReadAsync(backingBytes.Array, backingBytes.Offset, bytesToRead, operationContext.RemainingTimeoutOrDefault(socketTimeout), operationContext.CancellationToken).ConfigureAwait(false); - if (bytesRead == 0) - { - throw new EndOfStreamException(); - } - offset += bytesRead; - count -= bytesRead; - } + return ExecuteOperationWithTimeoutAsync( + stream, + (buffer, offset, count), + (str, state) => str.WriteAsync(state.buffer, state.offset, state.count), + timeoutMs, + cancellationToken); } - public static async Task ReadBytesAsync(this Stream stream, byte[] destination, int offset, int count, CancellationToken cancellationToken) + public static Task WriteBytesAsync(this Stream stream, IByteBuffer buffer, int offset, int count, int timeoutMs = Timeout.Infinite, CancellationToken cancellationToken = default) { - while (count > 0) - { - var bytesRead = await stream.ReadAsync(destination, offset, count, cancellationToken).ConfigureAwait(false); - if (bytesRead == 0) - { - throw new EndOfStreamException(); - } - offset += bytesRead; - count -= bytesRead; - } - } + Ensure.IsNotNull(stream, nameof(stream)); + Ensure.IsNotNull(buffer, nameof(buffer)); + Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset)); + Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count)); - public static void Write(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken) => - ExecuteOperationWithTimeout( + return ExecuteOperationWithTimeoutAsync( stream, - (str, state) => + (buffer, offset, count), + async (str, state) => { - str.Write(state.Buffer, state.Offset, state.Count); - return true; + var bytesWritten = 0; + while (bytesWritten < state.count) + { + var backingBytes = state.buffer.AccessBackingBytes(state.offset + bytesWritten); + var bytesToWrite = Math.Min(state.count - bytesWritten, backingBytes.Count); + await str.WriteAsync(backingBytes.Array, backingBytes.Offset, bytesToWrite).ConfigureAwait(false); + bytesWritten += bytesToWrite; + } }, - buffer, - offset, - count, - timeout, + timeoutMs, cancellationToken); + } - public static async Task WriteAsync(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken) + private static async Task ExecuteOperationWithTimeoutAsync(Stream stream, TState state, Func operation, int timeoutMs, CancellationToken cancellationToken) { - Task writeTask = null; - try + if (timeoutMs == 0) { - writeTask = stream.WriteAsync(buffer, offset, count); - await writeTask.WaitAsync(timeout, cancellationToken).ConfigureAwait(false); + throw new TimeoutException(); } - catch (ObjectDisposedException) + + Task cancellationTask = null; + if (timeoutMs > 0 || cancellationToken.CanBeCanceled) { - // It's possible to get ObjectDisposedException when the connection pool was closed with interruptInUseConnections set to true. - throw new IOException(); + cancellationTask = Task.Delay(timeoutMs, cancellationToken); } - catch (Exception ex) when (ex is OperationCanceledException or TimeoutException) + + Task operationTask; + try { - // await Task.WaitAsync() throws OperationCanceledException in case of cancellation and TimeoutException in case of timeout - try + operationTask = operation(stream, state); + if (cancellationTask == null) { - stream.Dispose(); - // Should await on the task to avoid UnobservedTaskException - if (writeTask != null) - { - await writeTask.ConfigureAwait(false); - } + await operationTask.ConfigureAwait(false); + return; } - catch + + var completedTask = await Task.WhenAny(operationTask, cancellationTask).ConfigureAwait(false); + if (completedTask == operationTask) { - // Ignore any exceptions + await operationTask.ConfigureAwait(false); // Will re-throw exception if any + return; + } + } + catch (Exception ex) + { + if (cancellationToken.IsCancellationRequested) + { + throw new TaskCanceledException(); + } + + if (ex is ObjectDisposedException) + { + throw new IOException(); } throw; } - } - public static void WriteBytes(this Stream stream, OperationContext operationContext, IByteBuffer buffer, int offset, int count, TimeSpan socketTimeout) - { - Ensure.IsNotNull(stream, nameof(stream)); - Ensure.IsNotNull(buffer, nameof(buffer)); - Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset)); - Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count)); + // if we reach here - then operation was either cancelled or timed out + operationTask.IgnoreExceptions(); + try + { + stream.Dispose(); + } + catch (Exception) + { + // suppress any exception + } - while (count > 0) + if (cancellationToken.IsCancellationRequested) { - var backingBytes = buffer.AccessBackingBytes(offset); - var bytesToWrite = Math.Min(count, backingBytes.Count); - stream.Write(backingBytes.Array, backingBytes.Offset, bytesToWrite, operationContext.RemainingTimeoutOrDefault(socketTimeout), operationContext.CancellationToken); - offset += bytesToWrite; - count -= bytesToWrite; + throw new TaskCanceledException(); } + throw new TimeoutException(); } - public static async Task WriteBytesAsync(this Stream stream, OperationContext operationContext, IByteBuffer buffer, int offset, int count, TimeSpan socketTimeout) + private static void ExecuteOperationWithTimeout(Stream stream, TState state, Action operation, int timeoutMs, CancellationToken cancellationToken) { - Ensure.IsNotNull(stream, nameof(stream)); - Ensure.IsNotNull(buffer, nameof(buffer)); - Ensure.IsBetween(offset, 0, buffer.Length, nameof(offset)); - Ensure.IsBetween(count, 0, buffer.Length - offset, nameof(count)); - - while (count > 0) + if (timeoutMs == 0) { - var backingBytes = buffer.AccessBackingBytes(offset); - var bytesToWrite = Math.Min(count, backingBytes.Count); - await stream.WriteAsync(backingBytes.Array, backingBytes.Offset, bytesToWrite, operationContext.RemainingTimeoutOrDefault(socketTimeout), operationContext.CancellationToken).ConfigureAwait(false); - offset += bytesToWrite; - count -= bytesToWrite; + throw new TimeoutException(); } - } - private static TResult ExecuteOperationWithTimeout(Stream stream, Func operation, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken) - { StreamDisposeCallbackState callbackState = null; Timer timer = null; CancellationTokenRegistration cancellationSubscription = default; - if (timeout != Timeout.InfiniteTimeSpan) + if (timeoutMs > 0) { callbackState = new StreamDisposeCallbackState(stream); - timer = new Timer(DisposeStreamCallback, callbackState, timeout, Timeout.InfiniteTimeSpan); + timer = new Timer(DisposeStreamCallback, callbackState, timeoutMs, Timeout.Infinite); } if (cancellationToken.CanBeCanceled) @@ -289,16 +320,14 @@ private static TResult ExecuteOperationWithTimeout(Stream stream, Func< try { - var result = operation(stream, (buffer, offset, count)); + operation(stream, state); if (callbackState?.TryChangeStateFromInProgress(OperationState.Done) == false) { - // if cannot change the state - then the stream was/will be disposed, throw here + // If the state can't be changed - then the stream was/will be disposed, throw here throw new IOException(); } - - return result; } - catch (IOException) + catch (Exception ex) { if (callbackState?.OperationState == OperationState.Interrupted) { @@ -306,6 +335,11 @@ private static TResult ExecuteOperationWithTimeout(Stream stream, Func< throw new TimeoutException(); } + if (ex is ObjectDisposedException) + { + throw new IOException(); + } + throw; } finally @@ -336,7 +370,7 @@ static void DisposeStreamCallback(object state) private record StreamDisposeCallbackState(Stream Stream) { - private int _operationState = 0; + private int _operationState = (int)OperationState.InProgress; public OperationState OperationState => (OperationState)_operationState; diff --git a/src/MongoDB.Driver/GridFS/GridFSBucket.cs b/src/MongoDB.Driver/GridFS/GridFSBucket.cs index 7e65af290a3..204dce64067 100644 --- a/src/MongoDB.Driver/GridFS/GridFSBucket.cs +++ b/src/MongoDB.Driver/GridFS/GridFSBucket.cs @@ -830,8 +830,7 @@ private GridFSUploadStream CreateUploadStream(IReadWriteBindingHandle b while (count > 0) { var partialCount = (int)Math.Min(buffer.Length, count); - source.ReadBytes(buffer, 0, partialCount, cancellationToken); - //((Stream)source).ReadBytes(buffer, 0, partialCount, cancellationToken); + source.ReadBytes(buffer, 0, partialCount, cancellationToken: cancellationToken); destination.Write(buffer, 0, partialCount); count -= partialCount; } @@ -849,7 +848,7 @@ private GridFSUploadStream CreateUploadStream(IReadWriteBindingHandle b while (count > 0) { var partialCount = (int)Math.Min(buffer.Length, count); - await source.ReadBytesAsync(buffer, 0, partialCount, cancellationToken).ConfigureAwait(false); + await source.ReadBytesAsync(buffer, 0, partialCount, cancellationToken: cancellationToken).ConfigureAwait(false); await destination.WriteAsync(buffer, 0, partialCount, cancellationToken).ConfigureAwait(false); count -= partialCount; } diff --git a/tests/MongoDB.Driver.Tests/Core/Misc/StreamExtensionMethodsTests.cs b/tests/MongoDB.Driver.Tests/Core/Misc/StreamExtensionMethodsTests.cs index cd51bdd785d..c907a019b51 100644 --- a/tests/MongoDB.Driver.Tests/Core/Misc/StreamExtensionMethodsTests.cs +++ b/tests/MongoDB.Driver.Tests/Core/Misc/StreamExtensionMethodsTests.cs @@ -42,11 +42,11 @@ public async Task ReadBytes_with_byte_array_should_have_expected_effect_for_coun if (async) { - await stream.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, count, Timeout.InfiniteTimeSpan); + await stream.ReadBytesAsync(destination, 0, count); } else { - stream.ReadBytes(OperationContext.NoTimeout, destination, 0, count, Timeout.InfiniteTimeSpan); + stream.ReadBytes(destination, 0, count); } destination.Should().Equal(expectedBytes); @@ -65,11 +65,11 @@ public async Task ReadBytes_with_byte_array_should_have_expected_effect_for_offs if (async) { - await stream.ReadBytesAsync(OperationContext.NoTimeout, destination, offset, 1, Timeout.InfiniteTimeSpan); + await stream.ReadBytesAsync(destination, offset, 1); } else { - stream.ReadBytes(OperationContext.NoTimeout, destination, offset, 1, Timeout.InfiniteTimeSpan); + stream.ReadBytes(destination, offset, 1); } destination.Should().Equal(expectedBytes); @@ -106,11 +106,11 @@ int ReadPartial (byte[] buffer, int offset, int count) if (async) { - await mockStream.Object.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 3, Timeout.InfiniteTimeSpan); + await mockStream.Object.ReadBytesAsync(destination, 0, 3); } else { - mockStream.Object.ReadBytes(OperationContext.NoTimeout, destination, 0, 3, Timeout.InfiniteTimeSpan); + mockStream.Object.ReadBytes(destination, 0, 3); } destination.Should().Equal(bytes); @@ -128,8 +128,8 @@ public async Task ReadBytes_with_byte_array_should_throw_when_end_of_stream_is_r .Returns(Task.FromResult(0)); var exception = async ? - await Record.ExceptionAsync(() => mockStream.Object.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 1, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => mockStream.Object.ReadBytes(OperationContext.NoTimeout, destination, 0, 1, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => mockStream.Object.ReadBytesAsync(destination, 0, 1)) : + Record.Exception(() => mockStream.Object.ReadBytes(destination, 0, 1)); exception.Should().BeOfType(); } @@ -142,8 +142,8 @@ public async Task ReadBytes_with_byte_array_should_throw_when_buffer_is_null([Va byte[] destination = null; var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.ReadBytesAsync(destination, 0, 0)) : + Record.Exception(() => stream.ReadBytes(destination, 0, 0)); exception.Should().BeOfType().Subject .ParamName.Should().Be("buffer"); @@ -162,8 +162,8 @@ public async Task ReadBytes_with_byte_array_should_throw_when_count_is_invalid(b var destination = new byte[2]; var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, offset, count, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, offset, count, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.ReadBytesAsync(destination, offset, count)) : + Record.Exception(() => stream.ReadBytes(destination, offset, count)); exception.Should().BeOfType().Subject .ParamName.Should().Be("count"); @@ -179,8 +179,8 @@ public async Task ReadBytes_with_byte_array_should_throw_when_offset_is_invalid( var destination = new byte[2]; var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, offset, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, offset, 0, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.ReadBytesAsync(destination, offset, 0)) : + Record.Exception(() => stream.ReadBytes(destination, offset, 0)); exception.Should().BeOfType().Subject .ParamName.Should().Be("offset"); @@ -194,8 +194,8 @@ public async Task ReadBytes_with_byte_array_should_throw_when_stream_is_null([Va var destination = new byte[0]; var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.ReadBytesAsync(destination, 0, 0)) : + Record.Exception(() => stream.ReadBytes(destination, 0, 0)); exception.Should().BeOfType().Subject .ParamName.Should().Be("stream"); @@ -203,18 +203,17 @@ await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeo [Theory] [ParameterAttributeData] - public async Task ReadBytes_with_byte_array_throws_on_timeout([Values(true, false)]bool async) + public async Task ReadBytes_with_byte_array_throws_on_timeout([Values(true, false)]bool async, [Values(0, 10)]int timeout) { var streamMock = new Mock(); SetupStreamRead(streamMock); var stream = streamMock.Object; var destination = new byte[2]; - var timeout = TimeSpan.FromMilliseconds(10); var exception = async ? - await Record.ExceptionAsync(() => stream.ReadAsync(destination, 0, 2, timeout, CancellationToken.None)) : - Record.Exception(() => stream.Read(destination, 0, 2, timeout, CancellationToken.None)); + await Record.ExceptionAsync(() => stream.ReadBytesAsync(destination, 0, 2, timeout, CancellationToken.None)) : + Record.Exception(() => stream.ReadBytes(destination, 0, 2, timeout, CancellationToken.None)); exception.Should().BeOfType(); } @@ -231,8 +230,8 @@ public async Task ReadBytes_with_byte_array_throws_on_cancellation([Values(true, using var cancellationTokenSource = new CancellationTokenSource(10); var exception = async ? - await Record.ExceptionAsync(() => stream.ReadAsync(destination, 0, 2, Timeout.InfiniteTimeSpan, cancellationTokenSource.Token)) : - Record.Exception(() => stream.Read(destination, 0, 2, Timeout.InfiniteTimeSpan, cancellationTokenSource.Token)); + await Record.ExceptionAsync(() => stream.ReadBytesAsync(destination, 0, 2, cancellationToken: cancellationTokenSource.Token)) : + Record.Exception(() => stream.ReadBytes(destination, 0, 2, cancellationToken: cancellationTokenSource.Token)); if (async) { @@ -259,11 +258,11 @@ public async Task ReadBytes_with_byte_buffer_should_have_expected_effect_for_cou if (async) { - await stream.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, count, Timeout.InfiniteTimeSpan); + await stream.ReadBytesAsync(destination, 0, count); } else { - stream.ReadBytes(OperationContext.NoTimeout, destination, 0, count, Timeout.InfiniteTimeSpan); + stream.ReadBytes(destination, 0, count); } destination.AccessBackingBytes(0).Array.Should().Equal(expectedBytes); @@ -282,11 +281,11 @@ public async Task ReadBytes_with_byte_buffer_should_have_expected_effect_for_off if (async) { - await stream.ReadBytesAsync(OperationContext.NoTimeout, destination, offset, 1, Timeout.InfiniteTimeSpan); + await stream.ReadBytesAsync(destination, offset, 1); } else { - stream.ReadBytes(OperationContext.NoTimeout, destination, offset, 1, Timeout.InfiniteTimeSpan); + stream.ReadBytes(destination, offset, 1); } destination.AccessBackingBytes(0).Array.Should().Equal(expectedBytes); @@ -323,11 +322,11 @@ int ReadPartial (byte[] buffer, int offset, int count) if (async) { - await mockStream.Object.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 3, Timeout.InfiniteTimeSpan); + await mockStream.Object.ReadBytesAsync(destination, 0, 3); } else { - mockStream.Object.ReadBytes(OperationContext.NoTimeout, destination, 0, 3, Timeout.InfiniteTimeSpan); + mockStream.Object.ReadBytes(destination, 0, 3); } destination.AccessBackingBytes(0).Array.Should().Equal(bytes); @@ -345,8 +344,8 @@ public async Task ReadBytes_with_byte_buffer_should_throw_when_end_of_stream_is_ .Returns(Task.FromResult(0)); var exception = async ? - await Record.ExceptionAsync(() => mockStream.Object.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 1, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => mockStream.Object.ReadBytes(OperationContext.NoTimeout, destination, 0, 1, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => mockStream.Object.ReadBytesAsync(destination, 0, 1)) : + Record.Exception(() => mockStream.Object.ReadBytes(destination, 0, 1)); exception.Should().BeOfType(); } @@ -359,8 +358,8 @@ public async Task ReadBytes_with_byte_buffer_should_throw_when_buffer_is_null([V IByteBuffer destination = null; var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.ReadBytesAsync(destination, 0, 0)) : + Record.Exception(() => stream.ReadBytes(destination, 0, 0)); exception.Should().BeOfType().Subject .ParamName.Should().Be("buffer"); @@ -379,8 +378,8 @@ public async Task ReadBytes_with_byte_buffer_should_throw_when_count_is_invalid( var destination = CreateMockByteBuffer(2).Object; var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, offset, count, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, offset, count, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.ReadBytesAsync(destination, offset, count)) : + Record.Exception(() => stream.ReadBytes(destination, offset, count)); exception.Should().BeOfType().Subject .ParamName.Should().Be("count"); @@ -396,8 +395,8 @@ public async Task ReadBytes_with_byte_buffer_should_throw_when_offset_is_invalid var destination = CreateMockByteBuffer(2).Object; var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, offset, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, offset, 0, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.ReadBytesAsync(destination, offset, 0)) : + Record.Exception(() => stream.ReadBytes(destination, offset, 0)); exception.Should().BeOfType().Subject .ParamName.Should().Be("offset"); @@ -411,8 +410,8 @@ public async Task ReadBytes_with_byte_buffer_should_throw_when_stream_is_null([V var destination = new Mock().Object; var exception = async ? - await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.ReadBytes(OperationContext.NoTimeout, destination, 0, 0, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.ReadBytesAsync(destination, 0, 0)) : + Record.Exception(() => stream.ReadBytes(destination, 0, 0)); exception.Should().BeOfType().Subject .ParamName.Should().Be("stream"); @@ -425,18 +424,131 @@ await Record.ExceptionAsync(() => stream.ReadBytesAsync(OperationContext.NoTimeo [InlineData(false, 0, new byte[] { })] [InlineData(false, 1, new byte[] { 1 })] [InlineData(false, 2, new byte[] { 1, 2 })] - public async Task WriteBytes_should_have_expected_effect_for_count(bool async, int count, byte[] expectedBytes) + public async Task WriteBytes_with_byte_array_should_have_expected_effect_for_count(bool async, int count, byte[] expectedBytes) + { + var stream = new MemoryStream(); + var source = new byte[] { 1, 2 }; + + if (async) + { + await stream.WriteBytesAsync(source, 0, count); + } + else + { + stream.WriteBytes(source, 0, count); + } + + stream.ToArray().Should().Equal(expectedBytes); + } + + [Theory] + [InlineData(true, 1, new byte[] { 2 })] + [InlineData(true, 2, new byte[] { 3 })] + [InlineData(false, 1, new byte[] { 2 })] + [InlineData(false, 2, new byte[] { 3 })] + public async Task WriteBytes_with_byte_array_should_have_expected_effect_for_offset(bool async, int offset, byte[] expectedBytes) + { + var stream = new MemoryStream(); + var source = new byte[] { 1, 2, 3 }; + + if (async) + { + await stream.WriteBytesAsync(source, offset, 1); + } + else + { + stream.WriteBytes(source, offset, 1); + } + + stream.ToArray().Should().Equal(expectedBytes); + } + + [Theory] + [ParameterAttributeData] + public async Task WriteBytes_with_byte_array_should_throw_when_buffer_is_null([Values(true, false)]bool async) + { + var stream = new Mock().Object; + byte[] buffer = null; + + var exception = async ? + await Record.ExceptionAsync(() => stream.WriteBytesAsync(buffer, 0, 0)) : + Record.Exception(() => stream.WriteBytes(buffer, 0, 0)); + + exception.Should().BeOfType().Subject + .ParamName.Should().Be("buffer"); + } + + [Theory] + [InlineData(true, 0, -1)] + [InlineData(true, 1, 2)] + [InlineData(true, 2, 1)] + [InlineData(false, 0, -1)] + [InlineData(false, 1, 2)] + [InlineData(false, 2, 1)] + public async Task WriteBytes_with_byte_array_should_throw_when_count_is_invalid(bool async, int offset, int count) + { + var stream = new Mock().Object; + var source = new byte[] { 1, 2 }; + + var exception = async ? + await Record.ExceptionAsync(() => stream.WriteBytesAsync(source, offset, count)) : + Record.Exception(() => stream.WriteBytes(source, offset, count)); + + exception.Should().BeOfType().Subject + .ParamName.Should().Be("count"); + } + + [Theory] + [ParameterAttributeData] + public async Task WriteBytes_with_byte_array_should_throw_when_offset_is_invalid( + [Values(true, false)]bool async, + [Values(-1, 3)]int offset) + { + var stream = new Mock().Object; + var source = new byte[] { 1, 2 }; + + var exception = async ? + await Record.ExceptionAsync(() => stream.WriteBytesAsync(source, offset, 0)) : + Record.Exception(() => stream.WriteBytes(source, offset, 0)); + + exception.Should().BeOfType().Subject + .ParamName.Should().Be("offset"); + } + + [Theory] + [ParameterAttributeData] + public async Task WriteBytes_with_byte_array_should_throw_when_stream_is_null([Values(true, false)]bool async) + { + Stream stream = null; + var source = new byte[] { 1, 2 }; + + var exception = async ? + await Record.ExceptionAsync(() => stream.WriteBytesAsync(source, 0, 0)) : + Record.Exception(() => stream.WriteBytes(source, 0, 0)); + + exception.Should().BeOfType().Subject + .ParamName.Should().Be("stream"); + } + + [Theory] + [InlineData(true, 0, new byte[] { })] + [InlineData(true, 1, new byte[] { 1 })] + [InlineData(true, 2, new byte[] { 1, 2 })] + [InlineData(false, 0, new byte[] { })] + [InlineData(false, 1, new byte[] { 1 })] + [InlineData(false, 2, new byte[] { 1, 2 })] + public async Task WriteBytes_with_byte_buffer_should_have_expected_effect_for_count(bool async, int count, byte[] expectedBytes) { var stream = new MemoryStream(); var source = new ByteArrayBuffer(new byte[] { 1, 2 }); if (async) { - await stream.WriteBytesAsync(OperationContext.NoTimeout, source, 0, count, Timeout.InfiniteTimeSpan); + await stream.WriteBytesAsync(source, 0, count); } else { - stream.WriteBytes(OperationContext.NoTimeout, source, 0, count, Timeout.InfiniteTimeSpan); + stream.WriteBytes(source, 0, count); } stream.ToArray().Should().Equal(expectedBytes); @@ -447,18 +559,18 @@ public async Task WriteBytes_should_have_expected_effect_for_count(bool async, i [InlineData(true, 2, new byte[] { 3 })] [InlineData(false, 1, new byte[] { 2 })] [InlineData(false, 2, new byte[] { 3 })] - public async Task WriteBytes_should_have_expected_effect_for_offset(bool async, int offset, byte[] expectedBytes) + public async Task WriteBytes_with_byte_buffer_should_have_expected_effect_for_offset(bool async, int offset, byte[] expectedBytes) { var stream = new MemoryStream(); var source = new ByteArrayBuffer(new byte[] { 1, 2, 3 }); if (async) { - await stream.WriteBytesAsync(OperationContext.NoTimeout, source, offset, 1, Timeout.InfiniteTimeSpan); + await stream.WriteBytesAsync(source, offset, 1); } else { - stream.WriteBytes(OperationContext.NoTimeout, source, offset, 1, Timeout.InfiniteTimeSpan); + stream.WriteBytes(source, offset, 1); } stream.ToArray().Should().Equal(expectedBytes); @@ -473,7 +585,7 @@ public async Task WriteBytes_should_have_expected_effect_for_offset(bool async, [InlineData(false, 2, new[] { 1, 2 })] [InlineData(false, 3, new[] { 2, 1 })] [InlineData(false, 4, new[] { 1, 1, 1 })] - public async Task WriteBytes_should_have_expected_effect_for_partial_writes(bool async, int testCase, int[] partition) + public async Task WriteBytes_with_byte_buffer_should_have_expected_effect_for_partial_writes(bool async, int testCase, int[] partition) { var stream = new MemoryStream(); var mockSource = new Mock(); @@ -489,11 +601,11 @@ public async Task WriteBytes_should_have_expected_effect_for_partial_writes(bool if (async) { - await stream.WriteBytesAsync(OperationContext.NoTimeout, mockSource.Object, 0, 3, Timeout.InfiniteTimeSpan); + await stream.WriteBytesAsync(mockSource.Object, 0, 3); } else { - stream.WriteBytes(OperationContext.NoTimeout, mockSource.Object, 0, 3, Timeout.InfiniteTimeSpan); + stream.WriteBytes(mockSource.Object, 0, 3); } stream.ToArray().Should().Equal(bytes); @@ -501,13 +613,14 @@ public async Task WriteBytes_should_have_expected_effect_for_partial_writes(bool [Theory] [ParameterAttributeData] - public async Task WriteBytes_should_throw_when_buffer_is_null([Values(true, false)]bool async) + public async Task WriteBytes_with_byte_buffer_should_throw_when_buffer_is_null([Values(true, false)]bool async) { var stream = new Mock().Object; + IByteBuffer buffer = null; var exception = async ? - await Record.ExceptionAsync(() => stream.WriteBytesAsync(OperationContext.NoTimeout, null, 0, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.WriteBytes(OperationContext.NoTimeout, null, 0, 0, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.WriteBytesAsync(buffer, 0, 0)) : + Record.Exception(() => stream.WriteBytes(buffer, 0, 0)); exception.Should().BeOfType().Subject .ParamName.Should().Be("buffer"); @@ -520,14 +633,14 @@ await Record.ExceptionAsync(() => stream.WriteBytesAsync(OperationContext.NoTime [InlineData(false, 0, -1)] [InlineData(false, 1, 2)] [InlineData(false, 2, 1)] - public async Task WriteBytes_should_throw_when_count_is_invalid(bool async, int offset, int count) + public async Task WriteBytes_with_byte_buffer_should_throw_when_count_is_invalid(bool async, int offset, int count) { var stream = new Mock().Object; var source = CreateMockByteBuffer(2).Object; var exception = async ? - await Record.ExceptionAsync(() => stream.WriteBytesAsync(OperationContext.NoTimeout, source, offset, count, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.WriteBytes(OperationContext.NoTimeout, source, offset, count, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.WriteBytesAsync(source, offset, count)) : + Record.Exception(() => stream.WriteBytes(source, offset, count)); exception.Should().BeOfType().Subject .ParamName.Should().Be("count"); @@ -535,7 +648,7 @@ await Record.ExceptionAsync(() => stream.WriteBytesAsync(OperationContext.NoTime [Theory] [ParameterAttributeData] - public async Task WriteBytes_should_throw_when_offset_is_invalid( + public async Task WriteBytes_with_byte_buffer_should_throw_when_offset_is_invalid( [Values(true, false)]bool async, [Values(-1, 3)]int offset) { @@ -543,8 +656,8 @@ public async Task WriteBytes_should_throw_when_offset_is_invalid( var source = CreateMockByteBuffer(2).Object; var exception = async ? - await Record.ExceptionAsync(() => stream.WriteBytesAsync(OperationContext.NoTimeout, source, offset, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.WriteBytes(OperationContext.NoTimeout, source, offset, 0, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.WriteBytesAsync(source, offset, 0)) : + Record.Exception(() => stream.WriteBytes(source, offset, 0)); exception.Should().BeOfType().Subject .ParamName.Should().Be("offset"); @@ -552,14 +665,14 @@ await Record.ExceptionAsync(() => stream.WriteBytesAsync(OperationContext.NoTime [Theory] [ParameterAttributeData] - public async Task WriteBytes_should_throw_when_stream_is_null([Values(true, false)]bool async) + public async Task WriteBytes_with_byte_buffer_should_throw_when_stream_is_null([Values(true, false)]bool async) { Stream stream = null; var source = new Mock().Object; var exception = async ? - await Record.ExceptionAsync(() => stream.WriteBytesAsync(OperationContext.NoTimeout, source, 0, 0, Timeout.InfiniteTimeSpan)) : - Record.Exception(() => stream.WriteBytes(OperationContext.NoTimeout, source, 0, 0, Timeout.InfiniteTimeSpan)); + await Record.ExceptionAsync(() => stream.WriteBytesAsync(source, 0, 0)) : + Record.Exception(() => stream.WriteBytes(source, 0, 0)); exception.Should().BeOfType().Subject .ParamName.Should().Be("stream");