Compare commits

..

1 commit

Author SHA1 Message Date
Simon Cruanes
78552c9c04
Merge 5caef14945 into 8a8aadfbb0 2025-07-17 02:58:40 +00:00
3 changed files with 68 additions and 113 deletions

View file

@ -1,57 +1,42 @@
(* Direct-style wrapper for Lwt code
module ED = Effect.Deep
The implementation of the direct-style wrapper relies on ocaml5's effect
system capturing continuations and adding them as a callback to some lwt
promises. *)
(* part 1: tasks, getting the scheduler to call them *)
type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t
(** Queue of microtasks that are ready *)
let tasks : (unit -> unit) Queue.t = Queue.create ()
let[@inline] push_task f : unit = Queue.push f tasks
let absolute_max_number_of_steps =
(* TODO 6.0: what's a good number here? should it be customisable? *)
10_000
let default_on_uncaught_exn exn bt =
Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!"
(Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt)
let run_all_tasks () : unit =
let n_processed = ref 0 in
let max_number_of_steps =
min absolute_max_number_of_steps (2 * Queue.length tasks)
in
let max_number_of_steps = min 10_000 (2 * Queue.length tasks) in
while (not (Queue.is_empty tasks)) && !n_processed < max_number_of_steps do
let t = Queue.pop tasks in
incr n_processed;
try t ()
with exn ->
(* TODO 6.0: change async_exception handler to accept a backtrace, pass it
here and at the other use site. *)
(* TODO 6.0: this and other try-with: respect exception-filter *)
!Lwt.async_exception_hook exn
let bt = Printexc.get_raw_backtrace () in
default_on_uncaught_exn exn bt
done;
(* In the case where there are no promises ready for wakeup, the scheduler's
engine will pause until some IO completes. There might never be completed
IO, depending on the program structure and the state of the world. If this
happens and the queue is not empty, we add a [pause] so that the engine has
something to wakeup for so that the rest of the queue can be processed. *)
if (not (Queue.is_empty tasks)) && Lwt.paused_count () = 0 then
ignore (Lwt.pause () : unit Lwt.t)
(* make sure we don't sleep forever if there's no lwt promise
ready but [tasks] contains ready tasks *)
if not (Queue.is_empty tasks) then ignore (Lwt.pause () : unit Lwt.t)
let setup_hooks =
let already_done = ref false in
fun () ->
if not !already_done then (
already_done := true;
(* TODO 6.0: assess whether we should have both hooks or just one (which
one). Tempted to say we should only have the enter hook. *)
let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in
let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in
()
)
(* part 2: effects, performing them *)
type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t
let await (fut : 'a Lwt.t) : 'a =
match Lwt.state fut with
| Lwt.Return x -> x
@ -60,45 +45,49 @@ let await (fut : 'a Lwt.t) : 'a =
let yield () : unit = Effect.perform Yield
(* part 3: handling effects *)
let handler : _ Effect.Deep.effect_handler =
let effc : type b.
b Effect.t -> ((b, unit) Effect.Deep.continuation -> 'a) option = function
| Yield -> Some (fun k -> push_task (fun () -> Effect.Deep.continue k ()))
(** the main effect handler *)
let handler : _ ED.effect_handler =
let effc : type b. b Effect.t -> ((b, unit) ED.continuation -> 'a) option =
function
| Yield -> Some (fun k -> push_task (fun () -> ED.continue k ()))
| Await fut ->
Some
(fun k ->
Lwt.on_any fut
(fun res -> push_task (fun () -> Effect.Deep.continue k res))
(fun exn -> push_task (fun () -> Effect.Deep.discontinue k exn)))
(fun res -> push_task (fun () -> ED.continue k res))
(fun exn -> push_task (fun () -> ED.discontinue k exn)))
| _ -> None
in
{ effc }
(* part 4: putting it all together: running tasks *)
let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () :
unit =
let res = ref (Error (Failure "not resolved")) in
let run_f_and_set_res () =
match f () with
| res -> Lwt.wakeup promise res
| exception exc -> Lwt.wakeup_exn promise exc
(try
let r = f () in
res := Ok r
with exn -> res := Error exn);
Lwt.wakeup_result promise !res
in
Effect.Deep.try_with run_f_and_set_res () handler
ED.try_with run_f_and_set_res () handler
let spawn f : _ Lwt.t =
let run f : _ Lwt.t =
setup_hooks ();
let lwt, resolve = Lwt.wait () in
push_task (run_inside_effect_handler_and_resolve_ resolve f);
lwt
(* part 4 (encore): running a task in the background *)
let run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f () : unit =
let run_f () : unit =
try f ()
with exn ->
let bt = Printexc.get_raw_backtrace () in
on_uncaught_exn exn bt
in
ED.try_with run_f () handler
let run_inside_effect_handler_in_the_background_ f () : unit =
let run_f () : unit = try f () with exn -> !Lwt.async_exception_hook exn in
Effect.Deep.try_with run_f () handler
let spawn_in_the_background f : unit =
let run_in_the_background ?(on_uncaught_exn = default_on_uncaught_exn) f : unit
=
setup_hooks ();
push_task (run_inside_effect_handler_in_the_background_ f)
push_task (run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f)

View file

@ -1,67 +1,33 @@
(** Direct style control flow for Lwt.
(** Direct style control flow for Lwt. *)
This module relies on OCaml 5's
{{:https://ocaml.org/manual/5.3/effects.html} effect handlers}. Instead of
chaining promises using {!Lwt.bind} and {!Lwt.map} and other combinators, it
becomes possible to start lightweight "tasks" using
[Lwt_direct.spawn (fun () -> ...)]. The body of such a task is written in
direct-style code, using OCaml's standard control flow structures such as
loops, higher-order functions, exception handlers, [match], etc.
val run : (unit -> 'a) -> 'a Lwt.t
(** [run f] runs the function [f ()] in a task within
the [Lwt_unix] event loop. [f ()] can create [Lwt]
promises and use {!await} to wait for them. Like any promise
in Lwt, [f ()] can starve the event loop if it runs long computations
without yielding to the event loop.
Interactions with the rest of lwt can be done using [await], for example:
When [f ()] terminates (successfully or not), the promise
[run f] is resolved with [f ()]'s result, or the exception
raised by [f ()]. *)
{[
Lwt_direct.spawn (fun () ->
let continue = ref true in
while !continue do
match Lwt_io.read_line in_channel |> Lwt_direct.await with
| exception End_of_file -> continue := false
| line ->
let uppercase_line = String.uppercase_ascii line in
Lwt_io.write_line out_channel uppercase_line |> Lwt_direct.await
done)
]}
This code snippet contains a simple "task" that repeatedly reads a line from
a [Lwt_io] channel, uppercases it, and writes the uppercase version to
another channel.
This task is itself a [unit Lwt.t], which is resolved when the function
returns. It is possible to use {!Lwt_direct.run_in_the_background} to ignore
the result and let the task run in the background instead. *)
val spawn : (unit -> 'a) -> 'a Lwt.t
(** [spawn f] runs the function [f ()] in a task within the [Lwt_unix] event
loop. [f ()] can create [Lwt] promises and use {!await} to wait for them.
Like any promise in Lwt, [f ()] can starve the event loop if it runs long
computations without yielding to the event loop.
When [f ()] terminates (successfully or not), the promise [spawn f] is
resolved with [f ()]'s result, or the exception raised by [f ()].
The promise returned by [spawn f] is not cancellable. Canceling it will have
no effect. *)
val spawn_in_the_background : (unit -> unit) -> unit
(** [spawn_in_the_background f] is similar to [ignore (spawn f)]. The
computation [f()] runs in the background in the event loop and returns no
result. If [f()] raises an exception, {!Lwt.async_exception_hook} is called.
*)
val run_in_the_background :
?on_uncaught_exn:(exn -> Printexc.raw_backtrace -> unit) ->
(unit -> unit) ->
unit
(** [run_in_the_background f] is similar to [ignore (run f)].
The computation [f()] runs in the background in the event loop
and returns no result.
@param on_uncaught_exn if provided, this is called when [f()]
raises an exception. *)
val yield : unit -> unit
(** Yield to the event loop.
Calling [yield] outside of {!spawn} or {!run_in_the_background} will raise
an exception, crash your program, or otherwise cause errors. It is a
programming error to do so. *)
Can only be used inside {!run} or {!run_in_the_background}. *)
val await : 'a Lwt.t -> 'a
(** [await prom] returns the result of [prom], or re-raises the exception with
which [prom] failed if it failed. If [prom] is not resolved yet,
[await prom] will suspend the current task and resume it when [prom] is
resolved.
Calling [await] outside of {!spawn} or {!run_in_the_background} will raise
an exception, crash your program, or otherwise cause errors. It is a
programming error to do so. *)
(** [await prom] returns the result of [prom], or re-raises the
exception with which [prom] failed if it failed.
If [prom] is not resolved yet, [await prom] will suspend the
current task and resume it when [prom] is resolved.
Can only be used inside {!run} or {!run_in_the_background}. *)

View file

@ -123,7 +123,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
let port = ref port in
let server_loop : unit Lwt.t =
let@ () = Lwt_direct.spawn in
let@ () = Lwt_direct.run in
let backlog = max_connections in
let sock =
Lwt_unix.socket ~cloexec:true
@ -146,7 +146,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
let handle_client client_addr fd : unit =
Atomic.incr active_conns;
Lwt_direct.spawn_in_the_background @@ fun () ->
Lwt_direct.run_in_the_background @@ fun () ->
let@ buf_ic = Pool.with_resource buf_pool in
let@ buf_oc = Pool.with_resource buf_pool in
@ -212,7 +212,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size
?middlewares () : H.t Lwt.t =
let@ () = Lwt_direct.spawn in
let@ () = Lwt_direct.run in
let backend =
io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections
?buf_size ()