project-halide / data /datasets.py
Lonelyguyse1's picture
Deploy Project Halide Gradio Space
e994c16 verified
Raw
History Blame Contribute Delete
3.86 kB
"""Dataset loading and validation helpers."""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable
from config import DATA_DIR, REPO_ROOT
from data.schemas import ALLOWED_LABELS, clean_defects, label_counts
# Default JSONL file read by the loaders in this module.
TRAINING_JSONL = DATA_DIR.joinpath("training_data.jsonl")

# Default directory tried first when a relative image name is resolved.
# The "FilmDamageSimulator" segment really does appear twice in the path.
FDS_SCANS_DIR = DATA_DIR.joinpath(
    "raw",
    "FilmDamageSimulator",
    "FilmDamageSimulator",
    "scans",
)
@dataclass(frozen=True)
class DatasetIssue:
    """A single problem found in a dataset entry.

    Instances are immutable (frozen dataclass) and compare by value.
    """

    # Image name the problem is attached to.
    image: str
    # Human-readable description of the problem.
    message: str
def load_jsonl(path: str | Path = TRAINING_JSONL) -> list[dict[str, Any]]:
    """Load a JSON Lines file into a list of row dictionaries.

    Blank lines are skipped. A path that does not exist is not an error and
    yields an empty list.

    Args:
        path: Location of the JSONL file. Defaults to ``TRAINING_JSONL``.

    Returns:
        One dictionary per non-blank line, in file order.

    Raises:
        ValueError: If a line is not valid JSON, or decodes to something
            other than a JSON object. The message starts with
            ``<path>:<1-based line number>``.
    """
    path = Path(path)
    rows: list[dict[str, Any]] = []
    if not path.exists():
        return rows
    # "utf-8-sig" reads plain UTF-8 unchanged but also drops a leading BOM,
    # which json.loads would otherwise reject on the first line.
    with path.open("r", encoding="utf-8-sig") as f:
        for line_no, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(f"{path}:{line_no}: invalid JSONL row: {exc}") from exc
            if not isinstance(row, dict):
                # Rows are consumed with .get(); fail here with a location
                # instead of letting a list/number/string crash a caller later.
                raise ValueError(
                    f"{path}:{line_no}: invalid JSONL row: "
                    f"expected a JSON object, got {type(row).__name__}"
                )
            rows.append(row)
    return rows
def resolve_image_path(entry: dict[str, Any], scans_dir: Path = FDS_SCANS_DIR) -> Path:
    """Turn an entry's ``image`` value into a filesystem path.

    Resolution order:
      1. An absolute image path is returned as given.
      2. ``scans_dir / image`` is returned when that path exists.
      3. Otherwise the image is taken relative to ``REPO_ROOT``.

    A missing ``image`` key is treated as the empty string. The returned
    path is not checked for existence except in step 2.
    """
    name = str(entry.get("image", ""))
    as_given = Path(name)
    if as_given.is_absolute():
        return as_given
    in_scans = scans_dir / name
    return in_scans if in_scans.exists() else REPO_ROOT / name
def validate_entries(
    entries: Iterable[dict[str, Any]],
    *,
    require_images: bool = False,
    scans_dir: Path = FDS_SCANS_DIR,
) -> list[DatasetIssue]:
    """Collect validation problems for a sequence of dataset entries.

    Nothing is raised for a bad entry; every problem becomes a
    ``DatasetIssue``. Checks per entry:
      * the ``image`` field is present and non-empty (an explicit ``None``
        counts as missing);
      * when ``require_images`` is true, the resolved image path exists;
      * ``clean_defects`` dropped none of the entry's ``annotations``;
      * every remaining annotation's label is in ``ALLOWED_LABELS``.

    Args:
        entries: Dataset rows to check.
        require_images: Also verify that each image resolves to an existing
            path.
        scans_dir: Directory passed to ``resolve_image_path``.

    Returns:
        The issues found, in entry order. Empty when nothing is wrong.
    """
    issues: list[DatasetIssue] = []
    for entry in entries:
        raw_image = entry.get("image")
        # A null image must count as missing: str(None) would give the truthy
        # name "None" and hide the problem.
        image = "" if raw_image is None else str(raw_image)
        shown = image or "(missing)"
        if not image:
            issues.append(DatasetIssue(image=shown, message="missing image field"))
        elif require_images and not resolve_image_path(entry, scans_dir).exists():
            issues.append(DatasetIssue(image=image, message="image file does not exist"))
        annotations = entry.get("annotations", [])
        cleaned, dropped = clean_defects(annotations)
        if dropped:
            issues.append(
                DatasetIssue(image=shown, message=f"{dropped} invalid annotations")
            )
        for defect in cleaned:
            if defect["label"] not in ALLOWED_LABELS:
                issues.append(
                    DatasetIssue(
                        image=shown,
                        message=f"unknown label {defect['label']}",
                    )
                )
    return issues
def dataset_summary(entries: Iterable[dict[str, Any]]) -> dict[str, Any]:
    """Aggregate headline statistics over dataset entries.

    Each entry's ``annotations`` go through ``clean_defects``; the kept
    defects are pooled across entries and the dropped counts are summed.

    Returns:
        A dictionary with these keys:
          * ``images``: number of entries;
          * ``defects``: number of kept defects;
          * ``dropped_annotations``: total reported as dropped;
          * ``label_counts``: result of ``label_counts`` on the kept defects;
          * ``sources``: entries per ``source`` value, ordered by source name
            (entries without the key are counted under ``"unknown"``).
    """
    rows = list(entries)
    kept_defects: list[dict[str, Any]] = []
    per_source: dict[str, int] = {}
    dropped_total = 0
    for row in rows:
        origin = str(row.get("source", "unknown"))
        per_source[origin] = per_source.get(origin, 0) + 1
        kept, rejected = clean_defects(row.get("annotations", []))
        kept_defects.extend(kept)
        dropped_total += rejected
    return {
        "images": len(rows),
        "defects": len(kept_defects),
        "dropped_annotations": dropped_total,
        "label_counts": label_counts(kept_defects),
        "sources": {name: per_source[name] for name in sorted(per_source)},
    }
def load_training_summary(path: str | Path = TRAINING_JSONL) -> dict[str, Any]:
    """Load a JSONL dataset and return its summary plus validation issues.

    The result is the ``dataset_summary`` dictionary with an extra
    ``"issues"`` key: a list with one attribute dictionary per
    ``DatasetIssue``. Image files are not checked for existence.

    Args:
        path: JSONL file to read. Defaults to ``TRAINING_JSONL``.
    """
    rows = load_jsonl(path)
    report = dataset_summary(rows)
    problems = validate_entries(rows, require_images=False)
    report["issues"] = [vars(problem) for problem in problems]
    return report
# Public API of this module: the names exported by `from ... import *`.
__all__ = [
    "DatasetIssue",
    "FDS_SCANS_DIR",
    "TRAINING_JSONL",
    "dataset_summary",
    "load_jsonl",
    "load_training_summary",
    "resolve_image_path",
    "validate_entries",
]