feat: tiny_httpd_pool library, with a small thread pool implementation

This commit is contained in:
Simon Cruanes 2022-01-01 21:50:40 -05:00
parent a78c48955b
commit d68142a161
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4
5 changed files with 227 additions and 0 deletions

13
src/pool/dune Normal file
View file

@ -0,0 +1,13 @@
(library
(name tiny_httpd_pool)
(public_name tiny_httpd.pool)
(synopsis "Simple thread pool for tiny_httpd")
(libraries unix threads)
(flags :standard -safe-string -warn-error -a+8))
; produce shims for atomics (before 4.12)
(rule
(targets tiny_httpd_atomic.ml)
(deps ./gen/mkshims.exe)
(action (run ./gen/mkshims.exe)))

3
src/pool/gen/dune Normal file
View file

@ -0,0 +1,3 @@
(executable
(name mkshims))

37
src/pool/gen/mkshims.ml Normal file
View file

@ -0,0 +1,37 @@
let atomic_before_412 = {|
type !'a t = {mutable x: 'a}
let[@inline] make x = {x}
let[@inline] get {x} = x
let[@inline] set r x = r.x <- x
let[@inline] exchange r x =
let y = r.x in
r.x <- x;
y
let[@inline] compare_and_set r seen v =
if r.x == seen then
r.x <- v;
true
else false
let[@inline] fetch_and_add r x =
let v = r.x in
r.x <- x + r.x;
v
let[@inline] incr r = r.x <- 1 + r.x
let[@inline] decr r = r.x <- r.x - 1
|}
let atomic_after_412 = {|include Atomic|}
let write_file file s =
let oc = open_out file in output_string oc s; close_out oc
let () =
let version = Scanf.sscanf Sys.ocaml_version "%d.%d.%s" (fun x y _ -> x,y) in
write_file "tiny_httpd_atomic.ml" (if version >= (4,12) then atomic_after_412 else atomic_before_412);
()

163
src/pool/tiny_httpd_pool.ml Normal file
View file

@ -0,0 +1,163 @@
(* atomics *)
module A = Tiny_httpd_atomic
(* guess how many cores we can use *)
let guess_cpu_count () =
let default = 4 in
try
let cmd = "grep -c processor /proc/cpuinfo" in
let p = Unix.open_process_in cmd in
try
let x = input_line p |> int_of_string in
ignore (Unix.close_process_in p); x
with _ -> ignore (Unix.close_process_in p); default
with _ -> default
(* thread-safe queue.
We mix "Implementing Lock-Free Queues", Valois 1994, with a parking lot
for readers using a normal mutex + condition *)
module Q : sig
type 'a t
val create : dummy:'a -> unit -> 'a t
val push : 'a t -> 'a -> unit
(** Push an element. *)
val pop : 'a t -> 'a
(** pop the first element. Blocks if none is available. *)
end = struct
type 'a node = {
value: 'a;
next: 'a node option A.t;
}
type 'a t = {
head: 'a node A.t;
tail: 'a node A.t;
dummy: 'a;
n_parked: int A.t; (* threads waiting *)
park_lock: Mutex.t;
park_cond: Condition.t;
}
let create ~dummy () : _ t =
let ptr0 = {value=dummy;next=A.make None} in
{ head=A.make ptr0;
tail=A.make ptr0;
dummy;
n_parked=A.make 0;
park_lock=Mutex.create();
park_cond=Condition.create();
}
let push (self:_ t) x : unit =
(* new node to insert at the back *)
let q = {value=x; next=A.make None} in
let ok = ref false in
while not !ok do
let p = A.get self.tail in
ok := A.compare_and_set p.next None (Some q);
if not !ok then (
(* try to ensure progress if another thread takes too long to update [tail] *)
begin match A.get p.next with
| None -> ()
| Some p_next ->
ignore (A.compare_and_set self.tail p p_next : bool)
end;
);
done;
(* if any thread is parked, try to unpark one thread *)
if A.get self.n_parked > 0 then (
Mutex.lock self.park_lock;
Condition.signal self.park_cond;
Mutex.unlock self.park_lock;
)
(* try to pop an element already in the queue *)
let pop_nonblock self : _ option =
let res = ref None in
let continue = ref true in
while !continue do
let p = A.get self.head in
match A.get p.next with
| None ->
continue := false; (* return None, queue is empty *)
| Some p_next ->
let ok = A.compare_and_set self.head p p_next in
if ok then (
res := Some p_next.value;
continue := false;
)
done;
!res
let rec pop (self:'a t) : 'a =
(* be on the safe side: assume we're going to park,
so that if another thread pushes after the "PARK" line it'll unpark us *)
A.incr self.n_parked;
(* try to pop from queue *)
begin match pop_nonblock self with
| Some x ->
A.decr self.n_parked;
x
| None ->
(* PARK *)
Mutex.lock self.park_lock;
Condition.wait self.park_cond self.park_lock;
Mutex.unlock self.park_lock;
A.decr self.n_parked;
(pop [@tailcall]) self
end
end
type task = unit -> unit
type t = {
tasks: task Q.t;
threads: Thread.t array;
active: bool A.t;
}
(* run a task in some background thread *)
let[@inline] run self (f:task) : unit =
Q.push self.tasks f
exception Shutdown
let worker_ (tasks:task Q.t) : unit =
let continue = ref true in
while !continue do
let f = Q.pop tasks in
try f()
with
| Shutdown -> continue := false
| e ->
Printf.eprintf "tiny_httpd_pool: uncaught task exception:\n%s\n%!"
(Printexc.to_string e)
done
let max_threads_ = 256
let create ?(j=guess_cpu_count()) () : t =
let j = min (max j 2) max_threads_ in
Printf.eprintf "pool: %d threads\n%!" j;
let tasks = Q.create ~dummy:(fun()->assert false) () in
let threads = Array.init j (fun _ -> Thread.create worker_ tasks) in
{ tasks; threads; active=A.make true; }
let shutdown self =
(* [if self.active then self.active <- false; …] *)
if A.compare_and_set self.active true false then (
for _i=1 to Array.length self.threads do
run self (fun () -> raise Shutdown)
done;
Array.iter Thread.join self.threads
)

View file

@ -0,0 +1,11 @@
type t
(** A thread pool. *)
val create : ?j:int -> unit -> t
val run : t -> (unit -> unit) -> unit
(** [run pool f] schedules the task [f()] to be run in the pool
when a worker thread becomes available. *)
val shutdown : t -> unit