File size: 3,304 Bytes
206d8b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""
Convert operator feedback records into class-organized training images.

Usage:
    python scripts/ingest_feedback.py --feedback_dir feedback_queue --output_dir data/feedback_labeled
"""

import argparse
import json
import shutil
from pathlib import Path

# Class labels accepted for a record's ``reviewed_class`` field; ingest_record
# compares the stripped, lower-cased value against this set and uses it as the
# name of the output sub-folder.
VALID_CLASSES = {"plastic", "paper", "organic", "metal", "glass"}
# Values of a record's ``final_action`` field that ingest_record treats as
# eligible for training; records with any other action are skipped.
VALID_ACTIONS = {"confirmed", "corrected"}
# Image file suffixes (lower-case, with leading dot) that ingest_record will copy.
INPUT_EXTS = {".jpg", ".jpeg", ".png", ".webp"}


def load_feedback_records(feedback_dir: Path) -> list[Path]:
    """Return the feedback record files found directly inside *feedback_dir*.

    Only entries matching ``*.json`` that are regular files are kept (a
    directory whose name ends in ``.json`` is ignored). The search is not
    recursive.

    Args:
        feedback_dir: Directory to scan for record files.

    Returns:
        The matching paths in ascending sorted order.
    """
    json_files = [entry for entry in feedback_dir.glob("*.json") if entry.is_file()]
    json_files.sort()
    return json_files


def ingest_record(record_path: Path, output_dir: Path, overwrite: bool) -> tuple[str, str]:
    """Validate one feedback record and copy its image into its class folder.

    The record at *record_path* is expected to be a UTF-8 JSON object. The
    fields read from it are ``reviewed_class``, ``final_action``,
    ``image_path`` and, optionally, ``case_id`` (the record file's stem is
    used when ``case_id`` is missing, null or blank).

    On success the image is copied to
    ``output_dir/<reviewed_class>/<case_id><lower-cased suffix>`` and the
    record payload is written next to it as ``<case_id>.json``.

    Args:
        record_path: Path of the JSON feedback record.
        output_dir: Root directory that holds one sub-folder per class.
        overwrite: When false, a record whose destination image already
            exists is skipped instead of being copied again.

    Returns:
        A ``(status, message)`` pair. ``status`` is ``"ingested"`` when the
        image was copied and ``"skipped"`` otherwise; ``message`` is a
        human-readable line that starts with the record's file name.

    Raises:
        OSError: If creating the class folder, copying the image or writing
            the metadata file fails. Problems with the record's own content
            are reported as ``"skipped"`` rather than raised.
    """
    try:
        payload = json.loads(record_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError,
        # so one corrupt record cannot abort the whole batch.
        return "skipped", f"{record_path.name}: could not read record ({exc})"
    if not isinstance(payload, dict):
        return "skipped", f"{record_path.name}: record is not a JSON object"

    reviewed_class = str(payload.get("reviewed_class", "")).strip().lower()
    final_action = str(payload.get("final_action", "")).strip().lower()
    # ``or ""`` maps a JSON null to "missing" instead of the literal path "None".
    raw_image_path = str(payload.get("image_path") or "").strip()
    image_path = Path(raw_image_path)
    raw_case_id = payload.get("case_id")
    case_id = ("" if raw_case_id is None else str(raw_case_id)).strip() or record_path.stem

    if reviewed_class not in VALID_CLASSES:
        return "skipped", f"{record_path.name}: unsupported reviewed_class '{reviewed_class}'"
    if final_action not in VALID_ACTIONS:
        return "skipped", f"{record_path.name}: final_action '{final_action}' not eligible for training"
    # An empty path would become Path("."), which exists; is_file() also
    # rejects directories that copy2 could not handle.
    if not raw_image_path or not image_path.is_file():
        return "skipped", f"{record_path.name}: source image not found at '{raw_image_path}'"
    if image_path.suffix.lower() not in INPUT_EXTS:
        return "skipped", f"{record_path.name}: unsupported image extension '{image_path.suffix}'"
    # case_id comes from the record and is used as a file name: refuse
    # anything that could resolve outside the class folder.
    if case_id in {".", ".."} or "/" in case_id or "\\" in case_id:
        return "skipped", f"{record_path.name}: case_id '{case_id}' is not a valid file name"

    class_dir = output_dir / reviewed_class
    class_dir.mkdir(parents=True, exist_ok=True)

    image_dest = class_dir / f"{case_id}{image_path.suffix.lower()}"
    metadata_dest = class_dir / f"{case_id}.json"

    if image_dest.exists() and not overwrite:
        return "skipped", f"{record_path.name}: already ingested"

    shutil.copy2(image_path, image_dest)
    metadata_dest.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    return "ingested", f"{record_path.name}: copied to {image_dest}"


def main() -> None:
    """Command-line entry point: ingest every record in the feedback directory.

    Parses ``--feedback_dir``, ``--output_dir`` and ``--overwrite``, passes
    each record returned by ``load_feedback_records`` to ``ingest_record``,
    prints the per-record message, and finishes with a summary of how many
    records were ingested and skipped.

    Raises:
        FileNotFoundError: If the feedback directory does not exist.
    """
    cli = argparse.ArgumentParser(description="Ingest reviewed feedback cases into labeled training folders.")
    cli.add_argument("--feedback_dir", default="feedback_queue")
    cli.add_argument("--output_dir", default="data/feedback_labeled")
    cli.add_argument("--overwrite", action="store_true")
    options = cli.parse_args()

    source_root = Path(options.feedback_dir)
    target_root = Path(options.output_dir)

    if not source_root.exists():
        raise FileNotFoundError(f"Feedback directory not found: {source_root}")

    pending = load_feedback_records(source_root)
    if not pending:
        print(f"No feedback records found in {source_root.resolve()}")
        return

    # Any status other than "ingested" is counted as skipped.
    tally = {"ingested": 0, "skipped": 0}
    for record_file in pending:
        outcome, detail = ingest_record(record_file, target_root, overwrite=options.overwrite)
        print(detail)
        bucket = "ingested" if outcome == "ingested" else "skipped"
        tally[bucket] += 1

    print("\nFeedback ingest summary")
    print(f"Ingested : {tally['ingested']}")
    print(f"Skipped  : {tally['skipped']}")
    print(f"Output dir: {target_root.resolve()}")


# Run the CLI only when executed as a script, so importing this module has no
# side effects.
if __name__ == "__main__":
    main()