RishiXD committed on
Commit
dc085f2
·
verified ·
1 Parent(s): ebf3eb1

Upload 3 files

Browse files
Files changed (3) hide show
  1. explainer.py +140 -0
  2. image_detector.py +85 -0
  3. video_detector.py +171 -0
explainer.py ADDED
@@ -0,0 +1,140 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # explainer.py
2
+ # Generates forensic explanations using HuggingFace chat router
3
+ # Falls back to a structured template if API fails
4
+
5
+ import os
6
+ import json
7
+ from openai import OpenAI
8
+ from dotenv import load_dotenv
9
+
10
+ load_dotenv()
11
+
12
+ _client = None
13
+
14
+
15
def _get_client() -> OpenAI:
    """Return the module-wide HF chat client, creating it on first use."""
    global _client
    if _client is not None:
        return _client

    token = os.getenv("HF_TOKEN")
    if not token:
        raise RuntimeError("HF_TOKEN not set in .env file")

    _client = OpenAI(
        base_url="https://router.huggingface.co/v1",
        api_key=token,
    )
    return _client
27
+
28
+
29
def explain_detection(detection: dict, input_type: str) -> dict:
    """
    Generate a three-audience forensic explanation for a detection result.

    Attempts the LLM first; any failure degrades to the structured
    template so callers always receive a dict with technical_signals,
    plain_english, manipulation_areas, recommended_action, and
    mitre_technique keys.
    """
    try:
        explanation = _call_llm(detection, input_type)
    except Exception as e:
        print(f" ⚠️ Explainer LLM failed ({e}), using fallback template.")
        explanation = _fallback(detection)
    return explanation
40
+
41
+
42
def _call_llm(detection: dict, input_type: str) -> dict:
    """
    Ask the HF-router-hosted model for a JSON forensic explanation.

    Builds a prompt from the detection's verdict/confidence/severity
    (plus frame-level stats when input_type == "video"), requests a
    JSON-only reply, strips any markdown code fences the model wraps
    around it, and parses the result with json.loads. Raises on any
    API or JSON error; the caller (explain_detection) handles fallback.
    """
    verdict = detection.get("verdict", "UNKNOWN")
    confidence = detection.get("confidence", 0)
    severity = detection.get("severity", "LOW")
    extra = ""

    # Video detections carry frame-level stats worth surfacing to the LLM.
    if input_type == "video":
        extra = f"""
- Fake frame ratio: {detection.get('fake_probability', 'N/A')}%
- Frames analyzed: {detection.get('frames_analyzed', 'N/A')}
- Most suspicious timestamp: {detection.get('most_suspicious_timestamp', 'N/A')}s"""

    prompt = f"""You are a deepfake forensics expert for SENTINEL, an AI-powered cybersecurity platform.

Detection result:
- Input type: {input_type}
- Verdict: {verdict}
- Confidence: {confidence}%
- Severity: {severity}{extra}

Return ONLY a valid JSON object — no markdown, no explanation, no extra text.

{{
"technical_signals": [
"specific forensic artifact 1 (e.g. GAN grid pattern at 512px boundary)",
"specific forensic artifact 2 (e.g. facial blending seam visible at jaw line)",
"specific forensic artifact 3 (e.g. unnatural specular reflection in left eye)"
],
"plain_english": "2 clear sentences explaining this to a non-technical person.",
"manipulation_areas": ["facial region 1", "facial region 2"],
"recommended_action": "One specific action the user should take right now.",
"mitre_technique": "T1565.001 - Stored Data Manipulation"
}}

Rules:
- If DEEPFAKE: name real GAN artifacts — boundary blending, texture inconsistency, eye reflection anomalies, lighting direction mismatch, hair edge artifacts, temporal flickering.
- If AUTHENTIC: name the positive signals — consistent EXIF metadata, natural skin texture variance, coherent lighting, authentic noise patterns.
- Be specific. Never use generic phrases like "image looks suspicious"."""

    client = _get_client()
    completion = client.chat.completions.create(
        model="mistralai/Mistral-7B-Instruct-v0.3",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=600,
        temperature=0.3,  # Lower temp = more consistent JSON output
    )

    raw = completion.choices[0].message.content.strip()
    print(f" 🤖 Explainer raw output: {raw[:100]}...")

    # Strip markdown code fences if present: split on ``` and take the
    # first segment that looks like a JSON object.
    if "```" in raw:
        parts = raw.split("```")
        for part in parts:
            part = part.strip()
            if part.startswith("json"):
                # Drop the "json" language tag a fenced block may carry.
                part = part[4:].strip()
            if part.startswith("{"):
                raw = part
                break

    # May raise json.JSONDecodeError if the model ignored the JSON-only
    # instruction; explain_detection catches it and uses the fallback.
    return json.loads(raw)
