remove global withlock builder, pass it as argument instead
Some checks failed
github pages / deploy (push) Has been cancelled
build / build (4.08.x, ubuntu-latest) (push) Has been cancelled
build / build (4.14.x, ubuntu-latest) (push) Has been cancelled
build / build (5.03.x, ubuntu-latest) (push) Has been cancelled

This commit is contained in:
Simon Cruanes 2025-06-23 10:08:07 -04:00
parent f10992ec32
commit 9a1343aef7
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
2 changed files with 43 additions and 22 deletions

View file

@ -19,8 +19,6 @@ module With_lock = struct
Mutex.unlock mutex; Mutex.unlock mutex;
raise e); raise e);
} }
let builder : builder ref = ref default_builder
end end
type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit
@ -78,7 +76,7 @@ module Writer = struct
mutex: With_lock.t; mutex: With_lock.t;
} }
let create ?(buf_size = 16 * 1024) ~oc () : t = let create ?(buf_size = 16 * 1024) ~with_lock ~oc () : t =
{ {
header = Header.create (); header = Header.create ();
header_buf = Bytes.create 16; header_buf = Bytes.create 16;
@ -86,7 +84,7 @@ module Writer = struct
offset = 0; offset = 0;
oc; oc;
closed = false; closed = false;
mutex = !With_lock.builder (); mutex = with_lock;
} }
let[@inline] close self = self.closed <- true let[@inline] close self = self.closed <- true
@ -403,8 +401,8 @@ module Reader = struct
) )
end end
let upgrade ic oc : _ * _ = let upgrade ?(with_lock = With_lock.default_builder ()) ic oc : _ * _ =
let writer = Writer.create ~oc () in let writer = Writer.create ~with_lock ~oc () in
let reader = Reader.create ~ic ~writer () in let reader = Reader.create ~ic ~writer () in
let ws_ic : IO.Input.t = let ws_ic : IO.Input.t =
object object
@ -431,6 +429,7 @@ let upgrade ic oc : _ * _ =
upgrade handler *) upgrade handler *)
module Make_upgrade_handler (X : sig module Make_upgrade_handler (X : sig
val accept_ws_protocol : string -> bool val accept_ws_protocol : string -> bool
val with_lock : With_lock.builder
val handler : handler val handler : handler
end) : Server.UPGRADE_HANDLER with type handshake_state = unit Request.t = end) : Server.UPGRADE_HANDLER with type handshake_state = unit Request.t =
struct struct
@ -475,7 +474,8 @@ struct
try Ok (handshake_ req) with Bad_req s -> Error s try Ok (handshake_ req) with Bad_req s -> Error s
let handle_connection req ic oc = let handle_connection req ic oc =
let ws_ic, ws_oc = upgrade ic oc in let with_lock = X.with_lock () in
let ws_ic, ws_oc = upgrade ~with_lock ic oc in
try X.handler req ws_ic ws_oc try X.handler req ws_ic ws_oc
with Close_connection -> with Close_connection ->
Log.debug (fun k -> k "websocket: requested to close the connection"); Log.debug (fun k -> k "websocket: requested to close the connection");
@ -483,9 +483,11 @@ struct
end end
let add_route_handler ?accept ?(accept_ws_protocol = fun _ -> true) ?middlewares let add_route_handler ?accept ?(accept_ws_protocol = fun _ -> true) ?middlewares
(server : Server.t) route (f : handler) : unit = ?(with_lock = With_lock.default_builder) (server : Server.t) route
(f : handler) : unit =
let module M = Make_upgrade_handler (struct let module M = Make_upgrade_handler (struct
let handler = f let handler = f
let with_lock = with_lock
let accept_ws_protocol = accept_ws_protocol let accept_ws_protocol = accept_ws_protocol
end) in end) in
let up : Server.upgrade_handler = (module M) in let up : Server.upgrade_handler = (module M) in

View file

@ -3,11 +3,36 @@
This sub-library ([tiny_httpd.ws]) exports a small implementation for a This sub-library ([tiny_httpd.ws]) exports a small implementation for a
websocket server. It has no additional dependencies. *) websocket server. It has no additional dependencies. *)
(** Synchronization primitive used to allow both the reader to reply to "ping",
and the handler to send messages, without stepping on each other's toes.
@since NEXT_RELEASE *)
module With_lock : sig
type t = { with_lock: 'a. (unit -> 'a) -> 'a }
(** A primitive to run the callback in a critical section where others cannot
run at the same time.
The default is a mutex, but that works poorly with thread pools so it's
possible to use a semaphore or a cooperative mutex instead. *)
type builder = unit -> t
val default_builder : builder
(** Lock using [Mutex]. *)
end
type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit
(** Websocket handler *) (** Websocket handler *)
val upgrade : IO.Input.t -> IO.Output.t -> IO.Input.t * IO.Output.t val upgrade :
(** Upgrade a byte stream to the websocket framing protocol. *) ?with_lock:With_lock.t ->
IO.Input.t ->
IO.Output.t ->
IO.Input.t * IO.Output.t
(** Upgrade a byte stream to the websocket framing protocol.
@param with_lock
if provided, use this to prevent reader and writer to compete on sending
frames. since NEXT_RELEASE. *)
exception Close_connection exception Close_connection
(** Exception that can be raised from IOs inside the handler, when the (** Exception that can be raised from IOs inside the handler, when the
@ -17,6 +42,7 @@ val add_route_handler :
?accept:(unit Request.t -> (unit, int * string) result) -> ?accept:(unit Request.t -> (unit, int * string) result) ->
?accept_ws_protocol:(string -> bool) -> ?accept_ws_protocol:(string -> bool) ->
?middlewares:Server.Head_middleware.t list -> ?middlewares:Server.Head_middleware.t list ->
?with_lock:With_lock.builder ->
Server.t -> Server.t ->
(Server.upgrade_handler, Server.upgrade_handler) Route.t -> (Server.upgrade_handler, Server.upgrade_handler) Route.t ->
handler -> handler ->
@ -24,7 +50,11 @@ val add_route_handler :
(** Add a route handler for a websocket endpoint. (** Add a route handler for a websocket endpoint.
@param accept_ws_protocol @param accept_ws_protocol
decides whether this endpoint accepts the websocket protocol sent by the decides whether this endpoint accepts the websocket protocol sent by the
client. Default accepts everything. *) client. Default accepts everything.
@param with_lock
if provided, use this to synchronize writes between the frame reader
(replies "pong" to "ping") and the handler emitting writes. since
NEXT_RELEASE. *)
(**/**) (**/**)
@ -33,15 +63,4 @@ module Private_ : sig
mask_key:bytes -> mask_offset:int -> bytes -> int -> int -> unit mask_key:bytes -> mask_offset:int -> bytes -> int -> int -> unit
end end
(** @since NEXT_RELEASE *)
module With_lock : sig
type t = { with_lock: 'a. (unit -> 'a) -> 'a }
type builder = unit -> t
val default_builder : builder
(** Lock using [Mutex]. *)
val builder : builder ref
end
(**/**) (**/**)