11import asyncio
22import logging
33import re
4+ import traceback
45import urllib .parse
56from collections .abc import Coroutine , Iterable
67from concurrent .futures import Future
7- from importlib import import_module
88from pathlib import Path
9- from threading import Thread
109from typing import Any , Callable
1110
1211import orjson
2221from reactpy .core .types import ComponentType
2322
2423_logger = logging .getLogger (__name__ )
25- _backhaul_loop = asyncio .new_event_loop ()
26-
27-
28- def start_backhaul_loop ():
29- """Starts the asyncio event loop that will perform component rendering
30- tasks."""
31- asyncio .set_event_loop (_backhaul_loop )
32- _backhaul_loop .run_forever ()
33-
34-
35- _backhaul_thread = Thread (target = start_backhaul_loop , daemon = True )
3624
3725
3826class ReactPyMiddleware :
27+ _asgi_single_callable = True
28+ servestatic_static : ServeStaticASGI | None = None
29+ servestatic_web_modules : ServeStaticASGI | None = None
30+ single_root_component : bool = False
31+ root_component : ComponentType | None = None
32+
3933 def __init__ (
4034 self ,
4135 app : Callable [..., Coroutine ],
@@ -58,23 +52,15 @@ def __init__(
5852 self .static_pattern = re .compile (f"^{ self .static_path } .*" )
5953 self .web_modules_dir = web_modules_dir or REACTPY_WEB_MODULES_DIR .current
6054 self .static_dir = Path (__file__ ).parent .parent / "static"
61- self .backhaul_thread = False # TODO: Add backhaul_thread settings
6255 self .user_app = guarantee_single_callable (app )
63- self .servestatic_static : ServeStaticASGI | None = None
64- self .servestatic_web_modules : ServeStaticASGI | None = None
6556 self .component_dotted_paths = set (root_components )
6657 self .components : dict [str , ComponentType ] = import_components (
6758 self .component_dotted_paths
6859 )
69- self .dispatcher : Future | asyncio .Task | None = None
70- self .recv_queue : asyncio .Queue = asyncio .Queue ()
60+
7161 if self .web_modules_dir != REACTPY_WEB_MODULES_DIR .current :
7262 REACTPY_WEB_MODULES_DIR .set_current (self .web_modules_dir )
7363
74- # Start the backhaul thread if it's not already running
75- if self .backhaul_thread and not _backhaul_thread .is_alive ():
76- _backhaul_thread .start ()
77-
7864 # Validate the arguments
7965 reason = check_path (self .path_prefix )
8066 if reason :
@@ -110,34 +96,26 @@ async def component_dispatch_app(
11096 send : Callable [..., Coroutine ],
11197 ) -> None :
11298 """ASGI app for rendering ReactPy Python components."""
113- ws_connected : bool = False
99+ dispatcher : Future | asyncio .Task | None = None
100+ recv_queue : asyncio .Queue = asyncio .Queue ()
114101
102+ # Start a loop that handles ASGI websocket events
115103 while True :
116- # Future WS events on this connection will always be received here
117104 event = await receive ()
118-
119- if event ["type" ] == "websocket.connect" and not ws_connected :
120- ws_connected = True
105+ if event ["type" ] == "websocket.connect" :
121106 await send ({"type" : "websocket.accept" })
122- run_dispatcher_coro = self .run_dispatcher (scope , receive , send )
123- if self .backhaul_thread :
124- self .dispatcher = asyncio .run_coroutine_threadsafe (
125- run_dispatcher_coro , _backhaul_loop
126- )
127- else :
128- self .dispatcher = asyncio .create_task (run_dispatcher_coro )
107+ dispatcher = asyncio .create_task (
108+ self .run_dispatcher (scope , receive , send , recv_queue )
109+ )
129110
130111 if event ["type" ] == "websocket.disconnect" :
131- if self . dispatcher :
132- self . dispatcher .cancel ()
112+ if dispatcher :
113+ dispatcher .cancel ()
133114 break
134115
135116 if event ["type" ] == "websocket.receive" :
136- queue_put_coro = self .recv_queue .put (orjson .loads (event ["text" ]))
137- if self .backhaul_thread :
138- asyncio .run_coroutine_threadsafe (queue_put_coro , _backhaul_loop )
139- else :
140- await queue_put_coro
117+ queue_put_func = recv_queue .put (orjson .loads (event ["text" ]))
118+ await queue_put_func
141119
142120 async def web_module_app (
143121 self ,
@@ -190,45 +168,50 @@ async def run_dispatcher(
190168 scope : dict [str , Any ],
191169 receive : Callable [..., Coroutine ],
192170 send : Callable [..., Coroutine ],
171+ recv_queue : asyncio .Queue ,
193172 ) -> None :
194173 # Get the component from the URL.
195- url_path = re .match (self .dispatcher_pattern , scope ["path" ])
196- if not url_path :
197- raise RuntimeError ("Could not find component in URL path." )
198- dotted_path = url_path [1 ]
199- module_str , component_str = dotted_path .rsplit ("." , 1 )
200- module = import_module (module_str )
201- component = getattr (module , component_str )
202- parsed_url = urllib .parse .urlparse (scope ["path" ])
203-
204- await serve_layout (
205- Layout ( # type: ignore
206- ConnectionContext (
207- component (),
208- value = Connection (
209- scope = scope ,
210- location = Location (
211- parsed_url .path ,
212- f"?{ parsed_url .query } " if parsed_url .query else "" ,
174+ try :
175+ if not self .single_root_component :
176+ url_match = re .match (self .dispatcher_pattern , scope ["path" ])
177+ if not url_match :
178+ raise RuntimeError ("Could not find component in URL path." )
179+ dotted_path = url_match [1 ]
180+ component = self .components [dotted_path ]
181+ else :
182+ component = self .root_component
183+ parsed_url = urllib .parse .urlparse (scope ["path" ])
184+
185+ await serve_layout (
186+ Layout ( # type: ignore
187+ ConnectionContext (
188+ component ,
189+ value = Connection (
190+ scope = scope ,
191+ location = Location (
192+ parsed_url .path ,
193+ f"?{ parsed_url .query } " if parsed_url .query else "" ,
194+ ),
195+ carrier = {
196+ "scope" : scope ,
197+ "send" : send ,
198+ "receive" : receive ,
199+ },
213200 ),
214- carrier = {
215- "scope" : scope ,
216- "send" : send ,
217- "receive" : receive ,
218- },
219- ),
220- )
221- ),
222- self .send_json_ws (send ),
223- self .recv_queue .get ,
224- )
201+ )
202+ ),
203+ self .send_json_ws (send ),
204+ recv_queue .get ,
205+ )
206+ except Exception as error :
207+ await asyncio .to_thread (_logger .error , f"{ error } \n { traceback .format_exc ()} " )
225208
226209 @staticmethod
227210 def send_json_ws (send : Callable ) -> Callable [..., Coroutine ]:
228211 """Use orjson to send JSON over an ASGI websocket."""
229212
230213 async def _send_json (value : Any ) -> None :
231- await send ({"type" : "websocket.send" , "text" : orjson .dumps (value )})
214+ await send ({"type" : "websocket.send" , "text" : orjson .dumps (value ). decode () })
232215
233216 return _send_json
234217
0 commit comments