"""Small IO helpers used by the pipeline.""" from __future__ import annotations import json import os import tempfile from pathlib import Path from typing import Iterable, Iterator def atomic_write_text(path: str | Path, text: str) -> None: path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) with tempfile.NamedTemporaryFile( mode="w", dir=path.parent, delete=False, suffix=".tmp" ) as tmp: tmp.write(text) tmp.flush() os.fsync(tmp.fileno()) tmp_path = tmp.name os.replace(tmp_path, path) def write_jsonl(path: str | Path, rows: Iterable[dict]) -> None: lines = "\n".join(json.dumps(row, sort_keys=True, ensure_ascii=False) for row in rows) if lines: lines += "\n" atomic_write_text(path, lines) def read_jsonl(path: str | Path) -> Iterator[dict]: with open(path) as f: for line in f: line = line.strip() if line: yield json.loads(line)