akagtag committed on
Commit
337b262
·
1 Parent(s): 19d9b40

Fix ZeroGPU model loading lifecycle

Browse files
requirements.txt CHANGED
@@ -1,4 +1,5 @@
1
  gradio==5.23.0
 
2
  fastapi>=0.111.0
3
  uvicorn[standard]>=0.29.0
4
  python-multipart>=0.0.9
 
1
  gradio==5.23.0
2
+ spaces>=0.30.2
3
  fastapi>=0.111.0
4
  uvicorn[standard]>=0.29.0
5
  python-multipart>=0.0.9
src/api/main.py CHANGED
@@ -69,6 +69,10 @@ if _is_test_mode():
69
  os.environ.setdefault("GENAI_SKIP_MODEL_LOAD", "1")
70
 
71
 
 
 
 
 
72
  app = FastAPI(title="GenAI-DeepDetect", version="1.0.0")
73
  app.add_middleware(
74
  CORSMiddleware,
@@ -294,6 +298,14 @@ async def preload() -> None:
294
  logger.info("Skipping startup preload in test mode")
295
  return
296
 
 
 
 
 
 
 
 
 
297
  logger.info("Preloading models...")
298
  # Keep model imports/loads sequential to avoid lazy-import race issues.
299
  await asyncio.to_thread(_fp._ensure)
 
69
  os.environ.setdefault("GENAI_SKIP_MODEL_LOAD", "1")
70
 
71
 
72
+ def _is_zero_gpu_space() -> bool:
73
+ return os.environ.get("SPACE_ID", "").startswith("akagtag/")
74
+
75
+
76
  app = FastAPI(title="GenAI-DeepDetect", version="1.0.0")
77
  app.add_middleware(
78
  CORSMiddleware,
 
298
  logger.info("Skipping startup preload in test mode")
299
  return
300
 
301
+ if _is_zero_gpu_space():
302
+ logger.info("Skipping startup preload on ZeroGPU; local models load inside @spaces.GPU calls")
303
+ return
304
+
305
+ if get_inference_backend() in {"hf", "runpod"}:
306
+ logger.info("Skipping startup preload for remote inference backend")
307
+ return
308
+
309
  logger.info("Preloading models...")
310
  # Keep model imports/loads sequential to avoid lazy-import race issues.
311
  await asyncio.to_thread(_fp._ensure)
src/engines/coherence/engine.py CHANGED
@@ -13,11 +13,6 @@ from typing import Optional
13
  import numpy as np
14
  from PIL import Image
15
 
16
- try:
17
- import spaces # type: ignore # noqa: F401
18
- except ImportError:
19
- spaces = None
20
-
21
  from src.types import EngineResult
22
 
23
  logger = logging.getLogger(__name__)
@@ -28,15 +23,11 @@ _mtcnn = None
28
  _resnet = None
29
  _face_mesh = None
30
  _torch = None
31
- _device = "cpu" # updated to "cuda" in _load() when GPU is available
32
  _resnet_fallback = None # torchvision ResNet-18 used when facenet-pytorch unavailable
33
  _transform_fallback = None
34
 
35
 
36
- def _prefer_cuda(torch_module) -> bool:
37
- return torch_module.cuda.is_available() or os.environ.get("SPACE_ID", "").startswith("akagtag/")
38
-
39
-
40
  def _skip_model_loads() -> bool:
41
  return os.environ.get("GENAI_SKIP_MODEL_LOAD", "").strip().lower() in {
42
  "1",
@@ -139,7 +130,7 @@ def _load() -> None:
139
  import torch # type: ignore
140
 
141
  _torch = torch
142
- _device = "cuda" if _prefer_cuda(torch) else "cpu"
143
  logger.info(" Coherence device: %s", _device)
144
 
145
  from facenet_pytorch import InceptionResnetV1, MTCNN # type: ignore
@@ -159,7 +150,7 @@ def _load() -> None:
159
  import torchvision.transforms as tv_transforms # type: ignore
160
 
161
  _torch = torch
162
- _device = "cuda" if _prefer_cuda(torch) else "cpu"
163
 
164
  model = tv_models.resnet18(weights=tv_models.ResNet18_Weights.DEFAULT)
165
  model.fc = torch.nn.Identity() # strip classifier → 512-d embedding
@@ -183,6 +174,47 @@ def _load() -> None:
183
  logger.info("Coherence model load attempt complete")
184
 
185
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
186
  class CoherenceEngine:
187
  def _ensure(self) -> None:
188
  with _lock:
@@ -191,18 +223,22 @@ class CoherenceEngine:
191
  def run(self, image: Image.Image) -> EngineResult:
192
  t0 = time.perf_counter()
193
  self._ensure()
 
 
 
 
 
194
 
195
- frame = np.array(image.convert("RGB"))
196
- score = self._image_score(frame)
197
-
198
- return EngineResult(
199
- engine="coherence",
200
- verdict="FAKE" if score > 0.5 else "REAL",
201
- confidence=float(np.clip(score, 0.0, 1.0)),
202
- attributed_generator=None,
203
- explanation=f"Geometric coherence anomaly {score:.2f} (image mode).",
204
- processing_time_ms=(time.perf_counter() - t0) * 1000,
205
- )
206
 
207
  def _image_score(self, frame: np.ndarray) -> float:
208
  if _face_mesh is None:
@@ -252,58 +288,69 @@ class CoherenceEngine:
252
  """
253
  t0 = time.perf_counter()
254
  self._ensure()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
255
 
256
- if not frames:
257
  return EngineResult(
258
  engine="coherence",
259
- verdict="UNKNOWN",
260
- confidence=0.5,
261
  attributed_generator=None,
262
- explanation="No frames.",
263
- processing_time_ms=0.0,
 
 
264
  )
265
-
266
- if len(frames) < 4:
267
- result = self.run(Image.fromarray(frames[0]))
268
- result.explanation = "Too few frames for temporal analysis."
269
- return result
270
-
271
- delta = self._embedding_variance(frames)
272
- jerk = self._landmark_jerk(frames)
273
- blink = self._blink_anomaly(frames)
274
- visual_score = float(np.clip(delta * 0.45 + jerk * 0.35 + blink * 0.20, 0.0, 1.0))
275
-
276
- audio_anomaly: Optional[float] = None
277
- timestamp_markers: list[dict] = []
278
- if video_path is not None:
279
- audio_anomaly, timestamp_markers = self._audio_lipsync_score(video_path, frames)
280
-
281
- if audio_anomaly is not None:
282
- score = float(np.clip(visual_score * 0.60 + audio_anomaly * 0.40, 0.0, 1.0))
283
- explanation = (
284
- f"Embedding variance {delta:.2f}, landmark jerk {jerk:.2f}, "
285
- f"blink anomaly {blink:.2f}. "
286
- f"Audio lip-sync anomaly {audio_anomaly:.2f} "
287
- f"({len(timestamp_markers)} flagged segment(s))."
288
- )
289
- else:
290
- score = visual_score
291
- explanation = (
292
- f"Embedding variance {delta:.2f}, "
293
- f"landmark jerk {jerk:.2f}, "
294
- f"blink anomaly {blink:.2f}."
295
- )
296
-
297
- return EngineResult(
298
- engine="coherence",
299
- verdict="FAKE" if score > 0.5 else "REAL",
300
- confidence=score,
301
- attributed_generator=None,
302
- explanation=explanation,
303
- processing_time_ms=(time.perf_counter() - t0) * 1000,
304
- audio_sync_score=audio_anomaly,
305
- timestamp_markers=timestamp_markers,
306
- )
307
 
308
  def _audio_lipsync_score(
309
  self,
 
13
  import numpy as np
14
  from PIL import Image
15
 
 
 
 
 
 
16
  from src.types import EngineResult
17
 
18
  logger = logging.getLogger(__name__)
 
23
  _resnet = None
24
  _face_mesh = None
25
  _torch = None
26
+ _device = "cpu"
27
  _resnet_fallback = None # torchvision ResNet-18 used when facenet-pytorch unavailable
28
  _transform_fallback = None
29
 
30
 
 
 
 
 
31
  def _skip_model_loads() -> bool:
32
  return os.environ.get("GENAI_SKIP_MODEL_LOAD", "").strip().lower() in {
33
  "1",
 
130
  import torch # type: ignore
131
 
132
  _torch = torch
133
+ _device = "cpu"
134
  logger.info(" Coherence device: %s", _device)
135
 
136
  from facenet_pytorch import InceptionResnetV1, MTCNN # type: ignore
 
150
  import torchvision.transforms as tv_transforms # type: ignore
151
 
152
  _torch = torch
153
+ _device = "cpu"
154
 
155
  model = tv_models.resnet18(weights=tv_models.ResNet18_Weights.DEFAULT)
156
  model.fc = torch.nn.Identity() # strip classifier → 512-d embedding
 
174
  logger.info("Coherence model load attempt complete")
175
 
176
 
177
+ def _inference_device() -> str:
178
+ if _torch is None:
179
+ return "cpu"
180
+ try:
181
+ return "cuda" if _torch.cuda.is_available() else "cpu"
182
+ except Exception:
183
+ return "cpu"
184
+
185
+
186
+ def _prepare_runtime(device: str) -> None:
187
+ global _device
188
+ _device = device
189
+ if device != "cuda":
190
+ return
191
+ if _resnet is not None:
192
+ _resnet.to(device)
193
+ if _resnet_fallback is not None:
194
+ _resnet_fallback.to(device)
195
+
196
+
197
+ def _release_runtime(device: str) -> None:
198
+ global _device
199
+ _device = "cpu"
200
+ if device != "cuda" or _torch is None:
201
+ return
202
+ if _resnet is not None:
203
+ try:
204
+ _resnet.to("cpu")
205
+ except Exception:
206
+ pass
207
+ if _resnet_fallback is not None:
208
+ try:
209
+ _resnet_fallback.to("cpu")
210
+ except Exception:
211
+ pass
212
+ try:
213
+ _torch.cuda.empty_cache()
214
+ except Exception:
215
+ pass
216
+
217
+
218
  class CoherenceEngine:
219
  def _ensure(self) -> None:
220
  with _lock:
 
223
  def run(self, image: Image.Image) -> EngineResult:
224
  t0 = time.perf_counter()
225
  self._ensure()
226
+ device = _inference_device()
227
+ _prepare_runtime(device)
228
+ try:
229
+ frame = np.array(image.convert("RGB"))
230
+ score = self._image_score(frame)
231
 
232
+ return EngineResult(
233
+ engine="coherence",
234
+ verdict="FAKE" if score > 0.5 else "REAL",
235
+ confidence=float(np.clip(score, 0.0, 1.0)),
236
+ attributed_generator=None,
237
+ explanation=f"Geometric coherence anomaly {score:.2f} (image mode).",
238
+ processing_time_ms=(time.perf_counter() - t0) * 1000,
239
+ )
240
+ finally:
241
+ _release_runtime(device)
 
242
 
243
  def _image_score(self, frame: np.ndarray) -> float:
244
  if _face_mesh is None:
 
288
  """
289
  t0 = time.perf_counter()
290
  self._ensure()
291
+ device = _inference_device()
292
+ _prepare_runtime(device)
293
+
294
+ try:
295
+ if not frames:
296
+ return EngineResult(
297
+ engine="coherence",
298
+ verdict="UNKNOWN",
299
+ confidence=0.5,
300
+ attributed_generator=None,
301
+ explanation="No frames.",
302
+ processing_time_ms=0.0,
303
+ )
304
+
305
+ if len(frames) < 4:
306
+ score = self._image_score(frames[0])
307
+ return EngineResult(
308
+ engine="coherence",
309
+ verdict="FAKE" if score > 0.5 else "REAL",
310
+ confidence=float(np.clip(score, 0.0, 1.0)),
311
+ attributed_generator=None,
312
+ explanation="Too few frames for temporal analysis.",
313
+ processing_time_ms=(time.perf_counter() - t0) * 1000,
314
+ )
315
+
316
+ delta = self._embedding_variance(frames)
317
+ jerk = self._landmark_jerk(frames)
318
+ blink = self._blink_anomaly(frames)
319
+ visual_score = float(np.clip(delta * 0.45 + jerk * 0.35 + blink * 0.20, 0.0, 1.0))
320
+
321
+ audio_anomaly: Optional[float] = None
322
+ timestamp_markers: list[dict] = []
323
+ if video_path is not None:
324
+ audio_anomaly, timestamp_markers = self._audio_lipsync_score(video_path, frames)
325
+
326
+ if audio_anomaly is not None:
327
+ score = float(np.clip(visual_score * 0.60 + audio_anomaly * 0.40, 0.0, 1.0))
328
+ explanation = (
329
+ f"Embedding variance {delta:.2f}, landmark jerk {jerk:.2f}, "
330
+ f"blink anomaly {blink:.2f}. "
331
+ f"Audio lip-sync anomaly {audio_anomaly:.2f} "
332
+ f"({len(timestamp_markers)} flagged segment(s))."
333
+ )
334
+ else:
335
+ score = visual_score
336
+ explanation = (
337
+ f"Embedding variance {delta:.2f}, "
338
+ f"landmark jerk {jerk:.2f}, "
339
+ f"blink anomaly {blink:.2f}."
340
+ )
341
 
 
342
  return EngineResult(
343
  engine="coherence",
344
+ verdict="FAKE" if score > 0.5 else "REAL",
345
+ confidence=score,
346
  attributed_generator=None,
347
+ explanation=explanation,
348
+ processing_time_ms=(time.perf_counter() - t0) * 1000,
349
+ audio_sync_score=audio_anomaly,
350
+ timestamp_markers=timestamp_markers,
351
  )
352
+ finally:
353
+ _release_runtime(device)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
354
 
355
  def _audio_lipsync_score(
356
  self,
src/engines/fingerprint/engine.py CHANGED
@@ -17,24 +17,11 @@ import torch
17
  from PIL import Image
18
  from transformers import CLIPModel, CLIPProcessor
19
 
20
- try:
21
- import spaces # type: ignore # noqa: F401
22
- except ImportError:
23
- spaces = None
24
-
25
  from src.types import EngineResult
26
 
27
  logger = logging.getLogger(__name__)
28
  CACHE = os.environ.get("MODEL_CACHE_DIR", "/tmp/models")
29
 
30
- def _prefer_cuda() -> bool:
31
- return torch.cuda.is_available() or os.environ.get("SPACE_ID", "").startswith("akagtag/")
32
-
33
-
34
- # GPU device selection — ZeroGPU emulates CUDA outside the decorated section.
35
- _DEVICE = "cuda" if _prefer_cuda() else "cpu"
36
- _PIPELINE_DEVICE = 0 if _DEVICE == "cuda" else -1 # HF pipeline convention
37
-
38
  DETECTOR_CANDIDATES = [
39
  "Organika/sdxl-detector",
40
  "haywoodsloan/ai-image-detector-deploy",
@@ -43,14 +30,14 @@ DETECTOR_CANDIDATES = [
43
  ]
44
 
45
  GENERATOR_PROMPTS: dict[str, str] = {
46
- "real": "photograph with natural film grain, uneven organic noise, authentic lens distortion, and real-world lighting imperfections",
47
- "sora": "AI video frame with unnaturally smooth temporal transitions, photorealistic but physically implausible motion, and over-consistent lighting",
48
- "runway": "AI video frame with painterly color grading artifacts, dreamlike motion blur inconsistencies, and synthetic depth-of-field",
49
- "wav2lip": "face with sharp unnatural lip boundary artifacts, texture discontinuity around the mouth region, and mismatched skin tone at lip edges",
50
- "stable_diffusion": "image with soft overly-smooth skin, color bleeding at object edges, dreamlike over-saturation, and repeating background texture patterns",
51
- "sdxl": "image with hyper-sharp commercial detail, perfect noise-free skin, unnaturally crisp edges, and over-rendered textures lacking real-world imperfection",
52
- "midjourney": "image with dramatic cinematic vignette, fantasy color palette, exaggerated contrast, hyper-detailed surreal aesthetic, and painterly over-rendering",
53
- "dall_e": "image with clean flat graphic style, smooth AI-blended gradients, slightly plastic surface quality, and uniformly lit commercial illustration look",
54
  "unknown_generative": "image with subtle AI artifacts including unnatural smoothness, inconsistent frequency patterns, and synthetic pixel-level regularities absent in real photos",
55
  }
56
 
@@ -103,10 +90,9 @@ def _short_error(exc: Exception, *, limit: int = 300) -> str:
103
 
104
  def _build_detector(model_id: str) -> Any:
105
  hf_pipeline = _get_pipeline()
106
- # Try GPU first, fall back to CPU-only variants
107
- attempts: tuple[dict, ...] = (
108
- {"cache_dir": CACHE, "device": _PIPELINE_DEVICE},
109
- {"device": _PIPELINE_DEVICE},
110
  {"cache_dir": CACHE},
111
  {},
112
  )
@@ -126,12 +112,12 @@ def _load() -> None:
126
  if _loaded:
127
  return
128
 
129
- logger.info("Fingerprint engine: loading models on device=%s ...", _DEVICE)
130
 
131
  for model_id in DETECTOR_CANDIDATES:
132
  try:
133
- det = _build_detector(model_id)
134
- _detectors.append((model_id, det))
135
  logger.info(" detector loaded: %s", model_id)
136
  except Exception as exc:
137
  logger.warning(" detector unavailable (%s): %s", model_id, _short_error(exc))
@@ -140,31 +126,74 @@ def _load() -> None:
140
  logger.error("Fingerprint engine: no detectors loaded; using neutral fallback score.")
141
 
142
  try:
143
- # Load CLIP in FP16 on CUDA for ~2× speed + half memory on A100
144
- dtype = torch.float16 if _DEVICE == "cuda" else torch.float32
145
  _clip_model = CLIPModel.from_pretrained(
146
  "openai/clip-vit-large-patch14",
147
  cache_dir=CACHE,
148
- torch_dtype=dtype,
149
- ).to(_DEVICE)
150
  _clip_processor = CLIPProcessor.from_pretrained(
151
  "openai/clip-vit-large-patch14",
152
  cache_dir=CACHE,
153
  )
154
  _clip_model.eval()
155
- logger.info(" CLIP loaded on %s (dtype=%s)", _DEVICE, dtype)
156
  except Exception as exc:
157
  logger.warning(" CLIP unavailable: %s", _short_error(exc))
158
 
159
  _loaded = True
160
  logger.info(
161
- "Fingerprint engine ready: %s detectors, CLIP=%s, device=%s",
162
  len(_detectors),
163
  "ok" if _clip_model else "missing",
164
- _DEVICE,
165
  )
166
 
167
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
168
  def _fake_score_from_preds(preds: list[dict[str, Any]]) -> float:
169
  if not preds:
170
  return 0.5
@@ -197,45 +226,48 @@ class FingerprintEngine:
197
  def run(self, image: Image.Image) -> EngineResult:
198
  t0 = time.perf_counter()
199
  self._ensure()
 
 
200
 
201
- if image.mode != "RGB":
202
- image = image.convert("RGB")
203
-
204
- detector_weights = [0.4, 0.3, 0.2, 0.1]
205
- total_w = 0.0
206
- weighted_fake = 0.0
207
-
208
- for index, (model_id, det) in enumerate(_detectors):
209
- try:
210
- preds = det(image)
211
- score = _fake_score_from_preds(preds)
212
- weight = detector_weights[index] if index < len(detector_weights) else 0.1
213
- weighted_fake += score * weight
214
- total_w += weight
215
- logger.debug("%s fake_score=%.3f", model_id, score)
216
- except Exception as exc:
217
- logger.warning("Detector %s inference error: %s", model_id, _short_error(exc))
218
-
219
- ensemble_score = (weighted_fake / total_w) if total_w > 0 else 0.5
220
-
221
- dct_score = self._dct_frequency_score(image)
222
- fake_score = float(np.clip(ensemble_score * 0.85 + dct_score * 0.15, 0.0, 1.0))
223
-
224
- generator = self._attribute_generator(image, fake_score)
225
-
226
- return EngineResult(
227
- engine="fingerprint",
228
- verdict="FAKE" if fake_score > 0.5 else "REAL",
229
- confidence=float(fake_score),
230
- attributed_generator=generator,
231
- explanation=(
232
- f"Ensemble {ensemble_score:.2f} × 0.85 + DCT {dct_score:.2f} × 0.15 = {fake_score:.2f}. "
233
- f"Generator attributed to: {generator}."
234
- ),
235
- processing_time_ms=(time.perf_counter() - t0) * 1000,
236
- )
237
 
238
- def _attribute_generator(self, image: Image.Image, fake_score: float) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
239
  if _clip_model is None or _clip_processor is None:
240
  _thread_local.last_clip_embedding = None
241
  return "unknown_generative" if fake_score > 0.5 else "real"
@@ -250,11 +282,10 @@ class FingerprintEngine:
250
  truncation=True,
251
  max_length=77,
252
  )
253
- # Move all tensors to GPU
254
- inputs = {k: v.to(_DEVICE) for k, v in inputs.items()}
255
 
256
  with torch.no_grad():
257
- with torch.cuda.amp.autocast(enabled=(_DEVICE == "cuda")):
258
  outputs = _clip_model(**inputs)
259
  logits = outputs.logits_per_image[0].float()
260
  image_embeds = outputs.image_embeds.detach().float().cpu().numpy()[0]
@@ -279,16 +310,13 @@ class FingerprintEngine:
279
  return "unknown_generative" if fake_score > 0.5 else "real"
280
 
281
  def _batch_clip_attribution(
282
- self, images: list[Image.Image], fake_scores: list[float]
 
 
 
283
  ) -> list[str]:
284
- """
285
- Single batched CLIP forward pass for all keyframes — far faster than
286
- calling _attribute_generator() once per frame on GPU.
287
- """
288
  if _clip_model is None or _clip_processor is None or not images:
289
- return [
290
- "unknown_generative" if s > 0.5 else "real" for s in fake_scores
291
- ]
292
 
293
  try:
294
  texts = list(GENERATOR_PROMPTS.values())
@@ -300,14 +328,13 @@ class FingerprintEngine:
300
  truncation=True,
301
  max_length=77,
302
  )
303
- inputs = {k: v.to(_DEVICE) for k, v in inputs.items()}
304
 
305
  with torch.no_grad():
306
- with torch.cuda.amp.autocast(enabled=(_DEVICE == "cuda")):
307
- # logits_per_image: (N_images, N_texts)
308
  logits = _clip_model(**inputs).logits_per_image.float()
309
 
310
- probs_batch = logits.softmax(dim=-1).cpu().numpy() # (N, 9)
311
  keys = list(GENERATOR_PROMPTS.keys())
312
  results: list[str] = []
313
 
@@ -315,24 +342,22 @@ class FingerprintEngine:
315
  probs = probs_batch[i]
316
  max_prob = float(np.max(probs))
317
  if max_prob < 0.32:
318
- gen = "unknown_generative"
319
  else:
320
- gen = keys[int(np.argmax(probs))]
321
- if fake_score > 0.65 and gen == "real":
322
- gen = "unknown_generative"
323
- if fake_score < 0.35 and gen != "real":
324
- gen = "real"
325
- results.append(gen)
326
 
327
  return results
328
  except Exception as exc:
329
  logger.warning("Batch CLIP attribution error: %s", _short_error(exc))
330
- return [
331
- "unknown_generative" if s > 0.5 else "real" for s in fake_scores
332
- ]
333
 
334
  def _dct_frequency_score(self, image: Image.Image) -> float:
335
- """DCT frequency band analysis (paper §III-B). Runs on CPU (block-level)."""
336
  try:
337
  from scipy.fft import dctn # type: ignore
338
 
@@ -363,7 +388,6 @@ class FingerprintEngine:
363
  return 0.3
364
 
365
  def get_last_clip_embedding(self) -> Optional[np.ndarray]:
366
- """Return the CLIP image embedding from the most recent run() call in this thread."""
367
  return getattr(_thread_local, "last_clip_embedding", None)
368
 
369
  def run_video(self, frames: list) -> EngineResult:
@@ -378,42 +402,43 @@ class FingerprintEngine:
378
  )
379
 
380
  self._ensure()
381
- keyframes = frames[::8] or [frames[0]]
382
- keyframes_pil = [
383
- Image.fromarray(f).convert("RGB") for f in keyframes
384
- ]
385
-
386
- # Batch detector scores (HF pipeline accepts a list)
387
- detector_weights = [0.4, 0.3, 0.2, 0.1]
388
- frame_scores: list[float] = []
389
- for img in keyframes_pil:
390
- total_w = 0.0
391
- weighted_fake = 0.0
392
- for index, (model_id, det) in enumerate(_detectors):
393
- try:
394
- preds = det(img)
395
- score = _fake_score_from_preds(preds)
396
- weight = detector_weights[index] if index < len(detector_weights) else 0.1
397
- weighted_fake += score * weight
398
- total_w += weight
399
- except Exception:
400
- pass
401
- frame_scores.append((weighted_fake / total_w) if total_w > 0 else 0.5)
402
-
403
- # Single batched CLIP pass for all keyframes
404
- generators = self._batch_clip_attribution(keyframes_pil, frame_scores)
405
-
406
- avg_conf = float(np.mean(frame_scores))
407
- top_gen = max(set(generators), key=generators.count) if generators else "unknown_generative"
408
-
409
- return EngineResult(
410
- engine="fingerprint",
411
- verdict="FAKE" if avg_conf > 0.5 else "REAL",
412
- confidence=avg_conf,
413
- attributed_generator=top_gen,
414
- explanation=(
415
- f"Keyframe average fake score {avg_conf:.2f} over {len(keyframes)} sampled frames. "
416
- f"Dominant generator: {top_gen}."
417
- ),
418
- processing_time_ms=(time.perf_counter() - t0) * 1000,
419
- )
 
 
17
  from PIL import Image
18
  from transformers import CLIPModel, CLIPProcessor
19
 
 
 
 
 
 
20
  from src.types import EngineResult
21
 
22
  logger = logging.getLogger(__name__)
23
  CACHE = os.environ.get("MODEL_CACHE_DIR", "/tmp/models")
24
 
 
 
 
 
 
 
 
 
25
  DETECTOR_CANDIDATES = [
26
  "Organika/sdxl-detector",
27
  "haywoodsloan/ai-image-detector-deploy",
 
30
  ]
31
 
32
  GENERATOR_PROMPTS: dict[str, str] = {
33
+ "real": "photograph with natural film grain, uneven organic noise, authentic lens distortion, and real-world lighting imperfections",
34
+ "sora": "AI video frame with unnaturally smooth temporal transitions, photorealistic but physically implausible motion, and over-consistent lighting",
35
+ "runway": "AI video frame with painterly color grading artifacts, dreamlike motion blur inconsistencies, and synthetic depth-of-field",
36
+ "wav2lip": "face with sharp unnatural lip boundary artifacts, texture discontinuity around the mouth region, and mismatched skin tone at lip edges",
37
+ "stable_diffusion": "image with soft overly-smooth skin, color bleeding at object edges, dreamlike over-saturation, and repeating background texture patterns",
38
+ "sdxl": "image with hyper-sharp commercial detail, perfect noise-free skin, unnaturally crisp edges, and over-rendered textures lacking real-world imperfection",
39
+ "midjourney": "image with dramatic cinematic vignette, fantasy color palette, exaggerated contrast, hyper-detailed surreal aesthetic, and painterly over-rendering",
40
+ "dall_e": "image with clean flat graphic style, smooth AI-blended gradients, slightly plastic surface quality, and uniformly lit commercial illustration look",
41
  "unknown_generative": "image with subtle AI artifacts including unnatural smoothness, inconsistent frequency patterns, and synthetic pixel-level regularities absent in real photos",
42
  }
43
 
 
90
 
91
  def _build_detector(model_id: str) -> Any:
92
  hf_pipeline = _get_pipeline()
93
+ attempts: tuple[dict[str, Any], ...] = (
94
+ {"cache_dir": CACHE, "device": -1},
95
+ {"device": -1},
 
96
  {"cache_dir": CACHE},
97
  {},
98
  )
 
112
  if _loaded:
113
  return
114
 
115
+ logger.info("Fingerprint engine: loading models on CPU ...")
116
 
117
  for model_id in DETECTOR_CANDIDATES:
118
  try:
119
+ detector = _build_detector(model_id)
120
+ _detectors.append((model_id, detector))
121
  logger.info(" detector loaded: %s", model_id)
122
  except Exception as exc:
123
  logger.warning(" detector unavailable (%s): %s", model_id, _short_error(exc))
 
126
  logger.error("Fingerprint engine: no detectors loaded; using neutral fallback score.")
127
 
128
  try:
 
 
129
  _clip_model = CLIPModel.from_pretrained(
130
  "openai/clip-vit-large-patch14",
131
  cache_dir=CACHE,
132
+ torch_dtype=torch.float32,
133
+ ).to("cpu")
134
  _clip_processor = CLIPProcessor.from_pretrained(
135
  "openai/clip-vit-large-patch14",
136
  cache_dir=CACHE,
137
  )
138
  _clip_model.eval()
139
+ logger.info(" CLIP loaded on cpu")
140
  except Exception as exc:
141
  logger.warning(" CLIP unavailable: %s", _short_error(exc))
142
 
143
  _loaded = True
144
  logger.info(
145
+ "Fingerprint engine ready: %s detectors, CLIP=%s",
146
  len(_detectors),
147
  "ok" if _clip_model else "missing",
 
148
  )
149
 
150
 
151
+ def _inference_device() -> str:
152
+ try:
153
+ return "cuda" if torch.cuda.is_available() else "cpu"
154
+ except Exception:
155
+ return "cpu"
156
+
157
+
158
+ def _move_detector(detector: Any, device: str) -> None:
159
+ model = getattr(detector, "model", None)
160
+ if model is not None and hasattr(model, "to"):
161
+ model.to(device)
162
+ if hasattr(detector, "device"):
163
+ detector.device = torch.device(device)
164
+
165
+
166
+ def _prepare_runtime(device: str) -> None:
167
+ if device != "cuda":
168
+ return
169
+ for _, detector in _detectors:
170
+ try:
171
+ _move_detector(detector, device)
172
+ except Exception as exc:
173
+ logger.warning("Fingerprint detector GPU move failed: %s", _short_error(exc))
174
+ if _clip_model is not None:
175
+ _clip_model.to(device)
176
+
177
+
178
+ def _release_runtime(device: str) -> None:
179
+ if device != "cuda":
180
+ return
181
+ for _, detector in _detectors:
182
+ try:
183
+ _move_detector(detector, "cpu")
184
+ except Exception:
185
+ pass
186
+ if _clip_model is not None:
187
+ try:
188
+ _clip_model.to("cpu")
189
+ except Exception:
190
+ pass
191
+ try:
192
+ torch.cuda.empty_cache()
193
+ except Exception:
194
+ pass
195
+
196
+
197
  def _fake_score_from_preds(preds: list[dict[str, Any]]) -> float:
198
  if not preds:
199
  return 0.5
 
226
  def run(self, image: Image.Image) -> EngineResult:
227
  t0 = time.perf_counter()
228
  self._ensure()
229
+ device = _inference_device()
230
+ _prepare_runtime(device)
231
 
232
+ try:
233
+ if image.mode != "RGB":
234
+ image = image.convert("RGB")
235
+
236
+ detector_weights = [0.4, 0.3, 0.2, 0.1]
237
+ total_w = 0.0
238
+ weighted_fake = 0.0
239
+
240
+ for index, (model_id, detector) in enumerate(_detectors):
241
+ try:
242
+ preds = detector(image)
243
+ score = _fake_score_from_preds(preds)
244
+ weight = detector_weights[index] if index < len(detector_weights) else 0.1
245
+ weighted_fake += score * weight
246
+ total_w += weight
247
+ logger.debug("%s fake_score=%.3f", model_id, score)
248
+ except Exception as exc:
249
+ logger.warning("Detector %s inference error: %s", model_id, _short_error(exc))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
250
 
251
+ ensemble_score = (weighted_fake / total_w) if total_w > 0 else 0.5
252
+ dct_score = self._dct_frequency_score(image)
253
+ fake_score = float(np.clip(ensemble_score * 0.85 + dct_score * 0.15, 0.0, 1.0))
254
+ generator = self._attribute_generator(image, fake_score, device)
255
+
256
+ return EngineResult(
257
+ engine="fingerprint",
258
+ verdict="FAKE" if fake_score > 0.5 else "REAL",
259
+ confidence=float(fake_score),
260
+ attributed_generator=generator,
261
+ explanation=(
262
+ f"Ensemble {ensemble_score:.2f} x 0.85 + DCT {dct_score:.2f} x 0.15 = {fake_score:.2f}. "
263
+ f"Generator attributed to: {generator}."
264
+ ),
265
+ processing_time_ms=(time.perf_counter() - t0) * 1000,
266
+ )
267
+ finally:
268
+ _release_runtime(device)
269
+
270
+ def _attribute_generator(self, image: Image.Image, fake_score: float, device: str) -> str:
271
  if _clip_model is None or _clip_processor is None:
272
  _thread_local.last_clip_embedding = None
273
  return "unknown_generative" if fake_score > 0.5 else "real"
 
282
  truncation=True,
283
  max_length=77,
284
  )
285
+ inputs = {k: v.to(device) for k, v in inputs.items()}
 
286
 
287
  with torch.no_grad():
288
+ with torch.cuda.amp.autocast(enabled=(device == "cuda")):
289
  outputs = _clip_model(**inputs)
290
  logits = outputs.logits_per_image[0].float()
291
  image_embeds = outputs.image_embeds.detach().float().cpu().numpy()[0]
 
310
  return "unknown_generative" if fake_score > 0.5 else "real"
311
 
312
  def _batch_clip_attribution(
313
+ self,
314
+ images: list[Image.Image],
315
+ fake_scores: list[float],
316
+ device: str,
317
  ) -> list[str]:
 
 
 
 
318
  if _clip_model is None or _clip_processor is None or not images:
319
+ return ["unknown_generative" if s > 0.5 else "real" for s in fake_scores]
 
 
320
 
321
  try:
322
  texts = list(GENERATOR_PROMPTS.values())
 
328
  truncation=True,
329
  max_length=77,
330
  )
