From 7e06203b14b87d2394b4f7b0bd72879cf990c214 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Mon, 28 Jul 2025 14:33:43 -0400 Subject: [PATCH] update lwt_direct --- src/lwt/lwt_direct.ml | 91 ++++++++++++++++++++++----------------- src/lwt/lwt_direct.mli | 84 +++++++++++++++++++++++++----------- src/lwt/tiny_httpd_lwt.ml | 6 +-- 3 files changed, 113 insertions(+), 68 deletions(-) diff --git a/src/lwt/lwt_direct.ml b/src/lwt/lwt_direct.ml index f989198b..704a49ab 100644 --- a/src/lwt/lwt_direct.ml +++ b/src/lwt/lwt_direct.ml @@ -1,42 +1,57 @@ -module ED = Effect.Deep +(* Direct-style wrapper for Lwt code -type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t + The implementation of the direct-style wrapper relies on ocaml5's effect + system capturing continuations and adding them as a callback to some lwt + promises. *) + +(* part 1: tasks, getting the scheduler to call them *) -(** Queue of microtasks that are ready *) let tasks : (unit -> unit) Queue.t = Queue.create () - let[@inline] push_task f : unit = Queue.push f tasks -let default_on_uncaught_exn exn bt = - Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!" - (Printexc.to_string exn) - (Printexc.raw_backtrace_to_string bt) +let absolute_max_number_of_steps = + (* TODO 6.0: what's a good number here? should it be customisable? *) + 10_000 let run_all_tasks () : unit = let n_processed = ref 0 in - let max_number_of_steps = min 10_000 (2 * Queue.length tasks) in + let max_number_of_steps = + min absolute_max_number_of_steps (2 * Queue.length tasks) + in while (not (Queue.is_empty tasks)) && !n_processed < max_number_of_steps do let t = Queue.pop tasks in incr n_processed; try t () with exn -> - let bt = Printexc.get_raw_backtrace () in - default_on_uncaught_exn exn bt + (* TODO 6.0: change async_exception handler to accept a backtrace, pass it + here and at the other use site. 
*) + (* TODO 6.0: this and other try-with: respect exception-filter *) + !Lwt.async_exception_hook exn done; - (* make sure we don't sleep forever if there's no lwt promise - ready but [tasks] contains ready tasks *) - if not (Queue.is_empty tasks) then ignore (Lwt.pause () : unit Lwt.t) + (* In the case where there are no promises ready for wakeup, the scheduler's + engine will pause until some IO completes. There might never be completed + IO, depending on the program structure and the state of the world. If this + happens and the queue is not empty, we add a [pause] so that the engine has + something to wakeup for so that the rest of the queue can be processed. *) + if (not (Queue.is_empty tasks)) && Lwt.paused_count () = 0 then + ignore (Lwt.pause () : unit Lwt.t) let setup_hooks = let already_done = ref false in fun () -> if not !already_done then ( already_done := true; + (* TODO 6.0: assess whether we should have both hooks or just one (which + one). Tempted to say we should only have the enter hook. *) let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in () ) +(* part 2: effects, performing them *) + +type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t + let await (fut : 'a Lwt.t) : 'a = match Lwt.state fut with | Lwt.Return x -> x @@ -45,49 +60,45 @@ let await (fut : 'a Lwt.t) : 'a = let yield () : unit = Effect.perform Yield -(** the main effect handler *) -let handler : _ ED.effect_handler = - let effc : type b. b Effect.t -> ((b, unit) ED.continuation -> 'a) option = - function - | Yield -> Some (fun k -> push_task (fun () -> ED.continue k ())) +(* part 3: handling effects *) + +let handler : _ Effect.Deep.effect_handler = + let effc : type b. 
+ b Effect.t -> ((b, unit) Effect.Deep.continuation -> 'a) option = function + | Yield -> Some (fun k -> push_task (fun () -> Effect.Deep.continue k ())) | Await fut -> Some (fun k -> Lwt.on_any fut - (fun res -> push_task (fun () -> ED.continue k res)) - (fun exn -> push_task (fun () -> ED.discontinue k exn))) + (fun res -> push_task (fun () -> Effect.Deep.continue k res)) + (fun exn -> push_task (fun () -> Effect.Deep.discontinue k exn))) | _ -> None in { effc } +(* part 4: putting it all together: running tasks *) + let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : unit = - let res = ref (Error (Failure "not resolved")) in let run_f_and_set_res () = - (try - let r = f () in - res := Ok r - with exn -> res := Error exn); - Lwt.wakeup_result promise !res + match f () with + | res -> Lwt.wakeup promise res + | exception exc -> Lwt.wakeup_exn promise exc in - ED.try_with run_f_and_set_res () handler + Effect.Deep.try_with run_f_and_set_res () handler -let run f : _ Lwt.t = +let spawn f : _ Lwt.t = setup_hooks (); let lwt, resolve = Lwt.wait () in push_task (run_inside_effect_handler_and_resolve_ resolve f); lwt -let run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f () : unit = - let run_f () : unit = - try f () - with exn -> - let bt = Printexc.get_raw_backtrace () in - on_uncaught_exn exn bt - in - ED.try_with run_f () handler +(* part 4 (encore): running a task in the background *) -let run_in_the_background ?(on_uncaught_exn = default_on_uncaught_exn) f : unit - = +let run_inside_effect_handler_in_the_background_ f () : unit = + let run_f () : unit = try f () with exn -> !Lwt.async_exception_hook exn in + Effect.Deep.try_with run_f () handler + +let spawn_in_the_background f : unit = setup_hooks (); - push_task (run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f) + push_task (run_inside_effect_handler_in_the_background_ f) diff --git a/src/lwt/lwt_direct.mli b/src/lwt/lwt_direct.mli index e1cbd422..b964ab78 
100644 --- a/src/lwt/lwt_direct.mli +++ b/src/lwt/lwt_direct.mli @@ -1,33 +1,67 @@ -(** Direct style control flow for Lwt. *) +(** Direct style control flow for Lwt. -val run : (unit -> 'a) -> 'a Lwt.t -(** [run f] runs the function [f ()] in a task within - the [Lwt_unix] event loop. [f ()] can create [Lwt] - promises and use {!await} to wait for them. Like any promise - in Lwt, [f ()] can starve the event loop if it runs long computations - without yielding to the event loop. + This module relies on OCaml 5's + {{:https://ocaml.org/manual/5.3/effects.html} effect handlers}. Instead of + chaining promises using {!Lwt.bind} and {!Lwt.map} and other combinators, it + becomes possible to start lightweight "tasks" using + [Lwt_direct.spawn (fun () -> ...)]. The body of such a task is written in + direct-style code, using OCaml's standard control flow structures such as + loops, higher-order functions, exception handlers, [match], etc. - When [f ()] terminates (successfully or not), the promise - [run f] is resolved with [f ()]'s result, or the exception - raised by [f ()]. *) + Interactions with the rest of lwt can be done using [await], for example: -val run_in_the_background : - ?on_uncaught_exn:(exn -> Printexc.raw_backtrace -> unit) -> - (unit -> unit) -> - unit -(** [run_in_the_background f] is similar to [ignore (run f)]. - The computation [f()] runs in the background in the event loop - and returns no result. - @param on_uncaught_exn if provided, this is called when [f()] - raises an exception. 
*) + {[ + Lwt_direct.spawn (fun () -> + let continue = ref true in + while !continue do + match Lwt_io.read_line in_channel |> Lwt_direct.await with + | exception End_of_file -> continue := false + | line -> + let uppercase_line = String.uppercase_ascii line in + Lwt_io.write_line out_channel uppercase_line |> Lwt_direct.await + done) + ]} + + This code snippet contains a simple "task" that repeatedly reads a line from + a [Lwt_io] channel, uppercases it, and writes the uppercase version to + another channel. + + This task is itself a [unit Lwt.t], which is resolved when the function + returns. It is possible to use {!Lwt_direct.spawn_in_the_background} to ignore + the result and let the task run in the background instead. *) + +val spawn : (unit -> 'a) -> 'a Lwt.t +(** [spawn f] runs the function [f ()] in a task within the [Lwt_unix] event + loop. [f ()] can create [Lwt] promises and use {!await} to wait for them. + Like any promise in Lwt, [f ()] can starve the event loop if it runs long + computations without yielding to the event loop. + + When [f ()] terminates (successfully or not), the promise [spawn f] is + resolved with [f ()]'s result, or the exception raised by [f ()]. + + The promise returned by [spawn f] is not cancellable. Canceling it will have + no effect. *) + +val spawn_in_the_background : (unit -> unit) -> unit +(** [spawn_in_the_background f] is similar to [ignore (spawn f)]. The + computation [f()] runs in the background in the event loop and returns no + result. If [f()] raises an exception, {!Lwt.async_exception_hook} is called. +*) val yield : unit -> unit (** Yield to the event loop. - Can only be used inside {!run} or {!run_in_the_background}. *) + + Calling [yield] outside of {!spawn} or {!spawn_in_the_background} will raise + an exception, crash your program, or otherwise cause errors. It is a + programming error to do so. 
*) val await : 'a Lwt.t -> 'a -(** [await prom] returns the result of [prom], or re-raises the - exception with which [prom] failed if it failed. - If [prom] is not resolved yet, [await prom] will suspend the - current task and resume it when [prom] is resolved. - Can only be used inside {!run} or {!run_in_the_background}. *) +(** [await prom] returns the result of [prom], or re-raises the exception with + which [prom] failed if it failed. If [prom] is not resolved yet, + [await prom] will suspend the current task and resume it when [prom] is + resolved. + + Calling [await] outside of {!spawn} or {!spawn_in_the_background} will raise + an exception, crash your program, or otherwise cause errors. It is a + programming error to do so. *) + diff --git a/src/lwt/tiny_httpd_lwt.ml b/src/lwt/tiny_httpd_lwt.ml index 43141363..af7f1dfd 100644 --- a/src/lwt/tiny_httpd_lwt.ml +++ b/src/lwt/tiny_httpd_lwt.ml @@ -123,7 +123,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let port = ref port in let server_loop : unit Lwt.t = - let@ () = Lwt_direct.run in + let@ () = Lwt_direct.spawn in let backlog = max_connections in let sock = Lwt_unix.socket ~cloexec:true @@ -146,7 +146,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let handle_client client_addr fd : unit = Atomic.incr active_conns; - Lwt_direct.run_in_the_background @@ fun () -> + Lwt_direct.spawn_in_the_background @@ fun () -> let@ buf_ic = Pool.with_resource buf_pool in let@ buf_oc = Pool.with_resource buf_pool in @@ -212,7 +212,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size ?middlewares () : H.t Lwt.t = - let@ () = Lwt_direct.run in + let@ () = Lwt_direct.spawn in let backend = io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections ?buf_size ()