IndiaNext-Hackathon / video_detector.py
RishiXD's picture
Upload 3 files
dc085f2 verified
# video_detector.py
# Primary: Sightengine video API
# Fallback: frame-by-frame using image detector
import requests
import os
import cv2
import uuid
import tempfile
from dotenv import load_dotenv
from image_detector import ImageDetector
load_dotenv()
SIGHTENGINE_USER = os.getenv("SIGHTENGINE_USER")
SIGHTENGINE_SECRET = os.getenv("SIGHTENGINE_SECRET")
class VideoDetector:
def __init__(self, image_detector: ImageDetector):
self.image_detector = image_detector
print("Video detector ready — Sightengine + frame fallback.")
def detect(self, video_path: str) -> dict:
# Try Sightengine video first
try:
return self._detect_via_sightengine(video_path)
except Exception as e:
print(f"Sightengine video failed ({e}), using frame-by-frame.")
return self._detect_frame_by_frame(video_path)
def _detect_via_sightengine(self, video_path: str) -> dict:
with open(video_path, "rb") as f:
response = requests.post(
"https://api.sightengine.com/1.0/video/check-sync.json",
files={"media": f},
data={
"models": "deepfake,genai",
"api_user": SIGHTENGINE_USER,
"api_secret": SIGHTENGINE_SECRET,
},
timeout=120
)
result = response.json()
print(f"Sightengine video raw: {result}")
if result.get("status") != "success":
raise RuntimeError(f"Sightengine error: {result}")
return self._parse_video_result(result, video_path)
def _parse_video_result(self, result: dict, video_path: str) -> dict:
# Video result has per-frame data
frames = result.get("data", {}).get("frames", [])
if not frames:
raise RuntimeError("No frames in Sightengine video response")
fake_scores = []
for frame in frames:
deepfake = frame.get("deepfake", {}).get("score", 0.0)
ai_gen = frame.get("type", {}).get("ai_generated", 0.0)
fake_scores.append(max(deepfake, ai_gen))
avg_fake = sum(fake_scores) / len(fake_scores)
fake_frames = sum(1 for s in fake_scores if s >= 0.5)
fake_ratio = fake_frames / len(fake_scores)
duration = self._get_duration(video_path)
return {
"verdict": "DEEPFAKE" if avg_fake >= 0.5 else "AUTHENTIC",
"confidence": round(max(avg_fake, 1 - avg_fake) * 100, 2),
"fake_probability": round(avg_fake * 100, 2),
"real_probability": round((1 - avg_fake) * 100, 2),
"severity": self._severity(avg_fake),
"detection_method": "sightengine_video",
"model_used": "sightengine-deepfake-genai",
"frames_analyzed": len(fake_scores),
"fake_frames_count": fake_frames,
"real_frames_count": len(fake_scores) - fake_frames,
"duration_seconds": duration,
"timeline": [],
}
def _detect_frame_by_frame(self, video_path: str, max_samples: int = 8) -> dict:
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise RuntimeError(f"Could not open video: {video_path}")
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = round(total_frames / fps, 2)
sample_every = max(1, total_frames // max_samples)
print(f"Video: {duration}s | {total_frames} frames @ {fps:.1f} FPS")
frame_results = []
frame_count = 0
temp_dir = tempfile.gettempdir()
while cap.isOpened() and len(frame_results) < max_samples:
ret, frame = cap.read()
if not ret:
break
if frame_count % sample_every == 0:
temp_path = os.path.join(temp_dir, f"sentinel_{uuid.uuid4()}.jpg")
cv2.imwrite(temp_path, frame)
try:
timestamp = round(frame_count / fps, 2)
result = self.image_detector.detect(temp_path)
result["timestamp_seconds"] = timestamp
result["frame_number"] = frame_count
frame_results.append(result)
print(f" Frame {len(frame_results)}/{max_samples} @ {timestamp}s → {result['verdict']}")
except Exception as e:
print(f" Frame {frame_count} failed: {e}")
finally:
if os.path.exists(temp_path):
os.remove(temp_path)
frame_count += 1
cap.release()
if not frame_results:
raise RuntimeError("No frames could be analyzed.")
return self._aggregate(frame_results, duration)
def _aggregate(self, frame_results: list, duration: float) -> dict:
fake_frames = [r for r in frame_results if r["verdict"] == "DEEPFAKE"]
fake_ratio = len(fake_frames) / len(frame_results)
avg_fake_prob = sum(r["fake_probability"] for r in frame_results) / len(frame_results)
avg_conf = sum(r["confidence"] for r in frame_results) / len(frame_results)
worst = max(frame_results, key=lambda x: x.get("fake_probability", 0))
# Both conditions must be true to call DEEPFAKE
is_deepfake = fake_ratio >= 0.6 and avg_fake_prob >= 70.0
return {
"verdict": "DEEPFAKE" if is_deepfake else "AUTHENTIC",
"confidence": round(avg_conf, 2),
"fake_probability": round(avg_fake_prob, 2),
"real_probability": round(100 - avg_fake_prob, 2),
"severity": self._severity(avg_fake_prob / 100),
"detection_method": "frame_by_frame",
"model_used": "sightengine-deepfake-genai",
"frames_analyzed": len(frame_results),
"fake_frames_count": len(fake_frames),
"real_frames_count": len(frame_results) - len(fake_frames),
"duration_seconds": duration,
"most_suspicious_timestamp": worst.get("timestamp_seconds", 0),
"timeline": frame_results,
}
@staticmethod
def _get_duration(video_path: str) -> float:
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()
return round(frames / fps, 2)
@staticmethod
def _severity(score: float) -> str:
if score >= 0.90: return "CRITICAL"
if score >= 0.75: return "HIGH"
if score >= 0.50: return "MEDIUM"
return "LOW"