"""CSV persistence layer with file locking and atomic writes.""" import os import csv import tempfile import logging from pathlib import Path from filelock import FileLock, Timeout from models import LabelRecord logger = logging.getLogger(__name__) CSV_COLUMNS = ["source", "transcription", "gender", "pii", "labeler"] def save_label(record: LabelRecord, csv_path: str) -> None: """Save a label record to CSV with file locking and atomic write. Uses filelock for serialization and temp-file-then-rename for atomicity. Implements upsert: overwrites existing row for same source+labeler, or appends if new. Args: record: The label record to save. csv_path: Path to the target CSV file. Raises: IOError: If the write operation fails. """ lock_path = csv_path + ".lock" lock = FileLock(lock_path, timeout=10) try: with lock: # Read existing data rows: list[dict] = [] path = Path(csv_path) if path.exists(): with open(path, "r", encoding="utf-8", newline="") as f: reader = csv.DictReader(f) rows = [row for row in reader] # Upsert: overwrite existing row for this source+labeler, or append new_row = { "source": record.source, "transcription": record.transcription, "gender": record.gender, "pii": str(record.pii), "labeler": record.labeler, } updated = False for i, row in enumerate(rows): if row["source"] == record.source and row["labeler"] == record.labeler: rows[i] = new_row updated = True break if not updated: rows.append(new_row) # Write to temp file, then atomic rename dir_name = os.path.dirname(csv_path) or "." os.makedirs(dir_name, exist_ok=True) fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp") try: with os.fdopen(fd, "w", encoding="utf-8", newline="") as f: writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS) writer.writeheader() writer.writerows(rows) os.replace(tmp_path, csv_path) logger.info(f"Saved label for '{record.source}' by '{record.labeler}'") except Exception: # Clean up temp file on failure if os.path.exists(tmp_path): os.unlink(tmp_path) raise except Timeout: logger.error(f"Lock timeout for CSV: {csv_path}") raise IOError("Failed to save annotation (file locked). Please try again.") except IOError: raise except Exception as e: logger.error(f"Failed to save label for {record.source}: {e}") raise IOError("Failed to save annotation. Please try again.") from e