@@ -127,11 +127,13 @@ async def _send(self, message: str) -> None:
127127 """Send the provided message to the adapter connection and log the message"""
128128
129129 if not self ._connected :
130- raise TransportClosed (
131- "Transport is not connected"
132- ) from self .close_exception
130+ if isinstance (self .close_exception , TransportConnectionFailed ):
131+ raise self .close_exception
132+ else :
133+ raise TransportConnectionFailed () from self .close_exception
133134
134135 try :
136+ # Can raise TransportConnectionFailed
135137 await self .adapter .send (message )
136138 log .info (">>> %s" , message )
137139 except TransportConnectionFailed as e :
@@ -143,7 +145,7 @@ async def _receive(self) -> str:
143145
144146 # It is possible that the connection has been already closed in another task
145147 if not self ._connected :
146- raise TransportClosed ( "Transport is already closed" )
148+ raise TransportConnectionFailed () from self . close_exception
147149
148150 # Wait for the next frame.
149151 # Can raise TransportConnectionFailed or TransportProtocolError
@@ -214,8 +216,6 @@ async def _receive_data_loop(self) -> None:
214216 except (TransportConnectionFailed , TransportProtocolError ) as e :
215217 await self ._fail (e , clean_close = False )
216218 break
217- except TransportClosed :
218- break
219219
220220 # Parse the answer
221221 try :
@@ -482,6 +482,10 @@ async def _close_coro(self, e: Exception, clean_close: bool = True) -> None:
482482 # We should always have an active websocket connection here
483483 assert self ._connected
484484
485+ # Saving exception to raise it later if trying to use the transport
486+ # after it has already closed.
487+ self .close_exception = e
488+
485489 # Properly shut down liveness checker if enabled
486490 if self .check_keep_alive_task is not None :
487491 # More info: https://stackoverflow.com/a/43810272/1113207
@@ -492,18 +496,17 @@ async def _close_coro(self, e: Exception, clean_close: bool = True) -> None:
492496 # Calling the subclass close hook
493497 await self ._close_hook ()
494498
495- # Saving exception to raise it later if trying to use the transport
496- # after it has already closed.
497- self .close_exception = e
498-
499499 if clean_close :
500500 log .debug ("_close_coro: starting clean_close" )
501501 try :
502502 await self ._clean_close (e )
503503 except Exception as exc : # pragma: no cover
504504 log .warning ("Ignoring exception in _clean_close: " + repr (exc ))
505505
506- log .debug ("_close_coro: sending exception to listeners" )
506+ if log .isEnabledFor (logging .DEBUG ):
507+ log .debug (
508+ f"_close_coro: sending exception to { len (self .listeners )} listeners"
509+ )
507510
508511 # Send an exception to all remaining listeners
509512 for query_id , listener in self .listeners .items ():
@@ -530,7 +533,15 @@ async def _close_coro(self, e: Exception, clean_close: bool = True) -> None:
530533 log .debug ("_close_coro: exiting" )
531534
532535 async def _fail (self , e : Exception , clean_close : bool = True ) -> None :
533- log .debug ("_fail: starting with exception: " + repr (e ))
536+ if log .isEnabledFor (logging .DEBUG ):
537+ import inspect
538+
539+ current_frame = inspect .currentframe ()
540+ assert current_frame is not None
541+ caller_frame = current_frame .f_back
542+ assert caller_frame is not None
543+ caller_name = inspect .getframeinfo (caller_frame ).function
544+ log .debug (f"_fail from { caller_name } : " + repr (e ))
534545
535546 if self .close_task is None :
536547
0 commit comments