Spaces:
Sleeping
Sleeping
# video_detector.py
# Primary: Sightengine video API
# Fallback: frame-by-frame using image detector
import requests
import os
import cv2
import uuid
import tempfile
from dotenv import load_dotenv
from image_detector import ImageDetector

# Pull credentials from a local .env file (no-op if the file is absent).
load_dotenv()

# Sightengine API credentials; None when the env vars are unset, which will
# make the video API call fail and route detection to the frame fallback.
SIGHTENGINE_USER = os.getenv("SIGHTENGINE_USER")
SIGHTENGINE_SECRET = os.getenv("SIGHTENGINE_SECRET")
class VideoDetector:
    """Deepfake / AI-generation detection for video files.

    Primary path: Sightengine's synchronous video-check API using the
    "deepfake" and "genai" models. Fallback path: sample frames with
    OpenCV and score each one with the shared ImageDetector. Both paths
    return the same result-dict shape (verdict, probabilities, severity,
    frame counts, duration, timeline).
    """

    def __init__(self, image_detector: "ImageDetector"):
        # The image detector is reused by the frame-by-frame fallback path.
        self.image_detector = image_detector
        print("Video detector ready — Sightengine + frame fallback.")

    def detect(self, video_path: str) -> dict:
        """Analyze *video_path* and return a verdict dict.

        Tries the Sightengine video API first; any failure (network error,
        non-success API status, malformed response) falls back to
        frame-by-frame analysis instead of propagating.
        """
        try:
            return self._detect_via_sightengine(video_path)
        except Exception as e:
            print(f"Sightengine video failed ({e}), using frame-by-frame.")
            return self._detect_frame_by_frame(video_path)

    def _detect_via_sightengine(self, video_path: str) -> dict:
        """Upload the video to Sightengine's sync endpoint and parse the result.

        Raises:
            requests.HTTPError: on a 4xx/5xx transport-level response.
            RuntimeError: when the API reports a non-"success" status or
                returns no per-frame data.
        """
        with open(video_path, "rb") as f:
            response = requests.post(
                "https://api.sightengine.com/1.0/video/check-sync.json",
                files={"media": f},
                data={
                    "models": "deepfake,genai",
                    "api_user": SIGHTENGINE_USER,
                    "api_secret": SIGHTENGINE_SECRET,
                },
                timeout=120,
            )
        # Fail fast on HTTP errors before attempting to JSON-decode an
        # error page; detect() catches this and falls back anyway.
        response.raise_for_status()
        result = response.json()
        print(f"Sightengine video raw: {result}")
        if result.get("status") != "success":
            raise RuntimeError(f"Sightengine error: {result}")
        return self._parse_video_result(result, video_path)

    def _parse_video_result(self, result: dict, video_path: str) -> dict:
        """Aggregate Sightengine's per-frame scores into one verdict dict."""
        frames = result.get("data", {}).get("frames", [])
        if not frames:
            raise RuntimeError("No frames in Sightengine video response")
        # Per frame, take the worse of the deepfake and AI-generated scores.
        fake_scores = [
            max(
                frame.get("deepfake", {}).get("score", 0.0),
                frame.get("type", {}).get("ai_generated", 0.0),
            )
            for frame in frames
        ]
        avg_fake = sum(fake_scores) / len(fake_scores)
        fake_frames = sum(1 for s in fake_scores if s >= 0.5)
        duration = self._get_duration(video_path)
        return {
            "verdict": "DEEPFAKE" if avg_fake >= 0.5 else "AUTHENTIC",
            # Confidence is distance from the 0.5 decision boundary, as a %.
            "confidence": round(max(avg_fake, 1 - avg_fake) * 100, 2),
            "fake_probability": round(avg_fake * 100, 2),
            "real_probability": round((1 - avg_fake) * 100, 2),
            "severity": self._severity(avg_fake),
            "detection_method": "sightengine_video",
            "model_used": "sightengine-deepfake-genai",
            "frames_analyzed": len(fake_scores),
            "fake_frames_count": fake_frames,
            "real_frames_count": len(fake_scores) - fake_frames,
            "duration_seconds": duration,
            # Per-frame timeline is only produced by the fallback path.
            "timeline": [],
        }

    def _detect_frame_by_frame(self, video_path: str, max_samples: int = 8) -> dict:
        """Fallback: sample up to *max_samples* evenly spaced frames and
        score each with the image detector, then aggregate.

        Raises:
            RuntimeError: if the video cannot be opened or no frame could
                be analyzed.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise RuntimeError(f"Could not open video: {video_path}")
        try:
            fps = cap.get(cv2.CAP_PROP_FPS) or 25.0  # some containers report 0 FPS
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            duration = round(total_frames / fps, 2)
            # Evenly spaced sampling; clamp to 1 so short clips use every frame.
            sample_every = max(1, total_frames // max_samples)
            print(f"Video: {duration}s | {total_frames} frames @ {fps:.1f} FPS")
            frame_results = []
            frame_count = 0
            temp_dir = tempfile.gettempdir()
            while cap.isOpened() and len(frame_results) < max_samples:
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % sample_every == 0:
                    # The image detector expects a file path, so round-trip
                    # the frame through a uniquely named temp JPEG.
                    temp_path = os.path.join(temp_dir, f"sentinel_{uuid.uuid4()}.jpg")
                    cv2.imwrite(temp_path, frame)
                    try:
                        timestamp = round(frame_count / fps, 2)
                        result = self.image_detector.detect(temp_path)
                        result["timestamp_seconds"] = timestamp
                        result["frame_number"] = frame_count
                        frame_results.append(result)
                        print(f" Frame {len(frame_results)}/{max_samples} @ {timestamp}s → {result['verdict']}")
                    except Exception as e:
                        # Best-effort: one bad frame must not abort the video.
                        print(f" Frame {frame_count} failed: {e}")
                    finally:
                        if os.path.exists(temp_path):
                            os.remove(temp_path)
                frame_count += 1
        finally:
            # Always release the capture, even if imwrite/detect raised
            # outside the per-frame handler above.
            cap.release()
        if not frame_results:
            raise RuntimeError("No frames could be analyzed.")
        return self._aggregate(frame_results, duration)

    def _aggregate(self, frame_results: list, duration: float) -> dict:
        """Combine per-frame image-detector results into one video verdict."""
        fake_frames = [r for r in frame_results if r["verdict"] == "DEEPFAKE"]
        fake_ratio = len(fake_frames) / len(frame_results)
        avg_fake_prob = sum(r["fake_probability"] for r in frame_results) / len(frame_results)
        avg_conf = sum(r["confidence"] for r in frame_results) / len(frame_results)
        worst = max(frame_results, key=lambda x: x.get("fake_probability", 0))
        # Conservative call: require BOTH a majority of fake frames and a
        # high average probability before labeling the whole video DEEPFAKE.
        is_deepfake = fake_ratio >= 0.6 and avg_fake_prob >= 70.0
        return {
            "verdict": "DEEPFAKE" if is_deepfake else "AUTHENTIC",
            "confidence": round(avg_conf, 2),
            "fake_probability": round(avg_fake_prob, 2),
            "real_probability": round(100 - avg_fake_prob, 2),
            "severity": self._severity(avg_fake_prob / 100),
            "detection_method": "frame_by_frame",
            "model_used": "sightengine-deepfake-genai",
            "frames_analyzed": len(frame_results),
            "fake_frames_count": len(fake_frames),
            "real_frames_count": len(frame_results) - len(fake_frames),
            "duration_seconds": duration,
            "most_suspicious_timestamp": worst.get("timestamp_seconds", 0),
            "timeline": frame_results,
        }

    # BUGFIX: the two helpers below were defined without `self` yet called
    # through `self._get_duration(...)` / `self._severity(...)`, which passes
    # the instance as the first positional argument and raises TypeError.
    # @staticmethod keeps the existing call sites working unchanged.

    @staticmethod
    def _get_duration(video_path: str) -> float:
        """Return the video duration in seconds (frame count / FPS)."""
        cap = cv2.VideoCapture(video_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS) or 25.0  # guard against 0-FPS metadata
            frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        finally:
            cap.release()
        return round(frames / fps, 2)

    @staticmethod
    def _severity(score: float) -> str:
        """Map a 0..1 fake score onto a coarse severity label."""
        if score >= 0.90:
            return "CRITICAL"
        if score >= 0.75:
            return "HIGH"
        if score >= 0.50:
            return "MEDIUM"
        return "LOW"