diff --git a/src/chan.ml b/src/chan.ml deleted file mode 100644 index 7042e1da..00000000 --- a/src/chan.ml +++ /dev/null @@ -1,172 +0,0 @@ -module A = Atomic_ - -type 'a or_error = 'a Fut.or_error -type 'a waiter = 'a Fut.promise - -let[@inline] list_is_empty_ = function - | [] -> true - | _ :: _ -> false - -(** Simple functional queue *) -module Q : sig - type 'a t - - val return : 'a -> 'a t - val is_empty : _ t -> bool - - exception Empty - - val pop_exn : 'a t -> 'a * 'a t - val push : 'a t -> 'a -> 'a t -end = struct - type 'a t = { - hd: 'a list; - tl: 'a list; - } - (** Queue containing elements of type 'a. - - invariant: if hd=[], then tl=[] *) - - let[@inline] return x : _ t = { hd = [ x ]; tl = [] } - - let make_ hd tl = - match hd with - | [] -> { hd = List.rev tl; tl = [] } - | _ :: _ -> { hd; tl } - - let[@inline] is_empty q = list_is_empty_ q.hd - let[@inline] push q x : _ t = make_ q.hd (x :: q.tl) - - exception Empty - - let pop_exn q = - match q.hd with - | [] -> - assert (list_is_empty_ q.tl); - raise Empty - | x :: hd' -> - let q' = make_ hd' q.tl in - x, q' -end - -exception Closed - -type 'a state = - | Empty - | St_closed - | Elems of 'a Q.t - | Waiters of 'a waiter list - -type 'a t = { st: 'a state A.t } [@@unboxed] - -let create () : _ t = { st = A.make Empty } - -let[@inline] mk_st_waiters_ l = - match l with - | [] -> Empty - | _ -> Waiters l - -let[@inline] mk_st_elems_ q = - if Q.is_empty q then - Empty - else - Elems q - -let push (self : _ t) x : unit = - while - let old_st = A.get self.st in - match old_st with - | St_closed -> raise Closed - | Empty -> not (A.compare_and_set self.st old_st (Elems (Q.return x))) - | Waiters [] -> assert false - | Waiters (w :: waiters') -> - let new_st = mk_st_waiters_ waiters' in - if A.compare_and_set self.st old_st new_st then ( - Fut.fulfill w (Ok x); - false - ) else - true - | Elems q -> not (A.compare_and_set self.st old_st (Elems (Q.push q x))) - do - Domain_.relax () - done - -let try_pop (type elt) self : elt option = - let module M = struct - exception Found of elt - end in - try - for _i = 1 to 10 do - let old_st = A.get self.st in - match old_st with - | Elems q -> - let x, q' = Q.pop_exn q in - let new_st = mk_st_elems_ q' in - if A.compare_and_set self.st old_st new_st then - raise_notrace (M.Found x) - else - Domain_.relax () - | _ -> raise_notrace Exit - done; - None - with - | M.Found x -> Some x - | Exit -> None - -let pop (type elt) (self : _ t) : elt Fut.t = - let module M = struct - exception Ret of elt - exception Fut of elt Fut.t - end in - try - while - let old_st = A.get self.st in - (match old_st with - | St_closed -> - let bt = Printexc.get_callstack 10 in - raise_notrace (M.Fut (Fut.fail Closed bt)) - | Elems q -> - let x, q' = Q.pop_exn q in - let new_st = mk_st_elems_ q' in - if A.compare_and_set self.st old_st new_st then raise_notrace (M.Ret x) - | Empty -> - let fut, promise = Fut.make () in - let new_st = Waiters [ promise ] in - if A.compare_and_set self.st old_st new_st then - raise_notrace (M.Fut fut) - | Waiters ws -> - let fut, promise = Fut.make () in - let new_st = Waiters (promise :: ws) in - if A.compare_and_set self.st old_st new_st then - raise_notrace (M.Fut fut)); - true - do - Domain_.relax () - done; - assert false - with - | M.Ret x -> Fut.return x - | M.Fut f -> f - -let pop_block_exn (self : 'a t) : 'a = - match try_pop self with - | Some x -> x - | None -> Fut.wait_block_exn @@ pop self - -let close (self : _ t) : unit = - while - let old_st = A.get self.st in - match old_st with - | St_closed -> false - | Elems _ | Empty -> not (A.compare_and_set self.st old_st St_closed) - | Waiters ws -> - if A.compare_and_set self.st old_st St_closed then ( - (* fail all waiters *) - let bt = Printexc.get_callstack 10 in - List.iter (fun w -> Fut.fulfill_idempotent w (Error (Closed, bt))) ws; - false - ) else - true - do - Domain_.relax () - done diff --git a/src/chan.mli b/src/chan.mli deleted file mode 100644 index 80679aeb..00000000 --- a/src/chan.mli +++ /dev/null @@ -1,31 +0,0 @@ -(** Channels. - - Channels are pipelines of values where threads can push into - one end, and pull from the other end. *) - -type 'a or_error = 'a Fut.or_error - -type 'a t -(** Channel carrying values of type ['a] *) - -val create : unit -> 'a t -(** Create a channel. *) - -exception Closed - -val push : 'a t -> 'a -> unit -(** [push chan x] pushes [x] into [chan]. - @raise Closed if the channel is closed. *) - -val pop : 'a t -> 'a Fut.t -(** Pop an element. - @raise Closed if the channel is closed. *) - -val try_pop : 'a t -> 'a option - -val pop_block_exn : 'a t -> 'a -(** Like [pop], but block if an element is not available immediately. *) - -val close : _ t -> unit -(** Close the channel. Further push and pop calls will fail. - This is idempotent. *) diff --git a/src/moonpool.ml b/src/moonpool.ml index ab1a25ed..109cc500 100644 --- a/src/moonpool.ml +++ b/src/moonpool.ml @@ -4,4 +4,3 @@ let start_thread_on_some_domain f x = module Pool = Pool module Fut = Fut -module Chan = Chan diff --git a/src/moonpool.mli b/src/moonpool.mli index 172a5b2f..a96f07e8 100644 --- a/src/moonpool.mli +++ b/src/moonpool.mli @@ -12,4 +12,3 @@ val start_thread_on_some_domain : ('a -> unit) -> 'a -> Thread.t to run all the various threads needed in an application (timers, event loops, etc.) *) module Fut = Fut -module Chan = Chan