@@ -694,9 +694,17 @@ async def _cancel(self):
694694
695695 async def connect (self , force = False ):
696696 if not force :
697- await self ._lock .acquire ()
697+ async with self ._lock :
698+ return await self ._connect_internal (force )
698699 else :
699700 logger .debug ("Proceeding without acquiring lock." )
701+ return await self ._connect_internal (force )
702+
703+ async def _connect_internal (self , force ):
704+ # Check state again after acquiring lock to avoid duplicate connections
705+ if not force and self .state in (State .OPEN , State .CONNECTING ):
706+ return None
707+
700708 logger .debug (f"Websocket connecting to { self .ws_url } " )
701709 if self ._sending is None or self ._sending .empty ():
702710 self ._sending = asyncio .Queue ()
@@ -725,17 +733,13 @@ async def connect(self, force=False):
725733 except socket .gaierror :
726734 logger .debug (f"Hostname not known (this is just for testing" )
727735 await asyncio .sleep (10 )
728- if self ._lock .locked ():
729- self ._lock .release ()
730736 return await self .connect (force = force )
731737 logger .debug ("Connection established" )
732738 self .ws = connection
733739 if self ._send_recv_task is None or self ._send_recv_task .done ():
734740 self ._send_recv_task = asyncio .get_running_loop ().create_task (
735741 self ._handler (self .ws )
736742 )
737- if self ._lock .locked ():
738- self ._lock .release ()
739743 return None
740744
741745 async def _handler (self , ws : ClientConnection ) -> Union [None , Exception ]:
0 commit comments