update lwt_direct

This commit is contained in:
Simon Cruanes 2025-07-28 14:33:43 -04:00
parent 5caef14945
commit 7e06203b14
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
3 changed files with 113 additions and 68 deletions

View file

@ -1,42 +1,57 @@
module ED = Effect.Deep (* Direct-style wrapper for Lwt code
type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t The implementation of the direct-style wrapper relies on ocaml5's effect
system capturing continuations and adding them as a callback to some lwt
promises. *)
(* part 1: tasks, getting the scheduler to call them *)
(** Queue of microtasks that are ready *)
let tasks : (unit -> unit) Queue.t = Queue.create () let tasks : (unit -> unit) Queue.t = Queue.create ()
let[@inline] push_task f : unit = Queue.push f tasks let[@inline] push_task f : unit = Queue.push f tasks
let default_on_uncaught_exn exn bt = let absolute_max_number_of_steps =
Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!" (* TODO 6.0: what's a good number here? should it be customisable? *)
(Printexc.to_string exn) 10_000
(Printexc.raw_backtrace_to_string bt)
let run_all_tasks () : unit = let run_all_tasks () : unit =
let n_processed = ref 0 in let n_processed = ref 0 in
let max_number_of_steps = min 10_000 (2 * Queue.length tasks) in let max_number_of_steps =
min absolute_max_number_of_steps (2 * Queue.length tasks)
in
while (not (Queue.is_empty tasks)) && !n_processed < max_number_of_steps do while (not (Queue.is_empty tasks)) && !n_processed < max_number_of_steps do
let t = Queue.pop tasks in let t = Queue.pop tasks in
incr n_processed; incr n_processed;
try t () try t ()
with exn -> with exn ->
let bt = Printexc.get_raw_backtrace () in (* TODO 6.0: change async_exception handler to accept a backtrace, pass it
default_on_uncaught_exn exn bt here and at the other use site. *)
(* TODO 6.0: this and other try-with: respect exception-filter *)
!Lwt.async_exception_hook exn
done; done;
(* make sure we don't sleep forever if there's no lwt promise (* In the case where there are no promises ready for wakeup, the scheduler's
ready but [tasks] contains ready tasks *) engine will pause until some IO completes. There might never be completed
if not (Queue.is_empty tasks) then ignore (Lwt.pause () : unit Lwt.t) IO, depending on the program structure and the state of the world. If this
happens and the queue is not empty, we add a [pause] so that the engine has
something to wake up for, so that the rest of the queue can be processed. *)
if (not (Queue.is_empty tasks)) && Lwt.paused_count () = 0 then
ignore (Lwt.pause () : unit Lwt.t)
let setup_hooks = let setup_hooks =
let already_done = ref false in let already_done = ref false in
fun () -> fun () ->
if not !already_done then ( if not !already_done then (
already_done := true; already_done := true;
(* TODO 6.0: assess whether we should have both hooks or just one (which
one). Tempted to say we should only have the enter hook. *)
let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in
let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in
() ()
) )
(* part 2: effects, performing them *)
type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t
let await (fut : 'a Lwt.t) : 'a = let await (fut : 'a Lwt.t) : 'a =
match Lwt.state fut with match Lwt.state fut with
| Lwt.Return x -> x | Lwt.Return x -> x
@ -45,49 +60,45 @@ let await (fut : 'a Lwt.t) : 'a =
let yield () : unit = Effect.perform Yield let yield () : unit = Effect.perform Yield
(** the main effect handler *) (* part 3: handling effects *)
let handler : _ ED.effect_handler =
let effc : type b. b Effect.t -> ((b, unit) ED.continuation -> 'a) option = let handler : _ Effect.Deep.effect_handler =
function let effc : type b.
| Yield -> Some (fun k -> push_task (fun () -> ED.continue k ())) b Effect.t -> ((b, unit) Effect.Deep.continuation -> 'a) option = function
| Yield -> Some (fun k -> push_task (fun () -> Effect.Deep.continue k ()))
| Await fut -> | Await fut ->
Some Some
(fun k -> (fun k ->
Lwt.on_any fut Lwt.on_any fut
(fun res -> push_task (fun () -> ED.continue k res)) (fun res -> push_task (fun () -> Effect.Deep.continue k res))
(fun exn -> push_task (fun () -> ED.discontinue k exn))) (fun exn -> push_task (fun () -> Effect.Deep.discontinue k exn)))
| _ -> None | _ -> None
in in
{ effc } { effc }
(* part 4: putting it all together: running tasks *)
let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () :
unit = unit =
let res = ref (Error (Failure "not resolved")) in
let run_f_and_set_res () = let run_f_and_set_res () =
(try match f () with
let r = f () in | res -> Lwt.wakeup promise res
res := Ok r | exception exc -> Lwt.wakeup_exn promise exc
with exn -> res := Error exn);
Lwt.wakeup_result promise !res
in in
ED.try_with run_f_and_set_res () handler Effect.Deep.try_with run_f_and_set_res () handler
let run f : _ Lwt.t = let spawn f : _ Lwt.t =
setup_hooks (); setup_hooks ();
let lwt, resolve = Lwt.wait () in let lwt, resolve = Lwt.wait () in
push_task (run_inside_effect_handler_and_resolve_ resolve f); push_task (run_inside_effect_handler_and_resolve_ resolve f);
lwt lwt
let run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f () : unit = (* part 4 (encore): running a task in the background *)
let run_f () : unit =
try f ()
with exn ->
let bt = Printexc.get_raw_backtrace () in
on_uncaught_exn exn bt
in
ED.try_with run_f () handler
let run_in_the_background ?(on_uncaught_exn = default_on_uncaught_exn) f : unit let run_inside_effect_handler_in_the_background_ f () : unit =
= let run_f () : unit = try f () with exn -> !Lwt.async_exception_hook exn in
Effect.Deep.try_with run_f () handler
let spawn_in_the_background f : unit =
setup_hooks (); setup_hooks ();
push_task (run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f) push_task (run_inside_effect_handler_in_the_background_ f)

View file

@ -1,33 +1,67 @@
(** Direct style control flow for Lwt. *) (** Direct style control flow for Lwt.
val run : (unit -> 'a) -> 'a Lwt.t This module relies on OCaml 5's
(** [run f] runs the function [f ()] in a task within {{:https://ocaml.org/manual/5.3/effects.html} effect handlers}. Instead of
the [Lwt_unix] event loop. [f ()] can create [Lwt] chaining promises using {!Lwt.bind} and {!Lwt.map} and other combinators, it
promises and use {!await} to wait for them. Like any promise becomes possible to start lightweight "tasks" using
in Lwt, [f ()] can starve the event loop if it runs long computations [Lwt_direct.spawn (fun () -> ...)]. The body of such a task is written in
without yielding to the event loop. direct-style code, using OCaml's standard control flow structures such as
loops, higher-order functions, exception handlers, [match], etc.
When [f ()] terminates (successfully or not), the promise Interactions with the rest of lwt can be done using [await], for example:
[run f] is resolved with [f ()]'s result, or the exception
raised by [f ()]. *)
val run_in_the_background : {[
?on_uncaught_exn:(exn -> Printexc.raw_backtrace -> unit) -> Lwt_direct.spawn (fun () ->
(unit -> unit) -> let continue = ref true in
unit while !continue do
(** [run_in_the_background f] is similar to [ignore (run f)]. match Lwt_io.read_line in_channel |> Lwt_direct.await with
The computation [f()] runs in the background in the event loop | exception End_of_file -> continue := false
and returns no result. | line ->
@param on_uncaught_exn if provided, this is called when [f()] let uppercase_line = String.uppercase_ascii line in
raises an exception. *) Lwt_io.write_line out_channel uppercase_line |> Lwt_direct.await
done)
]}
This code snippet contains a simple "task" that repeatedly reads a line from
a [Lwt_io] channel, uppercases it, and writes the uppercase version to
another channel.
This task is itself a [unit Lwt.t], which is resolved when the function
returns. It is possible to use {!Lwt_direct.spawn_in_the_background} to ignore
the result and let the task run in the background instead. *)
val spawn : (unit -> 'a) -> 'a Lwt.t
(** [spawn f] runs the function [f ()] in a task within the [Lwt_unix] event
loop. [f ()] can create [Lwt] promises and use {!await} to wait for them.
Like any promise in Lwt, [f ()] can starve the event loop if it runs long
computations without yielding to the event loop.
When [f ()] terminates (successfully or not), the promise [spawn f] is
resolved with [f ()]'s result, or the exception raised by [f ()].
The promise returned by [spawn f] is not cancellable. Canceling it will have
no effect. *)
val spawn_in_the_background : (unit -> unit) -> unit
(** [spawn_in_the_background f] is similar to [ignore (spawn f)]. The
computation [f()] runs in the background in the event loop and returns no
result. If [f()] raises an exception, {!Lwt.async_exception_hook} is called.
*)
val yield : unit -> unit val yield : unit -> unit
(** Yield to the event loop. (** Yield to the event loop.
Can only be used inside {!run} or {!run_in_the_background}. *)
Calling [yield] outside of {!spawn} or {!spawn_in_the_background} will raise
an exception, crash your program, or otherwise cause errors. It is a
programming error to do so. *)
val await : 'a Lwt.t -> 'a val await : 'a Lwt.t -> 'a
(** [await prom] returns the result of [prom], or re-raises the (** [await prom] returns the result of [prom], or re-raises the exception with
exception with which [prom] failed if it failed. which [prom] failed if it failed. If [prom] is not resolved yet,
If [prom] is not resolved yet, [await prom] will suspend the [await prom] will suspend the current task and resume it when [prom] is
current task and resume it when [prom] is resolved. resolved.
Can only be used inside {!run} or {!run_in_the_background}. *)
Calling [await] outside of {!spawn} or {!spawn_in_the_background} will raise
an exception, crash your program, or otherwise cause errors. It is a
programming error to do so. *)

View file

@ -123,7 +123,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
let port = ref port in let port = ref port in
let server_loop : unit Lwt.t = let server_loop : unit Lwt.t =
let@ () = Lwt_direct.run in let@ () = Lwt_direct.spawn in
let backlog = max_connections in let backlog = max_connections in
let sock = let sock =
Lwt_unix.socket ~cloexec:true Lwt_unix.socket ~cloexec:true
@ -146,7 +146,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
let handle_client client_addr fd : unit = let handle_client client_addr fd : unit =
Atomic.incr active_conns; Atomic.incr active_conns;
Lwt_direct.run_in_the_background @@ fun () -> Lwt_direct.spawn_in_the_background @@ fun () ->
let@ buf_ic = Pool.with_resource buf_pool in let@ buf_ic = Pool.with_resource buf_pool in
let@ buf_oc = Pool.with_resource buf_pool in let@ buf_oc = Pool.with_resource buf_pool in
@ -212,7 +212,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size
?middlewares () : H.t Lwt.t = ?middlewares () : H.t Lwt.t =
let@ () = Lwt_direct.run in let@ () = Lwt_direct.spawn in
let backend = let backend =
io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections
?buf_size () ?buf_size ()