diff --git a/src/lwt/IO.ml b/src/lwt/IO.ml deleted file mode 100644 index 6ae09506..00000000 --- a/src/lwt/IO.ml +++ /dev/null @@ -1,66 +0,0 @@ -open Base - -let await_readable fd : unit = - let trigger = Trigger.create () in - Perform_action_in_lwt.schedule - @@ Action.Wait_readable - ( fd, - fun cancel -> - Trigger.signal trigger; - Lwt_engine.stop_event cancel ); - Trigger.await_exn trigger - -let rec read fd buf i len : int = - if len = 0 then - 0 - else ( - match Unix.read fd buf i len with - | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> - await_readable fd; - read fd buf i len - | n -> n - ) - -let await_writable fd = - let trigger = Trigger.create () in - Perform_action_in_lwt.schedule - @@ Action.Wait_writable - ( fd, - fun cancel -> - Trigger.signal trigger; - Lwt_engine.stop_event cancel ); - Trigger.await_exn trigger - -let rec write_once fd buf i len : int = - if len = 0 then - 0 - else ( - match Unix.write fd buf i len with - | exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) -> - await_writable fd; - write_once fd buf i len - | n -> n - ) - -let write fd buf i len : unit = - let i = ref i in - let len = ref len in - while !len > 0 do - let n = write_once fd buf !i !len in - i := !i + n; - len := !len - n - done - -(** Sleep for the given amount of seconds *) -let sleep_s (f : float) : unit = - if f > 0. then ( - let trigger = Trigger.create () in - Perform_action_in_lwt.schedule - @@ Action.Sleep - ( f, - false, - fun cancel -> - Trigger.signal trigger; - Lwt_engine.stop_event cancel ); - Trigger.await_exn trigger - ) diff --git a/src/lwt/IO_in.ml b/src/lwt/IO_in.ml deleted file mode 100644 index b5ad110b..00000000 --- a/src/lwt/IO_in.ml +++ /dev/null @@ -1,152 +0,0 @@ -open Common_ - -class type t = object - method input : bytes -> int -> int -> int - (** Read into the slice. Returns [0] only if the stream is closed. *) - - method close : unit -> unit - (** Close the input. Must be idempotent. *) -end - -let create ?(close = ignore) ~input () : t = - object - method close = close - method input = input - end - -let empty : t = - object - method close () = () - method input _ _ _ = 0 - end - -let of_bytes ?(off = 0) ?len (b : bytes) : t = - (* i: current position in [b] *) - let i = ref off in - - let len = - match len with - | Some n -> - if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes"; - n - | None -> Bytes.length b - off - in - let end_ = off + len in - - object - method input b_out i_out len_out = - let n = min (end_ - !i) len_out in - Bytes.blit b !i b_out i_out n; - i := !i + n; - n - - method close () = i := end_ - end - -let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s) - -(** Read into the given slice. - @return the number of bytes read, [0] means end of input. *) -let[@inline] input (self : #t) buf i len = self#input buf i len - -(** Close the channel. *) -let[@inline] close self : unit = self#close () - -let rec really_input (self : #t) buf i len = - if len > 0 then ( - let n = input self buf i len in - if n = 0 then raise End_of_file; - (really_input [@tailrec]) self buf (i + n) (len - n) - ) - -let really_input_string self n : string = - let buf = Bytes.create n in - really_input self buf 0 n; - Bytes.unsafe_to_string buf - -let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t) - : unit = - let continue = ref true in - while !continue do - let len = input ic buf 0 (Bytes.length buf) in - if len = 0 then - continue := false - else - IO_out.output oc buf 0 len - done - -let concat (l0 : t list) : t = - let l = ref l0 in - let rec input b i len : int = - match !l with - | [] -> 0 - | ic :: tl -> - let n = ic#input b i len in - if n > 0 then - n - else ( - l := tl; - input b i len - ) - in - let close () = List.iter close l0 in - create ~close ~input () - -let input_all ?(buf = Bytes.create 128) (self : #t) : string = - let buf = ref buf in - let i = ref 0 in - - let[@inline] full_ () = !i = Bytes.length !buf in - - let grow_ () = - let old_size = Bytes.length !buf in - let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in - if old_size = new_size then - failwith "input_all: maximum input size exceeded"; - let new_buf = Bytes.extend !buf 0 (new_size - old_size) in - buf := new_buf - in - - let rec loop () = - if full_ () then grow_ (); - let available = Bytes.length !buf - !i in - let n = input self !buf !i available in - if n > 0 then ( - i := !i + n; - (loop [@tailrec]) () - ) - in - loop (); - - if full_ () then - Bytes.unsafe_to_string !buf - else - Bytes.sub_string !buf 0 !i - -let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) - (fd : Unix.file_descr) : t = - let buf_len = ref 0 in - let buf_off = ref 0 in - - let refill () = - buf_off := 0; - buf_len := IO.read fd buf 0 (Bytes.length buf) - in - - object - method input b i len : int = - if !buf_len = 0 then refill (); - let n = min len !buf_len in - if n > 0 then ( - Bytes.blit buf !buf_off b i n; - buf_off := !buf_off + n; - buf_len := !buf_len - n - ); - n - - method close () = - if close_noerr then ( - try Unix.close fd with _ -> () - ) else - Unix.close fd - end diff --git a/src/lwt/IO_out.ml b/src/lwt/IO_out.ml deleted file mode 100644 index 522d3e0a..00000000 --- a/src/lwt/IO_out.ml +++ /dev/null @@ -1,118 +0,0 @@ -open Common_ - -class type t = object - method output_char : char -> unit - method output : bytes -> int -> int -> unit - method flush : unit -> unit - method close : unit -> unit -end - -let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t = - object - method flush () = flush () - method close () = close () - method output_char c = output_char c - method output bs i len = output bs i len - end - -let dummy : t = - object - method flush () = () - method close () = () - method output_char _ = () - method output _ _ _ = () - end - -let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd - : t = - let buf_off = ref 0 in - - let[@inline] is_full () = !buf_off = Bytes.length buf in - - let flush () = - if !buf_off > 0 then ( - IO.write fd buf 0 !buf_off; - buf_off := 0 - ) - in - - object - method output_char c = - if is_full () then flush (); - Bytes.set buf !buf_off c; - incr buf_off - - method output bs i len : unit = - let i = ref i in - let len = ref len in - - while !len > 0 do - (* make space *) - if is_full () then flush (); - - let n = min !len (Bytes.length buf - !buf_off) in - Bytes.blit bs !i buf !buf_off n; - buf_off := !buf_off + n; - i := !i + n; - len := !len - n - done; - (* if full, write eagerly *) - if is_full () then flush () - - method close () = - if close_noerr then ( - try - flush (); - Unix.close fd - with _ -> () - ) else ( - flush (); - Unix.close fd - ) - - method flush = flush - end - -let of_buffer (buf : Buffer.t) : t = - object - method close () = () - method flush () = () - method output_char c = Buffer.add_char buf c - method output bs i len = Buffer.add_subbytes buf bs i len - end - -(** Output the buffer slice into this channel *) -let[@inline] output_char (self : #t) c : unit = self#output_char c - -(** Output the buffer slice into this channel *) -let[@inline] output (self : #t) buf i len : unit = self#output buf i len - -let[@inline] output_string (self : #t) (str : string) : unit = - self#output (Bytes.unsafe_of_string str) 0 (String.length str) - -let output_line (self : #t) (str : string) : unit = - output_string self str; - output_char self '\n' - -(** Close the channel. *) -let[@inline] close self : unit = self#close () - -(** Flush (ie. force write) any buffered bytes. *) -let[@inline] flush self : unit = self#flush () - -let output_int self i = - let s = string_of_int i in - output_string self s - -let output_lines self seq = Seq.iter (output_line self) seq - -let tee (l : t list) : t = - match l with - | [] -> dummy - | [ oc ] -> oc - | _ -> - let output bs i len = List.iter (fun oc -> output oc bs i len) l in - let output_char c = List.iter (fun oc -> output_char oc c) l in - let close () = List.iter close l in - let flush () = List.iter flush l in - create ~flush ~close ~output ~output_char () diff --git a/src/lwt/base.ml b/src/lwt/base.ml deleted file mode 100644 index 16124b8f..00000000 --- a/src/lwt/base.ml +++ /dev/null @@ -1,167 +0,0 @@ -open Common_ -module Trigger = M.Trigger -module Fiber = Moonpool_fib.Fiber -module FLS = Moonpool_fib.Fls - -(** Action scheduled from outside the loop *) -module Action = struct - type event = Lwt_engine.event - type cb = event -> unit - - (** Action that we ask the lwt loop to perform, from the outside *) - type t = - | Wait_readable of Unix.file_descr * cb - | Wait_writable of Unix.file_descr * cb - | Sleep of float * bool * cb - (* TODO: provide actions with cancellation, alongside a "select" operation *) - (* | Cancel of event *) - | On_termination : 'a Lwt.t * 'a Exn_bt.result ref * Trigger.t -> t - | Wakeup : 'a Lwt.u * 'a -> t - | Wakeup_exn : _ Lwt.u * exn -> t - | Other of (unit -> unit) - - (** Perform the action from within the Lwt thread *) - let perform (self : t) : unit = - match self with - | Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event) - | Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event) - | Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event) - (* | Cancel ev -> Lwt_engine.stop_event ev *) - | On_termination (fut, res, trigger) -> - Lwt.on_any fut - (fun x -> - res := Ok x; - Trigger.signal trigger) - (fun exn -> - res := Error (Exn_bt.get_callstack 10 exn); - Trigger.signal trigger) - | Wakeup (prom, x) -> Lwt.wakeup prom x - | Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e - | Other f -> f () -end - -module Action_queue = struct - type t = { q: Action.t list Atomic.t } [@@unboxed] - - let create () : t = { q = Atomic.make [] } - let pop_all (self : t) : _ list = Atomic.exchange self.q [] - - (** Push the action and return whether the queue was previously empty *) - let push (self : t) (a : Action.t) : bool = - let is_first = ref true in - while - let old = Atomic.get self.q in - if Atomic.compare_and_set self.q old (a :: old) then ( - is_first := old == []; - false - ) else - true - do - () - done; - !is_first -end - -module Perform_action_in_lwt = struct - open struct - let actions_ : Action_queue.t = Action_queue.create () - - (** Gets the current set of notifications and perform them from inside the - Lwt thread *) - let perform_pending_actions () : unit = - let@ _sp = - Moonpool.Private.Tracing_.with_span - "moonpool-lwt.perform-pending-actions" - in - - let l = Action_queue.pop_all actions_ in - List.iter Action.perform l - - let notification : int = - Lwt_unix.make_notification ~once:false perform_pending_actions - end - - let schedule (a : Action.t) : unit = - let is_first = Action_queue.push actions_ a in - if is_first then Lwt_unix.send_notification notification -end - -let get_runner () : M.Runner.t = - match M.Runner.get_current_runner () with - | Some r -> r - | None -> failwith "Moonpool_lwt.get_runner: not inside a runner" - -let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t = - let lwt_fut, lwt_prom = Lwt.wait () in - M.Fut.on_result fut (function - | Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x) - | Error ebt -> - let exn = Exn_bt.exn ebt in - Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn)); - lwt_fut - -let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = - match Lwt.poll lwt_fut with - | Some x -> M.Fut.return x - | None -> - let fut, prom = M.Fut.make () in - Lwt.on_any lwt_fut - (fun x -> M.Fut.fulfill prom (Ok x)) - (fun exn -> - let bt = Printexc.get_callstack 10 in - M.Fut.fulfill prom (Error (Exn_bt.make exn bt))); - fut - -let _dummy_exn_bt : Exn_bt.t = - Exn_bt.get_callstack 0 (Failure "dummy Exn_bt from moonpool-lwt") - -let await_lwt (fut : _ Lwt.t) = - match Lwt.poll fut with - | Some x -> x - | None -> - (* suspend fiber, wake it up when [fut] resolves *) - let trigger = M.Trigger.create () in - let res = ref (Error _dummy_exn_bt) in - Perform_action_in_lwt.(schedule Action.(On_termination (fut, res, trigger))); - Trigger.await trigger |> Option.iter Exn_bt.raise; - Exn_bt.unwrap !res - -let run_in_lwt f : _ M.Fut.t = - let fut, prom = M.Fut.make () in - Perform_action_in_lwt.schedule - (Action.Other - (fun () -> - let lwt_fut = f () in - Lwt.on_any lwt_fut - (fun x -> M.Fut.fulfill prom @@ Ok x) - (fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn)))); - fut - -let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f - -let detach_in_runner ~runner f : _ Lwt.t = - let fut, promise = Lwt.wait () in - M.Runner.run_async runner (fun () -> - match f () with - | x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (promise, x) - | exception exn -> - Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn)); - fut - -let main_with_runner ~runner (f : unit -> 'a) : 'a = - let lwt_fut, lwt_prom = Lwt.wait () in - - let _fiber = - Fiber.spawn_top ~on:runner (fun () -> - try - let x = f () in - Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x)) - with exn -> - Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn))) - in - - Lwt_main.run lwt_fut - -let main f = - let@ runner = M.Ws_pool.with_ () in - main_with_runner ~runner f diff --git a/src/lwt/common_.ml b/src/lwt/common_.ml index d1fdf6d2..aad8ac4c 100644 --- a/src/lwt/common_.ml +++ b/src/lwt/common_.ml @@ -1,5 +1,3 @@ -module M = Moonpool -module Exn_bt = M.Exn_bt +module Exn_bt = Moonpool.Exn_bt let ( let@ ) = ( @@ ) -let _default_buf_size = 4 * 1024 diff --git a/src/lwt/moonpool_lwt.ml b/src/lwt/moonpool_lwt.ml index 1d92ddab..d669ad9c 100644 --- a/src/lwt/moonpool_lwt.ml +++ b/src/lwt/moonpool_lwt.ml @@ -1,6 +1,135 @@ -include Base -module IO = IO -module IO_out = IO_out -module IO_in = IO_in -module TCP_server = Tcp_server -module TCP_client = Tcp_client +open Common_ + +open struct + module WL = Moonpool.Private.Worker_loop_ + module M = Moonpool +end + +module Fut = Moonpool.Fut + +let default_around_task_ : WL.around_task = AT_pair (ignore, fun _ _ -> ()) + +module Scheduler_state = struct + module BQ = Moonpool.Blocking_queue + + type st = { + tasks: WL.task_full BQ.t; + on_exn: Exn_bt.t -> unit; + mutable as_runner: Moonpool.Runner.t; + mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option; + mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option; + } + + let create ~on_exn () : st = + { + tasks = BQ.create (); + on_exn; + as_runner = Moonpool.Runner.dummy; + enter_hook = None; + leave_hook = None; + } + + let around_task _ = default_around_task_ + let schedule (self : st) t = BQ.push self.tasks t + + let get_next_task (self : st) = + try BQ.pop self.tasks with BQ.Closed -> raise WL.No_more_tasks + + let on_exn (self : st) ebt = self.on_exn ebt + let runner self = self.as_runner + + let as_runner (self : st) : Moonpool.Runner.t = + Moonpool.Runner.For_runner_implementors.create + ~size:(fun () -> 1) + ~num_tasks:(fun () -> BQ.size self.tasks) + ~run_async:(fun ~fiber f -> schedule self @@ WL.T_start { fiber; f }) + ~shutdown:(fun ~wait:_ () -> BQ.close self.tasks) + () + + let before_start self : unit = + self.as_runner <- as_runner self; + () + + let cleanup self = + Option.iter Lwt_main.Enter_iter_hooks.remove self.enter_hook; + Option.iter Lwt_main.Leave_iter_hooks.remove self.leave_hook; + () + + let ops : st WL.ops = + { + schedule; + around_task; + get_next_task; + on_exn; + runner; + before_start; + cleanup; + } +end + +let _dummy_exn_bt : Exn_bt.t = + Exn_bt.get_callstack 0 (Failure "dummy Exn_bt from moonpool-lwt") + +let await_lwt (fut : _ Lwt.t) = + match Lwt.poll fut with + | Some x -> x + | None -> + (* suspend fiber, wake it up when [fut] resolves *) + let trigger = M.Trigger.create () in + let res = ref (Error _dummy_exn_bt) in + Lwt.on_termination fut (fun _ -> M.Trigger.signal trigger); + M.Trigger.await trigger |> Option.iter Exn_bt.raise; + Exn_bt.unwrap !res + +let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t = + let lwt_fut, lwt_prom = Lwt.wait () in + M.Fut.on_result fut (function + | Ok x -> Lwt.wakeup lwt_prom x + | Error ebt -> + let exn = Exn_bt.exn ebt in + Lwt.wakeup_exn lwt_prom exn); + lwt_fut + +let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t = + match Lwt.poll lwt_fut with + | Some x -> M.Fut.return x + | None -> + let fut, prom = M.Fut.make () in + Lwt.on_any lwt_fut + (fun x -> M.Fut.fulfill prom (Ok x)) + (fun exn -> + let bt = Printexc.get_callstack 10 in + M.Fut.fulfill prom (Error (Exn_bt.make exn bt))); + fut + +let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref = + ref (fun ebt -> + Printf.eprintf "uncaught exception in moonpool-lwt:\n%s" (Exn_bt.show ebt)) + +let runner_ : Moonpool.Runner.t option ref = ref None + +let setup () = + match !runner_ with + | Some r -> r + | None -> + let on_exn ebt = !on_uncaught_exn ebt in + let module Arg = struct + type nonrec st = Scheduler_state.st + + let ops = Scheduler_state.ops + let st = Scheduler_state.create ~on_exn () + end in + let module FG = WL.Fine_grained (Arg) () in + runner_ := Some Arg.st.as_runner; + + FG.setup ~block_signals:false (); + let run_in_hook () = FG.run ~max_tasks:1000 () in + Arg.st.enter_hook <- Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook); + Arg.st.leave_hook <- Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook); + + Arg.st.as_runner + +let lwt_main (f : _ -> 'a) : 'a = + let runner = setup () in + let fut = M.spawn ~on:runner (fun () -> f runner) in + Lwt_main.run (lwt_of_fut fut) diff --git a/src/lwt/moonpool_lwt.mli b/src/lwt/moonpool_lwt.mli index d933a73f..29fe2add 100644 --- a/src/lwt/moonpool_lwt.mli +++ b/src/lwt/moonpool_lwt.mli @@ -7,8 +7,7 @@ @since 0.6 *) -module Fiber = Moonpool_fib.Fiber -module FLS = Moonpool_fib.Fls +module Fut = Moonpool.Fut (** {2 Basic conversions} *) @@ -28,117 +27,12 @@ val await_lwt : 'a Lwt.t -> 'a runner. This must be run from within a Moonpool runner so that the await-ing effect is handled. *) -val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t -(** [run_in_lwt f] runs [f()] from within the Lwt thread and returns a - thread-safe future. This can be run from anywhere. *) - -val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a -(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and awaits its result. - Must be run from inside a moonpool runner so that the await-in effect is - handled. - - This is similar to [Moonpool.await @@ run_in_lwt f]. *) - -val get_runner : unit -> Moonpool.Runner.t -(** Returns the runner from within which this is called. Must be run from within - a fiber. - @raise Failure if not run within a fiber *) - -(** {2 IO} *) - -(** IO using the Lwt event loop. - - These IO operations work on non-blocking file descriptors and rely on a - [Lwt_engine] event loop being active (meaning, [Lwt_main.run] is currently - running in some thread). - - Calling these functions must be done from a moonpool runner. A function like - [read] will first try to perform the IO action directly (here, call - {!Unix.read}); if the action fails because the FD is not ready, then - [await_readable] is called: it suspends the fiber and subscribes it to Lwt - to be awakened when the FD becomes ready. *) -module IO : sig - val read : Unix.file_descr -> bytes -> int -> int -> int - (** Read from the file descriptor *) - - val await_readable : Unix.file_descr -> unit - (** Suspend the fiber until the FD is readable *) - - val write_once : Unix.file_descr -> bytes -> int -> int -> int - (** Perform one write into the file descriptor *) - - val await_writable : Unix.file_descr -> unit - (** Suspend the fiber until the FD is writable *) - - val write : Unix.file_descr -> bytes -> int -> int -> unit - (** Loop around {!write_once} to write the entire slice. *) - - val sleep_s : float -> unit - (** Suspend the fiber for [n] seconds. *) -end - -module IO_in = IO_in -(** Input channel *) - -module IO_out = IO_out -(** Output channel *) - -module TCP_server : sig - type t = Lwt_io.server - - val establish_lwt : - ?backlog: - (* ?server_fd:Unix.file_descr -> *) - int -> - ?no_close:bool -> - runner:Moonpool.Runner.t -> - Unix.sockaddr -> - (Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) -> - t - (** [establish ~runner addr handler] runs a TCP server in the Lwt thread. When - a client connects, a moonpool fiber is started on [runner] to handle it. - *) - - val establish : - ?backlog: - (* ?server_fd:Unix.file_descr -> *) - int -> - ?no_close:bool -> - runner:Moonpool.Runner.t -> - Unix.sockaddr -> - (Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) -> - t - (** Like {!establish_lwt} but uses {!IO} to directly handle reads and writes - on client sockets. *) - - val shutdown : t -> unit - (** Shutdown the server *) -end - -module TCP_client : sig - val connect : Unix.sockaddr -> Unix.file_descr - - val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a - (** Open a connection, and use {!IO} to read and write from the socket in a - non blocking way. *) - - val with_connect_lwt : - Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a - (** Open a connection. *) -end - -(** {2 Helpers on the lwt side} *) - -val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t -(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner, and - returns a lwt future. This must be run from within the thread running - [Lwt_main]. *) - (** {2 Wrappers around Lwt_main} *) -val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a -(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()] - inside a fiber in [runner]. *) +val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref -val main : (unit -> 'a) -> 'a -(** Like {!main_with_runner} but with a default choice of runner. *) +val setup : unit -> Moonpool.Runner.t +(** Install hooks in Lwt_main to run the scheduler *) + +val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a +(** Setup, run lwt main, return the result *) diff --git a/src/lwt/tcp_client.ml b/src/lwt/tcp_client.ml deleted file mode 100644 index 8aec16f2..00000000 --- a/src/lwt/tcp_client.ml +++ /dev/null @@ -1,53 +0,0 @@ -open Common_ -open Base - -let connect addr : Unix.file_descr = - let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in - Unix.set_nonblock sock; - Unix.setsockopt sock Unix.TCP_NODELAY true; - - (* connect asynchronously *) - while - try - Unix.connect sock addr; - false - with - | Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _) - -> - IO.await_writable sock; - true - do - () - done; - sock - -let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a = - let sock = connect addr in - - let ic = IO_in.of_unix_fd sock in - let oc = IO_out.of_unix_fd sock in - - let finally () = try Unix.close sock with _ -> () in - let@ () = Fun.protect ~finally in - f ic oc - -let with_connect_lwt addr - (f : Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) : 'a = - let sock = connect addr in - - let ic = - run_in_lwt_and_await (fun () -> - Lwt.return @@ Lwt_io.of_unix_fd ~mode:Lwt_io.input sock) - in - let oc = - run_in_lwt_and_await (fun () -> - Lwt.return @@ Lwt_io.of_unix_fd ~mode:Lwt_io.output sock) - in - - let finally () = - (try run_in_lwt_and_await (fun () -> Lwt_io.close ic) with _ -> ()); - (try run_in_lwt_and_await (fun () -> Lwt_io.close oc) with _ -> ()); - try Unix.close sock with _ -> () - in - let@ () = Fun.protect ~finally in - f ic oc diff --git a/src/lwt/tcp_server.ml b/src/lwt/tcp_server.ml deleted file mode 100644 index 22fa9253..00000000 --- a/src/lwt/tcp_server.ml +++ /dev/null @@ -1,38 +0,0 @@ -open Common_ -open Base - -type t = Lwt_io.server - -let establish_lwt ?backlog ?no_close ~runner addr handler : t = - let server = - Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr - (fun client_addr client_sock -> - let ic = Lwt_io.of_fd ~mode:Lwt_io.input client_sock in - let oc = Lwt_io.of_fd ~mode:Lwt_io.output client_sock in - - let fut = - M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc) - in - - let lwt_fut = lwt_of_fut fut in - lwt_fut) - in - await_lwt server - -let establish ?backlog ?no_close ~runner addr handler : t = - let server = - Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr - (fun client_addr client_sock -> - let ic = IO_in.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in - let oc = IO_out.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in - - let fut = - M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc) - in - - let lwt_fut = lwt_of_fut fut in - lwt_fut) - in - await_lwt server - -let shutdown self = await_lwt @@ Lwt_io.shutdown_server self