diff --git a/src/core/fifo_pool.ml b/src/core/fifo_pool.ml index 38a07d3d..3e70e06b 100644 --- a/src/core/fifo_pool.ml +++ b/src/core/fifo_pool.ml @@ -28,7 +28,6 @@ type worker_state = { let[@inline] size_ (self : state) = Array.length self.threads let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q -let k_worker_state : worker_state TLS.t = TLS.create () (* get_thread_state = TLS.get_opt k_worker_state @@ -71,12 +70,6 @@ let schedule_w (self : worker_state) (task : task_full) : unit = let get_next_task (self : worker_state) = try Bb_queue.pop self.st.q with Bb_queue.Closed -> raise WL.No_more_tasks -let get_thread_state () = - match TLS.get_exn k_worker_state with - | st -> st - | exception TLS.Not_set -> - failwith "Moonpool: get_thread_state called from outside a runner." - let before_start (self : worker_state) = let t_id = Thread.id @@ Thread.self () in self.st.on_init_thread ~dom_id:self.dom_idx ~t_id (); @@ -103,7 +96,6 @@ let worker_ops : worker_state WL.ops = WL.schedule = schedule_w; runner; get_next_task; - get_thread_state; around_task; on_exn; before_start; diff --git a/src/core/worker_loop_.ml b/src/core/worker_loop_.ml index bd2cd5ca..6ec11535 100644 --- a/src/core/worker_loop_.ml +++ b/src/core/worker_loop_.ml @@ -21,8 +21,6 @@ exception No_more_tasks type 'st ops = { schedule: 'st -> task_full -> unit; get_next_task: 'st -> task_full; (** @raise No_more_tasks *) - get_thread_state: unit -> 'st; - (** Access current thread's worker state from any worker *) around_task: 'st -> around_task; on_exn: 'st -> Exn_bt.t -> unit; runner: 'st -> Runner.t; @@ -41,8 +39,8 @@ let[@inline] raise_with_bt exn = let bt = Printexc.get_raw_backtrace () in Printexc.raise_with_backtrace exn bt -let with_handler (type st arg) ~(ops : st ops) (self : st) : - (unit -> unit) -> unit = +let with_handler (type st) ~(ops : st ops) (self : st) : (unit -> unit) -> unit + = let current = Some (fun k -> @@ -85,8 +83,8 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) : let fiber = get_current_fiber_exn () in (* when triggers is signaled, reschedule task *) if not (Picos.Fiber.try_suspend fiber trigger fiber k reschedule) then - (* trigger was already signaled, run task now *) - Picos.Fiber.resume fiber k) + (* trigger was already signaled, reschedule task now *) + reschedule trigger fiber k) | Picos.Computation.Cancel_after _r -> Some (fun k -> @@ -98,31 +96,28 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) : let handler = Effect.Deep.{ retc = Fun.id; exnc = raise_with_bt; effc } in fun f -> Effect.Deep.match_with f () handler -let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit = - if block_signals then ( - try - ignore - (Unix.sigprocmask SIG_BLOCK - [ - Sys.sigterm; - Sys.sigpipe; - Sys.sigint; - Sys.sigchld; - Sys.sigalrm; - Sys.sigusr1; - Sys.sigusr2; - ] - : _ list) - with _ -> () - ); +module type FINE_GRAINED_ARGS = sig + type st - let cur_fiber : fiber ref = ref _dummy_fiber in - let runner = ops.runner self in - TLS.set Runner.For_runner_implementors.k_cur_runner runner; + val ops : st ops + val st : st +end - let (AT_pair (before_task, after_task)) = ops.around_task self in +module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct + open Args + + let cur_fiber : fiber ref = ref _dummy_fiber + let runner = ops.runner st + + type state = + | New + | Ready + | Torn_down + + let state = ref New let run_task (task : task_full) : unit = + let (AT_pair (before_task, after_task)) = ops.around_task st in let fiber = match task with | T_start { fiber; _ } | T_resume { fiber; _ } -> fiber @@ -136,32 +131,82 @@ let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit = assert (task != _dummy_task); (try match task with - | T_start { fiber = _; f } -> with_handler ~ops self f + | T_start { fiber = _; f } -> with_handler ~ops st f | T_resume { fiber = _; k } -> (* this is already in an effect handler *) k () with e -> let bt = Printexc.get_raw_backtrace () in let ebt = Exn_bt.make e bt in - ops.on_exn self ebt); + ops.on_exn st ebt); after_task runner _ctx; cur_fiber := _dummy_fiber; TLS.set k_cur_fiber _dummy_fiber - in - ops.before_start self; + let setup ~block_signals () : unit = + if !state <> New then invalid_arg "worker_loop.setup: not a new instance"; + state := Ready; - let continue = ref true in - try - while !continue do - match ops.get_next_task self with - | task -> run_task task + if block_signals then ( + try + ignore + (Unix.sigprocmask SIG_BLOCK + [ + Sys.sigterm; + Sys.sigpipe; + Sys.sigint; + Sys.sigchld; + Sys.sigalrm; + Sys.sigusr1; + Sys.sigusr2; + ] + : _ list) + with _ -> () + ); + + TLS.set Runner.For_runner_implementors.k_cur_runner runner; + + ops.before_start st + + let run ?(max_tasks = max_int) () : unit = + if !state <> Ready then invalid_arg "worker_loop.run: not setup"; + + let continue = ref true in + let n_tasks = ref 0 in + while !continue && !n_tasks < max_tasks do + match ops.get_next_task st with + | task -> + incr n_tasks; + run_task task | exception No_more_tasks -> continue := false - done; - ops.cleanup self + done + + let teardown () = + if !state <> Torn_down then ( + state := Torn_down; + cur_fiber := _dummy_fiber; + ops.cleanup st + ) +end + +let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit = + let module FG = + Fine_grained + (struct + type nonrec st = st + + let ops = ops + let st = self + end) + () + in + FG.setup ~block_signals (); + try + FG.run (); + FG.teardown () with exn -> let bt = Printexc.get_raw_backtrace () in - ops.cleanup self; + FG.teardown (); Printexc.raise_with_backtrace exn bt diff --git a/src/core/worker_loop_.mli b/src/core/worker_loop_.mli index 7098deb8..3041b0dd 100644 --- a/src/core/worker_loop_.mli +++ b/src/core/worker_loop_.mli @@ -26,7 +26,6 @@ exception No_more_tasks type 'st ops = { schedule: 'st -> task_full -> unit; get_next_task: 'st -> task_full; - get_thread_state: unit -> 'st; around_task: 'st -> around_task; on_exn: 'st -> Exn_bt.t -> unit; runner: 'st -> Runner.t; @@ -34,4 +33,23 @@ type 'st ops = { cleanup: 'st -> unit; } +module type FINE_GRAINED_ARGS = sig + type st + + val ops : st ops + val st : st +end + +module Fine_grained (_ : FINE_GRAINED_ARGS) () : sig + val setup : block_signals:bool -> unit -> unit + (** Just initialize the loop *) + + val run : ?max_tasks:int -> unit -> unit + (** Run the loop until no task remains or until [max_tasks] tasks have been + run *) + + val teardown : unit -> unit + (** Tear down the loop *) +end + val worker_loop : block_signals:bool -> ops:'st ops -> 'st -> unit diff --git a/src/core/ws_pool.ml b/src/core/ws_pool.ml index 1b95cd16..153f4f06 100644 --- a/src/core/ws_pool.ml +++ b/src/core/ws_pool.ml @@ -62,12 +62,6 @@ let k_worker_state : worker_state TLS.t = TLS.create () let[@inline] get_current_worker_ () : worker_state option = TLS.get_opt k_worker_state -let[@inline] get_current_worker_exn () : worker_state = - match TLS.get_exn k_worker_state with - | w -> w - | exception TLS.Not_set -> - failwith "Moonpool: get_current_runner was called from outside a pool." - (** Try to wake up a waiter, if there's any. *) let[@inline] try_wake_someone_ (self : state) : unit = if self.n_waiting_nonzero then ( @@ -212,7 +206,6 @@ let worker_ops : worker_state WL.ops = WL.schedule = schedule_from_w; runner; get_next_task; - get_thread_state = get_current_worker_exn; around_task; on_exn; before_start; diff --git a/src/lwt/IO.ml b/src/lwt/IO.ml deleted file mode 100644 index 6ae09506..00000000 --- a/src/lwt/IO.ml +++ /dev/null @@ -1,66 +0,0 @@ -open Base - -let await_readable fd : unit = - let trigger = Trigger.create () in - Perform_action_in_lwt.schedule - @@ Action.Wait_readable - ( fd, - fun cancel -> - Trigger.signal trigger; - Lwt_engine.stop_event cancel ); - Trigger.await_exn trigger - -let rec read fd buf i len : int = - if len = 0 then - 0 - else ( - match Unix.read fd buf i len with - | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> - await_readable fd; - read fd buf i len - | n -> n - ) - -let await_writable fd = - let trigger = Trigger.create () in - Perform_action_in_lwt.schedule - @@ Action.Wait_writable - ( fd, - fun cancel -> - Trigger.signal trigger; - Lwt_engine.stop_event cancel ); - Trigger.await_exn trigger - -let rec write_once fd buf i len : int = - if len = 0 then - 0 - else ( - match Unix.write fd buf i len with - | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> - await_writable fd; - write_once fd buf i len - | n -> n - ) - -let write fd buf i len : unit = - let i = ref i in - let len = ref len in - while !len > 0 do - let n = write_once fd buf !i !len in - i := !i + n; - len := !len - n - done - -(** Sleep for the given amount of seconds *) -let sleep_s (f : float) : unit = - if f > 0. then ( - let trigger = Trigger.create () in - Perform_action_in_lwt.schedule - @@ Action.Sleep - ( f, - false, - fun cancel -> - Trigger.signal trigger; - Lwt_engine.stop_event cancel ); - Trigger.await_exn trigger - ) diff --git a/src/lwt/IO_in.ml b/src/lwt/IO_in.ml deleted file mode 100644 index b5ad110b..00000000 --- a/src/lwt/IO_in.ml +++ /dev/null @@ -1,152 +0,0 @@ -open Common_ - -class type t = object - method input : bytes -> int -> int -> int - (** Read into the slice. Returns [0] only if the stream is closed. *) - - method close : unit -> unit - (** Close the input. Must be idempotent. *) -end - -let create ?(close = ignore) ~input () : t = - object - method close = close - method input = input - end - -let empty : t = - object - method close () = () - method input _ _ _ = 0 - end - -let of_bytes ?(off = 0) ?len (b : bytes) : t = - (* i: current position in [b] *) - let i = ref off in - - let len = - match len with - | Some n -> - if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes"; - n - | None -> Bytes.length b - off - in - let end_ = off + len in - - object - method input b_out i_out len_out = - let n = min (end_ - !i) len_out in - Bytes.blit b !i b_out i_out n; - i := !i + n; - n - - method close () = i := end_ - end - -let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s) - -(** Read into the given slice. - @return the number of bytes read, [0] means end of input. *) -let[@inline] input (self : #t) buf i len = self#input buf i len - -(** Close the channel. *) -let[@inline] close self : unit = self#close () - -let rec really_input (self : #t) buf i len = - if len > 0 then ( - let n = input self buf i len in - if n = 0 then raise End_of_file; - (really_input [@tailrec]) self buf (i + n) (len - n) - ) - -let really_input_string self n : string = - let buf = Bytes.create n in - really_input self buf 0 n; - Bytes.unsafe_to_string buf - -let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t) - : unit = - let continue = ref true in - while !continue do - let len = input ic buf 0 (Bytes.length buf) in - if len = 0 then - continue := false - else - IO_out.output oc buf 0 len - done - -let concat (l0 : t list) : t = - let l = ref l0 in - let rec input b i len : int = - match !l with - | [] -> 0 - | ic :: tl -> - let n = ic#input b i len in - if n > 0 then - n - else ( - l := tl; - input b i len - ) - in - let close () = List.iter close l0 in - create ~close ~input () - -let input_all ?(buf = Bytes.create 128) (self : #t) : string = - let buf = ref buf in - let i = ref 0 in - - let[@inline] full_ () = !i = Bytes.length !buf in - - let grow_ () = - let old_size = Bytes.length !buf in - let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in - if old_size = new_size then - failwith "input_all: maximum input size exceeded"; - let new_buf = Bytes.extend !buf 0 (new_size - old_size) in - buf := new_buf - in - - let rec loop () = - if full_ () then grow_ (); - let available = Bytes.length !buf - !i in - let n = input self !buf !i available in - if n > 0 then ( - i := !i + n; - (loop [@tailrec]) () - ) - in - loop (); - - if full_ () then - Bytes.unsafe_to_string !buf - else - Bytes.sub_string !buf 0 !i - -let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) - (fd : Unix.file_descr) : t = - let buf_len = ref 0 in - let buf_off = ref 0 in - - let refill () = - buf_off := 0; - buf_len := IO.read fd buf 0 (Bytes.length buf) - in - - object - method input b i len : int = - if !buf_len = 0 then refill (); - let n = min len !buf_len in - if n > 0 then ( - Bytes.blit buf !buf_off b i n; - buf_off := !buf_off + n; - buf_len := !buf_len - n - ); - n - - method close () = - if close_noerr then ( - try Unix.close fd with _ -> () - ) else - Unix.close fd - end diff --git a/src/lwt/IO_out.ml b/src/lwt/IO_out.ml deleted file mode 100644 index 522d3e0a..00000000 --- a/src/lwt/IO_out.ml +++ /dev/null @@ -1,118 +0,0 @@ -open Common_ - -class type t = object - method output_char : char -> unit - method output : bytes -> int -> int -> unit - method flush : unit -> unit - method close : unit -> unit -end - -let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t = - object - method flush () = flush () - method close () = close () - method output_char c = output_char c - method output bs i len = output bs i len - end - -let dummy : t = - object - method flush () = () - method close () = () - method output_char _ = () - method output _ _ _ = () - end - -let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd - : t = - let buf_off = ref 0 in - - let[@inline] is_full () = !buf_off = Bytes.length buf in - - let flush () = - if !buf_off > 0 then ( - IO.write fd buf 0 !buf_off; - buf_off := 0 - ) - in - - object - method output_char c = - if is_full () then flush (); - Bytes.set buf !buf_off c; - incr buf_off - - method output bs i len : unit = - let i = ref i in - let len = ref len in - - while !len > 0 do - (* make space *) - if is_full () then flush (); - - let n = min !len (Bytes.length buf - !buf_off) in - Bytes.blit bs !i buf !buf_off n; - buf_off := !buf_off + n; - i := !i + n; - len := !len - n - done; - (* if full, write eagerly *) - if is_full () then flush () - - method close () = - if close_noerr then ( - try - flush (); - Unix.close fd - with _ -> () - ) else ( - flush (); - Unix.close fd - ) - - method flush = flush - end - -let of_buffer (buf : Buffer.t) : t = - object - method close () = () - method flush () = () - method output_char c = Buffer.add_char buf c - method output bs i len = Buffer.add_subbytes buf bs i len - end - -(** Output the buffer slice into this channel *) -let[@inline] output_char (self : #t) c : unit = self#output_char c - -(** Output the buffer slice into this channel *) -let[@inline] output (self : #t) buf i len : unit = self#output buf i len - -let[@inline] output_string (self : #t) (str : string) : unit = - self#output (Bytes.unsafe_of_string str) 0 (String.length str) - -let output_line (self : #t) (str : string) : unit = - output_string self str; - output_char self '\n' - -(** Close the channel. *) -let[@inline] close self : unit = self#close () - -(** Flush (ie. force write) any buffered bytes. *) -let[@inline] flush self : unit = self#flush () - -let output_int self i = - let s = string_of_int i in - output_string self s - -let output_lines self seq = Seq.iter (output_line self) seq - -let tee (l : t list) : t = - match l with - | [] -> dummy - | [ oc ] -> oc - | _ -> - let output bs i len = List.iter (fun oc -> output oc bs i len) l in - let output_char c = List.iter (fun oc -> output_char oc c) l in - let close () = List.iter close l in - let flush () = List.iter flush l in - create ~flush ~close ~output ~output_char () diff --git a/src/lwt/base.ml b/src/lwt/base.ml deleted file mode 100644 index 16124b8f..00000000 --- a/src/lwt/base.ml +++ /dev/null @@ -1,167 +0,0 @@ -open Common_ -module Trigger = M.Trigger -module Fiber = Moonpool_fib.Fiber -module FLS = Moonpool_fib.Fls - -(** Action scheduled from outside the loop *) -module Action = struct - type event = Lwt_engine.event - type cb = event -> unit - - (** Action that we ask the lwt loop to perform, from the outside *) - type t = - | Wait_readable of Unix.file_descr * cb - | Wait_writable of Unix.file_descr * cb - | Sleep of float * bool * cb - (* TODO: provide actions with cancellation, alongside a "select" operation *) - (* | Cancel of event *) - | On_termination : 'a Lwt.t * 'a Exn_bt.result ref * Trigger.t -> t - | Wakeup : 'a Lwt.u * 'a -> t - | Wakeup_exn : _ Lwt.u * exn -> t - | Other of (unit -> unit) - - (** Perform the action from within the Lwt thread *) - let perform (self : t) : unit = - match self with - | Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event) - | Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event) - | Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event) - (* | Cancel ev -> Lwt_engine.stop_event ev *) - | On_termination (fut, res, trigger) -> - Lwt.on_any fut - (fun x -> - res := Ok x; - Trigger.signal trigger) - (fun exn -> - res := Error (Exn_bt.get_callstack 10 exn); - Trigger.signal trigger) - | Wakeup (prom, x) -> Lwt.wakeup prom x - | Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e - | Other f -> f () -end - -module Action_queue = struct - type t = { q: Action.t list Atomic.t } [@@unboxed] - - let create () : t = { q = Atomic.make [] } - let pop_all (self : t) : _ list = Atomic.exchange self.q [] - - (** Push the action and return whether the queue was previously empty *) - let push (self : t) (a : Action.t) : bool = - let is_first = ref true in - while - let old = Atomic.get self.q in - if Atomic.compare_and_set self.q old (a :: old) then ( - is_first := old == []; - false - ) else - true - do - () - done; - !is_first -end - -module Perform_action_in_lwt = struct - open struct - let actions_ : Action_queue.t = Action_queue.create () - - (** Gets the current set of notifications and perform them from inside the - Lwt thread *) - let perform_pending_actions () : unit = - let@ _sp = - Moonpool.Private.Tracing_.with_span - "moonpool-lwt.perform-pending-actions" - in - - let l = Action_queue.pop_all actions_ in - List.iter Action.perform l - - let notification : int = - Lwt_unix.make_notification ~once:false perform_pending_actions - end - - let schedule (a : Action.t) : unit = - let is_first = Action_queue.push actions_ a in - if is_first then Lwt_unix.send_notification notification -end - -let get_runner () : M.Runner.t = - match M.Runner.get_current_runner () with - | Some r -> r - | None -> failwith "Moonpool_lwt.get_runner: not inside a runner" - -let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t = - let lwt_fut, lwt_prom = Lwt.wait () in - M.Fut.on_result fut (function - | Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x) - | Error ebt -> - let exn = Exn_bt.exn ebt in - Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn)); - lwt_fut - -let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = - match Lwt.poll lwt_fut with - | Some x -> M.Fut.return x - | None -> - let fut, prom = M.Fut.make () in - Lwt.on_any lwt_fut - (fun x -> M.Fut.fulfill prom (Ok x)) - (fun exn -> - let bt = Printexc.get_callstack 10 in - M.Fut.fulfill prom (Error (Exn_bt.make exn bt))); - fut - -let _dummy_exn_bt : Exn_bt.t = - Exn_bt.get_callstack 0 (Failure "dummy Exn_bt from moonpool-lwt") - -let await_lwt (fut : _ Lwt.t) = - match Lwt.poll fut with - | Some x -> x - | None -> - (* suspend fiber, wake it up when [fut] resolves *) - let trigger = M.Trigger.create () in - let res = ref (Error _dummy_exn_bt) in - Perform_action_in_lwt.(schedule Action.(On_termination (fut, res, trigger))); - Trigger.await trigger |> Option.iter Exn_bt.raise; - Exn_bt.unwrap !res - -let run_in_lwt f : _ M.Fut.t = - let fut, prom = M.Fut.make () in - Perform_action_in_lwt.schedule - (Action.Other - (fun () -> - let lwt_fut = f () in - Lwt.on_any lwt_fut - (fun x -> M.Fut.fulfill prom @@ Ok x) - (fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn)))); - fut - -let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f - -let detach_in_runner ~runner f : _ Lwt.t = - let fut, promise = Lwt.wait () in - M.Runner.run_async runner (fun () -> - match f () with - | x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (promise, x) - | exception exn -> - Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn)); - fut - -let main_with_runner ~runner (f : unit -> 'a) : 'a = - let lwt_fut, lwt_prom = Lwt.wait () in - - let _fiber = - Fiber.spawn_top ~on:runner (fun () -> - try - let x = f () in - Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x)) - with exn -> - Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn))) - in - - Lwt_main.run lwt_fut - -let main f = - let@ runner = M.Ws_pool.with_ () in - main_with_runner ~runner f diff --git a/src/lwt/common_.ml b/src/lwt/common_.ml deleted file mode 100644 index d1fdf6d2..00000000 --- a/src/lwt/common_.ml +++ /dev/null @@ -1,5 +0,0 @@ -module M = Moonpool -module Exn_bt = M.Exn_bt - -let ( let@ ) = ( @@ ) -let _default_buf_size = 4 * 1024 diff --git a/src/lwt/dune b/src/lwt/dune index b03d03d6..93c86e61 100644 --- a/src/lwt/dune +++ b/src/lwt/dune @@ -1,12 +1,11 @@ (library (name moonpool_lwt) (public_name moonpool-lwt) - (private_modules common_) (enabled_if (>= %{ocaml_version} 5.0)) (libraries (re_export moonpool) - (re_export moonpool.fib) + moonpool.fib picos (re_export lwt) lwt.unix)) diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml index 1d92ddab..89040d2e 100644 --- a/src/lwt/moonpool_lwt.ml +++ b/src/lwt/moonpool_lwt.ml @@ -1,6 +1,322 @@ -include Base -module IO = IO -module IO_out = IO_out -module IO_in = IO_in -module TCP_server = Tcp_server -module TCP_client = Tcp_client +module Exn_bt = Moonpool.Exn_bt + +open struct + module WL = Moonpool.Private.Worker_loop_ + module M = Moonpool +end + +module Fut = Moonpool.Fut + +let default_around_task_ : WL.around_task = AT_pair (ignore, fun _ _ -> ()) + +let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref = + ref (fun ebt -> + Printf.eprintf "uncaught exception in moonpool-lwt:\n%s" (Exn_bt.show ebt)) + +module Scheduler_state = struct + type st = { + tasks: WL.task_full Queue.t; + actions_from_other_threads: (unit -> unit) Queue.t; + (** Other threads ask us to run closures in the lwt thread *) + mutex: Mutex.t; + mutable thread: int; + closed: bool Atomic.t; + cleanup_done: bool Atomic.t; + mutable as_runner: Moonpool.Runner.t; + mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option; + mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option; + mutable notification: int; + (** A lwt_unix notification to wake up the event loop *) + has_notified: bool Atomic.t; + } + + (** Main state *) + let cur_st : st option Atomic.t = Atomic.make None + + let create_new () : st = + { + tasks = Queue.create (); + actions_from_other_threads = Queue.create (); + mutex = Mutex.create (); + thread = Thread.id (Thread.self ()); + closed = Atomic.make false; + cleanup_done = Atomic.make false; + as_runner = Moonpool.Runner.dummy; + enter_hook = None; + leave_hook = None; + notification = 0; + has_notified = Atomic.make false; + } + + let[@inline] notify_ (self : st) : unit = + if not (Atomic.exchange self.has_notified true) then + Lwt_unix.send_notification self.notification + + let[@inline never] add_action_from_another_thread_ (self : st) f : unit = + Mutex.lock self.mutex; + Queue.push f self.actions_from_other_threads; + Mutex.unlock self.mutex; + notify_ self + + let[@inline] on_lwt_thread_ (self : st) : bool = + Thread.id (Thread.self ()) = self.thread + + let[@inline] run_on_lwt_thread_ (self : st) (f : unit -> unit) : unit = + if on_lwt_thread_ self then + f () + else + add_action_from_another_thread_ self f + + let cleanup (st : st) = + match Atomic.get cur_st with + | Some st' -> + if st != st' then + failwith + "moonpool-lwt: cleanup failed (state is not the currently active \ + one!)"; + if not (on_lwt_thread_ st) then + failwith "moonpool-lwt: cleanup from the wrong thread"; + Atomic.set st.closed true; + if not (Atomic.exchange st.cleanup_done true) then ( + Option.iter Lwt_main.Enter_iter_hooks.remove st.enter_hook; + Option.iter Lwt_main.Leave_iter_hooks.remove st.leave_hook; + Lwt_unix.stop_notification st.notification + ); + + Atomic.set cur_st None + | None -> failwith "moonpool-lwt: cleanup failed (no current active state)" +end + +module Ops = struct + type st = Scheduler_state.st + + let around_task _ = default_around_task_ + + let schedule (self : st) t = + if Atomic.get self.closed then + failwith "moonpool-lwt.schedule: scheduler is closed"; + Scheduler_state.run_on_lwt_thread_ self (fun () -> Queue.push t self.tasks) + + let get_next_task (self : st) = + if Atomic.get self.closed then raise WL.No_more_tasks; + try Queue.pop self.tasks with Queue.Empty -> raise WL.No_more_tasks + + let on_exn _ ebt = !on_uncaught_exn ebt + let runner (self : st) = self.as_runner + let cleanup = Scheduler_state.cleanup + + let as_runner (self : st) : Moonpool.Runner.t = + Moonpool.Runner.For_runner_implementors.create + ~size:(fun () -> 1) + ~num_tasks:(fun () -> + Mutex.lock self.mutex; + let n = Queue.length self.tasks in + Mutex.unlock self.mutex; + n) + ~run_async:(fun ~fiber f -> schedule self @@ WL.T_start { fiber; f }) + ~shutdown:(fun ~wait:_ () -> Atomic.set self.closed true) + () + + let before_start (self : st) : unit = + self.as_runner <- as_runner self; + () + + let ops : st WL.ops = + { + schedule; + around_task; + get_next_task; + on_exn; + runner; + before_start; + cleanup; + } + + let setup st = + if Atomic.compare_and_set Scheduler_state.cur_st None (Some st) then + before_start st + else + failwith "moonpool-lwt: setup failed (state already in place)" +end + +(** Resolve [prom] with the result of [lwt_fut] *) +let transfer_lwt_to_fut (lwt_fut : 'a Lwt.t) (prom : 'a Fut.promise) : unit = + Lwt.on_any lwt_fut + (fun x -> M.Fut.fulfill prom (Ok x)) + (fun exn -> + let bt = Printexc.get_callstack 10 in + M.Fut.fulfill prom (Error (Exn_bt.make exn bt))) + +let[@inline] register_trigger_on_lwt_termination (lwt_fut : _ Lwt.t) + (tr : M.Trigger.t) : unit = + Lwt.on_termination lwt_fut (fun _ -> M.Trigger.signal tr) + +let[@inline] await_lwt_terminated (fut : _ Lwt.t) = + match Lwt.state fut with + | Return x -> x + | Fail exn -> raise exn + | Sleep -> assert false + +module Main_state = struct + let[@inline] get_st () : Scheduler_state.st = + match Atomic.get Scheduler_state.cur_st with + | Some st -> + if Atomic.get st.closed then failwith "moonpool-lwt: scheduler is closed"; + st + | None -> failwith "moonpool-lwt: scheduler is not setup" + + let[@inline] run_on_lwt_thread f = + Scheduler_state.run_on_lwt_thread_ (get_st ()) f + + let[@inline] on_lwt_thread () : bool = + Scheduler_state.on_lwt_thread_ (get_st ()) + + let[@inline] add_action_from_another_thread f : unit = + Scheduler_state.add_action_from_another_thread_ (get_st ()) f +end + +let await_lwt_from_another_thread fut = + let tr = M.Trigger.create () in + Main_state.add_action_from_another_thread (fun () -> + register_trigger_on_lwt_termination fut tr); + M.Trigger.await_exn tr; + await_lwt_terminated fut + +let await_lwt (fut : _ Lwt.t) = + if Scheduler_state.on_lwt_thread_ (Main_state.get_st ()) then ( + (* can directly access the future *) + match Lwt.state fut with + | Return x -> x + | Fail exn -> raise exn + | Sleep -> + let tr = M.Trigger.create () in + register_trigger_on_lwt_termination fut tr; + M.Trigger.await_exn tr; + await_lwt_terminated fut + ) else + await_lwt_from_another_thread fut + +let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t = + if not (Main_state.on_lwt_thread ()) then + failwith "lwt_of_fut: not on the lwt thread"; + let lwt_fut, lwt_prom = Lwt.wait () in + + (* in lwt thread, resolve [lwt_fut] *) + let wakeup_using_res = function + | Ok x -> Lwt.wakeup lwt_prom x + | Error ebt -> + let exn = Exn_bt.exn ebt in + Lwt.wakeup_exn lwt_prom exn + in + + M.Fut.on_result fut (fun res -> + Main_state.run_on_lwt_thread (fun () -> + (* can safely wakeup from the lwt thread *) + wakeup_using_res res)); + + lwt_fut + +let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = + if Main_state.on_lwt_thread () then ( + match Lwt.state lwt_fut with + | Return x -> M.Fut.return x + | _ -> + let fut, prom = M.Fut.make () in + transfer_lwt_to_fut lwt_fut prom; + fut + ) else ( + let fut, prom = M.Fut.make () in + Main_state.add_action_from_another_thread (fun () -> + transfer_lwt_to_fut lwt_fut prom); + fut + ) + +module Setup_lwt_hooks (ARG : sig + val st : Scheduler_state.st +end) = +struct + open ARG + + module FG = + WL.Fine_grained + (struct + include Scheduler_state + + let st = st + let ops = Ops.ops + end) + () + + let run_in_hook () = + (* execute actions sent from other threads; first transfer them + all atomically to a local queue to reduce contention *) + let local_acts = Queue.create () in + Mutex.lock st.mutex; + Queue.transfer st.actions_from_other_threads local_acts; + Atomic.set st.has_notified false; + Mutex.unlock st.mutex; + + Queue.iter (fun f -> f ()) local_acts; + + (* run tasks *) + FG.run ~max_tasks:1000 (); + + if not (Queue.is_empty st.tasks) then ignore (Lwt.pause () : unit Lwt.t); + () + + let setup () = + (* only one thread does this *) + FG.setup ~block_signals:false (); + + st.thread <- Thread.self () |> Thread.id; + st.enter_hook <- Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook); + st.leave_hook <- Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook); + (* notification used to wake lwt up *) + st.notification <- Lwt_unix.make_notification ~once:false run_in_hook +end + +let setup () : Scheduler_state.st = + let st = Scheduler_state.create_new () in + Ops.setup st; + let module Setup_lwt_hooks' = Setup_lwt_hooks (struct + let st = st + end) in + Setup_lwt_hooks'.setup (); + st + +let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st + +let spawn_lwt f : _ Lwt.t = + let st = Main_state.get_st () in + let lwt_fut, lwt_prom = Lwt.wait () in + Moonpool_fib.spawn_top_ignore ~on:st.as_runner (fun () -> + try + let x = f () in + Lwt.wakeup lwt_prom x + with exn -> Lwt.wakeup_exn lwt_prom exn); + lwt_fut + +let spawn_lwt_ignore f = ignore (spawn_lwt f : unit Lwt.t) +let on_lwt_thread = Main_state.on_lwt_thread + +let run_in_lwt_and_await (f : unit -> 'a) : 'a = + let st = Main_state.get_st () in + if Scheduler_state.on_lwt_thread_ st then + (* run immediately *) + f () + else + await_lwt_from_another_thread @@ spawn_lwt f + +let lwt_main (f : _ -> 'a) : 'a = + let st = setup () in + (* make sure to cleanup *) + let finally () = Scheduler_state.cleanup st in + Fun.protect ~finally @@ fun () -> + let fut = spawn_lwt (fun () -> f st.as_runner) in + (* make sure the scheduler isn't already sleeping *) + Scheduler_state.notify_ st; + Lwt_main.run fut + +let[@inline] lwt_main_runner () = + let st = Main_state.get_st () in + st.as_runner diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli index d933a73f..844ecdee 100644 --- a/src/lwt/moonpool_lwt.mli +++ b/src/lwt/moonpool_lwt.mli @@ -7,8 +7,7 @@ @since 0.6 *) -module Fiber = Moonpool_fib.Fiber -module FLS = Moonpool_fib.Fls +module Fut = Moonpool.Fut (** {2 Basic conversions} *) @@ -19,126 +18,46 @@ val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t (** [lwt_of_fut fut] makes a lwt future that completes when [fut] does. This must be called from the Lwt thread, and the result must always be used only - from inside the Lwt thread. *) + from inside the Lwt thread. + @raise Failure if not run from the lwt thread. *) (** {2 Helpers on the moonpool side} *) +val spawn_lwt : (unit -> 'a) -> 'a Lwt.t +(** This spawns a task that runs in the Lwt scheduler. + @raise Failure if {!lwt_main} was not called. *) + +val spawn_lwt_ignore : (unit -> unit) -> unit +(** Like {!spawn_lwt} but ignores the result, like [Lwt.async]. *) + val await_lwt : 'a Lwt.t -> 'a (** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool runner. This must be run from within a Moonpool runner so that the await-ing effect is handled. *) -val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t -(** [run_in_lwt f] runs [f()] from within the Lwt thread and returns a - thread-safe future. This can be run from anywhere. *) - -val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a -(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and awaits its result. - Must be run from inside a moonpool runner so that the await-in effect is - handled. - - This is similar to [Moonpool.await @@ run_in_lwt f]. *) - -val get_runner : unit -> Moonpool.Runner.t -(** Returns the runner from within which this is called. Must be run from within - a fiber. - @raise Failure if not run within a fiber *) - -(** {2 IO} *) - -(** IO using the Lwt event loop. - - These IO operations work on non-blocking file descriptors and rely on a - [Lwt_engine] event loop being active (meaning, [Lwt_main.run] is currently - running in some thread). - - Calling these functions must be done from a moonpool runner. A function like - [read] will first try to perform the IO action directly (here, call - {!Unix.read}); if the action fails because the FD is not ready, then - [await_readable] is called: it suspends the fiber and subscribes it to Lwt - to be awakened when the FD becomes ready. *) -module IO : sig - val read : Unix.file_descr -> bytes -> int -> int -> int - (** Read from the file descriptor *) - - val await_readable : Unix.file_descr -> unit - (** Suspend the fiber until the FD is readable *) - - val write_once : Unix.file_descr -> bytes -> int -> int -> int - (** Perform one write into the file descriptor *) - - val await_writable : Unix.file_descr -> unit - (** Suspend the fiber until the FD is writable *) - - val write : Unix.file_descr -> bytes -> int -> int -> unit - (** Loop around {!write_once} to write the entire slice. *) - - val sleep_s : float -> unit - (** Suspend the fiber for [n] seconds. *) -end - -module IO_in = IO_in -(** Input channel *) - -module IO_out = IO_out -(** Output channel *) - -module TCP_server : sig - type t = Lwt_io.server - - val establish_lwt : - ?backlog: - (* ?server_fd:Unix.file_descr -> *) - int -> - ?no_close:bool -> - runner:Moonpool.Runner.t -> - Unix.sockaddr -> - (Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) -> - t - (** [establish ~runner addr handler] runs a TCP server in the Lwt thread. When - a client connects, a moonpool fiber is started on [runner] to handle it. - *) - - val establish : - ?backlog: - (* ?server_fd:Unix.file_descr -> *) - int -> - ?no_close:bool -> - runner:Moonpool.Runner.t -> - Unix.sockaddr -> - (Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) -> - t - (** Like {!establish_lwt} but uses {!IO} to directly handle reads and writes - on client sockets. *) - - val shutdown : t -> unit - (** Shutdown the server *) -end - -module TCP_client : sig - val connect : Unix.sockaddr -> Unix.file_descr - - val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a - (** Open a connection, and use {!IO} to read and write from the socket in a - non blocking way. *) - - val with_connect_lwt : - Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a - (** Open a connection. *) -end - -(** {2 Helpers on the lwt side} *) - -val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t -(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner, and - returns a lwt future. This must be run from within the thread running - [Lwt_main]. *) +val run_in_lwt_and_await : (unit -> 'a) -> 'a +(** [run_in_lwt_and_await f] runs [f()] in the lwt thread, just like + [spawn_lwt f], and then calls {!await_lwt} on the result. This means [f()] + can use Lwt functions and libraries, use {!await_lwt} on them freely, etc, +*) (** {2 Wrappers around Lwt_main} *) -val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a -(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] - inside a fiber in [runner]. *) +val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref -val main : (unit -> 'a) -> 'a -(** Like {!main_with_runner} but with a default choice of runner. *) +val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a +(** [lwt_main f] sets the moonpool-lwt bridge up, runs lwt main, calls [f], + destroys the bridge, and return the result of [f()]. *) + +val on_lwt_thread : unit -> bool +(** [on_lwt_thread ()] is true if the current thread is the one currently + running {!lwt_main}. + @raise Failure if {!lwt_main} was not called. *) + +val lwt_main_runner : unit -> Moonpool.Runner.t +(** The runner from {!lwt_main}. The runner is only going to work if {!lwt_main} + is currently running in some thread. + @raise Failure if {!lwt_main} was not called. *) + +val is_setup : unit -> bool +(** Is the moonpool-lwt bridge setup? *) diff --git a/src/lwt/tcp_client.ml b/src/lwt/tcp_client.ml deleted file mode 100644 index 8aec16f2..00000000 --- a/src/lwt/tcp_client.ml +++ /dev/null @@ -1,53 +0,0 @@ -open Common_ -open Base - -let connect addr : Unix.file_descr = - let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in - Unix.set_nonblock sock; - Unix.setsockopt sock Unix.TCP_NODELAY true; - - (* connect asynchronously *) - while - try - Unix.connect sock addr; - false - with - | Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _) - -> - IO.await_writable sock; - true - do - () - done; - sock - -let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a = - let sock = connect addr in - - let ic = IO_in.of_unix_fd sock in - let oc = IO_out.of_unix_fd sock in - - let finally () = try Unix.close sock with _ -> () in - let@ () = Fun.protect ~finally in - f ic oc - -let with_connect_lwt addr - (f : Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) : 'a = - let sock = connect addr in - - let ic = - run_in_lwt_and_await (fun () -> - Lwt.return @@ Lwt_io.of_unix_fd ~mode:Lwt_io.input sock) - in - let oc = - run_in_lwt_and_await (fun () -> - Lwt.return @@ Lwt_io.of_unix_fd ~mode:Lwt_io.output sock) - in - - let finally () = - (try run_in_lwt_and_await (fun () -> Lwt_io.close ic) with _ -> ()); - (try run_in_lwt_and_await (fun () -> Lwt_io.close oc) with _ -> ()); - try Unix.close sock with _ -> () - in - let@ () = Fun.protect ~finally in - f ic oc diff --git a/src/lwt/tcp_server.ml b/src/lwt/tcp_server.ml deleted file mode 100644 index 22fa9253..00000000 --- a/src/lwt/tcp_server.ml +++ /dev/null @@ -1,38 +0,0 @@ -open Common_ -open Base - -type t = Lwt_io.server - -let establish_lwt ?backlog ?no_close ~runner addr handler : t = - let server = - Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr - (fun client_addr client_sock -> - let ic = Lwt_io.of_fd ~mode:Lwt_io.input client_sock in - let oc = Lwt_io.of_fd ~mode:Lwt_io.output client_sock in - - let fut = - M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc) - in - - let lwt_fut = lwt_of_fut fut in - lwt_fut) - in - await_lwt server - -let establish ?backlog ?no_close ~runner addr handler : t = - let server = - Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr - (fun client_addr client_sock -> - let ic = IO_in.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in - let oc = IO_out.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in - - let fut = - M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc) - in - - let lwt_fut = lwt_of_fut fut in - lwt_fut) - in - await_lwt server - -let shutdown self = await_lwt @@ Lwt_io.shutdown_server self diff --git a/src/sync/moonpool_sync.ml b/src/sync/moonpool_sync.ml index 99065305..88a6972a 100644 --- a/src/sync/moonpool_sync.ml +++ b/src/sync/moonpool_sync.ml @@ -1,3 +1,5 @@ +[@@@ocaml.deprecated "use Picos_std_sync directly or single threaded solutions"] + module Mutex = Picos_std_sync.Mutex module Condition = Picos_std_sync.Condition module Lock = Lock diff --git a/test/fiber/dune b/test/fiber/dune index 42845ff5..d090b895 100644 --- a/test/fiber/dune +++ b/test/fiber/dune @@ -4,6 +4,7 @@ (>= %{ocaml_version} 5.0)) (package moonpool) (libraries + t_fibers moonpool moonpool.fib trace diff --git a/test/fiber/lib/dune b/test/fiber/lib/dune new file mode 100644 index 00000000..a87f439c --- /dev/null +++ b/test/fiber/lib/dune @@ -0,0 +1,6 @@ +(library + (name t_fibers) + (enabled_if + (>= %{ocaml_version} 5.0)) + (package moonpool) + (libraries moonpool moonpool.fib trace qcheck-core)) diff --git a/test/fiber/lib/fib.ml b/test/fiber/lib/fib.ml new file mode 100644 index 00000000..790746a4 --- /dev/null +++ b/test/fiber/lib/fib.ml @@ -0,0 +1,177 @@ +open! Moonpool +module A = Atomic +module F = Moonpool_fib.Fiber + +let ( let@ ) = ( @@ ) + +module TS = struct + type t = int list + + let show (s : t) = String.concat "." @@ List.map string_of_int s + let init = [ 0 ] + + let next_ = function + | [] -> [ 0 ] + | n :: tl -> (n + 1) :: tl + + let tick (t : t ref) = t := next_ !t + + let tick_get t = + tick t; + !t +end + +(* more deterministic logging of events *) +module Log_ = struct + let events : (TS.t * string) list A.t = A.make [] + + let add_event t msg : unit = + while + let old = A.get events in + not (A.compare_and_set events old ((t, msg) :: old)) + do + () + done + + let logf t fmt = Printf.ksprintf (add_event t) fmt + + let print_and_clear () = + let l = + A.exchange events [] + |> List.map (fun (ts, msg) -> List.rev ts, msg) + |> List.sort Stdlib.compare + in + List.iter (fun (ts, msg) -> Printf.printf "%s: %s\n" (TS.show ts) msg) l +end + +let logf = Log_.logf + +let run1 ~runner () = + Printf.printf "============\nstart\n%!"; + let clock = ref TS.init in + let fib = + F.spawn_top ~on:runner @@ fun () -> + let chan_progress = Chan.create ~max_size:4 () in + let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in + + let subs = + List.init 5 (fun i -> + F.spawn ~protect:false @@ fun _n -> + Thread.delay (float i *. 0.01); + Chan.pop chans.(i); + Chan.push chan_progress i; + F.check_if_cancelled (); + i) + in + + logf (TS.tick_get clock) "wait for subs"; + + F.spawn_ignore (fun () -> + for i = 0 to 4 do + Chan.push chans.(i) (); + let i' = Chan.pop chan_progress in + assert (i = i') + done); + + (let clock0 = !clock in + List.iteri + (fun i f -> + let clock = ref (0 :: i :: clock0) in + logf !clock "await fiber %d" i; + logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i + (Option.is_some @@ F.Private_.get_cur_opt ()); + let res = F.await f in + logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i + (Option.is_some @@ F.Private_.get_cur_opt ()); + F.yield (); + logf (TS.tick_get clock) "res %d = %d" i res) + subs); + + logf (TS.tick_get clock) "main fiber done" + in + + Fut.await @@ F.res fib; + logf (TS.tick_get clock) "main fiber exited"; + Log_.print_and_clear (); + () + +let run2 ~runner () = + (* same but now, cancel one of the sub-fibers *) + Printf.printf "============\nstart\n"; + + let clock = ref TS.init in + let fib = + F.spawn_top ~on:runner @@ fun () -> + let@ () = + F.with_on_self_cancel (fun ebt -> + logf (TS.tick_get clock) "main fiber cancelled with %s" + @@ Exn_bt.show ebt) + in + + let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in + let chan_progress = Chan.create ~max_size:4 () in + + logf (TS.tick_get clock) "start fibers"; + let subs = + let clock0 = !clock in + List.init 10 (fun i -> + let clock = ref (0 :: i :: clock0) in + F.spawn ~protect:false @@ fun _n -> + let@ () = + F.with_on_self_cancel (fun _ -> + logf (TS.tick_get clock) "sub-fiber %d was cancelled" i) + in + Thread.delay 0.002; + + (* sync for determinism *) + Chan.pop chans_unblock.(i); + Chan.push chan_progress i; + + if i = 7 then ( + logf (TS.tick_get clock) "I'm fiber %d and I'm about to fail…" i; + failwith "oh no!" + ); + + F.check_if_cancelled (); + i) + in + + let post = TS.tick_get clock in + List.iteri + (fun i fib -> + F.on_result fib (function + | Ok _ -> logf (i :: post) "fiber %d resolved as ok" i + | Error _ -> logf (i :: post) "fiber %d resolved as error" i)) + subs; + + (* sequentialize the fibers, for determinism *) + F.spawn_ignore (fun () -> + for j = 0 to 9 do + Chan.push chans_unblock.(j) (); + let j' = Chan.pop chan_progress in + assert (j = j') + done); + + logf (TS.tick_get clock) "wait for subs"; + List.iteri + (fun i f -> + logf (TS.tick_get clock) "await fiber %d" i; + let res = F.await f in + logf (TS.tick_get clock) "res %d = %d" i res) + subs; + logf (TS.tick_get clock) "yield"; + F.yield (); + logf (TS.tick_get clock) "yielded"; + logf (TS.tick_get clock) "main fiber done" + in + + F.on_result fib (function + | Ok () -> logf (TS.tick_get clock) "main fiber result: ok" + | Error ebt -> + logf (TS.tick_get clock) "main fiber result: error %s" (Exn_bt.show ebt)); + + (try Fut.await @@ F.res fib + with Failure msg -> logf (TS.tick_get clock) "main fib failed with %S" msg); + logf (TS.tick_get clock) "main fiber exited"; + Log_.print_and_clear (); + () diff --git a/test/fiber/lib/fls.ml b/test/fiber/lib/fls.ml new file mode 100644 index 00000000..dffca2aa --- /dev/null +++ b/test/fiber/lib/fls.ml @@ -0,0 +1,169 @@ +open! Moonpool +module A = Atomic +module F = Moonpool_fib.Fiber +module FLS = Moonpool_fib.Fls + +(* ### dummy little tracing system with local storage *) + +type span_id = int + +let k_parent : span_id Hmap.key = Hmap.Key.create () +let ( let@ ) = ( @@ ) +let spf = Printf.sprintf + +module Span = struct + let new_id_ : unit -> span_id = + let n = A.make 0 in + fun () -> A.fetch_and_add n 1 + + type t = { + id: span_id; + parent: span_id option; + msg: string; + } +end + +module Tracer = struct + type t = { spans: Span.t list A.t } + + let create () : t = { spans = A.make [] } + let get self = A.get self.spans + + let add (self : t) span = + while + let old = A.get self.spans in + not (A.compare_and_set self.spans old (span :: old)) + do + () + done + + let with_span self name f = + let id = Span.new_id_ () in + let parent = FLS.get_in_local_hmap_opt k_parent in + let span = { Span.id; parent; msg = name } in + add self span; + FLS.with_in_local_hmap k_parent id f +end + +module Render = struct + type span_tree = { + msg: string; (** message of the span at the root *) + children: span_tree list; + } + + type t = { roots: span_tree list } + + let build (tracer : Tracer.t) : t = + let tops : (span_id, Span.t) Hashtbl.t = Hashtbl.create 16 in + let children : (span_id, Span.t list) Hashtbl.t = Hashtbl.create 16 in + + (* everyone is a root at first *) + let all_spans = Tracer.get tracer in + List.iter (fun (sp : Span.t) -> Hashtbl.add tops sp.id sp) all_spans; + + (* now consider the parenting relationships *) + let add_span_to_parent (span : Span.t) = + match span.parent with + | None -> () + | Some p -> + Hashtbl.remove tops span.id; + let l = try Hashtbl.find children p with Not_found -> [] in + Hashtbl.replace children p (span :: l) + in + List.iter add_span_to_parent all_spans; + + (* build the tree *) + let rec build_tree (sp : Span.t) : span_tree = + let children = try Hashtbl.find children sp.id with Not_found -> [] in + let children = List.map build_tree children |> List.sort Stdlib.compare in + { msg = sp.msg; children } + in + + let roots = + Hashtbl.fold (fun _ sp l -> build_tree sp :: l) tops [] + |> List.sort Stdlib.compare + in + + { roots } + + let pp (oc : out_channel) (self : t) : unit = + let rec pp_tree indent out (t : span_tree) = + let prefix = String.make indent ' ' in + Printf.fprintf out "%s%S\n" prefix t.msg; + List.iter (pp_tree (indent + 2) out) t.children + in + List.iter (pp_tree 2 oc) self.roots +end + +let run ~pool ~pool_name () = + let tracer = Tracer.create () in + + let sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub () = + let@ () = + Tracer.with_span tracer + (spf "child_%d.%d.%d.%d" idx idx_child idx_sub idx_sub_sub) + in + + for j = 1 to 5 do + let@ () = Tracer.with_span tracer (spf "iter.loop %d" j) in + F.yield () + done + in + + let sub_child ~idx ~idx_child ~idx_sub () = + let@ () = + Tracer.with_span tracer (spf "child_%d.%d.%d" idx idx_child idx_sub) + in + + for i = 1 to 10 do + let@ () = Tracer.with_span tracer (spf "iter.loop %02d" i) in + F.yield () + done; + + let subs = + List.init 2 (fun idx_sub_sub -> + F.spawn ~protect:true (fun () -> + sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ())) + in + List.iter F.await subs + in + + let top_child ~idx ~idx_child () = + let@ () = Tracer.with_span tracer (spf "child.%d.%d" idx idx_child) in + + let subs = + List.init 2 (fun k -> + F.spawn ~protect:true @@ fun () -> + sub_child ~idx ~idx_child ~idx_sub:k ()) + in + + let@ () = + Tracer.with_span tracer + (spf "child.%d.%d.99.await_children" idx idx_child) + in + List.iter F.await subs + in + + let top idx = + let@ () = Tracer.with_span tracer (spf "top_%d" idx) in + + let subs = + List.init 5 (fun j -> + F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ()) + in + + List.iter F.await subs + in + + Printf.printf "run test on pool = %s\n" pool_name; + let fibs = + List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx)) + in + List.iter F.await fibs; + + Printf.printf "tracing complete\n"; + Printf.printf "spans:\n"; + let tree = Render.build tracer in + Render.pp stdout tree; + Printf.printf "done\n%!"; + () diff --git a/test/fiber/t_fib1.ml b/test/fiber/t_fib1.ml index 77360b2b..a71d309e 100644 --- a/test/fiber/t_fib1.ml +++ b/test/fiber/t_fib1.ml @@ -1,179 +1,6 @@ -open! Moonpool -module A = Atomic -module F = Moonpool_fib.Fiber - let ( let@ ) = ( @@ ) -let runner = Fifo_pool.create ~num_threads:1 () - -module TS = struct - type t = int list - - let show (s : t) = String.concat "." @@ List.map string_of_int s - let init = [ 0 ] - - let next_ = function - | [] -> [ 0 ] - | n :: tl -> (n + 1) :: tl - - let tick (t : t ref) = t := next_ !t - - let tick_get t = - tick t; - !t -end - -(* more deterministic logging of events *) -module Log_ = struct - let events : (TS.t * string) list A.t = A.make [] - - let add_event t msg : unit = - while - let old = A.get events in - not (A.compare_and_set events old ((t, msg) :: old)) - do - () - done - - let logf t fmt = Printf.ksprintf (add_event t) fmt - - let print_and_clear () = - let l = - A.exchange events [] - |> List.map (fun (ts, msg) -> List.rev ts, msg) - |> List.sort Stdlib.compare - in - List.iter (fun (ts, msg) -> Printf.printf "%s: %s\n" (TS.show ts) msg) l -end - -let logf = Log_.logf let () = - Printf.printf "============\nstart\n"; - let clock = ref TS.init in - let fib = - F.spawn_top ~on:runner @@ fun () -> - let chan_progress = Chan.create ~max_size:4 () in - let chans = Array.init 5 (fun _ -> Chan.create ~max_size:4 ()) in - - let subs = - List.init 5 (fun i -> - F.spawn ~protect:false @@ fun _n -> - Thread.delay (float i *. 0.01); - Chan.pop chans.(i); - Chan.push chan_progress i; - F.check_if_cancelled (); - i) - in - - logf (TS.tick_get clock) "wait for subs"; - - F.spawn_ignore (fun () -> - for i = 0 to 4 do - Chan.push chans.(i) (); - let i' = Chan.pop chan_progress in - assert (i = i') - done); - - (let clock0 = !clock in - List.iteri - (fun i f -> - let clock = ref (0 :: i :: clock0) in - logf !clock "await fiber %d" i; - logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i - (Option.is_some @@ F.Private_.get_cur_opt ()); - let res = F.await f in - logf (TS.tick_get clock) "cur fiber[%d] is some: %b" i - (Option.is_some @@ F.Private_.get_cur_opt ()); - F.yield (); - logf (TS.tick_get clock) "res %d = %d" i res) - subs); - - logf (TS.tick_get clock) "main fiber done" - in - - Fut.wait_block_exn @@ F.res fib; - logf (TS.tick_get clock) "main fiber exited"; - Log_.print_and_clear (); - () - -let () = - let@ _r = Moonpool_fib.main in - (* same but now, cancel one of the sub-fibers *) - Printf.printf "============\nstart\n"; - - let clock = ref TS.init in - let fib = - F.spawn_top ~on:runner @@ fun () -> - let@ () = - F.with_on_self_cancel (fun ebt -> - logf (TS.tick_get clock) "main fiber cancelled with %s" - @@ Exn_bt.show ebt) - in - - let chans_unblock = Array.init 10 (fun _i -> Chan.create ~max_size:4 ()) in - let chan_progress = Chan.create ~max_size:4 () in - - logf (TS.tick_get clock) "start fibers"; - let subs = - let clock0 = !clock in - List.init 10 (fun i -> - let clock = ref (0 :: i :: clock0) in - F.spawn ~protect:false @@ fun _n -> - let@ () = - F.with_on_self_cancel (fun _ -> - logf (TS.tick_get clock) "sub-fiber %d was cancelled" i) - in - Thread.delay 0.002; - - (* sync for determinism *) - Chan.pop chans_unblock.(i); - Chan.push chan_progress i; - - if i = 7 then ( - logf (TS.tick_get clock) "I'm fiber %d and I'm about to fail…" i; - failwith "oh no!" - ); - - F.check_if_cancelled (); - i) - in - - let post = TS.tick_get clock in - List.iteri - (fun i fib -> - F.on_result fib (function - | Ok _ -> logf (i :: post) "fiber %d resolved as ok" i - | Error _ -> logf (i :: post) "fiber %d resolved as error" i)) - subs; - - (* sequentialize the fibers, for determinism *) - F.spawn_ignore (fun () -> - for j = 0 to 9 do - Chan.push chans_unblock.(j) (); - let j' = Chan.pop chan_progress in - assert (j = j') - done); - - logf (TS.tick_get clock) "wait for subs"; - List.iteri - (fun i f -> - logf (TS.tick_get clock) "await fiber %d" i; - let res = F.await f in - logf (TS.tick_get clock) "res %d = %d" i res) - subs; - logf (TS.tick_get clock) "yield"; - F.yield (); - logf (TS.tick_get clock) "yielded"; - logf (TS.tick_get clock) "main fiber done" - in - - F.on_result fib (function - | Ok () -> logf (TS.tick_get clock) "main fiber result: ok" - | Error ebt -> - logf (TS.tick_get clock) "main fiber result: error %s" (Exn_bt.show ebt)); - - (try Fut.wait_block_exn @@ F.res fib - with Failure msg -> logf (TS.tick_get clock) "main fib failed with %S" msg); - logf (TS.tick_get clock) "main fiber exited"; - Log_.print_and_clear (); - () + let@ runner = Moonpool_fib.main in + T_fibers.Fib.run1 ~runner (); + T_fibers.Fib.run2 ~runner () diff --git a/test/fiber/t_fls.expected b/test/fiber/t_fls.expected index b4de9eee..73b446e3 100644 --- a/test/fiber/t_fls.expected +++ b/test/fiber/t_fls.expected @@ -1930,7 +1930,7 @@ spans: "iter.loop 09" "iter.loop 10" done -run test on pool = ws_pool +run test on pool = fifo_pool tracing complete spans: "top_0" diff --git a/test/fiber/t_fls.ml b/test/fiber/t_fls.ml index ca397ed0..4a3125f5 100644 --- a/test/fiber/t_fls.ml +++ b/test/fiber/t_fls.ml @@ -1,177 +1,12 @@ open! Moonpool -module A = Atomic -module F = Moonpool_fib.Fiber -module FLS = Moonpool_fib.Fls -(* ### dummy little tracing system with local storage *) - -type span_id = int - -let k_parent : span_id Hmap.key = Hmap.Key.create () let ( let@ ) = ( @@ ) -let spf = Printf.sprintf - -module Span = struct - let new_id_ : unit -> span_id = - let n = A.make 0 in - fun () -> A.fetch_and_add n 1 - - type t = { - id: span_id; - parent: span_id option; - msg: string; - } -end - -module Tracer = struct - type t = { spans: Span.t list A.t } - - let create () : t = { spans = A.make [] } - let get self = A.get self.spans - - let add (self : t) span = - while - let old = A.get self.spans in - not (A.compare_and_set self.spans old (span :: old)) - do - () - done - - let with_span self name f = - let id = Span.new_id_ () in - let parent = FLS.get_in_local_hmap_opt k_parent in - let span = { Span.id; parent; msg = name } in - add self span; - FLS.with_in_local_hmap k_parent id f -end - -module Render = struct - type span_tree = { - msg: string; (** message of the span at the root *) - children: span_tree list; - } - - type t = { roots: span_tree list } - - let build (tracer : Tracer.t) : t = - let tops : (span_id, Span.t) Hashtbl.t = Hashtbl.create 16 in - let children : (span_id, Span.t list) Hashtbl.t = Hashtbl.create 16 in - - (* everyone is a root at first *) - let all_spans = Tracer.get tracer in - List.iter (fun (sp : Span.t) -> Hashtbl.add tops sp.id sp) all_spans; - - (* now consider the parenting relationships *) - let add_span_to_parent (span : Span.t) = - match span.parent with - | None -> () - | Some p -> - Hashtbl.remove tops span.id; - let l = try Hashtbl.find children p with Not_found -> [] in - Hashtbl.replace children p (span :: l) - in - List.iter add_span_to_parent all_spans; - - (* build the tree *) - let rec build_tree (sp : Span.t) : span_tree = - let children = try Hashtbl.find children sp.id with Not_found -> [] in - let children = List.map build_tree children |> List.sort Stdlib.compare in - { msg = sp.msg; children } - in - - let roots = - Hashtbl.fold (fun _ sp l -> build_tree sp :: l) tops [] - |> List.sort Stdlib.compare - in - - { roots } - - let pp (oc : out_channel) (self : t) : unit = - let rec pp_tree indent out (t : span_tree) = - let prefix = String.make indent ' ' in - Printf.fprintf out "%s%S\n" prefix t.msg; - List.iter (pp_tree (indent + 2) out) t.children - in - List.iter (pp_tree 2 oc) self.roots -end - -let run ~pool ~pool_name () = - let tracer = Tracer.create () in - - let sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub () = - let@ () = - Tracer.with_span tracer - (spf "child_%d.%d.%d.%d" idx idx_child idx_sub idx_sub_sub) - in - - for j = 1 to 5 do - let@ () = Tracer.with_span tracer (spf "iter.loop %d" j) in - F.yield () - done - in - - let sub_child ~idx ~idx_child ~idx_sub () = - let@ () = - Tracer.with_span tracer (spf "child_%d.%d.%d" idx idx_child idx_sub) - in - - for i = 1 to 10 do - let@ () = Tracer.with_span tracer (spf "iter.loop %02d" i) in - F.yield () - done; - - let subs = - List.init 2 (fun idx_sub_sub -> - F.spawn ~protect:true (fun () -> - sub_sub_child ~idx ~idx_child ~idx_sub ~idx_sub_sub ())) - in - List.iter F.await subs - in - - let top_child ~idx ~idx_child () = - let@ () = Tracer.with_span tracer (spf "child.%d.%d" idx idx_child) in - - let subs = - List.init 2 (fun k -> - F.spawn ~protect:true @@ fun () -> - sub_child ~idx ~idx_child ~idx_sub:k ()) - in - - let@ () = - Tracer.with_span tracer - (spf "child.%d.%d.99.await_children" idx idx_child) - in - List.iter F.await subs - in - - let top idx = - let@ () = Tracer.with_span tracer (spf "top_%d" idx) in - - let subs = - List.init 5 (fun j -> - F.spawn ~protect:true @@ fun () -> top_child ~idx ~idx_child:j ()) - in - - List.iter F.await subs - in - - Printf.printf "run test on pool = %s\n" pool_name; - let fibs = - List.init 8 (fun idx -> F.spawn_top ~on:pool (fun () -> top idx)) - in - List.iter F.wait_block_exn fibs; - - Printf.printf "tracing complete\n"; - Printf.printf "spans:\n"; - let tree = Render.build tracer in - Render.pp stdout tree; - Printf.printf "done\n%!"; - () let () = + let@ _ = Moonpool_fib.main in (let@ pool = Ws_pool.with_ () in - run ~pool ~pool_name:"ws_pool" ()); + T_fibers.Fls.run ~pool ~pool_name:"ws_pool" ()); (let@ pool = Fifo_pool.with_ () in - run ~pool ~pool_name:"ws_pool" ()); + T_fibers.Fls.run ~pool ~pool_name:"fifo_pool" ()); () diff --git a/test/lwt/dune b/test/lwt/dune index 8bdee879..df9aa94a 100644 --- a/test/lwt/dune +++ b/test/lwt/dune @@ -16,7 +16,7 @@ (action (with-stdout-to %{targets} - (run ./run_hash.sh -d ../data/ --n-conn=2 -j=4)))) + (run ./run_hash.sh -d ../data/ --n-conn=2)))) (rule (alias runtest) @@ -36,9 +36,12 @@ (= %{system} "linux") (>= %{ocaml_version} 5.0))) (action - (with-stdout-to - %{targets} - (run ./run_echo.sh -n 10 --n-conn=2 -j=4)))) + (setenv + CI_MODE + 1 + (with-stdout-to + %{targets} + (run ./run_echo.sh -n 10 --n-conn=2 -v))))) (rule (alias runtest) diff --git a/test/lwt/echo_client.ml b/test/lwt/echo_client.ml index 7143d8be..25a05b37 100644 --- a/test/lwt/echo_client.ml +++ b/test/lwt/echo_client.ml @@ -1,93 +1,117 @@ -module M = Moonpool module M_lwt = Moonpool_lwt module Trace = Trace_core +let ci_mode = Option.is_some @@ Sys.getenv_opt "CI_MODE" let spf = Printf.sprintf +let await_lwt = Moonpool_lwt.await_lwt let ( let@ ) = ( @@ ) -let lock_stdout = M.Lock.create () -let main ~port ~runner ~n ~n_conn () : unit Lwt.t = +let main ~port ~n ~n_conn ~verbose ~msg_per_conn () : unit = let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in - let remaining = Atomic.make n in - let all_done = Atomic.make 0 in - - let fut_exit, prom_exit = M.Fut.make () in - - Printf.printf "connecting to port %d\n%!" port; + let t0 = Unix.gettimeofday () in + Printf.printf + "connecting to port %d (%d msg per conn, %d conns total, %d max at a time)\n\ + %!" + port msg_per_conn n n_conn; let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in - let rec run_task () = + let token_pool = Lwt_pool.create n_conn (fun () -> Lwt.return_unit) in + let n_msg_total = ref 0 in + + let run_task () = (* Printf.printf "running task\n%!"; *) - let n = Atomic.fetch_and_add remaining (-1) in - if n > 0 then ( - (let _sp = - Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "connect.client" - in - Trace.message "connecting new client…"; - M_lwt.TCP_client.with_connect addr @@ fun ic oc -> - let buf = Bytes.create 32 in + let@ () = Lwt_pool.use token_pool in - for _j = 1 to 10 do - let _sp = - Trace.enter_manual_sub_span ~parent:_sp ~__FILE__ ~__LINE__ - "write.loop" - in + let@ () = M_lwt.spawn_lwt in + let _sp = + Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "connect.client" + in + Trace.message "connecting new client…"; - let s = spf "hello %d" _j in - M_lwt.IO_out.output_string oc s; - M_lwt.IO_out.flush oc; + let ic, oc = Lwt_io.open_connection addr |> await_lwt in - (* read back something *) - M_lwt.IO_in.really_input ic buf 0 (String.length s); - (let@ () = M.Lock.with_ lock_stdout in - Printf.printf "read: %s\n%!" - (Bytes.sub_string buf 0 (String.length s))); - Trace.exit_manual_span _sp; - () - done; - Trace.exit_manual_span _sp); + let cleanup () = + Trace.message "closing connection"; + Lwt_io.close ic |> await_lwt; + Lwt_io.close oc |> await_lwt + in - (* run another task *) M.Runner.run_async runner run_task - ) else ( - (* if we're the last to exit, resolve the promise *) - let n_already_done = Atomic.fetch_and_add all_done 1 in - if n_already_done = n_conn - 1 then ( - (let@ () = M.Lock.with_ lock_stdout in - Printf.printf "all done\n%!"); - M.Fut.fulfill prom_exit @@ Ok () - ) - ) + let@ () = Fun.protect ~finally:cleanup in + + let buf = Bytes.create 32 in + + for _j = 1 to msg_per_conn do + let _sp = + Trace.enter_manual_span + ~parent:(Some (Trace.ctx_of_span _sp)) + ~__FILE__ ~__LINE__ "write.loop" + in + + let s = spf "hello %d" _j in + Lwt_io.write oc s |> await_lwt; + Lwt_io.flush oc |> await_lwt; + incr n_msg_total; + + (* read back something *) + Lwt_io.read_into_exactly ic buf 0 (String.length s) |> await_lwt; + if verbose then + Printf.printf "read: %s\n%!" (Bytes.sub_string buf 0 (String.length s)); + Trace.exit_manual_span _sp; + () + done; + Trace.exit_manual_span _sp in (* start the first [n_conn] tasks *) - for _i = 1 to n_conn do - M.Runner.run_async runner run_task - done; + let futs = List.init n (fun _ -> run_task ()) in + Lwt.join futs |> await_lwt; - (* exit when [fut_exit] is resolved *) - M_lwt.lwt_of_fut fut_exit + Printf.printf "all done\n%!"; + let elapsed = Unix.gettimeofday () -. t0 in + if not ci_mode then + Printf.printf " sent %d messages in %.4fs (%.2f msg/s)\n%!" !n_msg_total + elapsed + (float !n_msg_total /. elapsed); + () let () = let@ () = Trace_tef.with_setup () in Trace.set_thread_name "main"; let port = ref 0 in - let j = ref 4 in let n_conn = ref 100 in let n = ref 50_000 in + let msg_per_conn = ref 10 in + let verbose = ref false in let opts = [ "-p", Arg.Set_int port, " port"; - "-j", Arg.Set_int j, " number of threads"; "-n", Arg.Set_int n, " total number of connections"; - "--n-conn", Arg.Set_int n_conn, " number of parallel connections"; + ( "--msg-per-conn", + Arg.Set_int msg_per_conn, + " messages sent per connection" ); + "-v", Arg.Set verbose, " verbose"; + ( "--n-conn", + Arg.Set_int n_conn, + " maximum number of connections opened simultaneously" ); ] |> Arg.align in Arg.parse opts ignore "echo client"; - let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in - (* Lwt_engine.set @@ new Lwt_engine.libev (); *) - Lwt_main.run @@ main ~runner ~port:!port ~n:!n ~n_conn:!n_conn () + let main () = + (* Lwt_engine.set @@ new Lwt_engine.libev (); *) + M_lwt.lwt_main @@ fun _runner -> + main ~port:!port ~n:!n ~n_conn:!n_conn ~verbose:!verbose + ~msg_per_conn:!msg_per_conn () + in + + print_endline "first run"; + main (); + assert (not (M_lwt.is_setup ())); + print_endline "second run"; + main (); + assert (not (M_lwt.is_setup ())); + print_endline "done" diff --git a/test/lwt/echo_server.ml b/test/lwt/echo_server.ml index 722bcf69..b603257f 100644 --- a/test/lwt/echo_server.ml +++ b/test/lwt/echo_server.ml @@ -3,6 +3,7 @@ module M_lwt = Moonpool_lwt module Trace = Trace_core let ( let@ ) = ( @@ ) +let await_lwt = M_lwt.await_lwt let spf = Printf.sprintf let str_of_sockaddr = function @@ -10,52 +11,63 @@ let str_of_sockaddr = function | Unix.ADDR_INET (addr, port) -> spf "%s:%d" (Unix.string_of_inet_addr addr) port -let main ~port ~runner () : unit Lwt.t = +let main ~port ~verbose ~runner:_ () : unit = let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in let lwt_fut, _lwt_prom = Lwt.wait () in - (* TODO: handle exit?? *) + (* TODO: handle exit?? ctrl-c? *) Printf.printf "listening on port %d\n%!" port; - let handle_client client_addr ic oc = + let handle_client client_addr (ic, oc) : _ Lwt.t = + let@ () = M_lwt.spawn_lwt in let _sp = - Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "handle.client" + Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "handle.client" ~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ]) in + if verbose then + Printf.printf "got new client on %s\n%!" (str_of_sockaddr client_addr); + let buf = Bytes.create 32 in let continue = ref true in while !continue do Trace.message "read"; - let n = M_lwt.IO_in.input ic buf 0 (Bytes.length buf) in + let n = Lwt_io.read_into ic buf 0 (Bytes.length buf) |> await_lwt in if n = 0 then continue := false else ( Trace.messagef (fun k -> k "got %dB" n); - M_lwt.IO_out.output oc buf 0 n; - M_lwt.IO_out.flush oc; + Lwt_io.write_from_exactly oc buf 0 n |> await_lwt; + Lwt_io.flush oc |> await_lwt; Trace.message "write" ) done; + if verbose then + Printf.printf "done with client on %s\n%!" (str_of_sockaddr client_addr); Trace.exit_manual_span _sp; Trace.message "exit handle client" in let addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in - let _server = M_lwt.TCP_server.establish ~runner addr handle_client in + let _server = + Lwt_io.establish_server_with_client_address addr handle_client |> await_lwt + in - lwt_fut + M_lwt.await_lwt lwt_fut let () = let@ () = Trace_tef.with_setup () in Trace.set_thread_name "main"; let port = ref 0 in let j = ref 4 in + let verbose = ref false in let opts = [ - "-p", Arg.Set_int port, " port"; "-j", Arg.Set_int j, " number of threads"; + "-v", Arg.Set verbose, " verbose"; + "-p", Arg.Set_int port, " port"; + "-j", Arg.Set_int j, " number of threads"; ] |> Arg.align in @@ -63,4 +75,4 @@ let () = let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in (* Lwt_engine.set @@ new Lwt_engine.libev (); *) - Lwt_main.run @@ main ~runner ~port:!port () + M_lwt.lwt_main @@ fun _ -> main ~runner ~port:!port ~verbose:!verbose () diff --git a/test/lwt/fibers/dune b/test/lwt/fibers/dune new file mode 100644 index 00000000..ec2f267f --- /dev/null +++ b/test/lwt/fibers/dune @@ -0,0 +1,16 @@ +(tests + (names t_fls t_main t_fib1) + (enabled_if + (>= %{ocaml_version} 5.0)) + (package moonpool-lwt) + (libraries + t_fibers + moonpool + moonpool.fib + moonpool-lwt + trace + trace-tef + qcheck-core + qcheck-core.runner + ;tracy-client.trace + )) diff --git a/test/lwt/fibers/t_fib1.expected b/test/lwt/fibers/t_fib1.expected new file mode 100644 index 00000000..d4e2b79f --- /dev/null +++ b/test/lwt/fibers/t_fib1.expected @@ -0,0 +1,61 @@ +============ +start +1: wait for subs +1.0.0: await fiber 0 +1.0.1: cur fiber[0] is some: true +1.0.2: cur fiber[0] is some: true +1.0.3: res 0 = 0 +1.1.0: await fiber 1 +1.1.1: cur fiber[1] is some: true +1.1.2: cur fiber[1] is some: true +1.1.3: res 1 = 1 +1.2.0: await fiber 2 +1.2.1: cur fiber[2] is some: true +1.2.2: cur fiber[2] is some: true +1.2.3: res 2 = 2 +1.3.0: await fiber 3 +1.3.1: cur fiber[3] is some: true +1.3.2: cur fiber[3] is some: true +1.3.3: res 3 = 3 +1.4.0: await fiber 4 +1.4.1: cur fiber[4] is some: true +1.4.2: cur fiber[4] is some: true +1.4.3: res 4 = 4 +2: main fiber done +3: main fiber exited +============ +start +1: start fibers +1.7.1: I'm fiber 7 and I'm about to fail… +1.8.1: sub-fiber 8 was cancelled +1.9.1: sub-fiber 9 was cancelled +2.0: fiber 0 resolved as ok +2.1: fiber 1 resolved as ok +2.2: fiber 2 resolved as ok +2.3: fiber 3 resolved as ok +2.4: fiber 4 resolved as ok +2.5: fiber 5 resolved as ok +2.6: fiber 6 resolved as ok +2.7: fiber 7 resolved as error +2.8: fiber 8 resolved as error +2.9: fiber 9 resolved as error +3: wait for subs +4: await fiber 0 +5: res 0 = 0 +6: await fiber 1 +7: res 1 = 1 +8: await fiber 2 +9: res 2 = 2 +10: await fiber 3 +11: res 3 = 3 +12: await fiber 4 +13: res 4 = 4 +14: await fiber 5 +15: res 5 = 5 +16: await fiber 6 +17: res 6 = 6 +18: await fiber 7 +19: main fiber cancelled with Failure("oh no!") +20: main fiber result: error Failure("oh no!") +21: main fib failed with "oh no!" +22: main fiber exited diff --git a/test/lwt/fibers/t_fib1.ml b/test/lwt/fibers/t_fib1.ml new file mode 100644 index 00000000..175e5983 --- /dev/null +++ b/test/lwt/fibers/t_fib1.ml @@ -0,0 +1,8 @@ +module M_lwt = Moonpool_lwt + +let ( let@ ) = ( @@ ) + +let () = + let@ runner = M_lwt.lwt_main in + T_fibers.Fib.run1 ~runner (); + T_fibers.Fib.run2 ~runner () diff --git a/test/lwt/fibers/t_fls.expected b/test/lwt/fibers/t_fls.expected new file mode 100644 index 00000000..c787f171 --- /dev/null +++ b/test/lwt/fibers/t_fls.expected @@ -0,0 +1,1932 @@ +run test on pool = lwt +tracing complete +spans: + "top_0" + "child.0.0" + "child.0.0.99.await_children" + "child_0.0.0" + "child_0.0.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_0.0.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_0.0.1" + "child_0.0.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_0.0.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.0.1" + "child.0.1.99.await_children" + "child_0.1.0" + "child_0.1.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_0.1.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_0.1.1" + "child_0.1.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_0.1.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.0.2" + "child.0.2.99.await_children" + "child_0.2.0" + "child_0.2.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_0.2.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_0.2.1" + "child_0.2.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_0.2.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.0.3" + "child.0.3.99.await_children" + "child_0.3.0" + "child_0.3.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_0.3.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_0.3.1" + "child_0.3.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_0.3.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.0.4" + "child.0.4.99.await_children" + "child_0.4.0" + "child_0.4.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_0.4.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_0.4.1" + "child_0.4.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_0.4.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "top_1" + "child.1.0" + "child.1.0.99.await_children" + "child_1.0.0" + "child_1.0.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_1.0.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_1.0.1" + "child_1.0.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_1.0.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.1.1" + "child.1.1.99.await_children" + "child_1.1.0" + "child_1.1.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_1.1.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_1.1.1" + "child_1.1.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_1.1.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.1.2" + "child.1.2.99.await_children" + "child_1.2.0" + "child_1.2.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_1.2.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_1.2.1" + "child_1.2.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_1.2.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.1.3" + "child.1.3.99.await_children" + "child_1.3.0" + "child_1.3.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_1.3.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_1.3.1" + "child_1.3.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_1.3.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.1.4" + "child.1.4.99.await_children" + "child_1.4.0" + "child_1.4.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_1.4.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_1.4.1" + "child_1.4.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_1.4.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "top_2" + "child.2.0" + "child.2.0.99.await_children" + "child_2.0.0" + "child_2.0.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_2.0.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_2.0.1" + "child_2.0.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_2.0.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.2.1" + "child.2.1.99.await_children" + "child_2.1.0" + "child_2.1.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_2.1.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_2.1.1" + "child_2.1.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_2.1.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.2.2" + "child.2.2.99.await_children" + "child_2.2.0" + "child_2.2.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_2.2.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_2.2.1" + "child_2.2.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_2.2.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.2.3" + "child.2.3.99.await_children" + "child_2.3.0" + "child_2.3.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_2.3.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_2.3.1" + "child_2.3.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_2.3.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.2.4" + "child.2.4.99.await_children" + "child_2.4.0" + "child_2.4.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_2.4.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_2.4.1" + "child_2.4.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_2.4.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "top_3" + "child.3.0" + "child.3.0.99.await_children" + "child_3.0.0" + "child_3.0.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_3.0.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_3.0.1" + "child_3.0.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_3.0.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.3.1" + "child.3.1.99.await_children" + "child_3.1.0" + "child_3.1.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_3.1.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_3.1.1" + "child_3.1.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_3.1.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.3.2" + "child.3.2.99.await_children" + "child_3.2.0" + "child_3.2.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_3.2.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_3.2.1" + "child_3.2.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_3.2.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.3.3" + "child.3.3.99.await_children" + "child_3.3.0" + "child_3.3.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_3.3.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_3.3.1" + "child_3.3.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_3.3.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.3.4" + "child.3.4.99.await_children" + "child_3.4.0" + "child_3.4.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_3.4.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_3.4.1" + "child_3.4.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_3.4.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "top_4" + "child.4.0" + "child.4.0.99.await_children" + "child_4.0.0" + "child_4.0.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_4.0.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_4.0.1" + "child_4.0.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_4.0.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.4.1" + "child.4.1.99.await_children" + "child_4.1.0" + "child_4.1.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_4.1.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_4.1.1" + "child_4.1.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_4.1.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.4.2" + "child.4.2.99.await_children" + "child_4.2.0" + "child_4.2.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_4.2.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_4.2.1" + "child_4.2.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_4.2.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.4.3" + "child.4.3.99.await_children" + "child_4.3.0" + "child_4.3.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_4.3.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_4.3.1" + "child_4.3.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_4.3.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.4.4" + "child.4.4.99.await_children" + "child_4.4.0" + "child_4.4.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_4.4.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_4.4.1" + "child_4.4.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_4.4.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "top_5" + "child.5.0" + "child.5.0.99.await_children" + "child_5.0.0" + "child_5.0.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_5.0.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_5.0.1" + "child_5.0.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_5.0.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.5.1" + "child.5.1.99.await_children" + "child_5.1.0" + "child_5.1.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_5.1.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_5.1.1" + "child_5.1.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_5.1.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.5.2" + "child.5.2.99.await_children" + "child_5.2.0" + "child_5.2.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_5.2.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_5.2.1" + "child_5.2.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_5.2.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.5.3" + "child.5.3.99.await_children" + "child_5.3.0" + "child_5.3.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_5.3.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_5.3.1" + "child_5.3.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_5.3.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.5.4" + "child.5.4.99.await_children" + "child_5.4.0" + "child_5.4.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_5.4.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_5.4.1" + "child_5.4.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_5.4.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "top_6" + "child.6.0" + "child.6.0.99.await_children" + "child_6.0.0" + "child_6.0.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_6.0.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_6.0.1" + "child_6.0.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_6.0.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.6.1" + "child.6.1.99.await_children" + "child_6.1.0" + "child_6.1.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_6.1.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_6.1.1" + "child_6.1.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_6.1.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.6.2" + "child.6.2.99.await_children" + "child_6.2.0" + "child_6.2.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_6.2.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_6.2.1" + "child_6.2.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_6.2.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.6.3" + "child.6.3.99.await_children" + "child_6.3.0" + "child_6.3.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_6.3.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_6.3.1" + "child_6.3.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_6.3.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.6.4" + "child.6.4.99.await_children" + "child_6.4.0" + "child_6.4.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_6.4.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_6.4.1" + "child_6.4.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_6.4.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "top_7" + "child.7.0" + "child.7.0.99.await_children" + "child_7.0.0" + "child_7.0.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_7.0.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_7.0.1" + "child_7.0.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_7.0.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.7.1" + "child.7.1.99.await_children" + "child_7.1.0" + "child_7.1.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_7.1.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_7.1.1" + "child_7.1.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_7.1.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.7.2" + "child.7.2.99.await_children" + "child_7.2.0" + "child_7.2.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_7.2.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_7.2.1" + "child_7.2.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_7.2.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.7.3" + "child.7.3.99.await_children" + "child_7.3.0" + "child_7.3.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_7.3.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_7.3.1" + "child_7.3.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_7.3.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child.7.4" + "child.7.4.99.await_children" + "child_7.4.0" + "child_7.4.0.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_7.4.0.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" + "child_7.4.1" + "child_7.4.1.0" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "child_7.4.1.1" + "iter.loop 1" + "iter.loop 2" + "iter.loop 3" + "iter.loop 4" + "iter.loop 5" + "iter.loop 01" + "iter.loop 02" + "iter.loop 03" + "iter.loop 04" + "iter.loop 05" + "iter.loop 06" + "iter.loop 07" + "iter.loop 08" + "iter.loop 09" + "iter.loop 10" +done diff --git a/test/lwt/fibers/t_fls.ml b/test/lwt/fibers/t_fls.ml new file mode 100644 index 00000000..31b92234 --- /dev/null +++ b/test/lwt/fibers/t_fls.ml @@ -0,0 +1,8 @@ +module M_lwt = Moonpool_lwt + +let ( let@ ) = ( @@ ) + +let () = + (let@ runner = M_lwt.lwt_main in + T_fibers.Fls.run ~pool:runner ~pool_name:"lwt" ()); + () diff --git a/test/lwt/fibers/t_main.ml b/test/lwt/fibers/t_main.ml new file mode 100644 index 00000000..fd1e2127 --- /dev/null +++ b/test/lwt/fibers/t_main.ml @@ -0,0 +1,35 @@ +open Moonpool +module M_lwt = Moonpool_lwt +module F = Moonpool_fib + +let ( let@ ) = ( @@ ) + +let () = + (* run fibers in the background, await them in the main thread *) + let@ bg = Fifo_pool.with_ ~num_threads:4 () in + let r = + M_lwt.lwt_main @@ fun runner -> + let f1 = F.spawn_top ~on:bg (fun () -> 1) in + let f2 = F.spawn_top ~on:runner (fun () -> 2) in + let f3 = F.spawn_top ~on:runner (fun () -> F.await f1 + 10) in + let r = F.await f2 + F.await f3 in + assert (r = 13); + r + in + assert (r = 13) + +let () = + (* run multiple times to make sure cleanup is correct *) + for _i = 1 to 10 do + try + let _r = + M_lwt.lwt_main @@ fun runner -> + let fib = F.spawn_top ~on:runner (fun () -> failwith "oops") in + F.await fib + in + + assert false + with Failure msg -> + (* Printf.eprintf "got %S\n%!" msg; *) + assert (msg = "oops") + done diff --git a/test/lwt/hash_client.ml b/test/lwt/hash_client.ml index 085666fb..1e807184 100644 --- a/test/lwt/hash_client.ml +++ b/test/lwt/hash_client.ml @@ -1,4 +1,3 @@ -module M = Moonpool module M_lwt = Moonpool_lwt module Trace = Trace_core @@ -8,10 +7,10 @@ module Str_tbl = Hashtbl.Make (struct let hash = Hashtbl.hash end) +let await_lwt = Moonpool_lwt.await_lwt let ( let@ ) = ( @@ ) -let lock_stdout = M.Lock.create () -let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t = +let main ~port ~ext ~dir ~n_conn () : unit = let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in Printf.printf "hash dir=%S\n%!" dir; @@ -20,12 +19,15 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t = let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in (* TODO: *) - let run_task () : unit = - let _sp = Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "run-task" in + let run_task () : unit Lwt.t = + let@ () = M_lwt.spawn_lwt in + let _sp = + Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "run-task" + in let seen = Str_tbl.create 16 in - M_lwt.TCP_client.with_connect_lwt addr @@ fun ic oc -> + let ic, oc = Lwt_io.open_connection addr |> await_lwt in let rec walk file : unit = if not (Sys.file_exists file) then () @@ -33,7 +35,9 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t = () else if Sys.is_directory file then ( let _sp = - Trace.enter_manual_sub_span ~parent:_sp ~__FILE__ ~__LINE__ "walk-dir" + Trace.enter_manual_span + ~parent:(Some (Trace.ctx_of_span _sp)) + ~__FILE__ ~__LINE__ "walk-dir" ~data:(fun () -> [ "d", `String file ]) in @@ -45,9 +49,8 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t = () else ( Str_tbl.add seen file (); - M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.write_line oc file); - let res = M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.read_line ic) in - let@ () = M.Lock.with_ lock_stdout in + Lwt_io.write_line oc file |> await_lwt; + let res = Lwt_io.read_line ic |> await_lwt in Printf.printf "%s\n%!" res ) in @@ -56,16 +59,14 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t = in (* start the first [n_conn] tasks *) - let futs = List.init n_conn (fun _ -> M.Fut.spawn ~on:runner run_task) in - - Lwt.join (List.map M_lwt.lwt_of_fut futs) + let futs = List.init n_conn (fun _ -> run_task ()) in + Lwt.join futs |> await_lwt let () = let@ () = Trace_tef.with_setup () in Trace.set_thread_name "main"; let port = ref 1234 in - let j = ref 4 in let n_conn = ref 100 in let ext = ref "" in let dir = ref "." in @@ -73,7 +74,6 @@ let () = let opts = [ "-p", Arg.Set_int port, " port"; - "-j", Arg.Set_int j, " number of threads"; "-d", Arg.Set_string dir, " directory to hash"; "--n-conn", Arg.Set_int n_conn, " number of parallel connections"; "--ext", Arg.Set_string ext, " extension to filter files"; @@ -82,7 +82,6 @@ let () = in Arg.parse opts ignore "echo client"; - let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in (* Lwt_engine.set @@ new Lwt_engine.libev (); *) - Lwt_main.run - @@ main ~runner ~port:!port ~ext:!ext ~dir:!dir ~n_conn:!n_conn () + M_lwt.lwt_main @@ fun _runner -> + main ~port:!port ~ext:!ext ~dir:!dir ~n_conn:!n_conn () diff --git a/test/lwt/hash_server.ml b/test/lwt/hash_server.ml index a84f6ccb..e7028176 100644 --- a/test/lwt/hash_server.ml +++ b/test/lwt/hash_server.ml @@ -1,4 +1,7 @@ -(* vendored from https://github.com/dbuenzli/uuidm *) +(* vendored from https://github.com/dbuenzli/uuidm + + This function is Copyright (c) 2008 The uuidm programmers. + SPDX-License-Identifier: ISC *) let sha_1 s = (* Based on pseudo-code of RFC 3174. Slow and ugly but does the job. *) @@ -116,28 +119,16 @@ let sha_1 s = i2s h 16 !h4; Bytes.unsafe_to_string h -(*--------------------------------------------------------------------------- - Copyright (c) 2008 The uuidm programmers +(* ================== *) - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. +(* test server that reads a list of files from each client connection, and sends back + to the client the hashes of these files *) - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - ---------------------------------------------------------------------------*) - -(* server that reads from sockets lists of files, and returns hashes of these files *) - -module M = Moonpool module M_lwt = Moonpool_lwt module Trace = Trace_core +module Fut = Moonpool.Fut +let await_lwt = Moonpool_lwt.await_lwt let ( let@ ) = ( @@ ) let spf = Printf.sprintf @@ -165,7 +156,7 @@ let read_file filename : string = in In_channel.with_open_bin filename In_channel.input_all -let main ~port ~runner () : unit Lwt.t = +let main ~port ~runner () : unit = let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in let lwt_fut, _lwt_prom = Lwt.wait () in @@ -173,38 +164,39 @@ let main ~port ~runner () : unit Lwt.t = (* TODO: handle exit?? *) Printf.printf "listening on port %d\n%!" port; - let handle_client client_addr ic oc = + let handle_client client_addr (ic, oc) = + let@ () = Moonpool_lwt.spawn_lwt in let _sp = - Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "handle.client" + Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "handle.client" ~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ]) in try while true do Trace.message "read"; - let filename = - M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.read_line ic) - |> String.trim - in + let filename = Lwt_io.read_line ic |> await_lwt |> String.trim in Trace.messagef (fun k -> k "hash %S" filename); match read_file filename with | exception e -> Printf.eprintf "error while reading %S:\n%s\n" filename (Printexc.to_string e); - M_lwt.run_in_lwt_and_await (fun () -> - Lwt_io.write_line oc (spf "%s: error" filename)); - M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.flush oc) + Lwt_io.write_line oc (spf "%s: error" filename) |> await_lwt; + Lwt_io.flush oc |> await_lwt | content -> - (* got the content, now hash it *) - let hash = - let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "hash" in + (* got the content, now hash it in a background task *) + let hash : _ Fut.t = + let@ () = Moonpool.spawn ~on:runner in + let@ _sp = + Trace.with_span ~__FILE__ ~__LINE__ "hash" ~data:(fun () -> + [ "file", `String filename ]) + in sha_1 content |> to_hex in - M_lwt.run_in_lwt_and_await (fun () -> - Lwt_io.write_line oc (spf "%s: %s" filename hash)); - M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.flush oc) + let hash = Fut.await hash in + Lwt_io.write_line oc (spf "%s: %s" filename hash) |> await_lwt; + Lwt_io.flush oc |> await_lwt done with End_of_file | Unix.Unix_error (Unix.ECONNRESET, _, _) -> Trace.exit_manual_span _sp; @@ -212,16 +204,17 @@ let main ~port ~runner () : unit Lwt.t = in let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in - let _server = M_lwt.TCP_server.establish_lwt ~runner addr handle_client in - Printf.printf "listening on port=%d\n%!" port; + let _server = + Lwt_io.establish_server_with_client_address addr handle_client |> await_lwt + in - lwt_fut + lwt_fut |> await_lwt let () = let@ () = Trace_tef.with_setup () in Trace.set_thread_name "main"; let port = ref 1234 in - let j = ref 4 in + let j = ref 0 in let opts = [ @@ -231,6 +224,14 @@ let () = in Arg.parse opts ignore "echo server"; - let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in (* Lwt_engine.set @@ new Lwt_engine.libev (); *) - Lwt_main.run @@ main ~runner ~port:!port () + let@ runner = + let num_threads = + if !j = 0 then + None + else + Some !j + in + Moonpool.Ws_pool.with_ ?num_threads () + in + M_lwt.lwt_main @@ fun _main_runner -> main ~runner ~port:!port () diff --git a/test/lwt/output_echo.expected b/test/lwt/output_echo.expected index 4311fe1a..a1b20abf 100644 --- a/test/lwt/output_echo.expected +++ b/test/lwt/output_echo.expected @@ -1,8 +1,22 @@ run echo server on port=12346 listening on port 12346 -run echo client -p 12346 -n 10 --n-conn=2 -j=4 +run echo client -p 12346 -n 10 --n-conn=2 -v all done -connecting to port 12346 +all done +connecting to port 12346 (10 msg per conn, 10 conns total, 2 max at a time) +connecting to port 12346 (10 msg per conn, 10 conns total, 2 max at a time) +done +first run +read: hello 1 +read: hello 1 +read: hello 1 +read: hello 1 +read: hello 1 +read: hello 1 +read: hello 1 +read: hello 1 +read: hello 1 +read: hello 1 read: hello 1 read: hello 1 read: hello 1 @@ -23,6 +37,26 @@ read: hello 10 read: hello 10 read: hello 10 read: hello 10 +read: hello 10 +read: hello 10 +read: hello 10 +read: hello 10 +read: hello 10 +read: hello 10 +read: hello 10 +read: hello 10 +read: hello 10 +read: hello 10 +read: hello 2 +read: hello 2 +read: hello 2 +read: hello 2 +read: hello 2 +read: hello 2 +read: hello 2 +read: hello 2 +read: hello 2 +read: hello 2 read: hello 2 read: hello 2 read: hello 2 @@ -43,6 +77,26 @@ read: hello 3 read: hello 3 read: hello 3 read: hello 3 +read: hello 3 +read: hello 3 +read: hello 3 +read: hello 3 +read: hello 3 +read: hello 3 +read: hello 3 +read: hello 3 +read: hello 3 +read: hello 3 +read: hello 4 +read: hello 4 +read: hello 4 +read: hello 4 +read: hello 4 +read: hello 4 +read: hello 4 +read: hello 4 +read: hello 4 +read: hello 4 read: hello 4 read: hello 4 read: hello 4 @@ -63,6 +117,26 @@ read: hello 5 read: hello 5 read: hello 5 read: hello 5 +read: hello 5 +read: hello 5 +read: hello 5 +read: hello 5 +read: hello 5 +read: hello 5 +read: hello 5 +read: hello 5 +read: hello 5 +read: hello 5 +read: hello 6 +read: hello 6 +read: hello 6 +read: hello 6 +read: hello 6 +read: hello 6 +read: hello 6 +read: hello 6 +read: hello 6 +read: hello 6 read: hello 6 read: hello 6 read: hello 6 @@ -83,6 +157,26 @@ read: hello 7 read: hello 7 read: hello 7 read: hello 7 +read: hello 7 +read: hello 7 +read: hello 7 +read: hello 7 +read: hello 7 +read: hello 7 +read: hello 7 +read: hello 7 +read: hello 7 +read: hello 7 +read: hello 8 +read: hello 8 +read: hello 8 +read: hello 8 +read: hello 8 +read: hello 8 +read: hello 8 +read: hello 8 +read: hello 8 +read: hello 8 read: hello 8 read: hello 8 read: hello 8 @@ -103,3 +197,14 @@ read: hello 9 read: hello 9 read: hello 9 read: hello 9 +read: hello 9 +read: hello 9 +read: hello 9 +read: hello 9 +read: hello 9 +read: hello 9 +read: hello 9 +read: hello 9 +read: hello 9 +read: hello 9 +second run diff --git a/test/lwt/output_hash.expected b/test/lwt/output_hash.expected index 300672df..726e59e3 100644 --- a/test/lwt/output_hash.expected +++ b/test/lwt/output_hash.expected @@ -1,7 +1,6 @@ running hash server on port=12345 listening on port 12345 -listening on port=12345 -run hash client -p 12345 -d ../data/ --n-conn=2 -j=4 +run hash client -p 12345 -d ../data/ --n-conn=2 ../data/d1/large: fdb479c5661572f9606266eeb280b4db5c26cc38 ../data/d1/large: fdb479c5661572f9606266eeb280b4db5c26cc38 ../data/d1/large_10: c31560efa1a5ad6dbf89990d51878f3bd64b13ce