Spaces:
Sleeping
Sleeping
File size: 3,042 Bytes
d7efa84 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 | """CSV persistence layer with file locking and atomic writes."""
import os
import csv
import tempfile
import logging
from pathlib import Path
from filelock import FileLock, Timeout
from models import LabelRecord
logger = logging.getLogger(__name__)
CSV_COLUMNS = ["source", "transcription", "gender", "pii", "labeler"]
def save_label(record: LabelRecord, csv_path: str) -> None:
"""Save a label record to CSV with file locking and atomic write.
Uses filelock for serialization and temp-file-then-rename for atomicity.
Implements upsert: overwrites existing row for same source+labeler,
or appends if new.
Args:
record: The label record to save.
csv_path: Path to the target CSV file.
Raises:
IOError: If the write operation fails.
"""
lock_path = csv_path + ".lock"
lock = FileLock(lock_path, timeout=10)
try:
with lock:
# Read existing data
rows: list[dict] = []
path = Path(csv_path)
if path.exists():
with open(path, "r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
rows = [row for row in reader]
# Upsert: overwrite existing row for this source+labeler, or append
new_row = {
"source": record.source,
"transcription": record.transcription,
"gender": record.gender,
"pii": str(record.pii),
"labeler": record.labeler,
}
updated = False
for i, row in enumerate(rows):
if row["source"] == record.source and row["labeler"] == record.labeler:
rows[i] = new_row
updated = True
break
if not updated:
rows.append(new_row)
# Write to temp file, then atomic rename
dir_name = os.path.dirname(csv_path) or "."
os.makedirs(dir_name, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
try:
with os.fdopen(fd, "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
writer.writeheader()
writer.writerows(rows)
os.replace(tmp_path, csv_path)
logger.info(f"Saved label for '{record.source}' by '{record.labeler}'")
except Exception:
# Clean up temp file on failure
if os.path.exists(tmp_path):
os.unlink(tmp_path)
raise
except Timeout:
logger.error(f"Lock timeout for CSV: {csv_path}")
raise IOError("Failed to save annotation (file locked). Please try again.")
except IOError:
raise
except Exception as e:
logger.error(f"Failed to save label for {record.source}: {e}")
raise IOError("Failed to save annotation. Please try again.") from e
|