|
| 1 | +type ('env, 'state) task = |
| 2 | + { fd : Unix.file_descr |
| 3 | + ; mutable select_on : bool |
| 4 | + ; mutable wake_time : float option |
| 5 | + ; mutable process_read : ('env, 'state) task -> 'env -> 'state -> bool * ('env, 'state) task list * 'state |
| 6 | + ; mutable process_wake : ('env, 'state) task -> 'env -> 'state -> bool * ('env, 'state) task list * 'state |
| 7 | + ; finalize : ('env, 'state) task -> 'env -> 'state -> 'state |
| 8 | + } |
| 9 | + |
| 10 | +let process process_f t hts env state = |
| 11 | + let state = ref state in |
| 12 | + let (finished, ts, state') = process_f t env !state in |
| 13 | + state := state'; |
| 14 | + if finished then begin |
| 15 | + Hashtbl.remove hts t.fd; |
| 16 | + state := t.finalize t env !state |
| 17 | + end; |
| 18 | + List.iter (fun t' -> Hashtbl.add hts t'.fd t') ts; |
| 19 | + !state |
| 20 | + |
| 21 | +let rec eloop default_timeout old_timestamp hts env state = |
| 22 | + let state = ref state in |
| 23 | + let (select_fds, min_timeout) = |
| 24 | + Hashtbl.fold |
| 25 | + (fun fd t (fds, timeout) -> |
| 26 | + let fds' = if t.select_on then fd :: fds else fds in |
| 27 | + let timeout' = |
| 28 | + match t.wake_time with |
| 29 | + | None -> timeout |
| 30 | + | Some wake_time -> min timeout wake_time |
| 31 | + in (fds', timeout')) |
| 32 | + hts ([], default_timeout) in |
| 33 | + let (ready_fds, _, _) = Util.select_unintr select_fds [] [] min_timeout in |
| 34 | + List.iter |
| 35 | + (fun fd -> |
| 36 | + let t = Hashtbl.find hts fd in |
| 37 | + state := process t.process_read t hts env !state) ready_fds; |
| 38 | + let new_timestamp = Unix.gettimeofday () in |
| 39 | + let elapsed_time = new_timestamp -. old_timestamp in |
| 40 | + let wake_tasks = |
| 41 | + Hashtbl.fold |
| 42 | + (fun fd t ts -> |
| 43 | + match t.wake_time with |
| 44 | + | None -> ts |
| 45 | + | Some wake_time -> |
| 46 | + if elapsed_time >= wake_time then |
| 47 | + t :: ts |
| 48 | + else |
| 49 | + (t.wake_time <- Some (wake_time -. elapsed_time); ts)) |
| 50 | + hts [] in |
| 51 | + List.iter (fun t -> state := process t.process_wake t hts env !state) wake_tasks; |
| 52 | + eloop default_timeout new_timestamp hts env !state |
0 commit comments