331
+ inputs = {k: v.to(device) for k, v in inputs.items()}
332
 
333
  with torch.no_grad():
334
+ with torch.cuda.amp.autocast(enabled=(device == "cuda")):
 
335
  logits = _clip_model(**inputs).logits_per_image.float()
336
 
337
+ probs_batch = logits.softmax(dim=-1).cpu().numpy()
338
  keys = list(GENERATOR_PROMPTS.keys())
339
  results: list[str] = []
340
 
 
342
  probs = probs_batch[i]
343
  max_prob = float(np.max(probs))
344
  if max_prob < 0.32:
345
+ generator = "unknown_generative"
346
  else:
347
+ generator = keys[int(np.argmax(probs))]
348
+ if fake_score > 0.65 and generator == "real":
349
+ generator = "unknown_generative"
350
+ if fake_score < 0.35 and generator != "real":
351
+ generator = "real"
352
+ results.append(generator)
353
 
354
  return results
355
  except Exception as exc:
356
  logger.warning("Batch CLIP attribution error: %s", _short_error(exc))
357
+ return ["unknown_generative" if s > 0.5 else "real" for s in fake_scores]
 
 
358
 
359
  def _dct_frequency_score(self, image: Image.Image) -> float:
360
+ """DCT frequency band analysis (paper section III-B). Runs on CPU."""
361
  try:
