diff --git a/dune-project b/dune-project index ff2150c..4035343 100644 --- a/dune-project +++ b/dune-project @@ -20,9 +20,14 @@ (synopsis "Tiny event loop abstraction") (depends ocaml dune base-unix) (depopts - (trace (>= 0.7)) + (trace + (>= 0.7)) + (iostream + (>= 0.3)) (picos - (and (>= 0.5) (< 0.7)))) + (and + (>= 0.5) + (< 0.7)))) (tags (unix select async))) @@ -40,9 +45,12 @@ ocaml dune nanoev - (picos (>= 0.6)) + (picos + (>= 0.6)) picos_std - (tiny_httpd (>= 0.17))) - (tags (nanoev http))) + (tiny_httpd + (>= 0.17))) + (tags + (nanoev http))) ; See the complete stanza docs at https://dune.readthedocs.io/en/stable/reference/dune-project/index.html diff --git a/nanoev.opam b/nanoev.opam index 9fa7ba9..9aeadaa 100644 --- a/nanoev.opam +++ b/nanoev.opam @@ -15,6 +15,7 @@ depends: [ ] depopts: [ "trace" {>= "0.7"} + "iostream" {>= "0.3"} "picos" {>= "0.5" & < "0.7"} ] build: [ diff --git a/src/picos/IO_in.ml b/src/picos/IO_in.ml new file mode 100644 index 0000000..6184eba --- /dev/null +++ b/src/picos/IO_in.ml @@ -0,0 +1,152 @@ +open Common_ + +class type t = object + method input : bytes -> int -> int -> int + (** Read into the slice. Returns [0] only if the stream is closed. *) + + method close : unit -> unit + (** Close the input. Must be idempotent. *) +end + +let create ?(close = ignore) ~input () : t = + object + method close = close + method input = input + end + +let empty : t = + object + method close () = () + method input _ _ _ = 0 + end + +let of_bytes ?(off = 0) ?len (b : bytes) : t = + (* i: current position in [b] *) + let i = ref off in + + let len = + match len with + | Some n -> + if n > Bytes.length b - off then invalid_arg "Iostream.In.of_bytes"; + n + | None -> Bytes.length b - off + in + let end_ = off + len in + + object + method input b_out i_out len_out = + let n = min (end_ - !i) len_out in + Bytes.blit b !i b_out i_out n; + i := !i + n; + n + + method close () = i := end_ + end + +let of_string ?off ?len s : t = of_bytes ?off ?len (Bytes.unsafe_of_string s) + +(** Read into the given slice. + @return the number of bytes read, [0] means end of input. *) +let[@inline] input (self : #t) buf i len = self#input buf i len + +(** Close the channel. *) +let[@inline] close self : unit = self#close () + +let rec really_input (self : #t) buf i len = + if len > 0 then ( + let n = input self buf i len in + if n = 0 then raise End_of_file; + (really_input [@tailrec]) self buf (i + n) (len - n) + ) + +let really_input_string self n : string = + let buf = Bytes.create n in + really_input self buf 0 n; + Bytes.unsafe_to_string buf + +let copy_into ?(buf = Bytes.create _default_buf_size) (ic : #t) (oc : IO_out.t) + : unit = + let continue = ref true in + while !continue do + let len = input ic buf 0 (Bytes.length buf) in + if len = 0 then + continue := false + else + IO_out.output oc buf 0 len + done + +let concat (l0 : t list) : t = + let l = ref l0 in + let rec input b i len : int = + match !l with + | [] -> 0 + | ic :: tl -> + let n = ic#input b i len in + if n > 0 then + n + else ( + l := tl; + input b i len + ) + in + let close () = List.iter close l0 in + create ~close ~input () + +let input_all ?(buf = Bytes.create 128) (self : #t) : string = + let buf = ref buf in + let i = ref 0 in + + let[@inline] full_ () = !i = Bytes.length !buf in + + let grow_ () = + let old_size = Bytes.length !buf in + let new_size = min Sys.max_string_length (old_size + (old_size / 4) + 10) in + if old_size = new_size then + failwith "input_all: maximum input size exceeded"; + let new_buf = Bytes.extend !buf 0 (new_size - old_size) in + buf := new_buf + in + + let rec loop () = + if full_ () then grow_ (); + let available = Bytes.length !buf - !i in + let n = input self !buf !i available in + if n > 0 then ( + i := !i + n; + (loop [@tailrec]) () + ) + in + loop (); + + if full_ () then + Bytes.unsafe_to_string !buf + else + Bytes.sub_string !buf 0 !i + +let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) + (fd : Unix.file_descr) : t = + let buf_len = ref 0 in + let buf_off = ref 0 in + + let refill () = + buf_off := 0; + buf_len := Base.read fd buf 0 (Bytes.length buf) + in + + object + method input b i len : int = + if !buf_len = 0 then refill (); + let n = min len !buf_len in + if n > 0 then ( + Bytes.blit buf !buf_off b i n; + buf_off := !buf_off + n; + buf_len := !buf_len - n + ); + n + + method close () = + if close_noerr then ( + try Unix.close fd with _ -> () + ) else + Unix.close fd + end diff --git a/src/picos/IO_out.ml b/src/picos/IO_out.ml new file mode 100644 index 0000000..b98525f --- /dev/null +++ b/src/picos/IO_out.ml @@ -0,0 +1,118 @@ +open Common_ + +class type t = object + method output_char : char -> unit + method output : bytes -> int -> int -> unit + method flush : unit -> unit + method close : unit -> unit +end + +let create ?(flush = ignore) ?(close = ignore) ~output_char ~output () : t = + object + method flush () = flush () + method close () = close () + method output_char c = output_char c + method output bs i len = output bs i len + end + +let dummy : t = + object + method flush () = () + method close () = () + method output_char _ = () + method output _ _ _ = () + end + +let of_unix_fd ?(close_noerr = false) ?(buf = Bytes.create _default_buf_size) fd + : t = + let buf_off = ref 0 in + + let[@inline] is_full () = !buf_off = Bytes.length buf in + + let flush () = + if !buf_off > 0 then ( + Base.write fd buf 0 !buf_off; + buf_off := 0 + ) + in + + object + method output_char c = + if is_full () then flush (); + Bytes.set buf !buf_off c; + incr buf_off + + method output bs i len : unit = + let i = ref i in + let len = ref len in + + while !len > 0 do + (* make space *) + if is_full () then flush (); + + let n = min !len (Bytes.length buf - !buf_off) in + Bytes.blit bs !i buf !buf_off n; + buf_off := !buf_off + n; + i := !i + n; + len := !len - n + done; + (* if full, write eagerly *) + if is_full () then flush () + + method close () = + if close_noerr then ( + try + flush (); + Unix.close fd + with _ -> () + ) else ( + flush (); + Unix.close fd + ) + + method flush = flush + end + +let of_buffer (buf : Buffer.t) : t = + object + method close () = () + method flush () = () + method output_char c = Buffer.add_char buf c + method output bs i len = Buffer.add_subbytes buf bs i len + end + +(** Output the buffer slice into this channel *) +let[@inline] output_char (self : #t) c : unit = self#output_char c + +(** Output the buffer slice into this channel *) +let[@inline] output (self : #t) buf i len : unit = self#output buf i len + +let[@inline] output_string (self : #t) (str : string) : unit = + self#output (Bytes.unsafe_of_string str) 0 (String.length str) + +let output_line (self : #t) (str : string) : unit = + output_string self str; + output_char self '\n' + +(** Close the channel. *) +let[@inline] close self : unit = self#close () + +(** Flush (ie. force write) any buffered bytes. *) +let[@inline] flush self : unit = self#flush () + +let output_int self i = + let s = string_of_int i in + output_string self s + +let output_lines self seq = Seq.iter (output_line self) seq + +let tee (l : t list) : t = + match l with + | [] -> dummy + | [ oc ] -> oc + | _ -> + let output bs i len = List.iter (fun oc -> output oc bs i len) l in + let output_char c = List.iter (fun oc -> output_char oc c) l in + let close () = List.iter close l in + let flush () = List.iter flush l in + create ~flush ~close ~output ~output_char () diff --git a/src/picos/background_thread.ml b/src/picos/background_thread.ml new file mode 100644 index 0000000..261daad --- /dev/null +++ b/src/picos/background_thread.ml @@ -0,0 +1,7 @@ +let is_setup = Global_.has_bg_thread +let setup = Global_.setup_bg_thread +let shutdown = Global_.shutdown_bg_thread + +let with_setup ev f = + setup ev; + Fun.protect ~finally:shutdown f diff --git a/src/picos/background_thread.mli b/src/picos/background_thread.mli new file mode 100644 index 0000000..6e4a531 --- /dev/null +++ b/src/picos/background_thread.mli @@ -0,0 +1,10 @@ +val setup : Nanoev.t -> unit +(** Install this event loop in a background thread *) + +val shutdown : unit -> unit +(** Shutdown background thread, assuming {! is_setup} returns [true] *) + +val with_setup : Nanoev.t -> (unit -> 'a) -> 'a + +val is_setup : unit -> bool +(** [is_setup()] is [true] iff a background thread is running a nanoev loop *) diff --git a/src/picos/base.ml b/src/picos/base.ml new file mode 100644 index 0000000..a34577f --- /dev/null +++ b/src/picos/base.ml @@ -0,0 +1,96 @@ +open Common_ + +let[@inline] get_loop_exn_ () : Nanoev.t = + match Atomic.get Global_.st with + | None -> failwith "No nanoev loop installed." + | Some st -> st.nanoev + +let[@inline] unwrap_ = function + | None -> () + | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt + +let[@unroll 1] rec retry_read_ fd f = + match f () with + | res -> res + | exception + Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> + (* Trace_.message "read must wait"; *) + let trigger = Picos.Trigger.create () in + let closed_r = ref false in + let ev = get_loop_exn_ () in + Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r -> + closed_r := closed; + Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_; + if !closed_r then raise Closed; + retry_read_ fd f + +let[@unroll 1] rec retry_write_ fd f = + match f () with + | res -> res + | exception + Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> + (* Trace_.message "write must wait"; *) + let ev = get_loop_exn_ () in + let trigger = Picos.Trigger.create () in + let closed_r = ref false in + Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r -> + closed_r := closed; + Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_; + if !closed_r then raise Closed; + retry_write_ fd f + +let read fd buf i len : int = + try + retry_read_ fd (fun () -> + (* Trace_.message "read"; *) + Unix.read fd buf i len) + with Closed -> 0 + +let close fd = + Unix.close fd; + let ev = get_loop_exn_ () in + Nanoev.close ev fd + +let accept fd = + try + retry_read_ fd (fun () -> + (* Trace_.message "accept"; *) + Unix.accept fd) + with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) -> + raise Closed + +let write_once fd buf i len : int = + try + retry_write_ fd (fun () -> + (* Trace_.message "write"; *) + Unix.write fd buf i len) + with Closed -> 0 + +let rec write fd buf i len = + if len > 0 then ( + let n = write_once fd buf i len in + if n < len then write fd buf (i + n) (len - n) + ) + +let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr) + +let[@inline] max_fds () = + match Atomic.get Global_.st with + | None -> 1024 + | Some st -> Nanoev.max_fds st.nanoev + +let sleep t = + if t > 0. then ( + let ev = get_loop_exn_ () in + let trigger = Picos.Trigger.create () in + Nanoev.run_after_s ev t trigger () (fun trigger () -> + Picos.Trigger.signal trigger); + Picos.Trigger.await trigger |> unwrap_ + ) + +module Raw = struct + let retry_read = retry_read_ + let retry_write = retry_write_ +end diff --git a/src/picos/base.mli b/src/picos/base.mli new file mode 100644 index 0000000..7b949e9 --- /dev/null +++ b/src/picos/base.mli @@ -0,0 +1,37 @@ +val read : Unix.file_descr -> bytes -> int -> int -> int +(** Read from the non blocking FD. + @raise Nanoev.Closed if the FD is closed + @raise Unix.Unix_error for other errors *) + +val write_once : Unix.file_descr -> bytes -> int -> int -> int +(** Write into the non blocking FD. + @raise Nanoev.Closed if the FD is closed + @raise Unix.Unix_error for other errors *) + +val write : Unix.file_descr -> bytes -> int -> int -> unit + +val close : Unix.file_descr -> unit +(** Close the file descriptor + @raise Unix.Unix_error when it fails *) + +val connect : Unix.file_descr -> Unix.sockaddr -> unit +(** Connect this FD to the remote address. + @raise Nanoev.Closed if the FD is closed. + @raise Unix.Unix_error for other errors *) + +val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr +(** Accept a connection on this fd. + @raise Nanoev.Closed if the FD is closed. + @raise Unix.Unix_error for other errors *) + +val max_fds : unit -> int +(** Maximum number of file descriptors one can await on. See {!Nanoev.max_fds} +*) + +val sleep : float -> unit +(** Suspend current fiber for [n] seconds *) + +module Raw : sig + val retry_read : Unix.file_descr -> (unit -> 'a) -> 'a + val retry_write : Unix.file_descr -> (unit -> 'a) -> 'a +end diff --git a/src/picos/common_.ml b/src/picos/common_.ml new file mode 100644 index 0000000..d35dd22 --- /dev/null +++ b/src/picos/common_.ml @@ -0,0 +1,6 @@ +module Trace_ = Nanoev.Trace_ + +let ( let@ ) = ( @@ ) +let _default_buf_size = 4 * 1024 + +exception Closed = Nanoev.Closed diff --git a/src/picos/dune b/src/picos/dune index db9792d..fb37e29 100644 --- a/src/picos/dune +++ b/src/picos/dune @@ -2,4 +2,4 @@ (name nanoev_picos) (public_name nanoev.picos) (optional) ; picos - (libraries threads picos nanoev)) + (libraries threads picos iostream nanoev)) diff --git a/src/picos/global_.ml b/src/picos/global_.ml new file mode 100644 index 0000000..2e590b8 --- /dev/null +++ b/src/picos/global_.ml @@ -0,0 +1,59 @@ +open Common_ + +type st = + | None + | Some of { + active: bool Atomic.t; + nanoev: Nanoev.t; + th: Thread.t; + } + +let st : st Atomic.t = Atomic.make None +let lock = Mutex.create () + +let with_lock lock f = + Mutex.lock lock; + match f () with + | exception e -> + Mutex.unlock lock; + raise e + | x -> + Mutex.unlock lock; + x + +let bg_thread_ ~active ~evloop () : unit = + Trace_.set_thread_name "nanoev.picos.bg-thread"; + while Atomic.get active do + Nanoev.step evloop + done + +let[@inline] has_bg_thread () = Atomic.get st <> None + +let setup_bg_thread (ev : Nanoev.t) : unit = + let@ () = with_lock lock in + (* shutdown existing thread, if any *) + (match Atomic.get st with + | Some st -> + Atomic.set st.active false; + Nanoev.wakeup_from_outside st.nanoev; + Thread.join st.th + | None -> ()); + + (* start new bg thread *) + let active = Atomic.make true in + Atomic.set st + @@ Some + { + active; + nanoev = ev; + th = Thread.create (bg_thread_ ~active ~evloop:ev) (); + } + +let shutdown_bg_thread () = + let@ () = with_lock lock in + match Atomic.exchange st None with + | None -> () + | Some st -> + Atomic.set st.active false; + Nanoev.wakeup_from_outside st.nanoev; + Thread.join st.th diff --git a/src/picos/nanoev_picos.ml b/src/picos/nanoev_picos.ml index a11c89f..8029fda 100644 --- a/src/picos/nanoev_picos.ml +++ b/src/picos/nanoev_picos.ml @@ -1,161 +1,7 @@ -open struct - module Trace_ = Nanoev.Trace_ - - let ( let@ ) = ( @@ ) -end - -exception Closed = Nanoev.Closed - -module Global_ = struct - type st = - | None - | Some of { - active: bool Atomic.t; - nanoev: Nanoev.t; - th: Thread.t; - } - - let st : st Atomic.t = Atomic.make None - let lock = Mutex.create () - - let with_lock lock f = - Mutex.lock lock; - match f () with - | exception e -> - Mutex.unlock lock; - raise e - | x -> - Mutex.unlock lock; - x - - let bg_thread_ ~active ~evloop () : unit = - Trace_.set_thread_name "nanoev.picos.bg-thread"; - while Atomic.get active do - Nanoev.step evloop - done - - let[@inline] has_bg_thread () = Atomic.get st <> None - - let setup_bg_thread (ev : Nanoev.t) : unit = - let@ () = with_lock lock in - (* shutdown existing thread, if any *) - (match Atomic.get st with - | Some st -> - Atomic.set st.active false; - Nanoev.wakeup_from_outside st.nanoev; - Thread.join st.th - | None -> ()); - - (* start new bg thread *) - let active = Atomic.make true in - Atomic.set st - @@ Some - { - active; - nanoev = ev; - th = Thread.create (bg_thread_ ~active ~evloop:ev) (); - } - - let shutdown_bg_thread () = - let@ () = with_lock lock in - match Atomic.exchange st None with - | None -> () - | Some st -> - Atomic.set st.active false; - Nanoev.wakeup_from_outside st.nanoev; - Thread.join st.th -end - -module Background_thread = struct - let is_setup = Global_.has_bg_thread - let setup = Global_.setup_bg_thread - let shutdown = Global_.shutdown_bg_thread - - let with_setup ev f = - setup ev; - Fun.protect ~finally:shutdown f -end - -let[@inline] get_loop_exn_ () : Nanoev.t = - match Atomic.get Global_.st with - | None -> failwith "No nanoev loop installed." - | Some st -> st.nanoev - -let[@inline] unwrap_ = function - | None -> () - | Some (exn, bt) -> Printexc.raise_with_backtrace exn bt - -let[@unroll 1] rec retry_read_ fd f = - match f () with - | res -> res - | exception - Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> - (* Trace_.message "read must wait"; *) - let trigger = Picos.Trigger.create () in - let closed_r = ref false in - let ev = get_loop_exn_ () in - Nanoev.on_readable ev fd trigger closed_r (fun ~closed trigger closed_r -> - closed_r := closed; - Picos.Trigger.signal trigger); - Picos.Trigger.await trigger |> unwrap_; - if !closed_r then raise Closed; - retry_read_ fd f - -let[@unroll 1] rec retry_write_ fd f = - match f () with - | res -> res - | exception - Unix.Unix_error ((Unix.EAGAIN | Unix.EWOULDBLOCK | Unix.EINTR), _, _) -> - (* Trace_.message "write must wait"; *) - let ev = get_loop_exn_ () in - let trigger = Picos.Trigger.create () in - let closed_r = ref false in - Nanoev.on_writable ev fd trigger closed_r (fun ~closed trigger closed_r -> - closed_r := closed; - Picos.Trigger.signal trigger); - Picos.Trigger.await trigger |> unwrap_; - if !closed_r then raise Closed; - retry_write_ fd f - -let read fd buf i len : int = - try - retry_read_ fd (fun () -> - (* Trace_.message "read"; *) - Unix.read fd buf i len) - with Closed -> 0 - -let close fd = - Unix.close fd; - let ev = get_loop_exn_ () in - Nanoev.close ev fd - -let accept fd = - try - retry_read_ fd (fun () -> - (* Trace_.message "accept"; *) - Unix.accept fd) - with Unix.Unix_error ((Unix.ESHUTDOWN | Unix.ECONNABORTED), _, _) -> - raise Closed - -let write fd buf i len : int = - try - retry_write_ fd (fun () -> - (* Trace_.message "write"; *) - Unix.write fd buf i len) - with Closed -> 0 - -let connect fd addr = retry_write_ fd (fun () -> Unix.connect fd addr) - -let[@inline] max_fds () = - match Atomic.get Global_.st with - | None -> 1024 - | Some st -> Nanoev.max_fds st.nanoev - -let sleep t = - if t > 0. then ( - let ev = get_loop_exn_ () in - let trigger = Picos.Trigger.create () in - Nanoev.run_after_s ev t trigger () (fun trigger () -> - Picos.Trigger.signal trigger); - Picos.Trigger.await trigger |> unwrap_ - ) +module Background_thread = Background_thread +module Base = Base +include Base +module IO_in = IO_in +module IO_out = IO_out +module Net_client = Net_client +module Net_server = Net_server diff --git a/src/picos/nanoev_picos.mli b/src/picos/nanoev_picos.mli index 27310dc..c89e3a5 100644 --- a/src/picos/nanoev_picos.mli +++ b/src/picos/nanoev_picos.mli @@ -1,47 +1,18 @@ (** Basic interface with picos *) -module Background_thread : sig - val setup : Nanoev.t -> unit - (** Install this event loop in a background thread *) - - val shutdown : unit -> unit - (** Shutdown background thread, assuming {! is_setup} returns [true] *) - - val with_setup : Nanoev.t -> (unit -> 'a) -> 'a - - val is_setup : unit -> bool - (** [is_setup()] is [true] iff a background thread is running a nanoev loop *) -end +module Background_thread = Background_thread (** {2 Non blocking IO primitives} *) -val read : Unix.file_descr -> bytes -> int -> int -> int -(** Read from the non blocking FD. - @raise Nanoev.Closed if the FD is closed - @raise Unix.Unix_error for other errors *) +module Base = Base -val write : Unix.file_descr -> bytes -> int -> int -> int -(** Write into the non blocking FD. - @raise Nanoev.Closed if the FD is closed - @raise Unix.Unix_error for other errors *) +include module type of struct + include Base +end -val close : Unix.file_descr -> unit -(** Close the file descriptor - @raise Unix.Unix_error when it fails *) +(** {2 Building blocks on top of {!Base}} *) -val connect : Unix.file_descr -> Unix.sockaddr -> unit -(** Connect this FD to the remote address. - @raise Nanoev.Closed if the FD is closed. - @raise Unix.Unix_error for other errors *) - -val accept : Unix.file_descr -> Unix.file_descr * Unix.sockaddr -(** Accept a connection on this fd. - @raise Nanoev.Closed if the FD is closed. - @raise Unix.Unix_error for other errors *) - -val max_fds : unit -> int -(** Maximum number of file descriptors one can await on. See {!Nanoev.max_fds} -*) - -val sleep : float -> unit -(** Suspend current fiber for [n] seconds *) +module IO_in = IO_in +module IO_out = IO_out +module Net_client = Net_client +module Net_server = Net_server diff --git a/src/picos/net_client.ml b/src/picos/net_client.ml new file mode 100644 index 0000000..1bb3cf5 --- /dev/null +++ b/src/picos/net_client.ml @@ -0,0 +1,20 @@ +open Common_ + +let connect addr : Unix.file_descr = + let sock = Unix.socket (Unix.domain_of_sockaddr addr) Unix.SOCK_STREAM 0 in + Unix.set_nonblock sock; + (try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ()); + + (* connect asynchronously *) + Base.Raw.retry_write sock (fun () -> Unix.connect sock addr); + sock + +let with_connect addr (f : IO_in.t -> IO_out.t -> 'a) : 'a = + let sock = connect addr in + + let ic = IO_in.of_unix_fd sock in + let oc = IO_out.of_unix_fd sock in + + let finally () = try Unix.close sock with _ -> () in + let@ () = Fun.protect ~finally in + f ic oc diff --git a/src/picos/net_server.ml b/src/picos/net_server.ml new file mode 100644 index 0000000..8c91799 --- /dev/null +++ b/src/picos/net_server.ml @@ -0,0 +1,48 @@ +type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit + +type t = { + active: bool Atomic.t; + sock: Unix.file_descr; + client_handler: client_handler; + spawn: (unit -> unit) -> unit Picos.Computation.t; + mutable running: unit Picos.Computation.t option; +} + +let shutdown (self : t) = + if Atomic.exchange self.active false then + Option.iter Picos.Computation.await self.running + +open struct + let run (self : t) () : unit = + while Atomic.get self.active do + let client_sock, client_addr = Base.accept self.sock in + let comp = + self.spawn (fun () -> + let ic = IO_in.of_unix_fd client_sock in + let oc = IO_out.of_unix_fd client_sock in + self.client_handler client_addr ic oc) + in + ignore (comp : _ Picos.Computation.t) + done +end + +let establish ?(backlog = 32) ~spawn ~(client_handler : client_handler) addr : t + = + let domain = Unix.domain_of_sockaddr addr in + let sock = Unix.socket domain Unix.SOCK_STREAM 0 in + Unix.bind sock addr; + Unix.listen sock backlog; + Unix.set_nonblock sock; + Unix.setsockopt sock Unix.SO_REUSEADDR true; + (try Unix.setsockopt sock Unix.TCP_NODELAY true with _ -> ()); + + let server = + { active = Atomic.make true; spawn; sock; client_handler; running = None } + in + + server.running <- Some (spawn (run server)); + server + +let with_ ?backlog ~spawn ~client_handler addr f = + let server = establish ?backlog ~spawn ~client_handler addr in + Fun.protect ~finally:(fun () -> shutdown server) (fun () -> f server) diff --git a/src/picos/net_server.mli b/src/picos/net_server.mli new file mode 100644 index 0000000..7432335 --- /dev/null +++ b/src/picos/net_server.mli @@ -0,0 +1,19 @@ +type client_handler = Unix.sockaddr -> IO_in.t -> IO_out.t -> unit +type t + +val shutdown : t -> unit + +val establish : + ?backlog:int -> + spawn:((unit -> unit) -> unit Picos.Computation.t) -> + client_handler:client_handler -> + Unix.sockaddr -> + t + +val with_ : + ?backlog:int -> + spawn:((unit -> unit) -> unit Picos.Computation.t) -> + client_handler:client_handler -> + Unix.sockaddr -> + (t -> 'a) -> + 'a diff --git a/src/tiny_httpd/nanoev_tiny_httpd.ml b/src/tiny_httpd/nanoev_tiny_httpd.ml index 46314e8..b637122 100644 --- a/src/tiny_httpd/nanoev_tiny_httpd.ml +++ b/src/tiny_httpd/nanoev_tiny_httpd.ml @@ -23,7 +23,7 @@ module Out = struct let i = ref i in let len = ref len0 in while !len > 0 do - match EV.write fd bs !i !len with + match EV.write_once fd bs !i !len with | 0 -> failwith "write failed" | n -> i := !i + n;