This commit is contained in:
Simon Cruanes 2025-09-04 20:03:38 +00:00 committed by GitHub
commit a5740468f5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
60 changed files with 734 additions and 1596 deletions

View file

@ -15,7 +15,7 @@ jobs:
- name: Use OCaml - name: Use OCaml
uses: ocaml/setup-ocaml@v3 uses: ocaml/setup-ocaml@v3
with: with:
ocaml-compiler: '5.0' ocaml-compiler: '5.3'
dune-cache: true dune-cache: true
allow-prerelease-opam: true allow-prerelease-opam: true

View file

@ -16,8 +16,8 @@ jobs:
os: os:
- ubuntu-latest - ubuntu-latest
ocaml-compiler: ocaml-compiler:
- '4.14' - '5.0'
- '5.2' - '5.3'
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
steps: steps:
@ -32,15 +32,10 @@ jobs:
- run: opam pin picos 0.6.0 -y -n - run: opam pin picos 0.6.0 -y -n
- run: opam install -t moonpool moonpool-lwt moonpool-io --deps-only - run: opam install -t moonpool moonpool-lwt moonpool-io --deps-only
if: matrix.ocaml-compiler == '5.2'
- run: opam install -t moonpool --deps-only
if: matrix.ocaml-compiler != '5.2'
- run: opam exec -- dune build @install - run: opam exec -- dune build @install
# install some depopts # install some depopts
- run: opam install thread-local-storage trace hmap - run: opam install thread-local-storage trace hmap
if: matrix.ocaml-compiler == '5.2'
- run: opam exec -- dune build --profile=release --force @install @runtest - run: opam exec -- dune build --profile=release --force @install @runtest
compat: compat:

View file

@ -1,6 +1,3 @@
(executables (executables
(names fib_rec pi primes) (names fib_rec pi primes)
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file})))
(libraries moonpool moonpool.forkjoin unix trace trace-tef domainslib)) (libraries moonpool moonpool.forkjoin unix trace trace-tef domainslib))

View file

@ -66,8 +66,6 @@ let run_par1 ~kind (num_steps : int) : float =
let pi = step *. Lock.get global_sum in let pi = step *. Lock.get global_sum in
pi pi
[@@@ifge 5.0]
let run_fork_join ~kind num_steps : float = let run_fork_join ~kind num_steps : float =
let@ pool = with_pool ~kind () in let@ pool = with_pool ~kind () in
@ -92,13 +90,6 @@ let run_fork_join ~kind num_steps : float =
let pi = step *. Lock.get global_sum in let pi = step *. Lock.get global_sum in
pi pi
[@@@else_]
let run_fork_join _ =
failwith "fork join not available on this version of OCaml"
[@@@endif]
type mode = type mode =
| Sequential | Sequential
| Par1 | Par1

View file

@ -16,7 +16,7 @@
(name moonpool) (name moonpool)
(synopsis "Pools of threads supported by a pool of domains") (synopsis "Pools of threads supported by a pool of domains")
(depends (depends
(ocaml (>= 4.14)) (ocaml (>= 5.0))
dune dune
(either (>= 1.0)) (either (>= 1.0))
(trace :with-test) (trace :with-test)

View file

@ -9,7 +9,7 @@ tags: ["thread" "pool" "domain" "futures" "fork-join"]
homepage: "https://github.com/c-cube/moonpool" homepage: "https://github.com/c-cube/moonpool"
bug-reports: "https://github.com/c-cube/moonpool/issues" bug-reports: "https://github.com/c-cube/moonpool/issues"
depends: [ depends: [
"ocaml" {>= "4.14"} "ocaml" {>= "5.0"}
"dune" {>= "3.0"} "dune" {>= "3.0"}
"either" {>= "1.0"} "either" {>= "1.0"}
"trace" {with-test} "trace" {with-test}

View file

@ -1,182 +0,0 @@
type 'a t = {
max_size: int;
q: 'a Queue.t;
mutex: Mutex.t;
cond_push: Condition.t;
cond_pop: Condition.t;
mutable closed: bool;
}
exception Closed
let create ~max_size () : _ t =
if max_size < 1 then invalid_arg "Bounded_queue.create";
{
max_size;
mutex = Mutex.create ();
cond_push = Condition.create ();
cond_pop = Condition.create ();
q = Queue.create ();
closed = false;
}
let close (self : _ t) =
Mutex.lock self.mutex;
if not self.closed then (
self.closed <- true;
(* awake waiters so they fail *)
Condition.broadcast self.cond_push;
Condition.broadcast self.cond_pop
);
Mutex.unlock self.mutex
(** Check if the queue is full. Precondition: [self.mutex] is acquired. *)
let[@inline] is_full_ (self : _ t) : bool = Queue.length self.q >= self.max_size
let push (self : _ t) x : unit =
let continue = ref true in
Mutex.lock self.mutex;
while !continue do
if self.closed then (
(* push always fails on a closed queue *)
Mutex.unlock self.mutex;
raise Closed
) else if is_full_ self then
Condition.wait self.cond_push self.mutex
else (
let was_empty = Queue.is_empty self.q in
Queue.push x self.q;
if was_empty then Condition.broadcast self.cond_pop;
(* exit loop *)
continue := false;
Mutex.unlock self.mutex
)
done
let pop (self : 'a t) : 'a =
Mutex.lock self.mutex;
let rec loop () =
if Queue.is_empty self.q then (
if self.closed then (
(* pop fails on a closed queue if it's also empty,
otherwise it still returns the remaining elements *)
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond_pop self.mutex;
(loop [@tailcall]) ()
) else (
let was_full = is_full_ self in
let x = Queue.pop self.q in
(* wakeup pushers that were blocked *)
if was_full then Condition.broadcast self.cond_push;
Mutex.unlock self.mutex;
x
)
in
loop ()
let try_pop ~force_lock (self : _ t) : _ option =
let has_lock =
if force_lock then (
Mutex.lock self.mutex;
true
) else
Mutex.try_lock self.mutex
in
if has_lock then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
let was_full_before_pop = is_full_ self in
match Queue.pop self.q with
| x ->
(* wakeup pushers that are blocked *)
if was_full_before_pop then Condition.broadcast self.cond_push;
Mutex.unlock self.mutex;
Some x
| exception Queue.Empty ->
Mutex.unlock self.mutex;
None
) else
None
let try_push ~force_lock (self : _ t) x : bool =
let has_lock =
if force_lock then (
Mutex.lock self.mutex;
true
) else
Mutex.try_lock self.mutex
in
if has_lock then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
if is_full_ self then (
Mutex.unlock self.mutex;
false
) else (
let was_empty = Queue.is_empty self.q in
Queue.push x self.q;
if was_empty then Condition.broadcast self.cond_pop;
Mutex.unlock self.mutex;
true
)
) else
false
let[@inline] max_size self = self.max_size
let size (self : _ t) : int =
Mutex.lock self.mutex;
let n = Queue.length self.q in
Mutex.unlock self.mutex;
n
let transfer (self : 'a t) q2 : unit =
Mutex.lock self.mutex;
let continue = ref true in
while !continue do
if Queue.is_empty self.q then (
if self.closed then (
Mutex.unlock self.mutex;
raise Closed
);
Condition.wait self.cond_pop self.mutex
) else (
let was_full = is_full_ self in
Queue.transfer self.q q2;
if was_full then Condition.broadcast self.cond_push;
continue := false;
Mutex.unlock self.mutex
)
done
type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit
let to_iter self k =
try
while true do
let x = pop self in
k x
done
with Closed -> ()
let to_gen self : _ gen =
fun () ->
match pop self with
| exception Closed -> None
| x -> Some x
let rec to_seq self : _ Seq.t =
fun () ->
match pop self with
| exception Closed -> Seq.Nil
| x -> Seq.Cons (x, to_seq self)

View file

@ -1,82 +0,0 @@
(** A blocking queue of finite size.
This queue, while still using locks underneath (like the regular blocking
queue) should be enough for usage under reasonable contention.
The bounded size is helpful whenever some form of backpressure is desirable:
if the queue is used to communicate between producer(s) and consumer(s), the
consumer(s) can limit the rate at which producer(s) send new work down their
way. Whenever the queue is full, means that producer(s) will have to wait
before pushing new work.
@since 0.4 *)
type 'a t
(** A bounded queue. *)
val create : max_size:int -> unit -> 'a t
val close : _ t -> unit
(** [close q] closes [q]. No new elements can be pushed into [q], and after all
the elements still in [q] currently are [pop]'d, {!pop} will also raise
{!Closed}. *)
exception Closed
val push : 'a t -> 'a -> unit
(** [push q x] pushes [x] at the end of the queue. If [q] is full, this will
block until there is room for [x].
@raise Closed if [q] is closed. *)
val try_push : force_lock:bool -> 'a t -> 'a -> bool
(** [try_push q x] attempts to push [x] into [q], but abandons if it cannot
acquire [q] or if [q] is full.
@param force_lock
if true, use {!Mutex.lock} (which can block under contention); if false,
use {!Mutex.try_lock}, which might return [false] even if there's room in
the queue.
@raise Closed if [q] is closed. *)
val pop : 'a t -> 'a
(** [pop q] pops the first element off [q]. It blocks if [q] is empty, until
some element becomes available.
@raise Closed if [q] is empty and closed. *)
val try_pop : force_lock:bool -> 'a t -> 'a option
(** [try_pop ~force_lock q] tries to pop the first element, or returns [None] if
no element is available or if it failed to acquire [q].
@param force_lock
if true, use {!Mutex.lock} (which can block under contention); if false,
use {!Mutex.try_lock}, which might return [None] even in presence of an
element if there's contention.
@raise Closed if [q] is empty and closed. *)
val size : _ t -> int
(** Number of elements currently in [q] *)
val max_size : _ t -> int
(** Maximum size of the queue. See {!create}. *)
val transfer : 'a t -> 'a Queue.t -> unit
(** [transfer bq q2] transfers all elements currently available in [bq] into
local queue [q2], and clears [bq], atomically. It blocks if [bq] is empty.
See {!Bb_queue.transfer} for more details.
@raise Closed if [bq] is empty and closed. *)
type 'a gen = unit -> 'a option
type 'a iter = ('a -> unit) -> unit
val to_iter : 'a t -> 'a iter
(** [to_iter q] returns an iterator over all items in the queue. This might not
terminate if [q] is never closed. *)
val to_gen : 'a t -> 'a gen
(** [to_gen q] returns a generator from the queue. *)
val to_seq : 'a t -> 'a Seq.t
(** [to_gen q] returns a (transient) sequence from the queue. *)

View file

@ -70,8 +70,6 @@ let close (self : _ t) : unit =
Mutex.unlock self.mutex; Mutex.unlock self.mutex;
Queue.iter Trigger.signal q Queue.iter Trigger.signal q
[@@@ifge 5.0]
let rec push (self : _ t) x : unit = let rec push (self : _ t) x : unit =
Mutex.lock self.mutex; Mutex.lock self.mutex;
@ -120,5 +118,3 @@ let rec pop (self : 'a t) : 'a =
Mutex.unlock self.mutex; Mutex.unlock self.mutex;
Trigger.await_exn tr; Trigger.await_exn tr;
pop self pop self
[@@@endif]

View file

@ -28,8 +28,6 @@ val close : _ t -> unit
(** Close the channel. Further push and pop calls will fail. This is idempotent. (** Close the channel. Further push and pop calls will fail. This is idempotent.
*) *)
[@@@ifge 5.0]
val push : 'a t -> 'a -> unit val push : 'a t -> 'a -> unit
(** Push the value into the channel, suspending the current task if the channel (** Push the value into the channel, suspending the current task if the channel
is currently full. is currently full.
@ -48,5 +46,3 @@ val pop_block_exn : 'a t -> 'a
The precautions around blocking from inside a thread pool The precautions around blocking from inside a thread pool
are the same as explained in {!Fut.wait_block}. *) are the same as explained in {!Fut.wait_block}. *)
*) *)
[@@@endif]

View file

@ -12,7 +12,4 @@
moonpool.dpool moonpool.dpool
(re_export picos)) (re_export picos))
(flags :standard -open Moonpool_private) (flags :standard -open Moonpool_private)
(private_modules util_pool_) (private_modules util_pool_))
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file}))))

View file

