from __future__ import annotations

import base64
import io
import os
import tempfile
import time

from PIL import Image

# Set cache locations and silence tokenizer fork warnings *before* the engine
# imports below, since importing the engines can trigger model loading.
os.environ.setdefault("MODEL_CACHE_DIR", "/tmp/models")
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")

from src.engines.coherence.engine import CoherenceEngine
from src.engines.fingerprint.engine import FingerprintEngine
from src.engines.sstgnn.engine import SSTGNNEngine
from src.explainability.explainer import explain
from src.fusion.fuser import fuse
from src.services.media_utils import extract_video_frames

# Engines are instantiated once at module import so model weights load a
# single time per worker process, not once per request.
_fp = FingerprintEngine()
_co = CoherenceEngine()
_st = SSTGNNEngine()


def handler(job: dict) -> dict:
    inp = job.get("input", {})
    encoded = inp.get("data") or inp.get("image_b64")
    if not encoded:
        raise ValueError("Missing input.data (base64 payload)")

    raw = base64.b64decode(encoded)
    media_type = str(inp.get("media_type", "image")).lower()

    t0 = time.perf_counter()
    if media_type == "image":
        image = Image.open(io.BytesIO(raw)).convert("RGB")
        fp = _fp.run(image)
        co = _co.run(image)
        st = _st.run(image)
        verdict, conf, generator = fuse([fp, co, st], is_video=False)
    else:
        # Video payloads are written to a temporary file so the frame
        # extractor can open them by path; the file is deleted as soon as
        # the frames have been pulled out.
        with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp:
            temp.write(raw)
            tmp_path = temp.name
        try:
            frames = extract_video_frames(tmp_path, max_frames=300)
        finally:
            os.unlink(tmp_path)
        fp = _fp.run_video(frames)
        co = _co.run_video(frames)
        st = _st.run_video(frames)
        verdict, conf, generator = fuse([fp, co, st], is_video=True)

    engine_results = [fp, co, st]
    explanation = explain(verdict, conf, engine_results, generator)
    total_ms = (time.perf_counter() - t0) * 1000

    return {
        "verdict": verdict,
        "confidence": conf,
        "attributed_generator": generator,
        "explanation": explanation,
        "processing_time_ms": total_ms,
        "engine_breakdown": [result.model_dump() for result in engine_results],
    }


# runpod is optional so this module can still be imported (e.g. for local
# testing) outside the serverless runtime.
try:
    import runpod  # type: ignore
except Exception:
    runpod = None

if runpod is not None:
    runpod.serverless.start({"handler": handler})
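
# --- Local smoke test (illustrative only, not part of the RunPod entrypoint).
# A minimal sketch of how `handler` can be exercised without the runpod
# runtime, assuming the engine modules above import and their model weights
# are available locally. The 8x8 red PNG is a made-up payload purely for
# demonstration; it runs only when runpod is not installed, so the serverless
# start() call above cannot have taken over the process.
if __name__ == "__main__" and runpod is None:
    buf = io.BytesIO()
    Image.new("RGB", (8, 8), color=(255, 0, 0)).save(buf, format="PNG")
    demo_job = {
        "input": {
            "data": base64.b64encode(buf.getvalue()).decode("ascii"),
            "media_type": "image",
        }
    }
    print(handler(demo_job))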