mirror of
https://github.com/c-cube/moonpool.git
synced 2025-12-08 20:25:31 -05:00
lwt: basic IO wrappers, simple TCP server wrapper
This commit is contained in:
parent
d248a569f6
commit
930e09e5b3
7 changed files with 558 additions and 224 deletions
71
src/lwt/IO.ml
Normal file
71
src/lwt/IO.ml
Normal file
|
|
@ -0,0 +1,71 @@
|
||||||
|
open Common_
|
||||||
|
open Base
|
||||||
|
|
||||||
|
let rec read fd buf i len : int =
|
||||||
|
if len = 0 then
|
||||||
|
0
|
||||||
|
else (
|
||||||
|
match Unix.read fd buf i len with
|
||||||
|
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
|
||||||
|
(* wait for FD to be ready *)
|
||||||
|
Moonpool.Private.Suspend_.suspend
|
||||||
|
{
|
||||||
|
handle =
|
||||||
|
(fun ~ls ~run:_ ~resume sus ->
|
||||||
|
Perform_action_in_lwt.schedule
|
||||||
|
@@ Action.Wait_readable
|
||||||
|
( fd,
|
||||||
|
fun cancel ->
|
||||||
|
resume ~ls sus @@ Ok ();
|
||||||
|
Lwt_engine.stop_event cancel ));
|
||||||
|
};
|
||||||
|
read fd buf i len
|
||||||
|
| n -> n
|
||||||
|
)
|
||||||
|
|
||||||
|
let rec write_once fd buf i len : int =
|
||||||
|
if len = 0 then
|
||||||
|
0
|
||||||
|
else (
|
||||||
|
match Unix.write fd buf i len with
|
||||||
|
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
|
||||||
|
(* wait for FD to be ready *)
|
||||||
|
Moonpool.Private.Suspend_.suspend
|
||||||
|
{
|
||||||
|
handle =
|
||||||
|
(fun ~ls ~run:_ ~resume sus ->
|
||||||
|
Perform_action_in_lwt.schedule
|
||||||
|
@@ Action.Wait_writable
|
||||||
|
( fd,
|
||||||
|
fun cancel ->
|
||||||
|
resume ~ls sus @@ Ok ();
|
||||||
|
Lwt_engine.stop_event cancel ));
|
||||||
|
};
|
||||||
|
write_once fd buf i len
|
||||||
|
| n -> n
|
||||||
|
)
|
||||||
|
|
||||||
|
let write fd buf i len : unit =
|
||||||
|
let i = ref i in
|
||||||
|
let len = ref len in
|
||||||
|
while !len > 0 do
|
||||||
|
let n = write_once fd buf !i !len in
|
||||||
|
i := !i + n;
|
||||||
|
len := !len - n
|
||||||
|
done
|
||||||
|
|
||||||
|
(** Sleep for the given amount of seconds *)
|
||||||
|
let sleep_s (f : float) : unit =
|
||||||
|
if f > 0. then
|
||||||
|
Moonpool.Private.Suspend_.suspend
|
||||||
|
{
|
||||||
|
handle =
|
||||||
|
(fun ~ls ~run:_ ~resume sus ->
|
||||||
|
Perform_action_in_lwt.schedule
|
||||||
|
@@ Action.Sleep
|
||||||
|
( f,
|
||||||
|
false,
|
||||||
|
fun cancel ->
|
||||||
|
resume ~ls sus @@ Ok ();
|
||||||
|
Lwt_engine.stop_event cancel ));
|
||||||
|
}
|
||||||
154
src/lwt/IO_in.ml
Normal file
154
src/lwt/IO_in.ml
Normal file
|
|
@ -0,0 +1,154 @@
|
||||||
|
open Common_
|
||||||
|
|
||||||
|
class type t =
|
||||||
|
object
|
||||||
|
method input : bytes -> int -> int -> int
|
||||||
|
(** Read into the slice. Returns [0] only if the
|
||||||
|
stream is closed. *)
|
||||||
|
|
||||||
|
method close : unit -> unit
|
||||||
|
(** Close the input. Must be idempotent. *)
|
||||||
|
end
|
||||||
|
|
||||||
|
let create ?(close = ignore) ~input () : t =
|
||||||
|
object
|
||||||
|
method close = close
|
||||||
|
method input = input
|
||||||
|
end
|
||||||
|
|
||||||
|
let empty : t =
|
||||||
|
object
|
||||||
|
method close () = ()
|
||||||
|
method input _ _ _ = 0
|
||||||
|
end
|
||||||
|
|
||||||
|
let of_bytes ?(off = 0) ?len (b : bytes) : t =
|
||||||
|
(* i: current position in [b] *)
|
||||||
|
let i = ref off in
|
||||||
|
|
||||||
|
let len =
|
||||||
|
match len with
|
||||||
|
| Some n ->
|
||||||
|
if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes";
|
||||||
|
n
|
||||||
|
| None -> Bytes.length b - off
|
||||||
|
in
|
||||||
|
let end_ = off + len in
|
||||||
|
|
||||||
|
object
|
||||||
|
method input b_out i_out len_out =
|
||||||
|
let n = min (end_ - !i) len_out in
|
||||||
|
Bytes.blit b !i b_out i_out n;
|
||||||
|
i := !i + n;
|
||||||
|
n
|
||||||
|
|
||||||
|
method close () = i := end_
|
||||||
|
end
|
||||||
|
|
||||||
|
let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s)
|
||||||
|
|
||||||
|
(** Read into the given slice.
|
||||||
|
@return the number of bytes read, [0] means end of input. *)
|
||||||
|
let[@inline] input (self : #t) buf i len = self#input buf i len
|
||||||
|
|
||||||
|
(** Close the channel. *)
|
||||||
|
let[@inline] close self : unit = self#close ()
|
||||||
|
|
||||||
|
let rec really_input (self : #t) buf i len =
|
||||||
|
if len > 0 then (
|
||||||
|
let n = input self buf i len in
|
||||||
|
if n = 0 then raise End_of_file;
|
||||||
|
(really_input [@tailrec]) self buf (i + n) (len - n)
|
||||||
|
)
|
||||||
|
|
||||||
|
let really_input_string self n : string =
|
||||||
|
let buf = Bytes.create n in
|
||||||
|
really_input self buf 0 n;
|
||||||
|
Bytes.unsafe_to_string buf
|
||||||
|
|
||||||
|
let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t)
|
||||||
|
: unit =
|
||||||
|
let continue = ref true in
|
||||||
|
while !continue do
|
||||||
|
let len = input ic buf 0 (Bytes.length buf) in
|
||||||
|
if len = 0 then
|
||||||
|
continue := false
|
||||||
|
else
|
||||||
|
IO_out.output oc buf 0 len
|
||||||
|
done
|
||||||
|
|
||||||
|
let concat (l0 : t list) : t =
|
||||||
|
let l = ref l0 in
|
||||||
|
let rec input b i len : int =
|
||||||
|
match !l with
|
||||||
|
| [] -> 0
|
||||||
|
| ic :: tl ->
|
||||||
|
let n = ic#input b i len in
|
||||||
|
if n > 0 then
|
||||||
|
n
|
||||||
|
else (
|
||||||
|
l := tl;
|
||||||
|
input b i len
|
||||||
|
)
|
||||||
|
in
|
||||||
|
let close () = List.iter close l0 in
|
||||||
|
create ~close ~input ()
|
||||||
|
|
||||||
|
let input_all ?(buf = Bytes.create 128) (self : #t) : string =
|
||||||
|
let buf = ref buf in
|
||||||
|
let i = ref 0 in
|
||||||
|
|
||||||
|
let[@inline] full_ () = !i = Bytes.length !buf in
|
||||||
|
|
||||||
|
let grow_ () =
|
||||||
|
let old_size = Bytes.length !buf in
|
||||||
|
let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in
|
||||||
|
if old_size = new_size then
|
||||||
|
failwith "input_all: maximum input size exceeded";
|
||||||
|
let new_buf = Bytes.extend !buf 0 (new_size - old_size) in
|
||||||
|
buf := new_buf
|
||||||
|
in
|
||||||
|
|
||||||
|
let rec loop () =
|
||||||
|
if full_ () then grow_ ();
|
||||||
|
let available = Bytes.length !buf - !i in
|
||||||
|
let n = input self !buf !i available in
|
||||||
|
if n > 0 then (
|
||||||
|
i := !i + n;
|
||||||
|
(loop [@tailrec]) ()
|
||||||
|
)
|
||||||
|
in
|
||||||
|
loop ();
|
||||||
|
|
||||||
|
if full_ () then
|
||||||
|
Bytes.unsafe_to_string !buf
|
||||||
|
else
|
||||||
|
Bytes.sub_string !buf 0 !i
|
||||||
|
|
||||||
|
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size)
|
||||||
|
(fd : Unix.file_descr) : t =
|
||||||
|
let buf_len = ref 0 in
|
||||||
|
let buf_off = ref 0 in
|
||||||
|
|
||||||
|
let refill () =
|
||||||
|
buf_off := 0;
|
||||||
|
buf_len := IO.read fd buf 0 (Bytes.length buf)
|
||||||
|
in
|
||||||
|
|
||||||
|
object
|
||||||
|
method input b i len : int =
|
||||||
|
if !buf_len = 0 then refill ();
|
||||||
|
let n = min len !buf_len in
|
||||||
|
if n > 0 then (
|
||||||
|
Bytes.blit buf !buf_off b i n;
|
||||||
|
buf_off := !buf_off + n;
|
||||||
|
buf_len := !buf_len - n
|
||||||
|
);
|
||||||
|
n
|
||||||
|
|
||||||
|
method close () =
|
||||||
|
if close_noerr then (
|
||||||
|
try Unix.close fd with _ -> ()
|
||||||
|
) else
|
||||||
|
Unix.close fd
|
||||||
|
end
|
||||||
119
src/lwt/IO_out.ml
Normal file
119
src/lwt/IO_out.ml
Normal file
|
|
@ -0,0 +1,119 @@
|
||||||
|
open Common_
|
||||||
|
|
||||||
|
class type t =
|
||||||
|
object
|
||||||
|
method output_char : char -> unit
|
||||||
|
method output : bytes -> int -> int -> unit
|
||||||
|
method flush : unit -> unit
|
||||||
|
method close : unit -> unit
|
||||||
|
end
|
||||||
|
|
||||||
|
let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t =
|
||||||
|
object
|
||||||
|
method flush () = flush ()
|
||||||
|
method close () = close ()
|
||||||
|
method output_char c = output_char c
|
||||||
|
method output bs i len = output bs i len
|
||||||
|
end
|
||||||
|
|
||||||
|
let dummy : t =
|
||||||
|
object
|
||||||
|
method flush () = ()
|
||||||
|
method close () = ()
|
||||||
|
method output_char _ = ()
|
||||||
|
method output _ _ _ = ()
|
||||||
|
end
|
||||||
|
|
||||||
|
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd
|
||||||
|
: t =
|
||||||
|
let buf_off = ref 0 in
|
||||||
|
|
||||||
|
let[@inline] is_full () = !buf_off = Bytes.length buf in
|
||||||
|
|
||||||
|
let flush () =
|
||||||
|
if !buf_off > 0 then (
|
||||||
|
IO.write fd buf 0 !buf_off;
|
||||||
|
buf_off := 0
|
||||||
|
)
|
||||||
|
in
|
||||||
|
|
||||||
|
object
|
||||||
|
method output_char c =
|
||||||
|
if is_full () then flush ();
|
||||||
|
Bytes.set buf !buf_off c;
|
||||||
|
incr buf_off
|
||||||
|
|
||||||
|
method output bs i len : unit =
|
||||||
|
let i = ref i in
|
||||||
|
let len = ref len in
|
||||||
|
|
||||||
|
while !len > 0 do
|
||||||
|
(* make space *)
|
||||||
|
if is_full () then flush ();
|
||||||
|
|
||||||
|
let n = min !len (Bytes.length buf - !buf_off) in
|
||||||
|
Bytes.blit bs !i buf !buf_off n;
|
||||||
|
buf_off := !buf_off + n;
|
||||||
|
i := !i + n;
|
||||||
|
len := !len - n
|
||||||
|
done;
|
||||||
|
(* if full, write eagerly *)
|
||||||
|
if is_full () then flush ()
|
||||||
|
|
||||||
|
method close () =
|
||||||
|
if close_noerr then (
|
||||||
|
try
|
||||||
|
flush ();
|
||||||
|
Unix.close fd
|
||||||
|
with _ -> ()
|
||||||
|
) else (
|
||||||
|
flush ();
|
||||||
|
Unix.close fd
|
||||||
|
)
|
||||||
|
|
||||||
|
method flush = flush
|
||||||
|
end
|
||||||
|
|
||||||
|
let of_buffer (buf : Buffer.t) : t =
|
||||||
|
object
|
||||||
|
method close () = ()
|
||||||
|
method flush () = ()
|
||||||
|
method output_char c = Buffer.add_char buf c
|
||||||
|
method output bs i len = Buffer.add_subbytes buf bs i len
|
||||||
|
end
|
||||||
|
|
||||||
|
(** Output the buffer slice into this channel *)
|
||||||
|
let[@inline] output_char (self : #t) c : unit = self#output_char c
|
||||||
|
|
||||||
|
(** Output the buffer slice into this channel *)
|
||||||
|
let[@inline] output (self : #t) buf i len : unit = self#output buf i len
|
||||||
|
|
||||||
|
let[@inline] output_string (self : #t) (str : string) : unit =
|
||||||
|
self#output (Bytes.unsafe_of_string str) 0 (String.length str)
|
||||||
|
|
||||||
|
let output_line (self : #t) (str : string) : unit =
|
||||||
|
output_string self str;
|
||||||
|
output_char self '\n'
|
||||||
|
|
||||||
|
(** Close the channel. *)
|
||||||
|
let[@inline] close self : unit = self#close ()
|
||||||
|
|
||||||
|
(** Flush (ie. force write) any buffered bytes. *)
|
||||||
|
let[@inline] flush self : unit = self#flush ()
|
||||||
|
|
||||||
|
let output_int self i =
|
||||||
|
let s = string_of_int i in
|
||||||
|
output_string self s
|
||||||
|
|
||||||
|
let output_lines self seq = Seq.iter (output_line self) seq
|
||||||
|
|
||||||
|
let tee (l : t list) : t =
|
||||||
|
match l with
|
||||||
|
| [] -> dummy
|
||||||
|
| [ oc ] -> oc
|
||||||
|
| _ ->
|
||||||
|
let output bs i len = List.iter (fun oc -> output oc bs i len) l in
|
||||||
|
let output_char c = List.iter (fun oc -> output_char oc c) l in
|
||||||
|
let close () = List.iter close l in
|
||||||
|
let flush () = List.iter flush l in
|
||||||
|
create ~flush ~close ~output ~output_char ()
|
||||||
158
src/lwt/base.ml
Normal file
158
src/lwt/base.ml
Normal file
|
|
@ -0,0 +1,158 @@
|
||||||
|
open Common_
|
||||||
|
module Fiber = Moonpool_fib.Fiber
|
||||||
|
module FLS = Moonpool_fib.Fls
|
||||||
|
|
||||||
|
(** Action scheduled from outside the loop *)
|
||||||
|
module Action = struct
|
||||||
|
type event = Lwt_engine.event
|
||||||
|
type cb = event -> unit
|
||||||
|
|
||||||
|
(** Action that we ask the lwt loop to perform, from the outside *)
|
||||||
|
type t =
|
||||||
|
| Wait_readable of Unix.file_descr * cb
|
||||||
|
| Wait_writable of Unix.file_descr * cb
|
||||||
|
| Sleep of float * bool * cb
|
||||||
|
(* TODO: provide actions with cancellation, alongside a "select" operation *)
|
||||||
|
(* | Cancel of event *)
|
||||||
|
| On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t
|
||||||
|
| Wakeup : 'a Lwt.u * 'a -> t
|
||||||
|
| Wakeup_exn : _ Lwt.u * exn -> t
|
||||||
|
| Other of (unit -> unit)
|
||||||
|
|
||||||
|
(** Perform the action from within the Lwt thread *)
|
||||||
|
let perform (self : t) : unit =
|
||||||
|
match self with
|
||||||
|
| Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event)
|
||||||
|
| Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event)
|
||||||
|
| Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event)
|
||||||
|
(* | Cancel ev -> Lwt_engine.stop_event ev *)
|
||||||
|
| On_termination (fut, f) ->
|
||||||
|
Lwt.on_any fut
|
||||||
|
(fun x -> f @@ Ok x)
|
||||||
|
(fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn))
|
||||||
|
| Wakeup (prom, x) -> Lwt.wakeup prom x
|
||||||
|
| Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e
|
||||||
|
| Other f -> f ()
|
||||||
|
end
|
||||||
|
|
||||||
|
module Action_queue = struct
|
||||||
|
type t = { q: Action.t list Atomic.t } [@@unboxed]
|
||||||
|
|
||||||
|
let create () : t = { q = Atomic.make [] }
|
||||||
|
let pop_all (self : t) : _ list = Atomic.exchange self.q []
|
||||||
|
|
||||||
|
(** Push the action and return whether the queue was previously empty *)
|
||||||
|
let push (self : t) (a : Action.t) : bool =
|
||||||
|
let is_first = ref true in
|
||||||
|
while
|
||||||
|
let old = Atomic.get self.q in
|
||||||
|
if Atomic.compare_and_set self.q old (a :: old) then (
|
||||||
|
is_first := old = [];
|
||||||
|
false
|
||||||
|
) else
|
||||||
|
true
|
||||||
|
do
|
||||||
|
()
|
||||||
|
done;
|
||||||
|
!is_first
|
||||||
|
end
|
||||||
|
|
||||||
|
module Perform_action_in_lwt = struct
|
||||||
|
open struct
|
||||||
|
let actions_ : Action_queue.t = Action_queue.create ()
|
||||||
|
|
||||||
|
(** Gets the current set of notifications and perform them from inside the
|
||||||
|
Lwt thread *)
|
||||||
|
let perform_pending_actions () : unit =
|
||||||
|
let l = Action_queue.pop_all actions_ in
|
||||||
|
List.iter Action.perform l
|
||||||
|
|
||||||
|
let notification : int =
|
||||||
|
Lwt_unix.make_notification ~once:false perform_pending_actions
|
||||||
|
end
|
||||||
|
|
||||||
|
let schedule (a : Action.t) : unit =
|
||||||
|
let is_first = Action_queue.push actions_ a in
|
||||||
|
if is_first then Lwt_unix.send_notification notification
|
||||||
|
end
|
||||||
|
|
||||||
|
let get_runner () : M.Runner.t =
|
||||||
|
match M.Runner.get_current_runner () with
|
||||||
|
| Some r -> r
|
||||||
|
| None -> failwith "Moonpool_lwt.get_runner: not inside a runner"
|
||||||
|
|
||||||
|
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
|
||||||
|
let lwt_fut, lwt_prom = Lwt.wait () in
|
||||||
|
M.Fut.on_result fut (function
|
||||||
|
| Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x)
|
||||||
|
| Error (exn, _) ->
|
||||||
|
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn));
|
||||||
|
lwt_fut
|
||||||
|
|
||||||
|
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
|
||||||
|
match Lwt.poll lwt_fut with
|
||||||
|
| Some x -> M.Fut.return x
|
||||||
|
| None ->
|
||||||
|
let fut, prom = M.Fut.make () in
|
||||||
|
Lwt.on_any lwt_fut
|
||||||
|
(fun x -> M.Fut.fulfill prom (Ok x))
|
||||||
|
(fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10)));
|
||||||
|
fut
|
||||||
|
|
||||||
|
let await_lwt (fut : _ Lwt.t) =
|
||||||
|
match Lwt.poll fut with
|
||||||
|
| Some x -> x
|
||||||
|
| None ->
|
||||||
|
(* suspend fiber, wake it up when [fut] resolves *)
|
||||||
|
M.Private.Suspend_.suspend
|
||||||
|
{
|
||||||
|
handle =
|
||||||
|
(fun ~ls ~run:_ ~resume sus ->
|
||||||
|
let on_lwt_done _ = resume ~ls sus @@ Ok () in
|
||||||
|
Perform_action_in_lwt.(
|
||||||
|
schedule Action.(On_termination (fut, on_lwt_done))));
|
||||||
|
};
|
||||||
|
|
||||||
|
(match Lwt.poll fut with
|
||||||
|
| Some x -> x
|
||||||
|
| None -> assert false)
|
||||||
|
|
||||||
|
let run_in_lwt f : _ M.Fut.t =
|
||||||
|
let fut, prom = M.Fut.make () in
|
||||||
|
Perform_action_in_lwt.schedule
|
||||||
|
(Action.Other
|
||||||
|
(fun () ->
|
||||||
|
let lwt_fut = f () in
|
||||||
|
Lwt.on_any lwt_fut
|
||||||
|
(fun x -> M.Fut.fulfill prom @@ Ok x)
|
||||||
|
(fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn))));
|
||||||
|
fut
|
||||||
|
|
||||||
|
let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f
|
||||||
|
|
||||||
|
let detach_in_runner ~runner f : _ Lwt.t =
|
||||||
|
let fut, promise = Lwt.wait () in
|
||||||
|
M.Runner.run_async runner (fun () ->
|
||||||
|
match f () with
|
||||||
|
| x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (promise, x)
|
||||||
|
| exception exn ->
|
||||||
|
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn));
|
||||||
|
fut
|
||||||
|
|
||||||
|
let main_with_runner ~runner (f : unit -> 'a) : 'a =
|
||||||
|
let lwt_fut, lwt_prom = Lwt.wait () in
|
||||||
|
|
||||||
|
let _fiber =
|
||||||
|
Fiber.spawn_top ~name:"Moonpool_lwt.main" ~on:runner (fun () ->
|
||||||
|
try
|
||||||
|
let x = f () in
|
||||||
|
Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x))
|
||||||
|
with exn ->
|
||||||
|
Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn)))
|
||||||
|
in
|
||||||
|
|
||||||
|
Lwt_main.run lwt_fut
|
||||||
|
|
||||||
|
let main f =
|
||||||
|
let@ runner = M.Ws_pool.with_ () in
|
||||||
|
main_with_runner ~runner f
|
||||||
|
|
@ -2,3 +2,4 @@ module M = Moonpool
|
||||||
module Exn_bt = M.Exn_bt
|
module Exn_bt = M.Exn_bt
|
||||||
|
|
||||||
let ( let@ ) = ( @@ )
|
let ( let@ ) = ( @@ )
|
||||||
|
let _default_buf_size = 4 * 1024
|
||||||
|
|
|
||||||
|
|
@ -1,229 +1,31 @@
|
||||||
open Common_
|
open Common_
|
||||||
module Fiber = Moonpool_fib.Fiber
|
include Base
|
||||||
module FLS = Moonpool_fib.Fls
|
module IO = IO
|
||||||
|
module IO_out = IO_out
|
||||||
|
module IO_in = IO_in
|
||||||
|
|
||||||
(** Action scheduled from outside the loop *)
|
module TCP_server = struct
|
||||||
module Action = struct
|
type t = Lwt_io.server
|
||||||
type event = Lwt_engine.event
|
|
||||||
type cb = event -> unit
|
|
||||||
|
|
||||||
(** Action that we ask the lwt loop to perform, from the outside *)
|
let establish ?backlog ?no_close ~runner addr handler : t =
|
||||||
type t =
|
let server =
|
||||||
| Wait_readable of Unix.file_descr * cb
|
Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr
|
||||||
| Wait_writable of Unix.file_descr * cb
|
(fun client_addr client_sock ->
|
||||||
| Sleep of float * bool * cb
|
let ic = IO_in.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in
|
||||||
(* TODO: provide actions with cancellation, alongside a "select" operation *)
|
let oc = IO_out.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in
|
||||||
(* | Cancel of event *)
|
|
||||||
| On_termination : 'a Lwt.t * ('a Exn_bt.result -> unit) -> t
|
|
||||||
| Wakeup : 'a Lwt.u * 'a -> t
|
|
||||||
| Wakeup_exn : _ Lwt.u * exn -> t
|
|
||||||
| Other of (unit -> unit)
|
|
||||||
|
|
||||||
(** Perform the action from within the Lwt thread *)
|
let fut =
|
||||||
let perform (self : t) : unit =
|
M.Fut.spawn ~name:"tcp.server.handler" ~on:runner (fun () ->
|
||||||
match self with
|
handler client_addr ic oc)
|
||||||
| Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event)
|
in
|
||||||
| Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event)
|
|
||||||
| Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event)
|
let lwt_fut = lwt_of_fut fut in
|
||||||
(* | Cancel ev -> Lwt_engine.stop_event ev *)
|
lwt_fut)
|
||||||
| On_termination (fut, f) ->
|
in
|
||||||
Lwt.on_any fut
|
Printf.printf "awaiting server\n%!";
|
||||||
(fun x -> f @@ Ok x)
|
let s = await_lwt server in
|
||||||
(fun exn -> f @@ Error (Exn_bt.get_callstack 10 exn))
|
Printf.printf "got server\n%!";
|
||||||
| Wakeup (prom, x) -> Lwt.wakeup prom x
|
s
|
||||||
| Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e
|
|
||||||
| Other f -> f ()
|
let shutdown self = await_lwt @@ Lwt_io.shutdown_server self
|
||||||
end
|
|
||||||
|
|
||||||
module Action_queue = struct
|
|
||||||
type t = { q: Action.t list Atomic.t } [@@unboxed]
|
|
||||||
|
|
||||||
let create () : t = { q = Atomic.make [] }
|
|
||||||
let pop_all (self : t) : _ list = Atomic.exchange self.q []
|
|
||||||
|
|
||||||
(** Push the action and return whether the queue was previously empty *)
|
|
||||||
let push (self : t) (a : Action.t) : bool =
|
|
||||||
let is_first = ref true in
|
|
||||||
while
|
|
||||||
let old = Atomic.get self.q in
|
|
||||||
if Atomic.compare_and_set self.q old (a :: old) then (
|
|
||||||
is_first := old = [];
|
|
||||||
false
|
|
||||||
) else
|
|
||||||
true
|
|
||||||
do
|
|
||||||
()
|
|
||||||
done;
|
|
||||||
!is_first
|
|
||||||
end
|
|
||||||
|
|
||||||
module Perform_action_in_lwt = struct
|
|
||||||
open struct
|
|
||||||
let actions_ : Action_queue.t = Action_queue.create ()
|
|
||||||
|
|
||||||
(** Gets the current set of notifications and perform them from inside the
|
|
||||||
Lwt thread *)
|
|
||||||
let perform_pending_actions () : unit =
|
|
||||||
let l = Action_queue.pop_all actions_ in
|
|
||||||
List.iter Action.perform l
|
|
||||||
|
|
||||||
let notification : int =
|
|
||||||
Lwt_unix.make_notification ~once:false perform_pending_actions
|
|
||||||
end
|
|
||||||
|
|
||||||
let schedule (a : Action.t) : unit =
|
|
||||||
let is_first = Action_queue.push actions_ a in
|
|
||||||
if is_first then Lwt_unix.send_notification notification
|
|
||||||
end
|
|
||||||
|
|
||||||
let get_runner () : M.Runner.t =
|
|
||||||
match M.Runner.get_current_runner () with
|
|
||||||
| Some r -> r
|
|
||||||
| None -> failwith "Moonpool_lwt.get_runner: not inside a runner"
|
|
||||||
|
|
||||||
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
|
|
||||||
let lwt_fut, lwt_prom = Lwt.wait () in
|
|
||||||
M.Fut.on_result fut (function
|
|
||||||
| Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x)
|
|
||||||
| Error (exn, _) ->
|
|
||||||
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn));
|
|
||||||
lwt_fut
|
|
||||||
|
|
||||||
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
|
|
||||||
match Lwt.poll lwt_fut with
|
|
||||||
| Some x -> M.Fut.return x
|
|
||||||
| None ->
|
|
||||||
let fut, prom = M.Fut.make () in
|
|
||||||
Lwt.on_any lwt_fut
|
|
||||||
(fun x -> M.Fut.fulfill prom (Ok x))
|
|
||||||
(fun e -> M.Fut.fulfill prom (Error (e, Printexc.get_callstack 10)));
|
|
||||||
fut
|
|
||||||
|
|
||||||
let await_lwt (fut : _ Lwt.t) =
|
|
||||||
match Lwt.poll fut with
|
|
||||||
| Some x -> x
|
|
||||||
| None ->
|
|
||||||
(* suspend fiber, wake it up when [fut] resolves *)
|
|
||||||
M.Private.Suspend_.suspend
|
|
||||||
{
|
|
||||||
handle =
|
|
||||||
(fun ~ls ~run:_ ~resume sus ->
|
|
||||||
let on_lwt_done _ = resume ~ls sus @@ Ok () in
|
|
||||||
Perform_action_in_lwt.(
|
|
||||||
schedule Action.(On_termination (fut, on_lwt_done))));
|
|
||||||
};
|
|
||||||
|
|
||||||
(match Lwt.poll fut with
|
|
||||||
| Some x -> x
|
|
||||||
| None -> assert false)
|
|
||||||
|
|
||||||
let run_in_lwt f : _ M.Fut.t =
|
|
||||||
let fut, prom = M.Fut.make () in
|
|
||||||
Perform_action_in_lwt.schedule
|
|
||||||
(Action.Other
|
|
||||||
(fun () ->
|
|
||||||
let lwt_fut = f () in
|
|
||||||
Lwt.on_any lwt_fut
|
|
||||||
(fun x -> M.Fut.fulfill prom @@ Ok x)
|
|
||||||
(fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn))));
|
|
||||||
fut
|
|
||||||
|
|
||||||
let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f
|
|
||||||
|
|
||||||
let detach_in_runner ~runner f : _ Lwt.t =
|
|
||||||
let fut, promise = Lwt.wait () in
|
|
||||||
M.Runner.run_async runner (fun () ->
|
|
||||||
match f () with
|
|
||||||
| x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (promise, x)
|
|
||||||
| exception exn ->
|
|
||||||
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn));
|
|
||||||
fut
|
|
||||||
|
|
||||||
let main_with_runner ~runner (f : unit -> 'a) : 'a =
|
|
||||||
let lwt_fut, lwt_prom = Lwt.wait () in
|
|
||||||
|
|
||||||
let _fiber =
|
|
||||||
Fiber.spawn_top ~name:"Moonpool_lwt.main" ~on:runner (fun () ->
|
|
||||||
try
|
|
||||||
let x = f () in
|
|
||||||
Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x))
|
|
||||||
with exn ->
|
|
||||||
Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn)))
|
|
||||||
in
|
|
||||||
|
|
||||||
Lwt_main.run lwt_fut
|
|
||||||
|
|
||||||
let main f =
|
|
||||||
let@ runner = M.Ws_pool.with_ () in
|
|
||||||
main_with_runner ~runner f
|
|
||||||
|
|
||||||
module IO = struct
|
|
||||||
let rec read fd buf i len : int =
|
|
||||||
if len = 0 then
|
|
||||||
0
|
|
||||||
else (
|
|
||||||
match Unix.read fd buf i len with
|
|
||||||
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
|
|
||||||
(* wait for FD to be ready *)
|
|
||||||
Moonpool.Private.Suspend_.suspend
|
|
||||||
{
|
|
||||||
handle =
|
|
||||||
(fun ~ls ~run:_ ~resume sus ->
|
|
||||||
Perform_action_in_lwt.schedule
|
|
||||||
@@ Action.Wait_readable
|
|
||||||
( fd,
|
|
||||||
fun cancel ->
|
|
||||||
resume ~ls sus @@ Ok ();
|
|
||||||
Lwt_engine.stop_event cancel ));
|
|
||||||
};
|
|
||||||
read fd buf i len
|
|
||||||
| n -> n
|
|
||||||
)
|
|
||||||
|
|
||||||
let rec write_once fd buf i len : int =
|
|
||||||
if len = 0 then
|
|
||||||
0
|
|
||||||
else (
|
|
||||||
match Unix.write fd buf i len with
|
|
||||||
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
|
|
||||||
(* wait for FD to be ready *)
|
|
||||||
Moonpool.Private.Suspend_.suspend
|
|
||||||
{
|
|
||||||
handle =
|
|
||||||
(fun ~ls ~run:_ ~resume sus ->
|
|
||||||
Perform_action_in_lwt.schedule
|
|
||||||
@@ Action.Wait_writable
|
|
||||||
( fd,
|
|
||||||
fun cancel ->
|
|
||||||
resume ~ls sus @@ Ok ();
|
|
||||||
Lwt_engine.stop_event cancel ));
|
|
||||||
};
|
|
||||||
write_once fd buf i len
|
|
||||||
| n -> n
|
|
||||||
)
|
|
||||||
|
|
||||||
let write fd buf i len : unit =
|
|
||||||
let i = ref i in
|
|
||||||
let len = ref len in
|
|
||||||
while !len > 0 do
|
|
||||||
let n = write_once fd buf !i !len in
|
|
||||||
i := !i + n;
|
|
||||||
len := !len - n
|
|
||||||
done
|
|
||||||
|
|
||||||
(** Sleep for the given amount of seconds *)
|
|
||||||
let sleep_s (f : float) : unit =
|
|
||||||
if f > 0. then
|
|
||||||
Moonpool.Private.Suspend_.suspend
|
|
||||||
{
|
|
||||||
handle =
|
|
||||||
(fun ~ls ~run:_ ~resume sus ->
|
|
||||||
Perform_action_in_lwt.schedule
|
|
||||||
@@ Action.Sleep
|
|
||||||
( f,
|
|
||||||
false,
|
|
||||||
fun cancel ->
|
|
||||||
resume ~ls sus @@ Ok ();
|
|
||||||
Lwt_engine.stop_event cancel ));
|
|
||||||
}
|
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,17 @@ val get_runner : unit -> Moonpool.Runner.t
|
||||||
Must be run from within a fiber.
|
Must be run from within a fiber.
|
||||||
@raise Failure if not run within a fiber *)
|
@raise Failure if not run within a fiber *)
|
||||||
|
|
||||||
|
(** {2 IO} *)
|
||||||
|
|
||||||
|
(** IO using the Lwt event loop.
|
||||||
|
|
||||||
|
These IO operations work on non-blocking file descriptors
|
||||||
|
and rely on a [Lwt_engine] event loop being active (meaning,
|
||||||
|
[Lwt_main.run] is currently running in some thread).
|
||||||
|
|
||||||
|
Calling these functions must be done from a moonpool runner and
|
||||||
|
will suspend the current task/fut/fiber if the FD is not ready.
|
||||||
|
*)
|
||||||
module IO : sig
|
module IO : sig
|
||||||
val read : Unix.file_descr -> bytes -> int -> int -> int
|
val read : Unix.file_descr -> bytes -> int -> int -> int
|
||||||
val write_once : Unix.file_descr -> bytes -> int -> int -> int
|
val write_once : Unix.file_descr -> bytes -> int -> int -> int
|
||||||
|
|
@ -40,6 +51,24 @@ module IO : sig
|
||||||
val sleep_s : float -> unit
|
val sleep_s : float -> unit
|
||||||
end
|
end
|
||||||
|
|
||||||
|
module IO_in = IO_in
|
||||||
|
module IO_out = IO_out
|
||||||
|
|
||||||
|
module TCP_server : sig
|
||||||
|
type t = Lwt_io.server
|
||||||
|
|
||||||
|
val establish :
|
||||||
|
?backlog:(* ?server_fd:Unix.file_descr -> *)
|
||||||
|
int ->
|
||||||
|
?no_close:bool ->
|
||||||
|
runner:Moonpool.Runner.t ->
|
||||||
|
Unix.sockaddr ->
|
||||||
|
(Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) ->
|
||||||
|
t
|
||||||
|
|
||||||
|
val shutdown : t -> unit
|
||||||
|
end
|
||||||
|
|
||||||
(** {2 Helpers on the lwt side} *)
|
(** {2 Helpers on the lwt side} *)
|
||||||
|
|
||||||
val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t
|
val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue