ar07xd committed on
Commit
1b18758
·
verified ·
1 Parent(s): 711bdfc

Sync from GitHub via hub-sync

Browse files
.env.example CHANGED
@@ -34,6 +34,13 @@ LLM_MODEL=gemini-1.5-flash
34
 
35
  # News lookup (Phase 13)
36
  NEWS_API_KEY=
 
 
 
 
 
 
 
37
 
38
  # Auth (REQUIRED in production — generate with python -c "import secrets; print(secrets.token_urlsafe(48))")
39
  JWT_SECRET_KEY=change-me-in-production
 
34
 
35
  # News lookup (Phase 13)
36
  NEWS_API_KEY=
37
+ NEWS_API_BASE_URL=https://newsdata.io/api/1/latest
38
+ NEWS_API_ARCHIVE_BASE_URL=https://newsdata.io/api/1/archive
39
+ NEWS_API_LANGUAGES=en,hi
40
+ NEWS_API_RECENT_TIMEFRAME=1
41
+ NEWS_API_OLDER_DAYS=7
42
+ NEWS_API_PAGE_SIZE=10
43
+ NEWS_API_PRIMARY_COUNTRY=in
44
 
45
  # Auth (REQUIRED in production — generate with python -c "import secrets; print(secrets.token_urlsafe(48))")
46
  JWT_SECRET_KEY=change-me-in-production
api/v1/analyze.py CHANGED
@@ -71,7 +71,13 @@ from services.storage import (
71
  )
72
  from services.job_queue import registry as job_registry, run_job
73
  from utils.file_handler import read_upload_bytes, save_upload_to_tempfile
74
- from utils.scoring import compute_authenticity_score, compute_video_authenticity_score, get_verdict_label, maybe_clamp_to_uncertain
 
 
 
 
 
 
75
 
76
  router = APIRouter(prefix="/analyze", tags=["analyze"])
77
 