362
  from scipy.fft import dctn # type: ignore
363
 
 
388
  return 0.3
389
 
390
  def get_last_clip_embedding(self) -> Optional[np.ndarray]:
 
391
  return getattr(_thread_local, "last_clip_embedding", None)
392
 
393
  def run_video(self, frames: list) -> EngineResult:
 
402
  )
403
 
404
  self._ensure()
405
+ device = _inference_device()
406
+ _prepare_runtime(device)
407
+
408
+ try:
409
+ keyframes = frames[::8] or [frames[0]]
410
+ keyframes_pil = [Image.fromarray(frame).convert("RGB") for frame in keyframes]
411
+
412
+ detector_weights = [0.4, 0.3, 0.2, 0.1]
413
+ frame_scores: list[float] = []
414
+ for image in keyframes_pil:
415
+ total_w = 0.0
416
+ weighted_fake = 0.0
417
+ for index, (_, detector) in enumerate(_detectors):
418
+ try:
419
+ preds = detector(image)
420
+ score = _fake_score_from_preds(preds)
421
+ weight = detector_weights[index] if index < len(detector_weights) else 0.1
422
+ weighted_fake += score * weight
423
+ total_w += weight
424
+ except Exception:
425
+ pass
426
+ frame_scores.append((weighted_fake / total_w) if total_w > 0 else 0.5)
427
+
428
+ generators = self._batch_clip_attribution(keyframes_pil, frame_scores, device)
429
+ avg_conf = float(np.mean(frame_scores))
430
+ top_gen = max(set(generators), key=generators.count) if generators else "unknown_generative"
431
+
432
+ return EngineResult(
433
+ engine="fingerprint",
434
+ verdict="FAKE" if avg_conf > 0.5 else "REAL",
435
+ confidence=avg_conf,
436
+ attributed_generator=top_gen,
437
+ explanation=(
438
+ f"Keyframe average fake score {avg_conf:.2f} over {len(keyframes)} sampled frames. "
439
+ f"Dominant generator: {top_gen}."
440
+ ),
441
+ processing_time_ms=(time.perf_counter() - t0) * 1000,
442
+ )
443
+ finally:
444
+ _release_runtime(device)
src/engines/sstgnn/engine.py CHANGED
@@ -12,24 +12,11 @@ import numpy as np
12
  import torch