@ -28,7 +28,6 @@ type worker_state = {
let[@inline] size_ (self : state) = Array.length self.threads let[@inline] size_ (self : state) = Array.length self.threads
let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q let[@inline] num_tasks_ (self : state) : int = Bb_queue.size self.q
let k_worker_state : worker_state TLS.t = TLS.create ()
(* (*
get_thread_state = TLS.get_opt k_worker_state get_thread_state = TLS.get_opt k_worker_state
@ -71,12 +70,6 @@ let schedule_w (self : worker_state) (task : task_full) : unit =
let get_next_task (self : worker_state) = let get_next_task (self : worker_state) =
try Bb_queue.pop self.st.q with Bb_queue.Closed -> raise WL.No_more_tasks try Bb_queue.pop self.st.q with Bb_queue.Closed -> raise WL.No_more_tasks
let get_thread_state () =
match TLS.get_exn k_worker_state with
| st -> st
| exception TLS.Not_set ->
failwith "Moonpool: get_thread_state called from outside a runner."
let before_start (self : worker_state) = let before_start (self : worker_state) =
let t_id = Thread.id @@ Thread.self () in let t_id = Thread.id @@ Thread.self () in
self.st.on_init_thread ~dom_id:self.dom_idx ~t_id (); self.st.on_init_thread ~dom_id:self.dom_idx ~t_id ();
@ -103,7 +96,6 @@ let worker_ops : worker_state WL.ops =
WL.schedule = schedule_w; WL.schedule = schedule_w;
runner; runner;
get_next_task; get_next_task;
get_thread_state;
around_task; around_task;
on_exn; on_exn;
before_start; before_start;

View file

@ -1,4 +1,4 @@
module A = Atomic_ module A = Atomic
module C = Picos.Computation module C = Picos.Computation
type 'a or_error = ('a, Exn_bt.t) result type 'a or_error = ('a, Exn_bt.t) result
@ -424,8 +424,6 @@ let wait_block self =
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
Error (Exn_bt.make exn bt) Error (Exn_bt.make exn bt)
[@@@ifge 5.0]
let await (self : 'a t) : 'a = let await (self : 'a t) : 'a =
(* fast path: peek *) (* fast path: peek *)
match C.peek_exn self with match C.peek_exn self with
@ -439,8 +437,6 @@ let await (self : 'a t) : 'a =
(* un-suspended: we should have a result! *) (* un-suspended: we should have a result! *)
get_or_fail_exn self get_or_fail_exn self
[@@@endif]
module Infix = struct module Infix = struct
let[@inline] ( >|= ) x f = map ~f x let[@inline] ( >|= ) x f = map ~f x
let[@inline] ( >>= ) x f = bind ~f x let[@inline] ( >>= ) x f = bind ~f x

View file

@ -236,8 +236,6 @@ val for_list : on:Runner.t -> 'a list -> ('a -> unit) -> unit t
{b NOTE} This is only available on OCaml 5. *) {b NOTE} This is only available on OCaml 5. *)
[@@@ifge 5.0]
val await : 'a t -> 'a val await : 'a t -> 'a
(** [await fut] suspends the current tasks until [fut] is fulfilled, then (** [await fut] suspends the current tasks until [fut] is fulfilled, then
resumes the task on this same runner (but possibly on a different resumes the task on this same runner (but possibly on a different
@ -248,8 +246,6 @@ val await : 'a t -> 'a
This must only be run from inside the runner itself. The runner must support This must only be run from inside the runner itself. The runner must support
{!Suspend_}. {b NOTE}: only on OCaml 5.x *) {!Suspend_}. {b NOTE}: only on OCaml 5.x *)
[@@@endif]
(** {2 Blocking} *) (** {2 Blocking} *)
val wait_block : 'a t -> 'a or_error val wait_block : 'a t -> 'a or_error

View file

@ -12,18 +12,12 @@ let get_current_runner = Runner.get_current_runner
let recommended_thread_count () = Domain_.recommended_number () let recommended_thread_count () = Domain_.recommended_number ()
let spawn = Fut.spawn let spawn = Fut.spawn
let spawn_on_current_runner = Fut.spawn_on_current_runner let spawn_on_current_runner = Fut.spawn_on_current_runner
[@@@ifge 5.0]
let await = Fut.await let await = Fut.await
let yield = Picos.Fiber.yield let yield = Picos.Fiber.yield
[@@@endif] module Atomic = Atomic
module Atomic = Atomic_
module Blocking_queue = Bb_queue module Blocking_queue = Bb_queue
module Background_thread = Background_thread module Background_thread = Background_thread
module Bounded_queue = Bounded_queue
module Chan = Chan module Chan = Chan
module Exn_bt = Exn_bt module Exn_bt = Exn_bt
module Fifo_pool = Fifo_pool module Fifo_pool = Fifo_pool

View file

@ -72,8 +72,6 @@ val get_current_runner : unit -> Runner.t option
(** See {!Runner.get_current_runner} (** See {!Runner.get_current_runner}
@since 0.7 *) @since 0.7 *)
[@@@ifge 5.0]
val await : 'a Fut.t -> 'a val await : 'a Fut.t -> 'a
(** Await a future, must be run on a moonpool runner. See {!Fut.await}. Only on (** Await a future, must be run on a moonpool runner. See {!Fut.await}. Only on
OCaml >= 5.0. OCaml >= 5.0.
@ -84,8 +82,6 @@ val yield : unit -> unit
>= 5.0. >= 5.0.
@since NEXT_RELEASE *) @since NEXT_RELEASE *)
[@@@endif]
module Lock = Lock module Lock = Lock
module Fut = Fut module Fut = Fut
module Chan = Chan module Chan = Chan
@ -203,9 +199,7 @@ module Blocking_queue : sig
@since 0.4 *) @since 0.4 *)
end end
module Bounded_queue = Bounded_queue module Atomic = Atomic
module Atomic = Atomic_
(** Atomic values. (** Atomic values.
This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic] This is either a shim using [ref], on pre-OCaml 5, or the standard [Atomic]

View file

@ -21,8 +21,6 @@ exception No_more_tasks
type 'st ops = { type 'st ops = {
schedule: 'st -> task_full -> unit; schedule: 'st -> task_full -> unit;
get_next_task: 'st -> task_full; (** @raise No_more_tasks *) get_next_task: 'st -> task_full; (** @raise No_more_tasks *)
get_thread_state: unit -> 'st;
(** Access current thread's worker state from any worker *)
around_task: 'st -> around_task; around_task: 'st -> around_task;
on_exn: 'st -> Exn_bt.t -> unit; on_exn: 'st -> Exn_bt.t -> unit;
runner: 'st -> Runner.t; runner: 'st -> Runner.t;
@ -33,8 +31,6 @@ type 'st ops = {
(** A dummy task. *) (** A dummy task. *)
let _dummy_task : task_full = T_start { f = ignore; fiber = _dummy_fiber } let _dummy_task : task_full = T_start { f = ignore; fiber = _dummy_fiber }
[@@@ifge 5.0]
let[@inline] discontinue k exn = let[@inline] discontinue k exn =
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
Effect.Deep.discontinue_with_backtrace k exn bt Effect.Deep.discontinue_with_backtrace k exn bt
@ -43,8 +39,8 @@ let[@inline] raise_with_bt exn =
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
Printexc.raise_with_backtrace exn bt Printexc.raise_with_backtrace exn bt
let with_handler (type st arg) ~(ops : st ops) (self : st) : let with_handler (type st) ~(ops : st ops) (self : st) : (unit -> unit) -> unit
(unit -> unit) -> unit = =
let current = let current =
Some Some
(fun k -> (fun k ->
@ -87,8 +83,8 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) :
let fiber = get_current_fiber_exn () in let fiber = get_current_fiber_exn () in
(* when triggers is signaled, reschedule task *) (* when triggers is signaled, reschedule task *)
if not (Picos.Fiber.try_suspend fiber trigger fiber k reschedule) then if not (Picos.Fiber.try_suspend fiber trigger fiber k reschedule) then
(* trigger was already signaled, run task now *) (* trigger was already signaled, reschedule task now *)
Picos.Fiber.resume fiber k) reschedule trigger fiber k)
| Picos.Computation.Cancel_after _r -> | Picos.Computation.Cancel_after _r ->
Some Some
(fun k -> (fun k ->
@ -100,37 +96,28 @@ let with_handler (type st arg) ~(ops : st ops) (self : st) :
let handler = Effect.Deep.{ retc = Fun.id; exnc = raise_with_bt; effc } in let handler = Effect.Deep.{ retc = Fun.id; exnc = raise_with_bt; effc } in
fun f -> Effect.Deep.match_with f () handler fun f -> Effect.Deep.match_with f () handler
[@@@else_] module type FINE_GRAINED_ARGS = sig
type st
let with_handler ~ops:_ self f = f () val ops : st ops
val st : st
end
[@@@endif] module Fine_grained (Args : FINE_GRAINED_ARGS) () = struct
open Args
let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit = let cur_fiber : fiber ref = ref _dummy_fiber
if block_signals then ( let runner = ops.runner st
try
ignore
(Unix.sigprocmask SIG_BLOCK
[
Sys.sigterm;
Sys.sigpipe;
Sys.sigint;
Sys.sigchld;
Sys.sigalrm;
Sys.sigusr1;
Sys.sigusr2;
]
: _ list)
with _ -> ()
);
let cur_fiber : fiber ref = ref _dummy_fiber in type state =
let runner = ops.runner self in | New
TLS.set Runner.For_runner_implementors.k_cur_runner runner; | Ready
| Torn_down
let (AT_pair (before_task, after_task)) = ops.around_task self in let state = ref New
let run_task (task : task_full) : unit = let run_task (task : task_full) : unit =
let (AT_pair (before_task, after_task)) = ops.around_task st in
let fiber = let fiber =
match task with match task with
| T_start { fiber; _ } | T_resume { fiber; _ } -> fiber | T_start { fiber; _ } | T_resume { fiber; _ } -> fiber
@ -144,32 +131,82 @@ let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
assert (task != _dummy_task); assert (task != _dummy_task);
(try (try
match task with match task with
| T_start { fiber = _; f } -> with_handler ~ops self f | T_start { fiber = _; f } -> with_handler ~ops st f
| T_resume { fiber = _; k } -> | T_resume { fiber = _; k } ->
(* this is already in an effect handler *) (* this is already in an effect handler *)
k () k ()
with e -> with e ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
let ebt = Exn_bt.make e bt in let ebt = Exn_bt.make e bt in
ops.on_exn self ebt); ops.on_exn st ebt);
after_task runner _ctx; after_task runner _ctx;
cur_fiber := _dummy_fiber; cur_fiber := _dummy_fiber;
TLS.set k_cur_fiber _dummy_fiber TLS.set k_cur_fiber _dummy_fiber
in
ops.before_start self; let setup ~block_signals () : unit =
if !state <> New then invalid_arg "worker_loop.setup: not a new instance";
state := Ready;
let continue = ref true in if block_signals then (
try try
while !continue do ignore
match ops.get_next_task self with (Unix.sigprocmask SIG_BLOCK
| task -> run_task task [
Sys.sigterm;
Sys.sigpipe;
Sys.sigint;
Sys.sigchld;
Sys.sigalrm;
Sys.sigusr1;
Sys.sigusr2;
]
: _ list)
with _ -> ()
);
TLS.set Runner.For_runner_implementors.k_cur_runner runner;
ops.before_start st
let run ?(max_tasks = max_int) () : unit =
if !state <> Ready then invalid_arg "worker_loop.run: not setup";
let continue = ref true in
let n_tasks = ref 0 in
while !continue && !n_tasks < max_tasks do
match ops.get_next_task st with
| task ->
incr n_tasks;
run_task task
| exception No_more_tasks -> continue := false | exception No_more_tasks -> continue := false
done; done
ops.cleanup self
let teardown () =
if !state <> Torn_down then (
state := Torn_down;
cur_fiber := _dummy_fiber;
ops.cleanup st
)
end
let worker_loop (type st) ~block_signals ~(ops : st ops) (self : st) : unit =
let module FG =
Fine_grained
(struct
type nonrec st = st
let ops = ops
let st = self
end)
()
in
FG.setup ~block_signals ();
try
FG.run ();
FG.teardown ()
with exn -> with exn ->
let bt = Printexc.get_raw_backtrace () in let bt = Printexc.get_raw_backtrace () in
ops.cleanup self; FG.teardown ();
Printexc.raise_with_backtrace exn bt Printexc.raise_with_backtrace exn bt

View file

@ -26,7 +26,6 @@ exception No_more_tasks
type 'st ops = { type 'st ops = {
schedule: 'st -> task_full -> unit; schedule: 'st -> task_full -> unit;
get_next_task: 'st -> task_full; get_next_task: 'st -> task_full;
get_thread_state: unit -> 'st;
around_task: 'st -> around_task; around_task: 'st -> around_task;
on_exn: 'st -> Exn_bt.t -> unit; on_exn: 'st -> Exn_bt.t -> unit;
runner: 'st -> Runner.t; runner: 'st -> Runner.t;
@ -34,4 +33,23 @@ type 'st ops = {
cleanup: 'st -> unit; cleanup: 'st -> unit;
} }
module type FINE_GRAINED_ARGS = sig
type st
val ops : st ops
val st : st
end
module Fine_grained (_ : FINE_GRAINED_ARGS) () : sig
val setup : block_signals:bool -> unit -> unit
(** Just initialize the loop *)
val run : ?max_tasks:int -> unit -> unit
(** Run the loop until no task remains or until [max_tasks] tasks have been
run *)
val teardown : unit -> unit
(** Tear down the loop *)
end
val worker_loop : block_signals:bool -> ops:'st ops -> 'st -> unit val worker_loop : block_signals:bool -> ops:'st ops -> 'st -> unit

View file

@ -1,5 +1,5 @@
open Types_ open Types_
module A = Atomic_ module A = Atomic
module WSQ = Ws_deque_ module WSQ = Ws_deque_
module WL = Worker_loop_ module WL = Worker_loop_
include Runner include Runner
@ -62,12 +62,6 @@ let k_worker_state : worker_state TLS.t = TLS.create ()
let[@inline] get_current_worker_ () : worker_state option = let[@inline] get_current_worker_ () : worker_state option =
TLS.get_opt k_worker_state TLS.get_opt k_worker_state
let[@inline] get_current_worker_exn () : worker_state =
match TLS.get_exn k_worker_state with
| w -> w
| exception TLS.Not_set ->
failwith "Moonpool: get_current_runner was called from outside a pool."
(** Try to wake up a waiter, if there's any. *) (** Try to wake up a waiter, if there's any. *)
let[@inline] try_wake_someone_ (self : state) : unit = let[@inline] try_wake_someone_ (self : state) : unit =
if self.n_waiting_nonzero then ( if self.n_waiting_nonzero then (
@ -212,7 +206,6 @@ let worker_ops : worker_state WL.ops =
WL.schedule = schedule_from_w; WL.schedule = schedule_from_w;
runner; runner;
get_next_task; get_next_task;
get_thread_state = get_current_worker_exn;
around_task; around_task;
on_exn; on_exn;
before_start; before_start;

View file

@ -1,124 +0,0 @@
type op =
| Le
| Ge
type line =
| If of op * int * int
| Elseif of op * int * int
| Else
| Endif
| Raw of string
| Eof
let prefix ~pre s =
let len = String.length pre in
if len > String.length s then
false
else (
let rec check i =
if i = len then
true
else if String.unsafe_get s i <> String.unsafe_get pre i then
false
else
check (i + 1)
in
check 0
)
let eval ~major ~minor op i j =
match op with
| Le -> (major, minor) <= (i, j)
| Ge -> (major, minor) >= (i, j)
let preproc_lines ~file ~major ~minor (ic : in_channel) : unit =
let pos = ref 0 in
let fail msg =
failwith (Printf.sprintf "at line %d in '%s': %s" !pos file msg)
in
let pp_pos () = Printf.printf "#%d %S\n" !pos file in
let parse_line () : line =
match input_line ic with
| exception End_of_file -> Eof
| line ->
let line' = String.trim line in
incr pos;
if line' <> "" && line'.[0] = '[' then
if prefix line' ~pre:"[@@@ifle" then
Scanf.sscanf line' "[@@@ifle %d.%d]" (fun x y -> If (Le, x, y))
else if prefix line' ~pre:"[@@@ifge" then
Scanf.sscanf line' "[@@@ifge %d.%d]" (fun x y -> If (Ge, x, y))
else if prefix line' ~pre:"[@@@elifle" then
Scanf.sscanf line' "[@@@elifle %d.%d]" (fun x y -> Elseif (Le, x, y))
else if prefix line' ~pre:"[@@@elifge" then
Scanf.sscanf line' "[@@@elifge %d.%d]" (fun x y -> Elseif (Ge, x, y))
else if line' = "[@@@else_]" then
Else
else if line' = "[@@@endif]" then
Endif
else
Raw line
else
Raw line
in
(* entry point *)
let rec top () =
match parse_line () with
| Eof -> ()
| If (op, i, j) ->
if eval ~major ~minor op i j then (
pp_pos ();
cat_block ()
) else
skip_block ~elseok:true ()
| Raw s ->
print_endline s;
top ()
| Elseif _ | Else | Endif -> fail "unexpected elseif|else|endif"
(* current block is the valid one *)
and cat_block () =
match parse_line () with
| Eof -> fail "unexpected EOF"
| If _ -> fail "nested if not supported"
| Raw s ->
print_endline s;
cat_block ()
| Endif ->
pp_pos ();
top ()
| Elseif _ | Else -> skip_block ~elseok:false ()
(* skip current block.
@param elseok if true, we should evaluate "elseif" *)
and skip_block ~elseok () =
match parse_line () with
| Eof -> fail "unexpected EOF"
| If _ -> fail "nested if not supported"
| Raw _ -> skip_block ~elseok ()
| Endif ->
pp_pos ();
top ()
| Elseif (op, i, j) ->
if elseok && eval ~major ~minor op i j then (
pp_pos ();
cat_block ()
) else
skip_block ~elseok ()
| Else ->
if elseok then (
pp_pos ();
cat_block ()
) else
skip_block ~elseok ()
in
top ()
let () =
let file = Sys.argv.(1) in
let version = Sys.ocaml_version in
let major, minor = Scanf.sscanf version "%u.%u" (fun maj min -> maj, min) in
let ic = open_in file in
preproc_lines ~file ~major ~minor ic;
()

View file

@ -1,6 +0,0 @@
; our little preprocessor (ported from containers)
(executable
(name cpp)
(modes
(best exe)))

View file

@ -2,8 +2,5 @@
(name moonpool_dpool) (name moonpool_dpool)
(public_name moonpool.dpool) (public_name moonpool.dpool)
(synopsis "Moonpool's domain pool (used to start worker threads)") (synopsis "Moonpool's domain pool (used to start worker threads)")
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file})))
(flags :standard -open Moonpool_private) (flags :standard -open Moonpool_private)
(libraries moonpool.private)) (libraries moonpool.private))

