Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/MongoDB.Driver/Core/Compression/SnappyCompressor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void Compress(Stream input, Stream output)
{
var uncompressedSize = (int)(input.Length - input.Position);
var uncompressedBytes = new byte[uncompressedSize]; // does not include uncompressed message headers
input.ReadBytes(OperationContext.NoTimeout, uncompressedBytes, offset: 0, count: uncompressedSize, socketTimeout: Timeout.InfiniteTimeSpan);
input.ReadBytes(uncompressedBytes, offset: 0, count: uncompressedSize);
var maxCompressedSize = Snappy.GetMaxCompressedLength(uncompressedSize);
var compressedBytes = new byte[maxCompressedSize];
var compressedSize = Snappy.Compress(uncompressedBytes, compressedBytes);
Expand All @@ -50,7 +50,7 @@ public void Decompress(Stream input, Stream output)
{
var compressedSize = (int)(input.Length - input.Position);
var compressedBytes = new byte[compressedSize];
input.ReadBytes(OperationContext.NoTimeout, compressedBytes, offset: 0, count: compressedSize, socketTimeout: Timeout.InfiniteTimeSpan);
input.ReadBytes(compressedBytes, offset: 0, count: compressedSize);
var uncompressedSize = Snappy.GetUncompressedLength(compressedBytes);
var decompressedBytes = new byte[uncompressedSize];
var decompressedSize = Snappy.Decompress(compressedBytes, decompressedBytes);
Expand Down
14 changes: 8 additions & 6 deletions src/MongoDB.Driver/Core/Connections/BinaryConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,14 +345,14 @@ private IByteBuffer ReceiveBuffer(OperationContext operationContext)
try
{
var messageSizeBytes = new byte[4];
_stream.ReadBytes(operationContext, messageSizeBytes, 0, 4, _socketReadTimeout);
_stream.ReadBytes(messageSizeBytes, 0, 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken);
var messageSize = BinaryPrimitives.ReadInt32LittleEndian(messageSizeBytes);
EnsureMessageSizeIsValid(messageSize);
var inputBufferChunkSource = new InputBufferChunkSource(BsonChunkPool.Default);
var buffer = ByteBufferFactory.Create(inputBufferChunkSource, messageSize);
buffer.Length = messageSize;
buffer.SetBytes(0, messageSizeBytes, 0, 4);
_stream.ReadBytes(operationContext, buffer, 4, messageSize - 4, _socketReadTimeout);
_stream.ReadBytes(buffer, 4, messageSize - 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken);
_lastUsedAtUtc = DateTime.UtcNow;
buffer.MakeReadOnly();
return buffer;
Expand All @@ -370,14 +370,14 @@ private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationCon
try
{
var messageSizeBytes = new byte[4];
await _stream.ReadBytesAsync(operationContext, messageSizeBytes, 0, 4, _socketReadTimeout).ConfigureAwait(false);
await _stream.ReadBytesAsync(messageSizeBytes, 0, 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken).ConfigureAwait(false);
var messageSize = BinaryPrimitives.ReadInt32LittleEndian(messageSizeBytes);
EnsureMessageSizeIsValid(messageSize);
var inputBufferChunkSource = new InputBufferChunkSource(BsonChunkPool.Default);
var buffer = ByteBufferFactory.Create(inputBufferChunkSource, messageSize);
buffer.Length = messageSize;
buffer.SetBytes(0, messageSizeBytes, 0, 4);
await _stream.ReadBytesAsync(operationContext, buffer, 4, messageSize - 4, _socketReadTimeout).ConfigureAwait(false);
await _stream.ReadBytesAsync(buffer, 4, messageSize - 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken).ConfigureAwait(false);
_lastUsedAtUtc = DateTime.UtcNow;
buffer.MakeReadOnly();
return buffer;
Expand Down Expand Up @@ -475,7 +475,8 @@ private void SendBuffer(OperationContext operationContext, IByteBuffer buffer)

try
{
_stream.WriteBytes(operationContext, buffer, 0, buffer.Length, _socketWriteTimeout);
var timeout = operationContext.RemainingTimeoutOrDefault(_socketWriteTimeout);
_stream.WriteBytes(buffer, 0, buffer.Length, (int)timeout.TotalMilliseconds, operationContext.CancellationToken);
_lastUsedAtUtc = DateTime.UtcNow;
}
catch (Exception ex)
Expand All @@ -495,7 +496,8 @@ private async Task SendBufferAsync(OperationContext operationContext, IByteBuffe

try
{
await _stream.WriteBytesAsync(operationContext, buffer, 0, buffer.Length, _socketWriteTimeout).ConfigureAwait(false);
var timeout = operationContext.RemainingTimeoutOrDefault(_socketWriteTimeout);
await _stream.WriteBytesAsync(buffer, 0, buffer.Length, (int)timeout.TotalMilliseconds, operationContext.CancellationToken).ConfigureAwait(false);
_lastUsedAtUtc = DateTime.UtcNow;
}
catch (Exception ex)
Expand Down
18 changes: 9 additions & 9 deletions src/MongoDB.Driver/Core/Connections/Socks5Helper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static void PerformSocks5Handshake(Stream stream, EndPoint endPoint, Sock
var greetingRequestLength = CreateGreetingRequest(buffer, useAuth);
stream.Write(buffer, 0, greetingRequestLength);

stream.ReadBytes(buffer, 0, 2, cancellationToken);
stream.ReadBytes(buffer, 0, 2, cancellationToken: cancellationToken);
var requiresAuthenticationStep = ProcessGreetingResponse(buffer, useAuth);

// If we have username and password, but the proxy doesn't need them, we skip the authentication step.
Expand All @@ -113,16 +113,16 @@ public static void PerformSocks5Handshake(Stream stream, EndPoint endPoint, Sock
var authenticationRequestLength = CreateAuthenticationRequest(buffer, authenticationSettings);
stream.Write(buffer, 0, authenticationRequestLength);

stream.ReadBytes(buffer, 0, 2, cancellationToken);
stream.ReadBytes(buffer, 0, 2, cancellationToken: cancellationToken);
ProcessAuthenticationResponse(buffer);
}

var connectRequestLength = CreateConnectRequest(buffer, targetHost, targetPort);
stream.Write(buffer, 0, connectRequestLength);

stream.ReadBytes(buffer, 0, 5, cancellationToken);
stream.ReadBytes(buffer, 0, 5, cancellationToken: cancellationToken);
var skip = ProcessConnectResponse(buffer);
stream.ReadBytes(buffer, 0, skip, cancellationToken);
stream.ReadBytes(buffer, 0, skip, cancellationToken: cancellationToken);
}
finally
{
Expand All @@ -141,7 +141,7 @@ public static async Task PerformSocks5HandshakeAsync(Stream stream, EndPoint end
var greetingRequestLength = CreateGreetingRequest(buffer, useAuth);
await stream.WriteAsync(buffer, 0, greetingRequestLength, cancellationToken).ConfigureAwait(false);

await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken).ConfigureAwait(false);
await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken: cancellationToken).ConfigureAwait(false);
var requiresAuthenticationStep = ProcessGreetingResponse(buffer, useAuth);

// If we have username and password, but the proxy doesn't need them, we skip the authentication step.
Expand All @@ -150,16 +150,16 @@ public static async Task PerformSocks5HandshakeAsync(Stream stream, EndPoint end
var authenticationRequestLength = CreateAuthenticationRequest(buffer, authenticationSettings);
await stream.WriteAsync(buffer, 0, authenticationRequestLength, cancellationToken).ConfigureAwait(false);

await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken).ConfigureAwait(false);
await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken: cancellationToken).ConfigureAwait(false);
ProcessAuthenticationResponse(buffer);
}

var connectRequestLength = CreateConnectRequest(buffer, targetHost, targetPort);
await stream.WriteAsync(buffer, 0, connectRequestLength, cancellationToken).ConfigureAwait(false);

await stream.ReadBytesAsync(buffer, 0, 5, cancellationToken).ConfigureAwait(false);
await stream.ReadBytesAsync(buffer, 0, 5, cancellationToken: cancellationToken).ConfigureAwait(false);
var skip = ProcessConnectResponse(buffer);
await stream.ReadBytesAsync(buffer, 0, skip, cancellationToken).ConfigureAwait(true);
await stream.ReadBytesAsync(buffer, 0, skip, cancellationToken: cancellationToken).ConfigureAwait(true);
}
finally
{
Expand Down Expand Up @@ -340,4 +340,4 @@ private static void EnsureSocksSuccess(byte code, string operation)

throw new IOException($"SOCKS5 {operation} failed. {message}");
}
}
}
Loading