22import string
33import sys
44from code import InteractiveConsole
5+ from collections import deque
56from contextlib import contextmanager
67from functools import partial
78from importlib import import_module
@@ -138,8 +139,7 @@ def eval_lua(lua_code, immediate=False):
138139 'code' : lua_code ,
139140 'immediate' : immediate ,
140141 })
141- # do not uncomment this, or use sys.__stdout__.write
142- # print('{} → {}'.format(lua_code, repr(result)))
142+ # debug('{} → {}'.format(lua_code, repr(result)))
143143 if not immediate :
144144 result = rproc .coro (result )
145145 return result
@@ -237,6 +237,56 @@ def switch(self, *args, **kwargs):
237237 self ._on_death ()
238238
239239
240+ class CCEventRouter :
241+ def __init__ (self , on_first_sub , on_last_unsub , resume_task ):
242+ self ._stacks = {}
243+ self ._active = {}
244+ self ._on_first_sub = on_first_sub
245+ self ._on_last_unsub = on_last_unsub
246+ self ._resume_task = resume_task
247+
248+ def sub (self , task_id , event ):
249+ if event not in self ._stacks :
250+ self ._stacks [event ] = {}
251+ self ._on_first_sub (event )
252+ se = self ._stacks [event ]
253+ if task_id in se :
254+ raise Exception ('Same task subscribes to the same event twice' )
255+ se [task_id ] = deque ()
256+
257+ def unsub (self , task_id , event ):
258+ if event not in self ._stacks :
259+ return
260+ self ._stacks [event ].pop (task_id , None )
261+ if len (self ._stacks [event ]) == 0 :
262+ self ._on_last_unsub (event )
263+ del self ._stacks [event ]
264+
265+ def on_event (self , event , params ):
266+ if event not in self ._stacks :
267+ self ._on_last_unsub (event )
268+ return
269+ for task_id , queue in self ._stacks [event ].items ():
270+ queue .append (params )
271+ if self ._active .get (task_id ) == event :
272+ self ._set_task_status (task_id , event , False )
273+ self ._resume_task (task_id )
274+
275+ def get_from_stack (self , task_id , event ):
276+ queue = self ._stacks [event ][task_id ]
277+ try :
278+ return queue .popleft ()
279+ except IndexError :
280+ self ._set_task_status (task_id , event , True )
281+ return None
282+
283+ def _set_task_status (self , task_id , event , waits : bool ):
284+ if waits :
285+ self ._active [task_id ] = event
286+ else :
287+ self ._active .pop (task_id , None )
288+
289+
240290class CCSession :
241291 def __init__ (self , computer_id , sender ):
242292 # computer_id is unique identifier of a CCSession
@@ -246,6 +296,11 @@ def __init__(self, computer_id, sender):
246296 self ._greenlets = {}
247297 self ._server_greenlet = get_current_greenlet ()
248298 self ._program_greenlet = None
299+ self ._evr = CCEventRouter (
300+ lambda event : self ._sender ({'action' : 'sub' , 'event' : event }),
301+ lambda event : self ._sender ({'action' : 'unsub' , 'event' : event }),
302+ lambda task_id : self ._greenlets [task_id ].defer_switch ('event' ),
303+ )
249304
250305 def on_task_result (self , task_id , result ):
251306 assert get_current_greenlet () is self ._server_greenlet
@@ -254,6 +309,9 @@ def on_task_result(self, task_id, result):
254309 return
255310 self ._greenlets [task_id ].switch (result )
256311
312+ def on_event (self , event , params ):
313+ self ._evr .on_event (event , params )
314+
257315 def create_task_id (self ):
258316 return next (self ._tid_allocator )
259317
0 commit comments