mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-06 11:15:38 -05:00
wip lwt: event loop for moonpool directly inside lwt
This commit is contained in:
parent
55e3e77a66
commit
bf90c32c86
9 changed files with 143 additions and 716 deletions
|
|
@ -1,66 +0,0 @@
|
|||
open Base
|
||||
|
||||
let await_readable fd : unit =
|
||||
let trigger = Trigger.create () in
|
||||
Perform_action_in_lwt.schedule
|
||||
@@ Action.Wait_readable
|
||||
( fd,
|
||||
fun cancel ->
|
||||
Trigger.signal trigger;
|
||||
Lwt_engine.stop_event cancel );
|
||||
Trigger.await_exn trigger
|
||||
|
||||
let rec read fd buf i len : int =
|
||||
if len = 0 then
|
||||
0
|
||||
else (
|
||||
match Unix.read fd buf i len with
|
||||
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
|
||||
await_readable fd;
|
||||
read fd buf i len
|
||||
| n -> n
|
||||
)
|
||||
|
||||
let await_writable fd =
|
||||
let trigger = Trigger.create () in
|
||||
Perform_action_in_lwt.schedule
|
||||
@@ Action.Wait_writable
|
||||
( fd,
|
||||
fun cancel ->
|
||||
Trigger.signal trigger;
|
||||
Lwt_engine.stop_event cancel );
|
||||
Trigger.await_exn trigger
|
||||
|
||||
let rec write_once fd buf i len : int =
|
||||
if len = 0 then
|
||||
0
|
||||
else (
|
||||
match Unix.write fd buf i len with
|
||||
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
|
||||
await_writable fd;
|
||||
write_once fd buf i len
|
||||
| n -> n
|
||||
)
|
||||
|
||||
let write fd buf i len : unit =
|
||||
let i = ref i in
|
||||
let len = ref len in
|
||||
while !len > 0 do
|
||||
let n = write_once fd buf !i !len in
|
||||
i := !i + n;
|
||||
len := !len - n
|
||||
done
|
||||
|
||||
(** Sleep for the given amount of seconds *)
|
||||
let sleep_s (f : float) : unit =
|
||||
if f > 0. then (
|
||||
let trigger = Trigger.create () in
|
||||
Perform_action_in_lwt.schedule
|
||||
@@ Action.Sleep
|
||||
( f,
|
||||
false,
|
||||
fun cancel ->
|
||||
Trigger.signal trigger;
|
||||
Lwt_engine.stop_event cancel );
|
||||
Trigger.await_exn trigger
|
||||
)
|
||||
152
src/lwt/IO_in.ml
152
src/lwt/IO_in.ml
|
|
@ -1,152 +0,0 @@
|
|||
open Common_
|
||||
|
||||
class type t = object
|
||||
method input : bytes -> int -> int -> int
|
||||
(** Read into the slice. Returns [0] only if the stream is closed. *)
|
||||
|
||||
method close : unit -> unit
|
||||
(** Close the input. Must be idempotent. *)
|
||||
end
|
||||
|
||||
let create ?(close = ignore) ~input () : t =
|
||||
object
|
||||
method close = close
|
||||
method input = input
|
||||
end
|
||||
|
||||
let empty : t =
|
||||
object
|
||||
method close () = ()
|
||||
method input _ _ _ = 0
|
||||
end
|
||||
|
||||
let of_bytes ?(off = 0) ?len (b : bytes) : t =
|
||||
(* i: current position in [b] *)
|
||||
let i = ref off in
|
||||
|
||||
let len =
|
||||
match len with
|
||||
| Some n ->
|
||||
if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes";
|
||||
n
|
||||
| None -> Bytes.length b - off
|
||||
in
|
||||
let end_ = off + len in
|
||||
|
||||
object
|
||||
method input b_out i_out len_out =
|
||||
let n = min (end_ - !i) len_out in
|
||||
Bytes.blit b !i b_out i_out n;
|
||||
i := !i + n;
|
||||
n
|
||||
|
||||
method close () = i := end_
|
||||
end
|
||||
|
||||
let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s)
|
||||
|
||||
(** Read into the given slice.
|
||||
@return the number of bytes read, [0] means end of input. *)
|
||||
let[@inline] input (self : #t) buf i len = self#input buf i len
|
||||
|
||||
(** Close the channel. *)
|
||||
let[@inline] close self : unit = self#close ()
|
||||
|
||||
let rec really_input (self : #t) buf i len =
|
||||
if len > 0 then (
|
||||
let n = input self buf i len in
|
||||
if n = 0 then raise End_of_file;
|
||||
(really_input [@tailrec]) self buf (i + n) (len - n)
|
||||
)
|
||||
|
||||
let really_input_string self n : string =
|
||||
let buf = Bytes.create n in
|
||||
really_input self buf 0 n;
|
||||
Bytes.unsafe_to_string buf
|
||||
|
||||
let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t)
|
||||
: unit =
|
||||
let continue = ref true in
|
||||
while !continue do
|
||||
let len = input ic buf 0 (Bytes.length buf) in
|
||||
if len = 0 then
|
||||
continue := false
|
||||
else
|
||||
IO_out.output oc buf 0 len
|
||||
done
|
||||
|
||||
let concat (l0 : t list) : t =
|
||||
let l = ref l0 in
|
||||
let rec input b i len : int =
|
||||
match !l with
|
||||
| [] -> 0
|
||||
| ic :: tl ->
|
||||
let n = ic#input b i len in
|
||||
if n > 0 then
|
||||
n
|
||||
else (
|
||||
l := tl;
|
||||
input b i len
|
||||
)
|
||||
in
|
||||
let close () = List.iter close l0 in
|
||||
create ~close ~input ()
|
||||
|
||||
let input_all ?(buf = Bytes.create 128) (self : #t) : string =
|
||||
let buf = ref buf in
|
||||
let i = ref 0 in
|
||||
|
||||
let[@inline] full_ () = !i = Bytes.length !buf in
|
||||
|
||||
let grow_ () =
|
||||
let old_size = Bytes.length !buf in
|
||||
let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in
|
||||
if old_size = new_size then
|
||||
failwith "input_all: maximum input size exceeded";
|
||||
let new_buf = Bytes.extend !buf 0 (new_size - old_size) in
|
||||
buf := new_buf
|
||||
in
|
||||
|
||||
let rec loop () =
|
||||
if full_ () then grow_ ();
|
||||
let available = Bytes.length !buf - !i in
|
||||
let n = input self !buf !i available in
|
||||
if n > 0 then (
|
||||
i := !i + n;
|
||||
(loop [@tailrec]) ()
|
||||
)
|
||||
in
|
||||
loop ();
|
||||
|
||||
if full_ () then
|
||||
Bytes.unsafe_to_string !buf
|
||||
else
|
||||
Bytes.sub_string !buf 0 !i
|
||||
|
||||
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size)
|
||||
(fd : Unix.file_descr) : t =
|
||||
let buf_len = ref 0 in
|
||||
let buf_off = ref 0 in
|
||||
|
||||
let refill () =
|
||||
buf_off := 0;
|
||||
buf_len := IO.read fd buf 0 (Bytes.length buf)
|
||||
in
|
||||
|
||||
object
|
||||
method input b i len : int =
|
||||
if !buf_len = 0 then refill ();
|
||||
let n = min len !buf_len in
|
||||
if n > 0 then (
|
||||
Bytes.blit buf !buf_off b i n;
|
||||
buf_off := !buf_off + n;
|
||||
buf_len := !buf_len - n
|
||||
);
|
||||
n
|
||||
|
||||
method close () =
|
||||
if close_noerr then (
|
||||
try Unix.close fd with _ -> ()
|
||||
) else
|
||||
Unix.close fd
|
||||
end
|
||||
|
|
@ -1,118 +0,0 @@
|
|||
open Common_
|
||||
|
||||
class type t = object
|
||||
method output_char : char -> unit
|
||||
method output : bytes -> int -> int -> unit
|
||||
method flush : unit -> unit
|
||||
method close : unit -> unit
|
||||
end
|
||||
|
||||
let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t =
|
||||
object
|
||||
method flush () = flush ()
|
||||
method close () = close ()
|
||||
method output_char c = output_char c
|
||||
method output bs i len = output bs i len
|
||||
end
|
||||
|
||||
let dummy : t =
|
||||
object
|
||||
method flush () = ()
|
||||
method close () = ()
|
||||
method output_char _ = ()
|
||||
method output _ _ _ = ()
|
||||
end
|
||||
|
||||
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd
|
||||
: t =
|
||||
let buf_off = ref 0 in
|
||||
|
||||
let[@inline] is_full () = !buf_off = Bytes.length buf in
|
||||
|
||||
let flush () =
|
||||
if !buf_off > 0 then (
|
||||
IO.write fd buf 0 !buf_off;
|
||||
buf_off := 0
|
||||
)
|
||||
in
|
||||
|
||||
object
|
||||
method output_char c =
|
||||
if is_full () then flush ();
|
||||
Bytes.set buf !buf_off c;
|
||||
incr buf_off
|
||||
|
||||
method output bs i len : unit =
|
||||
let i = ref i in
|
||||
let len = ref len in
|
||||
|
||||
while !len > 0 do
|
||||
(* make space *)
|
||||
if is_full () then flush ();
|
||||
|
||||
let n = min !len (Bytes.length buf - !buf_off) in
|
||||
Bytes.blit bs !i buf !buf_off n;
|
||||
buf_off := !buf_off + n;
|
||||
i := !i + n;
|
||||
len := !len - n
|
||||
done;
|
||||
(* if full, write eagerly *)
|
||||
if is_full () then flush ()
|
||||
|
||||
method close () =
|
||||
if close_noerr then (
|
||||
try
|
||||
flush ();
|
||||
Unix.close fd
|
||||
with _ -> ()
|
||||
) else (
|
||||
flush ();
|
||||
Unix.close fd
|
||||
)
|
||||
|
||||
method flush = flush
|
||||
end
|
||||
|
||||
let of_buffer (buf : Buffer.t) : t =
|
||||
object
|
||||
method close () = ()
|
||||
method flush () = ()
|
||||
method output_char c = Buffer.add_char buf c
|
||||
method output bs i len = Buffer.add_subbytes buf bs i len
|
||||
end
|
||||
|
||||
(** Output the buffer slice into this channel *)
|
||||
let[@inline] output_char (self : #t) c : unit = self#output_char c
|
||||
|
||||
(** Output the buffer slice into this channel *)
|
||||
let[@inline] output (self : #t) buf i len : unit = self#output buf i len
|
||||
|
||||
let[@inline] output_string (self : #t) (str : string) : unit =
|
||||
self#output (Bytes.unsafe_of_string str) 0 (String.length str)
|
||||
|
||||
let output_line (self : #t) (str : string) : unit =
|
||||
output_string self str;
|
||||
output_char self '\n'
|
||||
|
||||
(** Close the channel. *)
|
||||
let[@inline] close self : unit = self#close ()
|
||||
|
||||
(** Flush (ie. force write) any buffered bytes. *)
|
||||
let[@inline] flush self : unit = self#flush ()
|
||||
|
||||
let output_int self i =
|
||||
let s = string_of_int i in
|
||||
output_string self s
|
||||
|
||||
let output_lines self seq = Seq.iter (output_line self) seq
|
||||
|
||||
let tee (l : t list) : t =
|
||||
match l with
|
||||
| [] -> dummy
|
||||
| [ oc ] -> oc
|
||||
| _ ->
|
||||
let output bs i len = List.iter (fun oc -> output oc bs i len) l in
|
||||
let output_char c = List.iter (fun oc -> output_char oc c) l in
|
||||
let close () = List.iter close l in
|
||||
let flush () = List.iter flush l in
|
||||
create ~flush ~close ~output ~output_char ()
|
||||
167
src/lwt/base.ml
167
src/lwt/base.ml
|
|
@ -1,167 +0,0 @@
|
|||
open Common_
|
||||
module Trigger = M.Trigger
|
||||
module Fiber = Moonpool_fib.Fiber
|
||||
module FLS = Moonpool_fib.Fls
|
||||
|
||||
(** Action scheduled from outside the loop *)
|
||||
module Action = struct
|
||||
type event = Lwt_engine.event
|
||||
type cb = event -> unit
|
||||
|
||||
(** Action that we ask the lwt loop to perform, from the outside *)
|
||||
type t =
|
||||
| Wait_readable of Unix.file_descr * cb
|
||||
| Wait_writable of Unix.file_descr * cb
|
||||
| Sleep of float * bool * cb
|
||||
(* TODO: provide actions with cancellation, alongside a "select" operation *)
|
||||
(* | Cancel of event *)
|
||||
| On_termination : 'a Lwt.t * 'a Exn_bt.result ref * Trigger.t -> t
|
||||
| Wakeup : 'a Lwt.u * 'a -> t
|
||||
| Wakeup_exn : _ Lwt.u * exn -> t
|
||||
| Other of (unit -> unit)
|
||||
|
||||
(** Perform the action from within the Lwt thread *)
|
||||
let perform (self : t) : unit =
|
||||
match self with
|
||||
| Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event)
|
||||
| Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event)
|
||||
| Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event)
|
||||
(* | Cancel ev -> Lwt_engine.stop_event ev *)
|
||||
| On_termination (fut, res, trigger) ->
|
||||
Lwt.on_any fut
|
||||
(fun x ->
|
||||
res := Ok x;
|
||||
Trigger.signal trigger)
|
||||
(fun exn ->
|
||||
res := Error (Exn_bt.get_callstack 10 exn);
|
||||
Trigger.signal trigger)
|
||||
| Wakeup (prom, x) -> Lwt.wakeup prom x
|
||||
| Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e
|
||||
| Other f -> f ()
|
||||
end
|
||||
|
||||
module Action_queue = struct
|
||||
type t = { q: Action.t list Atomic.t } [@@unboxed]
|
||||
|
||||
let create () : t = { q = Atomic.make [] }
|
||||
let pop_all (self : t) : _ list = Atomic.exchange self.q []
|
||||
|
||||
(** Push the action and return whether the queue was previously empty *)
|
||||
let push (self : t) (a : Action.t) : bool =
|
||||
let is_first = ref true in
|
||||
while
|
||||
let old = Atomic.get self.q in
|
||||
if Atomic.compare_and_set self.q old (a :: old) then (
|
||||
is_first := old == [];
|
||||
false
|
||||
) else
|
||||
true
|
||||
do
|
||||
()
|
||||
done;
|
||||
!is_first
|
||||
end
|
||||
|
||||
module Perform_action_in_lwt = struct
|
||||
open struct
|
||||
let actions_ : Action_queue.t = Action_queue.create ()
|
||||
|
||||
(** Gets the current set of notifications and perform them from inside the
|
||||
Lwt thread *)
|
||||
let perform_pending_actions () : unit =
|
||||
let@ _sp =
|
||||
Moonpool.Private.Tracing_.with_span
|
||||
"moonpool-lwt.perform-pending-actions"
|
||||
in
|
||||
|
||||
let l = Action_queue.pop_all actions_ in
|
||||
List.iter Action.perform l
|
||||
|
||||
let notification : int =
|
||||
Lwt_unix.make_notification ~once:false perform_pending_actions
|
||||
end
|
||||
|
||||
let schedule (a : Action.t) : unit =
|
||||
let is_first = Action_queue.push actions_ a in
|
||||
if is_first then Lwt_unix.send_notification notification
|
||||
end
|
||||
|
||||
let get_runner () : M.Runner.t =
|
||||
match M.Runner.get_current_runner () with
|
||||
| Some r -> r
|
||||
| None -> failwith "Moonpool_lwt.get_runner: not inside a runner"
|
||||
|
||||
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
|
||||
let lwt_fut, lwt_prom = Lwt.wait () in
|
||||
M.Fut.on_result fut (function
|
||||
| Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x)
|
||||
| Error ebt ->
|
||||
let exn = Exn_bt.exn ebt in
|
||||
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn));
|
||||
lwt_fut
|
||||
|
||||
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
|
||||
match Lwt.poll lwt_fut with
|
||||
| Some x -> M.Fut.return x
|
||||
| None ->
|
||||
let fut, prom = M.Fut.make () in
|
||||
Lwt.on_any lwt_fut
|
||||
(fun x -> M.Fut.fulfill prom (Ok x))
|
||||
(fun exn ->
|
||||
let bt = Printexc.get_callstack 10 in
|
||||
M.Fut.fulfill prom (Error (Exn_bt.make exn bt)));
|
||||
fut
|
||||
|
||||
let _dummy_exn_bt : Exn_bt.t =
|
||||
Exn_bt.get_callstack 0 (Failure "dummy Exn_bt from moonpool-lwt")
|
||||
|
||||
let await_lwt (fut : _ Lwt.t) =
|
||||
match Lwt.poll fut with
|
||||
| Some x -> x
|
||||
| None ->
|
||||
(* suspend fiber, wake it up when [fut] resolves *)
|
||||
let trigger = M.Trigger.create () in
|
||||
let res = ref (Error _dummy_exn_bt) in
|
||||
Perform_action_in_lwt.(schedule Action.(On_termination (fut, res, trigger)));
|
||||
Trigger.await trigger |> Option.iter Exn_bt.raise;
|
||||
Exn_bt.unwrap !res
|
||||
|
||||
let run_in_lwt f : _ M.Fut.t =
|
||||
let fut, prom = M.Fut.make () in
|
||||
Perform_action_in_lwt.schedule
|
||||
(Action.Other
|
||||
(fun () ->
|
||||
let lwt_fut = f () in
|
||||
Lwt.on_any lwt_fut
|
||||
(fun x -> M.Fut.fulfill prom @@ Ok x)
|
||||
(fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn))));
|
||||
fut
|
||||
|
||||
let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f
|
||||
|
||||
let detach_in_runner ~runner f : _ Lwt.t =
|
||||
let fut, promise = Lwt.wait () in
|
||||
M.Runner.run_async runner (fun () ->
|
||||
match f () with
|
||||
| x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (promise, x)
|
||||
| exception exn ->
|
||||
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn));
|
||||
fut
|
||||
|
||||
let main_with_runner ~runner (f : unit -> 'a) : 'a =
|
||||
let lwt_fut, lwt_prom = Lwt.wait () in
|
||||
|
||||
let _fiber =
|
||||
Fiber.spawn_top ~on:runner (fun () ->
|
||||
try
|
||||
let x = f () in
|
||||
Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x))
|
||||
with exn ->
|
||||
Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn)))
|
||||
in
|
||||
|
||||
Lwt_main.run lwt_fut
|
||||
|
||||
let main f =
|
||||
let@ runner = M.Ws_pool.with_ () in
|
||||
main_with_runner ~runner f
|
||||
|
|
@ -1,5 +1,3 @@
|
|||
module M = Moonpool
|
||||
module Exn_bt = M.Exn_bt
|
||||
module Exn_bt = Moonpool.Exn_bt
|
||||
|
||||
let ( let@ ) = ( @@ )
|
||||
let _default_buf_size = 4 * 1024
|
||||
|
|
|
|||
|
|
@ -1,6 +1,135 @@
|
|||
include Base
|
||||
module IO = IO
|
||||
module IO_out = IO_out
|
||||
module IO_in = IO_in
|
||||
module TCP_server = Tcp_server
|
||||
module TCP_client = Tcp_client
|
||||
open Common_
|
||||
|
||||
open struct
|
||||
module WL = Moonpool.Private.Worker_loop_
|
||||
module M = Moonpool
|
||||
end
|
||||
|
||||
module Fut = Moonpool.Fut
|
||||
|
||||
let default_around_task_ : WL.around_task = AT_pair (ignore, fun _ _ -> ())
|
||||
|
||||
module Scheduler_state = struct
|
||||
module BQ = Moonpool.Blocking_queue
|
||||
|
||||
type st = {
|
||||
tasks: WL.task_full BQ.t;
|
||||
on_exn: Exn_bt.t -> unit;
|
||||
mutable as_runner: Moonpool.Runner.t;
|
||||
mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option;
|
||||
mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option;
|
||||
}
|
||||
|
||||
let create ~on_exn () : st =
|
||||
{
|
||||
tasks = BQ.create ();
|
||||
on_exn;
|
||||
as_runner = Moonpool.Runner.dummy;
|
||||
enter_hook = None;
|
||||
leave_hook = None;
|
||||
}
|
||||
|
||||
let around_task _ = default_around_task_
|
||||
let schedule (self : st) t = BQ.push self.tasks t
|
||||
|
||||
let get_next_task (self : st) =
|
||||
try BQ.pop self.tasks with BQ.Closed -> raise WL.No_more_tasks
|
||||
|
||||
let on_exn (self : st) ebt = self.on_exn ebt
|
||||
let runner self = self.as_runner
|
||||
|
||||
let as_runner (self : st) : Moonpool.Runner.t =
|
||||
Moonpool.Runner.For_runner_implementors.create
|
||||
~size:(fun () -> 1)
|
||||
~num_tasks:(fun () -> BQ.size self.tasks)
|
||||
~run_async:(fun ~fiber f -> schedule self @@ WL.T_start { fiber; f })
|
||||
~shutdown:(fun ~wait:_ () -> BQ.close self.tasks)
|
||||
()
|
||||
|
||||
let before_start self : unit =
|
||||
self.as_runner <- as_runner self;
|
||||
()
|
||||
|
||||
let cleanup self =
|
||||
Option.iter Lwt_main.Enter_iter_hooks.remove self.enter_hook;
|
||||
Option.iter Lwt_main.Leave_iter_hooks.remove self.leave_hook;
|
||||
()
|
||||
|
||||
let ops : st WL.ops =
|
||||
{
|
||||
schedule;
|
||||
around_task;
|
||||
get_next_task;
|
||||
on_exn;
|
||||
runner;
|
||||
before_start;
|
||||
cleanup;
|
||||
}
|
||||
end
|
||||
|
||||
let _dummy_exn_bt : Exn_bt.t =
|
||||
Exn_bt.get_callstack 0 (Failure "dummy Exn_bt from moonpool-lwt")
|
||||
|
||||
let await_lwt (fut : _ Lwt.t) =
|
||||
match Lwt.poll fut with
|
||||
| Some x -> x
|
||||
| None ->
|
||||
(* suspend fiber, wake it up when [fut] resolves *)
|
||||
let trigger = M.Trigger.create () in
|
||||
let res = ref (Error _dummy_exn_bt) in
|
||||
Lwt.on_termination fut (fun _ -> M.Trigger.signal trigger);
|
||||
M.Trigger.await trigger |> Option.iter Exn_bt.raise;
|
||||
Exn_bt.unwrap !res
|
||||
|
||||
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
|
||||
let lwt_fut, lwt_prom = Lwt.wait () in
|
||||
M.Fut.on_result fut (function
|
||||
| Ok x -> Lwt.wakeup lwt_prom x
|
||||
| Error ebt ->
|
||||
let exn = Exn_bt.exn ebt in
|
||||
Lwt.wakeup_exn lwt_prom exn);
|
||||
lwt_fut
|
||||
|
||||
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
|
||||
match Lwt.poll lwt_fut with
|
||||
| Some x -> M.Fut.return x
|
||||
| None ->
|
||||
let fut, prom = M.Fut.make () in
|
||||
Lwt.on_any lwt_fut
|
||||
(fun x -> M.Fut.fulfill prom (Ok x))
|
||||
(fun exn ->
|
||||
let bt = Printexc.get_callstack 10 in
|
||||
M.Fut.fulfill prom (Error (Exn_bt.make exn bt)));
|
||||
fut
|
||||
|
||||
let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref =
|
||||
ref (fun ebt ->
|
||||
Printf.eprintf "uncaught exception in moonpool-lwt:\n%s" (Exn_bt.show ebt))
|
||||
|
||||
let runner_ : Moonpool.Runner.t option ref = ref None
|
||||
|
||||
let setup () =
|
||||
match !runner_ with
|
||||
| Some r -> r
|
||||
| None ->
|
||||
let on_exn ebt = !on_uncaught_exn ebt in
|
||||
let module Arg = struct
|
||||
type nonrec st = Scheduler_state.st
|
||||
|
||||
let ops = Scheduler_state.ops
|
||||
let st = Scheduler_state.create ~on_exn ()
|
||||
end in
|
||||
let module FG = WL.Fine_grained (Arg) () in
|
||||
runner_ := Some Arg.st.as_runner;
|
||||
|
||||
FG.setup ~block_signals:false ();
|
||||
let run_in_hook () = FG.run ~max_tasks:1000 () in
|
||||
Arg.st.enter_hook <- Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook);
|
||||
Arg.st.leave_hook <- Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook);
|
||||
|
||||
Arg.st.as_runner
|
||||
|
||||
let lwt_main (f : _ -> 'a) : 'a =
|
||||
let runner = setup () in
|
||||
let fut = M.spawn ~on:runner (fun () -> f runner) in
|
||||
Lwt_main.run (lwt_of_fut fut)
|
||||
|
|
|
|||
|
|
@ -7,8 +7,7 @@
|
|||
|
||||
@since 0.6 *)
|
||||
|
||||
module Fiber = Moonpool_fib.Fiber
|
||||
module FLS = Moonpool_fib.Fls
|
||||
module Fut = Moonpool.Fut
|
||||
|
||||
(** {2 Basic conversions} *)
|
||||
|
||||
|
|
@ -28,117 +27,12 @@ val await_lwt : 'a Lwt.t -> 'a
|
|||
runner. This must be run from within a Moonpool runner so that the await-ing
|
||||
effect is handled. *)
|
||||
|
||||
val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t
|
||||
(** [run_in_lwt f] runs [f()] from within the Lwt thread and returns a
|
||||
thread-safe future. This can be run from anywhere. *)
|
||||
|
||||
val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a
|
||||
(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and awaits its result.
|
||||
Must be run from inside a moonpool runner so that the await-in effect is
|
||||
handled.
|
||||
|
||||
This is similar to [Moonpool.await @@ run_in_lwt f]. *)
|
||||
|
||||
val get_runner : unit -> Moonpool.Runner.t
|
||||
(** Returns the runner from within which this is called. Must be run from within
|
||||
a fiber.
|
||||
@raise Failure if not run within a fiber *)
|
||||
|
||||
(** {2 IO} *)
|
||||
|
||||
(** IO using the Lwt event loop.
|
||||
|
||||
These IO operations work on non-blocking file descriptors and rely on a
|
||||
[Lwt_engine] event loop being active (meaning, [Lwt_main.run] is currently
|
||||
running in some thread).
|
||||
|
||||
Calling these functions must be done from a moonpool runner. A function like
|
||||
[read] will first try to perform the IO action directly (here, call
|
||||
{!Unix.read}); if the action fails because the FD is not ready, then
|
||||
[await_readable] is called: it suspends the fiber and subscribes it to Lwt
|
||||
to be awakened when the FD becomes ready. *)
|
||||
module IO : sig
|
||||
val read : Unix.file_descr -> bytes -> int -> int -> int
|
||||
(** Read from the file descriptor *)
|
||||
|
||||
val await_readable : Unix.file_descr -> unit
|
||||
(** Suspend the fiber until the FD is readable *)
|
||||
|
||||
val write_once : Unix.file_descr -> bytes -> int -> int -> int
|
||||
(** Perform one write into the file descriptor *)
|
||||
|
||||
val await_writable : Unix.file_descr -> unit
|
||||
(** Suspend the fiber until the FD is writable *)
|
||||
|
||||
val write : Unix.file_descr -> bytes -> int -> int -> unit
|
||||
(** Loop around {!write_once} to write the entire slice. *)
|
||||
|
||||
val sleep_s : float -> unit
|
||||
(** Suspend the fiber for [n] seconds. *)
|
||||
end
|
||||
|
||||
module IO_in = IO_in
|
||||
(** Input channel *)
|
||||
|
||||
module IO_out = IO_out
|
||||
(** Output channel *)
|
||||
|
||||
module TCP_server : sig
|
||||
type t = Lwt_io.server
|
||||
|
||||
val establish_lwt :
|
||||
?backlog:
|
||||
(* ?server_fd:Unix.file_descr -> *)
|
||||
int ->
|
||||
?no_close:bool ->
|
||||
runner:Moonpool.Runner.t ->
|
||||
Unix.sockaddr ->
|
||||
(Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) ->
|
||||
t
|
||||
(** [establish ~runner addr handler] runs a TCP server in the Lwt thread. When
|
||||
a client connects, a moonpool fiber is started on [runner] to handle it.
|
||||
*)
|
||||
|
||||
val establish :
|
||||
?backlog:
|
||||
(* ?server_fd:Unix.file_descr -> *)
|
||||
int ->
|
||||
?no_close:bool ->
|
||||
runner:Moonpool.Runner.t ->
|
||||
Unix.sockaddr ->
|
||||
(Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) ->
|
||||
t
|
||||
(** Like {!establish_lwt} but uses {!IO} to directly handle reads and writes
|
||||
on client sockets. *)
|
||||
|
||||
val shutdown : t -> unit
|
||||
(** Shutdown the server *)
|
||||
end
|
||||
|
||||
module TCP_client : sig
|
||||
val connect : Unix.sockaddr -> Unix.file_descr
|
||||
|
||||
val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a
|
||||
(** Open a connection, and use {!IO} to read and write from the socket in a
|
||||
non blocking way. *)
|
||||
|
||||
val with_connect_lwt :
|
||||
Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a
|
||||
(** Open a connection. *)
|
||||
end
|
||||
|
||||
(** {2 Helpers on the lwt side} *)
|
||||
|
||||
val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t
|
||||
(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner, and
|
||||
returns a lwt future. This must be run from within the thread running
|
||||
[Lwt_main]. *)
|
||||
|
||||
(** {2 Wrappers around Lwt_main} *)
|
||||
|
||||
val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a
|
||||
(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()]
|
||||
inside a fiber in [runner]. *)
|
||||
val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref
|
||||
|
||||
val main : (unit -> 'a) -> 'a
|
||||
(** Like {!main_with_runner} but with a default choice of runner. *)
|
||||
val setup : unit -> Moonpool.Runner.t
|
||||
(** Install hooks in Lwt_main to run the scheduler *)
|
||||
|
||||
val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a
|
||||
(** Setup, run lwt main, return the result *)
|
||||
|
|
|
|||
|
|
@ -1,53 +0,0 @@
|
|||
open Common_
|
||||
open Base
|
||||
|
||||
let connect addr : Unix.file_descr =
|
||||
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
|
||||
Unix.set_nonblock sock;
|
||||
Unix.setsockopt sock Unix.TCP_NODELAY true;
|
||||
|
||||
(* connect asynchronously *)
|
||||
while
|
||||
try
|
||||
Unix.connect sock addr;
|
||||
false
|
||||
with
|
||||
| Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _)
|
||||
->
|
||||
IO.await_writable sock;
|
||||
true
|
||||
do
|
||||
()
|
||||
done;
|
||||
sock
|
||||
|
||||
let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a =
|
||||
let sock = connect addr in
|
||||
|
||||
let ic = IO_in.of_unix_fd sock in
|
||||
let oc = IO_out.of_unix_fd sock in
|
||||
|
||||
let finally () = try Unix.close sock with _ -> () in
|
||||
let@ () = Fun.protect ~finally in
|
||||
f ic oc
|
||||
|
||||
let with_connect_lwt addr
|
||||
(f : Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) : 'a =
|
||||
let sock = connect addr in
|
||||
|
||||
let ic =
|
||||
run_in_lwt_and_await (fun () ->
|
||||
Lwt.return @@ Lwt_io.of_unix_fd ~mode:Lwt_io.input sock)
|
||||
in
|
||||
let oc =
|
||||
run_in_lwt_and_await (fun () ->
|
||||
Lwt.return @@ Lwt_io.of_unix_fd ~mode:Lwt_io.output sock)
|
||||
in
|
||||
|
||||
let finally () =
|
||||
(try run_in_lwt_and_await (fun () -> Lwt_io.close ic) with _ -> ());
|
||||
(try run_in_lwt_and_await (fun () -> Lwt_io.close oc) with _ -> ());
|
||||
try Unix.close sock with _ -> ()
|
||||
in
|
||||
let@ () = Fun.protect ~finally in
|
||||
f ic oc
|
||||
|
|
@ -1,38 +0,0 @@
|
|||
open Common_
|
||||
open Base
|
||||
|
||||
type t = Lwt_io.server
|
||||
|
||||
let establish_lwt ?backlog ?no_close ~runner addr handler : t =
|
||||
let server =
|
||||
Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr
|
||||
(fun client_addr client_sock ->
|
||||
let ic = Lwt_io.of_fd ~mode:Lwt_io.input client_sock in
|
||||
let oc = Lwt_io.of_fd ~mode:Lwt_io.output client_sock in
|
||||
|
||||
let fut =
|
||||
M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc)
|
||||
in
|
||||
|
||||
let lwt_fut = lwt_of_fut fut in
|
||||
lwt_fut)
|
||||
in
|
||||
await_lwt server
|
||||
|
||||
let establish ?backlog ?no_close ~runner addr handler : t =
|
||||
let server =
|
||||
Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr
|
||||
(fun client_addr client_sock ->
|
||||
let ic = IO_in.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in
|
||||
let oc = IO_out.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in
|
||||
|
||||
let fut =
|
||||
M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc)
|
||||
in
|
||||
|
||||
let lwt_fut = lwt_of_fut fut in
|
||||
lwt_fut)
|
||||
in
|
||||
await_lwt server
|
||||
|
||||
let shutdown self = await_lwt @@ Lwt_io.shutdown_server self
|
||||
Loading…
Add table
Reference in a new issue