@@ -261,9 +267,9 @@ def generate_llm_endpoint(
261
  return {"llm_summary": existing_summary}
262
  raise HTTPException(status_code=500, detail="LLM generation failed")
263
 
264
- def _persist_response_payload(db: Session, record: AnalysisRecord, resp) -> None:
265
  """Keep reloaded/history responses aligned with the fresh API response."""
266
- record.result_json = json.dumps(resp.model_dump())
267
  db.add(record)
268
  db.commit()
269
 
@@ -479,7 +485,7 @@ async def analyze_image(
479
  media_type="image",
480
  verdict=label,
481
  authenticity_score=float(score),
482
- result_json=json.dumps(resp.model_dump()),
483
  media_hash=media_hash,
484
  media_path=media_path,
485
  thumbnail_url=thumbnail_url,
@@ -497,7 +503,7 @@ async def analyze_image(
497
  stages.append("llm_explanation")
498
 
499
  resp.processing_summary.stages_completed = stages
500
- _persist_response_payload(db, record, resp)
501
 
502
  # ── Phase 14: VLM breakdown runs after response is returned ──
503
  if user is not None and vlm_bd is None:
@@ -795,7 +801,14 @@ async def analyze_text_endpoint(
795
  weighted = raw_score
796
 
797
  score = int(round(max(0.0, min(100.0, weighted))))
798
- label, severity = get_verdict_label(score)
 
 
 
 
 
 
 
799
  duration_ms = int((time.perf_counter() - start) * 1000)
800
 
801
  model_used = (
@@ -850,6 +863,7 @@ async def analyze_text_endpoint(
850
  total_duration_ms=duration_ms,
851
  model_used=model_used,
852
  calibrator_applied=False,
 
853
  ),
854
  )
855
 
@@ -972,7 +986,14 @@ async def analyze_screenshot_endpoint(
972
  if not full_text.strip():
973
  weighted = 50
974
  score = int(round(max(0.0, min(100.0, weighted))))
975
- label, severity = get_verdict_label(score)
 
 
 
 
 
 
 
976
  duration_ms = int((time.perf_counter() - start) * 1000)
977
 
978
  model_used_str = (
@@ -1025,6 +1046,7 @@ async def analyze_screenshot_endpoint(
1025
  total_duration_ms=duration_ms,
1026
  model_used=model_used_str,
1027
  calibrator_applied=False,
 
1028
  ),
1029
  )
1030
 
 
71
  )
72
  from services.job_queue import registry as job_registry, run_job
73
  from utils.file_handler import read_upload_bytes, save_upload_to_tempfile
74
+ from utils.scoring import (
75
+ apply_unverified_news_gate,
76
+ compute_authenticity_score,
77
+ compute_video_authenticity_score,
78
+ get_verdict_label,
79
+ maybe_clamp_to_uncertain,
80
+ )
81
 
82
  router = APIRouter(prefix="/analyze", tags=["analyze"])
83
 
 
267
  return {"llm_summary": existing_summary}
268
  raise HTTPException(status_code=500, detail="LLM generation failed")
269
 
270
def _persist_response_payload(db: Session, record: AnalysisRecord, resp, exclude: dict | None = None) -> None:
    """Store the serialized response on the record so history matches the API reply.

    `resp.model_dump(exclude=exclude)` is JSON-encoded into
    `record.result_json`; the record is then added to `db` and committed.
    """
    payload = resp.model_dump(exclude=exclude)
    serialized = json.dumps(payload)
    record.result_json = serialized
    db.add(record)
    db.commit()
275
 
 
485
  media_type="image",
486
  verdict=label,
487
  authenticity_score=float(score),
488
+ result_json=json.dumps(resp.model_dump(exclude=_IMAGE_EXCLUDE)),
489
  media_hash=media_hash,
490
  media_path=media_path,
491
  thumbnail_url=thumbnail_url,
 
503
  stages.append("llm_explanation")
504
 
505
  resp.processing_summary.stages_completed = stages
506
+ _persist_response_payload(db, record, resp, exclude=_IMAGE_EXCLUDE)
507
 
508
  # ── Phase 14: VLM breakdown runs after response is returned ──
509
  if user is not None and vlm_bd is None:
 
801
  weighted = raw_score
802
 
803
  score = int(round(max(0.0, min(100.0, weighted))))
804
+ score, label, severity, news_gate = apply_unverified_news_gate(
805
+ score,
806
+ has_trusted_sources=bool(news.trusted_sources),
807
+ has_contradicting_evidence=bool(news.contradicting_evidence),
808
+ truth_override_applied=bool(news.truth_override and news.truth_override.applied),
809
+ )
810
+ if news_gate:
811
+ stages.append(news_gate)
812
  duration_ms = int((time.perf_counter() - start) * 1000)
813
 
814
  model_used = (
 
863
  total_duration_ms=duration_ms,
864
  model_used=model_used,
865
  calibrator_applied=False,
866
+ gating_applied=news_gate,
867
  ),
868
  )
869
 
 
986
  if not full_text.strip():
987
  weighted = 50
988
  score = int(round(max(0.0, min(100.0, weighted))))
989
+ score, label, severity, news_gate = apply_unverified_news_gate(
990
+ score,
991
+ has_trusted_sources=bool(news.trusted_sources),
992
+ has_contradicting_evidence=bool(news.contradicting_evidence),
993
+ truth_override_applied=bool(news.truth_override and news.truth_override.applied),
994
+ )
995
+ if news_gate:
996
+ stages.append(news_gate)
997
  duration_ms = int((time.perf_counter() - start) * 1000)
998
 
999
  model_used_str = (
 
1046
  total_duration_ms=duration_ms,
1047
  model_used=model_used_str,
1048
  calibrator_applied=False,
1049
+ gating_applied=news_gate,
1050
  ),
1051
  )
1052
 
config.py CHANGED
@@ -156,7 +156,13 @@ class Settings(BaseSettings):
156
 
157
  # News API
158
  NEWS_API_KEY: str = ""
159
- NEWS_API_BASE_URL: str = "https://newsdata.io/api/1/news"
 
 
 
 
 
 
160
 
161
  # Reports
162
  REPORT_DIR: str = "/data/reports"
@@ -229,6 +235,13 @@ class Settings(BaseSettings):
229
  GENERAL_FAKE_GATING_THRESHOLD: float = 0.80
230
  GAN_ARTIFACT_GATING_THRESHOLD: float = 0.70
231
  GATING_FAKE_FLOOR: float = 0.50
 
 
 
 
 
 
 
232
 
233
  # Video-frame weight overrides. When an image is detected as a low-res
234
  # video frame (face-swap deepfakes are extracted from video), the general
@@ -239,6 +252,10 @@ class Settings(BaseSettings):
239
  VIDEO_FRAME_GENERAL_WEIGHT: float = 0.15
240
  VIDEO_FRAME_FORENSICS_WEIGHT: float = 0.10
241
  VIDEO_FRAME_EXIF_WEIGHT: float = 0.05
 
 
 
 
242
  VIDEO_SAMPLE_FRAMES: int = 16 # frames to sample per video for inference
243
  EXIFTOOL_PATH: str = "" # full path to ExifTool binary; empty = metadata write disabled
244
 
 
156
 
157
  # News API
158
  NEWS_API_KEY: str = ""
159
+ NEWS_API_BASE_URL: str = "https://newsdata.io/api/1/latest"
160
+ NEWS_API_ARCHIVE_BASE_URL: str = "https://newsdata.io/api/1/archive"
161
+ NEWS_API_LANGUAGES: str = "en,hi"
162
+ NEWS_API_RECENT_TIMEFRAME: str = "1"
163
+ NEWS_API_OLDER_DAYS: int = 7
164
+ NEWS_API_PAGE_SIZE: int = 10
165
+ NEWS_API_PRIMARY_COUNTRY: str = "in"
166
 
167
  # Reports
168
  REPORT_DIR: str = "/data/reports"
 
235
  GENERAL_FAKE_GATING_THRESHOLD: float = 0.80
236
  GAN_ARTIFACT_GATING_THRESHOLD: float = 0.70
237
  GATING_FAKE_FLOOR: float = 0.50
238
+ # Synthetic still-image overrides. FaceForensics/DFDC models are trained for
239
+ # manipulated video faces, so they should not veto a strong still-image AI
240
+ # detector on generated portraits.
241
+ SYNTHETIC_STILL_HIGH_THRESHOLD: float = 0.80
242
+ SYNTHETIC_STILL_HIGH_FLOOR: float = 0.80
243
+ SYNTHETIC_STILL_VERY_HIGH_THRESHOLD: float = 0.90
244
+ SYNTHETIC_STILL_VERY_HIGH_FLOOR: float = 0.90
245
 
246
  # Video-frame weight overrides. When an image is detected as a low-res
247
  # video frame (face-swap deepfakes are extracted from video), the general
 
252
  VIDEO_FRAME_GENERAL_WEIGHT: float = 0.15
253
  VIDEO_FRAME_FORENSICS_WEIGHT: float = 0.10
254
  VIDEO_FRAME_EXIF_WEIGHT: float = 0.05
255
+ # Per-frame video detector blend. FFPP ViT is trained on FaceForensics++
256
+ # face forgery frames, so it is the dominant signal for video analysis.
257
+ VIDEO_FFPP_WEIGHT: float = 0.70
258
+ VIDEO_EFFNET_WEIGHT: float = 0.30
259
  VIDEO_SAMPLE_FRAMES: int = 16 # frames to sample per video for inference
260
  EXIFTOOL_PATH: str = "" # full path to ExifTool binary; empty = metadata write disabled
261
 
models/heatmap_generator.py CHANGED
@@ -10,7 +10,6 @@ import torch
10
  from loguru import logger
11
  from PIL import Image
12
  from pytorch_grad_cam import GradCAMPlusPlus
13
- from pytorch_grad_cam.utils.image import show_cam_on_image
14
  from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
15
 
16
  from config import settings
@@ -41,6 +40,16 @@ def _vit_reshape_transform(tensor: torch.Tensor, height: int = 14, width: int =
41
  return result
42
 
43
 
 
 
 
 
 
 
 
 
 
 
44
  def _preprocess_for_cam(pil_img: Image.Image, processor) -> tuple[torch.Tensor, np.ndarray]:
45
  """Return (input_tensor, rgb_float_224) where rgb_float_224 is a (H,W,3) float
46
  array in [0,1] matching the model input geometry — needed for overlaying.
@@ -58,7 +67,7 @@ def _preprocess_for_cam(pil_img: Image.Image, processor) -> tuple[torch.Tensor,
58
 
59
 
60
  def _encode_overlay_to_base64(overlay: np.ndarray) -> str:
61
- """Encode a uint8 (H,W,3) RGB overlay to a base64 data-URL PNG."""
62
  buf = io.BytesIO()
63
  Image.fromarray(overlay).save(buf, format="PNG")
64
  b64 = base64.b64encode(buf.getvalue()).decode("ascii")
@@ -93,9 +102,13 @@ def _compute_gradcam_pp(
93
 
94
  wrapped = _HFLogitsWrapper(model)
95
 
96
- targets = None
97
- if target_class_idx is not None:
98
- targets = [ClassifierOutputTarget(int(target_class_idx))]
 
 
 
 
99
 
100
  with GradCAMPlusPlus(
101
  model=wrapped,
 
10
  from loguru import logger
11
  from PIL import Image
12
  from pytorch_grad_cam import GradCAMPlusPlus
 
13
  from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
14
 
15
  from config import settings
 
40
  return result
41
 
42
 
43
def _find_class_index(model: torch.nn.Module, label_tokens: tuple[str, ...]) -> Optional[int]:
    """Find the first class index whose label matches one of `label_tokens`.

    Labels are read from `model.config.id2label` (a missing config or mapping
    is treated as empty) and compared case-insensitively.

    Tokens of one or two characters (e.g. "ai") must match a whole
    alphanumeric word of the label, so "ai" matches "AI" or "ai_generated"
    but not "plain" or "hair". Longer tokens match as substrings, so "fake"
    still matches "deepfake". Empty tokens are ignored.

    Returns the matching index as an int, or None when no label matches.
    """
    import re  # local import: this block must not rely on a file-level `re` import

    id2label: dict[int, str] = getattr(getattr(model, "config", None), "id2label", {}) or {}
    for idx, label in id2label.items():
        lowered = str(label).lower()
        words = set(re.split(r"[^a-z0-9]+", lowered))
        for token in label_tokens:
            needle = token.lower()
            if not needle:
                # "" is a substring of every string; never let it match.
                continue
            if len(needle) <= 2:
                # Very short tokens are too ambiguous for substring matching.
                if needle in words:
                    return int(idx)
            elif needle in lowered:
                return int(idx)
    return None
51
+
52
+
53
  def _preprocess_for_cam(pil_img: Image.Image, processor) -> tuple[torch.Tensor, np.ndarray]:
54
  """Return (input_tensor, rgb_float_224) where rgb_float_224 is a (H,W,3) float
55
  array in [0,1] matching the model input geometry — needed for overlaying.
 
67
 
68
 
69
  def _encode_overlay_to_base64(overlay: np.ndarray) -> str:
70
+ """Encode a uint8 RGB/RGBA overlay to a base64 data-URL PNG."""
71
  buf = io.BytesIO()
72
  Image.fromarray(overlay).save(buf, format="PNG")
73
  b64 = base64.b64encode(buf.getvalue()).decode("ascii")
 
102
 
103
  wrapped = _HFLogitsWrapper(model)
104
 
105
+ if target_class_idx is None:
106
+ target_class_idx = _find_class_index(
107
+ model,
108
+ ("fake", "deepfake", "manipulated", "ai", "generated", "synthetic"),
109
+ )
110
+
111
+ targets = [ClassifierOutputTarget(int(target_class_idx))] if target_class_idx is not None else None
112
 
113
  with GradCAMPlusPlus(
114
  model=wrapped,
schemas/common.py CHANGED
@@ -4,7 +4,7 @@ from typing import List, Optional
4
 
5
  from pydantic import BaseModel, ConfigDict, Field
6
 
7
- ANALYSIS_CACHE_VERSION = "2026-05-06-phase-a-unified-fusion"
8
 
9
 
10
  class Verdict(BaseModel):
 
4
 
5
  from pydantic import BaseModel, ConfigDict, Field
6
 
7
+ ANALYSIS_CACHE_VERSION = "2026-05-07-archive-7d-news-fallback"
8
 
9
 
10
  class Verdict(BaseModel):
services/image_service.py CHANGED
@@ -200,11 +200,18 @@ def _classify_no_face(
200
  models_used = [general.model_used if general else "no-face-forensic-fusion"]
201
 
202
  # Apply hard gating (Phase A4) on the no-face path too.
 
203
  gated_prob, gating_reason = _apply_hard_gating(
204
  fake_prob=fused.fake_probability,
205
  general_fake_prob=general.fake_probability if general else None,
206
  artifacts=artifact_indicators or [],
207
  )
 
 
 
 
 
 
208
  final_label = "Fake" if gated_prob >= 0.5 else fused.label
209
 
210
  return ImageClassification(
@@ -219,8 +226,9 @@ def _classify_no_face(
219
  "weights": fused.weights,
220
  "method": fused.method,
221
  "pre_gating": fused.fake_probability,
 
222
  },
223
- gating_applied=gating_reason,
224
  )
225
 
226
 
@@ -265,6 +273,34 @@ def _apply_hard_gating(
265
  return fake_prob, None
266
 
267
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
268
  def classify_image(
269
  pil_img: Image.Image,
270
  *,
@@ -409,6 +445,12 @@ def classify_image(
409
  general_fake_prob=general_fake_prob,
410
  artifacts=artifacts_list,
411
  )
 
 
 
 
 
 
412
 
413
  method = f"unified_evidence_{face_stack_method}"
414
  label = "Fake" if ensemble_prob >= 0.5 else "Real"
@@ -417,7 +459,7 @@ def classify_image(
417
  f"face_stack={face_stack_prob:.3f} general={general_fake_prob if general_fake_prob is not None else 'n/a'} "
418
  f"forensics={components.get('forensics', 'n/a')} exif={components.get('exif', 'n/a')} "
419
  f"vlm={components.get('vlm', 'n/a')} -> {pre_gating_prob:.3f} "
420
- f"(gated:{gating_reason or 'none'} -> {ensemble_prob:.3f})"
421
  )
422
  return ImageClassification(
423
  label=label,
@@ -434,7 +476,7 @@ def classify_image(
434
  "pre_gating": pre_gating_prob,
435
  "is_video_frame": is_video_frame,
436
  },
437
- gating_applied=gating_reason,
438
  )
439
 
440
 
 
200
  models_used = [general.model_used if general else "no-face-forensic-fusion"]
201
 
202
  # Apply hard gating (Phase A4) on the no-face path too.
203
+ is_video_frame = _looks_like_video_frame(pil_img)
204
  gated_prob, gating_reason = _apply_hard_gating(
205
  fake_prob=fused.fake_probability,
206
  general_fake_prob=general.fake_probability if general else None,
207
  artifacts=artifact_indicators or [],
208
  )
209
+ gated_prob, synthetic_reason = _apply_synthetic_still_overrides(
210
+ fake_prob=gated_prob,
211
+ general_fake_prob=general.fake_probability if general else None,
212
+ is_video_frame=is_video_frame,
213
+ )
214
+ final_gating_reason = synthetic_reason or gating_reason
215
  final_label = "Fake" if gated_prob >= 0.5 else fused.label
216
 
217
  return ImageClassification(
 
226
  "weights": fused.weights,
227
  "method": fused.method,
228
  "pre_gating": fused.fake_probability,
229
+ "is_video_frame": is_video_frame,
230
  },
231
+ gating_applied=final_gating_reason,
232
  )
233
 
234
 
 
273
  return fake_prob, None
274
 
275
 
276
def _apply_synthetic_still_overrides(
    *,
    fake_prob: float,
    general_fake_prob: Optional[float],
    is_video_frame: bool,
) -> Tuple[float, Optional[str]]:
    """Keep still-image AI-generation evidence from being diluted by FFPP/DFDC.

    FaceForensics/DFDC models are trained for manipulated real/video faces. They
    are useful evidence, but they should not veto a high-confidence whole-image
    AI detector on generated still portraits.

    Args:
        fake_prob: Fake probability after earlier fusion/gating.
        general_fake_prob: Fake probability from the general detector, or
            None when that detector produced no score.
        is_video_frame: When true, no override is applied.

    Returns:
        ``(probability, reason)``. The probability is raised to the configured
        floor when the general detector score reaches the matching threshold;
        `reason` names the override, or is None when nothing changed.
    """
    import math  # local import: this block must not rely on a file-level `math` import

    if is_video_frame or general_fake_prob is None:
        return fake_prob, None

    general = float(general_fake_prob)
    if math.isnan(general):
        # min(1.0, nan) evaluates to 1.0, so clamping a NaN score would turn a
        # broken detector output into maximum "fake" confidence. Treat NaN as
        # "no score" instead.
        return fake_prob, None
    general = max(0.0, min(1.0, general))

    if general >= settings.SYNTHETIC_STILL_VERY_HIGH_THRESHOLD:
        floor = settings.SYNTHETIC_STILL_VERY_HIGH_FLOOR
        tag = "general_detector_very_high"
    elif general >= settings.SYNTHETIC_STILL_HIGH_THRESHOLD:
        floor = settings.SYNTHETIC_STILL_HIGH_FLOOR
        tag = "general_detector_high"
    else:
        return fake_prob, None

    adjusted = max(fake_prob, floor)
    if adjusted != fake_prob:
        return adjusted, f"{tag}({general:.2f})"
    # Already at or above the floor: report no override.
    return fake_prob, None
302
+
303
+
304
  def classify_image(
305
  pil_img: Image.Image,
306
  *,
 
445
  general_fake_prob=general_fake_prob,
446
  artifacts=artifacts_list,
447
  )
448
+ ensemble_prob, synthetic_reason = _apply_synthetic_still_overrides(
449
+ fake_prob=ensemble_prob,
450
+ general_fake_prob=general_fake_prob,
451
+ is_video_frame=is_video_frame,
452
+ )
453
+ final_gating_reason = synthetic_reason or gating_reason
454
 
455
  method = f"unified_evidence_{face_stack_method}"
456
  label = "Fake" if ensemble_prob >= 0.5 else "Real"
 
459
  f"face_stack={face_stack_prob:.3f} general={general_fake_prob if general_fake_prob is not None else 'n/a'} "
460
  f"forensics={components.get('forensics', 'n/a')} exif={components.get('exif', 'n/a')} "
461
  f"vlm={components.get('vlm', 'n/a')} -> {pre_gating_prob:.3f} "
462
+ f"(gated:{final_gating_reason or 'none'} -> {ensemble_prob:.3f})"
463
  )
464
  return ImageClassification(
465
  label=label,
 
476
  "pre_gating": pre_gating_prob,
477
  "is_video_frame": is_video_frame,
478
  },
479
+ gating_applied=final_gating_reason,
480
  )
481
 
482
 
services/news_lookup.py CHANGED
@@ -1,6 +1,7 @@
1
  from __future__ import annotations
2
 
3
  from dataclasses import dataclass
 
4
  from typing import List, Optional
5
  from urllib.parse import urlparse
6
 
@@ -17,6 +18,9 @@ TRUSTED_DOMAINS = {
17
  "cnn.com": 0.9, "npr.org": 0.95, "aljazeera.com": 0.9,
18
  "thehindu.com": 0.9, "indianexpress.com": 0.9, "ndtv.com": 0.85,
19
  "hindustantimes.com": 0.85, "pti.news": 0.95,
 
 
 
20
  }
21
 
22
  # Fact-check / contradiction sources
@@ -46,6 +50,78 @@ class NewsLookupResult:
46
  no_source_penalty: float = 0.0
47
 
48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49
  def _domain_of(url: str) -> str:
50
  try:
51
  return urlparse(url).netloc.lower().replace("www.", "")
@@ -173,22 +249,56 @@ def _compute_truth_override(
173
  return None
174
 
175
 
176
- async def _fetch(q: str, country: Optional[str]) -> list[dict]:
177
- params = {"apikey": settings.NEWS_API_KEY, "q": q, "language": "en", "size": 10, "country": country or "in"}
178
- logger.info(f"News lookup query: {q!r} country={country or 'in'}")
179
-
 
180
  try:
181
  async with httpx.AsyncClient(timeout=httpx.Timeout(8.0, connect=3.0)) as c:
182
- r = await c.get(settings.NEWS_API_BASE_URL, params=params)
183
  r.raise_for_status()
184
  results = (r.json() or {}).get("results") or []
185
- logger.info(f"News lookup returned {len(results)} articles for query: {q!r}")
186
  return results
187
  except Exception as e:
188
- logger.warning(f"News lookup failed (query={q!r}): {e}")
189
  return []
190
 
191
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
192
  async def search_news(
193
  keywords: List[str],
194
  limit: int = 6,
@@ -219,36 +329,17 @@ async def search_news_full(
219
  return NewsLookupResult([], [], 0)
220
 
221
  q = " ".join(keywords[:4])
222
- articles = await _fetch(q, country)
223
-
224
  seen: set[str] = set()
225
  trusted: List[TrustedSource] = []
226
  contradictions: List[ContradictingEvidence] = []
227
 
228
- for art in articles:
229
- url = art.get("link") or ""
230
- if not url or url in seen:
231
- continue
232
- seen.add(url)
233
-
234
- title = art.get("title") or ""
235
- dom = _domain_of(url)
236
- src_name = art.get("source_id") or dom or "news"
237
-
238
- if _is_factcheck(url, title):
239
- contradictions.append(ContradictingEvidence(
240
- source_name=src_name, title=title, url=url, type="fact_check",
241
- ))
242
- continue
243
-
244
- trusted.append(TrustedSource(
245
- source_name=src_name,
246
- title=title,
247
- url=url,
248
- description=art.get("description") or art.get("content"),
249
- published_at=art.get("pubDate"),
250
- relevance_score=_relevance(url),
251
- ))
252
 
253
  trusted.sort(key=lambda s: -s.relevance_score)
254
  trusted = trusted[:limit]
@@ -270,7 +361,7 @@ async def search_news_full(
270
  return NewsLookupResult(
271
  trusted_sources=trusted,
272
  contradicting_evidence=contradictions[:limit],
273
- total_articles=len(articles),
274
  truth_override=truth_override,
275
  no_source_penalty=no_source_penalty,
276
  )
 
1
  from __future__ import annotations
2
 
3
  from dataclasses import dataclass
4
+ from datetime import datetime, timedelta, timezone
5
  from typing import List, Optional
6
  from urllib.parse import urlparse
7
 
 
18
  "cnn.com": 0.9, "npr.org": 0.95, "aljazeera.com": 0.9,
19
  "thehindu.com": 0.9, "indianexpress.com": 0.9, "ndtv.com": 0.85,
20
  "hindustantimes.com": 0.85, "pti.news": 0.95,
21
+ "timesofindia.indiatimes.com": 0.85, "livemint.com": 0.85,
22
+ "deccanherald.com": 0.85, "scroll.in": 0.8, "theprint.in": 0.8,
23
+ "news18.com": 0.8, "business-standard.com": 0.85, "thewire.in": 0.8,
24
  }
25
 
26
  # Fact-check / contradiction sources
 
50
  no_source_penalty: float = 0.0
51
 
52
 
53
def _clean_param(value: object) -> str:
    """Return `value` converted to text with surrounding whitespace removed."""
    text = str(value)
    return text.strip()
55
+
56
+
57
def _configured_languages() -> str:
    """Return the configured language codes as a clean comma-separated string.

    Reads `settings.NEWS_API_LANGUAGES`, trims each comma-separated entry,
    drops empty entries, and falls back to "en" when nothing is left.
    """
    raw = _clean_param(settings.NEWS_API_LANGUAGES)
    codes = [piece.strip() for piece in raw.split(",")]
    joined = ",".join(code for code in codes if code)
    return joined or "en"
63
+
64
+
65
def _page_size() -> int:
    """Return `settings.NEWS_API_PAGE_SIZE` clamped to the range 1..50.

    A falsy setting (0, None, empty) is read as 10 before clamping.
    """
    requested = int(settings.NEWS_API_PAGE_SIZE or 10)
    if requested > 50:
        requested = 50
    if requested < 1:
        requested = 1
    return requested
67
+
68
+
69
def _archive_window() -> tuple[str, str]:
    """Return ``(from_date, to_date)`` as ISO date strings for archive lookups.

    The window ends on today's UTC date and starts
    `settings.NEWS_API_OLDER_DAYS` days earlier (7 when the setting is falsy,
    never fewer than 1).
    """
    lookback = int(settings.NEWS_API_OLDER_DAYS or 7)
    if lookback < 1:
        lookback = 1
    end_day = datetime.now(timezone.utc).date()
    start_day = end_day - timedelta(days=lookback)
    return start_day.isoformat(), end_day.isoformat()
74
+
75
+
76
def _query_attempts(q: str, country: Optional[str]) -> list[dict]:
    """Build a recency/country fallback ladder for NewsData lookups.

    Each returned dict is the parameter set for one request attempt. Besides
    the real query parameters it carries two private keys, ``_endpoint``
    ("latest" or "archive") and ``_url`` (the endpoint URL from settings).

    Country ladder, in order:
      * no country supplied (None, empty or whitespace-only): the configured
        primary country, then no country filter;
      * a country different from the configured primary: that country, the
        configured primary, then no country filter;
      * a country equal to the configured primary: that country only.
        NOTE(review): this case has no unfiltered fallback — confirm intended.

    For every rung a "latest" attempt (with the recent timeframe, when set) is
    followed by an "archive" attempt covering the `_archive_window()` dates.

    Args:
        q: Query string sent as the ``q`` parameter.
        country: Optional country code; compared case-insensitively.

    Returns:
        The attempts in the order they should be tried.
    """
    default_country = _clean_param(settings.NEWS_API_PRIMARY_COUNTRY or "in").lower()
    # A blank/whitespace-only country carries no information; treat it like None
    # so it neither yields an empty country code nor suppresses the fallback.
    requested = _clean_param(country).lower() if country else ""
    primary_country = requested or default_country
    recent_window = _clean_param(settings.NEWS_API_RECENT_TIMEFRAME or "1")
    archive_from, archive_to = _archive_window()
    base = {
        "apikey": settings.NEWS_API_KEY,
        "q": q,
        "language": _configured_languages(),
        "size": _page_size(),
    }

    ladder: list[str | None] = [primary_country]
    if not requested:
        ladder.append(None)
    elif primary_country != default_country:
        ladder.extend([default_country, None])
    # "" and None both mean "no country filter": collapse them and drop
    # repeats (order preserved) so the same request is never issued twice.
    countries = list(dict.fromkeys(code or None for code in ladder))

    attempts: list[dict] = []
    for country_code in countries:
        latest_params = dict(base)
        latest_params["_endpoint"] = "latest"
        latest_params["_url"] = settings.NEWS_API_BASE_URL
        if country_code:
            latest_params["country"] = country_code
        if recent_window:
            latest_params["timeframe"] = recent_window
        attempts.append(latest_params)

        archive_params = dict(base)
        archive_params["_endpoint"] = "archive"
        archive_params["_url"] = settings.NEWS_API_ARCHIVE_BASE_URL
        archive_params["from_date"] = archive_from
        archive_params["to_date"] = archive_to
        if country_code:
            archive_params["country"] = country_code
        attempts.append(archive_params)

    return attempts
123
+
124
+
125
  def _domain_of(url: str) -> str:
126
  try:
127
  return urlparse(url).netloc.lower().replace("www.", "")
 
249
  return None
250
 
251
 
252
async def _fetch(params: dict) -> list[dict]:
    """Run one NewsData request and return its ``results`` list.

    Keys starting with an underscore are private routing hints and are not
    sent; ``_url`` selects the endpoint (defaulting to
    `settings.NEWS_API_BASE_URL`). The lookup is best-effort: any failure is
    logged as a warning and an empty list is returned.

    The API key is kept out of both log lines. Exception text is scrubbed
    because HTTP status errors include the full request URL, which carries
    the ``apikey`` query parameter.
    """
    from urllib.parse import quote_plus  # local import: only needed for redaction

    url = params.get("_url") or settings.NEWS_API_BASE_URL
    request_params = {k: v for k, v in params.items() if not k.startswith("_")}
    redacted = {k: v for k, v in request_params.items() if k != "apikey"}
    logger.info(f"News lookup query params: {redacted}")
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(8.0, connect=3.0)) as c:
            r = await c.get(url, params=request_params)
            r.raise_for_status()
            results = (r.json() or {}).get("results") or []
            logger.info(f"News lookup returned {len(results)} articles for query: {request_params.get('q')!r}")
            return results
    except Exception as e:
        detail = str(e)
        secret = str(request_params.get("apikey") or "")
        if secret:
            # Scrub both the raw and the URL-encoded form of the key.
            for form in (secret, quote_plus(secret)):
                detail = detail.replace(form, "***")
        logger.warning(f"News lookup failed (query={params.get('q')!r}): {detail}")
        return []
267
 
268
 
269
def _collect_news_evidence(
    articles: list[dict],
    *,
    seen: set[str],
    trusted: List[TrustedSource],
    contradictions: List[ContradictingEvidence],
) -> None:
    """Sort fetched articles into trusted sources and contradicting evidence.

    Mutates its arguments in place: each new article link is added to `seen`,
    fact-check articles are appended to `contradictions`, and every other
    article is appended to `trusted`. Articles without a link, or whose link
    is already in `seen`, are ignored.
    """
    for article in articles:
        link = article.get("link") or ""
        if link and link not in seen:
            seen.add(link)

            headline = article.get("title") or ""
            domain = _domain_of(link)
            source = article.get("source_id") or domain or "news"

            if _is_factcheck(link, headline):
                evidence = ContradictingEvidence(
                    source_name=source, title=headline, url=link, type="fact_check",
                )
                contradictions.append(evidence)
            else:
                entry = TrustedSource(
                    source_name=source,
                    title=headline,
                    url=link,
                    description=article.get("description") or article.get("content"),
                    published_at=article.get("pubDate"),
                    relevance_score=_relevance(link),
                )
                trusted.append(entry)
300
+
301
+
302
  async def search_news(
303
  keywords: List[str],
304
  limit: int = 6,
 
329
  return NewsLookupResult([], [], 0)
330
 
331
  q = " ".join(keywords[:4])
332
+ total_articles = 0
 
333
  seen: set[str] = set()
334
  trusted: List[TrustedSource] = []
335
  contradictions: List[ContradictingEvidence] = []
336
 
337
+ for params in _query_attempts(q, country):
338
+ articles = await _fetch(params)
339
+ total_articles += len(articles)
340
+ _collect_news_evidence(articles, seen=seen, trusted=trusted, contradictions=contradictions)
341
+ if trusted or contradictions:
342
+ break
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
343
 
344
  trusted.sort(key=lambda s: -s.relevance_score)
345
  trusted = trusted[:limit]
 
361
  return NewsLookupResult(
362
  trusted_sources=trusted,
363
  contradicting_evidence=contradictions[:limit],
364
+ total_articles=total_articles,
365
  truth_override=truth_override,
366
  no_source_penalty=no_source_penalty,
367
  )
services/video_service.py CHANGED
@@ -10,7 +10,7 @@ from PIL import Image
10
 
11
  from config import settings
12
  from models.model_loader import get_model_loader
13
- from services.image_service import _classify_vit
14
  from services.video_temporal import TemporalAnalysis, compute_temporal_score
15
 
16
 
@@ -107,6 +107,26 @@ def _score_efficientnet_face(eff, face) -> float:
107
  return float(eff._calibrate(raw_prob))
108
 
109
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
110
  def _analyze_with_efficientnet(
111
  frames: List[Tuple[int, float, np.ndarray, Image.Image]],
112
  ) -> Tuple[List[FrameAnalysis], str, List[str], bool]:
@@ -137,13 +157,28 @@ def _analyze_with_efficientnet(
137
  has_face = True
138
  face_detector_used = "blazeface+crop_fallback"
139
 
140
- fake_prob = 0.0
 
141
  label = "unknown"
142
  if has_face and faces:
143
  # Run EfficientNet on the best face/crop and apply the same calibration as image inference.
144
- fake_prob = _score_efficientnet_face(eff, faces[0])
 
 
 
 
 
 
 
 
 
 
 
 
 
145
  label = "Fake" if fake_prob > 0.5 else "Real"
146
  elif not has_face:
 
147
  label = "no_face"
148
 
149
  results.append(
@@ -155,7 +190,7 @@ def _analyze_with_efficientnet(
155
  suspicious_prob=fake_prob,
156
  is_suspicious=(fake_prob >= 0.5) and has_face,
157
  has_face=has_face,
158
- scored=has_face and faces,
159
  )
160
  )
161
 
 
10
 
11
  from config import settings
12
  from models.model_loader import get_model_loader
13
+ from services.image_service import _classify_ffpp, _classify_vit
14
  from services.video_temporal import TemporalAnalysis, compute_temporal_score
15
 
16
 
 
107
  return float(eff._calibrate(raw_prob))
108
 
109
 
110
def _blend_video_frame_scores(
    *,
    efficientnet_prob: Optional[float],
    ffpp_prob: Optional[float],
) -> float:
    """Combine the per-frame EfficientNet and FFPP fake probabilities.

    With both scores present, returns their average weighted by
    `settings.VIDEO_FFPP_WEIGHT` and `settings.VIDEO_EFFNET_WEIGHT`; if the
    weights sum to zero or less, the FFPP score is returned alone. With one
    score present, that score is returned. With neither, returns 0.0.
    """
    if ffpp_prob is None:
        if efficientnet_prob is None:
            return 0.0
        return float(efficientnet_prob)
    if efficientnet_prob is None:
        return float(ffpp_prob)

    weight_sum = settings.VIDEO_FFPP_WEIGHT + settings.VIDEO_EFFNET_WEIGHT
    if weight_sum <= 0:
        return float(ffpp_prob)
    weighted = settings.VIDEO_FFPP_WEIGHT * ffpp_prob + settings.VIDEO_EFFNET_WEIGHT * efficientnet_prob
    return float(weighted / weight_sum)
128
+
129
+
130
  def _analyze_with_efficientnet(
131
  frames: List[Tuple[int, float, np.ndarray, Image.Image]],
132
  ) -> Tuple[List[FrameAnalysis], str, List[str], bool]:
 
157
  has_face = True
158
  face_detector_used = "blazeface+crop_fallback"
159
 
160
+ eff_prob: Optional[float] = None
161
+ ffpp_prob: Optional[float] = None
162
  label = "unknown"
163
  if has_face and faces:
164
  # Run EfficientNet on the best face/crop and apply the same calibration as image inference.
165
+ eff_prob = _score_efficientnet_face(eff, faces[0])
166
+ if settings.FFPP_ENABLED:
167
+ try:
168
+ ffpp_res = _classify_ffpp(pil)
169
+ if ffpp_res is not None:
170
+ ffpp_prob = float(ffpp_res[0])
171
+ if "ffpp-vit-local" not in models_used:
172
+ models_used.append("ffpp-vit-local")
173
+ except Exception as exc: # noqa: BLE001
174
+ logger.debug(f"FFPP video frame scoring failed, using EfficientNet only: {exc}")
175
+ fake_prob = _blend_video_frame_scores(
176
+ efficientnet_prob=eff_prob,
177
+ ffpp_prob=ffpp_prob,
178
+ )
179
  label = "Fake" if fake_prob > 0.5 else "Real"
180
  elif not has_face:
181
+ fake_prob = 0.0
182
  label = "no_face"
183
 
184
  results.append(
 
190
  suspicious_prob=fake_prob,
191
  is_suspicious=(fake_prob >= 0.5) and has_face,
192
  has_face=has_face,
193
+ scored=bool(has_face and faces),
194
  )
195
  )
196
 
tests/test_accuracy_regressions.py CHANGED
@@ -1,14 +1,15 @@
1
  from __future__ import annotations
2
 
3
  import os
 
4
 
5
  os.environ["DEBUG"] = "false"
6
 
7
  from schemas.common import TrustedSource
8
- from services.news_lookup import _compute_truth_override
9
  from services.screenshot_service import OCRBox, extract_full_text
10
  from services.text_service import _scores_to_classification
11
- from utils.scoring import compute_video_authenticity_score
12
  from schemas.common import ArtifactIndicator, ExifSummary, VLMComponentScore, VLMBreakdown
13
  from services.general_image_service import GeneralImageDetection, fuse_no_face_evidence
14
 
@@ -79,6 +80,105 @@ def test_truth_override_does_not_apply_from_headline_only_match(monkeypatch):
79
  assert override is None or not override.applied
80
 
81
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
82
  def test_no_face_fusion_uses_general_detector_forensic_and_exif_evidence():
83
  fused = fuse_no_face_evidence(
84
  general_fake_prob=0.72,
@@ -156,3 +256,109 @@ def test_no_face_image_route_skips_face_trained_classifiers(monkeypatch):
156
  assert result.ensemble_method == "no_face_general_forensic_fusion"
157
  assert result.models_used == ["test-general-detector"]
158
  assert result.no_face_analysis is not None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  from __future__ import annotations
2
 
3
  import os
4
+ import asyncio
5
 
6
  os.environ["DEBUG"] = "false"
7
 
8
  from schemas.common import TrustedSource
9
+ from services.news_lookup import _compute_truth_override, search_news_full
10
  from services.screenshot_service import OCRBox, extract_full_text
11
  from services.text_service import _scores_to_classification
12
+ from utils.scoring import apply_unverified_news_gate, compute_video_authenticity_score
13
  from schemas.common import ArtifactIndicator, ExifSummary, VLMComponentScore, VLMBreakdown
14
  from services.general_image_service import GeneralImageDetection, fuse_no_face_evidence
15
 
 
80
  assert override is None or not override.applied
81
 
82
 
83
def test_unverified_news_gate_caps_real_scores_as_suspicious():
    """An uncorroborated high score is capped to 55 and labelled Suspicious."""
    gated = apply_unverified_news_gate(
        92,
        has_trusted_sources=False,
        has_contradicting_evidence=False,
        truth_override_applied=False,
    )
    capped_score, verdict, level, gate_reason = gated

    # The raw score of 92 must not survive without any supporting evidence.
    assert capped_score == 55
    assert verdict == "Suspicious"
    assert level == "warning"
    assert gate_reason == "no_trusted_source"
95
+
96
+
97
def test_unverified_news_gate_keeps_fake_scores_fake():
    """A score already in the fake range passes through the gate unchanged."""
    gated = apply_unverified_news_gate(
        18,
        has_trusted_sources=False,
        has_contradicting_evidence=False,
        truth_override_applied=False,
    )
    final_score, verdict, level, gate_reason = gated

    # 18 is below the cap, so only the reason marks that the gate ran.
    assert final_score == 18
    assert verdict == "Very Likely Fake"
    assert level == "critical"
    assert gate_reason == "no_trusted_source"
109
+
110
+
111
def test_news_lookup_falls_back_from_recent_india_to_older_india(monkeypatch):
    """The India archive query is issued when the recent India query is empty.

    The stubbed fetcher only answers the India + archive request; every other
    request yields no articles. The first recorded request must be the recent
    India one, and the archive request must carry a date range instead of a
    timeframe.
    """
    recorded = []

    archive_article = {
        "link": "https://indianexpress.com/article/cities/kolkata/example",
        "title": "BJP leader aide shot dead in Bengal",
        "source_id": "indianexpress",
        "pubDate": "2026-05-07 00:43:00",
        "description": "Police launched an investigation.",
    }

    def is_india_archive(query):
        return query.get("country") == "in" and query.get("_endpoint") == "archive"

    async def fake_fetch(params):
        # Snapshot the params so later mutation by the caller cannot alter the record.
        recorded.append(dict(params))
        if is_india_archive(params):
            return [dict(archive_article)]
        return []

    monkeypatch.setattr("services.news_lookup.settings.NEWS_API_KEY", "test-key")
    monkeypatch.setattr("services.news_lookup._fetch", fake_fetch)
    monkeypatch.setattr("services.news_lookup._compute_truth_override", lambda *args, **kwargs: None)

    keywords = ["BJP", "Suvendu", "Adhikari", "Madhyamgram"]
    claim = "BJP leader Suvendu Adhikari's PA shot dead in West Bengal's Madhyamgram"
    result = asyncio.run(search_news_full(keywords, original_text=claim))

    assert result.trusted_sources[0].source_name == "indianexpress"

    first_request = recorded[0]
    assert first_request["country"] == "in"
    assert first_request["timeframe"] == "1"

    archive_request = next(filter(is_india_archive, recorded))
    assert archive_request["_url"].endswith("/archive")
    assert "timeframe" not in archive_request
    assert "from_date" in archive_request
    assert "to_date" in archive_request
147
+
148
+
149
def test_news_lookup_falls_back_to_global_when_india_has_no_results(monkeypatch):
    """A country-less query is issued when India-scoped queries return nothing.

    The stubbed fetcher only answers requests that have no ``country`` key and
    a ``timeframe`` of "1"; the test then checks that both an India-scoped and
    a country-less request were made.
    """
    recorded = []

    global_article = {
        "link": "https://www.reuters.com/world/example",
        "title": "US and EU announce new trade framework",
        "source_id": "reuters",
        "pubDate": "2026-05-07 01:05:00",
        "description": "Officials announced a new framework.",
    }

    async def fake_fetch(params):
        recorded.append(dict(params))
        is_recent_global = "country" not in params and params.get("timeframe") == "1"
        return [dict(global_article)] if is_recent_global else []

    monkeypatch.setattr("services.news_lookup.settings.NEWS_API_KEY", "test-key")
    monkeypatch.setattr("services.news_lookup._fetch", fake_fetch)
    monkeypatch.setattr("services.news_lookup._compute_truth_override", lambda *args, **kwargs: None)

    keywords = ["US", "EU", "trade", "framework"]
    claim = "US and EU announce new trade framework"
    result = asyncio.run(search_news_full(keywords, original_text=claim))

    assert result.trusted_sources[0].source_name == "reuters"
    assert any(request.get("country") == "in" for request in recorded)
    assert any("country" not in request for request in recorded)
180
+
181
+
182
  def test_no_face_fusion_uses_general_detector_forensic_and_exif_evidence():
183
  fused = fuse_no_face_evidence(
184
  general_fake_prob=0.72,
 
256
  assert result.ensemble_method == "no_face_general_forensic_fusion"
257
  assert result.models_used == ["test-general-detector"]
258
  assert result.no_face_analysis is not None
259
+
260
+
261
def test_synthetic_still_override_keeps_strong_ai_detector_authoritative():
    """A very high general-detector score lifts a low fake probability on stills."""
    import services.image_service as image_service

    override_result = image_service._apply_synthetic_still_overrides(
        fake_prob=0.13,
        general_fake_prob=0.93,
        is_video_frame=False,
    )
    boosted_prob, override_reason = override_result

    assert boosted_prob >= 0.90
    assert override_reason == "general_detector_very_high(0.93)"
272
+
273
+
274
def test_synthetic_still_override_does_not_affect_video_frame_route():
    """Video frames bypass the still-image override: probability and reason are untouched."""
    import services.image_service as image_service

    override_result = image_service._apply_synthetic_still_overrides(
        fake_prob=0.13,
        general_fake_prob=0.93,
        is_video_frame=True,
    )
    unchanged_prob, override_reason = override_result

    assert unchanged_prob == 0.13
    assert override_reason is None
285
+
286
+
287
def test_heatmap_target_index_prefers_fake_label_tokens():
    """``_find_class_index`` resolves the index whose label matches a fake token."""
    from types import SimpleNamespace

    from models.heatmap_generator import _find_class_index

    # Minimal stand-in exposing only ``config.id2label``.
    label_config = SimpleNamespace(id2label={0: "real", 1: "fake"})
    model = SimpleNamespace(config=label_config)
    fake_tokens = ("fake", "generated", "synthetic")

    assert _find_class_index(model, fake_tokens) == 1
295
+
296
+
297
def test_video_efficientnet_frame_scored_is_boolean(monkeypatch):
    """``scored`` on a per-frame result must be a real ``bool``, not a truthy list.

    The model loader, the EfficientNet face scorer and the FFPP classifier are
    all stubbed so the test exercises only the frame-result bookkeeping and
    never touches real model weights, regardless of the FFPP setting.
    """
    import numpy as np
    from PIL import Image

    import services.video_service as video_service

    class FakeEfficientNet:
        calibrator_applied = False

        class FaceExtractor:
            def process_image(self, img):
                # Always report exactly one detected face crop.
                return {"faces": [np.zeros((16, 16, 3), dtype=np.uint8)]}

        face_extractor = FaceExtractor()

        def _fallback_face_crop(self, img):
            return None

    class FakeLoader:
        def load_efficientnet(self):
            return FakeEfficientNet()

    monkeypatch.setattr(video_service, "get_model_loader", lambda: FakeLoader())
    monkeypatch.setattr(video_service, "_score_efficientnet_face", lambda _eff, _face: 0.7)
    # Keep the test hermetic: with FFPP enabled the real classifier would be
    # invoked on the frame. Returning None takes the EfficientNet-only path.
    monkeypatch.setattr(video_service, "_classify_ffpp", lambda _pil: None)

    frame = np.zeros((32, 32, 3), dtype=np.uint8)
    results, *_ = video_service._analyze_with_efficientnet(
        [(0, 0.0, frame, Image.fromarray(frame))]
    )

    assert results[0].scored is True
    assert isinstance(results[0].scored, bool)
329
+
330
+
331
def test_video_primary_path_weights_ffpp_vit_above_efficientnet(monkeypatch):
    """A confident FFPP score must outweigh a low EfficientNet score per frame.

    EfficientNet is stubbed to 0.10 and FFPP to 0.90; the blended frame score
    must land above 0.60, be labelled "Fake", and record the FFPP model as used.
    """
    import numpy as np
    from PIL import Image

    import services.video_service as video_service

    class FakeEfficientNet:
        calibrator_applied = False

        class FaceExtractor:
            def process_image(self, img):
                # Always report exactly one detected face crop.
                return {"faces": [np.zeros((16, 16, 3), dtype=np.uint8)]}

        face_extractor = FaceExtractor()

        def _fallback_face_crop(self, img):
            return None

    class FakeLoader:
        def load_efficientnet(self):
            return FakeEfficientNet()

    # The FFPP branch only runs when the feature flag is on; pin it so the
    # assertions below do not depend on the ambient environment/configuration.
    monkeypatch.setattr(video_service.settings, "FFPP_ENABLED", True)
    monkeypatch.setattr(video_service, "get_model_loader", lambda: FakeLoader())
    monkeypatch.setattr(video_service, "_score_efficientnet_face", lambda _eff, _face: 0.10)
    monkeypatch.setattr(video_service, "_classify_ffpp", lambda _pil: (0.90, {"fake": 0.90, "real": 0.10}))

    frame = np.zeros((32, 32, 3), dtype=np.uint8)
    results, _detector, models_used, _calibrated = video_service._analyze_with_efficientnet(
        [(0, 0.0, frame, Image.fromarray(frame))]
    )

    assert results[0].suspicious_prob > 0.60
    assert results[0].label == "Fake"
    assert "ffpp-vit-local" in models_used
utils/scoring.py CHANGED
@@ -15,6 +15,7 @@ TRUST_SCALE = [
15
  # Score range for forced disagreement clamp
16
  UNCERTAIN_SCORE_LO = 56
17
  UNCERTAIN_SCORE_HI = 69
 
18
 
19
 
20
  def _validate_weight_total(weights: list[float], context: str) -> None:
@@ -41,6 +42,31 @@ def get_verdict_label(score: int) -> Tuple[str, str]:
41
  return "Unknown", "warning"
42
 
43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
44
  def compute_video_authenticity_score(
45
  *,
46
  mean_suspicious_prob: float,
 
15
  # Score range for forced disagreement clamp
16
  UNCERTAIN_SCORE_LO = 56
17
  UNCERTAIN_SCORE_HI = 69
18
+ UNVERIFIED_NEWS_SCORE_CAP = 55
19
 
20
 
21
  def _validate_weight_total(weights: list[float], context: str) -> None:
 
42
  return "Unknown", "warning"
43
 
44
 
45
def apply_unverified_news_gate(
    score: int,
    *,
    has_trusted_sources: bool,
    has_contradicting_evidence: bool,
    truth_override_applied: bool,
) -> Tuple[int, str, str, str | None]:
    """Cap the score of a news claim that has no supporting evidence.

    When any of the three evidence flags is set, the score passes through
    untouched and is labelled by ``get_verdict_label``. Otherwise the score is
    limited to ``UNVERIFIED_NEWS_SCORE_CAP``: a capped score above 40 is
    reported as "Suspicious"/"warning", while lower scores keep the label that
    ``get_verdict_label`` assigns them.

    Returns:
        ``(score, label, severity, reason)`` where ``reason`` is
        ``"no_trusted_source"`` when the gate applied and ``None`` otherwise.
    """
    verified = any(
        (has_trusted_sources, has_contradicting_evidence, truth_override_applied)
    )
    reason = None if verified else "no_trusted_source"
    final_score = score if verified else min(score, UNVERIFIED_NEWS_SCORE_CAP)

    # Only ungated-looking (i.e. not already fake) scores are forced into the
    # suspicious band; everything else defers to the regular verdict scale.
    if not verified and final_score > 40:
        return final_score, "Suspicious", "warning", reason

    label, severity = get_verdict_label(final_score)
    return final_score, label, severity, reason
68
+
69
+
70
  def compute_video_authenticity_score(
71
  *,
72
  mean_suspicious_prob: float,