1616import atexit
1717import socket
1818import threading
19+ from collections import deque
1920from collections .abc import Iterator
2021from contextlib import contextmanager
2122from typing import Optional
2223
24+ # Size of the recently released ports queue
25+ # This prevents immediate reuse of ports that were just released
26+ # Increased to 1024 to reduce the chance of cycling back to TIME_WAIT ports
27+ _RECENTLY_RELEASED_PORTS_MAXLEN = 1024
28+
2329
2430class PortManager :
2531 """Thread-safe port manager to prevent EADDRINUSE errors.
@@ -33,10 +39,12 @@ class PortManager:
3339 def __init__ (self ) -> None :
3440 self ._lock = threading .Lock ()
3541 self ._allocated_ports : set [int ] = set ()
42+ # Recently released ports are kept in a queue to avoid immediate reuse
43+ self ._recently_released : deque [int ] = deque (maxlen = _RECENTLY_RELEASED_PORTS_MAXLEN )
3644 # Register cleanup to release all ports on exit
3745 atexit .register (self .release_all )
3846
39- def allocate_port (self , preferred_port : Optional [int ] = None , max_attempts : int = 100 ) -> int :
47+ def allocate_port (self , preferred_port : Optional [int ] = None , max_attempts : int = 1000 ) -> int :
4048 """Allocate a free port, ensuring it's not already reserved.
4149
4250 Args:
@@ -55,23 +63,27 @@ def allocate_port(self, preferred_port: Optional[int] = None, max_attempts: int
5563 if (
5664 preferred_port is not None
5765 and preferred_port not in self ._allocated_ports
66+ and preferred_port not in self ._recently_released
5867 and self ._is_port_free (preferred_port )
5968 ):
6069 self ._allocated_ports .add (preferred_port )
6170 return preferred_port
6271
63- # Try to find a free port
72+ # Let the OS choose a free port, but verify it's not in our tracking structures
73+ # The OS naturally avoids ports in TIME_WAIT (without SO_REUSEADDR)
6474 for attempt in range (max_attempts ):
6575 port = self ._find_free_port ()
6676
67- # Double-check it's not in our reserved set (shouldn't happen, but be safe)
68- if port not in self ._allocated_ports :
77+ # Skip if already allocated by us or recently released
78+ # This prevents race conditions within our process
79+ if port not in self ._allocated_ports and port not in self ._recently_released :
6980 self ._allocated_ports .add (port )
7081 return port
7182
7283 raise RuntimeError (
7384 f"Failed to allocate a free port after { max_attempts } attempts. "
74- f"Currently allocated ports: { len (self ._allocated_ports )} "
85+ f"Currently allocated: { len (self ._allocated_ports )} , "
86+ f"recently released: { len (self ._recently_released )} "
7587 )
7688
7789 def release_port (self , port : int ) -> None :
@@ -82,12 +94,43 @@ def release_port(self, port: int) -> None:
8294
8395 """
8496 with self ._lock :
85- self ._allocated_ports .discard (port )
97+ if port in self ._allocated_ports :
98+ self ._allocated_ports .remove (port )
99+ # Add to the back of the queue; oldest will be evicted when queue is full
100+ self ._recently_released .append (port )
86101
87102 def release_all (self ) -> None :
88103 """Release all allocated ports."""
89104 with self ._lock :
90105 self ._allocated_ports .clear ()
106+ self ._recently_released .clear ()
107+
108+ def reserve_existing_port (self , port : int ) -> bool :
109+ """Reserve a port that was allocated externally.
110+
111+ Args:
112+ port: The externally assigned port to reserve.
113+
114+ Returns:
115+ True if the port was reserved (or already reserved), False if the port value is invalid.
116+
117+ """
118+ if port <= 0 or port > 65535 :
119+ return False
120+
121+ with self ._lock :
122+ if port in self ._allocated_ports :
123+ return True
124+
125+ # Remove from recently released queue if present (we're explicitly reserving it)
126+ if port in self ._recently_released :
127+ # Create a new deque without this port
128+ self ._recently_released = deque (
129+ (p for p in self ._recently_released if p != port ), maxlen = _RECENTLY_RELEASED_PORTS_MAXLEN
130+ )
131+
132+ self ._allocated_ports .add (port )
133+ return True
91134
92135 @contextmanager
93136 def allocated_port (self , preferred_port : Optional [int ] = None ) -> Iterator [int ]:
@@ -121,7 +164,8 @@ def _find_free_port() -> int:
121164
122165 """
123166 s = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
124- s .setsockopt (socket .SOL_SOCKET , socket .SO_REUSEADDR , 1 )
167+ # Don't use SO_REUSEADDR - we need to match the behavior of TCPStore
168+ # which binds without it, so ports in TIME_WAIT will be rejected
125169 s .bind (("" , 0 ))
126170 port = s .getsockname ()[1 ]
127171 s .close ()
@@ -140,7 +184,8 @@ def _is_port_free(port: int) -> bool:
140184 """
141185 try :
142186 s = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
143- s .setsockopt (socket .SOL_SOCKET , socket .SO_REUSEADDR , 1 )
187+ # Don't use SO_REUSEADDR - we need to match the behavior of TCPStore
188+ # which binds without it, so ports in TIME_WAIT will be rejected
144189 s .bind (("" , port ))
145190 s .close ()
146191 return True
0 commit comments