Skip to content

Commit 579b7eb

Browse files
Luis Garzatimm4205
authored andcommitted
feat(connection): add TCP keepalive configuration options
1 parent 8fa0a25 commit 579b7eb

File tree

7 files changed

+392
-5
lines changed

7 files changed

+392
-5
lines changed

redshift_connector/__init__.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,18 @@
103103

104104
__author__ = "Mathieu Fenniak"
105105

106+
def validate_keepalive_values(idle, interval, count):
107+
if idle is not None:
108+
if idle <= 0:
109+
raise ValueError("tcp_keepalive_idle must be positive")
110+
111+
if interval is not None:
112+
if interval <= 0:
113+
raise ValueError("tcp_keepalive_interval must be positive")
114+
115+
if count is not None:
116+
if count <= 0:
117+
raise ValueError("tcp_keepalive_count must be positive")
106118

107119
def connect(
108120
user: typing.Optional[str] = None,
@@ -117,6 +129,9 @@ def connect(
117129
timeout: typing.Optional[int] = None,
118130
max_prepared_statements: typing.Optional[int] = None,
119131
tcp_keepalive: typing.Optional[bool] = None,
132+
tcp_keepalive_idle: typing.Optional[int] = None,
133+
tcp_keepalive_interval: typing.Optional[int] = None,
134+
tcp_keepalive_count: typing.Optional[int] = None,
120135
application_name: typing.Optional[str] = None,
121136
replication: typing.Optional[str] = None,
122137
idp_host: typing.Optional[str] = None,
@@ -193,7 +208,13 @@ def connect(
193208
The number of seconds before the connection to the server will timeout. By default there is no timeout.
194209
max_prepared_statements : Optional[int]
195210
tcp_keepalive : Optional[bool]
196-
Is `TCP keepalive <https://en.wikipedia.org/wiki/Keepalive#TCP_keepalive>`_ used. The default value is ``True``.
211+
A boolean specifying whether the driver uses TCP keepalives to prevent connections from timing out. Defaults to ``True``.
212+
tcp_keepalive_idle : Optional[int]
213+
Time (in seconds) before sending keepalive probes. Defaults to None (system default).
214+
tcp_keepalive_interval : Optional[int]
215+
Time (in seconds) between keepalive probes. Defaults to None (system default).
216+
tcp_keepalive_count : Optional[int]
217+
Number of failed probes before connection is considered dead. Defaults to None (system default).
197218
application_name : Optional[str]
198219
Sets the application name. The default value is None.
199220
replication : Optional[str]
@@ -343,6 +364,9 @@ def connect(
343364
info.put("ssl_insecure", ssl_insecure)
344365
info.put("sslmode", sslmode)
345366
info.put("tcp_keepalive", tcp_keepalive)
367+
info.put("tcp_keepalive_idle", tcp_keepalive_idle)
368+
info.put("tcp_keepalive_interval", tcp_keepalive_interval)
369+
info.put("tcp_keepalive_count", tcp_keepalive_count)
346370
info.put("timeout", timeout)
347371
info.put("token", token)
348372
info.put("token_type", token_type)
@@ -385,6 +409,16 @@ def connect(
385409
if not redshift_native_auth:
386410
IamHelper.set_iam_properties(info)
387411

412+
if info.tcp_keepalive:
413+
try:
414+
validate_keepalive_values(
415+
info.tcp_keepalive_idle,
416+
info.tcp_keepalive_interval,
417+
info.tcp_keepalive_count
418+
)
419+
except ValueError as e:
420+
raise InterfaceError(str(e))
421+
388422
_logger.debug(make_divider_block())
389423
_logger.debug("Connection arguments following validation and IAM auth (if applicable)")
390424
_logger.debug(make_divider_block())
@@ -404,6 +438,9 @@ def connect(
404438
timeout=info.timeout,
405439
max_prepared_statements=info.max_prepared_statements,
406440
tcp_keepalive=info.tcp_keepalive,
441+
tcp_keepalive_idle=info.tcp_keepalive_idle,
442+
tcp_keepalive_interval=info.tcp_keepalive_interval,
443+
tcp_keepalive_count=info.tcp_keepalive_count,
407444
application_name=info.application_name,
408445
replication=info.replication,
409446
client_protocol_version=info.client_protocol_version,

redshift_connector/core.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,6 @@ def create_message(code: bytes, data: bytes = b"") -> bytes:
363363

364364
arr_trans: typing.Mapping[int, typing.Optional[str]] = dict(zip(map(ord, "[] 'u"), ["{", "}", None, None, None]))
365365

366-
367366
class Connection:
368367
# DBAPI Extension: supply exceptions as attributes on the connection
369368
Warning = property(lambda self: self._getError(Warning))
@@ -424,6 +423,9 @@ def __init__(
424423
timeout: typing.Optional[int] = None,
425424
max_prepared_statements: int = DEFAULT_MAX_PREPARED_STATEMENTS,
426425
tcp_keepalive: typing.Optional[bool] = True,
426+
tcp_keepalive_idle: typing.Optional[int] = None,
427+
tcp_keepalive_interval: typing.Optional[int] = None,
428+
tcp_keepalive_count: typing.Optional[int] = None,
427429
application_name: typing.Optional[str] = None,
428430
replication: typing.Optional[str] = None,
429431
client_protocol_version: int = DEFAULT_PROTOCOL_VERSION,
@@ -462,7 +464,13 @@ def __init__(
462464
The number of seconds before the connection to the server will timeout. By default there is no timeout.
463465
max_prepared_statements : int
464466
tcp_keepalive : Optional[bool]
465-
Is `TCP keepalive <https://en.wikipedia.org/wiki/Keepalive#TCP_keepalive>`_ used. The default value is ``True``.
467+
A boolean specifying whether the driver uses TCP keepalives to prevent connections from timing out. Defaults to ``True``.
468+
tcp_keepalive_idle : Optional[int]
469+
Time (in seconds) before sending TCP keepalive probes on an idle connection. Defaults to None (system default).
470+
tcp_keepalive_interval : Optional[int]
471+
Time (in seconds) between individual TCP keepalive probes. Defaults to None (system default).
472+
tcp_keepalive_count : Optional[int]
473+
Maximum number of TCP keepalive probes to send before dropping the connection if no response is received. Defaults to None (system default).
466474
application_name : Optional[str]
467475
Sets the application name. The default value is None.
468476
replication : Optional[str]
@@ -686,6 +694,32 @@ def get_calling_module() -> str:
686694
_logger.debug("enabling tcp keepalive on socket")
687695
self._usock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
688696

697+
# Set TCP keepalive parameters if supported by platform and values are defined
698+
if tcp_keepalive_idle is not None:
699+
# Mac OS X uses TCP_KEEPALIVE instead of TCP_KEEPIDLE
700+
if hasattr(socket, 'TCP_KEEPIDLE'):
701+
self._usock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, tcp_keepalive_idle)
702+
_logger.debug(f"Set TCP_KEEPIDLE to {tcp_keepalive_idle}")
703+
elif hasattr(socket, 'TCP_KEEPALIVE'): # macOS/BSD
704+
self._usock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, tcp_keepalive_idle)
705+
_logger.debug(f"Set TCP_KEEPALIVE to {tcp_keepalive_idle}")
706+
else:
707+
_logger.warning("Neither TCP_KEEPIDLE nor TCP_KEEPALIVE supported on this platform")
708+
709+
if tcp_keepalive_interval is not None:
710+
if hasattr(socket, 'TCP_KEEPINTVL'):
711+
self._usock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, tcp_keepalive_interval)
712+
_logger.debug(f"Set TCP_KEEPINTVL to {tcp_keepalive_interval}")
713+
else:
714+
_logger.warning("TCP_KEEPINTVL not supported on this platform")
715+
716+
if tcp_keepalive_count is not None:
717+
if hasattr(socket, 'TCP_KEEPCNT'):
718+
self._usock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, tcp_keepalive_count)
719+
_logger.debug(f"Set TCP_KEEPCNT to {tcp_keepalive_count}")
720+
else:
721+
_logger.warning("TCP_KEEPCNT not supported on this platform")
722+
689723
except socket.timeout as timeout_error:
690724
self._usock.close()
691725
raise OperationalError("connection time out", timeout_error)

redshift_connector/redshift_property.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,12 @@ def __init__(self: "RedshiftProperty", **kwargs):
112112
self.sslmode: str = "verify-ca"
113113
# Use this property to enable or disable TCP keepalives.
114114
self.tcp_keepalive: bool = True
115+
# Time (in seconds) before sending keepalive probes
116+
self.tcp_keepalive_idle: typing.Optional[int] = None
117+
# Time (in seconds) between keepalive probes
118+
self.tcp_keepalive_interval: typing.Optional[int] = None
119+
# Number of failed probes before connection is considered dead
120+
self.tcp_keepalive_count: typing.Optional[int] = None
115121
# This is the time in seconds before the connection to the server will time out.
116122
self.timeout: typing.Optional[int] = None
117123
self.token: typing.Optional[str] = None

redshift_connector/utils/logging_utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ def mask_secure_info_in_props(info: "RedshiftProperty") -> "RedshiftProperty":
7171
"ssl_insecure",
7272
"sslmode",
7373
"tcp_keepalive",
74+
"tcp_keepalive_idle",
75+
"tcp_keepalive_interval",
76+
"tcp_keepalive_count",
7477
"token_type",
7578
"timeout",
7679
"unix_sock",

test/integration/test_connection.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33
import os
44
import random
5+
import socket
56
import string
67
import sys
78
import typing
@@ -334,3 +335,72 @@ def test_socket_timeout(db_kwargs) -> None:
334335

335336
with pytest.raises(redshift_connector.InterfaceError):
336337
redshift_connector.connect(**db_kwargs)
338+
339+
def test_tcp_keepalive(db_kwargs) -> None:
340+
"""Test TCP keepalive configuration is properly applied"""
341+
db_kwargs["tcp_keepalive"] = True
342+
db_kwargs["tcp_keepalive_idle"] = 30
343+
db_kwargs["tcp_keepalive_interval"] = 10
344+
db_kwargs["tcp_keepalive_count"] = 3
345+
346+
with redshift_connector.connect(**db_kwargs) as conn:
347+
# Verify the socket options after connection is established
348+
sock = conn._usock
349+
350+
# Get the socket options
351+
keepalive = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE)
352+
assert keepalive > 0, "TCP keepalive should be enabled"
353+
354+
if hasattr(socket, 'TCP_KEEPIDLE'):
355+
idle = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE)
356+
assert idle == 30, "TCP keepalive idle time should be 30"
357+
elif hasattr(socket, 'TCP_KEEPALIVE'):
358+
idle = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE)
359+
assert idle == 30, "TCP keepalive idle time should be 30"
360+
361+
if hasattr(socket, 'TCP_KEEPINTVL'):
362+
interval = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL)
363+
assert interval == 10, "TCP keepalive interval should be 10"
364+
365+
if hasattr(socket, 'TCP_KEEPCNT'):
366+
count = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT)
367+
assert count == 3, "TCP keepalive count should be 3"
368+
369+
def test_tcp_keepalive_defaults(db_kwargs) -> None:
370+
"""Test TCP keepalive with default values"""
371+
with redshift_connector.connect(**db_kwargs) as conn:
372+
# Verify the socket options after connection is established
373+
sock = conn._usock
374+
375+
# Verify SO_KEEPALIVE is enabled
376+
keepalive = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE)
377+
assert keepalive > 0, "TCP keepalive should be enabled"
378+
379+
# Verify other parameters were not explicitly set by checking they match system defaults
380+
# We can do this by creating another connection without keepalive and comparing values
381+
with redshift_connector.connect(**db_kwargs) as conn2:
382+
db_kwargs["tcp_keepalive"] = False
383+
sock2 = conn2._usock
384+
385+
if hasattr(socket, 'TCP_KEEPIDLE'):
386+
val1 = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE)
387+
val2 = sock2.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE)
388+
assert val1 == val2, "TCP_KEEPIDLE should match system default. Got {} but expected {}".format(val1,
389+
val2)
390+
elif hasattr(socket, 'TCP_KEEPALIVE'):
391+
val1 = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE)
392+
val2 = sock2.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE)
393+
assert val1 == val2, "TCP_KEEPALIVE should match system default. Got {} but expected {}".format(val1,
394+
val2)
395+
396+
if hasattr(socket, 'TCP_KEEPINTVL'):
397+
val1 = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL)
398+
val2 = sock2.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL)
399+
assert val1 == val2, "TCP_KEEPINTVL should match system default. Got {} but expected {}".format(val1,
400+
val2)
401+
402+
if hasattr(socket, 'TCP_KEEPCNT'):
403+
val1 = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT)
404+
val2 = sock2.getsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT)
405+
assert val1 == val2, "TCP_KEEPCNT should match system default. Got {} but expected {}".format(val1,
406+
val2)

