Skip to content

Commit aeb6bcf

Browse files
authored
Revert "PYTHON-5536 Avoid clearing the connection pool when the server connec…"
This reverts commit d267eb4.
1 parent 27785ae commit aeb6bcf

File tree

19 files changed

+63
-345
lines changed

19 files changed

+63
-345
lines changed

.evergreen/run-tests.sh

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@ else
2626
fi
2727

2828
# List the packages.
29-
uv sync ${UV_ARGS} --reinstall --quiet
29+
uv sync ${UV_ARGS} --reinstall
3030
uv pip list
3131

32+
# Ensure we go back to base environment after the test.
33+
trap "uv sync" EXIT HUP
34+
3235
# Start the test runner.
3336
uv run ${UV_ARGS} .evergreen/scripts/run_tests.py "$@"
3437

justfile

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
# See https://just.systems/man/en/ for instructions
22
set shell := ["bash", "-c"]
3-
# Do not modify the lock file when running justfile commands.
4-
export UV_FROZEN := "1"
53

64
# Commonly used command segments.
7-
typing_run := "uv run --group typing --extra aws --extra encryption --extra ocsp --extra snappy --extra test --extra zstd"
8-
docs_run := "uv run --extra docs"
5+
uv_run := "uv run --isolated --frozen "
6+
typing_run := uv_run + "--group typing --extra aws --extra encryption --extra ocsp --extra snappy --extra test --extra zstd"
7+
docs_run := uv_run + "--extra docs"
98
doc_build := "./doc/_build"
109
mypy_args := "--install-types --non-interactive"
1110

@@ -14,55 +13,51 @@ mypy_args := "--install-types --non-interactive"
1413
default:
1514
@just --list
1615

17-
[private]
18-
resync:
19-
@uv sync --quiet --frozen
20-
2116
install:
2217
bash .evergreen/scripts/setup-dev-env.sh
2318

2419
[group('docs')]
25-
docs: && resync
20+
docs:
2621
{{docs_run}} sphinx-build -W -b html doc {{doc_build}}/html
2722

2823
[group('docs')]
29-
docs-serve: && resync
24+
docs-serve:
3025
{{docs_run}} sphinx-autobuild -W -b html doc --watch ./pymongo --watch ./bson --watch ./gridfs {{doc_build}}/serve
3126

3227
[group('docs')]
33-
docs-linkcheck: && resync
28+
docs-linkcheck:
3429
{{docs_run}} sphinx-build -E -b linkcheck doc {{doc_build}}/linkcheck
3530

3631
[group('typing')]
37-
typing: && resync
32+
typing:
3833
just typing-mypy
3934
just typing-pyright
4035

4136
[group('typing')]
42-
typing-mypy: && resync
37+
typing-mypy:
4338
{{typing_run}} mypy {{mypy_args}} bson gridfs tools pymongo
4439
{{typing_run}} mypy {{mypy_args}} --config-file mypy_test.ini test
4540
{{typing_run}} mypy {{mypy_args}} test/test_typing.py test/test_typing_strict.py
4641

4742
[group('typing')]
48-
typing-pyright: && resync
43+
typing-pyright:
4944
{{typing_run}} pyright test/test_typing.py test/test_typing_strict.py
5045
{{typing_run}} pyright -p strict_pyrightconfig.json test/test_typing_strict.py
5146

5247
[group('lint')]
53-
lint: && resync
54-
uv run pre-commit run --all-files
48+
lint:
49+
{{uv_run}} pre-commit run --all-files
5550

5651
[group('lint')]
57-
lint-manual: && resync
58-
uv run pre-commit run --all-files --hook-stage manual
52+
lint-manual:
53+
{{uv_run}} pre-commit run --all-files --hook-stage manual
5954

6055
[group('test')]
61-
test *args="-v --durations=5 --maxfail=10": && resync
62-
uv run --extra test pytest {{args}}
56+
test *args="-v --durations=5 --maxfail=10":
57+
{{uv_run}} --extra test pytest {{args}}
6358

6459
[group('test')]
65-
run-tests *args: && resync
60+
run-tests *args:
6661
bash ./.evergreen/run-tests.sh {{args}}
6762

6863
[group('test')]

pymongo/asynchronous/pool.py

