use lwt_direct from lwt PR

This commit is contained in:
Simon Cruanes 2025-07-08 10:32:46 -04:00
parent 029c558802
commit 3014046a8a
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
6 changed files with 85 additions and 44 deletions

View file

@ -1,7 +1,7 @@
open Tiny_httpd_core
module Log = Tiny_httpd.Log
module MFD = Tiny_httpd_multipart_form_data
module Task = Tiny_httpd_lwt.Task
module Lwt_direct = Tiny_httpd_lwt.Lwt_direct
let now_ = Unix.gettimeofday
@ -151,11 +151,11 @@ let () =
let ev = new Lwt_engine.libev () in
Lwt_engine.set ev;
Lwt_main.run @@ Task.run
Lwt_main.run @@ Lwt_direct.run
@@ fun () ->
let server =
Tiny_httpd_lwt.create ~addr:!addr ~port:!port_ ~max_connections:!j ()
|> Task.await
|> Lwt_direct.await
in
Tiny_httpd_camlzip.setup ~compress_above:1024 ~buf_size:(16 * 1024) server;

View file

@ -1,17 +1,16 @@
module ED = Effect.Deep
type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t
type _ Effect.t += Await : 'a Lwt.t -> 'a Effect.t | Yield : unit Effect.t
(** Queue of microtasks that are ready *)
let tasks : (unit -> unit) Queue.t = Queue.create ()
let[@inline] push_task f : unit = Queue.push f tasks
let on_uncaught_exn : (exn -> Printexc.raw_backtrace -> unit) ref =
ref (fun exn bt ->
Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!"
(Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt))
let default_on_uncaught_exn exn bt =
Printf.eprintf "lwt_task: uncaught task exception:\n%s\n%s\n%!"
(Printexc.to_string exn)
(Printexc.raw_backtrace_to_string bt)
let run_all_tasks () : unit =
let n_processed = ref 0 in
@ -22,16 +21,21 @@ let run_all_tasks () : unit =
try t ()
with exn ->
let bt = Printexc.get_raw_backtrace () in
!on_uncaught_exn exn bt
default_on_uncaught_exn exn bt
done;
(* make sure we don't sleep forever if there's no lwt promise
ready but [tasks] contains ready tasks *)
if not (Queue.is_empty tasks) then ignore (Lwt.pause () : unit Lwt.t)
let () =
let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in
let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in
()
let setup_hooks =
let already_done = ref false in
fun () ->
if not !already_done then (
already_done := true;
let _hook1 = Lwt_main.Enter_iter_hooks.add_first run_all_tasks in
let _hook2 = Lwt_main.Leave_iter_hooks.add_first run_all_tasks in
()
)
let await (fut : 'a Lwt.t) : 'a =
match Lwt.state fut with
@ -39,10 +43,13 @@ let await (fut : 'a Lwt.t) : 'a =
| Lwt.Fail exn -> raise exn
| Lwt.Sleep -> Effect.perform (Await fut)
let yield () : unit = Effect.perform Yield
(** the main effect handler *)
let handler : _ ED.effect_handler =
let effc : type b. b Effect.t -> ((b, unit) ED.continuation -> 'a) option =
function
| Yield -> Some (fun k -> push_task (fun () -> ED.continue k ()))
| Await fut ->
Some
(fun k ->
@ -51,10 +58,10 @@ let handler : _ ED.effect_handler =
(fun exn -> push_task (fun () -> ED.discontinue k exn)))
| _ -> None
in
{ effc }
let run_inside_effect_handler_ (type a) (promise : a Lwt.u) f () : unit =
let run_inside_effect_handler_and_resolve_ (type a) (promise : a Lwt.u) f () :
unit =
let res = ref (Error (Failure "not resolved")) in
let run_f_and_set_res () =
(try
@ -66,10 +73,21 @@ let run_inside_effect_handler_ (type a) (promise : a Lwt.u) f () : unit =
ED.try_with run_f_and_set_res () handler
let run f : _ Lwt.t =
setup_hooks ();
let lwt, resolve = Lwt.wait () in
push_task (run_inside_effect_handler_ resolve f);
push_task (run_inside_effect_handler_and_resolve_ resolve f);
lwt
let run_async f : unit = ignore (run f : unit Lwt.t)
let run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f () : unit =
let run_f () : unit =
try f ()
with exn ->
let bt = Printexc.get_raw_backtrace () in
on_uncaught_exn exn bt
in
ED.try_with run_f () handler
(* TODO: yield, use that in loops? *)
let run_in_the_background ?(on_uncaught_exn = default_on_uncaught_exn) f : unit
=
setup_hooks ();
push_task (run_inside_effect_handler_in_the_background_ ~on_uncaught_exn f)

33
src/lwt/lwt_direct.mli Normal file
View file

@ -0,0 +1,33 @@
(** Direct style control flow for Lwt. *)
val run : (unit -> 'a) -> 'a Lwt.t
(** [run f] runs the function [f ()] in a task within
the [Lwt_unix] event loop. [f ()] can create [Lwt]
promises and use {!await} to wait for them. Like any promise
in Lwt, [f ()] can starve the event loop if it runs long computations
without yielding to the event loop.
When [f ()] terminates (successfully or not), the promise
[run f] is resolved with [f ()]'s result, or the exception
raised by [f ()]. *)
val run_in_the_background :
?on_uncaught_exn:(exn -> Printexc.raw_backtrace -> unit) ->
(unit -> unit) ->
unit
(** [run_in_the_background f] is similar to [ignore (run f)].
The computation [f()] runs in the background in the event loop
and returns no result.
@param on_uncaught_exn if provided, this is called when [f()]
raises an exception. *)
val yield : unit -> unit
(** Yield to the event loop.
Can only be used inside {!run} or {!run_in_the_background}. *)
val await : 'a Lwt.t -> 'a
(** [await prom] returns the result of [prom], or re-raises the
exception with which [prom] failed if it failed.
If [prom] is not resolved yet, [await prom] will suspend the
current task and resume it when [prom] is resolved.
Can only be used inside {!run} or {!run_in_the_background}. *)

View file

@ -1,9 +0,0 @@
(** Direct style tasks for Lwt *)
val run : (unit -> 'a) -> 'a Lwt.t
(** Run a microtask *)
val run_async : (unit -> unit) -> unit
val await : 'a Lwt.t -> 'a
(** Can only be used inside {!run} *)

View file

@ -3,7 +3,7 @@ module H = Tiny_httpd.Server
module Pool = Tiny_httpd.Pool
module Slice = IO.Slice
module Log = Tiny_httpd.Log
module Task = Task
module Lwt_direct = Lwt_direct
let spf = Printf.sprintf
let ( let@ ) = ( @@ )
@ -37,33 +37,33 @@ let ic_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) :
assert (sl.len = 0);
sl.off <- 0;
let n =
Lwt_unix.read fd sl.bytes 0 (Bytes.length sl.bytes) |> Task.await
Lwt_unix.read fd sl.bytes 0 (Bytes.length sl.bytes) |> Lwt_direct.await
in
sl.len <- n
method close () =
decr num_open;
if !num_open <= 0 then Lwt_unix.close fd |> Task.await
if !num_open <= 0 then Lwt_unix.close fd |> Lwt_direct.await
end
let oc_of_fd ~(num_open : int ref) ~bytes (fd : Lwt_unix.file_descr) :
IO.Output.t =
object
inherit IO.Output.t_from_output ~bytes ()
(* method flush () : unit = Lwt_io.flush oc |> Task.await *)
(* method flush () : unit = Lwt_io.flush oc |> Lwt_direct.await *)
method private output_underlying buf i len =
let i = ref i in
let len = ref len in
while !len > 0 do
let n = Lwt_unix.write fd buf !i !len |> Task.await in
let n = Lwt_unix.write fd buf !i !len |> Lwt_direct.await in
i := !i + n;
len := !len - n
done
method private close_underlying () =
decr num_open;
if !num_open <= 0 then Lwt_unix.close fd |> Task.await
if !num_open <= 0 then Lwt_unix.close fd |> Lwt_direct.await
end
let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
@ -80,7 +80,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
| addr, port, None ->
let addr = Option.value ~default:"127.0.0.1" addr in
let sockaddr, port =
match Lwt_unix.getaddrinfo addr "" [] |> Task.await, port with
match Lwt_unix.getaddrinfo addr "" [] |> Lwt_direct.await, port with
| { Unix.ai_addr = ADDR_INET (h, _); _ } :: _, None ->
let p = 8080 in
Unix.ADDR_INET (h, p), p
@ -115,7 +115,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
let port = ref port in
let server_loop : unit Lwt.t =
let@ () = Task.run in
let@ () = Lwt_direct.run in
let backlog = max_connections in
let sock =
Lwt_unix.socket ~cloexec:true
@ -126,7 +126,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
Lwt_unix.setsockopt_optint sock Unix.SO_LINGER None;
Lwt_unix.setsockopt sock Unix.SO_REUSEADDR true;
Lwt_unix.setsockopt sock Unix.SO_REUSEPORT true;
Lwt_unix.bind sock sockaddr |> Task.await;
Lwt_unix.bind sock sockaddr |> Lwt_direct.await;
Lwt_unix.listen sock backlog;
(* recover real port, if any *)
@ -136,8 +136,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
let handle_client client_addr fd : unit =
Atomic.incr active_conns;
let@ () = Task.run_async in
Lwt_direct.run_in_the_background @@ fun () ->
let cleanup () =
Log.debug (fun k ->
k "Tiny_httpd_lwt: client handler returned");
@ -169,7 +168,7 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
in
while Atomic.get running do
let fd, addr = Lwt_unix.accept sock |> Task.await in
let fd, addr = Lwt_unix.accept sock |> Lwt_direct.await in
handle_client addr fd
done
in
@ -181,21 +180,21 @@ let io_backend ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size
(fun () ->
Atomic.set running false;
Lwt.wakeup_later set_server_done ();
Task.await server_loop);
Lwt_direct.await server_loop);
endpoint = (fun () -> addr, !port);
active_connections = (fun () -> Atomic.get active_conns);
}
in
after_init tcp_server;
Task.await server_done);
Lwt_direct.await server_done);
}
end in
(module M)
let create ?addr ?port ?unix_sock ?max_connections ?max_buf_pool_size ?buf_size
?middlewares () : H.t Lwt.t =
let@ () = Task.run in
let@ () = Lwt_direct.run in
let backend =
io_backend ?addr ?port ?unix_sock ?max_buf_pool_size ?max_connections
?buf_size ()

View file

@ -6,7 +6,7 @@
{b NOTE}: this is very experimental and will absolutely change over time,
@since NEXT_RELEASE *)
module Task = Task
module Lwt_direct = Lwt_direct
type 'a with_args =
?addr:string ->