test/unit/test_connection.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
InterfaceError,
1717
OperationalError,
1818
ProgrammingError,
19+
validate_keepalive_values,
1920
)
2021
from redshift_connector.config import (
2122
ClientProtocolVersion,
@@ -560,3 +561,24 @@ def test_make_params_maps_pandas_timestamp_to_timestamp(db_kwargs):
560561
assert res[0][0] == RedshiftOID.TIMESTAMPTZ
561562
assert res[0][1] == 1
562563
assert res[0][2] == timestamptz_send_integer
564+
565+
566+
@pytest.mark.parametrize(
567+
"idle,interval,count,expected_error",
568+
[
569+
(-1, None, None, "tcp_keepalive_idle must be positive"),
570+
(None, -1, None, "tcp_keepalive_interval must be positive"),
571+
(None, None, -1, "tcp_keepalive_count must be positive"),
572+
(0, None, None, "tcp_keepalive_idle must be positive"),
573+
(None, 0, None, "tcp_keepalive_interval must be positive"),
574+
(None, None, 0, "tcp_keepalive_count must be positive"),
575+
(1, 1, 1, None), # valid values should not raise error
576+
],
577+
)
578+
def test_tcp_keepalive_validation(idle, interval, count, expected_error):
579+
if expected_error:
580+
with pytest.raises(ValueError, match=expected_error):
581+
validate_keepalive_values(idle, interval, count)
582+
else:
583+
# Should not raise any exception
584+
validate_keepalive_values(idle, interval, count)

0 commit comments

Comments
 (0)