From d68142a16127beb50e76023f372e09a6498868f7 Mon Sep 17 00:00:00 2001 From: Simon Cruanes Date: Sat, 1 Jan 2022 21:50:40 -0500 Subject: [PATCH] feat: tiny_httpd_pool library, with a small thread pool implementation --- src/pool/dune | 13 +++ src/pool/gen/dune | 3 + src/pool/gen/mkshims.ml | 37 ++++++++ src/pool/tiny_httpd_pool.ml | 163 +++++++++++++++++++++++++++++++++++ src/pool/tiny_httpd_pool.mli | 11 +++ 5 files changed, 227 insertions(+) create mode 100644 src/pool/dune create mode 100644 src/pool/gen/dune create mode 100644 src/pool/gen/mkshims.ml create mode 100644 src/pool/tiny_httpd_pool.ml create mode 100644 src/pool/tiny_httpd_pool.mli diff --git a/src/pool/dune b/src/pool/dune new file mode 100644 index 00000000..7516da52 --- /dev/null +++ b/src/pool/dune @@ -0,0 +1,13 @@ + +(library + (name tiny_httpd_pool) + (public_name tiny_httpd.pool) + (synopsis "Simple thread pool for tiny_httpd") + (libraries unix threads) + (flags :standard -safe-string -warn-error -a+8)) + +; produce shims for atomics (before 4.12) +(rule + (targets tiny_httpd_atomic.ml) + (deps ./gen/mkshims.exe) + (action (run ./gen/mkshims.exe))) diff --git a/src/pool/gen/dune b/src/pool/gen/dune new file mode 100644 index 00000000..c36f7e11 --- /dev/null +++ b/src/pool/gen/dune @@ -0,0 +1,3 @@ +(executable + (name mkshims)) + diff --git a/src/pool/gen/mkshims.ml b/src/pool/gen/mkshims.ml new file mode 100644 index 00000000..97c7745a --- /dev/null +++ b/src/pool/gen/mkshims.ml @@ -0,0 +1,37 @@ + +let atomic_before_412 = {| + type !'a t = {mutable x: 'a} + let[@inline] make x = {x} + let[@inline] get {x} = x + let[@inline] set r x = r.x <- x + let[@inline] exchange r x = + let y = r.x in + r.x <- x; + y + + let[@inline] compare_and_set r seen v = + if r.x == seen then + r.x <- v; + true + else false + + let[@inline] fetch_and_add r x = + let v = r.x in + r.x <- x + r.x; + v + + let[@inline] incr r = r.x <- 1 + r.x + let[@inline] decr r = r.x <- r.x - 1 + |} + +let atomic_after_412 = {|include Atomic|} + +let write_file file s = + let oc = open_out file in output_string oc s; close_out oc + +let () = + let version = Scanf.sscanf Sys.ocaml_version "%d.%d.%s" (fun x y _ -> x,y) in + write_file "tiny_httpd_atomic.ml" (if version >= (4,12) then atomic_after_412 else atomic_before_412); + () + + diff --git a/src/pool/tiny_httpd_pool.ml b/src/pool/tiny_httpd_pool.ml new file mode 100644 index 00000000..b7278375 --- /dev/null +++ b/src/pool/tiny_httpd_pool.ml @@ -0,0 +1,163 @@ + +(* atomics *) +module A = Tiny_httpd_atomic + +(* guess how many cores we can use *) +let guess_cpu_count () = + let default = 4 in + try + let cmd = "grep -c processor /proc/cpuinfo" in + let p = Unix.open_process_in cmd in + try + let x = input_line p |> int_of_string in + ignore (Unix.close_process_in p); x + with _ -> ignore (Unix.close_process_in p); default + with _ -> default + + +(* thread-safe queue. + We mix "Implementing Lock-Free Queues", Valois 1994, with a parking lot + for readers using a normal mutex + condition *) +module Q : sig + type 'a t + + val create : dummy:'a -> unit -> 'a t + + val push : 'a t -> 'a -> unit + (** Push an element. *) + + val pop : 'a t -> 'a + (** pop the first element. Blocks if none is available. *) +end = struct + type 'a node = { + value: 'a; + next: 'a node option A.t; + } + + type 'a t = { + head: 'a node A.t; + tail: 'a node A.t; + dummy: 'a; + + n_parked: int A.t; (* threads waiting *) + park_lock: Mutex.t; + park_cond: Condition.t; + } + + let create ~dummy () : _ t = + let ptr0 = {value=dummy;next=A.make None} in + { head=A.make ptr0; + tail=A.make ptr0; + dummy; + n_parked=A.make 0; + park_lock=Mutex.create(); + park_cond=Condition.create(); + } + + let push (self:_ t) x : unit = + (* new node to insert at the back *) + let q = {value=x; next=A.make None} in + + let ok = ref false in + while not !ok do + let p = A.get self.tail in + ok := A.compare_and_set p.next None (Some q); + if not !ok then ( + (* try to ensure progress if another thread takes too long to update [tail] *) + begin match A.get p.next with + | None -> () + | Some p_next -> + ignore (A.compare_and_set self.tail p p_next : bool) + end; + ); + done; + + (* if any thread is parked, try to unpark one thread *) + if A.get self.n_parked > 0 then ( + Mutex.lock self.park_lock; + Condition.signal self.park_cond; + Mutex.unlock self.park_lock; + ) + + (* try to pop an element already in the queue *) + let pop_nonblock self : _ option = + let res = ref None in + + let continue = ref true in + while !continue do + let p = A.get self.head in + match A.get p.next with + | None -> + continue := false; (* return None, queue is empty *) + | Some p_next -> + let ok = A.compare_and_set self.head p p_next in + if ok then ( + res := Some p_next.value; + continue := false; + ) + done; + !res + + let rec pop (self:'a t) : 'a = + (* be on the safe side: assume we're going to park, + so that if another thread pushes after the "PARK" line it'll unpark us *) + A.incr self.n_parked; + + (* try to pop from queue *) + begin match pop_nonblock self with + | Some x -> + A.decr self.n_parked; + x + | None -> + (* PARK *) + Mutex.lock self.park_lock; + Condition.wait self.park_cond self.park_lock; + Mutex.unlock self.park_lock; + A.decr self.n_parked; + (pop [@tailcall]) self + end +end + +type task = unit -> unit + +type t = { + tasks: task Q.t; + threads: Thread.t array; + active: bool A.t; +} + +(* run a task in some background thread *) +let[@inline] run self (f:task) : unit = + Q.push self.tasks f + +exception Shutdown + +let worker_ (tasks:task Q.t) : unit = + let continue = ref true in + while !continue do + let f = Q.pop tasks in + try f() + with + | Shutdown -> continue := false + | e -> + Printf.eprintf "tiny_httpd_pool: uncaught task exception:\n%s\n%!" + (Printexc.to_string e) + done + +let max_threads_ = 256 +let create ?(j=guess_cpu_count()) () : t = + let j = min (max j 2) max_threads_ in + Printf.eprintf "pool: %d threads\n%!" j; + let tasks = Q.create ~dummy:(fun()->assert false) () in + let threads = Array.init j (fun _ -> Thread.create worker_ tasks) in + { tasks; threads; active=A.make true; } + +let shutdown self = + (* [if self.active then self.active <- false; …] *) + if A.compare_and_set self.active true false then ( + for _i=1 to Array.length self.threads do + run self (fun () -> raise Shutdown) + done; + Array.iter Thread.join self.threads + ) + diff --git a/src/pool/tiny_httpd_pool.mli b/src/pool/tiny_httpd_pool.mli new file mode 100644 index 00000000..d14842b9 --- /dev/null +++ b/src/pool/tiny_httpd_pool.mli @@ -0,0 +1,11 @@ + +type t +(** A thread pool. *) + +val create : ?j:int -> unit -> t + +val run : t -> (unit -> unit) -> unit +(** [run pool f] schedules the task [f()] to be run in the pool + when a worker thread becomes available. *) + +val shutdown : t -> unit