104
+
105
+
106
+ def _fallback(detection: dict) -> dict:
107
+ """Structured fallback when LLM is unavailable."""
108
+ verdict = detection.get("verdict", "UNKNOWN")
109
+ confidence = detection.get("confidence", 0)
110
+
111
+ if verdict == "DEEPFAKE":
112
+ signals = [
113
+ f"Model confidence {confidence}% indicates high likelihood of synthetic generation",
114
+ "GAN-based artifacts detected in facial texture regions",
115
+ "Boundary blending inconsistencies identified near facial edges",
116
+ ]
117
+ plain = (
118
+ f"This content appears to be AI-generated or manipulated with {confidence}% confidence. "
119
+ "It shows technical patterns characteristic of deepfake generation tools."
120
+ )
121
+ action = "Do not share or use this content. Verify the original source independently."
122
+ else:
123
+ signals = [
124
+ f"Authenticity confidence: {confidence}%",
125
+ "Natural noise distribution consistent with real camera capture",
126
+ "No GAN fingerprint patterns detected",
127
+ ]
128
+ plain = (
129
+ f"This content appears authentic with {confidence}% confidence. "
130
+ "No deepfake manipulation signatures were detected."
131
+ )
132
+ action = "Content appears authentic. Standard verification still recommended for sensitive use cases."
133
+
134
+ return {
135
+ "technical_signals": signals,
136
+ "plain_english": plain,
137
+ "manipulation_areas": [],
138
+ "recommended_action": action,
139
+ "mitre_technique": "T1565.001 - Stored Data Manipulation",
140
+ }
image_detector.py ADDED
@@ -0,0 +1,85 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # image_detector.py
2
+ # Uses Sightengine API — 2000 free requests/month, no credit card
3
+ # Accurate deepfake + AI-generated image detection
4
+
5
+ import requests
6
+ import os
7
+ from dotenv import load_dotenv
8
+
9
+ load_dotenv()
10
+
11
+ SIGHTENGINE_USER = os.getenv("SIGHTENGINE_USER")
12
+ SIGHTENGINE_SECRET = os.getenv("SIGHTENGINE_SECRET")
13
+
14
+
15
class ImageDetector:
    """Image deepfake / AI-generation detector backed by the Sightengine API."""

    def __init__(self):
        # Fail fast if the API credentials were never configured.
        if not SIGHTENGINE_USER or not SIGHTENGINE_SECRET:
            raise RuntimeError(
                "SIGHTENGINE_USER and SIGHTENGINE_SECRET not set in .env\n"
                "Get free keys at: https://sightengine.com"
            )
        print("Image detector ready — Sightengine API.")

    def detect(self, image_path: str) -> dict:
        """Upload the image to Sightengine and return a parsed verdict dict."""
        payload = {
            "models": "deepfake,genai",  # deepfake + AI-generated
            "api_user": SIGHTENGINE_USER,
            "api_secret": SIGHTENGINE_SECRET,
        }
        with open(image_path, "rb") as media_file:
            response = requests.post(
                "https://api.sightengine.com/1.0/check.json",
                files={"media": media_file},
                data=payload,
                timeout=30,
            )

        result = response.json()
        print(f"Raw output: {result}")

        if result.get("status") != "success":
            raise RuntimeError(f"Sightengine error: {result}")

        return self._parse(result)

    def _parse(self, result: dict) -> dict:
        """Turn a raw Sightengine payload into SENTINEL's verdict dict."""
        # Sightengine scores are 0.0–1.0:
        #   deepfake.score    = face-swap probability
        #   type.ai_generated = AI-generated probability
        deepfake_score = result.get("deepfake", {}).get("score", 0.0)
        ai_gen_score = result.get("type", {}).get("ai_generated", 0.0)

        # The overall fake score is the worse of the two model outputs.
        fake_score = max(deepfake_score, ai_gen_score)
        real_score = 1.0 - fake_score
        is_fake = fake_score >= 0.5

        return {
            "verdict": "DEEPFAKE" if is_fake else "AUTHENTIC",
            "confidence": round(max(fake_score, real_score) * 100, 2),
            "fake_probability": round(fake_score * 100, 2),
            "real_probability": round(real_score * 100, 2),
            "severity": self._severity(fake_score),
            "deepfake_score": round(deepfake_score * 100, 2),
            "ai_generated_score": round(ai_gen_score * 100, 2),
            "model_used": "sightengine-deepfake-genai",
        }

    @staticmethod
    def _severity(score: float) -> str:
        """Map a 0–1 fake score onto a severity label."""
        for threshold, label in ((0.90, "CRITICAL"), (0.75, "HIGH"), (0.50, "MEDIUM")):
            if score >= threshold:
                return label
        return "LOW"
