Spaces:
Sleeping
Sleeping
| """CSV persistence layer with file locking and atomic writes.""" | |
| import os | |
| import csv | |
| import tempfile | |
| import logging | |
| from pathlib import Path | |
| from filelock import FileLock, Timeout | |
| from models import LabelRecord | |
# Logger named after this module; save_label reports successful saves and
# failures through it.
logger = logging.getLogger(__name__)

# Column names, in order, that save_label passes to csv.DictWriter as the
# fieldnames (and therefore the header row) of the CSV file.
CSV_COLUMNS = ["source", "transcription", "gender", "pii", "labeler"]
def save_label(record: LabelRecord, csv_path: str) -> None:
    """Save a label record to CSV with file locking and atomic write.

    Writers are serialized with a ``FileLock`` on ``csv_path + ".lock"``.
    The whole CSV is rewritten into a temp file in the same directory, which
    is flushed to disk and then swapped over the target with ``os.replace``.

    Implements upsert: the first existing row with the same source+labeler
    is overwritten in place; if there is none, the record is appended.

    Args:
        record: The label record to save.
        csv_path: Path to the target CSV file. Missing parent directories
            are created.

    Raises:
        IOError: If the lock cannot be acquired within the 10 second
            timeout, or if the write operation fails.
    """
    lock_path = csv_path + ".lock"
    lock = FileLock(lock_path, timeout=10)
    try:
        # Create the target directory *before* taking the lock: the lock file
        # lives next to the CSV, so acquiring the lock needs the directory
        # to exist already.
        dir_name = os.path.dirname(csv_path) or "."
        os.makedirs(dir_name, exist_ok=True)

        with lock:
            # Read existing data (a missing file is treated as empty).
            rows: list[dict] = []
            path = Path(csv_path)
            if path.exists():
                with open(path, "r", encoding="utf-8", newline="") as f:
                    rows = list(csv.DictReader(f))

            # Upsert: overwrite existing row for this source+labeler, or append.
            new_row = {
                "source": record.source,
                "transcription": record.transcription,
                "gender": record.gender,
                "pii": str(record.pii),
                "labeler": record.labeler,
            }
            for i, row in enumerate(rows):
                if row["source"] == record.source and row["labeler"] == record.labeler:
                    rows[i] = new_row
                    break
            else:
                # No matching row was found.
                rows.append(new_row)

            # Write to a temp file in the same directory (so the rename stays
            # on one filesystem), then atomically replace the target.
            fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
            try:
                with os.fdopen(fd, "w", encoding="utf-8", newline="") as f:
                    writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
                    writer.writeheader()
                    writer.writerows(rows)
                    # Push the data to disk before the rename so a crash
                    # cannot leave a renamed-but-empty CSV behind.
                    f.flush()
                    os.fsync(f.fileno())
                os.replace(tmp_path, csv_path)
            except BaseException:
                # Best-effort removal of the temp file on any failure,
                # including KeyboardInterrupt; a cleanup error must not mask
                # the original exception, which is always re-raised.
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass
                raise

            logger.info(
                "Saved label for '%s' by '%s'", record.source, record.labeler
            )
    except Timeout as exc:
        # Must be handled before the IOError clause below so a lock timeout
        # gets its own log entry and user-facing message.
        logger.error("Lock timeout for CSV: %s", csv_path)
        raise IOError(
            "Failed to save annotation (file locked). Please try again."
        ) from exc
    except IOError:
        # OS-level I/O errors already have the type callers expect.
        raise
    except Exception as e:
        logger.error("Failed to save label for %s: %s", record.source, e)
        raise IOError("Failed to save annotation. Please try again.") from e