13
  from PIL import Image
14
 
15
- try:
16
- import spaces # type: ignore # noqa: F401
17
- except ImportError:
18
- spaces = None
19
-
20
  from src.types import EngineResult
21
 
22
  logger = logging.getLogger(__name__)
23
  CACHE = os.environ.get("MODEL_CACHE_DIR", "/tmp/models")
24
 
25
- def _prefer_cuda() -> bool:
26
- return torch.cuda.is_available() or os.environ.get("SPACE_ID", "").startswith("akagtag/")
27
-
28
-
29
- # GPU device selection
30
- _DEVICE = "cuda" if _prefer_cuda() else "cpu"
31
- _PIPELINE_DEVICE = 0 if _DEVICE == "cuda" else -1 # HF pipeline convention
32
-
33
  _lock = threading.Lock()
34
  _load_attempted = False
35
  _detectors: list[Any] = []
@@ -53,6 +40,9 @@ _FAKE_LABEL_KEYWORDS = (
53
  "1",
54
  )
55
 
 
 
 
56
 
57
  def _skip_model_loads() -> bool:
58
  return os.environ.get("GENAI_SKIP_MODEL_LOAD", "").strip().lower() in {
@@ -80,10 +70,9 @@ def _short_error(exc: Exception, *, limit: int = 300) -> str:
80
 
81
  def _build_image_classifier(model_id: str) -> Any:
82
  pipeline = _get_pipeline()
83
- # Try with GPU first, fall back gracefully
84
- attempts: tuple[dict, ...] = (
85
- {"cache_dir": CACHE, "device": _PIPELINE_DEVICE},
86
- {"device": _PIPELINE_DEVICE},
87
  {"cache_dir": CACHE},
88
  {},
89
  )
@@ -119,10 +108,6 @@ def _fake_prob_from_preds(preds: list[dict[str, Any]]) -> float:
119
  return float(np.clip(fake_best, 0.0, 1.0))
120
 
121
 
122
- KEYPOINT_STEP = 7
123
- KEYPOINT_COUNT = 68
124
-
125
-
126
  class _TasksFaceMeshAdapter:
127
  def __init__(self, mp_module, landmarker) -> None:
128
  self._mp = mp_module
@@ -195,7 +180,7 @@ def _load() -> None:
195
  logger.info("Skipping SSTGNN model load (GENAI_SKIP_MODEL_LOAD=1)")
196
  return
197
 
198
- logger.info("Loading SSTGNN models on device=%s ...", _DEVICE)
199
 
200
  try:
201
  configured_models = [
@@ -234,7 +219,46 @@ def _load() -> None:
234
  except Exception:
235
  _delaunay = None
236
 
237
- logger.info("SSTGNN model load attempt complete (device=%s)", _DEVICE)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
238
 
239
 
240
  class SSTGNNEngine:
@@ -245,27 +269,31 @@ class SSTGNNEngine:
245
  def run(self, image: Image.Image) -> EngineResult:
246
  t0 = time.perf_counter()
247
  self._ensure()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
248
 
249
- if image.mode != "RGB":
250
- image = image.convert("RGB")
251
-
252
- cnn = self._cnn_score(image)
253
- graph = self._geometry_score(np.array(image))
254
- if _detectors:
255
- final = float(np.clip(cnn * 0.70 + graph * 0.30, 0.0, 1.0))
256
- note = f"CNN ensemble {cnn:.2f}; geometric graph anomaly {graph:.2f}."
257
- else:
258
- final = float(np.clip(graph, 0.0, 1.0))
259
- note = f"Geometric graph anomaly {graph:.2f} (cnn fallback unavailable)."
260
-
261
- return EngineResult(
262
- engine="sstgnn",
263
- verdict="FAKE" if final > 0.5 else "REAL",
264
- confidence=final,
265
- attributed_generator=None,
266
- explanation=note,
267
- processing_time_ms=(time.perf_counter() - t0) * 1000,
268
- )
269
 
270
  def _cnn_score(self, image: Image.Image) -> float:
271
  if not _detectors:
@@ -287,10 +315,6 @@ class SSTGNNEngine:
287
  return 0.5
288
 
289
  def _batch_cnn_scores(self, images: list[Image.Image]) -> list[float]:
290
- """
291
- Pass a batch of images through each detector at once — HF pipeline
292
- accepts a list and handles batching internally on GPU.
293
- """
294
  if not _detectors or not images:
295
  return [0.5] * len(images)
296
 
@@ -301,7 +325,6 @@ class SSTGNNEngine:
301
  for index, detector in enumerate(_detectors):
302
  weight = _detector_weights[index] if index < len(_detector_weights) else 1.0
303
  try:
304
- # Pass the full list — GPU pipeline processes all frames in one batch
305
  batch_preds = detector(images)
306
  for i, preds in enumerate(batch_preds):
307
  score = _fake_prob_from_preds(preds if isinstance(preds, list) else [preds])
@@ -346,16 +369,11 @@ class SSTGNNEngine:
346
  arr = np.array(areas, dtype=np.float32)
347
  cv_score = float(np.std(arr) / (np.mean(arr) + 1e-9))
348
  return float(np.clip((cv_score - 0.8) / 1.5, 0.0, 1.0))
349
-
350
  except Exception as exc:
351
  logger.warning("Geometry score error: %s", exc)
352
  return 0.3
353
 
354
  def _temporal_fft_score(self, frames: list[np.ndarray]) -> float:
355
- """
356
- Pixel-wise 1D FFT over the time axis (paper §III-C / Kim et al. [7]).
357
- Uses torch.fft on GPU for ~10× speedup over numpy on A100.
358
- """
359
  try:
360
  import cv2 # type: ignore
361
 
@@ -370,24 +388,23 @@ class SSTGNNEngine:
370
  gray_stack = np.array(
371
  [
372
  cv2.resize(
373
- cv2.cvtColor(f, cv2.COLOR_RGB2GRAY)
374
- if (f.ndim == 3 and f.shape[2] >= 3)
375
- else f[:, :, 0] if f.ndim == 3 else f,
376
  (32, 32),
377
  ).astype(np.float32)
378
- for f in sampled
379
  ]
380
- ) # shape: (T, 32, 32)
381
 
382
- if _DEVICE == "cuda":
383
- # GPU path: torch.fft on A100 is dramatically faster
384
- gray_tensor = torch.from_numpy(gray_stack).to(_DEVICE) # (T, 32, 32)
385
- fft_result = torch.fft.rfft(gray_tensor, dim=0) # (T//2+1, 32, 32)
386
  power = torch.abs(fft_result) ** 2
387
  dc_power = power[0].cpu().numpy()
388
  total_power = (torch.sum(power, dim=0) + 1e-9).cpu().numpy()
389
  else:
390
- # CPU fallback
391
  fft_result = np.fft.rfft(gray_stack, axis=0)
392
  power = np.abs(fft_result) ** 2
393
  dc_power = power[0]
@@ -395,10 +412,7 @@ class SSTGNNEngine:
395
 
396
  hf_ratio = 1.0 - (dc_power / total_power)
397
  mean_hf = float(np.mean(hf_ratio))
398
-
399
- score = float(np.clip(abs(mean_hf - 0.30) / 0.25, 0.0, 1.0))
400
- return score
401
-
402
  except Exception as exc:
403
  logger.warning("Temporal FFT score error: %s", _short_error(exc))
404
  return 0.3
@@ -406,48 +420,45 @@ class SSTGNNEngine:
406
  def run_video(self, frames: list[np.ndarray]) -> EngineResult:
407
  t0 = time.perf_counter()
408
  self._ensure()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
409
 
410
- if not frames:
411
  return EngineResult(
412
  engine="sstgnn",
413
- verdict="REAL",
414
- confidence=0.5,
415
  attributed_generator=None,
416
- explanation="No frames.",
417
- processing_time_ms=0.0,
 
 
 
418
  )
419
-
420
- sample = frames[::6] or [frames[0]]
421
- sample_pil = [Image.fromarray(f) for f in sample]
422
-
423
- # Batched CNN scoring — single pipeline call per detector for all frames
424
- cnn_scores = self._batch_cnn_scores(sample_pil)
425
-
426
- # Geometry scores still per-frame (MediaPipe is CPU-only)
427
- geo_scores = [self._geometry_score(np.array(img)) for img in sample_pil]
428
-
429
- per_frame = [
430
- float(np.clip(c * 0.70 + g * 0.30, 0.0, 1.0))
431
- for c, g in zip(cnn_scores, geo_scores)
432
- ]
433
- cnn_geo_avg = float(np.mean(per_frame))
434
-
435
- # Temporal FFT on GPU
436
- fft_score = self._temporal_fft_score(frames)
437
-
438
- avg = float(np.clip(cnn_geo_avg * 0.80 + fft_score * 0.20, 0.0, 1.0))
439
-
440
- return EngineResult(
441
- engine="sstgnn",
442
- verdict="FAKE" if avg > 0.5 else "REAL",
443
- confidence=avg,
444
- attributed_generator=None,
445
- explanation=(
446
- f"CNN+geometry avg {cnn_geo_avg:.2f} over {len(sample)} frames, "
447
- f"temporal FFT anomaly {fft_score:.2f}."
448
- ),
449
- processing_time_ms=(time.perf_counter() - t0) * 1000,
450
- )
451
 
452
  @staticmethod
453
  def image_stub() -> EngineResult:
 
12
  import torch
13
  from PIL import Image
14
 
 
 
 
 
 
15
  from src.types import EngineResult
16
 
17
  logger = logging.getLogger(__name__)
18
  CACHE = os.environ.get("MODEL_CACHE_DIR", "/tmp/models")
19
 
 
 
 
 
 
 
 
 
20
  _lock = threading.Lock()
21
  _load_attempted = False
22
  _detectors: list[Any] = []
 
40
  "1",
41
  )
42
 
43
+ KEYPOINT_STEP = 7
44
+ KEYPOINT_COUNT = 68
45
+
46
 
47
  def _skip_model_loads() -> bool:
48
  return os.environ.get("GENAI_SKIP_MODEL_LOAD", "").strip().lower() in {
 
70
 
71
  def _build_image_classifier(model_id: str) -> Any:
72
  pipeline = _get_pipeline()
73
+ attempts: tuple[dict[str, Any], ...] = (
74
+ {"cache_dir": CACHE, "device": -1},
75
+ {"device": -1},
 
76
  {"cache_dir": CACHE},
77
  {},
78
  )
 
108
  return float(np.clip(fake_best, 0.0, 1.0))
109
 
110
 
 
 
 
 
111
  class _TasksFaceMeshAdapter:
112
  def __init__(self, mp_module, landmarker) -> None:
113
  self._mp = mp_module
 
180
  logger.info("Skipping SSTGNN model load (GENAI_SKIP_MODEL_LOAD=1)")
181
  return
182
 
183
+ logger.info("Loading SSTGNN models on CPU ...")
184
 
185
  try:
186
  configured_models = [
 
219
  except Exception:
220
  _delaunay = None
221
 
222
+ logger.info("SSTGNN model load attempt complete")
223
+
224
+
225
+ def _inference_device() -> str:
226
+ try:
227
+ return "cuda" if torch.cuda.is_available() else "cpu"
228
+ except Exception:
229
+ return "cpu"
230
+
231
+
232
+ def _move_detector(detector: Any, device: str) -> None:
233
+ model = getattr(detector, "model", None)
234
+ if model is not None and hasattr(model, "to"):
235
+ model.to(device)
236
+ if hasattr(detector, "device"):
237
+ detector.device = torch.device(device)
238
+
239
+
240
+ def _prepare_runtime(device: str) -> None:
241
+ if device != "cuda":
242
+ return
243
+ for detector in _detectors:
244
+ try:
245
+ _move_detector(detector, device)
246
+ except Exception as exc:
247
+ logger.warning("SSTGNN detector GPU move failed: %s", _short_error(exc))
248
+
249
+
250
+ def _release_runtime(device: str) -> None:
251
+ if device != "cuda":
252
+ return
253
+ for detector in _detectors:
254
+ try:
255
+ _move_detector(detector, "cpu")
256
+ except Exception:
257
+ pass
258
+ try:
259
+ torch.cuda.empty_cache()
260
+ except Exception:
261
+ pass
262
 
263
 
264
  class SSTGNNEngine:
 
269
  def run(self, image: Image.Image) -> EngineResult:
270
  t0 = time.perf_counter()
271
  self._ensure()
272
+ device = _inference_device()
273
+ _prepare_runtime(device)
274
+ try:
275
+ if image.mode != "RGB":
276
+ image = image.convert("RGB")
277
+
278
+ cnn = self._cnn_score(image)
279
+ graph = self._geometry_score(np.array(image))
280
+ if _detectors:
281
+ final = float(np.clip(cnn * 0.70 + graph * 0.30, 0.0, 1.0))
282
+ note = f"CNN ensemble {cnn:.2f}; geometric graph anomaly {graph:.2f}."
283
+ else:
284
+ final = float(np.clip(graph, 0.0, 1.0))
285
+ note = f"Geometric graph anomaly {graph:.2f} (cnn fallback unavailable)."
286
 
287
+ return EngineResult(
288
+ engine="sstgnn",
289
+ verdict="FAKE" if final > 0.5 else "REAL",
290
+ confidence=final,
291
+ attributed_generator=None,
292
+ explanation=note,
293
+ processing_time_ms=(time.perf_counter() - t0) * 1000,
294
+ )
295
+ finally:
296
+ _release_runtime(device)
 
 
 
 
 
 
 
 
 
 
297
 
298
  def _cnn_score(self, image: Image.Image) -> float:
299
  if not _detectors:
 
315
  return 0.5
316
 
317
  def _batch_cnn_scores(self, images: list[Image.Image]) -> list[float]:
 
 
 
 
318
  if not _detectors or not images:
319
  return [0.5] * len(images)
320
 
 
325
  for index, detector in enumerate(_detectors):
326
  weight = _detector_weights[index] if index < len(_detector_weights) else 1.0
327
  try:
 
328
  batch_preds = detector(images)
329
  for i, preds in enumerate(batch_preds):
330
  score = _fake_prob_from_preds(preds if isinstance(preds, list) else [preds])
 
369
  arr = np.array(areas, dtype=np.float32)
370
  cv_score = float(np.std(arr) / (np.mean(arr) + 1e-9))
371
  return float(np.clip((cv_score - 0.8) / 1.5, 0.0, 1.0))
 
372
  except Exception as exc:
373
  logger.warning("Geometry score error: %s", exc)
374
  return 0.3
375
 
376
  def _temporal_fft_score(self, frames: list[np.ndarray]) -> float:
 
 
 
 
377
  try:
378
  import cv2 # type: ignore
379
 
 
388
  gray_stack = np.array(
389
  [
390
  cv2.resize(
391
+ cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
392
+ if (frame.ndim == 3 and frame.shape[2] >= 3)
393
+ else frame[:, :, 0] if frame.ndim == 3 else frame,
394
  (32, 32),
395
  ).astype(np.float32)
396
+ for frame in sampled
397
  ]
398
+ )
399
 
400
+ device = _inference_device()
401
+ if device == "cuda":
402
+ gray_tensor = torch.from_numpy(gray_stack).to(device)
403
+ fft_result = torch.fft.rfft(gray_tensor, dim=0)
404
  power = torch.abs(fft_result) ** 2
405
  dc_power = power[0].cpu().numpy()
406
  total_power = (torch.sum(power, dim=0) + 1e-9).cpu().numpy()
407
  else:
 
408
  fft_result = np.fft.rfft(gray_stack, axis=0)
409
  power = np.abs(fft_result) ** 2
410
  dc_power = power[0]
 
412
 
413
  hf_ratio = 1.0 - (dc_power / total_power)
414
  mean_hf = float(np.mean(hf_ratio))
415
+ return float(np.clip(abs(mean_hf - 0.30) / 0.25, 0.0, 1.0))
 
 
 
416
  except Exception as exc:
417
  logger.warning("Temporal FFT score error: %s", _short_error(exc))
418
  return 0.3
 
420
  def run_video(self, frames: list[np.ndarray]) -> EngineResult:
421
  t0 = time.perf_counter()
422
  self._ensure()
423
+ device = _inference_device()
424
+ _prepare_runtime(device)
425
+ try:
426
+ if not frames:
427
+ return EngineResult(
428
+ engine="sstgnn",
429
+ verdict="REAL",
430
+ confidence=0.5,
431
+ attributed_generator=None,
432
+ explanation="No frames.",
433
+ processing_time_ms=0.0,
434
+ )
435
+
436
+ sample = frames[::6] or [frames[0]]
437
+ sample_pil = [Image.fromarray(frame) for frame in sample]
438
+ cnn_scores = self._batch_cnn_scores(sample_pil)
439
+ geo_scores = [self._geometry_score(np.array(image)) for image in sample_pil]
440
+
441
+ per_frame = [
442
+ float(np.clip(c * 0.70 + g * 0.30, 0.0, 1.0))
443
+ for c, g in zip(cnn_scores, geo_scores)
444
+ ]
445
+ cnn_geo_avg = float(np.mean(per_frame))
446
+ fft_score = self._temporal_fft_score(frames)
447
+ avg = float(np.clip(cnn_geo_avg * 0.80 + fft_score * 0.20, 0.0, 1.0))
448
 
 
449
  return EngineResult(
450
  engine="sstgnn",
451
+ verdict="FAKE" if avg > 0.5 else "REAL",
452
+ confidence=avg,
453
  attributed_generator=None,
454
+ explanation=(
455
+ f"CNN+geometry avg {cnn_geo_avg:.2f} over {len(sample)} frames, "
456
+ f"temporal FFT anomaly {fft_score:.2f}."
457
+ ),
458
+ processing_time_ms=(time.perf_counter() - t0) * 1000,
459
  )
460
+ finally:
461
+ _release_runtime(device)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
462
 
463
  @staticmethod
464
  def image_stub() -> EngineResult: