# CoolWasteAI / scripts / ingest_feedback.py
# Celvin
# first commit
# 206d8b5
"""
Convert operator feedback records into class-organized training images.

Usage:
    python scripts/ingest_feedback.py --feedback_dir feedback_queue --output_dir data/feedback_labeled
"""
import argparse
import json
import shutil
from pathlib import Path
# Labels a record's "reviewed_class" must match (after lower-casing) to be ingested.
VALID_CLASSES = {
    "glass",
    "metal",
    "organic",
    "paper",
    "plastic",
}
# Values of "final_action" that make a record eligible for training.
VALID_ACTIONS = {"corrected", "confirmed"}
# Lower-cased source image suffixes accepted for ingestion.
INPUT_EXTS = {".png", ".webp", ".jpeg", ".jpg"}
def load_feedback_records(feedback_dir: Path) -> list[Path]:
    """Return the feedback record files located directly in *feedback_dir*.

    Only regular files matching ``*.json`` at the top level of the directory
    are returned (sub-directories are not searched), in sorted order.
    """
    json_files = [entry for entry in feedback_dir.glob("*.json") if entry.is_file()]
    json_files.sort()
    return json_files
def ingest_record(record_path: Path, output_dir: Path, overwrite: bool) -> tuple[str, str]:
    """Copy one reviewed feedback case into its class folder under *output_dir*.

    The record is a JSON object read from *record_path*. Its ``reviewed_class``
    and ``final_action`` fields are compared case-insensitively against
    ``VALID_CLASSES`` and ``VALID_ACTIONS``; ``image_path`` names the source
    image; ``case_id`` (falling back to the record's file stem) names the
    output files. On success the image is copied to
    ``output_dir/<class>/<case_id><ext>`` and the record payload is written
    next to it as ``<case_id>.json``.

    Args:
        record_path: Path of the feedback JSON record.
        output_dir: Root directory that holds one sub-folder per class.
        overwrite: When false, a case whose image already exists at the
            destination is skipped instead of being copied again.

    Returns:
        A ``(status, message)`` pair where status is ``"ingested"`` or
        ``"skipped"`` and message is a human-readable explanation.
    """
    try:
        payload = json.loads(record_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses;
        # one unreadable record must not abort the rest of the batch.
        return "skipped", f"{record_path.name}: could not read record ({exc})"
    if not isinstance(payload, dict):
        return "skipped", f"{record_path.name}: record is not a JSON object"

    reviewed_class = str(payload.get("reviewed_class", "")).strip().lower()
    final_action = str(payload.get("final_action", "")).strip().lower()
    raw_image_path = str(payload.get("image_path", "")).strip()
    case_id = str(payload.get("case_id", record_path.stem)).strip() or record_path.stem

    if reviewed_class not in VALID_CLASSES:
        return "skipped", f"{record_path.name}: unsupported reviewed_class '{reviewed_class}'"
    if final_action not in VALID_ACTIONS:
        return "skipped", f"{record_path.name}: final_action '{final_action}' not eligible for training"
    if not raw_image_path:
        # Path("") is the current directory, which would pass an existence check.
        return "skipped", f"{record_path.name}: record has no image_path"

    image_path = Path(raw_image_path)
    # is_file() also rejects directories, which shutil.copy2 cannot copy.
    if not image_path.is_file():
        return "skipped", f"{record_path.name}: source image not found at '{image_path}'"
    if image_path.suffix.lower() not in INPUT_EXTS:
        return "skipped", f"{record_path.name}: unsupported image extension '{image_path.suffix}'"
    # case_id comes from the record and becomes part of a file name; a path
    # separator would let it point outside the class folder.
    if "/" in case_id or "\\" in case_id:
        return "skipped", f"{record_path.name}: case_id '{case_id}' contains a path separator"

    class_dir = output_dir / reviewed_class
    class_dir.mkdir(parents=True, exist_ok=True)
    image_dest = class_dir / f"{case_id}{image_path.suffix.lower()}"
    metadata_dest = class_dir / f"{case_id}.json"
    if image_dest.exists() and not overwrite:
        return "skipped", f"{record_path.name}: already ingested"

    shutil.copy2(image_path, image_dest)
    metadata_dest.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    return "ingested", f"{record_path.name}: copied to {image_dest}"
def main() -> None:
    """Command-line entry point: ingest every feedback record and report totals.

    Reads ``--feedback_dir``, ``--output_dir`` and ``--overwrite`` from the
    command line, runs ``ingest_record`` on each record returned by
    ``load_feedback_records``, prints each per-record message, and finishes
    with a summary of ingested and skipped counts.

    Raises:
        FileNotFoundError: If the feedback directory does not exist.
    """
    cli = argparse.ArgumentParser(description="Ingest reviewed feedback cases into labeled training folders.")
    cli.add_argument("--feedback_dir", default="feedback_queue")
    cli.add_argument("--output_dir", default="data/feedback_labeled")
    cli.add_argument("--overwrite", action="store_true")
    options = cli.parse_args()

    source_root = Path(options.feedback_dir)
    target_root = Path(options.output_dir)
    if not source_root.exists():
        raise FileNotFoundError(f"Feedback directory not found: {source_root}")

    record_files = load_feedback_records(source_root)
    if not record_files:
        print(f"No feedback records found in {source_root.resolve()}")
        return

    # Any status other than "ingested" is counted as skipped.
    tally = {"ingested": 0, "skipped": 0}
    for record_file in record_files:
        outcome, detail = ingest_record(record_file, target_root, overwrite=options.overwrite)
        print(detail)
        bucket = "ingested" if outcome == "ingested" else "skipped"
        tally[bucket] += 1

    print("\nFeedback ingest summary")
    print(f"Ingested : {tally['ingested']}")
    print(f"Skipped : {tally['skipped']}")
    print(f"Output dir: {target_root.resolve()}")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()