diff --git a/src/ws/tiny_httpd_ws.ml b/src/ws/tiny_httpd_ws.ml index a0cf01e1..3c4fdf03 100644 --- a/src/ws/tiny_httpd_ws.ml +++ b/src/ws/tiny_httpd_ws.ml @@ -19,8 +19,6 @@ module With_lock = struct Mutex.unlock mutex; raise e); } - - let builder : builder ref = ref default_builder end type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit @@ -78,7 +76,7 @@ module Writer = struct mutex: With_lock.t; } - let create ?(buf_size = 16 * 1024) ~oc () : t = + let create ?(buf_size = 16 * 1024) ~with_lock ~oc () : t = { header = Header.create (); header_buf = Bytes.create 16; @@ -86,7 +84,7 @@ module Writer = struct offset = 0; oc; closed = false; - mutex = !With_lock.builder (); + mutex = with_lock; } let[@inline] close self = self.closed <- true @@ -403,8 +401,8 @@ module Reader = struct ) end -let upgrade ic oc : _ * _ = - let writer = Writer.create ~oc () in +let upgrade ?(with_lock = With_lock.default_builder ()) ic oc : _ * _ = + let writer = Writer.create ~with_lock ~oc () in let reader = Reader.create ~ic ~writer () in let ws_ic : IO.Input.t = object @@ -431,6 +429,7 @@ let upgrade ic oc : _ * _ = upgrade handler *) module Make_upgrade_handler (X : sig val accept_ws_protocol : string -> bool + val with_lock : With_lock.builder val handler : handler end) : Server.UPGRADE_HANDLER with type handshake_state = unit Request.t = struct @@ -475,7 +474,8 @@ struct try Ok (handshake_ req) with Bad_req s -> Error s let handle_connection req ic oc = - let ws_ic, ws_oc = upgrade ic oc in + let with_lock = X.with_lock () in + let ws_ic, ws_oc = upgrade ~with_lock ic oc in try X.handler req ws_ic ws_oc with Close_connection -> Log.debug (fun k -> k "websocket: requested to close the connection"); @@ -483,9 +483,11 @@ struct end let add_route_handler ?accept ?(accept_ws_protocol = fun _ -> true) ?middlewares - (server : Server.t) route (f : handler) : unit = + ?(with_lock = With_lock.default_builder) (server : Server.t) route + (f : handler) : unit = let module M = Make_upgrade_handler (struct let handler = f + let with_lock = with_lock let accept_ws_protocol = accept_ws_protocol end) in let up : Server.upgrade_handler = (module M) in diff --git a/src/ws/tiny_httpd_ws.mli b/src/ws/tiny_httpd_ws.mli index 4066b2b4..30120454 100644 --- a/src/ws/tiny_httpd_ws.mli +++ b/src/ws/tiny_httpd_ws.mli @@ -3,11 +3,36 @@ This sub-library ([tiny_httpd.ws]) exports a small implementation for a websocket server. It has no additional dependencies. *) +(** Synchronization primitive used to allow both the reader to reply to "ping", + and the handler to send messages, without stepping on each other's toes. + + @since NEXT_RELEASE *) +module With_lock : sig + type t = { with_lock: 'a. (unit -> 'a) -> 'a } + (** A primitive to run the callback in a critical section where others cannot + run at the same time. + + The default is a mutex, but that works poorly with thread pools so it's + possible to use a semaphore or a cooperative mutex instead. *) + + type builder = unit -> t + + val default_builder : builder + (** Lock using [Mutex]. *) +end + type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit (** Websocket handler *) -val upgrade : IO.Input.t -> IO.Output.t -> IO.Input.t * IO.Output.t -(** Upgrade a byte stream to the websocket framing protocol. *) +val upgrade : + ?with_lock:With_lock.t -> + IO.Input.t -> + IO.Output.t -> + IO.Input.t * IO.Output.t +(** Upgrade a byte stream to the websocket framing protocol. + @param with_lock + if provided, use this to prevent reader and writer to compete on sending + frames. since NEXT_RELEASE. *) exception Close_connection (** Exception that can be raised from IOs inside the handler, when the @@ -17,6 +42,7 @@ val add_route_handler : ?accept:(unit Request.t -> (unit, int * string) result) -> ?accept_ws_protocol:(string -> bool) -> ?middlewares:Server.Head_middleware.t list -> + ?with_lock:With_lock.builder -> Server.t -> (Server.upgrade_handler, Server.upgrade_handler) Route.t -> handler -> @@ -24,7 +50,11 @@ val add_route_handler : (** Add a route handler for a websocket endpoint. @param accept_ws_protocol decides whether this endpoint accepts the websocket protocol sent by the - client. Default accepts everything. *) + client. Default accepts everything. + @param with_lock + if provided, use this to synchronize writes between the frame reader + (replies "pong" to "ping") and the handler emitting writes. since + NEXT_RELEASE. *) (**/**) @@ -33,15 +63,4 @@ module Private_ : sig mask_key:bytes -> mask_offset:int -> bytes -> int -> int -> unit end -(** @since NEXT_RELEASE *) -module With_lock : sig - type t = { with_lock: 'a. (unit -> 'a) -> 'a } - type builder = unit -> t - - val default_builder : builder - (** Lock using [Mutex]. *) - - val builder : builder ref -end - (**/**)