Skip to content

Commit 57e2081

Browse files
authored
fix: Don't hang when MCP server returns 5xx (#1169)
Fixes #995, where an MCP tool_call that receives a 5XX error from the server hangs and never completes. The root cause is that Anthropic's MCP client, on receiving a 5XX, bubbles up an exception that ends up cancelling all TaskGroup tasks; the session/client/asyncio loop is then torn down and the tool_call never resolves, hence the hang. The fix is twofold: - Detect when this situation occurs and resolve the `close_future` future - Update all background invocations to eagerly bail out when `close_future` is resolved --------- Co-authored-by: Mackenzie Zastrow <zastrowm@users.noreply.github.com>
1 parent ccc3a8b commit 57e2081

File tree

2 files changed

+125
-13
lines changed

2 files changed

+125
-13
lines changed

src/strands/tools/mcp/mcp_client.py

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,12 @@ def __init__(
119119
mcp_instrumentation()
120120
self._session_id = uuid.uuid4()
121121
self._log_debug_with_thread("initializing MCPClient connection")
122-
# Main thread blocks until future completesock
122+
# Main thread blocks until future completes
123123
self._init_future: futures.Future[None] = futures.Future()
124+
# Set within the inner loop as it needs the asyncio loop
125+
self._close_future: asyncio.futures.Future[None] | None = None
126+
self._close_exception: None | Exception = None
124127
# Do not want to block other threads while close event is false
125-
self._close_event = asyncio.Event()
126128
self._transport_callable = transport_callable
127129

128130
self._background_thread: threading.Thread | None = None
@@ -288,11 +290,12 @@ def stop(
288290
- _background_thread: Thread running the async event loop
289291
- _background_thread_session: MCP ClientSession (auto-closed by context manager)
290292
- _background_thread_event_loop: AsyncIO event loop in background thread
291-
- _close_event: AsyncIO event to signal thread shutdown
293+
- _close_future: AsyncIO future to signal thread shutdown
294+
- _close_exception: Exception that caused the background thread shutdown; None if a normal shutdown occurred.
292295
- _init_future: Future for initialization synchronization
293296
294297
Cleanup order:
295-
1. Signal close event to background thread (if session initialized)
298+
1. Signal close future to background thread (if session initialized)
296299
2. Wait for background thread to complete
297300
3. Reset all state for reuse
298301
@@ -303,25 +306,26 @@ def stop(
303306
"""
304307
self._log_debug_with_thread("exiting MCPClient context")
305308

306-
# Only try to signal close event if we have a background thread
309+
# Only try to signal close future if we have a background thread
307310
if self._background_thread is not None:
308-
# Signal close event if event loop exists
311+
# Signal close future if event loop exists
309312
if self._background_thread_event_loop is not None:
310313

311314
async def _set_close_event() -> None:
312-
self._close_event.set()
315+
if self._close_future and not self._close_future.done():
316+
self._close_future.set_result(None)
313317

314318
# Not calling _invoke_on_background_thread since the session does not need to exist
315319
# we only need the thread and event loop to exist.
316320
asyncio.run_coroutine_threadsafe(coro=_set_close_event(), loop=self._background_thread_event_loop)
317321

318322
self._log_debug_with_thread("waiting for background thread to join")
319323
self._background_thread.join()
324+
320325
self._log_debug_with_thread("background thread is closed, MCPClient context exited")
321326

322327
# Reset fields to allow instance reuse
323328
self._init_future = futures.Future()
324-
self._close_event = asyncio.Event()
325329
self._background_thread = None
326330
self._background_thread_session = None
327331
self._background_thread_event_loop = None
@@ -330,6 +334,11 @@ async def _set_close_event() -> None:
330334
self._tool_provider_started = False
331335
self._consumers = set()
332336

337+
if self._close_exception:
338+
exception = self._close_exception
339+
self._close_exception = None
340+
raise RuntimeError("Connection to the MCP server was closed") from exception
341+
333342
def list_tools_sync(
334343
self,
335344
pagination_token: str | None = None,
@@ -563,6 +572,10 @@ async def _async_background_thread(self) -> None:
563572
signals readiness to the main thread, and waits for a close signal.
564573
"""
565574
self._log_debug_with_thread("starting async background thread for MCP connection")
575+
576+
# Initialized here so that it has the asyncio loop
577+
self._close_future = asyncio.Future()
578+
566579
try:
567580
async with self._transport_callable() as (read_stream, write_stream, *_):
568581
self._log_debug_with_thread("transport connection established")
@@ -583,15 +596,22 @@ async def _async_background_thread(self) -> None:
583596

584597
self._log_debug_with_thread("waiting for close signal")
585598
# Keep background thread running until signaled to close.
586-
# Thread is not blocked as this is an asyncio.Event not a threading.Event
587-
await self._close_event.wait()
599+
# Thread is not blocked as this is a future
600+
await self._close_future
601+
588602
self._log_debug_with_thread("close signal received")
589603
except Exception as e:
590604
# If we encounter an exception and the future is still running,
591605
# it means it was encountered during the initialization phase.
592606
if not self._init_future.done():
593607
self._init_future.set_exception(e)
594608
else:
609+
# _close_future is automatically cancelled by the framework which doesn't provide us with the useful
610+
# exception, so instead we store the exception in a different field where stop() can read it
611+
self._close_exception = e
612+
if self._close_future and not self._close_future.done():
613+
self._close_future.set_result(None)
614+
595615
self._log_debug_with_thread(
596616
"encountered exception on background thread after initialization %s", str(e)
597617
)
@@ -601,7 +621,7 @@ def _background_task(self) -> None:
601621
602622
This method creates a new event loop for the background thread,
603623
sets it as the current event loop, and runs the async_background_thread
604-
coroutine until completion. In this case "until completion" means until the _close_event is set.
624+
coroutine until completion. In this case "until completion" means until the _close_future is resolved.
605625
This allows for a long-running event loop.
606626
"""
607627
self._log_debug_with_thread("setting up background task event loop")
@@ -699,9 +719,34 @@ def _log_debug_with_thread(self, msg: str, *args: Any, **kwargs: Any) -> None:
699719
)
700720

701721
def _invoke_on_background_thread(self, coro: Coroutine[Any, Any, T]) -> futures.Future[T]:
    """Schedule ``coro`` on the background event loop, bailing out if the connection closes.

    The coroutine is raced against ``self._close_future`` so that a caller waiting on the
    returned future is released when the close future resolves instead of waiting forever
    (fix for strands-agents/sdk-python/issues/995).

    Args:
        coro: The coroutine to run on the background thread's event loop.

    Returns:
        The future produced by ``asyncio.run_coroutine_threadsafe``. It resolves with the
        result (or exception) of ``coro``, or fails with ``RuntimeError`` when the close
        future resolves before ``coro`` has finished.

    Raises:
        MCPClientInitializationError: If the session, the event loop, or the close future
            has not been set up.
    """
    # save a reference to this so that even if it's reset we have the original
    close_future = self._close_future

    if (
        self._background_thread_session is None
        or self._background_thread_event_loop is None
        or close_future is None
    ):
        raise MCPClientInitializationError("the client session was not initialized")

    async def run_async() -> T:
        # Fix for strands-agents/sdk-python/issues/995 - cancel all pending invocations if/when the session closes
        invoke_task = asyncio.create_task(coro)
        await asyncio.wait([invoke_task, close_future], return_when=asyncio.FIRST_COMPLETED)

        # Both awaitables can be finished by the time wait() returns. Check the invocation
        # explicitly rather than popping an arbitrary member of the "done" set, so a call
        # that actually completed is never reported as a closed connection.
        if invoke_task.done():
            return await invoke_task

        # Only the close future resolved: the invocation can no longer complete, so cancel
        # it rather than leaving a pending task behind on the loop.
        invoke_task.cancel()
        self._log_debug_with_thread("event loop for the server closed before the invoke completed")
        raise RuntimeError("Connection to the MCP server was closed")

    return asyncio.run_coroutine_threadsafe(coro=run_async(), loop=self._background_thread_event_loop)
705750

706751
def _should_include_tool(self, tool: MCPAgentTool) -> bool:
707752
"""Check if a tool should be included based on constructor filters."""

tests_integ/mcp/test_mcp_client.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,3 +420,70 @@ def transport_callback() -> MCPTransport:
420420
result = await streamable_http_client.call_tool_async(tool_use_id="123", name="timeout_tool")
421421
assert result["status"] == "error"
422422
assert result["content"][0]["text"] == "Tool execution failed: Connection closed"
423+
424+
425+
def start_5xx_proxy_for_tool_calls(target_url: str, proxy_port: int) -> None:
    """Starts a proxy that throws a 5XX when a tool call is invoked.

    Every request whose body contains ``tools/call`` is answered with a 500; all other
    requests are forwarded to ``target_url`` and the upstream response is streamed back.

    Args:
        target_url: Base URL of the real server that non-tool-call requests are forwarded to.
        proxy_port: Local port on 127.0.0.1 that the proxy listens on.
    """
    import aiohttp
    from aiohttp import web

    async def proxy_handler(request):
        data = await request.read()

        # Match on the raw bytes; formatting the bytes object into a string would search its
        # repr (escape sequences included) instead of the actual payload.
        if b"tools/call" in data:
            return web.Response(status=500, text="Internal Server Error")

        url = f"{target_url}{request.path_qs}"

        # The upstream session is only needed for requests that are actually forwarded.
        async with aiohttp.ClientSession() as session:
            async with session.request(
                method=request.method, url=url, headers=request.headers, data=data, allow_redirects=False
            ) as resp:
                print(f"Got request to {url} {data}")
                response = web.StreamResponse(status=resp.status, headers=resp.headers)
                await response.prepare(request)

                async for chunk in resp.content.iter_chunked(8192):
                    await response.write(chunk)

                return response

    app = web.Application()
    app.router.add_route("*", "/{path:.*}", proxy_handler)

    web.run_app(app, host="127.0.0.1", port=proxy_port)
455+
456+
457+
@pytest.mark.asyncio
async def test_streamable_http_mcp_client_with_500_error():
    """A tool call answered with a 500 must return an error result instead of hanging.

    The MCP server runs on port 8001 and is fronted by a proxy on port 8002 that replies
    with a 500 to tool calls. The test expects the tool call to produce an error result and
    the client context to raise ``RuntimeError`` when it exits.
    """
    import asyncio
    import multiprocessing

    server_thread = threading.Thread(
        target=start_comprehensive_mcp_server, kwargs={"transport": "streamable-http", "port": 8001}, daemon=True
    )
    server_thread.start()

    proxy_process = multiprocessing.Process(
        target=start_5xx_proxy_for_tool_calls, kwargs={"target_url": "http://127.0.0.1:8001", "proxy_port": 8002}
    )
    proxy_process.start()

    # Bound up front so the assertions below fail with a clear message, not a NameError,
    # if the call raises before assigning a result.
    result = None

    try:
        await asyncio.sleep(2)  # wait for server to startup completely

        def transport_callback() -> MCPTransport:
            return streamablehttp_client(url="http://127.0.0.1:8002/mcp")

        streamable_http_client = MCPClient(transport_callback)
        # pytest.raises must be the outer context: the RuntimeError is expected while the
        # client context is being exited.
        with pytest.raises(RuntimeError, match="Connection to the MCP server was closed"):
            with streamable_http_client:
                result = await streamable_http_client.call_tool_async(
                    tool_use_id="123", name="calculator", arguments={"x": 3, "y": 4}
                )
    finally:
        proxy_process.terminate()
        proxy_process.join()

    assert result is not None, "call_tool_async did not return a result"
    assert result["status"] == "error"
    assert result["content"][0]["text"] == "Tool execution failed: Connection to the MCP server was closed"

0 commit comments

Comments
 (0)