View file

@ -71,7 +71,7 @@ type event =
new threads for pools. *) new threads for pools. *)
type worker_state = { type worker_state = {
q: event Bb_queue.t; q: event Bb_queue.t;
th_count: int Atomic_.t; (** Number of threads on this *) th_count: int Atomic.t; (** Number of threads on this *)
} }
(** Array of (optional) workers. (** Array of (optional) workers.
@ -101,14 +101,14 @@ let work_ idx (st : worker_state) : unit =
match Bb_queue.pop st.q with match Bb_queue.pop st.q with
| Run f -> (try f () with _ -> ()) | Run f -> (try f () with _ -> ())
| Decr -> | Decr ->
if Atomic_.fetch_and_add st.th_count (-1) = 1 then ( if Atomic.fetch_and_add st.th_count (-1) = 1 then (
continue := false; continue := false;
(* wait a bit, we might be needed again in a short amount of time *) (* wait a bit, we might be needed again in a short amount of time *)
try try
for _n_attempt = 1 to 50 do for _n_attempt = 1 to 50 do
Thread.delay 0.001; Thread.delay 0.001;
if Atomic_.get st.th_count > 0 then ( if Atomic.get st.th_count > 0 then (
(* needed again! *) (* needed again! *)
continue := true; continue := true;
raise Exit raise Exit
@ -129,7 +129,7 @@ let work_ idx (st : worker_state) : unit =
| Some _st', dom -> | Some _st', dom ->
assert (st == _st'); assert (st == _st');
if Atomic_.get st.th_count > 0 then if Atomic.get st.th_count > 0 then
(* still alive! *) (* still alive! *)
(Some st, dom), true (Some st, dom), true
else else
@ -145,7 +145,7 @@ let work_ idx (st : worker_state) : unit =
(* special case for main domain: we start a worker immediately *) (* special case for main domain: we start a worker immediately *)
let () = let () =
assert (Domain_.is_main_domain ()); assert (Domain_.is_main_domain ());
let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in let w = { th_count = Atomic.make 1; q = Bb_queue.create () } in
(* thread that stays alive *) (* thread that stays alive *)
ignore (Thread.create (fun () -> work_ 0 w) () : Thread.t); ignore (Thread.create (fun () -> work_ 0 w) () : Thread.t);
domains_.(0) <- Lock.create (Some w, None) domains_.(0) <- Lock.create (Some w, None)
@ -157,12 +157,12 @@ let run_on (i : int) (f : unit -> unit) : unit =
let w = let w =
Lock.update_map domains_.(i) (function Lock.update_map domains_.(i) (function
| (Some w, _) as st -> | (Some w, _) as st ->
Atomic_.incr w.th_count; Atomic.incr w.th_count;
st, w st, w
| None, dying_dom -> | None, dying_dom ->
(* join previous dying domain, to free its resources, if any *) (* join previous dying domain, to free its resources, if any *)
Option.iter Domain_.join dying_dom; Option.iter Domain_.join dying_dom;
let w = { th_count = Atomic_.make 1; q = Bb_queue.create () } in let w = { th_count = Atomic.make 1; q = Bb_queue.create () } in
let worker : domain = Domain_.spawn (fun () -> work_ i w) in let worker : domain = Domain_.spawn (fun () -> work_ i w) in
(Some w, Some worker), w) (Some w, Some worker), w)
in in

View file

@ -3,10 +3,4 @@
(public_name moonpool.fib) (public_name moonpool.fib)
(synopsis "Fibers and structured concurrency for Moonpool") (synopsis "Fibers and structured concurrency for Moonpool")
(libraries moonpool picos) (libraries moonpool picos)
(enabled_if (flags :standard -open Moonpool_private -open Moonpool))
(>= %{ocaml_version} 5.0))
(flags :standard -open Moonpool_private -open Moonpool)
(optional)
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file}))))

View file

@ -5,8 +5,15 @@
@since 0.6. *) @since 0.6. *)
module Fiber = Fiber module Fiber = Fiber
[@@deprecated "use picos structured concurrency or something else"]
module Fls = Fls module Fls = Fls
module Handle = Handle module Handle = Handle
module Main = Main module Main = Main
[@@deprecated "use picos structured concurrency or something else"]
[@@@ocaml.alert "-deprecated"]
include Fiber include Fiber
include Main include Main

View file

@ -4,6 +4,4 @@
(synopsis "Fork-join parallelism for moonpool") (synopsis "Fork-join parallelism for moonpool")
(flags :standard -open Moonpool) (flags :standard -open Moonpool)
(optional) (optional)
(enabled_if
(>= %{ocaml_version} 5.0))
(libraries moonpool moonpool.private picos)) (libraries moonpool moonpool.private picos))

View file

@ -1,3 +1,5 @@
[@@@deprecated "just use lwt or eio or something else"]
module Fd = Picos_io_fd module Fd = Picos_io_fd
module Unix = Picos_io.Unix module Unix = Picos_io.Unix
module Select = Picos_io_select module Select = Picos_io_select

View file

@ -1,66 +0,0 @@
open Base
let await_readable fd : unit =
let trigger = Trigger.create () in
Perform_action_in_lwt.schedule
@@ Action.Wait_readable
( fd,
fun cancel ->
Trigger.signal trigger;
Lwt_engine.stop_event cancel );
Trigger.await_exn trigger
let rec read fd buf i len : int =
if len = 0 then
0
else (
match Unix.read fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
await_readable fd;
read fd buf i len
| n -> n
)
let await_writable fd =
let trigger = Trigger.create () in
Perform_action_in_lwt.schedule
@@ Action.Wait_writable
( fd,
fun cancel ->
Trigger.signal trigger;
Lwt_engine.stop_event cancel );
Trigger.await_exn trigger
let rec write_once fd buf i len : int =
if len = 0 then
0
else (
match Unix.write fd buf i len with
| exception Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK), _, _) ->
await_writable fd;
write_once fd buf i len
| n -> n
)
let write fd buf i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
let n = write_once fd buf !i !len in
i := !i + n;
len := !len - n
done
(** Sleep for the given amount of seconds *)
let sleep_s (f : float) : unit =
if f > 0. then (
let trigger = Trigger.create () in
Perform_action_in_lwt.schedule
@@ Action.Sleep
( f,
false,
fun cancel ->
Trigger.signal trigger;
Lwt_engine.stop_event cancel );
Trigger.await_exn trigger
)

View file

