From 3014046a8a1d3fc8d2ed0015d5017eedc1b86d33 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Tue, 8 Jul 2025 10:32:46 -0400 Subject: [PATCH] use lwt_direct from lwt PR --- examples/echo_lwt.ml | 6 ++-- src/lwt/{task.ml => lwt_direct.ml} | 50 ++++++++++++++++++++---------- src/lwt/lwt_direct.mli | 33 ++++++++++++++++++++ src/lwt/task.mli | 9 ------ src/lwt/tiny_httpd_lwt.ml | 29 +++++++++-------- src/lwt/tiny_httpd_lwt.mli | 2 +- 6 files changed, 85 insertions(+), 44 deletions(-) rename src/lwt/{task.ml => lwt_direct.ml} (55%) create mode 100644 src/lwt/lwt_direct.mli delete mode 100644 src/lwt/task.mli diff --git a/examples/echo_lwt.ml b/examples/echo_lwt.ml index e1b9e196..c66184b2 100644 --- a/examples/echo_lwt.ml +++ b/examples/echo_lwt.ml @@ -1,7 +1,7 @@ open Tiny_httpd_core module Log = Tiny_httpd.Log module MFD = Tiny_httpd_multipart_form_data -module Task = Tiny_httpd_lwt.Task +module Lwt_direct = Tiny_httpd_lwt.Lwt_direct let now_ = Unix.gettimeofday @@ -151,11 +151,11 @@ let () = let ev = new Lwt_engine.libev () in Lwt_engine.set ev; - Lwt_main.run @@ Task.run + Lwt_main.run @@ Lwt_direct.run @@ fun () -> let server = Tiny_httpd_lwt.create ~addr:!addr ~port:!port_ ~max_connections:!j () - |> Task.await + |> Lwt_direct.await in Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16 * 1024) server; diff --git a/src/lwt/task.ml b/src/lwt/lwt_direct.ml similarity index 55% rename from src/lwt/task.ml rename to src/lwt/lwt_direct.ml index 15c29b25..f989198b 100644 --- a/src/lwt/task.ml +++ b/src/lwt/lwt_direct.ml @@ -1,17 +1,16 @@ module ED = Effect.Deep -type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t +type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t (** Queue of microtasks that are ready *) let tasks : (unit -> unit) Queue.t = Queue.create () let[@inline] push_task f : unit = Queue.push f tasks -let on_uncaught_exn : (exn -> Printexc.raw_backtrace -> unit) ref = - ref (fun exn bt -> - Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!" - (Printexc.to_string exn) - (Printexc.raw_backtrace_to_string bt)) +let default_on_uncaught_exn exn bt = + Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!" + (Printexc.to_string exn) + (Printexc.raw_backtrace_to_string bt) let run_all_tasks () : unit = let n_processed = ref 0 in @@ -22,16 +21,21 @@ let run_all_tasks () : unit = try t () with exn -> let bt = Printexc.get_raw_backtrace () in - !on_uncaught_exn exn bt + default_on_uncaught_exn exn bt done; (* make sure we don't sleep forever if there's no lwt promise ready but [tasks] contains ready tasks *) if not (Queue.is_empty tasks) then ignore (Lwt.pause () : unit Lwt.t) -let () = - let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in - let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in - () +let setup_hooks = + let already_done = ref false in + fun () -> + if not !already_done then ( + already_done := true; + let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in + let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in + () + ) let await (fut : 'a Lwt.t) : 'a = match Lwt.state fut with @@ -39,10 +43,13 @@ let await (fut : 'a Lwt.t) : 'a = | Lwt.Fail exn -> raise exn | Lwt.Sleep -> Effect.perform (Await fut) +let yield () : unit = Effect.perform Yield + (** the main effect handler *) let handler : _ ED.effect_handler = let effc : type b. b Effect.t -> ((b, unit) ED.continuation -> 'a) option = function + | Yield -> Some (fun k -> push_task (fun () -> ED.continue k ())) | Await fut -> Some (fun k -> @@ -51,10 +58,10 @@ let handler : _ ED.effect_handler = (fun exn -> push_task (fun () -> ED.discontinue k exn))) | _ -> None in - { effc } -let run_inside_effect_handler_ (type a) (promise : a Lwt.u) f () : unit = +let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () : + unit = let res = ref (Error (Failure "not resolved")) in let run_f_and_set_res () = (try @@ -66,10 +73,21 @@ let run_inside_effect_handler_ (type a) (promise : a Lwt.u) f () : unit = ED.try_with run_f_and_set_res () handler let run f : _ Lwt.t = + setup_hooks (); let lwt, resolve = Lwt.wait () in - push_task (run_inside_effect_handler_ resolve f); + push_task (run_inside_effect_handler_and_resolve_ resolve f); lwt -let run_async f : unit = ignore (run f : unit Lwt.t) +let run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f () : unit = + let run_f () : unit = + try f () + with exn -> + let bt = Printexc.get_raw_backtrace () in + on_uncaught_exn exn bt + in + ED.try_with run_f () handler -(* TODO: yield, use that in loops? *) +let run_in_the_background ?(on_uncaught_exn = default_on_uncaught_exn) f : unit + = + setup_hooks (); + push_task (run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f) diff --git a/src/lwt/lwt_direct.mli b/src/lwt/lwt_direct.mli new file mode 100644 index 00000000..e1cbd422 --- /dev/null +++ b/src/lwt/lwt_direct.mli @@ -0,0 +1,33 @@ +(** Direct style control flow for Lwt. *) + +val run : (unit -> 'a) -> 'a Lwt.t +(** [run f] runs the function [f ()] in a task within + the [Lwt_unix] event loop. [f ()] can create [Lwt] + promises and use {!await} to wait for them. Like any promise + in Lwt, [f ()] can starve the event loop if it runs long computations + without yielding to the event loop. + + When [f ()] terminates (successfully or not), the promise + [run f] is resolved with [f ()]'s result, or the exception + raised by [f ()]. *) + +val run_in_the_background : + ?on_uncaught_exn:(exn -> Printexc.raw_backtrace -> unit) -> + (unit -> unit) -> + unit +(** [run_in_the_background f] is similar to [ignore (run f)]. + The computation [f()] runs in the background in the event loop + and returns no result. + @param on_uncaught_exn if provided, this is called when [f()] + raises an exception. *) + +val yield : unit -> unit +(** Yield to the event loop. + Can only be used inside {!run} or {!run_in_the_background}. *) + +val await : 'a Lwt.t -> 'a +(** [await prom] returns the result of [prom], or re-raises the + exception with which [prom] failed if it failed. + If [prom] is not resolved yet, [await prom] will suspend the + current task and resume it when [prom] is resolved. + Can only be used inside {!run} or {!run_in_the_background}. *) diff --git a/src/lwt/task.mli b/src/lwt/task.mli deleted file mode 100644 index 7b326dc0..00000000 --- a/src/lwt/task.mli +++ /dev/null @@ -1,9 +0,0 @@ -(** Direct style tasks for Lwt *) - -val run : (unit -> 'a) -> 'a Lwt.t -(** Run a microtask *) - -val run_async : (unit -> unit) -> unit - -val await : 'a Lwt.t -> 'a -(** Can only be used inside {!run} *) diff --git a/src/lwt/tiny_httpd_lwt.ml b/src/lwt/tiny_httpd_lwt.ml index e78926ed..f21d6606 100644 --- a/src/lwt/tiny_httpd_lwt.ml +++ b/src/lwt/tiny_httpd_lwt.ml @@ -3,7 +3,7 @@ module H = Tiny_httpd.Server module Pool = Tiny_httpd.Pool module Slice = IO.Slice module Log = Tiny_httpd.Log -module Task = Task +module Lwt_direct = Lwt_direct let spf = Printf.sprintf let ( let@ ) = ( @@ ) @@ -37,33 +37,33 @@ let ic_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) : assert (sl.len = 0); sl.off <- 0; let n = - Lwt_unix.read fd sl.bytes 0 (Bytes.length sl.bytes) |> Task.await + Lwt_unix.read fd sl.bytes 0 (Bytes.length sl.bytes) |> Lwt_direct.await in sl.len <- n method close () = decr num_open; - if !num_open <= 0 then Lwt_unix.close fd |> Task.await + if !num_open <= 0 then Lwt_unix.close fd |> Lwt_direct.await end let oc_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) : IO.Output.t = object inherit IO.Output.t_from_output ~bytes () - (* method flush () : unit = Lwt_io.flush oc |> Task.await *) + (* method flush () : unit = Lwt_io.flush oc |> Lwt_direct.await *) method private output_underlying buf i len = let i = ref i in let len = ref len in while !len > 0 do - let n = Lwt_unix.write fd buf !i !len |> Task.await in + let n = Lwt_unix.write fd buf !i !len |> Lwt_direct.await in i := !i + n; len := !len - n done method private close_underlying () = decr num_open; - if !num_open <= 0 then Lwt_unix.close fd |> Task.await + if !num_open <= 0 then Lwt_unix.close fd |> Lwt_direct.await end let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size @@ -80,7 +80,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size | addr, port, None -> let addr = Option.value ~default:"127.0.0.1" addr in let sockaddr, port = - match Lwt_unix.getaddrinfo addr "" [] |> Task.await, port with + match Lwt_unix.getaddrinfo addr "" [] |> Lwt_direct.await, port with | { Unix.ai_addr = ADDR_INET (h, _); _ } :: _, None -> let p = 8080 in Unix.ADDR_INET (h, p), p @@ -115,7 +115,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let port = ref port in let server_loop : unit Lwt.t = - let@ () = Task.run in + let@ () = Lwt_direct.run in let backlog = max_connections in let sock = Lwt_unix.socket ~cloexec:true @@ -126,7 +126,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size Lwt_unix.setsockopt_optint sock Unix.SO_LINGER None; Lwt_unix.setsockopt sock Unix.SO_REUSEADDR true; Lwt_unix.setsockopt sock Unix.SO_REUSEPORT true; - Lwt_unix.bind sock sockaddr |> Task.await; + Lwt_unix.bind sock sockaddr |> Lwt_direct.await; Lwt_unix.listen sock backlog; (* recover real port, if any *) @@ -136,8 +136,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size let handle_client client_addr fd : unit = Atomic.incr active_conns; - let@ () = Task.run_async in - + Lwt_direct.run_in_the_background @@ fun () -> let cleanup () = Log.debug (fun k -> k "Tiny_httpd_lwt: client handler returned"); @@ -169,7 +168,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size in while Atomic.get running do - let fd, addr = Lwt_unix.accept sock |> Task.await in + let fd, addr = Lwt_unix.accept sock |> Lwt_direct.await in handle_client addr fd done in @@ -181,21 +180,21 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size (fun () -> Atomic.set running false; Lwt.wakeup_later set_server_done (); - Task.await server_loop); + Lwt_direct.await server_loop); endpoint = (fun () -> addr, !port); active_connections = (fun () -> Atomic.get active_conns); } in after_init tcp_server; - Task.await server_done); + Lwt_direct.await server_done); } end in (module M) let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size ?middlewares () : H.t Lwt.t = - let@ () = Task.run in + let@ () = Lwt_direct.run in let backend = io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections ?buf_size () diff --git a/src/lwt/tiny_httpd_lwt.mli b/src/lwt/tiny_httpd_lwt.mli index 4b0fabf5..2ef9fa1c 100644 --- a/src/lwt/tiny_httpd_lwt.mli +++ b/src/lwt/tiny_httpd_lwt.mli @@ -6,7 +6,7 @@ {b NOTE}: this is very experimental and will absolutely change over time, @since NEXT_RELEASE *) -module Task = Task +module Lwt_direct = Lwt_direct type 'a with_args = ?addr:string ->