""" Convert operator feedback records into class-organized training images. Usage: python scripts/ingest_feedback.py --feedback_dir feedback_queue --output_dir data/feedback_labeled """ import argparse import json import shutil from pathlib import Path VALID_CLASSES = {"plastic", "paper", "organic", "metal", "glass"} VALID_ACTIONS = {"confirmed", "corrected"} INPUT_EXTS = {".jpg", ".jpeg", ".png", ".webp"} def load_feedback_records(feedback_dir: Path) -> list[Path]: return sorted(path for path in feedback_dir.glob("*.json") if path.is_file()) def ingest_record(record_path: Path, output_dir: Path, overwrite: bool) -> tuple[str, str]: payload = json.loads(record_path.read_text(encoding="utf-8")) reviewed_class = str(payload.get("reviewed_class", "")).strip().lower() final_action = str(payload.get("final_action", "")).strip().lower() image_path = Path(str(payload.get("image_path", "")).strip()) case_id = str(payload.get("case_id", record_path.stem)).strip() or record_path.stem if reviewed_class not in VALID_CLASSES: return "skipped", f"{record_path.name}: unsupported reviewed_class '{reviewed_class}'" if final_action not in VALID_ACTIONS: return "skipped", f"{record_path.name}: final_action '{final_action}' not eligible for training" if not image_path.exists(): return "skipped", f"{record_path.name}: source image not found at '{image_path}'" if image_path.suffix.lower() not in INPUT_EXTS: return "skipped", f"{record_path.name}: unsupported image extension '{image_path.suffix}'" class_dir = output_dir / reviewed_class class_dir.mkdir(parents=True, exist_ok=True) image_dest = class_dir / f"{case_id}{image_path.suffix.lower()}" metadata_dest = class_dir / f"{case_id}.json" if image_dest.exists() and not overwrite: return "skipped", f"{record_path.name}: already ingested" shutil.copy2(image_path, image_dest) metadata_dest.write_text(json.dumps(payload, indent=2), encoding="utf-8") return "ingested", f"{record_path.name}: copied to {image_dest}" def main() -> None: parser = argparse.ArgumentParser(description="Ingest reviewed feedback cases into labeled training folders.") parser.add_argument("--feedback_dir", default="feedback_queue") parser.add_argument("--output_dir", default="data/feedback_labeled") parser.add_argument("--overwrite", action="store_true") args = parser.parse_args() feedback_dir = Path(args.feedback_dir) output_dir = Path(args.output_dir) if not feedback_dir.exists(): raise FileNotFoundError(f"Feedback directory not found: {feedback_dir}") records = load_feedback_records(feedback_dir) if not records: print(f"No feedback records found in {feedback_dir.resolve()}") return ingested = 0 skipped = 0 for record_path in records: status, message = ingest_record(record_path, output_dir, overwrite=args.overwrite) print(message) if status == "ingested": ingested += 1 else: skipped += 1 print("\nFeedback ingest summary") print(f"Ingested : {ingested}") print(f"Skipped : {skipped}") print(f"Output dir: {output_dir.resolve()}") if __name__ == "__main__": main()