@ -1,152 +0,0 @@
open Common_
class type t = object
method input : bytes -> int -> int -> int
(** Read into the slice. Returns [0] only if the stream is closed. *)
method close : unit -> unit
(** Close the input. Must be idempotent. *)
end
let create ?(close = ignore) ~input () : t =
object
method close = close
method input = input
end
let empty : t =
object
method close () = ()
method input _ _ _ = 0
end
let of_bytes ?(off = 0) ?len (b : bytes) : t =
(* i: current position in [b] *)
let i = ref off in
let len =
match len with
| Some n ->
if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes";
n
| None -> Bytes.length b - off
in
let end_ = off + len in
object
method input b_out i_out len_out =
let n = min (end_ - !i) len_out in
Bytes.blit b !i b_out i_out n;
i := !i + n;
n
method close () = i := end_
end
let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s)
(** Read into the given slice.
@return the number of bytes read, [0] means end of input. *)
let[@inline] input (self : #t) buf i len = self#input buf i len
(** Close the channel. *)
let[@inline] close self : unit = self#close ()
let rec really_input (self : #t) buf i len =
if len > 0 then (
let n = input self buf i len in
if n = 0 then raise End_of_file;
(really_input [@tailrec]) self buf (i + n) (len - n)
)
let really_input_string self n : string =
let buf = Bytes.create n in
really_input self buf 0 n;
Bytes.unsafe_to_string buf
let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t)
: unit =
let continue = ref true in
while !continue do
let len = input ic buf 0 (Bytes.length buf) in
if len = 0 then
continue := false
else
IO_out.output oc buf 0 len
done
let concat (l0 : t list) : t =
let l = ref l0 in
let rec input b i len : int =
match !l with
| [] -> 0
| ic :: tl ->
let n = ic#input b i len in
if n > 0 then
n
else (
l := tl;
input b i len
)
in
let close () = List.iter close l0 in
create ~close ~input ()
let input_all ?(buf = Bytes.create 128) (self : #t) : string =
let buf = ref buf in
let i = ref 0 in
let[@inline] full_ () = !i = Bytes.length !buf in
let grow_ () =
let old_size = Bytes.length !buf in
let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in
if old_size = new_size then
failwith "input_all: maximum input size exceeded";
let new_buf = Bytes.extend !buf 0 (new_size - old_size) in
buf := new_buf
in
let rec loop () =
if full_ () then grow_ ();
let available = Bytes.length !buf - !i in
let n = input self !buf !i available in
if n > 0 then (
i := !i + n;
(loop [@tailrec]) ()
)
in
loop ();
if full_ () then
Bytes.unsafe_to_string !buf
else
Bytes.sub_string !buf 0 !i
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size)
(fd : Unix.file_descr) : t =
let buf_len = ref 0 in
let buf_off = ref 0 in
let refill () =
buf_off := 0;
buf_len := IO.read fd buf 0 (Bytes.length buf)
in
object
method input b i len : int =
if !buf_len = 0 then refill ();
let n = min len !buf_len in
if n > 0 then (
Bytes.blit buf !buf_off b i n;
buf_off := !buf_off + n;
buf_len := !buf_len - n
);
n
method close () =
if close_noerr then (
try Unix.close fd with _ -> ()
) else
Unix.close fd
end

View file

@ -1,118 +0,0 @@
open Common_
class type t = object
method output_char : char -> unit
method output : bytes -> int -> int -> unit
method flush : unit -> unit
method close : unit -> unit
end
let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t =
object
method flush () = flush ()
method close () = close ()
method output_char c = output_char c
method output bs i len = output bs i len
end
let dummy : t =
object
method flush () = ()
method close () = ()
method output_char _ = ()
method output _ _ _ = ()
end
let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd
: t =
let buf_off = ref 0 in
let[@inline] is_full () = !buf_off = Bytes.length buf in
let flush () =
if !buf_off > 0 then (
IO.write fd buf 0 !buf_off;
buf_off := 0
)
in
object
method output_char c =
if is_full () then flush ();
Bytes.set buf !buf_off c;
incr buf_off
method output bs i len : unit =
let i = ref i in
let len = ref len in
while !len > 0 do
(* make space *)
if is_full () then flush ();
let n = min !len (Bytes.length buf - !buf_off) in
Bytes.blit bs !i buf !buf_off n;
buf_off := !buf_off + n;
i := !i + n;
len := !len - n
done;
(* if full, write eagerly *)
if is_full () then flush ()
method close () =
if close_noerr then (
try
flush ();
Unix.close fd
with _ -> ()
) else (
flush ();
Unix.close fd
)
method flush = flush
end
let of_buffer (buf : Buffer.t) : t =
object
method close () = ()
method flush () = ()
method output_char c = Buffer.add_char buf c
method output bs i len = Buffer.add_subbytes buf bs i len
end
(** Output the buffer slice into this channel *)
let[@inline] output_char (self : #t) c : unit = self#output_char c
(** Output the buffer slice into this channel *)
let[@inline] output (self : #t) buf i len : unit = self#output buf i len
let[@inline] output_string (self : #t) (str : string) : unit =
self#output (Bytes.unsafe_of_string str) 0 (String.length str)
let output_line (self : #t) (str : string) : unit =
output_string self str;
output_char self '\n'
(** Close the channel. *)
let[@inline] close self : unit = self#close ()
(** Flush (ie. force write) any buffered bytes. *)
let[@inline] flush self : unit = self#flush ()
let output_int self i =
let s = string_of_int i in
output_string self s
let output_lines self seq = Seq.iter (output_line self) seq
let tee (l : t list) : t =
match l with
| [] -> dummy
| [ oc ] -> oc
| _ ->
let output bs i len = List.iter (fun oc -> output oc bs i len) l in
let output_char c = List.iter (fun oc -> output_char oc c) l in
let close () = List.iter close l in
let flush () = List.iter flush l in
create ~flush ~close ~output ~output_char ()

View file

@ -1,167 +0,0 @@
open Common_
module Trigger = M.Trigger
module Fiber = Moonpool_fib.Fiber
module FLS = Moonpool_fib.Fls
(** Action scheduled from outside the loop *)
module Action = struct
type event = Lwt_engine.event
type cb = event -> unit
(** Action that we ask the lwt loop to perform, from the outside *)
type t =
| Wait_readable of Unix.file_descr * cb
| Wait_writable of Unix.file_descr * cb
| Sleep of float * bool * cb
(* TODO: provide actions with cancellation, alongside a "select" operation *)
(* | Cancel of event *)
| On_termination : 'a Lwt.t * 'a Exn_bt.result ref * Trigger.t -> t
| Wakeup : 'a Lwt.u * 'a -> t
| Wakeup_exn : _ Lwt.u * exn -> t
| Other of (unit -> unit)
(** Perform the action from within the Lwt thread *)
let perform (self : t) : unit =
match self with
| Wait_readable (fd, cb) -> ignore (Lwt_engine.on_readable fd cb : event)
| Wait_writable (fd, cb) -> ignore (Lwt_engine.on_writable fd cb : event)
| Sleep (f, repeat, cb) -> ignore (Lwt_engine.on_timer f repeat cb : event)
(* | Cancel ev -> Lwt_engine.stop_event ev *)
| On_termination (fut, res, trigger) ->
Lwt.on_any fut
(fun x ->
res := Ok x;
Trigger.signal trigger)
(fun exn ->
res := Error (Exn_bt.get_callstack 10 exn);
Trigger.signal trigger)
| Wakeup (prom, x) -> Lwt.wakeup prom x
| Wakeup_exn (prom, e) -> Lwt.wakeup_exn prom e
| Other f -> f ()
end
module Action_queue = struct
type t = { q: Action.t list Atomic.t } [@@unboxed]
let create () : t = { q = Atomic.make [] }
let pop_all (self : t) : _ list = Atomic.exchange self.q []
(** Push the action and return whether the queue was previously empty *)
let push (self : t) (a : Action.t) : bool =
let is_first = ref true in
while
let old = Atomic.get self.q in
if Atomic.compare_and_set self.q old (a :: old) then (
is_first := old == [];
false
) else
true
do
()
done;
!is_first
end
module Perform_action_in_lwt = struct
open struct
let actions_ : Action_queue.t = Action_queue.create ()
(** Gets the current set of notifications and perform them from inside the
Lwt thread *)
let perform_pending_actions () : unit =
let@ _sp =
Moonpool.Private.Tracing_.with_span
"moonpool-lwt.perform-pending-actions"
in
let l = Action_queue.pop_all actions_ in
List.iter Action.perform l
let notification : int =
Lwt_unix.make_notification ~once:false perform_pending_actions
end
let schedule (a : Action.t) : unit =
let is_first = Action_queue.push actions_ a in
if is_first then Lwt_unix.send_notification notification
end
let get_runner () : M.Runner.t =
match M.Runner.get_current_runner () with
| Some r -> r
| None -> failwith "Moonpool_lwt.get_runner: not inside a runner"
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
let lwt_fut, lwt_prom = Lwt.wait () in
M.Fut.on_result fut (function
| Ok x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (lwt_prom, x)
| Error ebt ->
let exn = Exn_bt.exn ebt in
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (lwt_prom, exn));
lwt_fut
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
match Lwt.poll lwt_fut with
| Some x -> M.Fut.return x
| None ->
let fut, prom = M.Fut.make () in
Lwt.on_any lwt_fut
(fun x -> M.Fut.fulfill prom (Ok x))
(fun exn ->
let bt = Printexc.get_callstack 10 in
M.Fut.fulfill prom (Error (Exn_bt.make exn bt)));
fut
let _dummy_exn_bt : Exn_bt.t =
Exn_bt.get_callstack 0 (Failure "dummy Exn_bt from moonpool-lwt")
let await_lwt (fut : _ Lwt.t) =
match Lwt.poll fut with
| Some x -> x
| None ->
(* suspend fiber, wake it up when [fut] resolves *)
let trigger = M.Trigger.create () in
let res = ref (Error _dummy_exn_bt) in
Perform_action_in_lwt.(schedule Action.(On_termination (fut, res, trigger)));
Trigger.await trigger |> Option.iter Exn_bt.raise;
Exn_bt.unwrap !res
let run_in_lwt f : _ M.Fut.t =
let fut, prom = M.Fut.make () in
Perform_action_in_lwt.schedule
(Action.Other
(fun () ->
let lwt_fut = f () in
Lwt.on_any lwt_fut
(fun x -> M.Fut.fulfill prom @@ Ok x)
(fun exn -> M.Fut.fulfill prom @@ Error (Exn_bt.get exn))));
fut
let run_in_lwt_and_await f = M.Fut.await @@ run_in_lwt f
let detach_in_runner ~runner f : _ Lwt.t =
let fut, promise = Lwt.wait () in
M.Runner.run_async runner (fun () ->
match f () with
| x -> Perform_action_in_lwt.schedule @@ Action.Wakeup (promise, x)
| exception exn ->
Perform_action_in_lwt.schedule @@ Action.Wakeup_exn (promise, exn));
fut
let main_with_runner ~runner (f : unit -> 'a) : 'a =
let lwt_fut, lwt_prom = Lwt.wait () in
let _fiber =
Fiber.spawn_top ~on:runner (fun () ->
try
let x = f () in
Perform_action_in_lwt.schedule (Action.Wakeup (lwt_prom, x))
with exn ->
Perform_action_in_lwt.schedule (Action.Wakeup_exn (lwt_prom, exn)))
in
Lwt_main.run lwt_fut
let main f =
let@ runner = M.Ws_pool.with_ () in
main_with_runner ~runner f

View file

@ -1,5 +0,0 @@
module M = Moonpool
module Exn_bt = M.Exn_bt
let ( let@ ) = ( @@ )
let _default_buf_size = 4 * 1024

View file

@ -1,12 +1,10 @@
(library (library
(name moonpool_lwt) (name moonpool_lwt)
(public_name moonpool-lwt) (public_name moonpool-lwt)
(private_modules common_)
(enabled_if (enabled_if
(>= %{ocaml_version} 5.0)) (>= %{ocaml_version} 5.0))
(libraries (libraries
(re_export moonpool) (re_export moonpool)
(re_export moonpool.fib)
picos picos
(re_export lwt) (re_export lwt)
lwt.unix)) lwt.unix))

View file

@ -1,6 +1,310 @@
include Base module Exn_bt = Moonpool.Exn_bt
module IO = IO
module IO_out = IO_out open struct
module IO_in = IO_in module WL = Moonpool.Private.Worker_loop_
module TCP_server = Tcp_server module M = Moonpool
module TCP_client = Tcp_client end
module Fut = Moonpool.Fut
let default_around_task_ : WL.around_task = AT_pair (ignore, fun _ _ -> ())
let on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref =
ref (fun ebt ->
Printf.eprintf "uncaught exception in moonpool-lwt:\n%s" (Exn_bt.show ebt))
module Scheduler_state = struct
type st = {
tasks: WL.task_full Queue.t;
actions_from_other_threads: (unit -> unit) Queue.t;
(** Other threads ask us to run closures in the lwt thread *)
mutex: Mutex.t;
mutable thread: int;
closed: bool Atomic.t;
mutable as_runner: Moonpool.Runner.t;
mutable enter_hook: Lwt_main.Enter_iter_hooks.hook option;
mutable leave_hook: Lwt_main.Leave_iter_hooks.hook option;
mutable notification: int;
(** A lwt_unix notification to wake up the event loop *)
has_notified: bool Atomic.t;
}
(** Main state *)
let cur_st : st option Atomic.t = Atomic.make None
let create_new () : st =
{
tasks = Queue.create ();
actions_from_other_threads = Queue.create ();
mutex = Mutex.create ();
thread = Thread.id (Thread.self ());
closed = Atomic.make false;
as_runner = Moonpool.Runner.dummy;
enter_hook = None;
leave_hook = None;
notification = 0;
has_notified = Atomic.make false;
}
let[@inline never] add_action_from_another_thread_ (self : st) f : unit =
Mutex.lock self.mutex;
Queue.push f self.actions_from_other_threads;
if not (Atomic.exchange self.has_notified true) then
Lwt_unix.send_notification self.notification;
Mutex.unlock self.mutex
let[@inline] on_lwt_thread_ (self : st) : bool =
Thread.id (Thread.self ()) = self.thread
let[@inline] run_on_lwt_thread_ (self : st) (f : unit -> unit) : unit =
if on_lwt_thread_ self then
f ()
else
add_action_from_another_thread_ self f
let cleanup (st : st) =
match Atomic.get cur_st with
| Some st' ->
if st != st' then
failwith
"moonpool-lwt: cleanup failed (state is not the currently active \
one!)";
if not (on_lwt_thread_ st) then
failwith "moonpool-lwt: cleanup from the wrong thread";
Option.iter Lwt_main.Enter_iter_hooks.remove st.enter_hook;
Option.iter Lwt_main.Leave_iter_hooks.remove st.leave_hook;
Atomic.set cur_st None
| _ -> ()
end
module Ops = struct
type st = Scheduler_state.st
let around_task _ = default_around_task_
let schedule (self : st) t =
if Atomic.get self.closed then
failwith "moonpool-lwt.schedule: scheduler is closed";
Scheduler_state.run_on_lwt_thread_ self (fun () -> Queue.push t self.tasks)
let get_next_task (self : st) =
if Atomic.get self.closed then raise WL.No_more_tasks;
try Queue.pop self.tasks with Queue.Empty -> raise WL.No_more_tasks
let on_exn _ ebt = !on_uncaught_exn ebt
let runner (self : st) = self.as_runner
let cleanup = Scheduler_state.cleanup
let as_runner (self : st) : Moonpool.Runner.t =
Moonpool.Runner.For_runner_implementors.create
~size:(fun () -> 1)
~num_tasks:(fun () ->
Mutex.lock self.mutex;
let n = Queue.length self.tasks in
Mutex.unlock self.mutex;
n)
~run_async:(fun ~fiber f -> schedule self @@ WL.T_start { fiber; f })
~shutdown:(fun ~wait:_ () -> Atomic.set self.closed true)
()
let before_start (self : st) : unit =
self.as_runner <- as_runner self;
()
let ops : st WL.ops =
{
schedule;
around_task;
get_next_task;
on_exn;
runner;
before_start;
cleanup;
}
let setup st =
if Atomic.compare_and_set Scheduler_state.cur_st None (Some st) then
before_start st
else
failwith "moonpool-lwt: setup failed (state already in place)"
end
(** Resolve [prom] with the result of [lwt_fut] *)
let transfer_lwt_to_fut (lwt_fut : 'a Lwt.t) (prom : 'a Fut.promise) : unit =
Lwt.on_any lwt_fut
(fun x -> M.Fut.fulfill prom (Ok x))
(fun exn ->
let bt = Printexc.get_callstack 10 in
M.Fut.fulfill prom (Error (Exn_bt.make exn bt)))
let[@inline] register_trigger_on_lwt_termination (lwt_fut : _ Lwt.t)
(tr : M.Trigger.t) : unit =
Lwt.on_termination lwt_fut (fun _ -> M.Trigger.signal tr)
let[@inline] await_lwt_terminated (fut : _ Lwt.t) =
match Lwt.state fut with
| Return x -> x
| Fail exn -> raise exn
| Sleep -> assert false
module Main_state = struct
let[@inline] get_st () : Scheduler_state.st =
match Atomic.get Scheduler_state.cur_st with
| Some st ->
if Atomic.get st.closed then failwith "moonpool-lwt: scheduler is closed";
st
| None -> failwith "moonpool-lwt: scheduler is not setup"
let[@inline] run_on_lwt_thread f =
Scheduler_state.run_on_lwt_thread_ (get_st ()) f
let[@inline] on_lwt_thread () : bool =
Scheduler_state.on_lwt_thread_ (get_st ())
let[@inline] add_action_from_another_thread f : unit =
Scheduler_state.add_action_from_another_thread_ (get_st ()) f
end
let await_lwt (fut : _ Lwt.t) =
if Scheduler_state.on_lwt_thread_ (Main_state.get_st ()) then (
(* can directly access the future *)
match Lwt.state fut with
| Return x -> x
| Fail exn -> raise exn
| Sleep ->
let tr = M.Trigger.create () in
register_trigger_on_lwt_termination fut tr;
M.Trigger.await_exn tr;
await_lwt_terminated fut
) else (
let tr = M.Trigger.create () in
Main_state.add_action_from_another_thread (fun () ->
register_trigger_on_lwt_termination fut tr);
M.Trigger.await_exn tr;
await_lwt_terminated fut
)
let lwt_of_fut (fut : 'a M.Fut.t) : 'a Lwt.t =
if not (Main_state.on_lwt_thread ()) then
failwith "lwt_of_fut: not on the lwt thread";
let lwt_fut, lwt_prom = Lwt.wait () in
(* in lwt thread, resolve [lwt_fut] *)
let wakeup_using_res = function
| Ok x -> Lwt.wakeup lwt_prom x
| Error ebt ->
let exn = Exn_bt.exn ebt in
Lwt.wakeup_exn lwt_prom exn
in
M.Fut.on_result fut (fun res ->
Main_state.run_on_lwt_thread (fun () ->
(* can safely wakeup from the lwt thread *)
wakeup_using_res res));
lwt_fut
let fut_of_lwt (lwt_fut : _ Lwt.t) : _ M.Fut.t =
if Main_state.on_lwt_thread () then (
match Lwt.state lwt_fut with
| Return x -> M.Fut.return x
| _ ->
let fut, prom = M.Fut.make () in
transfer_lwt_to_fut lwt_fut prom;
fut
) else (
let fut, prom = M.Fut.make () in
Main_state.add_action_from_another_thread (fun () ->
transfer_lwt_to_fut lwt_fut prom);
fut
)
let run_in_lwt_and_await (f : unit -> 'a Lwt.t) : 'a =
if Main_state.on_lwt_thread () then (
let fut = f () in
await_lwt fut
) else (
let fut, prom = Fut.make () in
Main_state.add_action_from_another_thread (fun () ->
let lwt_fut = f () in
transfer_lwt_to_fut lwt_fut prom);
Fut.await fut
)
module Setup_lwt_hooks (ARG : sig
val st : Scheduler_state.st
end) =
struct
open ARG
module FG =
WL.Fine_grained
(struct
include Scheduler_state
let st = st
let ops = Ops.ops
end)
()
let run_in_hook () =
(* execute actions sent from other threads; first transfer them
all atomically to a local queue to reduce contention *)
let local_acts = Queue.create () in
Mutex.lock st.mutex;
Queue.transfer st.actions_from_other_threads local_acts;
Atomic.set st.has_notified false;
Mutex.unlock st.mutex;
Queue.iter (fun f -> f ()) local_acts;
(* run tasks *)
FG.run ~max_tasks:1000 ();
if not (Queue.is_empty st.tasks) then ignore (Lwt.pause () : unit Lwt.t);
()
let setup () =
(* only one thread does this *)
FG.setup ~block_signals:false ();
st.thread <- Thread.self () |> Thread.id;
st.enter_hook <- Some (Lwt_main.Enter_iter_hooks.add_last run_in_hook);
st.leave_hook <- Some (Lwt_main.Leave_iter_hooks.add_last run_in_hook);
(* notification used to wake lwt up *)
st.notification <- Lwt_unix.make_notification ~once:false run_in_hook
end
let setup () : Scheduler_state.st =
let st = Scheduler_state.create_new () in
Ops.setup st;
let module Setup_lwt_hooks' = Setup_lwt_hooks (struct
let st = st
end) in
Setup_lwt_hooks'.setup ();
st
let[@inline] is_setup () = Option.is_some @@ Atomic.get Scheduler_state.cur_st
let spawn_lwt f : _ Lwt.t =
let st = Main_state.get_st () in
let lwt_fut, lwt_prom = Lwt.wait () in
M.Runner.run_async st.as_runner (fun () ->
try
let x = f () in
Lwt.wakeup lwt_prom x
with exn -> Lwt.wakeup_exn lwt_prom exn);
lwt_fut
let lwt_main (f : _ -> 'a) : 'a =
let st = setup () in
(* make sure to cleanup *)
let finally () = Scheduler_state.cleanup st in
Fun.protect ~finally @@ fun () ->
let fut = spawn_lwt (fun () -> f st.as_runner) in
Lwt_main.run fut
let[@inline] lwt_main_runner () =
let st = Main_state.get_st () in
st.as_runner

View file

@ -7,8 +7,7 @@
@since 0.6 *) @since 0.6 *)
module Fiber = Moonpool_fib.Fiber module Fut = Moonpool.Fut
module FLS = Moonpool_fib.Fls
(** {2 Basic conversions} *) (** {2 Basic conversions} *)
@ -19,126 +18,32 @@ val fut_of_lwt : 'a Lwt.t -> 'a Moonpool.Fut.t
val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t val lwt_of_fut : 'a Moonpool.Fut.t -> 'a Lwt.t
(** [lwt_of_fut fut] makes a lwt future that completes when [fut] does. This (** [lwt_of_fut fut] makes a lwt future that completes when [fut] does. This
must be called from the Lwt thread, and the result must always be used only must be called from the Lwt thread, and the result must always be used only
from inside the Lwt thread. *) from inside the Lwt thread.
@raise Failure if not run from the lwt thread. *)
(** {2 Helpers on the moonpool side} *) (** {2 Helpers on the moonpool side} *)
val spawn_lwt : (unit -> 'a) -> 'a Lwt.t
(** This spawns a task that runs in the Lwt scheduler.
@raise Failure if {!lwt_main} was not called. *)
val await_lwt : 'a Lwt.t -> 'a val await_lwt : 'a Lwt.t -> 'a
(** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool (** [await_lwt fut] awaits a Lwt future from inside a task running on a moonpool
runner. This must be run from within a Moonpool runner so that the await-ing runner. This must be run from within a Moonpool runner so that the await-ing
effect is handled. *) effect is handled. *)
val run_in_lwt : (unit -> 'a Lwt.t) -> 'a Moonpool.Fut.t
(** [run_in_lwt f] runs [f()] from within the Lwt thread and returns a
thread-safe future. This can be run from anywhere. *)
val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a val run_in_lwt_and_await : (unit -> 'a Lwt.t) -> 'a
(** [run_in_lwt_and_await f] runs [f] in the Lwt thread, and awaits its result.
Must be run from inside a moonpool runner so that the await-in effect is
handled.
This is similar to [Moonpool.await @@ run_in_lwt f]. *)
val get_runner : unit -> Moonpool.Runner.t
(** Returns the runner from within which this is called. Must be run from within
a fiber.
@raise Failure if not run within a fiber *)
(** {2 IO} *)
(** IO using the Lwt event loop.
These IO operations work on non-blocking file descriptors and rely on a
[Lwt_engine] event loop being active (meaning, [Lwt_main.run] is currently
running in some thread).
Calling these functions must be done from a moonpool runner. A function like
[read] will first try to perform the IO action directly (here, call
{!Unix.read}); if the action fails because the FD is not ready, then
[await_readable] is called: it suspends the fiber and subscribes it to Lwt
to be awakened when the FD becomes ready. *)
module IO : sig
val read : Unix.file_descr -> bytes -> int -> int -> int
(** Read from the file descriptor *)
val await_readable : Unix.file_descr -> unit
(** Suspend the fiber until the FD is readable *)
val write_once : Unix.file_descr -> bytes -> int -> int -> int
(** Perform one write into the file descriptor *)
val await_writable : Unix.file_descr -> unit
(** Suspend the fiber until the FD is writable *)
val write : Unix.file_descr -> bytes -> int -> int -> unit
(** Loop around {!write_once} to write the entire slice. *)
val sleep_s : float -> unit
(** Suspend the fiber for [n] seconds. *)
end
module IO_in = IO_in
(** Input channel *)
module IO_out = IO_out
(** Output channel *)
module TCP_server : sig
type t = Lwt_io.server
val establish_lwt :
?backlog:
(* ?server_fd:Unix.file_descr -> *)
int ->
?no_close:bool ->
runner:Moonpool.Runner.t ->
Unix.sockaddr ->
(Unix.sockaddr -> Lwt_io.input_channel -> Lwt_io.output_channel -> unit) ->
t
(** [establish ~runner addr handler] runs a TCP server in the Lwt thread. When
a client connects, a moonpool fiber is started on [runner] to handle it.
*)
val establish :
?backlog:
(* ?server_fd:Unix.file_descr -> *)
int ->
?no_close:bool ->
runner:Moonpool.Runner.t ->
Unix.sockaddr ->
(Unix.sockaddr -> IO_in.t -> IO_out.t -> unit) ->
t
(** Like {!establish_lwt} but uses {!IO} to directly handle reads and writes
on client sockets. *)
val shutdown : t -> unit
(** Shutdown the server *)
end
module TCP_client : sig
val connect : Unix.sockaddr -> Unix.file_descr
val with_connect : Unix.sockaddr -> (IO_in.t -> IO_out.t -> 'a) -> 'a
(** Open a connection, and use {!IO} to read and write from the socket in a
non blocking way. *)
val with_connect_lwt :
Unix.sockaddr -> (Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) -> 'a
(** Open a connection. *)
end
(** {2 Helpers on the lwt side} *)
val detach_in_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a Lwt.t
(** [detach_in_runner ~runner f] runs [f] in the given moonpool runner, and
returns a lwt future. This must be run from within the thread running
[Lwt_main]. *)
(** {2 Wrappers around Lwt_main} *) (** {2 Wrappers around Lwt_main} *)
val main_with_runner : runner:Moonpool.Runner.t -> (unit -> 'a) -> 'a val on_uncaught_exn : (Moonpool.Exn_bt.t -> unit) ref
(** [main_with_runner ~runner f] starts a Lwt-based event loop and runs [f()]
inside a fiber in [runner]. *)
val main : (unit -> 'a) -> 'a val lwt_main : (Moonpool.Runner.t -> 'a) -> 'a
(** Like {!main_with_runner} but with a default choice of runner. *) (** Setup, run lwt main, return the result *)
val lwt_main_runner : unit -> Moonpool.Runner.t
(** The runner from {!lwt_main}. The runner is only going to work if {!lwt_main}
is currently running in some thread.
@raise Failure if {!lwt_main} was not called. *)
val is_setup : unit -> bool

View file

@ -1,53 +0,0 @@
open Common_
open Base
let connect addr : Unix.file_descr =
let sock = Unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
Unix.set_nonblock sock;
Unix.setsockopt sock Unix.TCP_NODELAY true;
(* connect asynchronously *)
while
try
Unix.connect sock addr;
false
with
| Unix.Unix_error ((Unix.EWOULDBLOCK | Unix.EINPROGRESS | Unix.EAGAIN), _, _)
->
IO.await_writable sock;
true
do
()
done;
sock
let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a =
let sock = connect addr in
let ic = IO_in.of_unix_fd sock in
let oc = IO_out.of_unix_fd sock in
let finally () = try Unix.close sock with _ -> () in
let@ () = Fun.protect ~finally in
f ic oc
let with_connect_lwt addr
(f : Lwt_io.input_channel -> Lwt_io.output_channel -> 'a) : 'a =
let sock = connect addr in
let ic =
run_in_lwt_and_await (fun () ->
Lwt.return @@ Lwt_io.of_unix_fd ~mode:Lwt_io.input sock)
in
let oc =
run_in_lwt_and_await (fun () ->
Lwt.return @@ Lwt_io.of_unix_fd ~mode:Lwt_io.output sock)
in
let finally () =
(try run_in_lwt_and_await (fun () -> Lwt_io.close ic) with _ -> ());
(try run_in_lwt_and_await (fun () -> Lwt_io.close oc) with _ -> ());
try Unix.close sock with _ -> ()
in
let@ () = Fun.protect ~finally in
f ic oc

View file

@ -1,38 +0,0 @@
open Common_
open Base
type t = Lwt_io.server
let establish_lwt ?backlog ?no_close ~runner addr handler : t =
let server =
Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr
(fun client_addr client_sock ->
let ic = Lwt_io.of_fd ~mode:Lwt_io.input client_sock in
let oc = Lwt_io.of_fd ~mode:Lwt_io.output client_sock in
let fut =
M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc)
in
let lwt_fut = lwt_of_fut fut in
lwt_fut)
in
await_lwt server
let establish ?backlog ?no_close ~runner addr handler : t =
let server =
Lwt_io.establish_server_with_client_socket ?backlog ?no_close addr
(fun client_addr client_sock ->
let ic = IO_in.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in
let oc = IO_out.of_unix_fd @@ Lwt_unix.unix_file_descr client_sock in
let fut =
M.Fut.spawn ~on:runner (fun () -> handler client_addr ic oc)
in
let lwt_fut = lwt_of_fut fut in
lwt_fut)
in
await_lwt server
let shutdown self = await_lwt @@ Lwt_io.shutdown_server self

View file

@ -1,46 +0,0 @@
[@@@ifge 4.12]
include Atomic
[@@@else_]
type 'a t = { mutable x: 'a }
let[@inline] make x = { x }
let[@inline] get { x } = x
let[@inline] set r x = r.x <- x
let[@inline never] exchange r x =
(* atomic *)
let y = r.x in
r.x <- x;
(* atomic *)
y
let[@inline never] compare_and_set r seen v =
(* atomic *)
if r.x == seen then (
r.x <- v;
(* atomic *)
true
) else
false
let[@inline never] fetch_and_add r x =
(* atomic *)
let v = r.x in
r.x <- x + r.x;
(* atomic *)
v
let[@inline never] incr r =
(* atomic *)
r.x <- 1 + r.x
(* atomic *)
let[@inline never] decr r =
(* atomic *)
r.x <- r.x - 1
(* atomic *)
[@@@endif]

View file

@ -1,4 +1,3 @@
[@@@ifge 5.0]
[@@@ocaml.alert "-unstable"] [@@@ocaml.alert "-unstable"]
let recommended_number () = Domain.recommended_domain_count () let recommended_number () = Domain.recommended_domain_count ()
@ -10,18 +9,3 @@ let spawn : _ -> t = Domain.spawn
let relax = Domain.cpu_relax let relax = Domain.cpu_relax
let join = Domain.join let join = Domain.join
let is_main_domain = Domain.is_main_domain let is_main_domain = Domain.is_main_domain
[@@@ocaml.alert "+unstable"]
[@@@else_]
let recommended_number () = 1
type t = Thread.t
let get_id (self : t) : int = Thread.id self
let spawn f : t = Thread.create f ()
let relax () = Thread.yield ()
let join = Thread.join
let is_main_domain () = true
[@@@endif]

View file

@ -2,9 +2,6 @@
(name moonpool_private) (name moonpool_private)
(public_name moonpool.private) (public_name moonpool.private)
(synopsis "Private internal utils for Moonpool (do not rely on)") (synopsis "Private internal utils for Moonpool (do not rely on)")
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file})))
(libraries (libraries
threads threads
either either

View file

@ -1,4 +1,4 @@
module A = Atomic_ module A = Atomic
(* terminology: (* terminology:

View file

@ -1,3 +1,5 @@
[@@@ocaml.deprecated "use Picos_std_sync directly or single threaded solutions"]
module Mutex = Picos_std_sync.Mutex module Mutex = Picos_std_sync.Mutex
module Condition = Picos_std_sync.Condition module Condition = Picos_std_sync.Condition
module Lock = Lock module Lock = Lock

View file

@ -10,8 +10,7 @@
t_resource t_resource
t_unfair t_unfair
t_ws_deque t_ws_deque
t_ws_wait t_ws_wait)
t_bounded_queue)
(package moonpool) (package moonpool)
(libraries (libraries
moonpool moonpool

View file

@ -9,9 +9,6 @@
t_sort t_sort
t_fork_join t_fork_join
t_fork_join_heavy) t_fork_join_heavy)
(preprocess
(action
(run %{project_root}/src/cpp/cpp.exe %{input-file})))
(enabled_if (enabled_if
(and (and
(= %{system} "linux") (= %{system} "linux")

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open Moonpool open Moonpool
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
@ -56,5 +54,3 @@ let main () =
let () = let () =
let@ () = Trace_tef.with_setup () in let@ () = Trace_tef.with_setup () in
main () main ()
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open Moonpool open Moonpool
module FJ = Moonpool_forkjoin module FJ = Moonpool_forkjoin
@ -52,5 +50,3 @@ let () =
(* now make sure we can do this with multiple pools in parallel *) (* now make sure we can do this with multiple pools in parallel *)
let jobs = Array.init 2 (fun _ -> Thread.create run_test ()) in let jobs = Array.init 2 (fun _ -> Thread.create run_test ()) in
Array.iter Thread.join jobs Array.iter Thread.join jobs
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
open Moonpool open Moonpool
@ -44,5 +42,3 @@ let () =
(* now make sure we can do this with multiple pools in parallel *) (* now make sure we can do this with multiple pools in parallel *)
let jobs = Array.init 2 (fun _ -> Thread.create run_test ()) in let jobs = Array.init 2 (fun _ -> Thread.create run_test ()) in
Array.iter Thread.join jobs Array.iter Thread.join jobs
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
let spf = Printf.sprintf let spf = Printf.sprintf
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
@ -328,5 +326,3 @@ let () =
t_for_nested ~min:1 ~chunk_size:100 (); t_for_nested ~min:1 ~chunk_size:100 ();
t_for_nested ~min:4 ~chunk_size:100 (); t_for_nested ~min:4 ~chunk_size:100 ();
] ]
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
module Q = QCheck module Q = QCheck
let spf = Printf.sprintf let spf = Printf.sprintf
@ -52,5 +50,3 @@ let () =
run ~min:4 (); run ~min:4 ();
run ~min:1 (); run ~min:1 ();
Printf.printf "done\n%!" Printf.printf "done\n%!"
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open! Moonpool open! Moonpool
let pool = Ws_pool.create ~num_threads:4 () let pool = Ws_pool.create ~num_threads:4 ()
@ -53,5 +51,3 @@ let () =
in in
let fut = Fut.both f1 f2 in let fut = Fut.both f1 f2 in
assert (Fut.wait_block fut = Ok (2, 20)) assert (Fut.wait_block fut = Ok (2, 20))
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open Moonpool open Moonpool
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
@ -44,5 +42,3 @@ let () =
run ~pool ()); run ~pool ());
() ()
[@@@endif]

View file

@ -1,5 +1,3 @@
[@@@ifge 5.0]
open Moonpool open Moonpool
module FJ = Moonpool_forkjoin module FJ = Moonpool_forkjoin
@ -69,5 +67,3 @@ let () =
(* Printf.printf "arr: [%s]\n%!" *) (* Printf.printf "arr: [%s]\n%!" *)
(* (String.concat ", " @@ List.map string_of_int @@ Array.to_list arr); *) (* (String.concat ", " @@ List.map string_of_int @@ Array.to_list arr); *)
assert (sorted arr) assert (sorted arr)
[@@@endif]

View file

@ -16,7 +16,7 @@
(action (action
(with-stdout-to (with-stdout-to
%{targets} %{targets}
(run ./run_hash.sh -d ../data/ --n-conn=2 -j=4)))) (run ./run_hash.sh -d ../data/ --n-conn=2))))
(rule (rule
(alias runtest) (alias runtest)
@ -36,9 +36,12 @@
(= %{system} "linux") (= %{system} "linux")
(>= %{ocaml_version} 5.0))) (>= %{ocaml_version} 5.0)))
(action (action
(with-stdout-to (setenv
%{targets} CI_MODE
(run ./run_echo.sh -n 10 --n-conn=2 -j=4)))) 1
(with-stdout-to
%{targets}
(run ./run_echo.sh -n 10 --n-conn=2 -v)))))
(rule (rule
(alias runtest) (alias runtest)

View file

@ -1,93 +1,117 @@
module M = Moonpool
module M_lwt = Moonpool_lwt module M_lwt = Moonpool_lwt
module Trace = Trace_core module Trace = Trace_core
let ci_mode = Option.is_some @@ Sys.getenv_opt "CI_MODE"
let spf = Printf.sprintf let spf = Printf.sprintf
let await_lwt = Moonpool_lwt.await_lwt
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
let lock_stdout = M.Lock.create ()
let main ~port ~runner ~n ~n_conn () : unit Lwt.t = let main ~port ~n ~n_conn ~verbose ~msg_per_conn () : unit =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
let remaining = Atomic.make n in let t0 = Unix.gettimeofday () in
let all_done = Atomic.make 0 in Printf.printf
"connecting to port %d (%d msg per conn, %d conns total, %d max at a time)\n\
let fut_exit, prom_exit = M.Fut.make () in %!"
port msg_per_conn n n_conn;
Printf.printf "connecting to port %d\n%!" port;
let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
let rec run_task () = let token_pool = Lwt_pool.create n_conn (fun () -> Lwt.return_unit) in
let n_msg_total = ref 0 in
let run_task () =
(* Printf.printf "running task\n%!"; *) (* Printf.printf "running task\n%!"; *)
let n = Atomic.fetch_and_add remaining (-1) in let@ () = Lwt_pool.use token_pool in
if n > 0 then (
(let _sp =
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "connect.client"
in
Trace.message "connecting new client…";
M_lwt.TCP_client.with_connect addr @@ fun ic oc ->
let buf = Bytes.create 32 in
for _j = 1 to 10 do let@ () = M_lwt.spawn_lwt in
let _sp = let _sp =
Trace.enter_manual_sub_span ~parent:_sp ~__FILE__ ~__LINE__ Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "connect.client"
"write.loop" in
in Trace.message "connecting new client…";
let s = spf "hello %d" _j in let ic, oc = Lwt_io.open_connection addr |> await_lwt in
M_lwt.IO_out.output_string oc s;
M_lwt.IO_out.flush oc;
(* read back something *) let cleanup () =
M_lwt.IO_in.really_input ic buf 0 (String.length s); Trace.message "closing connection";
(let@ () = M.Lock.with_ lock_stdout in Lwt_io.close ic |> await_lwt;
Printf.printf "read: %s\n%!" Lwt_io.close oc |> await_lwt
(Bytes.sub_string buf 0 (String.length s))); in
Trace.exit_manual_span _sp;
()
done;
Trace.exit_manual_span _sp);
(* run another task *) M.Runner.run_async runner run_task let@ () = Fun.protect ~finally:cleanup in
) else (
(* if we're the last to exit, resolve the promise *) let buf = Bytes.create 32 in
let n_already_done = Atomic.fetch_and_add all_done 1 in
if n_already_done = n_conn - 1 then ( for _j = 1 to msg_per_conn do
(let@ () = M.Lock.with_ lock_stdout in let _sp =
Printf.printf "all done\n%!"); Trace.enter_manual_span
M.Fut.fulfill prom_exit @@ Ok () ~parent:(Some (Trace.ctx_of_span _sp))
) ~__FILE__ ~__LINE__ "write.loop"
) in
let s = spf "hello %d" _j in
Lwt_io.write oc s |> await_lwt;
Lwt_io.flush oc |> await_lwt;
incr n_msg_total;
(* read back something *)
Lwt_io.read_into_exactly ic buf 0 (String.length s) |> await_lwt;
if verbose then
Printf.printf "read: %s\n%!" (Bytes.sub_string buf 0 (String.length s));
Trace.exit_manual_span _sp;
()
done;
Trace.exit_manual_span _sp
in in
(* start the first [n_conn] tasks *) (* start the first [n_conn] tasks *)
for _i = 1 to n_conn do let futs = List.init n (fun _ -> run_task ()) in
M.Runner.run_async runner run_task Lwt.join futs |> await_lwt;
done;
(* exit when [fut_exit] is resolved *) Printf.printf "all done\n%!";
M_lwt.lwt_of_fut fut_exit let elapsed = Unix.gettimeofday () -. t0 in
if not ci_mode then
Printf.printf " sent %d messages in %.4fs (%.2f msg/s)\n%!" !n_msg_total
elapsed
(float !n_msg_total /. elapsed);
()
let () = let () =
let@ () = Trace_tef.with_setup () in let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main"; Trace.set_thread_name "main";
let port = ref 0 in let port = ref 0 in
let j = ref 4 in
let n_conn = ref 100 in let n_conn = ref 100 in
let n = ref 50_000 in let n = ref 50_000 in
let msg_per_conn = ref 10 in
let verbose = ref false in
let opts = let opts =
[ [
"-p", Arg.Set_int port, " port"; "-p", Arg.Set_int port, " port";
"-j", Arg.Set_int j, " number of threads";
"-n", Arg.Set_int n, " total number of connections"; "-n", Arg.Set_int n, " total number of connections";
"--n-conn", Arg.Set_int n_conn, " number of parallel connections"; ( "--msg-per-conn",
Arg.Set_int msg_per_conn,
" messages sent per connection" );
"-v", Arg.Set verbose, " verbose";
( "--n-conn",
Arg.Set_int n_conn,
" maximum number of connections opened simultaneously" );
] ]
|> Arg.align |> Arg.align
in in
Arg.parse opts ignore "echo client"; Arg.parse opts ignore "echo client";
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in let main () =
(* Lwt_engine.set @@ new Lwt_engine.libev (); *) (* Lwt_engine.set @@ new Lwt_engine.libev (); *)
Lwt_main.run @@ main ~runner ~port:!port ~n:!n ~n_conn:!n_conn () M_lwt.lwt_main @@ fun _runner ->
main ~port:!port ~n:!n ~n_conn:!n_conn ~verbose:!verbose
~msg_per_conn:!msg_per_conn ()
in
print_endline "first run";
main ();
assert (not (M_lwt.is_setup ()));
print_endline "second run";
main ();
assert (not (M_lwt.is_setup ()));
print_endline "done"

View file

@ -3,6 +3,7 @@ module M_lwt = Moonpool_lwt
module Trace = Trace_core module Trace = Trace_core
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
let await_lwt = M_lwt.await_lwt
let spf = Printf.sprintf let spf = Printf.sprintf
let str_of_sockaddr = function let str_of_sockaddr = function
@ -10,52 +11,63 @@ let str_of_sockaddr = function
| Unix.ADDR_INET (addr, port) -> | Unix.ADDR_INET (addr, port) ->
spf "%s:%d" (Unix.string_of_inet_addr addr) port spf "%s:%d" (Unix.string_of_inet_addr addr) port
let main ~port ~runner () : unit Lwt.t = let main ~port ~verbose ~runner:_ () : unit =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
let lwt_fut, _lwt_prom = Lwt.wait () in let lwt_fut, _lwt_prom = Lwt.wait () in
(* TODO: handle exit?? *) (* TODO: handle exit?? ctrl-c? *)
Printf.printf "listening on port %d\n%!" port; Printf.printf "listening on port %d\n%!" port;
let handle_client client_addr ic oc = let handle_client client_addr (ic, oc) : _ Lwt.t =
let@ () = M_lwt.spawn_lwt in
let _sp = let _sp =
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "handle.client" Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "handle.client"
~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ]) ~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ])
in in
if verbose then
Printf.printf "got new client on %s\n%!" (str_of_sockaddr client_addr);
let buf = Bytes.create 32 in let buf = Bytes.create 32 in
let continue = ref true in let continue = ref true in
while !continue do while !continue do
Trace.message "read"; Trace.message "read";
let n = M_lwt.IO_in.input ic buf 0 (Bytes.length buf) in let n = Lwt_io.read_into ic buf 0 (Bytes.length buf) |> await_lwt in
if n = 0 then if n = 0 then
continue := false continue := false
else ( else (
Trace.messagef (fun k -> k "got %dB" n); Trace.messagef (fun k -> k "got %dB" n);
M_lwt.IO_out.output oc buf 0 n; Lwt_io.write_from_exactly oc buf 0 n |> await_lwt;
M_lwt.IO_out.flush oc; Lwt_io.flush oc |> await_lwt;
Trace.message "write" Trace.message "write"
) )
done; done;
if verbose then
Printf.printf "done with client on %s\n%!" (str_of_sockaddr client_addr);
Trace.exit_manual_span _sp; Trace.exit_manual_span _sp;
Trace.message "exit handle client" Trace.message "exit handle client"
in in
let addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in let addr = Unix.ADDR_INET (Unix.inet_addr_any, port) in
let _server = M_lwt.TCP_server.establish ~runner addr handle_client in let _server =
Lwt_io.establish_server_with_client_address addr handle_client |> await_lwt
in
lwt_fut M_lwt.await_lwt lwt_fut
let () = let () =
let@ () = Trace_tef.with_setup () in let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main"; Trace.set_thread_name "main";
let port = ref 0 in let port = ref 0 in
let j = ref 4 in let j = ref 4 in
let verbose = ref false in
let opts = let opts =
[ [
"-p", Arg.Set_int port, " port"; "-j", Arg.Set_int j, " number of threads"; "-v", Arg.Set verbose, " verbose";
"-p", Arg.Set_int port, " port";
"-j", Arg.Set_int j, " number of threads";
] ]
|> Arg.align |> Arg.align
in in
@ -63,4 +75,4 @@ let () =
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
(* Lwt_engine.set @@ new Lwt_engine.libev (); *) (* Lwt_engine.set @@ new Lwt_engine.libev (); *)
Lwt_main.run @@ main ~runner ~port:!port () M_lwt.lwt_main @@ fun _ -> main ~runner ~port:!port ~verbose:!verbose ()

View file

@ -8,10 +8,10 @@ module Str_tbl = Hashtbl.Make (struct
let hash = Hashtbl.hash let hash = Hashtbl.hash
end) end)
let await_lwt = Moonpool_lwt.await_lwt
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
let lock_stdout = M.Lock.create ()
let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t = let main ~port ~ext ~dir ~n_conn () : unit =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
Printf.printf "hash dir=%S\n%!" dir; Printf.printf "hash dir=%S\n%!" dir;
@ -20,12 +20,15 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t =
let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
(* TODO: *) (* TODO: *)
let run_task () : unit = let run_task () : unit Lwt.t =
let _sp = Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "run-task" in let@ () = M_lwt.spawn_lwt in
let _sp =
Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "run-task"
in
let seen = Str_tbl.create 16 in let seen = Str_tbl.create 16 in
M_lwt.TCP_client.with_connect_lwt addr @@ fun ic oc -> let ic, oc = Lwt_io.open_connection addr |> await_lwt in
let rec walk file : unit = let rec walk file : unit =
if not (Sys.file_exists file) then if not (Sys.file_exists file) then
() ()
@ -33,7 +36,9 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t =
() ()
else if Sys.is_directory file then ( else if Sys.is_directory file then (
let _sp = let _sp =
Trace.enter_manual_sub_span ~parent:_sp ~__FILE__ ~__LINE__ "walk-dir" Trace.enter_manual_span
~parent:(Some (Trace.ctx_of_span _sp))
~__FILE__ ~__LINE__ "walk-dir"
~data:(fun () -> [ "d", `String file ]) ~data:(fun () -> [ "d", `String file ])
in in
@ -45,9 +50,8 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t =
() ()
else ( else (
Str_tbl.add seen file (); Str_tbl.add seen file ();
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.write_line oc file); Lwt_io.write_line oc file |> await_lwt;
let res = M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.read_line ic) in let res = Lwt_io.read_line ic |> await_lwt in
let@ () = M.Lock.with_ lock_stdout in
Printf.printf "%s\n%!" res Printf.printf "%s\n%!" res
) )
in in
@ -56,16 +60,14 @@ let main ~port ~runner ~ext ~dir ~n_conn () : unit Lwt.t =
in in
(* start the first [n_conn] tasks *) (* start the first [n_conn] tasks *)
let futs = List.init n_conn (fun _ -> M.Fut.spawn ~on:runner run_task) in let futs = List.init n_conn (fun _ -> run_task ()) in
Lwt.join futs |> await_lwt
Lwt.join (List.map M_lwt.lwt_of_fut futs)
let () = let () =
let@ () = Trace_tef.with_setup () in let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main"; Trace.set_thread_name "main";
let port = ref 1234 in let port = ref 1234 in
let j = ref 4 in
let n_conn = ref 100 in let n_conn = ref 100 in
let ext = ref "" in let ext = ref "" in
let dir = ref "." in let dir = ref "." in
@ -73,7 +75,6 @@ let () =
let opts = let opts =
[ [
"-p", Arg.Set_int port, " port"; "-p", Arg.Set_int port, " port";
"-j", Arg.Set_int j, " number of threads";
"-d", Arg.Set_string dir, " directory to hash"; "-d", Arg.Set_string dir, " directory to hash";
"--n-conn", Arg.Set_int n_conn, " number of parallel connections"; "--n-conn", Arg.Set_int n_conn, " number of parallel connections";
"--ext", Arg.Set_string ext, " extension to filter files"; "--ext", Arg.Set_string ext, " extension to filter files";
@ -82,7 +83,6 @@ let () =
in in
Arg.parse opts ignore "echo client"; Arg.parse opts ignore "echo client";
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
(* Lwt_engine.set @@ new Lwt_engine.libev (); *) (* Lwt_engine.set @@ new Lwt_engine.libev (); *)
Lwt_main.run M_lwt.lwt_main @@ fun _runner ->
@@ main ~runner ~port:!port ~ext:!ext ~dir:!dir ~n_conn:!n_conn () main ~port:!port ~ext:!ext ~dir:!dir ~n_conn:!n_conn ()

View file

@ -1,4 +1,7 @@
(* vendored from https://github.com/dbuenzli/uuidm *) (* vendored from https://github.com/dbuenzli/uuidm
This function is Copyright (c) 2008 The uuidm programmers.
SPDX-License-Identifier: ISC *)
let sha_1 s = let sha_1 s =
(* Based on pseudo-code of RFC 3174. Slow and ugly but does the job. *) (* Based on pseudo-code of RFC 3174. Slow and ugly but does the job. *)
@ -116,28 +119,16 @@ let sha_1 s =
i2s h 16 !h4; i2s h 16 !h4;
Bytes.unsafe_to_string h Bytes.unsafe_to_string h
(*--------------------------------------------------------------------------- (* ================== *)
Copyright (c) 2008 The uuidm programmers
Permission to use, copy, modify, and/or distribute this software for any (* test server that reads a list of files from each client connection, and sends back
purpose with or without fee is hereby granted, provided that the above to the client the hashes of these files *)
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
---------------------------------------------------------------------------*)
(* server that reads from sockets lists of files, and returns hashes of these files *)
module M = Moonpool
module M_lwt = Moonpool_lwt module M_lwt = Moonpool_lwt
module Trace = Trace_core module Trace = Trace_core
module Fut = Moonpool.Fut
let await_lwt = Moonpool_lwt.await_lwt
let ( let@ ) = ( @@ ) let ( let@ ) = ( @@ )
let spf = Printf.sprintf let spf = Printf.sprintf
@ -165,7 +156,7 @@ let read_file filename : string =
in in
In_channel.with_open_bin filename In_channel.input_all In_channel.with_open_bin filename In_channel.input_all
let main ~port ~runner () : unit Lwt.t = let main ~port ~runner () : unit =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "main" in
let lwt_fut, _lwt_prom = Lwt.wait () in let lwt_fut, _lwt_prom = Lwt.wait () in
@ -173,38 +164,39 @@ let main ~port ~runner () : unit Lwt.t =
(* TODO: handle exit?? *) (* TODO: handle exit?? *)
Printf.printf "listening on port %d\n%!" port; Printf.printf "listening on port %d\n%!" port;
let handle_client client_addr ic oc = let handle_client client_addr (ic, oc) =
let@ () = Moonpool_lwt.spawn_lwt in
let _sp = let _sp =
Trace.enter_manual_toplevel_span ~__FILE__ ~__LINE__ "handle.client" Trace.enter_manual_span ~parent:None ~__FILE__ ~__LINE__ "handle.client"
~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ]) ~data:(fun () -> [ "addr", `String (str_of_sockaddr client_addr) ])
in in
try try
while true do while true do
Trace.message "read"; Trace.message "read";
let filename = let filename = Lwt_io.read_line ic |> await_lwt |> String.trim in
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.read_line ic)
|> String.trim
in
Trace.messagef (fun k -> k "hash %S" filename); Trace.messagef (fun k -> k "hash %S" filename);
match read_file filename with match read_file filename with
| exception e -> | exception e ->
Printf.eprintf "error while reading %S:\n%s\n" filename Printf.eprintf "error while reading %S:\n%s\n" filename
(Printexc.to_string e); (Printexc.to_string e);
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.write_line oc (spf "%s: error" filename) |> await_lwt;
Lwt_io.write_line oc (spf "%s: error" filename)); Lwt_io.flush oc |> await_lwt
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.flush oc)
| content -> | content ->
(* got the content, now hash it *) (* got the content, now hash it in a background task *)
let hash = let hash : _ Fut.t =
let@ _sp = Trace.with_span ~__FILE__ ~__LINE__ "hash" in let@ () = Moonpool.spawn ~on:runner in
let@ _sp =
Trace.with_span ~__FILE__ ~__LINE__ "hash" ~data:(fun () ->
[ "file", `String filename ])
in
sha_1 content |> to_hex sha_1 content |> to_hex
in in
M_lwt.run_in_lwt_and_await (fun () -> let hash = Fut.await hash in
Lwt_io.write_line oc (spf "%s: %s" filename hash)); Lwt_io.write_line oc (spf "%s: %s" filename hash) |> await_lwt;
M_lwt.run_in_lwt_and_await (fun () -> Lwt_io.flush oc) Lwt_io.flush oc |> await_lwt
done done
with End_of_file | Unix.Unix_error (Unix.ECONNRESET, _, _) -> with End_of_file | Unix.Unix_error (Unix.ECONNRESET, _, _) ->
Trace.exit_manual_span _sp; Trace.exit_manual_span _sp;
@ -212,16 +204,17 @@ let main ~port ~runner () : unit Lwt.t =
in in
let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in let addr = Unix.ADDR_INET (Unix.inet_addr_loopback, port) in
let _server = M_lwt.TCP_server.establish_lwt ~runner addr handle_client in let _server =
Printf.printf "listening on port=%d\n%!" port; Lwt_io.establish_server_with_client_address addr handle_client |> await_lwt
in
lwt_fut lwt_fut |> await_lwt
let () = let () =
let@ () = Trace_tef.with_setup () in let@ () = Trace_tef.with_setup () in
Trace.set_thread_name "main"; Trace.set_thread_name "main";
let port = ref 1234 in let port = ref 1234 in
let j = ref 4 in let j = ref 0 in
let opts = let opts =
[ [
@ -231,6 +224,14 @@ let () =
in in
Arg.parse opts ignore "echo server"; Arg.parse opts ignore "echo server";
let@ runner = M.Ws_pool.with_ ~name:"tpool" ~num_threads:!j () in
(* Lwt_engine.set @@ new Lwt_engine.libev (); *) (* Lwt_engine.set @@ new Lwt_engine.libev (); *)
Lwt_main.run @@ main ~runner ~port:!port () let@ runner =
let num_threads =
if !j = 0 then
None
else
Some !j
in
Moonpool.Ws_pool.with_ ?num_threads ()
in
M_lwt.lwt_main @@ fun _main_runner -> main ~runner ~port:!port ()

View file

@ -1,8 +1,22 @@
run echo server on port=12346 run echo server on port=12346
listening on port 12346 listening on port 12346
run echo client -p 12346 -n 10 --n-conn=2 -j=4 run echo client -p 12346 -n 10 --n-conn=2 -v
all done all done
connecting to port 12346 all done
connecting to port 12346 (10 msg per conn, 10 conns total, 2 max at a time)
connecting to port 12346 (10 msg per conn, 10 conns total, 2 max at a time)
done
first run
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1
read: hello 1 read: hello 1
read: hello 1 read: hello 1
read: hello 1 read: hello 1
@ -23,6 +37,26 @@ read: hello 10
read: hello 10 read: hello 10
read: hello 10 read: hello 10
read: hello 10 read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 10
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2
read: hello 2 read: hello 2
read: hello 2 read: hello 2
read: hello 2 read: hello 2
@ -43,6 +77,26 @@ read: hello 3
read: hello 3 read: hello 3
read: hello 3 read: hello 3
read: hello 3 read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 3
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4
read: hello 4 read: hello 4
read: hello 4 read: hello 4
read: hello 4 read: hello 4
@ -63,6 +117,26 @@ read: hello 5
read: hello 5 read: hello 5
read: hello 5 read: hello 5
read: hello 5 read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 5
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6
read: hello 6 read: hello 6
read: hello 6 read: hello 6
read: hello 6 read: hello 6
@ -83,6 +157,26 @@ read: hello 7
read: hello 7 read: hello 7
read: hello 7 read: hello 7
read: hello 7 read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 7
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8
read: hello 8 read: hello 8
read: hello 8 read: hello 8
read: hello 8 read: hello 8
@ -103,3 +197,14 @@ read: hello 9
read: hello 9 read: hello 9
read: hello 9 read: hello 9
read: hello 9 read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
read: hello 9
second run

View file

@ -1,7 +1,6 @@
running hash server on port=12345 running hash server on port=12345
listening on port 12345 listening on port 12345
listening on port=12345 run hash client -p 12345 -d ../data/ --n-conn=2
run hash client -p 12345 -d ../data/ --n-conn=2 -j=4
../data/d1/large: fdb479c5661572f9606266eeb280b4db5c26cc38 ../data/d1/large: fdb479c5661572f9606266eeb280b4db5c26cc38
../data/d1/large: fdb479c5661572f9606266eeb280b4db5c26cc38 ../data/d1/large: fdb479c5661572f9606266eeb280b4db5c26cc38
../data/d1/large_10: c31560efa1a5ad6dbf89990d51878f3bd64b13ce ../data/d1/large_10: c31560efa1a5ad6dbf89990d51878f3bd64b13ce

View file

@ -1,111 +0,0 @@
module BQ = Moonpool.Bounded_queue
module Bb_queue = Moonpool.Blocking_queue
module A = Moonpool.Atomic
let spawn f = ignore (Moonpool.start_thread_on_some_domain f () : Thread.t)
let () =
let bq = BQ.create ~max_size:3 () in
BQ.push bq 1;
BQ.push bq 2;
assert (BQ.size bq = 2);
assert (BQ.pop bq = 1);
assert (BQ.pop bq = 2);
assert (BQ.try_pop ~force_lock:true bq = None);
spawn (fun () -> BQ.push bq 3);
assert (BQ.pop bq = 3)
let () =
(* cannot create with size 0 *)
assert (
try
ignore (BQ.create ~max_size:0 ());
false
with _ -> true)
let () =
let bq = BQ.create ~max_size:3 () in
BQ.push bq 1;
BQ.push bq 2;
assert (BQ.size bq = 2);
assert (BQ.pop bq = 1);
BQ.close bq;
assert (BQ.pop bq = 2);
assert (
try
ignore (BQ.pop bq);
false
with BQ.Closed -> true);
assert (
try
ignore (BQ.push bq 42);
false
with BQ.Closed -> true)
let () =
let bq = BQ.create ~max_size:2 () in
let side_q = Bb_queue.create () in
BQ.push bq 1;
BQ.push bq 2;
spawn (fun () ->
for i = 3 to 10 do
BQ.push bq i;
Bb_queue.push side_q (`Pushed i)
done);
(* make space for new element *)
assert (BQ.pop bq = 1);
assert (Bb_queue.pop side_q = `Pushed 3);
assert (BQ.pop bq = 2);
assert (BQ.pop bq = 3);
for j = 4 to 10 do
assert (BQ.pop bq = j);
assert (Bb_queue.pop side_q = `Pushed j)
done;
assert (BQ.size bq = 0);
()
let () =
let bq = BQ.create ~max_size:5 () in
let bq1 = BQ.create ~max_size:10 () in
let bq2 = BQ.create ~max_size:10 () in
let bq_res = BQ.create ~max_size:2 () in
(* diamond:
bq -------> bq1
| |
| |
v v
bq2 -----> bq_res *)
spawn (fun () ->
BQ.to_iter bq (BQ.push bq1);
BQ.close bq1);
spawn (fun () ->
BQ.to_iter bq (BQ.push bq2);
BQ.close bq2);
spawn (fun () -> BQ.to_iter bq1 (BQ.push bq_res));
spawn (fun () -> BQ.to_iter bq2 (BQ.push bq_res));
let n = 100_000 in
(* push into [bq] *)
let sum = A.make 0 in
spawn (fun () ->
for i = 1 to n do
ignore (A.fetch_and_add sum i : int);
BQ.push bq i
done;
BQ.close bq);
let sum' = ref 0 in
for _j = 1 to n do
let x = BQ.pop bq_res in
sum' := x + !sum'
done;
assert (BQ.size bq_res = 0);
assert (A.get sum = !sum')