File size: 3,042 Bytes
d7efa84
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""CSV persistence layer with file locking and atomic writes."""

import csv
import logging
import os
import shutil
import tempfile
from pathlib import Path

from filelock import FileLock, Timeout

from models import LabelRecord

logger = logging.getLogger(__name__)

# Header of the labels CSV, in column order; save_label writes rows with
# exactly these fieldnames.
CSV_COLUMNS = [
    "source",
    "transcription",
    "gender",
    "pii",
    "labeler",
]


def save_label(record: LabelRecord, csv_path: str) -> None:
    """Save a label record to CSV with file locking and atomic write.

    Uses filelock for serialization and temp-file-then-rename for atomicity.
    Implements upsert: overwrites the first existing row with the same
    source+labeler, or appends a new row if none matches.

    The temp file is flushed and fsync'ed before ``os.replace`` to reduce the
    risk that a crash publishes an empty or truncated CSV. If the CSV already
    exists, its permission bits are copied onto the temp file
    (``tempfile.mkstemp`` creates it owner-only), so saving does not reset
    the file's mode. A newly created CSV keeps mkstemp's owner-only mode.

    Args:
        record: The label record to save.
        csv_path: Path to the target CSV file. Its parent directory is
            created if missing; the lock file ``<csv_path>.lock`` is placed
            next to it.

    Raises:
        IOError: If the lock cannot be acquired within 10 seconds, or the
            read/write fails. ``OSError`` subclasses (``IOError`` is an alias
            of ``OSError``) propagate unchanged; other errors are wrapped.
    """
    dir_name = os.path.dirname(csv_path) or "."
    lock = FileLock(csv_path + ".lock", timeout=10)

    try:
        # The lock file lives in dir_name, so the directory has to exist
        # before the lock is acquired, not merely before the temp file.
        os.makedirs(dir_name, exist_ok=True)
        with lock:
            rows = _read_rows(csv_path)

            new_row = {
                "source": record.source,
                "transcription": record.transcription,
                "gender": record.gender,
                "pii": str(record.pii),
                "labeler": record.labeler,
            }

            # Upsert: replace the first row for this source+labeler; the
            # for/else appends only when no row matched (loop did not break).
            for i, row in enumerate(rows):
                if row["source"] == record.source and row["labeler"] == record.labeler:
                    rows[i] = new_row
                    break
            else:
                rows.append(new_row)

            _atomic_write_rows(csv_path, rows)
            logger.info("Saved label for '%s' by '%s'", record.source, record.labeler)

    except Timeout as exc:
        logger.error("Lock timeout for CSV: %s", csv_path)
        raise IOError("Failed to save annotation (file locked). Please try again.") from exc
    except IOError:
        # IOError is OSError: filesystem errors already have the documented
        # type, so they propagate unchanged.
        raise
    except Exception as exc:
        logger.exception("Failed to save label for %s: %s", record.source, exc)
        raise IOError("Failed to save annotation. Please try again.") from exc


def _read_rows(csv_path: str) -> list[dict]:
    """Return the data rows of *csv_path* as dicts, or [] if it does not exist.

    Read as ``utf-8-sig`` so a leading BOM (e.g. after a spreadsheet re-save)
    does not get glued onto the first header name; files without a BOM are
    read exactly as with plain ``utf-8``.
    """
    path = Path(csv_path)
    if not path.exists():
        return []
    with open(path, "r", encoding="utf-8-sig", newline="") as f:
        return list(csv.DictReader(f))


def _atomic_write_rows(csv_path: str, rows: list[dict]) -> None:
    """Write a header plus *rows* to *csv_path* via a temp file and ``os.replace``.

    The temp file is created in the target's own directory so the final
    ``os.replace`` is a same-filesystem rename. Callers must hold the CSV lock.
    """
    dir_name = os.path.dirname(csv_path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=dir_name, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
            writer.writeheader()
            writer.writerows(rows)
            # Get the bytes onto disk before the rename makes them visible.
            f.flush()
            os.fsync(f.fileno())
        if os.path.exists(csv_path):
            # mkstemp's file is owner-only; keep the existing CSV's mode.
            shutil.copymode(csv_path, tmp_path)
        os.replace(tmp_path, csv_path)
    except BaseException:
        # Clean up the temp file on any failure, including KeyboardInterrupt.
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise