detect corrupt chunks in multiproc

This commit is contained in:
Simon Cruanes 2024-08-21 14:10:19 -04:00
parent ff8c7e5353
commit 26b8648c82
No known key found for this signature in database
GPG key ID: EBFFF6F283F3A2B4

View file

@@ -27,13 +27,20 @@ let aggregate_into ~dir ~final_file () : unit =
   let buf = Bytes.create 4096 in
   let emit_chunk buf i len =
-    if !afternewline && !first then
-      first := false
-    else if !afternewline then (
-      output_string oc ",\n";
-      afternewline := false
-    );
-    output oc buf i len
+    if len = 0 then
+      ()
+    else if Bytes.get buf i = '{' && Bytes.get buf (i + len - 1) <> '}' then
+      (* incomplete chunk *)
+      ()
+    else (
+      if !afternewline && !first then
+        first := false
+      else if !afternewline then (
+        output_string oc ",\n";
+        afternewline := false
+      );
+      output oc buf i len
+    )
   in
   (* dump content of jsonl file into [oc]. Insert "," before every object