File size: 3,856 Bytes
e994c16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""Dataset loading and validation helpers."""

from __future__ import annotations

import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable

from config import DATA_DIR, REPO_ROOT
from data.schemas import ALLOWED_LABELS, clean_defects, label_counts

# Default JSONL file read by load_jsonl() and load_training_summary().
TRAINING_JSONL = DATA_DIR / "training_data.jsonl"
# Default directory that resolve_image_path() and validate_entries() search
# first when an entry's image name is relative.
# NOTE(review): the repeated "FilmDamageSimulator" component presumably mirrors
# how the raw archive unpacks on disk — confirm against the data layout.
FDS_SCANS_DIR = (
    DATA_DIR
    / "raw"
    / "FilmDamageSimulator"
    / "FilmDamageSimulator"
    / "scans"
)


@dataclass(frozen=True)
class DatasetIssue:
    """A single validation problem attached to one dataset entry.

    Instances are immutable (``frozen=True``) and are produced by
    ``validate_entries``.
    """

    # Image value of the offending entry, or "(missing)" when the entry
    # has no usable image value.
    image: str
    # Human-readable description of what is wrong with the entry.
    message: str


def load_jsonl(path: str | Path = TRAINING_JSONL) -> list[dict[str, Any]]:
    """Read a JSON Lines file and return its decoded rows.

    Blank (whitespace-only) lines are skipped. A path that does not exist is
    treated as an empty dataset rather than an error.

    Args:
        path: Location of the JSONL file; defaults to ``TRAINING_JSONL``.

    Returns:
        The decoded value of every non-blank line, in file order.

    Raises:
        ValueError: If a non-blank line is not valid JSON. The message
            contains the file path and the 1-based line number, and the
            original ``json.JSONDecodeError`` is chained as the cause.
    """
    path = Path(path)
    rows: list[dict[str, Any]] = []
    if not path.exists():
        return rows
    # "utf-8-sig" decodes ordinary UTF-8 exactly like "utf-8" but also
    # consumes a leading byte-order mark. With plain "utf-8" the BOM stays
    # glued to the first line (str.strip() does not remove it) and
    # json.loads() rejects that row.
    with path.open("r", encoding="utf-8-sig") as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            # Keep the try body down to the one call that can raise.
            try:
                row = json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(f"{path}:{line_no}: invalid JSONL row: {exc}") from exc
            rows.append(row)
    return rows


def resolve_image_path(entry: dict[str, Any], scans_dir: Path = FDS_SCANS_DIR) -> Path:
    """Turn an entry's ``image`` value into a filesystem path.

    Resolution order:

    1. An absolute path is returned unchanged.
    2. Otherwise ``scans_dir / image`` is returned if it exists on disk.
    3. Otherwise the value is taken relative to ``REPO_ROOT``; this final
       fallback is returned without checking that it exists.

    Args:
        entry: Dataset row; its ``image`` value is stringified, with a
            missing key treated as an empty string.
        scans_dir: Directory tried first for relative image names.

    Returns:
        The resolved path, which is not guaranteed to exist.
    """
    image_ref = str(entry.get("image", ""))
    as_given = Path(image_ref)
    if as_given.is_absolute():
        return as_given

    in_scans = scans_dir / image_ref
    return in_scans if in_scans.exists() else REPO_ROOT / image_ref


def validate_entries(
    entries: Iterable[dict[str, Any]],
    *,
    require_images: bool = False,
    scans_dir: Path = FDS_SCANS_DIR,
) -> list[DatasetIssue]:
    """Collect validation problems for a sequence of dataset entries.

    For every entry the following checks run, in this order:

    * the ``image`` value is present (a missing key, ``None`` or an empty
      string all count as missing);
    * when ``require_images`` is true, the path returned by
      ``resolve_image_path`` exists on disk;
    * ``clean_defects`` reports nothing dropped for the entry's
      ``annotations`` (its second return value is presumably a count of
      rejected annotations — it is interpolated into the issue message);
    * every annotation kept by ``clean_defects`` has a ``label`` found in
      ``ALLOWED_LABELS``.

    Args:
        entries: Dataset rows to inspect.
        require_images: Also report entries whose image file is absent.
        scans_dir: Directory forwarded to ``resolve_image_path``.

    Returns:
        One ``DatasetIssue`` per problem found, in encounter order. Issues
        for entries without an image carry ``"(missing)"`` as their image.
    """
    issues: list[DatasetIssue] = []
    for entry in entries:
        raw_image = entry.get("image")
        # A JSON ``null`` decodes to None, and str(None) is the truthy string
        # "None" — that would hide the missing field and blame later issues
        # on an image literally called "None". Map it to "" instead.
        image = "" if raw_image is None else str(raw_image)
        image_name = image or "(missing)"

        if not image:
            issues.append(DatasetIssue(image="(missing)", message="missing image field"))
        elif require_images and not resolve_image_path(entry, scans_dir).exists():
            issues.append(DatasetIssue(image=image, message="image file does not exist"))

        annotations = entry.get("annotations", [])
        cleaned, dropped = clean_defects(annotations)
        if dropped:
            issues.append(
                DatasetIssue(
                    image=image_name,
                    message=f"{dropped} invalid annotations",
                )
            )
        for defect in cleaned:
            if defect["label"] not in ALLOWED_LABELS:
                issues.append(
                    DatasetIssue(
                        image=image_name,
                        message=f"unknown label {defect['label']}",
                    )
                )
    return issues


def dataset_summary(entries: Iterable[dict[str, Any]]) -> dict[str, Any]:
    """Aggregate headline statistics over dataset entries.

    Args:
        entries: Dataset rows; the iterable is consumed once and materialised
            so it can be counted.

    Returns:
        A dict with these keys:

        * ``images``: number of entries;
        * ``defects``: number of annotations kept by ``clean_defects``;
        * ``dropped_annotations``: sum of the second value returned by
          ``clean_defects`` across all entries;
        * ``label_counts``: result of ``label_counts`` over the kept
          annotations;
        * ``sources``: entries per ``source`` value (``"unknown"`` when the
          key is absent), ordered by source name.
    """
    rows = list(entries)
    per_source: dict[str, int] = {}
    kept_defects: list[dict[str, Any]] = []
    dropped_total = 0

    for row in rows:
        origin = str(row.get("source", "unknown"))
        per_source[origin] = per_source.get(origin, 0) + 1

        kept, rejected = clean_defects(row.get("annotations", []))
        kept_defects.extend(kept)
        dropped_total += rejected

    summary: dict[str, Any] = {
        "images": len(rows),
        "defects": len(kept_defects),
        "dropped_annotations": dropped_total,
    }
    summary["label_counts"] = label_counts(kept_defects)
    summary["sources"] = dict(sorted(per_source.items()))
    return summary


def load_training_summary(path: str | Path = TRAINING_JSONL) -> dict[str, Any]:
    """Load a JSONL dataset and return its summary together with its issues.

    Args:
        path: JSONL file to read; defaults to ``TRAINING_JSONL``.

    Returns:
        The dict produced by ``dataset_summary`` with an extra ``issues`` key
        holding one ``{"image": ..., "message": ...}`` mapping per
        ``DatasetIssue``. Image files are not checked on disk.
    """
    rows = load_jsonl(path)
    report = dataset_summary(rows)
    found = validate_entries(rows, require_images=False)
    # vars() hands back each instance's attribute dict.
    report["issues"] = [vars(item) for item in found]
    return report


# Names exported by ``from <this module> import *``.
__all__ = [
    "DatasetIssue",
    "FDS_SCANS_DIR",
    "TRAINING_JSONL",
    "dataset_summary",
    "load_jsonl",
    "load_training_summary",
    "resolve_image_path",
    "validate_entries",
]