Spaces:
Running
Running
| """ | |
| Convert operator feedback records into class-organized training images. | |
| Usage: | |
| python scripts/ingest_feedback.py --feedback_dir feedback_queue --output_dir data/feedback_labeled | |
| """ | |
| import argparse | |
| import json | |
| import shutil | |
| from pathlib import Path | |
# Class labels accepted as ``reviewed_class``; each one becomes a sub-folder
# of the output directory.
VALID_CLASSES = {
    "glass",
    "metal",
    "organic",
    "paper",
    "plastic",
}

# ``final_action`` values that make a record eligible for training.
VALID_ACTIONS = {
    "confirmed",
    "corrected",
}

# Image suffixes accepted for ingestion (compared against the lower-cased
# suffix of the source image path).
INPUT_EXTS = {
    ".jpeg",
    ".jpg",
    ".png",
    ".webp",
}
def load_feedback_records(feedback_dir: Path) -> list[Path]:
    """Return the JSON record files directly inside *feedback_dir*, sorted by path.

    Only regular files matching ``*.json`` at the top level of the directory
    are returned; sub-directories are not searched and a directory whose name
    ends in ``.json`` is ignored.
    """
    record_files = [entry for entry in feedback_dir.glob("*.json") if entry.is_file()]
    record_files.sort()
    return record_files
def ingest_record(record_path: Path, output_dir: Path, overwrite: bool) -> tuple[str, str]:
    """Copy one reviewed feedback image into its class folder under *output_dir*.

    The record is a JSON object read from *record_path*. The fields used are
    ``reviewed_class``, ``final_action``, ``image_path`` and, optionally,
    ``case_id`` (the record file's stem is used when it is missing, null or
    blank). On success the image is copied to
    ``<output_dir>/<reviewed_class>/<case_id><ext>`` and the full record is
    written next to it as ``<case_id>.json``.

    Args:
        record_path: Path of the feedback JSON record.
        output_dir: Root directory that holds one sub-folder per class.
        overwrite: When false, a record whose destination image already
            exists is skipped instead of being copied again.

    Returns:
        A ``(status, message)`` pair. ``status`` is ``"ingested"`` or
        ``"skipped"``; ``message`` is a human-readable line that starts with
        the record file name.
    """
    # One unreadable or malformed record must not abort the whole batch.
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        payload = json.loads(record_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        return "skipped", f"{record_path.name}: could not read record ({exc})"
    if not isinstance(payload, dict):
        return "skipped", f"{record_path.name}: record is not a JSON object"

    def text_field(key: str) -> str:
        # JSON null is treated like a missing key, not as the string "None".
        value = payload.get(key)
        return "" if value is None else str(value).strip()

    reviewed_class = text_field("reviewed_class").lower()
    final_action = text_field("final_action").lower()
    image_path = Path(text_field("image_path"))
    case_id = text_field("case_id") or record_path.stem

    if reviewed_class not in VALID_CLASSES:
        return "skipped", f"{record_path.name}: unsupported reviewed_class '{reviewed_class}'"
    if final_action not in VALID_ACTIONS:
        return "skipped", f"{record_path.name}: final_action '{final_action}' not eligible for training"
    # is_file() rather than exists(): a directory (including the "." that an
    # empty image_path turns into) is not a usable source image.
    if not image_path.is_file():
        return "skipped", f"{record_path.name}: source image not found at '{image_path}'"
    if image_path.suffix.lower() not in INPUT_EXTS:
        return "skipped", f"{record_path.name}: unsupported image extension '{image_path.suffix}'"
    # case_id comes from the record and becomes a file name; refuse values
    # with path components so nothing is written outside the class folder.
    if case_id in {".", ".."} or Path(case_id).name != case_id:
        return "skipped", f"{record_path.name}: case_id '{case_id}' is not a valid file name"

    class_dir = output_dir / reviewed_class
    class_dir.mkdir(parents=True, exist_ok=True)
    image_dest = class_dir / f"{case_id}{image_path.suffix.lower()}"
    metadata_dest = class_dir / f"{case_id}.json"

    if image_dest.exists() and not overwrite:
        return "skipped", f"{record_path.name}: already ingested"

    shutil.copy2(image_path, image_dest)
    metadata_dest.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    return "ingested", f"{record_path.name}: copied to {image_dest}"
def main() -> None:
    """Command-line entry point: ingest every record in the feedback directory.

    Reads ``--feedback_dir``, ``--output_dir`` and ``--overwrite`` from the
    command line, runs ``ingest_record`` on each record file, prints one line
    per record and finishes with a summary of the counts.

    Raises:
        FileNotFoundError: If the feedback directory does not exist.
    """
    cli = argparse.ArgumentParser(
        description="Ingest reviewed feedback cases into labeled training folders."
    )
    cli.add_argument("--feedback_dir", default="feedback_queue")
    cli.add_argument("--output_dir", default="data/feedback_labeled")
    cli.add_argument("--overwrite", action="store_true")
    options = cli.parse_args()

    source_root = Path(options.feedback_dir)
    target_root = Path(options.output_dir)
    if not source_root.exists():
        raise FileNotFoundError(f"Feedback directory not found: {source_root}")

    pending = load_feedback_records(source_root)
    if not pending:
        print(f"No feedback records found in {source_root.resolve()}")
        return

    # Any status other than "ingested" is tallied as skipped.
    tally = {"ingested": 0, "skipped": 0}
    for record_file in pending:
        outcome, detail = ingest_record(record_file, target_root, overwrite=options.overwrite)
        print(detail)
        tally["ingested" if outcome == "ingested" else "skipped"] += 1

    print("\nFeedback ingest summary")
    print(f"Ingested : {tally['ingested']}")
    print(f"Skipped : {tally['skipped']}")
    print(f"Output dir: {target_root.resolve()}")
# Run the ingest only when executed as a script, not when imported.
if __name__ == "__main__":
    main()