Lines changed: 12 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from bson import DEFAULT_CODEC_OPTIONS
3838
from pymongo import _csot, helpers_shared
3939
from pymongo.asynchronous.client_session import _validate_session_write_concern
40-
from pymongo.asynchronous.helpers import _backoff, _handle_reauth
40+
from pymongo.asynchronous.helpers import _handle_reauth
4141
from pymongo.asynchronous.network import command
4242
from pymongo.common import (
4343
MAX_BSON_SIZE,
@@ -788,9 +788,9 @@ def __init__(
788788
# Enforces: maxConnecting
789789
# Also used for: clearing the wait queue
790790
self._max_connecting_cond = _async_create_condition(self.lock)
791+
self._max_connecting = self.opts.max_connecting
791792
self._pending = 0
792793
self._client_id = client_id
793-
self._backoff = 0
794794
if self.enabled_for_cmap:
795795
assert self.opts._event_listeners is not None
796796
self.opts._event_listeners.publish_pool_created(
@@ -846,8 +846,6 @@ async def _reset(
846846
async with self.size_cond:
847847
if self.closed:
848848
return
849-
# Clear the backoff state.
850-
self._backoff = 0
851849
if self.opts.pause_enabled and pause and not self.opts.load_balanced:
852850
old_state, self.state = self.state, PoolState.PAUSED
853851
self.gen.inc(service_id)
@@ -930,11 +928,6 @@ async def _reset(
930928
for conn in sockets:
931929
await conn.close_conn(ConnectionClosedReason.STALE)
932930

933-
@property
934-
def max_connecting(self) -> int:
935-
"""The current max connecting limit for the pool."""
936-
return 1 if self._backoff else self.opts.max_connecting
937-
938931
async def update_is_writable(self, is_writable: Optional[bool]) -> None:
939932
"""Updates the is_writable attribute on all sockets currently in the
940933
Pool.
@@ -1001,7 +994,7 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
1001994
async with self._max_connecting_cond:
1002995
# If maxConnecting connections are already being created
1003996
# by this pool then try again later instead of waiting.
1004-
if self._pending >= self.max_connecting:
997+
if self._pending >= self._max_connecting:
1005998
return
1006999
self._pending += 1
10071000
incremented = True
@@ -1029,30 +1022,6 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
10291022
self.requests -= 1
10301023
self.size_cond.notify()
10311024

1032-
def _handle_connection_error(self, error: BaseException, phase: str, conn_id: int) -> None:
1033-
# Handle system overload condition for non-sdam pools.
1034-
# Look for an AutoReconnect error raised from a ConnectionResetError with
1035-
# errno == errno.ECONNRESET or raised from an OSError that we've created due to
1036-
# a closed connection.
1037-
# If found, set backoff and add error labels.
1038-
if self.is_sdam or type(error) != AutoReconnect:
1039-
return
1040-
self._backoff += 1
1041-
error._add_error_label("SystemOverloadedError")
1042-
error._add_error_label("RetryableError")
1043-
# Log the pool backoff message.
1044-
if self.enabled_for_logging and _CONNECTION_LOGGER.isEnabledFor(logging.DEBUG):
1045-
_debug_log(
1046-
_CONNECTION_LOGGER,
1047-
message=_ConnectionStatusMessage.POOL_BACKOFF,
1048-
clientId=self._client_id,
1049-
serverHost=self.address[0],
1050-
serverPort=self.address[1],
1051-
driverConnectionId=conn_id,
1052-
reason=_verbose_connection_error_reason(ConnectionClosedReason.POOL_BACKOFF),
1053-
error=ConnectionClosedReason.POOL_BACKOFF,
1054-
)
1055-
10561025
async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> AsyncConnection:
10571026
"""Connect to Mongo and return a new AsyncConnection.
10581027
@@ -1082,17 +1051,8 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
10821051
driverConnectionId=conn_id,
10831052
)
10841053

1085-
# Apply backoff if applicable.
1086-
if self._backoff:
1087-
await asyncio.sleep(_backoff(self._backoff))
1088-
1089-
# Pass a context to determine if we successfully create a configured socket.
1090-
context = dict(has_created_socket=False)
1091-
10921054
try:
1093-
networking_interface = await _configured_protocol_interface(
1094-
self.address, self.opts, context=context
1095-
)
1055+
networking_interface = await _configured_protocol_interface(self.address, self.opts)
10961056
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
10971057
except BaseException as error:
10981058
async with self.lock:
@@ -1113,11 +1073,10 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
11131073
reason=_verbose_connection_error_reason(ConnectionClosedReason.ERROR),
11141074
error=ConnectionClosedReason.ERROR,
11151075
)
1116-
if context["has_created_socket"]:
1117-
self._handle_connection_error(error, "handshake", conn_id)
11181076
if isinstance(error, (IOError, OSError, *SSLErrors)):
11191077
details = _get_timeout_details(self.opts)
11201078
_raise_connection_failure(self.address, error, timeout_details=details)
1079+
11211080
raise
11221081

11231082
conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
@@ -1135,18 +1094,15 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A
11351094

11361095
await conn.authenticate()
11371096
# Catch KeyboardInterrupt, CancelledError, etc. and cleanup.
1138-
except BaseException as e:
1097+
except BaseException:
11391098
async with self.lock:
11401099
self.active_contexts.discard(conn.cancel_context)
1141-
self._handle_connection_error(e, "hello", conn_id)
11421100
await conn.close_conn(ConnectionClosedReason.ERROR)
11431101
raise
11441102

11451103
if handler:
11461104
await handler.client._topology.receive_cluster_time(conn._cluster_time)
11471105

1148-
# Clear the backoff state.
1149-
self._backoff = 0
11501106
return conn
11511107

11521108
@contextlib.asynccontextmanager
@@ -1323,12 +1279,12 @@ async def _get_conn(
13231279
# to be checked back into the pool.
13241280
async with self._max_connecting_cond:
13251281
self._raise_if_not_ready(checkout_started_time, emit_event=False)
1326-
while not (self.conns or self._pending < self.max_connecting):
1282+
while not (self.conns or self._pending < self._max_connecting):
13271283
timeout = deadline - time.monotonic() if deadline else None
13281284
if not await _async_cond_wait(self._max_connecting_cond, timeout):
13291285
# Timed out, notify the next thread to ensure a
13301286
# timeout doesn't consume the condition.
1331-
if self.conns or self._pending < self.max_connecting:
1287+
if self.conns or self._pending < self._max_connecting:
13321288
self._max_connecting_cond.notify()
13331289
emitted_event = True
13341290
self._raise_wait_queue_timeout(checkout_started_time)
@@ -1469,8 +1425,8 @@ async def _perished(self, conn: AsyncConnection) -> bool:
14691425
:class:`~pymongo.errors.AutoReconnect` exceptions on server
14701426
hiccups, etc. We only check if the socket was closed by an external
14711427
error if it has been > 1 second since the socket was checked into the
1472-
pool, or we are in backoff mode, to keep performance reasonable -
1473-
we can't avoid AutoReconnects completely anyway.
1428+
pool, to keep performance reasonable - we can't avoid AutoReconnects
1429+
completely anyway.
14741430
"""
14751431
idle_time_seconds = conn.idle_time_seconds()
14761432
# If socket is idle, open a new one.
@@ -1481,11 +1437,8 @@ async def _perished(self, conn: AsyncConnection) -> bool:
14811437
await conn.close_conn(ConnectionClosedReason.IDLE)
14821438
return True
14831439

1484-
check_interval_seconds = self._check_interval_seconds
1485-
if self._backoff:
1486-
check_interval_seconds = 0
1487-
if check_interval_seconds is not None and (
1488-
check_interval_seconds == 0 or idle_time_seconds > check_interval_seconds
1440+
if self._check_interval_seconds is not None and (
1441+
self._check_interval_seconds == 0 or idle_time_seconds > self._check_interval_seconds
14891442
):
14901443
if conn.conn_closed():
14911444
await conn.close_conn(ConnectionClosedReason.ERROR)

pymongo/asynchronous/topology.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -890,9 +890,7 @@ async def _handle_error(self, address: _Address, err_ctx: _ErrorContext) -> None
890890
# Clear the pool.
891891
await server.reset(service_id)
892892
elif isinstance(error, ConnectionFailure):
893-
if isinstance(error, WaitQueueTimeoutError) or error.has_error_label(
894-
"SystemOverloadedError"
895-
):
893+
if isinstance(error, WaitQueueTimeoutError):
896894
return
897895
# "Client MUST replace the server's description with type Unknown
898896
# ... MUST NOT request an immediate check of the server."

pymongo/logger.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ class _ConnectionStatusMessage(str, enum.Enum):
4242
POOL_READY = "Connection pool ready"
4343
POOL_CLOSED = "Connection pool closed"
4444
POOL_CLEARED = "Connection pool cleared"
45-
POOL_BACKOFF = "Connection pool backoff"
4645

4746
CONN_CREATED = "Connection created"
4847
CONN_READY = "Connection ready"
@@ -89,7 +88,6 @@ class _SDAMStatusMessage(str, enum.Enum):
8988
_VERBOSE_CONNECTION_ERROR_REASONS = {
9089
ConnectionClosedReason.POOL_CLOSED: "Connection pool was closed",
9190
ConnectionCheckOutFailedReason.POOL_CLOSED: "Connection pool was closed",
92-
ConnectionClosedReason.POOL_BACKOFF: "Connection pool is in backoff",
9391
ConnectionClosedReason.STALE: "Connection pool was stale",
9492
ConnectionClosedReason.ERROR: "An error occurred while using the connection",
9593
ConnectionCheckOutFailedReason.CONN_ERROR: "An error occurred while trying to establish a new connection",

pymongo/monitoring.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -934,9 +934,6 @@ class ConnectionClosedReason:
934934
POOL_CLOSED = "poolClosed"
935935
"""The pool was closed, making the connection no longer valid."""
936936

937-
POOL_BACKOFF = "poolBackoff"
938-
"""The pool is in backoff mode."""
939-
940937

941938
class ConnectionCheckOutFailedReason:
942939
"""An enum that defines values for `reason` on a

pymongo/network_layer.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,6 @@ def __init__(self, timeout: Optional[float] = None):
256256
self._timeout = timeout
257257
self._closed = asyncio.get_running_loop().create_future()
258258
self._connection_lost = False
259-
self._closing_exception = None
260259

261260
def settimeout(self, timeout: float | None) -> None:
262261
self._timeout = timeout
@@ -270,11 +269,9 @@ def close(self, exc: Optional[Exception] = None) -> None:
270269
self.transport.abort()
271270
self._resolve_pending(exc)
272271
self._connection_lost = True
273-
self._closing_exception = exc # type:ignore[assignment]
274272

275273
def connection_lost(self, exc: Optional[Exception] = None) -> None:
276274
self._resolve_pending(exc)
277-
self._closing_exception = exc # type:ignore[assignment]
278275
if not self._closed.done():
279276
self._closed.set_result(None)
280277

@@ -338,11 +335,8 @@ async def read(self, request_id: Optional[int], max_message_size: int) -> tuple[
338335
if self._done_messages:
339336
message = await self._done_messages.popleft()
340337
else:
341-
if self._closed.done():
342-
if self._closing_exception:
343-
raise self._closing_exception
344-
else:
345-
raise OSError("connection closed")
338+
if self.transport and self.transport.is_closing():
339+
raise OSError("connection is already closed")
346340
read_waiter = asyncio.get_running_loop().create_future()
347341
self._pending_messages.append(read_waiter)
348342
try:
@@ -480,7 +474,6 @@ def _resolve_pending(self, exc: Optional[Exception] = None) -> None:
480474
else:
481475
msg.set_exception(exc)
482476
self._done_messages.append(msg)
483-
self._pending_messages.clear()
484477

485478

486479
class PyMongoKMSProtocol(PyMongoBaseProtocol):

pymongo/pool_shared.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,6 @@ async def _configured_protocol_interface(
250250
address: _Address,
251251
options: PoolOptions,
252252
protocol_kls: type[PyMongoBaseProtocol] = PyMongoProtocol,
253-
context: dict[str, bool] | None = None,
254253
) -> AsyncNetworkingInterface:
255254
"""Given (host, port) and PoolOptions, return a configured AsyncNetworkingInterface.
256255
@@ -262,10 +261,6 @@ async def _configured_protocol_interface(
262261
ssl_context = options._ssl_context
263262
timeout = options.socket_timeout
264263

265-
# Signal that we have created the socket successfully.
266-
if context:
267-
context["has_created_socket"] = True
268-
269264
if ssl_context is None:
270265
return AsyncNetworkingInterface(
271266
await asyncio.get_running_loop().create_connection(
@@ -379,7 +374,7 @@ def _create_connection(address: _Address, options: PoolOptions) -> socket.socket
379374

380375

381376
def _configured_socket_interface(
382-
address: _Address, options: PoolOptions, *args: Any, context: dict[str, bool] | None = None
377+
address: _Address, options: PoolOptions, *args: Any
383378
) -> NetworkingInterface:
384379
"""Given (host, port) and PoolOptions, return a NetworkingInterface wrapping a configured socket.
385380
@@ -390,10 +385,6 @@ def _configured_socket_interface(
390385
sock = _create_connection(address, options)
391386
ssl_context = options._ssl_context
392387

393-
# Signal that we have created the socket successfully.
394-
if context:
395-
context["has_created_socket"] = True
396-
397388
if ssl_context is None:
398389
sock.settimeout(options.socket_timeout)
399390
return NetworkingInterface(sock)

0 commit comments

Comments
 (0)