Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/MongoDB.Driver/Core/Compression/SnappyCompressor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void Compress(Stream input, Stream output)
{
var uncompressedSize = (int)(input.Length - input.Position);
var uncompressedBytes = new byte[uncompressedSize]; // does not include uncompressed message headers
input.ReadBytes(OperationContext.NoTimeout, uncompressedBytes, offset: 0, count: uncompressedSize, socketTimeout: Timeout.InfiniteTimeSpan);
input.ReadBytes(uncompressedBytes, offset: 0, count: uncompressedSize);
var maxCompressedSize = Snappy.GetMaxCompressedLength(uncompressedSize);
var compressedBytes = new byte[maxCompressedSize];
var compressedSize = Snappy.Compress(uncompressedBytes, compressedBytes);
Expand All @@ -50,7 +50,7 @@ public void Decompress(Stream input, Stream output)
{
var compressedSize = (int)(input.Length - input.Position);
var compressedBytes = new byte[compressedSize];
input.ReadBytes(OperationContext.NoTimeout, compressedBytes, offset: 0, count: compressedSize, socketTimeout: Timeout.InfiniteTimeSpan);
input.ReadBytes(compressedBytes, offset: 0, count: compressedSize);
var uncompressedSize = Snappy.GetUncompressedLength(compressedBytes);
var decompressedBytes = new byte[uncompressedSize];
var decompressedSize = Snappy.Decompress(compressedBytes, decompressedBytes);
Expand Down
14 changes: 8 additions & 6 deletions src/MongoDB.Driver/Core/Connections/BinaryConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,14 +345,14 @@ private IByteBuffer ReceiveBuffer(OperationContext operationContext)
try
{
var messageSizeBytes = new byte[4];
_stream.ReadBytes(operationContext, messageSizeBytes, 0, 4, _socketReadTimeout);
_stream.ReadBytes(messageSizeBytes, 0, 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken);
var messageSize = BinaryPrimitives.ReadInt32LittleEndian(messageSizeBytes);
EnsureMessageSizeIsValid(messageSize);
var inputBufferChunkSource = new InputBufferChunkSource(BsonChunkPool.Default);
var buffer = ByteBufferFactory.Create(inputBufferChunkSource, messageSize);
buffer.Length = messageSize;
buffer.SetBytes(0, messageSizeBytes, 0, 4);
_stream.ReadBytes(operationContext, buffer, 4, messageSize - 4, _socketReadTimeout);
_stream.ReadBytes(buffer, 4, messageSize - 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken);
_lastUsedAtUtc = DateTime.UtcNow;
buffer.MakeReadOnly();
return buffer;
Expand All @@ -370,14 +370,14 @@ private async Task<IByteBuffer> ReceiveBufferAsync(OperationContext operationCon
try
{
var messageSizeBytes = new byte[4];
await _stream.ReadBytesAsync(operationContext, messageSizeBytes, 0, 4, _socketReadTimeout).ConfigureAwait(false);
await _stream.ReadBytesAsync(messageSizeBytes, 0, 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken).ConfigureAwait(false);
var messageSize = BinaryPrimitives.ReadInt32LittleEndian(messageSizeBytes);
EnsureMessageSizeIsValid(messageSize);
var inputBufferChunkSource = new InputBufferChunkSource(BsonChunkPool.Default);
var buffer = ByteBufferFactory.Create(inputBufferChunkSource, messageSize);
buffer.Length = messageSize;
buffer.SetBytes(0, messageSizeBytes, 0, 4);
await _stream.ReadBytesAsync(operationContext, buffer, 4, messageSize - 4, _socketReadTimeout).ConfigureAwait(false);
await _stream.ReadBytesAsync(buffer, 4, messageSize - 4, (int)operationContext.RemainingTimeoutOrDefault(_socketReadTimeout).TotalMilliseconds, operationContext.CancellationToken).ConfigureAwait(false);
_lastUsedAtUtc = DateTime.UtcNow;
buffer.MakeReadOnly();
return buffer;
Expand Down Expand Up @@ -475,7 +475,8 @@ private void SendBuffer(OperationContext operationContext, IByteBuffer buffer)

try
{
_stream.WriteBytes(operationContext, buffer, 0, buffer.Length, _socketWriteTimeout);
var timeout = operationContext.RemainingTimeoutOrDefault(_socketWriteTimeout);
_stream.WriteBytes(buffer, 0, buffer.Length, (int)timeout.TotalMilliseconds, operationContext.CancellationToken);
_lastUsedAtUtc = DateTime.UtcNow;
}
catch (Exception ex)
Expand All @@ -495,7 +496,8 @@ private async Task SendBufferAsync(OperationContext operationContext, IByteBuffe

try
{
await _stream.WriteBytesAsync(operationContext, buffer, 0, buffer.Length, _socketWriteTimeout).ConfigureAwait(false);
var timeout = operationContext.RemainingTimeoutOrDefault(_socketWriteTimeout);
await _stream.WriteBytesAsync(buffer, 0, buffer.Length, (int)timeout.TotalMilliseconds, operationContext.CancellationToken).ConfigureAwait(false);
_lastUsedAtUtc = DateTime.UtcNow;
}
catch (Exception ex)
Expand Down
18 changes: 9 additions & 9 deletions src/MongoDB.Driver/Core/Connections/Socks5Helper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public static void PerformSocks5Handshake(Stream stream, EndPoint endPoint, Sock
var greetingRequestLength = CreateGreetingRequest(buffer, useAuth);
stream.Write(buffer, 0, greetingRequestLength);

stream.ReadBytes(buffer, 0, 2, cancellationToken);
stream.ReadBytes(buffer, 0, 2, cancellationToken: cancellationToken);
var requiresAuthenticationStep = ProcessGreetingResponse(buffer, useAuth);

// If we have username and password, but the proxy doesn't need them, we skip the authentication step.
Expand All @@ -113,16 +113,16 @@ public static void PerformSocks5Handshake(Stream stream, EndPoint endPoint, Sock
var authenticationRequestLength = CreateAuthenticationRequest(buffer, authenticationSettings);
stream.Write(buffer, 0, authenticationRequestLength);

stream.ReadBytes(buffer, 0, 2, cancellationToken);
stream.ReadBytes(buffer, 0, 2, cancellationToken: cancellationToken);
ProcessAuthenticationResponse(buffer);
}

var connectRequestLength = CreateConnectRequest(buffer, targetHost, targetPort);
stream.Write(buffer, 0, connectRequestLength);

stream.ReadBytes(buffer, 0, 5, cancellationToken);
stream.ReadBytes(buffer, 0, 5, cancellationToken: cancellationToken);
var skip = ProcessConnectResponse(buffer);
stream.ReadBytes(buffer, 0, skip, cancellationToken);
stream.ReadBytes(buffer, 0, skip, cancellationToken: cancellationToken);
}
finally
{
Expand All @@ -141,7 +141,7 @@ public static async Task PerformSocks5HandshakeAsync(Stream stream, EndPoint end
var greetingRequestLength = CreateGreetingRequest(buffer, useAuth);
await stream.WriteAsync(buffer, 0, greetingRequestLength, cancellationToken).ConfigureAwait(false);

await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken).ConfigureAwait(false);
await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken: cancellationToken).ConfigureAwait(false);
var requiresAuthenticationStep = ProcessGreetingResponse(buffer, useAuth);

// If we have username and password, but the proxy doesn't need them, we skip the authentication step.
Expand All @@ -150,16 +150,16 @@ public static async Task PerformSocks5HandshakeAsync(Stream stream, EndPoint end
var authenticationRequestLength = CreateAuthenticationRequest(buffer, authenticationSettings);
await stream.WriteAsync(buffer, 0, authenticationRequestLength, cancellationToken).ConfigureAwait(false);

await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken).ConfigureAwait(false);
await stream.ReadBytesAsync(buffer, 0, 2, cancellationToken: cancellationToken).ConfigureAwait(false);
ProcessAuthenticationResponse(buffer);
}

var connectRequestLength = CreateConnectRequest(buffer, targetHost, targetPort);
await stream.WriteAsync(buffer, 0, connectRequestLength, cancellationToken).ConfigureAwait(false);

await stream.ReadBytesAsync(buffer, 0, 5, cancellationToken).ConfigureAwait(false);
await stream.ReadBytesAsync(buffer, 0, 5, cancellationToken: cancellationToken).ConfigureAwait(false);
var skip = ProcessConnectResponse(buffer);
await stream.ReadBytesAsync(buffer, 0, skip, cancellationToken).ConfigureAwait(true);
await stream.ReadBytesAsync(buffer, 0, skip, cancellationToken: cancellationToken).ConfigureAwait(true);
}
finally
{
Expand Down Expand Up @@ -340,4 +340,4 @@ private static void EnsureSocksSuccess(byte code, string operation)

throw new IOException($"SOCKS5 {operation} failed. {message}");
}
}
}
Loading
Loading