Skip to content

Commit ee1efed

Browse files
authored
Support Lazy Connect (#104)
Signed-off-by: currantw <taylor.curran@improving.com>
1 parent 7fcc002 commit ee1efed

File tree

13 files changed

+337
-103
lines changed

13 files changed

+337
-103
lines changed

DEVELOPER.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ cargo fmt --all -- --check
212212

213213
6. Test framework and style
214214

215-
The CSharp Valkey-Glide client uses xUnit v3 for testing code. The test code styles are defined in `.editorcofing` (see `dotnet_diagnostic.xUnit..` rules). The xUnit rules are enforced by the [xUnit analyzers](https://github.com/xunit/xunit.analyzers) referenced in the main xunit.v3 NuGet package. If you choose to use xunit.v3.core instead, you can reference xunit.analyzers explicitly. For additional info, please, refer to https://xunit.net and https://github.com/xunit/xunit
215+
The CSharp Valkey-Glide client uses xUnit v3 for testing code. The test code styles are defined in `.editorconfig` (see `dotnet_diagnostic.xUnit..` rules). The xUnit rules are enforced by the [xUnit analyzers](https://github.com/xunit/xunit.analyzers) referenced in the main xunit.v3 NuGet package. If you choose to use xunit.v3.core instead, you can reference xunit.analyzers explicitly. For additional info, please, refer to <https://xunit.net> and <https://github.com/xunit/xunit>
216216

217217
## Community and Feedback
218218

rust/src/ffi.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ pub struct ConnectionConfig {
7171
pub protocol: redis::ProtocolVersion,
7272
/// zero pointer is valid, means no client name is given (`None`)
7373
pub client_name: *const c_char,
74+
pub lazy_connect: bool,
7475
/*
7576
TODO below
7677
pub periodic_checks: Option<PeriodicCheck>,
@@ -147,11 +148,11 @@ pub(crate) unsafe fn create_connection_request(
147148
} else {
148149
None
149150
},
151+
lazy_connect: config.lazy_connect,
150152
// TODO below
151153
periodic_checks: None,
152154
pubsub_subscriptions: None,
153155
inflight_requests_limit: None,
154-
lazy_connect: false,
155156
}
156157
}
157158

sources/Valkey.Glide/BaseClient.GenericCommands.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ public async Task<bool> KeyCopyAsync(ValkeyKey sourceKey, ValkeyKey destinationK
178178
public async Task<ValkeyValue[]> SortAsync(ValkeyKey key, long skip = 0, long take = -1, Order order = Order.Ascending, SortType sortType = SortType.Numeric, ValkeyValue by = default, ValkeyValue[]? get = null, CommandFlags flags = CommandFlags.None)
179179
{
180180
Utils.Requires<NotImplementedException>(flags == CommandFlags.None, "Command flags are not supported by GLIDE");
181-
return await Command(Request.SortAsync(key, skip, take, order, sortType, by, get, _serverVersion));
181+
return await Command(Request.SortAsync(key, skip, take, order, sortType, by, get, await GetServerVersionAsync()));
182182
}
183183

184184
public async Task<long> SortAndStoreAsync(ValkeyKey destination, ValkeyKey key, long skip = 0, long take = -1, Order order = Order.Ascending, SortType sortType = SortType.Numeric, ValkeyValue by = default, ValkeyValue[]? get = null, CommandFlags flags = CommandFlags.None)

sources/Valkey.Glide/BaseClient.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,12 @@ protected static async Task<T> CreateClient<T>(BaseClientConfiguration config, F
5353
CreateClientFfi(request.ToPtr(), successCallbackPointer, failureCallbackPointer);
5454
client._clientPointer = await message; // This will throw an error thru failure callback if any
5555

56-
if (client._clientPointer != IntPtr.Zero)
56+
if (client._clientPointer == IntPtr.Zero)
5757
{
58-
// Initialize server version after successful connection
59-
await client.InitializeServerVersionAsync();
60-
return client;
58+
throw new ConnectionException("Failed creating a client");
6159
}
6260

63-
throw new ConnectionException("Failed creating a client");
61+
return client;
6462
}
6563

6664
protected BaseClient()
@@ -127,6 +125,11 @@ internal virtual async Task<T> Command<R, T>(Cmd<R, T> command, Route? route = n
127125

128126
// All memory allocated is auto-freed by `using` operator
129127
}
128+
protected Version? ParseServerVersion(string response)
129+
{
130+
var versionMatch = System.Text.RegularExpressions.Regex.Match(response, @"(?:valkey_version|redis_version):([\d\.]+)");
131+
return versionMatch.Success ? new(versionMatch.Groups[1].Value) : null;
132+
}
130133
#endregion protected methods
131134

132135
#region private methods
@@ -145,7 +148,7 @@ private void FailureCallback(ulong index, IntPtr strPtr, RequestErrorType errTyp
145148

146149
internal void SetInfo(string info) => _clientInfo = info;
147150

148-
protected abstract Task InitializeServerVersionAsync();
151+
protected abstract Task<Version> GetServerVersionAsync();
149152

150153
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
151154
private delegate void SuccessAction(ulong index, IntPtr ptr);
@@ -169,6 +172,7 @@ private void FailureCallback(ulong index, IntPtr strPtr, RequestErrorType errTyp
169172
private readonly object _lock = new();
170173
private string _clientInfo = ""; // used to distinguish and identify clients during tests
171174
protected Version? _serverVersion; // cached server version
175+
protected static readonly Version DefaultServerVersion = new(8, 0, 0);
172176

173177
#endregion private fields
174178
}

sources/Valkey.Glide/ConnectionConfiguration.cs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ internal record ConnectionConfig
2424
public uint DatabaseId;
2525
public Protocol? Protocol;
2626
public string? ClientName;
27+
public bool LazyConnect;
2728

2829
internal FFI.ConnectionConfig ToFfi() =>
29-
new(Addresses, TlsMode, ClusterMode, (uint?)RequestTimeout?.TotalMilliseconds, (uint?)ConnectionTimeout?.TotalMilliseconds, ReadFrom, RetryStrategy, AuthenticationInfo, DatabaseId, Protocol, ClientName);
30+
new(Addresses, TlsMode, ClusterMode, (uint?)RequestTimeout?.TotalMilliseconds, (uint?)ConnectionTimeout?.TotalMilliseconds, ReadFrom, RetryStrategy, AuthenticationInfo, DatabaseId, Protocol, ClientName, LazyConnect);
3031
}
3132

3233
/// <summary>
@@ -171,7 +172,7 @@ public abstract class BaseClientConfiguration
171172
/// <summary>
172173
/// Configuration for a standalone client. <br />
173174
/// Use <see cref="StandaloneClientConfigurationBuilder" /> or
174-
/// <see cref="StandaloneClientConfiguration(List{ValueTuple{string?, ushort?}}, bool?, TimeSpan?, TimeSpan?, ReadFrom?, RetryStrategy?, string?, string?, uint?, Protocol?, string?)" /> to create an instance.
175+
/// <see cref="StandaloneClientConfiguration(List{ValueTuple{string?, ushort?}}, bool?, TimeSpan?, TimeSpan?, ReadFrom?, RetryStrategy?, string?, string?, uint?, Protocol?, string?, bool)" /> to create an instance.
175176
/// </summary>
176177
public sealed class StandaloneClientConfiguration : BaseClientConfiguration
177178
{
@@ -190,6 +191,7 @@ internal StandaloneClientConfiguration() { }
190191
/// <param name="databaseId"><inheritdoc cref="ClientConfigurationBuilder{T}.DataBaseId" path="/summary" /></param>
191192
/// <param name="protocol"><inheritdoc cref="ClientConfigurationBuilder{T}.ProtocolVersion" path="/summary" /></param>
192193
/// <param name="clientName"><inheritdoc cref="ClientConfigurationBuilder{T}.ClientName" path="/summary" /></param>
194+
/// <param name="lazyConnect"><inheritdoc cref="ClientConfigurationBuilder{T}.LazyConnect" path="/summary" /></param>
193195
public StandaloneClientConfiguration(
194196
List<(string? host, ushort? port)> addresses,
195197
bool? useTls = null,
@@ -201,7 +203,8 @@ public StandaloneClientConfiguration(
201203
string? password = null,
202204
uint? databaseId = null,
203205
Protocol? protocol = null,
204-
string? clientName = null
206+
string? clientName = null,
207+
bool lazyConnect = false
205208
)
206209
{
207210
StandaloneClientConfigurationBuilder builder = new();
@@ -215,6 +218,7 @@ public StandaloneClientConfiguration(
215218
_ = databaseId.HasValue ? builder.DataBaseId = databaseId.Value : new();
216219
_ = protocol.HasValue ? builder.ProtocolVersion = protocol.Value : new();
217220
_ = clientName is not null ? builder.ClientName = clientName : "";
221+
builder.LazyConnect = lazyConnect;
218222
Request = builder.Build().Request;
219223
}
220224
}
@@ -239,6 +243,7 @@ internal ClusterClientConfiguration() { }
239243
/// <param name="databaseId"><inheritdoc cref="ClientConfigurationBuilder{T}.DataBaseId" path="/summary" /></param>
240244
/// <param name="protocol"><inheritdoc cref="ClientConfigurationBuilder{T}.ProtocolVersion" path="/summary" /></param>
241245
/// <param name="clientName"><inheritdoc cref="ClientConfigurationBuilder{T}.ClientName" path="/summary" /></param>
246+
/// <param name="lazyConnect"><inheritdoc cref="ClientConfigurationBuilder{T}.LazyConnect" path="/summary" /></param>
242247
public ClusterClientConfiguration(
243248
List<(string? host, ushort? port)> addresses,
244249
bool? useTls = null,
@@ -250,7 +255,8 @@ public ClusterClientConfiguration(
250255
string? password = null,
251256
uint? databaseId = null,
252257
Protocol? protocol = null,
253-
string? clientName = null
258+
string? clientName = null,
259+
bool lazyConnect = false
254260
)
255261
{
256262
ClusterClientConfigurationBuilder builder = new();
@@ -264,6 +270,7 @@ public ClusterClientConfiguration(
264270
_ = databaseId.HasValue ? builder.DataBaseId = databaseId.Value : new();
265271
_ = protocol.HasValue ? builder.ProtocolVersion = protocol.Value : new();
266272
_ = clientName is not null ? builder.ClientName = clientName : "";
273+
builder.LazyConnect = lazyConnect;
267274
Request = builder.Build().Request;
268275
}
269276
}
@@ -534,6 +541,24 @@ public T WithDataBaseId(uint dataBaseId)
534541
return (T)this;
535542
}
536543
#endregion
544+
#region Lazy Connect
545+
/// <summary>
546+
/// Configure whether to defer connections until the first command is executed.<br />
547+
/// If not explicitly set, a default value of <c>false</c> will be used.
548+
/// </summary>
549+
public bool LazyConnect
550+
{
551+
get => Config.LazyConnect;
552+
set => Config.LazyConnect = value;
553+
}
554+
555+
/// <inheritdoc cref="LazyConnect" />
556+
public T WithLazyConnect(bool lazyConnect)
557+
{
558+
LazyConnect = lazyConnect;
559+
return (T)this;
560+
}
561+
#endregion
537562

538563
internal ConnectionConfig Build() => Config;
539564
}

sources/Valkey.Glide/GlideClient.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -196,21 +196,21 @@ public async IAsyncEnumerable<ValkeyKey> KeysAsync(int database = -1, ValkeyValu
196196
} while (currentCursor != 0);
197197
}
198198

199-
protected override async Task InitializeServerVersionAsync()
199+
protected override async Task<Version> GetServerVersionAsync()
200200
{
201-
try
201+
if (_serverVersion == null)
202202
{
203-
var infoResponse = await Command(Request.Info([InfoOptions.Section.SERVER]));
204-
var versionMatch = System.Text.RegularExpressions.Regex.Match(infoResponse, @"(?:valkey_version|redis_version):([\d\.]+)");
205-
if (versionMatch.Success)
203+
try
206204
{
207-
_serverVersion = new Version(versionMatch.Groups[1].Value);
205+
var infoResponse = await Command(Request.Info([InfoOptions.Section.SERVER]));
206+
_serverVersion = ParseServerVersion(infoResponse) ?? DefaultServerVersion;
207+
}
208+
catch
209+
{
210+
_serverVersion = DefaultServerVersion;
208211
}
209212
}
210-
catch
211-
{
212-
// If we can't get version, assume newer version (use SORT_RO)
213-
_serverVersion = new Version(8, 0, 0);
214-
}
213+
214+
return _serverVersion;
215215
}
216216
}

sources/Valkey.Glide/GlideClusterClient.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -301,21 +301,21 @@ public async Task<string> SelectAsync(long index, CommandFlags flags = CommandFl
301301
return await Command(Request.Select(index), Route.Random);
302302
}
303303

304-
protected override async Task InitializeServerVersionAsync()
304+
protected override async Task<Version> GetServerVersionAsync()
305305
{
306-
try
306+
if (_serverVersion == null)
307307
{
308-
var infoResponse = await Command(Request.Info([InfoOptions.Section.SERVER]).ToClusterValue(true), Route.Random);
309-
var versionMatch = System.Text.RegularExpressions.Regex.Match(infoResponse.SingleValue, @"(?:valkey_version|redis_version):([\d\.]+)");
310-
if (versionMatch.Success)
308+
try
311309
{
312-
_serverVersion = new Version(versionMatch.Groups[1].Value);
310+
var infoResponse = await Command(Request.Info([InfoOptions.Section.SERVER]).ToClusterValue(true), Route.Random);
311+
_serverVersion = ParseServerVersion(infoResponse.SingleValue) ?? DefaultServerVersion;
312+
}
313+
catch
314+
{
315+
_serverVersion = DefaultServerVersion;
313316
}
314317
}
315-
catch
316-
{
317-
// If we can't get version, assume newer version (use SORT_RO)
318-
_serverVersion = new Version(8, 0, 0);
319-
}
318+
319+
return _serverVersion;
320320
}
321321
}

sources/Valkey.Glide/Internals/FFI.structs.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,8 @@ public ConnectionConfig(
212212
AuthenticationInfo? authenticationInfo,
213213
uint databaseId,
214214
ConnectionConfiguration.Protocol? protocol,
215-
string? clientName)
215+
string? clientName,
216+
bool lazyConnect = false)
216217
{
217218
_addresses = addresses;
218219
_request = new()
@@ -235,6 +236,7 @@ public ConnectionConfig(
235236
HasProtocol = protocol.HasValue,
236237
Protocol = protocol ?? default,
237238
ClientName = clientName,
239+
LazyConnect = lazyConnect,
238240
};
239241
}
240242

@@ -770,6 +772,8 @@ private struct ConnectionRequest
770772
public ConnectionConfiguration.Protocol Protocol;
771773
[MarshalAs(UnmanagedType.LPStr)]
772774
public string? ClientName;
775+
[MarshalAs(UnmanagedType.U1)]
776+
public bool LazyConnect;
773777
// TODO more config params, see ffi.rs
774778
}
775779

tests/Valkey.Glide.IntegrationTests/ClusterClientTests.cs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Valkey.Glide.Pipeline;
66

77
using static Valkey.Glide.Commands.Options.InfoOptions;
8+
using static Valkey.Glide.ConnectionConfiguration;
89
using static Valkey.Glide.Errors;
910
using static Valkey.Glide.Route;
1011

@@ -585,4 +586,70 @@ public async Task TestKeyCopyAsync(GlideClusterClient client)
585586
// Verify the source key still exists in the current database
586587
Assert.True(await client.KeyExistsAsync(sourceKey));
587588
}
589+
590+
[Fact]
591+
public async Task LazyConnect()
592+
{
593+
string serverName = $"test_{Guid.NewGuid():N}";
594+
595+
try
596+
{
597+
// Create dedicated server.
598+
var addresses = ServerManager.StartClusterServer(serverName);
599+
var configBuilder = new ClusterClientConfigurationBuilder()
600+
.WithAddress(addresses[0].host, addresses[0].port);
601+
var eagerConfig = configBuilder.WithLazyConnect(false).Build();
602+
var lazyConfig = configBuilder.WithLazyConnect(true).Build();
603+
604+
// Create reference client.
605+
using var referenceClient = await GlideClusterClient.CreateClient(eagerConfig);
606+
var initialCount = await ConnectionInfo.GetConnectionCount(referenceClient);
607+
608+
// Create lazy client (does not connect immediately).
609+
using var lazyClient = await GlideClusterClient.CreateClient(lazyConfig);
610+
var connectCount = await ConnectionInfo.GetConnectionCount(referenceClient);
611+
612+
Assert.Equal(initialCount, connectCount);
613+
614+
// First command establishes connection.
615+
await lazyClient.PingAsync();
616+
var commandCount = await ConnectionInfo.GetConnectionCount(referenceClient);
617+
618+
Assert.True(connectCount < commandCount);
619+
}
620+
finally
621+
{
622+
ServerManager.StopServer(serverName);
623+
}
624+
}
625+
626+
[Fact]
627+
public async Task EagerConnect()
628+
{
629+
string serverName = $"test_{Guid.NewGuid():N}";
630+
631+
try
632+
{
633+
// Create dedicated server.
634+
var addresses = ServerManager.StartClusterServer(serverName);
635+
var eagerConfig = new ClusterClientConfigurationBuilder()
636+
.WithAddress(addresses[0].host, addresses[0].port)
637+
.WithLazyConnect(false)
638+
.Build();
639+
640+
// Create reference client.
641+
using var referenceClient = await GlideClusterClient.CreateClient(eagerConfig);
642+
var initialCount = await ConnectionInfo.GetConnectionCount(referenceClient);
643+
644+
// Create eager client (connects immediately).
645+
using var eagerClient = await GlideClusterClient.CreateClient(eagerConfig);
646+
var connectCount = await ConnectionInfo.GetConnectionCount(referenceClient);
647+
648+
Assert.True(initialCount < connectCount);
649+
}
650+
finally
651+
{
652+
ServerManager.StopServer(serverName);
653+
}
654+
}
588655
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0
2+
3+
namespace Valkey.Glide.IntegrationTests;
4+
5+
/// <summary>
6+
/// Utility class for retrieving connection information from Valkey servers.
7+
/// </summary>
8+
internal static class ConnectionInfo
9+
{
10+
private static readonly GlideString[] ClientListCommandArgs = ["CLIENT", "LIST"];
11+
12+
/// <summary>
13+
/// Returns the total number of client connections to a standalone Valkey server.
14+
/// </summary>
15+
/// <param name="client">The standalone client instance.</param>
16+
/// <returns>A task that resolves to the total number of client connections.</returns>
17+
public static async Task<int> GetConnectionCount(GlideClient client)
18+
{
19+
var result = await client.CustomCommand(ClientListCommandArgs);
20+
return result!.ToString()!.Split('\n', StringSplitOptions.RemoveEmptyEntries).Length;
21+
}
22+
23+
/// <summary>
24+
/// Returns the total number of client connections to a cluster Valkey server.
25+
/// </summary>
26+
/// <param name="client">The cluster client instance.</param>
27+
/// <returns>A task that resolves to the total number of client connections.</returns>
28+
public static async Task<int> GetConnectionCount(GlideClusterClient client)
29+
{
30+
var result = await client.CustomCommand(ClientListCommandArgs, new Route.AllPrimariesRoute());
31+
return result!.MultiValue.Values.Sum(nodeResult =>
32+
nodeResult!.ToString()!.Split('\n', StringSplitOptions.RemoveEmptyEntries).Length);
33+
}
34+
}

0 commit comments

Comments
 (0)