76
+
77
+
78
# Standalone test: python image_detector.py your_image.jpg
if __name__ == "__main__":
    import sys

    args = sys.argv
    if len(args) < 2:
        print("Usage: python image_detector.py <image_path>")
        sys.exit(1)
    print(ImageDetector().detect(args[1]))
video_detector.py ADDED
@@ -0,0 +1,171 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # video_detector.py
2
+ # Primary: Sightengine video API
3
+ # Fallback: frame-by-frame using image detector
4
+
5
+ import requests
6
+ import os
7
+ import cv2
8
+ import uuid
9
+ import tempfile
10
+ from dotenv import load_dotenv
11
+ from image_detector import ImageDetector
12
+
13
+ load_dotenv()
14
+
15
+ SIGHTENGINE_USER = os.getenv("SIGHTENGINE_USER")
16
+ SIGHTENGINE_SECRET = os.getenv("SIGHTENGINE_SECRET")
17
+
18
+
19
class VideoDetector:
    """
    Deepfake detection for video files.

    Primary path: Sightengine's synchronous video-check API.
    Fallback path: sample frames with OpenCV, score each one through the
    shared ImageDetector, and aggregate the per-frame verdicts.
    """

    # String annotation so the class can be defined without evaluating the
    # ImageDetector name at class-creation time (PEP 484 forward reference).
    def __init__(self, image_detector: "ImageDetector"):
        # Kept for the frame-by-frame fallback path.
        self.image_detector = image_detector
        print("Video detector ready — Sightengine + frame fallback.")

    def detect(self, video_path: str) -> dict:
        """
        Analyze *video_path* and return a verdict dict.

        Tries the Sightengine video endpoint first; any failure there
        (network, quota, malformed response) degrades to local sampling.
        """
        try:
            return self._detect_via_sightengine(video_path)
        except Exception as e:
            print(f"Sightengine video failed ({e}), using frame-by-frame.")
            return self._detect_frame_by_frame(video_path)

    def _detect_via_sightengine(self, video_path: str) -> dict:
        """Upload the whole file to Sightengine's sync video endpoint."""
        with open(video_path, "rb") as f:
            response = requests.post(
                "https://api.sightengine.com/1.0/video/check-sync.json",
                files={"media": f},
                data={
                    "models": "deepfake,genai",
                    "api_user": SIGHTENGINE_USER,
                    "api_secret": SIGHTENGINE_SECRET,
                },
                timeout=120  # synchronous video analysis can take a while
            )

        result = response.json()
        print(f"Sightengine video raw: {result}")

        if result.get("status") != "success":
            raise RuntimeError(f"Sightengine error: {result}")

        return self._parse_video_result(result, video_path)

    def _parse_video_result(self, result: dict, video_path: str) -> dict:
        """Aggregate Sightengine's per-frame scores into a single verdict."""
        frames = result.get("data", {}).get("frames", [])

        if not frames:
            raise RuntimeError("No frames in Sightengine video response")

        # Per frame, take the worse of the two model scores (0.0–1.0).
        fake_scores = []
        for frame in frames:
            deepfake = frame.get("deepfake", {}).get("score", 0.0)
            ai_gen = frame.get("type", {}).get("ai_generated", 0.0)
            fake_scores.append(max(deepfake, ai_gen))

        avg_fake = sum(fake_scores) / len(fake_scores)
        fake_frames = sum(1 for s in fake_scores if s >= 0.5)
        duration = self._get_duration(video_path)

        return {
            "verdict": "DEEPFAKE" if avg_fake >= 0.5 else "AUTHENTIC",
            "confidence": round(max(avg_fake, 1 - avg_fake) * 100, 2),
            "fake_probability": round(avg_fake * 100, 2),
            "real_probability": round((1 - avg_fake) * 100, 2),
            "severity": self._severity(avg_fake),
            "detection_method": "sightengine_video",
            "model_used": "sightengine-deepfake-genai",
            "frames_analyzed": len(fake_scores),
            "fake_frames_count": fake_frames,
            "real_frames_count": len(fake_scores) - fake_frames,
            "duration_seconds": duration,
            "timeline": [],  # per-frame timeline only produced on the fallback path
        }

    def _detect_frame_by_frame(self, video_path: str, max_samples: int = 8) -> dict:
        """
        Fallback: sample up to *max_samples* evenly spaced frames, score
        each via the image detector, and aggregate the results.

        Raises RuntimeError if the video cannot be opened or no frame
        could be analyzed.
        """
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise RuntimeError(f"Could not open video: {video_path}")

        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0  # guard against 0 in metadata
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = round(total_frames / fps, 2)
        sample_every = max(1, total_frames // max_samples)

        print(f"Video: {duration}s | {total_frames} frames @ {fps:.1f} FPS")

        frame_results = []
        frame_count = 0
        temp_dir = tempfile.gettempdir()

        try:
            while cap.isOpened() and len(frame_results) < max_samples:
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_count % sample_every == 0:
                    # The image detector takes a file path, so dump the frame
                    # to a uniquely named temp file and clean it up after.
                    temp_path = os.path.join(temp_dir, f"sentinel_{uuid.uuid4()}.jpg")
                    cv2.imwrite(temp_path, frame)
                    try:
                        timestamp = round(frame_count / fps, 2)
                        result = self.image_detector.detect(temp_path)
                        result["timestamp_seconds"] = timestamp
                        result["frame_number"] = frame_count
                        frame_results.append(result)
                        print(f" Frame {len(frame_results)}/{max_samples} @ {timestamp}s → {result['verdict']}")
                    except Exception as e:
                        # One bad frame shouldn't abort the whole scan.
                        print(f" Frame {frame_count} failed: {e}")
                    finally:
                        if os.path.exists(temp_path):
                            os.remove(temp_path)

                frame_count += 1
        finally:
            # Release the capture even if an unexpected error escapes the loop.
            cap.release()

        if not frame_results:
            raise RuntimeError("No frames could be analyzed.")

        return self._aggregate(frame_results, duration)

    def _aggregate(self, frame_results: list, duration: float) -> dict:
        """Combine per-frame detections into one video-level verdict dict."""
        fake_frames = [r for r in frame_results if r["verdict"] == "DEEPFAKE"]
        fake_ratio = len(fake_frames) / len(frame_results)
        avg_fake_prob = sum(r["fake_probability"] for r in frame_results) / len(frame_results)
        avg_conf = sum(r["confidence"] for r in frame_results) / len(frame_results)
        worst = max(frame_results, key=lambda x: x.get("fake_probability", 0))

        # Both conditions must be true to call DEEPFAKE: a clear majority of
        # fake frames AND a high average fake probability.
        is_deepfake = fake_ratio >= 0.6 and avg_fake_prob >= 70.0

        return {
            "verdict": "DEEPFAKE" if is_deepfake else "AUTHENTIC",
            "confidence": round(avg_conf, 2),
            "fake_probability": round(avg_fake_prob, 2),
            "real_probability": round(100 - avg_fake_prob, 2),
            "severity": self._severity(avg_fake_prob / 100),
            "detection_method": "frame_by_frame",
            "model_used": "sightengine-deepfake-genai",
            "frames_analyzed": len(frame_results),
            "fake_frames_count": len(fake_frames),
            "real_frames_count": len(frame_results) - len(fake_frames),
            "duration_seconds": duration,
            "most_suspicious_timestamp": worst.get("timestamp_seconds", 0),
            "timeline": frame_results,
        }

    @staticmethod
    def _get_duration(video_path: str) -> float:
        """Return the video duration in seconds (0.0 if it cannot be read)."""
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
        frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        cap.release()
        return round(frames / fps, 2)

    @staticmethod
    def _severity(score: float) -> str:
        """Map a 0–1 fake score onto a severity label."""
        if score >= 0.90:
            return "CRITICAL"
        if score >= 0.75:
            return "HIGH"
        if score >= 0.50:
            return "MEDIUM"
        return "LOW"