"
+ ]
+}}
+
+Rules:
+- Be factual. State what the analysis found, not what you speculate.
+- Reference specific indicators (e.g. "GAN artifact score", "EXIF metadata", "sensationalism level").
+- If the verdict is "Likely Authentic", reassure the user and explain why.
+- If the verdict is "Likely Manipulated" or "Suspicious", highlight the strongest evidence.
+- Keep the paragraph under 60 words. Each bullet under 20 words.
+
+**Analysis payload:**
+{payload_json}
+"""
+
+
class _LLMProvider(ABC):
    """Abstract interface implemented by each LLM backend adapter in this module."""

    @abstractmethod
    def generate(self, prompt: str) -> str:
        """Send prompt to LLM and return raw text response."""
+
+
class _GeminiProvider(_LLMProvider):
    """LLM backend that talks to the ``google.generativeai`` SDK."""

    def __init__(self) -> None:
        # The SDK is imported here rather than at module level, so it is only
        # needed when this provider is actually constructed.
        import google.generativeai as gemini_sdk

        gemini_sdk.configure(api_key=settings.LLM_API_KEY)
        self._model = gemini_sdk.GenerativeModel(settings.LLM_MODEL)

    def generate(self, prompt: str) -> str:
        """Run one generation for *prompt* and return the response's text."""
        return self._model.generate_content(prompt).text
+
+
class _OpenAIProvider(_LLMProvider):
    """LLM backend that talks to the OpenAI chat-completions API."""

    def __init__(self) -> None:
        # Imported lazily so the SDK is only required when this provider is used.
        from openai import OpenAI

        self._client = OpenAI(api_key=settings.LLM_API_KEY)

    def generate(self, prompt: str) -> str:
        """Send *prompt* as a single user message and return the reply text.

        Returns:
            The first choice's message content. The API may report the
            content as ``None`` (for example on a refusal); that is
            normalised to ``""`` so the declared ``str`` return type holds
            and callers can safely call string methods on the result.
        """
        response = self._client.chat.completions.create(
            model=settings.LLM_MODEL,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,
            max_tokens=300,
        )
        return response.choices[0].message.content or ""
+
+
@lru_cache(maxsize=1)
def _get_provider() -> _LLMProvider:
    """Build the configured LLM provider once and reuse it on later calls.

    ``settings.LLM_PROVIDER`` equal to ``"openai"`` (case-insensitive) selects
    the OpenAI adapter; every other value selects the Gemini adapter.
    """
    wants_openai = settings.LLM_PROVIDER.lower() == "openai"
    provider_cls = _OpenAIProvider if wants_openai else _GeminiProvider
    return provider_cls()
+
+
+def _parse_llm_response(raw: str) -> tuple[str, list[str]]:
+ """Parse the LLM's JSON response into (paragraph, bullets).
+ Handles cases where the LLM wraps output in markdown fences.
+ """
+ text = raw.strip()
+ # Strip markdown code fences if present
+ if text.startswith("```"):
+ lines = text.split("\n")
+ # Remove first and last fence lines
+ lines = [l for l in lines if not l.strip().startswith("```")]
+ text = "\n".join(lines).strip()
+
+ parsed = json.loads(text)
+ paragraph = parsed.get("paragraph", "")
+ bullets = parsed.get("bullets", [])
+ if not isinstance(bullets, list):
+ bullets = [str(bullets)]
+ return paragraph, bullets[:3]
+
+
def generate_llm_summary(
    payload: dict[str, Any],
    record_id: str | None = None,
) -> LLMExplainabilitySummary:
    """Produce a plain-English LLM explanation for one analysis result.

    Args:
        payload: Analysis response dict that is serialised into the prompt.
        record_id: Optional cache key. When a summary is already stored under
            this key it is returned (flagged ``cached=True``) without calling
            the LLM.

    Returns:
        An LLMExplainabilitySummary. If no API key is configured, or the
        provider call or response parsing fails, a placeholder summary is
        returned instead of an exception being raised.
    """
    # Cache lookup comes first so a hit never touches the provider.
    if record_id and record_id in _cache:
        logger.debug(f"LLM summary cache hit for record_id={record_id}")
        hit = _cache[record_id]
        hit.cached = True
        return hit

    # Without an API key there is nothing to call.
    if not settings.LLM_API_KEY:
        logger.warning("LLM_API_KEY not set — skipping LLM explainability card")
        return LLMExplainabilitySummary(
            paragraph="LLM explanation unavailable (no API key configured).",
            bullets=[],
            model_used="none",
        )

    # Copy everything except "explainability"; that entry is re-added below
    # (only when it is a dict) with its *_base64 keys removed to save tokens.
    slim_payload = {}
    for key, value in payload.items():
        if key != "explainability":
            slim_payload[key] = value
    explainability = payload.get("explainability")
    if isinstance(explainability, dict):
        slim_payload["explainability"] = {
            key: value
            for key, value in explainability.items()
            if not key.endswith("_base64")
        }

    serialized = json.dumps(slim_payload, indent=2, default=str)
    prompt = _PROMPT_TEMPLATE.format(payload_json=serialized)

    try:
        raw_response = _get_provider().generate(prompt)
        paragraph, bullets = _parse_llm_response(raw_response)

        summary = LLMExplainabilitySummary(
            paragraph=paragraph,
            bullets=bullets,
            model_used=f"{settings.LLM_PROVIDER}/{settings.LLM_MODEL}",
        )

        # Only successful summaries are stored.
        if record_id:
            _cache[record_id] = summary

        logger.info(f"LLM summary generated via {settings.LLM_PROVIDER}/{settings.LLM_MODEL}")
        return summary

    except json.JSONDecodeError as e:
        logger.error(f"LLM returned unparseable JSON: {e}")
        return LLMExplainabilitySummary(
            paragraph="Analysis complete. See the detailed indicators below for specifics.",
            bullets=["LLM explanation could not be parsed"],
            model_used=f"{settings.LLM_PROVIDER}/{settings.LLM_MODEL}",
        )
    except Exception as e:
        logger.error(f"LLM explainer failed: {e}")
        return LLMExplainabilitySummary(
            paragraph="Analysis complete. See the detailed indicators below for specifics.",
            bullets=["LLM explanation temporarily unavailable"],
            model_used="error",
        )
diff --git a/main.py b/main.py
new file mode 100644
index 0000000000000000000000000000000000000000..2c144c8523c543ab6882943f9f1412ce24d57e75
--- /dev/null
+++ b/main.py
@@ -0,0 +1,59 @@
+import asyncio
+from contextlib import asynccontextmanager
+
+from fastapi import FastAPI
+from fastapi.middleware.cors import CORSMiddleware
+from loguru import logger
+
+from api.router import api_router
+from config import settings
+from db.database import init_db
+from models.model_loader import get_model_loader
+from services.report_service import cleanup_expired
+
+
async def _report_cleanup_loop():
    """Call ``cleanup_expired()`` forever, sleeping ten minutes between passes.

    A failing pass is logged as a warning and does not stop the loop.
    """
    pause_seconds = 600  # every 10 min
    while True:
        try:
            cleanup_expired()
        except Exception as exc:  # noqa: BLE001
            logger.warning(f"Report cleanup error: {exc}")
        await asyncio.sleep(pause_seconds)
+
+
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: initialise on startup, tear down on shutdown.

    Startup initialises the database, optionally preloads the phase-1 models
    (when ``settings.PRELOAD_MODELS`` is true) and starts the background
    report-cleanup task. Shutdown cancels that task and waits for it to
    finish unwinding, so it is not left pending when the event loop closes.
    """
    logger.info("Starting DeepShield backend")
    init_db()
    logger.info("Database initialized")
    if settings.PRELOAD_MODELS:
        get_model_loader().preload_phase1()
    else:
        logger.info("PRELOAD_MODELS=false — models will load on first use")
    task = asyncio.create_task(_report_cleanup_loop())
    try:
        yield
    finally:
        # Run teardown even if the application body raised.
        task.cancel()
        try:
            # cancel() only requests cancellation; awaiting lets the task
            # actually process it before shutdown continues.
            await task
        except asyncio.CancelledError:
            pass
        logger.info("Shutting down DeepShield backend")
+
+
# ASGI application object; startup and shutdown work is delegated to `lifespan`.
app = FastAPI(
    title="DeepShield API",
    description="Explainable AI-based multimodal misinformation detection",
    version="0.1.0",
    lifespan=lifespan,
)

# CORS: allowed origins come from configuration; credentialed requests are
# enabled and every method and header is permitted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.CORS_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount the routes exported by api.router.
app.include_router(api_router)
+
+
@app.get("/")
def root():
    """Return a small service descriptor pointing at the docs and health URLs."""
    descriptor = {"service": "DeepShield", "docs": "/docs", "health": "/api/v1/health"}
    return descriptor
diff --git a/model_loader.py b/model_loader.py
new file mode 100644
index 0000000000000000000000000000000000000000..d71e9f3f59bb7a51d81cbd8d82ef940d521118e0
--- /dev/null
+++ b/model_loader.py
@@ -0,0 +1,156 @@
+from __future__ import annotations
+
+from threading import Lock
+from typing import Optional, Tuple
+
+from loguru import logger
+
+from config import settings
+
+
class ModelLoader:
    """Singleton holder for preloaded AI models. Thread-safe lazy init.

    The single instance is created under a class-level lock and is only
    published once all of its slots exist. Each ``load_*`` method uses a
    check / lock / re-check sequence on one shared re-entrant lock, so two
    threads that hit a cold model at the same time load it once rather than
    twice. The lock is re-entrant because ``load_multilang_text_model`` may
    delegate to ``load_text_model``.
    """

    _instance: Optional["ModelLoader"] = None
    _lock: Lock = Lock()

    def __new__(cls) -> "ModelLoader":
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    from threading import RLock

                    inst = super().__new__(cls)
                    inst._load_lock = RLock()
                    inst._image_model = None
                    inst._image_processor = None
                    inst._text_pipeline = None
                    inst._multilang_text_pipeline = None
                    inst._ocr_reader = None
                    inst._face_detector = None
                    inst._spacy_nlp = None
                    inst._sentence_transformer = None
                    # Publish last: a thread on the lock-free fast path must
                    # never observe an instance whose attributes are missing.
                    cls._instance = inst
        return cls._instance

    @classmethod
    def get_instance(cls) -> "ModelLoader":
        """Return the shared instance, creating it on first use."""
        return cls()

    # ---------- Image (ViT deepfake classifier) ----------
    def load_image_model(self) -> Tuple[object, object]:
        """Return ``(model, processor)`` for ``settings.IMAGE_MODEL_ID``, loading once."""
        if self._image_model is None:
            with self._load_lock:
                if self._image_model is None:
                    logger.info(f"Loading image model: {settings.IMAGE_MODEL_ID}")
                    from transformers import AutoImageProcessor, AutoModelForImageClassification

                    self._image_processor = AutoImageProcessor.from_pretrained(settings.IMAGE_MODEL_ID)
                    model = AutoModelForImageClassification.from_pretrained(settings.IMAGE_MODEL_ID)
                    model.to(settings.DEVICE)
                    model.eval()
                    # Assigned after the processor: `_image_model` is the
                    # "loaded" flag checked on the fast path.
                    self._image_model = model
                    logger.info("Image model loaded")
        return self._image_model, self._image_processor

    # ---------- Text (BERT fake-news classifier — English) ----------
    def load_text_model(self):
        """Return the text-classification pipeline for ``settings.TEXT_MODEL_ID``, loading once."""
        if self._text_pipeline is None:
            with self._load_lock:
                if self._text_pipeline is None:
                    logger.info(f"Loading text model: {settings.TEXT_MODEL_ID}")
                    from transformers import pipeline

                    self._text_pipeline = pipeline(
                        "text-classification",
                        model=settings.TEXT_MODEL_ID,
                        device=0 if settings.DEVICE == "cuda" else -1,
                    )
                    logger.info("Text model loaded")
        return self._text_pipeline

    # ---------- Multilingual text model (Phase 13) ----------
    def load_multilang_text_model(self):
        """Load multilingual fake-news classifier. Falls back to English model if not configured."""
        model_id = settings.TEXT_MULTILANG_MODEL_ID
        if not model_id:
            logger.debug("TEXT_MULTILANG_MODEL_ID not set — falling back to English text model")
            return self.load_text_model()

        if self._multilang_text_pipeline is None:
            with self._load_lock:
                if self._multilang_text_pipeline is None:
                    logger.info(f"Loading multilingual text model: {model_id}")
                    from transformers import pipeline

                    self._multilang_text_pipeline = pipeline(
                        "text-classification",
                        model=model_id,
                        device=0 if settings.DEVICE == "cuda" else -1,
                    )
                    logger.info("Multilingual text model loaded")
        return self._multilang_text_pipeline

    # ---------- spaCy NLP (Phase 13 NER) ----------
    def load_spacy_nlp(self):
        """Lazy-load spaCy English NLP model. Returns None if spaCy is not installed.

        Also returns None when the ``en_core_web_sm`` model is missing. A
        failed load is not remembered, so the next call tries again.
        """
        if self._spacy_nlp is None:
            with self._load_lock:
                if self._spacy_nlp is None:
                    try:
                        import spacy  # type: ignore
                        try:
                            self._spacy_nlp = spacy.load("en_core_web_sm")
                            logger.info("spaCy en_core_web_sm loaded")
                        except OSError:
                            logger.warning(
                                "spaCy model 'en_core_web_sm' not found. "
                                "Run: python -m spacy download en_core_web_sm"
                            )
                            return None
                    except ImportError:
                        logger.warning("spaCy not installed — NER keyword extraction disabled")
                        return None
        return self._spacy_nlp

    # ---------- Sentence-Transformer (Phase 13 truth-override) ----------
    def load_sentence_transformer(self):
        """Lazy-load sentence-transformers/all-MiniLM-L6-v2. Returns None if not installed.

        Any other load failure is logged and also yields None.
        """
        if self._sentence_transformer is None:
            with self._load_lock:
                if self._sentence_transformer is None:
                    try:
                        from sentence_transformers import SentenceTransformer  # type: ignore
                        self._sentence_transformer = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
                        logger.info("Sentence-transformer (all-MiniLM-L6-v2) loaded")
                    except ImportError:
                        logger.warning("sentence-transformers not installed — truth-override disabled")
                        return None
                    except Exception as e:
                        logger.warning(f"Sentence-transformer load failed: {e}")
                        return None
        return self._sentence_transformer

    # ---------- OCR (EasyOCR) — Phase 13: use OCR_LANGS from config ----------
    def load_ocr_engine(self):
        """Return the EasyOCR reader for the comma-separated ``settings.OCR_LANGS``.

        An empty language list falls back to ``["en"]``.
        """
        if self._ocr_reader is None:
            with self._load_lock:
                if self._ocr_reader is None:
                    langs = [lang.strip() for lang in settings.OCR_LANGS.split(",") if lang.strip()]
                    if not langs:
                        langs = ["en"]
                    logger.info(f"Loading EasyOCR reader (langs: {langs})")
                    import easyocr  # type: ignore

                    self._ocr_reader = easyocr.Reader(
                        langs, gpu=(settings.DEVICE == "cuda"), verbose=False, download_enabled=True,
                    )
                    logger.info("EasyOCR loaded")
        return self._ocr_reader

    # ---------- Face detector (MediaPipe) ----------
    def load_face_detector(self):
        """Return the MediaPipe FaceMesh detector, creating it on first use."""
        if self._face_detector is None:
            with self._load_lock:
                if self._face_detector is None:
                    logger.info("Loading MediaPipe FaceMesh")
                    import mediapipe as mp  # type: ignore

                    self._face_detector = mp.solutions.face_mesh.FaceMesh(
                        static_image_mode=True,
                        max_num_faces=5,
                        min_detection_confidence=0.5,
                    )
                    logger.info("MediaPipe FaceMesh loaded")
        return self._face_detector

    # ---------- Preload ----------
    def preload_phase1(self) -> None:
        """Preload only what Phase 1 needs (image model)."""
        self.load_image_model()
+
+
def get_model_loader() -> ModelLoader:
    """Module-level accessor for the shared ModelLoader instance."""
    loader = ModelLoader.get_instance()
    return loader
diff --git a/models.py b/models.py
new file mode 100644
index 0000000000000000000000000000000000000000..af3b2f8f14b6485f08ed933ec490c11d10802e4a
--- /dev/null
+++ b/models.py
@@ -0,0 +1,45 @@
+from datetime import datetime
+
+from sqlalchemy import DateTime, ForeignKey, Integer, String, Text
+from sqlalchemy.orm import Mapped, mapped_column, relationship
+
+from db.database import Base
+
+
class User(Base):
    """ORM mapping for the ``users`` table."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    # Unique and indexed; required.
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True, nullable=False)
    password_hash: Mapped[str] = mapped_column(String(255), nullable=False)
    name: Mapped[str | None] = mapped_column(String(255), nullable=True)
    # Defaults to datetime.utcnow() at insert time (a naive datetime).
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)

    # Analyses linked to this user; the reverse side is AnalysisRecord.user.
    analyses: Mapped[list["AnalysisRecord"]] = relationship(back_populates="user")
+
+
class AnalysisRecord(Base):
    """ORM mapping for the ``analyses`` table."""

    __tablename__ = "analyses"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    # Nullable: a record does not have to be linked to a user.
    user_id: Mapped[int | None] = mapped_column(ForeignKey("users.id"), nullable=True)
    media_type: Mapped[str] = mapped_column(String(32), nullable=False)  # image|video|text|screenshot
    verdict: Mapped[str] = mapped_column(String(32), nullable=False)
    authenticity_score: Mapped[float] = mapped_column(nullable=False)
    # Stored as text in a Text column.
    result_json: Mapped[str] = mapped_column(Text, nullable=False)
    # Defaults to datetime.utcnow() at insert time (a naive datetime).
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)

    # Reverse side of User.analyses.
    user: Mapped["User | None"] = relationship(back_populates="analyses")
    # At most one report per analysis (uselist=False); reverse side is Report.analysis.
    report: Mapped["Report | None"] = relationship(back_populates="analysis", uselist=False)
+
+
class Report(Base):
    """ORM mapping for the ``reports`` table."""

    __tablename__ = "reports"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    # Required link to the analysis this report belongs to.
    analysis_id: Mapped[int] = mapped_column(ForeignKey("analyses.id"), nullable=False)
    file_path: Mapped[str] = mapped_column(String(512), nullable=False)
    # Defaults to datetime.utcnow() at insert time (a naive datetime).
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
    # Nullable expiry timestamp.
    expires_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)

    # Reverse side of AnalysisRecord.report.
    analysis: Mapped["AnalysisRecord"] = relationship(back_populates="report")
diff --git a/news_lookup.py b/news_lookup.py
new file mode 100644
index 0000000000000000000000000000000000000000..8831afb27b3e5d852cf6c2838c8bd96ceca8420d
--- /dev/null
+++ b/news_lookup.py
@@ -0,0 +1,242 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from typing import List, Optional, Tuple
+from urllib.parse import urlparse
+
+import httpx
+from loguru import logger
+
+from config import settings
+from schemas.common import ContradictingEvidence, TrustedSource, TruthOverride
+
# Trusted news domains — higher relevance boost
# Maps a registrable domain to its relevance weight.
TRUSTED_DOMAINS = {
    "reuters.com": 1.0, "apnews.com": 1.0, "bbc.com": 1.0, "bbc.co.uk": 1.0,
    "theguardian.com": 0.95, "nytimes.com": 0.95, "washingtonpost.com": 0.95,
    "cnn.com": 0.9, "npr.org": 0.95, "aljazeera.com": 0.9,
    "thehindu.com": 0.9, "indianexpress.com": 0.9, "ndtv.com": 0.85,
    "hindustantimes.com": 0.85, "pti.news": 0.95,
}

# Fact-check / contradiction sources
# NOTE: two entries ("reuters.com/fact-check", "apnews.com/hub/ap-fact-check")
# include a URL path, so they are host+path prefixes rather than bare domains.
FACTCHECK_DOMAINS = {
    "factcheck.org", "snopes.com", "politifact.com", "fullfact.org",
    "reuters.com/fact-check", "apnews.com/hub/ap-fact-check",
    "factly.in", "altnews.in", "boomlive.in", "vishvasnews.com",
}

# Domains eligible for truth-override (weight >= 0.9 per BUILD_PLAN spec)
_HIGH_TRUST_DOMAINS = {d for d, w in TRUSTED_DOMAINS.items() if w >= 0.9}

# Thresholds per BUILD_PLAN §13.2
_OVERRIDE_SIMILARITY_THRESHOLD = 0.6  # minimum cosine similarity for the override to apply
_OVERRIDE_FAKE_PROB_CAP = 0.15  # upper bound on the fake probability after an override
_OVERRIDE_FAKE_PROB_MULTIPLIER = 0.3  # factor applied to the fake probability on override
+
+
@dataclass
class NewsLookupResult:
    """Result bundle returned by ``search_news_full``."""

    # Non-fact-check articles, sorted by relevance and truncated to the limit.
    trusted_sources: List[TrustedSource]
    # Articles classified as fact-checks, truncated to the limit.
    contradicting_evidence: List[ContradictingEvidence]
    # Number of raw articles returned by the news API before filtering.
    total_articles: int
    # Truth-override outcome, or None when it was not computed.
    truth_override: Optional[TruthOverride] = None
+
+
+def _domain_of(url: str) -> str:
+ try:
+ return urlparse(url).netloc.lower().replace("www.", "")
+ except Exception:
+ return ""
+
+
def _is_factcheck(url: str, title: str) -> bool:
    """Return True when an article looks like a fact-check.

    An article qualifies when its URL belongs to an entry of
    ``FACTCHECK_DOMAINS``, or when its title contains a fact-check keyword.
    Bare-domain entries match the host itself or any subdomain. Entries that
    carry a path (e.g. ``reuters.com/fact-check``) additionally require the
    URL path to start with that path.
    """
    dom = _domain_of(url)
    if dom:
        try:
            path = urlparse(url).path.lower()
        except (ValueError, AttributeError, TypeError):
            path = ""
        for entry in FACTCHECK_DOMAINS:
            fc_host, _, fc_path = entry.partition("/")
            # Match on label boundaries so "notsnopes.com" does not count as "snopes.com".
            if dom != fc_host and not dom.endswith("." + fc_host):
                continue
            if not fc_path:
                return True
            prefix = "/" + fc_path
            if path == prefix or path.startswith(prefix + "/"):
                return True
    tl = (title or "").lower()
    return any(kw in tl for kw in ("fact check", "fact-check", "debunked", "false claim", "misleading", "hoax"))
+
+
def _relevance(url: str) -> float:
    """Return the relevance weight for *url*'s domain.

    The weight comes from ``TRUSTED_DOMAINS`` when the host is a listed
    domain or one of its subdomains; any other host gets ``0.5``.
    """
    dom = _domain_of(url)
    if dom:
        for td, score in TRUSTED_DOMAINS.items():
            # Compare on label boundaries: a substring test would let
            # look-alike hosts such as "fakecnn.com" inherit the weight of "cnn.com".
            if dom == td or dom.endswith("." + td):
                return score
    return 0.5
+
+
def _is_high_trust(url: str) -> bool:
    """Return True when *url*'s host is a high-trust domain or a subdomain of one.

    This gates the truth-override, so matching is done on label boundaries
    rather than by substring: "notbbc.com" must not pass as "bbc.com".
    """
    dom = _domain_of(url)
    if not dom:
        return False
    return any(dom == ht or dom.endswith("." + ht) for ht in _HIGH_TRUST_DOMAINS)
+
+
+def _compute_truth_override(
+ input_text: str,
+ trusted_sources: List[TrustedSource],
+ current_fake_prob: float,
+) -> Optional[TruthOverride]:
+ """Check if any high-trust source corroborates the input text at >= 0.6 cosine similarity.
+
+ Per BUILD_PLAN §13.2:
+ - Compute cosine similarity between input_text and each trusted-source headline+description
+ - If ≥ 1 high-trust source (weight ≥ 0.9) has similarity ≥ 0.6 → apply fake_prob *= 0.3, cap at 0.15
+ """
+ if not input_text or not trusted_sources:
+ return None
+
+ # Filter to high-trust sources only
+ high_trust = [s for s in trusted_sources if _is_high_trust(s.url)]
+ if not high_trust:
+ return None
+
+ # Lazy-load sentence-transformer
+ from models.model_loader import get_model_loader
+ st_model = get_model_loader().load_sentence_transformer()
+ if st_model is None:
+ return None
+
+ try:
+ import numpy as np
+
+ # Encode input text and all high-trust headlines
+ source_texts = [
+ f"{s.title}" for s in high_trust
+ ]
+ all_texts = [input_text[:512]] + source_texts
+
+ embeddings = st_model.encode(all_texts, convert_to_numpy=True, normalize_embeddings=True)
+ query_vec = embeddings[0] # (D,)
+ source_vecs = embeddings[1:] # (N, D)
+
+ # Cosine similarity — already normalized, so dot product = cosine similarity
+ similarities = np.dot(source_vecs, query_vec)
+
+ best_idx = int(np.argmax(similarities))
+ best_sim = float(similarities[best_idx])
+ best_source = high_trust[best_idx]
+
+ logger.info(
+ f"Truth-override: best similarity={best_sim:.3f} "
+ f"source={best_source.source_name} url={best_source.url}"
+ )
+
+ if best_sim >= _OVERRIDE_SIMILARITY_THRESHOLD:
+ new_fake_prob = min(
+ current_fake_prob * _OVERRIDE_FAKE_PROB_MULTIPLIER,
+ _OVERRIDE_FAKE_PROB_CAP,
+ )
+ logger.info(
+ f"Truth-override APPLIED: fake_prob {current_fake_prob:.3f} → {new_fake_prob:.3f}"
+ )
+ return TruthOverride(
+ applied=True,
+ source_url=best_source.url,
+ source_name=best_source.source_name,
+ similarity=round(best_sim, 4),
+ fake_prob_before=round(current_fake_prob, 4),
+ fake_prob_after=round(new_fake_prob, 4),
+ )
+
+ return TruthOverride(
+ applied=False,
+ source_url=best_source.url,
+ source_name=best_source.source_name,
+ similarity=round(best_sim, 4),
+ fake_prob_before=round(current_fake_prob, 4),
+ fake_prob_after=round(current_fake_prob, 4),
+ )
+
+ except Exception as e:
+ logger.warning(f"Truth-override computation failed: {e}")
+ return None
+
+
async def _fetch(q: str, country: Optional[str]) -> list[dict]:
    """Query the news API and return its ``results`` list.

    Args:
        q: Search query string.
        country: Country code to filter by; defaults to ``"in"`` when falsy.

    Returns:
        The list under the response's ``results`` key, or ``[]`` when the
        request fails, times out, or the response carries no results.
    """
    target_country = country or "in"
    params = {
        "apikey": settings.NEWS_API_KEY,
        "q": q,
        "language": "en",
        "size": 10,
        # Honour the caller's country; it was previously hard-coded to "in".
        "country": target_country,
    }

    try:
        async with httpx.AsyncClient(timeout=8.0) as c:
            r = await c.get(settings.NEWS_API_BASE_URL, params=params)
            r.raise_for_status()
            return (r.json() or {}).get("results") or []
    except Exception as e:
        # Best-effort lookup: a failure degrades to "no articles found".
        logger.warning(f"News lookup failed: {e}")
        return []
+
+
async def search_news(
    keywords: List[str],
    limit: int = 6,
    country: Optional[str] = None,
) -> List[TrustedSource]:
    """Back-compat wrapper around ``search_news_full`` that returns only the trusted sources."""
    return (await search_news_full(keywords, limit=limit, country=country)).trusted_sources
+
+
async def search_news_full(
    keywords: List[str],
    limit: int = 6,
    country: Optional[str] = None,
    original_text: Optional[str] = None,
    current_fake_prob: float = 0.5,
) -> NewsLookupResult:
    """Look up news for *keywords* and classify the hits.

    Args:
        keywords: Search keywords; only the first four are used in the query.
        limit: Max trusted sources (and max contradictions) to return.
        country: Country code forwarded to the news API fetch.
        original_text: Input text compared against headlines for truth-override.
        current_fake_prob: Current fake probability handed to the truth-override check.

    Returns:
        A NewsLookupResult. It is empty when no API key is configured or no
        keywords were given.
    """
    if not settings.NEWS_API_KEY or not keywords:
        return NewsLookupResult([], [], 0)

    articles = await _fetch(" ".join(keywords[:4]), country)

    visited: set[str] = set()
    sources: List[TrustedSource] = []
    fact_checks: List[ContradictingEvidence] = []

    for article in articles:
        link = article.get("link") or ""
        # Skip entries without a link and links already processed.
        if not link or link in visited:
            continue
        visited.add(link)

        headline = article.get("title") or ""
        host = _domain_of(link)
        publisher = article.get("source_id") or host or "news"

        if _is_factcheck(link, headline):
            # Fact-check articles are reported as contradicting evidence only.
            fact_checks.append(ContradictingEvidence(
                source_name=publisher, title=headline, url=link, type="fact_check",
            ))
        else:
            sources.append(TrustedSource(
                source_name=publisher,
                title=headline,
                url=link,
                published_at=article.get("pubDate"),
                relevance_score=_relevance(link),
            ))

    # Highest relevance first, then keep the top `limit`.
    top_sources = sorted(sources, key=lambda s: -s.relevance_score)[:limit]

    # ── Phase 13.2: Truth-override ──
    override = None
    if original_text and top_sources:
        override = _compute_truth_override(original_text, top_sources, current_fake_prob)

    return NewsLookupResult(
        trusted_sources=top_sources,
        contradicting_evidence=fact_checks[:limit],
        total_articles=len(articles),
        truth_override=override,
    )
diff --git a/report.html b/report.html
new file mode 100644
index 0000000000000000000000000000000000000000..17189b7194f885ccb8c690ae7accfe93578780a9
--- /dev/null
+++ b/report.html
@@ -0,0 +1,367 @@
+
+
+
+
+ DeepShield Analysis Report — {{ analysis_id }}
+
+
+
+
+ {# ── Header ── #}
+
+
+ {# ── Verdict ── #}
+ Verdict
+
+
+ |
+ {{ verdict.authenticity_score }}
+ / 100
+ |
+
+ {{ verdict.label }}
+ Severity: {{ verdict.severity }}
+ Model: {{ verdict.model_label }} ({{ '%.1f' | format(verdict.model_confidence * 100) }}% confidence)
+ |
+ {% if donut_b64 %}
+
+
+ |
+ {% endif %}
+
+
+
+ {# ── LLM Explanation ── #}
+ {% if llm_summary and llm_summary.paragraph %}
+ AI Explanation
+
+
{{ llm_summary.paragraph }}
+ {% if llm_summary.bullets %}
+
+ {% for b in llm_summary.bullets %}- {{ b }}
{% endfor %}
+
+ {% endif %}
+ {% if llm_summary.model_used %}
+
via {{ llm_summary.model_used }}
+ {% endif %}
+
+ {% endif %}
+
+ {# ══════════ IMAGE ══════════ #}
+ {% if media_type == 'image' %}
+
+ {# EXIF #}
+ {% if explainability.exif %}
+ EXIF Metadata
+
+ | Field | Value | Trust Signal |
+ {% if explainability.exif.make %}
+ | Camera Make | {{ explainability.exif.make }} | +real |
+ {% endif %}
+ {% if explainability.exif.model %}
+ | Camera Model | {{ explainability.exif.model }} | |
+ {% endif %}
+ {% if explainability.exif.datetime_original %}
+ | Date Taken | {{ explainability.exif.datetime_original }} | +real |
+ {% endif %}
+ {% if explainability.exif.software %}
+ | Software | {{ explainability.exif.software }} |
+ {% if 'photoshop' in explainability.exif.software | lower %}+fake{% endif %} |
+ {% endif %}
+ {% if explainability.exif.lens_model %}
+ | Lens Model | {{ explainability.exif.lens_model }} | |
+ {% endif %}
+ {% if explainability.exif.gps_info %}
+ | GPS | {{ explainability.exif.gps_info }} | |
+ {% endif %}
+
+ | Trust adjustment |
+
+ {% if explainability.exif.trust_adjustment > 0 %}
+ +{{ explainability.exif.trust_adjustment }} (fake signal)
+ {% elif explainability.exif.trust_adjustment < 0 %}
+ {{ explainability.exif.trust_adjustment }} (real signal)
+ {% else %}
+ neutral
+ {% endif %}
+ |
+
+
+ {% endif %}
+
+ {# Artifact indicators #}
+ {% if explainability.artifact_indicators %}
+ Artifact Indicators
+
+ | Type | Severity | Confidence | Description |
+ {% for ind in explainability.artifact_indicators %}
+
+ | {{ ind.type }} |
+ {{ ind.severity }} |
+ {{ '%.0f' | format(ind.confidence * 100) }}% |
+ {{ ind.description }} |
+
+ {% endfor %}
+
+ {% else %}
+ Artifact Indicators
+ No artifacts detected.
+ {% endif %}
+
+ {# VLM Detailed Breakdown #}
+ {% if explainability.vlm_breakdown %}
+ Detailed Breakdown
+ {% if explainability.vlm_breakdown.model_used %}
+ Scored by {{ explainability.vlm_breakdown.model_used }}
+ {% endif %}
+
+ | Component | Score | Bar | Notes |
+ {% set bd = explainability.vlm_breakdown %}
+ {% for comp_key, comp_label in [
+ ('facial_symmetry', 'Facial Symmetry'),
+ ('skin_texture', 'Skin Texture'),
+ ('lighting_consistency', 'Lighting Consistency'),
+ ('background_coherence', 'Background Coherence'),
+ ('anatomy_hands_eyes', 'Anatomy / Hands & Eyes'),
+ ('context_objects', 'Context & Objects')
+ ] %}
+ {% set comp = bd[comp_key] %}
+ {% set sc2 = comp.score if comp else 75 %}
+ {% set bar_cls = 'vlm-real' if sc2 >= 70 else ('vlm-warn' if sc2 >= 40 else 'vlm-fake') %}
+
+ | {{ comp_label }} |
+ {{ sc2 }}/100 |
+
+
+
+
+ |
+ {{ comp.notes if comp else '' }} |
+
+ {% endfor %}
+
+ {% endif %}
+
+ {% endif %}{# end image #}
+
+ {# ══════════ VIDEO ══════════ #}
+ {% if media_type == 'video' %}
+ Frame-Level Analysis
+
+ | Metric | Value |
+ | Frames sampled | {{ explainability.num_frames_sampled }} |
+ | Frames with face | {{ explainability.num_face_frames }} |
+ | Suspicious frames | {{ explainability.num_suspicious_frames }} |
+ | Mean suspicious prob | {{ '%.1f' | format(explainability.mean_suspicious_prob * 100) }}% |
+ | Max suspicious prob | {{ '%.1f' | format(explainability.max_suspicious_prob * 100) }}% |
+ | Insufficient faces | {{ explainability.insufficient_faces }} |
+
+ {% endif %}
+
+ {# ══════════ TEXT ══════════ #}
+ {% if media_type == 'text' %}
+
+ {# Language + truth-override #}
+ {% if explainability.detected_language and explainability.detected_language != 'en' %}
+ Language
+ Detected: {{ explainability.detected_language | upper }} — analysed via multilingual model
+ {% endif %}
+ {% if explainability.truth_override and explainability.truth_override.applied %}
+
+ Truth-override applied.
+ Corroborated by {{ explainability.truth_override.source_name }}
+ ({{ '%.0f' | format(explainability.truth_override.similarity * 100) }}% similarity).
+ Fake probability reduced from {{ '%.1f' | format(explainability.truth_override.fake_prob_before * 100) }}%
+ to {{ '%.1f' | format(explainability.truth_override.fake_prob_after * 100) }}%.
+
+ {% endif %}
+
+ Text Classification
+
+ | Metric | Value |
+ | Fake probability | {{ '%.1f' | format(explainability.fake_probability * 100) }}% |
+ | Top label | {{ explainability.top_label }} |
+ | Sensationalism score | {{ explainability.sensationalism.score }}/100 ({{ explainability.sensationalism.level }}) |
+ | Exclamations | {{ explainability.sensationalism.exclamation_count }} |
+ | ALL CAPS words | {{ explainability.sensationalism.caps_word_count }} |
+ | Clickbait matches | {{ explainability.sensationalism.clickbait_matches }} |
+ | Emotional words | {{ explainability.sensationalism.emotional_word_count }} |
+
+
+ {% if explainability.manipulation_indicators %}
+ Manipulation Indicators ({{ explainability.manipulation_indicators | length }})
+
+ | Pattern | Severity | Matched text |
+ {% for m in explainability.manipulation_indicators %}
+
+ | {{ m.pattern_type }} |
+ {{ m.severity }} |
+ {{ m.matched_text }} |
+
+ {% endfor %}
+
+ {% endif %}
+
+ {% if explainability.keywords %}
+ Extracted Keywords
+ {% for kw in explainability.keywords %}{{ kw }}{% endfor %}
+ {% endif %}
+
+ {% endif %}{# end text #}
+
+ {# ══════════ SCREENSHOT ══════════ #}
+ {% if media_type == 'screenshot' %}
+
+ {% if explainability.detected_language and explainability.detected_language != 'en' %}
+ Detected language: {{ explainability.detected_language | upper }}
+ {% endif %}
+ {% if explainability.truth_override and explainability.truth_override.applied %}
+
+ Truth-override applied. {{ explainability.truth_override.source_name }}
+ ({{ '%.0f' | format(explainability.truth_override.similarity * 100) }}% similarity)
+
+ {% endif %}
+
+ Extracted Text
+ {{ explainability.ocr_boxes | length }} OCR regions detected
+
+ | {{ explainability.extracted_text }} |
+
+
+ Analysis Summary
+
+ | Metric | Value |
+ | Fake probability | {{ '%.1f' | format(explainability.fake_probability * 100) }}% |
+ | Sensationalism | {{ explainability.sensationalism.score }}/100 ({{ explainability.sensationalism.level }}) |
+ | Suspicious phrases | {{ explainability.suspicious_phrases | length }} |
+ | Layout anomalies | {{ explainability.layout_anomalies | length }} |
+
+
+ {% if explainability.suspicious_phrases %}
+ Suspicious Phrases
+
+ | Text | Pattern | Severity |
+ {% for p in explainability.suspicious_phrases %}
+
+ | {{ p.text }} |
+ {{ p.pattern_type }} |
+ {{ p.severity }} |
+
+ {% endfor %}
+
+ {% endif %}
+
+ {% endif %}{# end screenshot #}
+
+ {# ══════════ SOURCES (all types) ══════════ #}
+ {% if trusted_sources %}
+ Trusted Source Cross-Reference ({{ trusted_sources | length }})
+
+ | Source | Title | Relevance |
+ {% for s in trusted_sources %}
+
+ | {{ s.source_name }} |
+ {{ s.title }} |
+ {{ '%.0f' | format(s.relevance_score * 100) }}% |
+
+ {% endfor %}
+
+ {% endif %}
+
+ {% if contradicting_evidence %}
+ Contradicting Evidence ({{ contradicting_evidence | length }})
+
+ | Source | Title | Type |
+ {% for c in contradicting_evidence %}
+ | {{ c.source_name }} | {{ c.title }} | {{ c.type }} |
+ {% endfor %}
+
+ {% endif %}
+
+ {# ══════════ PROCESSING ══════════ #}
+ Processing Summary
+ Model: {{ processing_summary.model_used }} · Duration: {{ processing_summary.total_duration_ms }} ms
+ {{ processing_summary.stages_completed | join(' → ') }}
+
+ {# ══════════ FOOTER ══════════ #}
+
+
+
+
diff --git a/report_service.py b/report_service.py
new file mode 100644
index 0000000000000000000000000000000000000000..154503b6179adbcd19ba924682e3f3649d8b0cc6
--- /dev/null
+++ b/report_service.py
@@ -0,0 +1,152 @@
+from __future__ import annotations
+
+import base64
+import json
+import os
+import time
+import uuid
+from datetime import datetime, timedelta, timezone
+from io import BytesIO
+from pathlib import Path
+from typing import Any, Optional
+
+from jinja2 import Environment, FileSystemLoader, select_autoescape
+from loguru import logger
+from xhtml2pdf import pisa # type: ignore
+
+from config import settings
+from db.models import AnalysisRecord, Report
+
+TEMPLATES_DIR = Path(__file__).resolve().parent.parent / "templates"  # <this file's dir>/../templates
+
+_env = Environment(  # module-wide Jinja2 environment; render_html loads "report.html" through it
+ loader=FileSystemLoader(str(TEMPLATES_DIR)),
+ autoescape=select_autoescape(["html", "xml"]),  # autoescape applies to html/xml templates
+)
+
+
+def _score_class(score: int) -> str:
+ if score >= 70:
+ return "real"
+ if score >= 40:
+ return "warn"
+ return "fake"
+
+
+def _ensure_dir() -> Path:
+ p = Path(settings.REPORT_DIR)
+ p.mkdir(parents=True, exist_ok=True)
+ return p
+
+
+def _make_donut_chart(score: int, score_cls: str) -> str:
+ """Render authenticity score as a donut chart PNG; return base64 or '' on failure."""
+ try:
+ import matplotlib # type: ignore
+ matplotlib.use("Agg")
+ import matplotlib.pyplot as plt # type: ignore
+
+ color_map = {"real": "#43A047", "warn": "#FB8C00", "fake": "#E53935"}
+ color = color_map.get(score_cls, "#6B7280")
+
+ fig, ax = plt.subplots(figsize=(2.2, 2.2), dpi=96)
+ sizes = [score, 100 - score]
+ wedge_colors = [color, "#F3F4F6"]
+ ax.pie(sizes, colors=wedge_colors, startangle=90,
+ wedgeprops=dict(width=0.42, edgecolor="white", linewidth=1))
+ ax.text(0, 0, str(score), ha="center", va="center",
+ fontsize=20, fontweight="bold", color=color)
+ ax.set_aspect("equal")
+ plt.tight_layout(pad=0.05)
+
+ buf = BytesIO()
+ fig.savefig(buf, format="png", bbox_inches="tight", transparent=True)
+ plt.close(fig)
+ buf.seek(0)
+ return base64.b64encode(buf.read()).decode()
+ except Exception as e:
+ logger.debug(f"Donut chart skipped: {e}")
+ return ""
+
+
+def _extract_llm_summary(analysis_json: dict) -> dict | None:
+ """Extract llm_summary from either top-level or inside explainability (images)."""
+ top = analysis_json.get("llm_summary")
+ if top:
+ return top
+ return (analysis_json.get("explainability") or {}).get("llm_summary")
+
+
+def render_html(analysis_json: dict) -> str:
+ score = analysis_json.get("verdict", {}).get("authenticity_score", 50)
+ sc = _score_class(score)
+ donut_b64 = _make_donut_chart(score, sc)
+ llm_summary = _extract_llm_summary(analysis_json)
+ expl: dict[str, Any] = analysis_json.get("explainability") or {}
+
+ tmpl = _env.get_template("report.html")
+ return tmpl.render(
+ analysis_id=analysis_json.get("analysis_id", ""),
+ media_type=analysis_json.get("media_type", "unknown"),
+ verdict=analysis_json.get("verdict", {}),
+ explainability=expl,
+ trusted_sources=analysis_json.get("trusted_sources", []),
+ contradicting_evidence=analysis_json.get("contradicting_evidence", []),
+ processing_summary=analysis_json.get("processing_summary", {}),
+ responsible_ai_notice=analysis_json.get(
+ "responsible_ai_notice",
+ "AI-based analysis may not be 100% accurate.",
+ ),
+ score_class=sc,
+ generated_at=datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC"),
+ donut_b64=donut_b64,
+ llm_summary=llm_summary,
+ )
+
+
+def html_to_pdf(html: str, out_path: Path) -> None:
+ with open(out_path, "wb") as f:
+ result = pisa.CreatePDF(html, dest=f)
+ if result.err:
+ raise RuntimeError(f"xhtml2pdf failed with {result.err} errors")
+
+
+def generate_report(record: AnalysisRecord) -> Path:
+ out_dir = _ensure_dir()
+ filename = f"deepshield_{record.id}_{uuid.uuid4().hex[:8]}.pdf"
+ out_path = out_dir / filename
+
+ data = json.loads(record.result_json)
+ html = render_html(data)
+ html_to_pdf(html, out_path)
+ logger.info(f"Report generated id={record.id} path={out_path} size={out_path.stat().st_size}B")
+ return out_path
+
+
+def create_report_row(analysis_id: int, path: Path) -> Report:
+ return Report(
+ analysis_id=analysis_id,
+ file_path=str(path),
+ expires_at=datetime.utcnow() + timedelta(seconds=settings.REPORT_TTL_SECONDS),
+ )
+
+
+def cleanup_expired(now: Optional[datetime] = None) -> int:
+ """Delete expired PDFs from disk. Returns count deleted."""
+ now = now or datetime.utcnow()
+ d = Path(settings.REPORT_DIR)
+ if not d.exists():
+ return 0
+ deleted = 0
+ ttl = timedelta(seconds=settings.REPORT_TTL_SECONDS)
+ for f in d.glob("*.pdf"):
+ try:
+ mtime = datetime.utcfromtimestamp(f.stat().st_mtime)
+ if now - mtime > ttl:
+ f.unlink()
+ deleted += 1
+ except OSError as e:
+ logger.warning(f"Cleanup failed for {f}: {e}")
+ if deleted:
+ logger.info(f"Cleaned up {deleted} expired reports")
+ return deleted
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000000000000000000000000000000000000..6111cdc40a3920a19fa432a7c9539965abc77d32
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,50 @@
+fastapi==0.115.0
+uvicorn[standard]==0.32.0
+pydantic==2.9.2
+pydantic-settings==2.6.0
+python-multipart==0.0.12
+python-dotenv==1.0.1
+loguru==0.7.2
+SQLAlchemy==2.0.35
+psycopg2-binary==2.9.9
+alembic==1.13.3
+python-jose[cryptography]==3.3.0
+bcrypt==4.2.0
+
+# === Phase 1: Image Detection ===
+# Install torch separately with CPU index first (see README): pip install torch==2.4.1 --index-url https://download.pytorch.org/whl/cpu
+torch==2.4.1
+torchvision==0.19.1
+transformers==4.44.2
+Pillow>=10.4.0
+numpy>=1.26,<3
+opencv-python==4.10.0.84
+grad-cam==1.5.4
+mediapipe==0.10.14
+
+# === Phase 12: Explainability v2 ===
+exifread==3.0.0
+google-generativeai>=0.3.0 # Gemini provider for LLM explainability
+openai>=1.0.0 # OpenAI provider (alternative to Gemini)
+
+# === Phase 14: PDF v2 donut chart ===
+matplotlib>=3.9.0
+
+# === Phase 13: Text Pipeline Hardening ===
+# After installing, run: python -m spacy download en_core_web_sm
+spacy>=3.7.0,<4.0.0
+sentence-transformers>=2.7.0 # for truth-override cosine similarity (all-MiniLM-L6-v2)
+langdetect==1.0.9 # lightweight language detection
+
+# === Phase 3: Text / News ===
+httpx==0.27.2
+
+# === Phase 4: Screenshot / OCR ===
+easyocr==1.7.2
+
+# === Phase 7: PDF Reports ===
+Jinja2==3.1.4
+xhtml2pdf==0.2.16
+
+# === Phase 8: Auth ===
+email-validator==2.2.0
diff --git a/router.py b/router.py
new file mode 100644
index 0000000000000000000000000000000000000000..478bb749be5a898755712262b76c49db8bd1257f
--- /dev/null
+++ b/router.py
@@ -0,0 +1,10 @@
+from fastapi import APIRouter
+
+from api.v1 import analyze, auth, health, history, report
+
+api_router = APIRouter(prefix="/api/v1")  # parent router; every sub-router below is mounted under /api/v1
+api_router.include_router(health.router)
+api_router.include_router(analyze.router)
+api_router.include_router(report.router)
+api_router.include_router(auth.router)
+api_router.include_router(history.router)
diff --git a/scoring.py b/scoring.py
new file mode 100644
index 0000000000000000000000000000000000000000..eec7009e2d63204fd7952bc3e0d30afb561dacc1
--- /dev/null
+++ b/scoring.py
@@ -0,0 +1,46 @@
+from __future__ import annotations
+
+from typing import Tuple
+
+TRUST_SCALE = [  # (low, high, label, severity) bands; get_verdict_label treats both bounds as inclusive
+ (0, 20, "Very Likely Fake", "critical"),
+ (21, 40, "Likely Fake", "danger"),
+ (41, 60, "Possibly Manipulated", "warning"),
+ (61, 80, "Likely Real", "positive"),
+ (81, 100, "Very Likely Real", "safe"),
+]
+
+
+def compute_authenticity_score(model_confidence: float, label: str) -> int:
+ """Map (confidence, label) to 0-100 authenticity score.
+ Real-ish labels give high score; fake-ish labels give low score.
+ """
+ label_l = label.lower()
+ fake_tokens = ("fake", "deepfake", "manipulated", "ai", "generated", "synthetic")
+ if any(tok in label_l for tok in fake_tokens):
+ score = (1.0 - float(model_confidence)) * 100.0
+ else:
+ score = float(model_confidence) * 100.0
+ return int(round(max(0.0, min(100.0, score))))
+
+
+def get_verdict_label(score: int) -> Tuple[str, str]:
+ for lo, hi, label, severity in TRUST_SCALE:
+ if lo <= score <= hi:
+ return label, severity
+ return "Unknown", "warning"
+
+
+def get_score_color(score: int) -> str:
+ """Linear interpolate Red (#E53935) → Amber (#FFA726) → Green (#43A047)."""
+ def lerp(a: int, b: int, t: float) -> int:
+ return int(round(a + (b - a) * t))
+
+ score = max(0, min(100, score))
+ if score <= 50:
+ t = score / 50.0
+ r, g, b = lerp(0xE5, 0xFF, t), lerp(0x39, 0xA7, t), lerp(0x35, 0x26, t)
+ else:
+ t = (score - 50) / 50.0
+ r, g, b = lerp(0xFF, 0x43, t), lerp(0xA7, 0xA0, t), lerp(0x26, 0x47, t)
+ return f"#{r:02X}{g:02X}{b:02X}"
diff --git a/screenshot_service.py b/screenshot_service.py
new file mode 100644
index 0000000000000000000000000000000000000000..ae5aa3eed6986c0f9a940965bcdd5dfaedfa0dfb
--- /dev/null
+++ b/screenshot_service.py
@@ -0,0 +1,126 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import List, Tuple
+
+import numpy as np
+from loguru import logger
+from PIL import Image
+
+from models.model_loader import get_model_loader
+
+
+@dataclass
+class OCRBox:  # one text region produced by run_ocr
+ text: str  # recognised text, stored as str
+ bbox: List[List[int]] # 4 points [[x,y],...]
+ confidence: float  # confidence reported by the OCR reader, stored as float
+
+
+@dataclass
+class SuspiciousPhrase:  # a manipulation indicator located in an OCR box (built by map_phrases_to_boxes)
+ text: str  # the indicator's matched text
+ bbox: List[List[int]]  # bbox of the OCR box whose text contains the phrase
+ pattern_type: str  # copied from the indicator
+ severity: str  # copied from the indicator
+ description: str  # copied from the indicator
+
+
+@dataclass
+class LayoutAnomaly:  # one heuristic finding from detect_layout_anomalies
+ type: str # misalignment / font_mismatch / uneven_spacing
+ severity: str  # "low", "medium" or "high" as assigned by the detector
+ description: str  # human-readable explanation including the measured value
+ confidence: float  # heuristic confidence; the detector caps or derives it within 0-1
+
+
+def run_ocr(pil_img: Image.Image) -> List[OCRBox]:
+ reader = get_model_loader().load_ocr_engine()
+ arr = np.array(pil_img.convert("RGB"))
+ results = reader.readtext(arr, detail=1, paragraph=False)
+ out: List[OCRBox] = []
+ for bbox, text, conf in results:
+ out.append(OCRBox(
+ text=str(text),
+ bbox=[[int(p[0]), int(p[1])] for p in bbox],
+ confidence=float(conf),
+ ))
+ logger.info(f"OCR extracted {len(out)} text regions")
+ return out
+
+
+def extract_full_text(boxes: List[OCRBox]) -> str:
+ return " ".join(b.text for b in boxes if b.text.strip())
+
+
+def map_phrases_to_boxes(boxes: List[OCRBox], manipulation_indicators) -> List[SuspiciousPhrase]:
+ """Map each manipulation indicator to the OCR box whose text contains it."""
+ out: List[SuspiciousPhrase] = []
+ for mi in manipulation_indicators:
+ needle = mi.matched_text.lower()
+ for b in boxes:
+ if needle in b.text.lower():
+ out.append(SuspiciousPhrase(
+ text=mi.matched_text,
+ bbox=b.bbox,
+ pattern_type=mi.pattern_type,
+ severity=mi.severity,
+ description=mi.description,
+ ))
+ break
+ return out
+
+
+def detect_layout_anomalies(boxes: List[OCRBox]) -> List[LayoutAnomaly]:
+ """Heuristic layout checks on OCR bboxes."""
+ out: List[LayoutAnomaly] = []
+ if len(boxes) < 3:
+ return out
+
+ heights = []
+ x_lefts = []
+ for b in boxes:
+ pts = b.bbox
+ ys = [p[1] for p in pts]
+ xs = [p[0] for p in pts]
+ heights.append(max(ys) - min(ys))
+ x_lefts.append(min(xs))
+
+ h_arr = np.array(heights, dtype=float)
+ if h_arr.mean() > 0:
+ cv_h = float(h_arr.std() / h_arr.mean())
+ if cv_h > 0.7:
+ out.append(LayoutAnomaly(
+ type="font_mismatch",
+ severity="medium" if cv_h < 1.2 else "high",
+ description=f"High variance in text heights (cv={cv_h:.2f}) — mixed fonts/sizes possible",
+ confidence=min(cv_h / 1.5, 1.0),
+ ))
+
+ x_arr = np.array(x_lefts, dtype=float)
+ if x_arr.std() > 0 and len(x_arr) > 4:
+ clustered = sum(1 for x in x_arr if abs(x - np.median(x_arr)) < 20)
+ align_ratio = clustered / len(x_arr)
+ if align_ratio < 0.4:
+ out.append(LayoutAnomaly(
+ type="misalignment",
+ severity="low",
+ description=f"Only {align_ratio*100:.0f}% of text blocks share left-alignment — unusual layout",
+ confidence=1.0 - align_ratio,
+ ))
+
+ if len(boxes) >= 4:
+ tops = sorted([min(p[1] for p in b.bbox) for b in boxes])
+ gaps = np.diff(tops)
+ gaps = gaps[gaps > 0]
+ if len(gaps) >= 3 and gaps.mean() > 0:
+ cv_g = float(gaps.std() / gaps.mean())
+ if cv_g > 1.5:
+ out.append(LayoutAnomaly(
+ type="uneven_spacing",
+ severity="low",
+ description=f"Irregular vertical spacing between text blocks (cv={cv_g:.2f})",
+ confidence=min(cv_g / 2.5, 1.0),
+ ))
+
+ return out
diff --git a/test_image_classify.py b/test_image_classify.py
new file mode 100644
index 0000000000000000000000000000000000000000..d38a0b667ed32057de0a08738f461535328648f5
--- /dev/null
+++ b/test_image_classify.py
@@ -0,0 +1,58 @@
+"""Phase 1.2 smoke test: download a sample image and run the ViT classifier.
+
+Run from backend/:
+ .venv/Scripts/python.exe scripts/test_image_classify.py
+"""
+from __future__ import annotations
+
+import sys
+import urllib.request
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
+
+import base64
+
+from models.heatmap_generator import generate_heatmap_base64
+from services.artifact_detector import scan_artifacts
+from services.image_service import preprocess_and_classify
+from utils.scoring import compute_authenticity_score, get_verdict_label
+
+SAMPLE_URL = "https://picsum.photos/seed/deepshield/512/512"
+
+
+def main() -> int:
+ print(f"Fetching sample image: {SAMPLE_URL}")
+ req = urllib.request.Request(SAMPLE_URL, headers={"User-Agent": "DeepShield/0.1"})
+ with urllib.request.urlopen(req, timeout=30) as r:
+ data = r.read()
+ print(f" got {len(data)} bytes")
+
+ print("Running classifier (first run will download model ~350MB)…")
+ pil, result = preprocess_and_classify(data)
+ print(f" image size: {pil.size}")
+ print(f" label: {result.label}")
+ print(f" confidence: {result.confidence:.4f}")
+ print(f" all scores: {result.all_scores}")
+
+ score = compute_authenticity_score(result.confidence, result.label)
+ verdict_label, severity = get_verdict_label(score)
+ print(f"\n authenticity_score: {score}")
+ print(f" verdict: {verdict_label} ({severity})")
+
+ print("\nScanning artifact indicators\u2026")
+ for ind in scan_artifacts(pil, data):
+ print(f" [{ind.severity.upper():6s}] {ind.type}: {ind.description} (conf {ind.confidence:.2f})")
+
+ print("\nGenerating Grad-CAM heatmap\u2026")
+ heatmap_url = generate_heatmap_base64(pil)
+ header, b64 = heatmap_url.split(",", 1)
+ out_path = Path(__file__).resolve().parent.parent / "heatmap_smoketest.png"
+ out_path.write_bytes(base64.b64decode(b64))
+ print(f" saved: {out_path}")
+ print(f" data URL length: {len(heatmap_url)} chars")
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/test_news_api.py b/test_news_api.py
new file mode 100644
index 0000000000000000000000000000000000000000..7efa689a0e80a1d4e4bae7b5e3cfdb85dd7fb030
--- /dev/null
+++ b/test_news_api.py
@@ -0,0 +1,43 @@
+"""Test script for the NewsData API integration."""
+import asyncio
+import sys
+import os
+
+# Add backend directory to sys.path so we can import modules
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+
+from config import settings
+from services.news_lookup import search_news_full
+
+async def test_news():
+ print(f"Testing News API Integration with key: {settings.NEWS_API_KEY[:6]}... (masked)")
+
+ if not settings.NEWS_API_KEY:
+ print("ERROR: NEWS_API_KEY is empty in .env")
+ return
+
+ keywords = ["modi", "election", "bjp", "congress"]
+ print(f"Searching for keywords: {keywords}")
+
+ try:
+ result = await search_news_full(keywords, limit=5)
+
+ print("\n=== RAW RESULT ===")
+ print(f"Total articles found: {result.total_articles}")
+
+ print("\n=== TRUSTED SOURCES ===")
+ for i, source in enumerate(result.trusted_sources, 1):
+ date_str = str(source.published_at)[:10] if source.published_at else "Unknown date"
+ print(f"{i}. [{source.relevance_score}] {source.source_name}: {source.title[:60]}... ({date_str})")
+
+ print("\n=== CONTRADICTING EVIDENCE / FACT CHECKS ===")
+ if not result.contradicting_evidence:
+ print("No fact-check articles found for these keywords.")
+ for i, ev in enumerate(result.contradicting_evidence, 1):
+ print(f"{i}. {ev.source_name}: {ev.title[:60]}...")
+
+ except Exception as e:
+ print(f"\nERROR running test: {e}")
+
+if __name__ == "__main__":
+ asyncio.run(test_news())
diff --git a/test_phase5.py b/test_phase5.py
new file mode 100644
index 0000000000000000000000000000000000000000..5feebb08a02a305c00c093b43e15e083073cf67e
--- /dev/null
+++ b/test_phase5.py
@@ -0,0 +1,70 @@
+"""Phase 5 smoke: unit-test news_lookup classification + endpoint wiring."""
+from __future__ import annotations
+
+import asyncio
+import sys
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
+
+from services.news_lookup import (
+ _domain_of, _is_factcheck, _relevance, search_news_full,
+)
+
+
+def test_domain():
+ assert _domain_of("https://www.reuters.com/article/x") == "reuters.com"
+ assert _domain_of("https://snopes.com/fact-check/abc") == "snopes.com"
+ print("[OK] _domain_of")
+
+
+def test_factcheck_detection():
+ assert _is_factcheck("https://snopes.com/x", "Claim about moon")
+ assert _is_factcheck("https://factly.in/x", "")
+ assert _is_factcheck("https://example.com/x", "FACT CHECK: viral video debunked")
+ assert not _is_factcheck("https://bbc.com/news/world-123", "Election results")
+ print("[OK] _is_factcheck")
+
+
+def test_relevance():
+ assert _relevance("https://reuters.com/x") == 1.0
+ assert _relevance("https://ndtv.com/x") == 0.85
+ assert _relevance("https://random-blog.xyz/x") == 0.5
+ print("[OK] _relevance weights")
+
+
+async def test_empty_key_returns_empty():
+ res = await search_news_full(["modi", "election"])
+ assert res.trusted_sources == []
+ assert res.contradicting_evidence == []
+ assert res.total_articles == 0
+ print(f"[OK] empty-key path -> {res}")
+
+
+async def test_endpoint_wiring():
+ import httpx
+ body = {"text": "BREAKING!!! You won't BELIEVE this SHOCKING miracle cure doctors don't want you to know!!! Click now!"}
+ async with httpx.AsyncClient(timeout=180.0) as c:
+ r = await c.post("http://127.0.0.1:8000/api/v1/analyze/text", json=body)
+ r.raise_for_status()
+ j = r.json()
+ assert j["media_type"] == "text"
+ assert "trusted_sources" in j
+ assert "contradicting_evidence" in j
+ assert "news_lookup" in j["processing_summary"]["stages_completed"]
+ print(f"[OK] /analyze/text -> verdict={j['verdict']['label']} "
+ f"score={j['verdict']['authenticity_score']} "
+ f"trusted={len(j['trusted_sources'])} contradictions={len(j['contradicting_evidence'])}")
+
+
+async def main():
+ test_domain()
+ test_factcheck_detection()
+ test_relevance()
+ await test_empty_key_returns_empty()
+ await test_endpoint_wiring()
+ print("\n=== Phase 5 smoke PASS ===")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/test_text_analysis.py b/test_text_analysis.py
new file mode 100644
index 0000000000000000000000000000000000000000..3bedd8f87f8f7be25039e23d5abff27031c5a22c
--- /dev/null
+++ b/test_text_analysis.py
@@ -0,0 +1,34 @@
+"""Quick smoke test for sensationalism + manipulation detection."""
+import sys
+sys.path.insert(0, ".")
+
+from services.text_service import score_sensationalism, detect_manipulation_indicators
+
+# --- Sensationalism: a heavily loaded headline must score above 50 ---
+text1 = "BREAKING: You wont believe this SHOCKING truth! Experts confirm the most DEVASTATING scandal exposed!!!"
+s = score_sensationalism(text1)
+print(f"Sensationalism: score={s.score} level={s.level}")
+print(f" excl={s.exclamation_count} caps={s.caps_word_count} clickbait={s.clickbait_matches} emotional={s.emotional_word_count} superlative={s.superlative_count}")
+assert s.score > 50, f"Expected high sensationalism, got {s.score}"
+assert s.level in ("Medium", "High"), f"Expected Medium/High, got {s.level}"
+print(" PASS")
+
+# --- Manipulation: vague attributions and loaded phrases must yield at least 3 indicators ---
+text2 = "Sources say that experts confirm the shocking truth. Allegedly, everyone knows this is a proven fact."
+m = detect_manipulation_indicators(text2)
+print(f"\nManipulation indicators: {len(m)} found")
+for ind in m:
+ print(f" [{ind.severity}] {ind.pattern_type}: \"{ind.matched_text}\"")
+assert len(m) >= 3, f"Expected >=3 indicators, got {len(m)}"
+print(" PASS")
+
+# --- Clean text: a neutral sentence must stay below 20 with no indicators ---
+text3 = "The weather today is sunny with clear skies in New Delhi."
+s2 = score_sensationalism(text3)
+m2 = detect_manipulation_indicators(text3)
+print(f"\nClean text: sensationalism={s2.score} ({s2.level}), manipulation={len(m2)}")
+assert s2.score < 20, f"Expected low sensationalism for clean text, got {s2.score}"
+assert len(m2) == 0, f"Expected 0 manipulation indicators for clean text, got {len(m2)}"
+print(" PASS")
+
+print("\nAll tests passed!")
diff --git a/text_service.py b/text_service.py
new file mode 100644
index 0000000000000000000000000000000000000000..556ac48a8f0195fe5b41d3a789179979e778fae3
--- /dev/null
+++ b/text_service.py
@@ -0,0 +1,285 @@
+from __future__ import annotations
+
+import re
+from dataclasses import dataclass, field
+from typing import List, Optional
+
+from loguru import logger
+
+from models.model_loader import get_model_loader
+
+FAKE_TOKENS = ("fake", "false", "unreliable", "misinformation")  # label substrings read as "fake" in _scores_to_classification
+
+# --- Sensationalism patterns ---
+CLICKBAIT_PATTERNS = [  # (regex, category); score_sensationalism searches them case-insensitively
+ (r"\byou won'?t believe\b", "clickbait"),
+ (r"\bbreaking\s*:", "clickbait"),
+ (r"\bshocking\s*:", "clickbait"),
+ (r"\bexclusive\s*:", "clickbait"),
+ (r"\bjust\s+in\s*:", "clickbait"),
+ (r"\burgent\s*:", "clickbait"),
+ (r"\bwhat\s+happens\s+next\b", "clickbait"),
+ (r"\bthis\s+will\s+change\b", "clickbait"),
+ (r"\b(?:everyone|nobody)\s+(?:is|was)\s+talking\b", "clickbait"),
+]
+EMOTIONAL_WORDS = {  # compared against lower-cased words stripped of ".,!?;:" in score_sensationalism
+ "outrage", "shocking", "horrifying", "disgusting", "amazing", "incredible",
+ "unbelievable", "devastating", "terrifying", "explosive", "bombshell",
+ "jaw-dropping", "heartbreaking", "furious", "scandal", "crisis",
+ "chaos", "destroyed", "slammed", "blasted", "exposed", "revealed",
+}
+SUPERLATIVES = {  # looked up the same way as EMOTIONAL_WORDS
+ "best", "worst", "greatest", "biggest", "most", "least",
+ "fastest", "deadliest", "largest", "smallest", "ultimate",
+}
+
+# --- Manipulation indicator patterns ---
+MANIPULATION_PATTERNS = [
+ # Unverified claims
+ (r"\bsources?\s+(?:say|said|claim|report)\b", "unverified_claim", "medium",
+ "Unverified source attribution without specific citation"),
+ (r"\ballegedly\b", "unverified_claim", "low",
+ "Hedging language suggests unverified information"),
+ (r"\breports?\s+suggest\b", "unverified_claim", "medium",
+ "Vague report attribution"),
+ (r"\baccording\s+to\s+(?:some|many|several)\b", "unverified_claim", "medium",
+ "Non-specific source attribution"),
+ (r"\brunconfirmed\b", "unverified_claim", "medium",
+ "Explicitly unconfirmed information"),
+ # Emotional manipulation
+ (r"\boutrage\b", "emotional_manipulation", "medium",
+ "Emotional trigger word designed to provoke reaction"),
+ (r"\bshocking\s+truth\b", "emotional_manipulation", "high",
+ "Sensationalist phrase designed to manipulate reader emotion"),
+ (r"\bwake\s+up\b", "emotional_manipulation", "medium",
+ "Call-to-action implying hidden knowledge"),
+ (r"\bthey\s+don'?t\s+want\s+you\s+to\s+know\b", "emotional_manipulation", "high",
+ "Conspiracy framing language"),
+ (r"\bopen\s+your\s+eyes\b", "emotional_manipulation", "medium",
+ "Implies audience ignorance"),
+ # False authority
+ (r"\bexperts?\s+(?:confirm|say|agree|warn)\b", "false_authority", "medium",
+ "Unnamed expert citation without specific attribution"),
+ (r"\bscientists?\s+(?:confirm|prove|say)\b", "false_authority", "medium",
+ "Unnamed scientist citation"),
+ (r"\bstudies?\s+(?:show|prove|confirm)\b", "false_authority", "low",
+ "Vague study reference without citation"),
+ (r"\beveryone\s+knows\b", "false_authority", "medium",
+ "Appeal to common knowledge fallacy"),
+ (r"\bit'?s\s+(?:a\s+)?(?:well-?known|proven)\s+fact\b", "false_authority", "medium",
+ "Assertion of fact without evidence"),
+]
+
+# NER entity labels to prefer for keyword extraction
+_NER_PREFERRED = {"PERSON", "ORG", "GPE", "EVENT", "PRODUCT", "NORP"}
+
+
+@dataclass
+class TextClassification:  # classifier output as produced by classify_text
+ label: str  # highest-scoring label ("unknown" for empty input)
+ confidence: float  # score of that label
+ fake_prob: float  # LABEL_0 score if present, else the best score among labels containing a FAKE_TOKENS entry
+ all_scores: dict[str, float]  # label -> score for every label returned
+
+
+@dataclass
+class SensationalismResult:  # output of score_sensationalism: the score plus the raw counts behind it
+ score: int # 0-100
+ level: str # Low / Medium / High
+ exclamation_count: int  # number of "!" characters
+ caps_word_count: int  # all-uppercase words longer than 2 characters
+ clickbait_matches: int  # number of CLICKBAIT_PATTERNS that matched at least once
+ emotional_word_count: int  # words found in EMOTIONAL_WORDS
+ superlative_count: int  # words found in SUPERLATIVES
+
+
+@dataclass
+class ManipulationIndicator:  # one regex hit recorded by detect_manipulation_indicators
+ pattern_type: str # unverified_claim / emotional_manipulation / false_authority
+ matched_text: str  # exact substring that matched
+ start_pos: int  # match start offset in the scanned text
+ end_pos: int  # match end offset as returned by the regex match's end()
+ severity: str # low / medium / high
+ description: str  # explanation taken from the matching MANIPULATION_PATTERNS entry
+
+
+def detect_language(text: str) -> str:
+ """Detect the primary language of text using langdetect.
+ Returns ISO 639-1 code (e.g. 'en', 'hi'). Falls back to 'en' on failure.
+ """
+ if not text or len(text.strip()) < 10:
+ return "en"
+ try:
+ from langdetect import detect # type: ignore
+ lang = detect(text.strip())
+ logger.info(f"Language detected: {lang}")
+ return lang
+ except ImportError:
+ logger.debug("langdetect not installed — defaulting to 'en'")
+ return "en"
+ except Exception as e:
+ logger.debug(f"Language detection failed: {e} — defaulting to 'en'")
+ return "en"
+
+
+def _scores_to_classification(items) -> TextClassification:
+ """Convert pipeline output to TextClassification."""
+ scores = {i["label"]: float(i["score"]) for i in items}
+ top_label, top_conf = max(scores.items(), key=lambda kv: kv[1])
+ # Extract fake probability
+ fake_prob = 0.0
+ if "LABEL_0" in scores:
+ fake_prob = scores["LABEL_0"]
+ else:
+ fake_prob = max(
+ (p for lbl, p in scores.items() if any(t in lbl.lower() for t in FAKE_TOKENS)),
+ default=0.0,
+ )
+ return TextClassification(top_label, top_conf, fake_prob, scores)
+
+
+def classify_text(text: str, language: Optional[str] = None) -> TextClassification:
+ """Classify text as fake/real.
+ Routes to multilingual model when language is non-English and the model is configured.
+ """
+ text = (text or "").strip()
+ if not text:
+ return TextClassification("unknown", 0.0, 0.0, {})
+
+ loader = get_model_loader()
+
+ if language and language != "en":
+ pipe = loader.load_multilang_text_model()
+ else:
+ pipe = loader.load_text_model()
+
+ out = pipe(text[:2000], truncation=True, top_k=None)
+ items = out[0] if isinstance(out[0], list) else out
+ clf = _scores_to_classification(items)
+ logger.info(
+ f"Text classify [{language or 'en'}] → {clf.label} @ {clf.confidence:.3f} "
+ f"fake_p={clf.fake_prob:.3f}"
+ )
+ return clf
+
+
+def score_sensationalism(text: str) -> SensationalismResult:
+ """Compute a 0-100 sensationalism score from structural/linguistic signals."""
+ if not text:
+ return SensationalismResult(0, "Low", 0, 0, 0, 0, 0)
+
+ words = text.split()
+ total_words = max(len(words), 1)
+
+ excl = text.count("!")
+ caps = sum(1 for w in words if w.isupper() and len(w) > 2)
+ clickbait = sum(
+ 1 for pat, _ in CLICKBAIT_PATTERNS
+ if re.search(pat, text, re.IGNORECASE)
+ )
+ emotional = sum(1 for w in words if w.lower().strip(".,!?;:") in EMOTIONAL_WORDS)
+ superlative = sum(1 for w in words if w.lower().strip(".,!?;:") in SUPERLATIVES)
+
+ raw = (
+ min(excl * 8, 25)
+ + min(caps / total_words * 200, 25)
+ + min(clickbait * 12, 25)
+ + min(emotional * 6, 15)
+ + min(superlative * 5, 10)
+ )
+ score = int(min(100, max(0, raw)))
+ level = "Low" if score < 30 else ("Medium" if score < 60 else "High")
+
+ logger.info(f"Sensationalism → {score} ({level}) excl={excl} caps={caps} cb={clickbait} emo={emotional}")
+ return SensationalismResult(score, level, excl, caps, clickbait, emotional, superlative)
+
+
+def detect_manipulation_indicators(text: str) -> List[ManipulationIndicator]:
+ """Scan text for manipulation linguistic patterns with positions."""
+ if not text:
+ return []
+ indicators: List[ManipulationIndicator] = []
+ for pattern, ptype, severity, description in MANIPULATION_PATTERNS:
+ for m in re.finditer(pattern, text, re.IGNORECASE):
+ indicators.append(ManipulationIndicator(
+ pattern_type=ptype,
+ matched_text=m.group(),
+ start_pos=m.start(),
+ end_pos=m.end(),
+ severity=severity,
+ description=description,
+ ))
+ indicators.sort(key=lambda i: i.start_pos)
+ logger.info(f"Manipulation indicators → {len(indicators)} found")
+ return indicators
+
+
+def extract_entities(text: str, max_k: int = 6) -> List[str]:
+ """Extract keywords via spaCy NER (PERSON, ORG, GPE, EVENT preferred).
+ Falls back to frequency-based extraction when spaCy is unavailable or text is too short.
+ """
+ if not text or len(text.strip()) < 20:
+ return _extract_keywords_freq(text, max_k)
+
+ loader = get_model_loader()
+ nlp = loader.load_spacy_nlp()
+
+ if nlp is None:
+ # spaCy not available — use frequency fallback
+ return _extract_keywords_freq(text, max_k)
+
+ try:
+ doc = nlp(text[:5000]) # cap for performance
+
+ # Collect named entities, preferring high-value types
+ preferred: List[str] = []
+ other: List[str] = []
+ seen: set[str] = set()
+
+ for ent in doc.ents:
+ norm = ent.text.strip()
+ norm_lower = norm.lower()
+ if not norm or norm_lower in seen or len(norm) < 2:
+ continue
+ seen.add(norm_lower)
+ if ent.label_ in _NER_PREFERRED:
+ preferred.append(norm)
+ else:
+ other.append(norm)
+
+ entities = preferred + other
+
+ if len(entities) >= 2:
+ logger.info(f"NER extracted {len(entities)} entities: {entities[:max_k]}")
+ return entities[:max_k]
+
+ # Not enough entities — supplement with frequency keywords
+ freq_kws = _extract_keywords_freq(text, max_k)
+ combined = entities + [k for k in freq_kws if k.lower() not in seen]
+ return combined[:max_k]
+
+ except Exception as e:
+ logger.warning(f"spaCy NER failed: {e} — falling back to frequency extraction")
+ return _extract_keywords_freq(text, max_k)
+
+
+def _extract_keywords_freq(text: str, max_k: int = 6) -> List[str]:
+ """Frequency-based keyword extraction (original implementation, kept as fallback)."""
+ stop = {
+ "the","a","an","is","are","was","were","be","been","being","to","of","and","or","but",
+ "in","on","at","for","with","by","from","as","that","this","it","its","has","have","had",
+ "will","would","can","could","should","may","might","do","does","did","not","no","so",
+ "than","then","there","their","they","them","we","our","you","your","he","she","his","her",
+ }
+ words = re.findall(r"[A-Za-z][A-Za-z\-']{2,}", text or "")
+ freq: dict[str, int] = {}
+ for w in words:
+ wl = w.lower()
+ if wl in stop:
+ continue
+ freq[wl] = freq.get(wl, 0) + 1
+ return [w for w, _ in sorted(freq.items(), key=lambda kv: (-kv[1], kv[0]))[:max_k]]
+
+
+# Back-compat alias: routes that still call extract_keywords get NER-first behaviour
+extract_keywords = extract_entities
diff --git a/v1/__init__.py b/v1/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/v1/__pycache__/__init__.cpython-311.pyc b/v1/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..73f05625ed549a176fc40b85d92dae6a5cb03ff6
Binary files /dev/null and b/v1/__pycache__/__init__.cpython-311.pyc differ
diff --git a/v1/__pycache__/analyze.cpython-311.pyc b/v1/__pycache__/analyze.cpython-311.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..0872fa4fc6033ab1c83d83443d1b7f83821ab961
Binary files /dev/null and b/v1/__pycache__/analyze.cpython-311.pyc differ
diff --git a/v1/__pycache__/auth.cpython-311.pyc b/v1/__pycache__/auth.cpython-311.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..8bb52bcb50707cbd7326e8dcb6aa06a44cdd323e
Binary files /dev/null and b/v1/__pycache__/auth.cpython-311.pyc differ
diff --git a/v1/__pycache__/health.cpython-311.pyc b/v1/__pycache__/health.cpython-311.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..b1546b02f22634cce20396352c2729418b411e89
Binary files /dev/null and b/v1/__pycache__/health.cpython-311.pyc differ
diff --git a/v1/__pycache__/history.cpython-311.pyc b/v1/__pycache__/history.cpython-311.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..ff4b5cf59e3ab6fa3e0fa3c35ef4beba4f377c0d
Binary files /dev/null and b/v1/__pycache__/history.cpython-311.pyc differ
diff --git a/v1/__pycache__/report.cpython-311.pyc b/v1/__pycache__/report.cpython-311.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..3b306d1542a167f78b0d17bd576448755943635e
Binary files /dev/null and b/v1/__pycache__/report.cpython-311.pyc differ
diff --git a/v1/analyze.py b/v1/analyze.py
new file mode 100644
index 0000000000000000000000000000000000000000..9663da62c33140dc37ca3fc24830458d4ad7d3fa
--- /dev/null
+++ b/v1/analyze.py
@@ -0,0 +1,581 @@
+from __future__ import annotations
+
+import json
+import os
+import time
+import uuid
+from datetime import datetime, timezone
+
+from fastapi import APIRouter, Body, Depends, File, UploadFile
+from pydantic import BaseModel
+from loguru import logger
+from sqlalchemy.orm import Session
+
+from api.deps import optional_current_user
+from config import settings
+from db.database import get_db
+from db.models import AnalysisRecord, User
+from models.heatmap_generator import generate_heatmap_base64, generate_boxes_base64
+from schemas.analyze import (
+ FrameAnalysisOut,
+ ImageAnalysisResponse,
+ ImageExplainability,
+ LayoutAnomalyOut,
+ ManipulationIndicatorOut,
+ OCRBoxOut,
+ ScreenshotAnalysisResponse,
+ ScreenshotExplainability,
+ SensationalismBreakdown,
+ SuspiciousPhraseOut,
+ TextAnalysisResponse,
+ TextExplainability,
+ VideoAnalysisResponse,
+ VideoExplainability,
+)
+from services.screenshot_service import (
+ detect_layout_anomalies,
+ extract_full_text,
+ map_phrases_to_boxes,
+ run_ocr,
+)
+from services.ela_service import generate_ela_base64
+from services.exif_service import extract_exif
+from services.image_service import load_image_from_bytes
+from services.llm_explainer import generate_llm_summary
+from schemas.common import ProcessingSummary, Verdict
+from services.artifact_detector import scan_artifacts
+from services.image_service import preprocess_and_classify
+from services.news_lookup import search_news_full
+from services.vlm_breakdown import generate_vlm_breakdown
+from services.text_service import (
+ classify_text,
+ detect_language,
+ detect_manipulation_indicators,
+ extract_entities,
+ score_sensationalism,
+)
+from services.video_service import analyze_video
+from utils.file_handler import read_upload_bytes, save_upload_to_tempfile
+from utils.scoring import compute_authenticity_score, get_verdict_label
+
+# Router for the media-analysis endpoints; every route below is mounted under /analyze.
+router = APIRouter(prefix="/analyze", tags=["analyze"])
+
+# Upload size ceilings in megabytes, passed to the upload helpers as `max_size_mb`.
+IMAGE_MAX_MB = 20
+VIDEO_MAX_MB = 100
+# Number of frames requested from analyze_video() for each uploaded video.
+VIDEO_NUM_FRAMES = 16
+
+
+@router.post("/image", response_model=ImageAnalysisResponse)
+async def analyze_image(
+    file: UploadFile = File(...),
+    db: Session = Depends(get_db),
+    user: User | None = Depends(optional_current_user),
+) -> ImageAnalysisResponse:
+    """Analyze an uploaded image and return its authenticity verdict.
+
+    Classification and artifact scanning are mandatory. Heatmap, ELA, bounding
+    boxes, EXIF, the LLM summary and the VLM breakdown are best-effort: a
+    failure in any of them is logged and the request continues. The response
+    (without the base64 overlays) is persisted as an ``AnalysisRecord`` before
+    the LLM/VLM steps run, so both can be keyed by the record id.
+    """
+    start = time.perf_counter()
+    stages: list[str] = []
+
+    raw, mime = await read_upload_bytes(
+        file, settings.ALLOWED_IMAGE_TYPES, max_size_mb=IMAGE_MAX_MB
+    )
+    stages.append("validation")
+
+    pil, clf = preprocess_and_classify(raw)
+    stages.append("classification")
+
+    indicators = scan_artifacts(pil, raw)
+    stages.append("artifact_scanning")
+
+    # ── Phase 12: Grad-CAM++ heatmap ──
+    heatmap_status = "success"
+    heatmap = ""
+    try:
+        heatmap = generate_heatmap_base64(pil)
+        stages.append("heatmap_generation")
+    except Exception as e:  # noqa: BLE001
+        logger.warning(f"Heatmap generation failed, continuing: {e}")
+        heatmap_status = "failed"
+
+    # ── Phase 12: ELA (Error Level Analysis) ──
+    ela_b64 = ""
+    try:
+        ela_b64 = generate_ela_base64(pil)
+        stages.append("ela_generation")
+    except Exception as e:  # noqa: BLE001
+        logger.warning(f"ELA generation failed, continuing: {e}")
+
+    # ── Phase 12: Bounding box mode ──
+    boxes_b64 = ""
+    try:
+        boxes_b64 = generate_boxes_base64(pil)
+        stages.append("boxes_generation")
+    except Exception as e:  # noqa: BLE001
+        logger.warning(f"Bounding box generation failed, continuing: {e}")
+
+    # ── Phase 12: EXIF extraction + trust adjustment ──
+    exif_summary = None
+    try:
+        exif_summary = extract_exif(pil, raw)
+        stages.append("exif_extraction")
+    except Exception as e:  # noqa: BLE001
+        logger.warning(f"EXIF extraction failed, continuing: {e}")
+
+    score = compute_authenticity_score(clf.confidence, clf.label)
+
+    # Apply the EXIF trust adjustment, keeping the score inside 0..100.
+    if exif_summary and exif_summary.trust_adjustment != 0:
+        score = int(round(max(0, min(100, score + exif_summary.trust_adjustment))))
+
+    label, severity = get_verdict_label(score)
+    duration_ms = int((time.perf_counter() - start) * 1000)
+
+    analysis_id = str(uuid.uuid4())
+
+    response = ImageAnalysisResponse(
+        analysis_id=analysis_id,
+        media_type="image",
+        timestamp=datetime.now(timezone.utc).isoformat(),
+        verdict=Verdict(
+            label=label,
+            severity=severity,
+            authenticity_score=score,
+            model_confidence=clf.confidence,
+            model_label=clf.label,
+        ),
+        explainability=ImageExplainability(
+            heatmap_base64=heatmap,
+            ela_base64=ela_b64,
+            boxes_base64=boxes_b64,
+            heatmap_status=heatmap_status,
+            artifact_indicators=indicators,
+            exif=exif_summary,
+        ),
+        trusted_sources=[],
+        contradicting_evidence=[],
+        processing_summary=ProcessingSummary(
+            stages_completed=stages,
+            total_duration_ms=duration_ms,
+            model_used=settings.IMAGE_MODEL_ID,
+        ),
+    )
+
+    # The base64 overlays are excluded from what is stored and from the LLM payload.
+    overlay_fields = {"explainability": {"heatmap_base64", "ela_base64", "boxes_base64"}}
+
+    record = AnalysisRecord(
+        user_id=user.id if user else None,
+        media_type="image",
+        verdict=label,
+        authenticity_score=float(score),
+        result_json=json.dumps(response.model_dump(exclude=overlay_fields)),
+    )
+    db.add(record)
+    db.commit()
+    db.refresh(record)
+    response.record_id = record.id
+    logger.info(f"Saved AnalysisRecord id={record.id} score={score} verdict={label}")
+
+    # Stages finished after the response was built must be recorded on the
+    # response's own list: pydantic validates list fields into a new list, so
+    # appending to the local `stages` at this point would not reach the client.
+    completed = response.processing_summary.stages_completed
+
+    # ── Phase 12: LLM explainability card (runs after DB save so we have record_id) ──
+    try:
+        llm_summary = generate_llm_summary(
+            payload=response.model_dump(exclude=overlay_fields),
+            record_id=str(record.id),
+        )
+        response.explainability.llm_summary = llm_summary
+        completed.append("llm_explanation")
+    except Exception as e:  # noqa: BLE001
+        logger.warning(f"LLM explainer failed, continuing: {e}")
+
+    # ── Phase 14: VLM detailed breakdown (vision LLM scores 6 perceptual components) ──
+    try:
+        vlm_bd = generate_vlm_breakdown(pil, record_id=str(record.id))
+        if vlm_bd:
+            response.explainability.vlm_breakdown = vlm_bd
+            completed.append("vlm_breakdown")
+    except Exception as e:  # noqa: BLE001
+        logger.warning(f"VLM breakdown failed, continuing: {e}")
+
+    return response
+
+
+@router.post("/video", response_model=VideoAnalysisResponse)
+async def analyze_video_endpoint(
+    file: UploadFile = File(...),
+    db: Session = Depends(get_db),
+    user: User | None = Depends(optional_current_user),
+) -> VideoAnalysisResponse:
+    """Analyze an uploaded video frame by frame and return the verdict.
+
+    The upload is spooled to a temp file, analysed with ``analyze_video`` and
+    the temp file is removed afterwards whether or not analysis succeeded.
+    When the aggregation reports insufficient faces, the score is fixed at 50
+    with an "Insufficient face content" warning instead of a verdict derived
+    from the frame probabilities. The LLM summary is best-effort.
+    """
+    start = time.perf_counter()
+    stages: list[str] = []
+
+    # Keep the client's extension for the temp file; default to .mp4.
+    suffix = os.path.splitext(file.filename or "")[1].lower() or ".mp4"
+    path, mime = await save_upload_to_tempfile(
+        file, settings.ALLOWED_VIDEO_TYPES, max_size_mb=VIDEO_MAX_MB, suffix=suffix
+    )
+    stages.append("validation")
+
+    try:
+        agg = analyze_video(path, num_frames=VIDEO_NUM_FRAMES)
+        stages.append("frame_extraction")
+        stages.append("frame_classification")
+        stages.append("aggregation")
+    finally:
+        # Always remove the temp upload. A failed delete is logged rather than
+        # silently ignored so leaked temp files are visible in the logs.
+        try:
+            os.unlink(path)
+        except OSError as exc:
+            logger.warning(f"Could not remove temp video {path}: {exc}")
+
+    if agg.insufficient_faces:
+        score = 50
+        label = "Insufficient face content"
+        severity = "warning"
+    else:
+        # Authenticity is the complement of the mean suspicious probability.
+        score = int(round(max(0.0, min(100.0, (1.0 - agg.mean_suspicious_prob) * 100.0))))
+        label, severity = get_verdict_label(score)
+    duration_ms = int((time.perf_counter() - start) * 1000)
+
+    response = VideoAnalysisResponse(
+        analysis_id=str(uuid.uuid4()),
+        media_type="video",
+        timestamp=datetime.now(timezone.utc).isoformat(),
+        verdict=Verdict(
+            label=label,
+            severity=severity,
+            authenticity_score=score,
+            model_confidence=float(agg.mean_suspicious_prob),
+            model_label="suspicious_mean" if not agg.insufficient_faces else "no_faces",
+        ),
+        explainability=VideoExplainability(
+            num_frames_sampled=agg.num_frames_sampled,
+            num_face_frames=agg.num_face_frames,
+            num_suspicious_frames=agg.num_suspicious_frames,
+            mean_suspicious_prob=agg.mean_suspicious_prob,
+            max_suspicious_prob=agg.max_suspicious_prob,
+            suspicious_ratio=agg.suspicious_ratio,
+            insufficient_faces=agg.insufficient_faces,
+            suspicious_timestamps=agg.suspicious_timestamps,
+            frames=[
+                FrameAnalysisOut(
+                    index=f.index,
+                    timestamp_s=f.timestamp_s,
+                    label=f.label,
+                    confidence=f.confidence,
+                    suspicious_prob=f.suspicious_prob,
+                    is_suspicious=f.is_suspicious,
+                    has_face=f.has_face,
+                    scored=f.scored,
+                )
+                for f in agg.frames
+            ],
+        ),
+        processing_summary=ProcessingSummary(
+            stages_completed=stages,
+            total_duration_ms=duration_ms,
+            model_used=settings.IMAGE_MODEL_ID,
+        ),
+    )
+
+    record = AnalysisRecord(
+        user_id=user.id if user else None,
+        media_type="video",
+        verdict=label,
+        authenticity_score=float(score),
+        result_json=json.dumps(response.model_dump()),
+    )
+    db.add(record)
+    db.commit()
+    db.refresh(record)
+    response.record_id = record.id
+    logger.info(
+        f"Saved AnalysisRecord id={record.id} video score={score} verdict={label} "
+        f"frames={agg.num_frames_sampled} susp={agg.num_suspicious_frames}"
+    )
+
+    # Phase 12: LLM explainability card (best-effort, runs after the DB save).
+    # NOTE(review): the image endpoint stores the summary under
+    # response.explainability; confirm VideoAnalysisResponse declares llm_summary.
+    try:
+        response.llm_summary = generate_llm_summary(
+            payload=response.model_dump(), record_id=str(record.id),
+        )
+    except Exception as e:  # noqa: BLE001
+        logger.warning(f"LLM explainer failed for video: {e}")
+
+    return response
+
+
+# JSON request body for POST /analyze/text.
+# NOTE(review): `text` has no length bounds here — confirm empty or very large input is handled downstream.
+class TextAnalyzeBody(BaseModel):
+    text: str
+
+
+@router.post("/text", response_model=TextAnalysisResponse)
+async def analyze_text_endpoint(
+    body: TextAnalyzeBody = Body(...),
+    db: Session = Depends(get_db),
+    user: User | None = Depends(optional_current_user),
+) -> TextAnalysisResponse:
+    """Score a text snippet for authenticity and persist the result.
+
+    Pipeline: language detection, classification, sensationalism scoring,
+    manipulation-pattern detection, entity extraction and news lookup. The
+    final score weights the classifier at 70%, inverse sensationalism at 20%
+    and the manipulation penalty at 10%. The LLM summary is best-effort.
+    """
+    t0 = time.perf_counter()
+    stages: list[str] = []
+    text = body.text
+
+    # Phase 13: language detection — routes to multilang model when non-English
+    lang = detect_language(text)
+    stages.append("language_detection")
+
+    clf = classify_text(text, language=lang)
+    stages.append("classification")
+
+    sens = score_sensationalism(text)
+    stages.append("sensationalism_analysis")
+
+    manip = detect_manipulation_indicators(text)
+    stages.append("manipulation_detection")
+
+    # Phase 13.1: NER-based keyword extraction (spaCy entities first, frequency fallback)
+    keywords = extract_entities(text)
+    stages.append("ner_keyword_extraction")
+
+    # Phase 13.2: pass original text + current fake_prob for truth-override computation
+    news = await search_news_full(
+        keywords,
+        original_text=text,
+        current_fake_prob=clf.fake_prob,
+    )
+    stages.append("news_lookup")
+
+    # A truth-override from the news lookup replaces the classifier's fake probability.
+    override = news.truth_override
+    effective_fake_prob = clf.fake_prob
+    if override and override.applied:
+        effective_fake_prob = override.fake_prob_after
+        stages.append("truth_override_applied")
+
+    # Weighted score: 70% classifier + 20% inverse sensationalism + 10% manipulation penalty
+    manip_penalty = min(len(manip) * 5, 30)
+    classifier_part = (1.0 - effective_fake_prob) * 100.0 * 0.70
+    sensationalism_part = max(0, 100 - sens.score) * 0.20
+    manipulation_part = max(0, 100 - manip_penalty) * 0.10
+    weighted = classifier_part + sensationalism_part + manipulation_part
+    score = int(round(max(0.0, min(100.0, weighted))))
+    label, severity = get_verdict_label(score)
+    duration_ms = int((time.perf_counter() - t0) * 1000)
+
+    # Report the multilingual model only when it is configured and the text is non-English.
+    if lang != "en" and settings.TEXT_MULTILANG_MODEL_ID:
+        model_used = settings.TEXT_MULTILANG_MODEL_ID
+    else:
+        model_used = settings.TEXT_MODEL_ID
+
+    verdict = Verdict(
+        label=label,
+        severity=severity,
+        authenticity_score=score,
+        model_confidence=float(clf.confidence),
+        model_label=clf.label,
+    )
+    sensationalism_out = SensationalismBreakdown(
+        score=sens.score,
+        level=sens.level,
+        exclamation_count=sens.exclamation_count,
+        caps_word_count=sens.caps_word_count,
+        clickbait_matches=sens.clickbait_matches,
+        emotional_word_count=sens.emotional_word_count,
+        superlative_count=sens.superlative_count,
+    )
+    indicators_out = []
+    for m in manip:
+        indicators_out.append(
+            ManipulationIndicatorOut(
+                pattern_type=m.pattern_type,
+                matched_text=m.matched_text,
+                start_pos=m.start_pos,
+                end_pos=m.end_pos,
+                severity=m.severity,
+                description=m.description,
+            )
+        )
+
+    response = TextAnalysisResponse(
+        analysis_id=str(uuid.uuid4()),
+        media_type="text",
+        timestamp=datetime.now(timezone.utc).isoformat(),
+        verdict=verdict,
+        explainability=TextExplainability(
+            fake_probability=effective_fake_prob,
+            top_label=clf.label,
+            all_scores=clf.all_scores,
+            keywords=keywords,
+            sensationalism=sensationalism_out,
+            manipulation_indicators=indicators_out,
+            detected_language=lang,
+            truth_override=override,
+        ),
+        trusted_sources=news.trusted_sources,
+        contradicting_evidence=news.contradicting_evidence,
+        processing_summary=ProcessingSummary(
+            stages_completed=stages,
+            total_duration_ms=duration_ms,
+            model_used=model_used,
+        ),
+    )
+
+    owner_id = user.id if user else None
+    record = AnalysisRecord(
+        user_id=owner_id,
+        media_type="text",
+        verdict=label,
+        authenticity_score=float(score),
+        result_json=json.dumps(response.model_dump()),
+    )
+    db.add(record)
+    db.commit()
+    db.refresh(record)
+    response.record_id = record.id
+    logger.info(f"Saved AnalysisRecord id={record.id} text score={score} verdict={label}")
+
+    # Phase 12: LLM explainability card (best-effort, after the DB save)
+    try:
+        response.llm_summary = generate_llm_summary(
+            payload=response.model_dump(), record_id=str(record.id),
+        )
+    except Exception as e:  # noqa: BLE001
+        logger.warning(f"LLM explainer failed for text: {e}")
+
+    return response
+
+
+@router.post("/screenshot", response_model=ScreenshotAnalysisResponse)
+async def analyze_screenshot_endpoint(
+    file: UploadFile = File(...),
+    db: Session = Depends(get_db),
+    user: User | None = Depends(optional_current_user),
+) -> ScreenshotAnalysisResponse:
+    """OCR a screenshot, score the extracted text and persist the result.
+
+    The extracted text goes through the same classification, sensationalism,
+    manipulation and news-lookup steps as the text endpoint, plus layout
+    anomaly detection on the OCR boxes. Weights: 65% classifier, 20% inverse
+    sensationalism, 10% manipulation penalty, 5% layout penalty. When OCR
+    yields no usable text the classifier is skipped, the model label is
+    "no_text" and the score is a neutral 50. The LLM summary is best-effort.
+    """
+    start = time.perf_counter()
+    stages: list[str] = []
+
+    raw, mime = await read_upload_bytes(
+        file, settings.ALLOWED_IMAGE_TYPES, max_size_mb=IMAGE_MAX_MB
+    )
+    stages.append("validation")
+
+    pil = load_image_from_bytes(raw)
+    ocr_boxes = run_ocr(pil)
+    stages.append("ocr")
+
+    full_text = extract_full_text(ocr_boxes)
+    # Single definition of "has text" so whitespace-only OCR output is treated
+    # the same way by language detection, classification and scoring.
+    has_text = bool(full_text.strip())
+
+    # Phase 13: language detection on extracted OCR text
+    lang = detect_language(full_text) if has_text else "en"
+    stages.append("language_detection")
+
+    clf = classify_text(full_text, language=lang) if has_text else None
+    stages.append("classification")
+
+    sens = score_sensationalism(full_text)
+    stages.append("sensationalism_analysis")
+
+    manip = detect_manipulation_indicators(full_text)
+    stages.append("manipulation_detection")
+
+    phrases = map_phrases_to_boxes(ocr_boxes, manip)
+    stages.append("phrase_overlay_mapping")
+
+    layout = detect_layout_anomalies(ocr_boxes)
+    stages.append("layout_anomaly_detection")
+
+    # Phase 13.1: NER-based keyword extraction
+    keywords = extract_entities(full_text)
+    stages.append("ner_keyword_extraction")
+
+    fake_prob = clf.fake_prob if clf else 0.0
+    model_conf = clf.confidence if clf else 0.0
+    model_lbl = clf.label if clf else "no_text"
+
+    # Phase 13.2: truth-override via cosine similarity
+    news = await search_news_full(
+        keywords,
+        original_text=full_text,
+        current_fake_prob=fake_prob,
+    )
+    stages.append("news_lookup")
+
+    effective_fake_prob = fake_prob
+    if news.truth_override and news.truth_override.applied:
+        effective_fake_prob = news.truth_override.fake_prob_after
+        stages.append("truth_override_applied")
+
+    manip_penalty = min(len(manip) * 5, 30)
+    layout_penalty = min(len(layout) * 5, 15)
+    raw_score = (1.0 - effective_fake_prob) * 100.0
+    weighted = (
+        raw_score * 0.65
+        + max(0, 100 - sens.score) * 0.20
+        + max(0, 100 - manip_penalty) * 0.10
+        + max(0, 100 - layout_penalty) * 0.05
+    )
+    if not has_text:
+        # Nothing to judge: report a neutral score.
+        weighted = 50
+    score = int(round(max(0.0, min(100.0, weighted))))
+    label, severity = get_verdict_label(score)
+    duration_ms = int((time.perf_counter() - start) * 1000)
+
+    model_used_str = (
+        f"{settings.TEXT_MULTILANG_MODEL_ID} + EasyOCR"
+        if (lang != "en" and settings.TEXT_MULTILANG_MODEL_ID)
+        else f"{settings.TEXT_MODEL_ID} + EasyOCR"
+    )
+
+    response = ScreenshotAnalysisResponse(
+        analysis_id=str(uuid.uuid4()),
+        media_type="screenshot",
+        timestamp=datetime.now(timezone.utc).isoformat(),
+        verdict=Verdict(
+            label=label,
+            severity=severity,
+            authenticity_score=score,
+            model_confidence=float(model_conf),
+            model_label=model_lbl,
+        ),
+        explainability=ScreenshotExplainability(
+            extracted_text=full_text,
+            ocr_boxes=[
+                OCRBoxOut(text=b.text, bbox=b.bbox, confidence=b.confidence)
+                for b in ocr_boxes
+            ],
+            fake_probability=effective_fake_prob,
+            sensationalism=SensationalismBreakdown(
+                score=sens.score,
+                level=sens.level,
+                exclamation_count=sens.exclamation_count,
+                caps_word_count=sens.caps_word_count,
+                clickbait_matches=sens.clickbait_matches,
+                emotional_word_count=sens.emotional_word_count,
+                superlative_count=sens.superlative_count,
+            ),
+            suspicious_phrases=[
+                SuspiciousPhraseOut(
+                    text=p.text,
+                    bbox=p.bbox,
+                    pattern_type=p.pattern_type,
+                    severity=p.severity,
+                    description=p.description,
+                )
+                for p in phrases
+            ],
+            layout_anomalies=[
+                LayoutAnomalyOut(
+                    type=la.type,
+                    severity=la.severity,
+                    description=la.description,
+                    confidence=la.confidence,
+                )
+                for la in layout
+            ],
+            keywords=keywords,
+            detected_language=lang,
+            truth_override=news.truth_override,
+        ),
+        trusted_sources=news.trusted_sources,
+        contradicting_evidence=news.contradicting_evidence,
+        processing_summary=ProcessingSummary(
+            stages_completed=stages,
+            total_duration_ms=duration_ms,
+            model_used=model_used_str,
+        ),
+    )
+
+    record = AnalysisRecord(
+        user_id=user.id if user else None,
+        media_type="screenshot",
+        verdict=label,
+        authenticity_score=float(score),
+        result_json=json.dumps(response.model_dump()),
+    )
+    db.add(record)
+    db.commit()
+    db.refresh(record)
+    response.record_id = record.id
+    logger.info(f"Saved AnalysisRecord id={record.id} screenshot score={score} verdict={label}")
+
+    # Phase 12: LLM explainability card (best-effort, after the DB save)
+    try:
+        response.llm_summary = generate_llm_summary(
+            payload=response.model_dump(), record_id=str(record.id),
+        )
+    except Exception as e:  # noqa: BLE001
+        logger.warning(f"LLM explainer failed for screenshot: {e}")
+
+    return response
diff --git a/v1/auth.py b/v1/auth.py
new file mode 100644
index 0000000000000000000000000000000000000000..c8e61ca0cbc138c8e1e2b7420996e4892e46468e
--- /dev/null
+++ b/v1/auth.py
@@ -0,0 +1,48 @@
+from __future__ import annotations
+
+from fastapi import APIRouter, Depends, HTTPException, status
+from loguru import logger
+from sqlalchemy.exc import IntegrityError
+from sqlalchemy.orm import Session
+
+from api.deps import get_current_user
+from config import settings
+from db.database import get_db
+from db.models import User
+from schemas.auth import LoginBody, RegisterBody, TokenResponse, UserOut
+from services.auth_service import authenticate, create_access_token, register_user
+
+router = APIRouter(prefix="/auth", tags=["auth"])
+
+
+def _token_response(user: User) -> TokenResponse:
+    """Build the token payload returned by both register and login."""
+    token = create_access_token(user.id, user.email)
+    lifetime = settings.JWT_EXPIRATION_MINUTES
+    profile = UserOut(
+        id=user.id,
+        email=user.email,
+        name=user.name,
+        created_at=user.created_at,
+    )
+    return TokenResponse(access_token=token, expires_in_minutes=lifetime, user=profile)
+
+
+@router.post("/register", response_model=TokenResponse, status_code=status.HTTP_201_CREATED)
+def register(body: RegisterBody, db: Session = Depends(get_db)) -> TokenResponse:
+    """Create a user account and return an access token for it.
+
+    An IntegrityError from the insert is rolled back and reported as
+    409 "Email already registered".
+    """
+    try:
+        created = register_user(db, body.email, body.password, body.name)
+    except IntegrityError:
+        db.rollback()
+        raise HTTPException(status.HTTP_409_CONFLICT, "Email already registered")
+    else:
+        logger.info(f"Registered user id={created.id} email={created.email}")
+        return _token_response(created)
+
+
+@router.post("/login", response_model=TokenResponse)
+def login(body: LoginBody, db: Session = Depends(get_db)) -> TokenResponse:
+    """Check the credentials and return an access token, or 401 if they do not match."""
+    account = authenticate(db, body.email, body.password)
+    if account:
+        logger.info(f"Login user id={account.id} email={account.email}")
+        return _token_response(account)
+    raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid email or password")
+
+
+@router.get("/me", response_model=UserOut)
+def me(user: User = Depends(get_current_user)) -> UserOut:
+    """Return the public profile of the authenticated caller."""
+    profile = UserOut(
+        id=user.id,
+        email=user.email,
+        name=user.name,
+        created_at=user.created_at,
+    )
+    return profile
diff --git a/v1/health.py b/v1/health.py
new file mode 100644
index 0000000000000000000000000000000000000000..b02fd9845b16997bda02ffe00e91915c4c043533
--- /dev/null
+++ b/v1/health.py
@@ -0,0 +1,8 @@
+from fastapi import APIRouter
+
+router = APIRouter(tags=["health"])
+
+
+@router.get("/health")
+def health():
+    """Liveness probe: always reports the service as up."""
+    payload = {"status": "ok", "service": "deepshield-backend"}
+    return payload
diff --git a/v1/history.py b/v1/history.py
new file mode 100644
index 0000000000000000000000000000000000000000..db70c77e068a5e4f8070caddc011504868912493
--- /dev/null
+++ b/v1/history.py
@@ -0,0 +1,79 @@
+from __future__ import annotations
+
+import json
+from datetime import datetime
+
+from fastapi import APIRouter, Depends, HTTPException, Query, status
+from pydantic import BaseModel
+from sqlalchemy.orm import Session
+
+from api.deps import get_current_user
+from db.database import get_db
+from db.models import AnalysisRecord, User
+
+router = APIRouter(prefix="/history", tags=["history"])
+
+
+# Summary of one stored analysis, as listed by GET /history.
+# Each field is copied from the same-named AnalysisRecord column in list_history().
+class HistoryItem(BaseModel):
+    id: int
+    media_type: str
+    verdict: str
+    authenticity_score: float
+    created_at: datetime
+
+
+# One page of history rows plus the caller's total record count.
+class HistoryListResponse(BaseModel):
+    items: list[HistoryItem]
+    # Count of all of the caller's records, not just the returned page.
+    total: int
+
+
+@router.get("", response_model=HistoryListResponse)
+def list_history(
+    limit: int = Query(default=50, ge=1, le=200),
+    offset: int = Query(default=0, ge=0),
+    user: User = Depends(get_current_user),
+    db: Session = Depends(get_db),
+) -> HistoryListResponse:
+    """Return one page of the caller's analyses, newest first, with the overall count."""
+    owned = db.query(AnalysisRecord).filter(AnalysisRecord.user_id == user.id)
+    total = owned.count()
+
+    newest_first = owned.order_by(AnalysisRecord.created_at.desc())
+    page = newest_first.offset(offset).limit(limit).all()
+
+    items = []
+    for row in page:
+        items.append(
+            HistoryItem(
+                id=row.id,
+                media_type=row.media_type,
+                verdict=row.verdict,
+                authenticity_score=row.authenticity_score,
+                created_at=row.created_at,
+            )
+        )
+    return HistoryListResponse(items=items, total=total)
+
+
+@router.get("/{record_id}")
+def get_history_detail(
+    record_id: int,
+    user: User = Depends(get_current_user),
+    db: Session = Depends(get_db),
+):
+    """Return the stored result payload of one of the caller's analyses.
+
+    Raises 404 when the record does not exist or belongs to another user (the
+    two cases are indistinguishable to the client), and 500 when the stored
+    JSON cannot be decoded.
+    """
+    # Ownership is part of the query, so another user's row is never loaded.
+    r = (
+        db.query(AnalysisRecord)
+        .filter(AnalysisRecord.id == record_id, AnalysisRecord.user_id == user.id)
+        .first()
+    )
+    if not r:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, "Analysis not found")
+    try:
+        return json.loads(r.result_json)
+    except (TypeError, ValueError) as exc:
+        # json.loads raises JSONDecodeError (a ValueError) on malformed text
+        # and TypeError when the column is NULL; anything else is a real bug
+        # and should surface unchanged.
+        raise HTTPException(
+            status.HTTP_500_INTERNAL_SERVER_ERROR, "Corrupt result payload"
+        ) from exc
+
+
+@router.delete("/{record_id}", status_code=status.HTTP_204_NO_CONTENT)
+def delete_history(
+    record_id: int,
+    user: User = Depends(get_current_user),
+    db: Session = Depends(get_db),
+):
+    """Delete one of the caller's analyses; 404 if it is missing or owned by someone else."""
+    target = db.query(AnalysisRecord).filter(AnalysisRecord.id == record_id).first()
+    if target and target.user_id == user.id:
+        db.delete(target)
+        db.commit()
+        return None
+    raise HTTPException(status.HTTP_404_NOT_FOUND, "Analysis not found")
diff --git a/v1/report.py b/v1/report.py
new file mode 100644
index 0000000000000000000000000000000000000000..72a34c8165dbd78f8e474afdc6d9df77d6e54494
--- /dev/null
+++ b/v1/report.py
@@ -0,0 +1,64 @@
+from __future__ import annotations
+
+from pathlib import Path
+
+from fastapi import APIRouter, Depends, HTTPException
+from fastapi.responses import FileResponse
+from loguru import logger
+from sqlalchemy.orm import Session
+
+from db.database import get_db
+from db.models import AnalysisRecord, Report
+from services.report_service import cleanup_expired, create_report_row, generate_report
+
+router = APIRouter(prefix="/report", tags=["report"])
+
+
+@router.post("/{analysis_id:int}")
+def generate(analysis_id: int, db: Session = Depends(get_db)):
+    """Generate (or reuse) the PDF report for an analysis.
+
+    Returns ``{"report_id", "analysis_id", "ready"}``. An existing report whose
+    file is still on disk is reused; if the row exists but the file is gone,
+    the report is regenerated and the row is pointed at the new file. Raises
+    404 when the analysis does not exist and 500 when generation fails.
+
+    The ``:int`` path converter restricts this route to numeric ids. Without
+    it the route also captures other single-segment POST paths registered
+    later on this router, such as ``/cleanup``, and rejects them with 422.
+
+    NOTE(review): no auth dependency is declared on this route, so any caller
+    can generate a report for any analysis id — confirm that is intended.
+    """
+    record = db.query(AnalysisRecord).filter(AnalysisRecord.id == analysis_id).first()
+    if not record:
+        raise HTTPException(status_code=404, detail="analysis not found")
+
+    existing = db.query(Report).filter(Report.analysis_id == analysis_id).first()
+    if existing and Path(existing.file_path).exists():
+        return {"report_id": existing.id, "analysis_id": analysis_id, "ready": True}
+
+    try:
+        path = generate_report(record)
+    except Exception as e:  # noqa: BLE001
+        # Full details go to the log; the client gets a generic message so
+        # internal paths and error text are not exposed.
+        logger.exception(f"Report generation failed: {e}")
+        raise HTTPException(status_code=500, detail="report generation failed") from e
+
+    if existing:
+        # The row outlived its file: point it at the regenerated report.
+        existing.file_path = str(path)
+        db.commit()
+        db.refresh(existing)
+        return {"report_id": existing.id, "analysis_id": analysis_id, "ready": True}
+
+    row = create_report_row(analysis_id, path)
+    db.add(row)
+    db.commit()
+    db.refresh(row)
+    return {"report_id": row.id, "analysis_id": analysis_id, "ready": True}
+
+
+@router.get("/{analysis_id}/download")
+def download(analysis_id: int, db: Session = Depends(get_db)):
+    """Stream the generated PDF; 404 if no report row exists, 410 if the file is gone."""
+    report = db.query(Report).filter(Report.analysis_id == analysis_id).first()
+    if report is None or not report:
+        raise HTTPException(status_code=404, detail="report not found — generate first")
+
+    pdf_path = Path(report.file_path)
+    if pdf_path.exists():
+        return FileResponse(
+            path=str(pdf_path),
+            media_type="application/pdf",
+            filename=f"deepshield_report_{analysis_id}.pdf",
+        )
+    raise HTTPException(status_code=410, detail="report expired or missing")
+
+
+@router.post("/cleanup")
+def cleanup():
+    """Run cleanup_expired() and report the count it returns."""
+    deleted_count = cleanup_expired()
+    return {"deleted": deleted_count}
diff --git a/video_service.py b/video_service.py
new file mode 100644
index 0000000000000000000000000000000000000000..b1334fe682462f59c7eb3486c907ff033142b99e
--- /dev/null
+++ b/video_service.py
@@ -0,0 +1,151 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import List, Tuple
+
+import cv2
+import numpy as np
+from loguru import logger
+from PIL import Image
+
+from models.model_loader import get_model_loader
+from services.image_service import classify_image
+
+
+# Classification result for one sampled video frame.
+@dataclass
+class FrameAnalysis:
+    index: int  # frame index within the source video
+    timestamp_s: float  # index / fps, or 0.0 when the fps is unknown
+    label: str  # top label from the image classifier
+    confidence: float  # classifier confidence for `label`
+    suspicious_prob: float  # prob of the fake/manipulated class
+    is_suspicious: bool  # suspicious_prob >= 0.5 and a face was detected
+    has_face: bool = False  # face detector found at least one face
+    scored: bool = False  # contributed to aggregate (face frames only)
+
+
+# Video-level roll-up of the per-frame results produced by aggregate().
+@dataclass
+class VideoAggregation:
+    num_frames_sampled: int  # number of frames that were classified
+    num_face_frames: int  # frames with scored=True
+    num_suspicious_frames: int  # scored frames flagged is_suspicious
+    mean_suspicious_prob: float  # mean over scored frames; 0.0 when insufficient_faces
+    max_suspicious_prob: float  # max over scored frames; 0.0 when insufficient_faces
+    suspicious_ratio: float  # suspicious / scored frames; 0.0 when insufficient_faces
+    insufficient_faces: bool  # fewer than MIN_FACE_FRAMES scored frames
+    suspicious_timestamps: List[float] = field(default_factory=list)  # seconds, rounded to 2 dp
+    frames: List[FrameAnalysis] = field(default_factory=list)  # every classified frame
+
+
+# Markers that identify a classifier label as the fake/manipulated class.
+# "artificial" is listed explicitly because the two-letter "ai" marker only
+# matches as a whole word (see _is_fake_label).
+FAKE_TOKENS = ("fake", "deepfake", "manipulated", "ai", "generated", "synthetic", "artificial")
+
+
+def _is_fake_label(label: str) -> bool:
+    """Return True when *label* names a fake/manipulated class.
+
+    Markers of three or more characters match as substrings, so "Deepfake"
+    and "ai_generated" both hit. Markers of one or two characters (currently
+    only "ai") must appear as a whole word: a plain substring test for "ai"
+    would also flag unrelated labels such as "portrait" or "plain".
+    """
+    lowered = label.lower()
+    # Split on anything that is not a letter or digit to get whole words.
+    words = set("".join(ch if ch.isalnum() else " " for ch in lowered).split())
+    for tok in FAKE_TOKENS:
+        if len(tok) <= 2:
+            if tok in words:
+                return True
+        elif tok in lowered:
+            return True
+    return False
+
+
+def extract_frames(video_path: str, num_frames: int = 16) -> List[Tuple[int, float, Image.Image]]:
+    """Uniformly sample up to *num_frames* frames from the video.
+
+    Returns a list of (frame_index, timestamp_seconds, PIL.Image). Frames that
+    cannot be decoded are skipped, so the list may be shorter than requested.
+    The timestamp is 0.0 when the container does not report an fps. A
+    non-positive *num_frames* yields an empty list.
+
+    Raises:
+        RuntimeError: if the video cannot be opened or reports zero frames.
+    """
+    cap = cv2.VideoCapture(video_path)
+    # try/finally so the capture handle is released on every exit path,
+    # including an exception while decoding or converting a frame.
+    try:
+        if not cap.isOpened():
+            raise RuntimeError(f"Failed to open video: {video_path}")
+
+        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
+        fps = float(cap.get(cv2.CAP_PROP_FPS) or 0.0)
+        if total <= 0:
+            raise RuntimeError("Video appears to have 0 frames")
+
+        # Clamp so a negative request samples nothing rather than making
+        # np.linspace raise.
+        n = max(0, min(num_frames, total))
+        indices = np.linspace(0, max(0, total - 1), num=n, dtype=int).tolist()
+
+        out: List[Tuple[int, float, Image.Image]] = []
+        for idx in indices:
+            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
+            ok, frame_bgr = cap.read()
+            if not ok or frame_bgr is None:
+                continue
+            # OpenCV decodes to BGR; PIL expects RGB.
+            frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
+            pil = Image.fromarray(frame_rgb)
+            ts = (idx / fps) if fps > 0 else 0.0
+            out.append((int(idx), float(ts), pil))
+    finally:
+        cap.release()
+
+    logger.info(f"Extracted {len(out)}/{n} frames from video (total={total}, fps={fps:.2f})")
+    return out
+
+
+MIN_FACE_FRAMES = 3 # below this we refuse to issue a deepfake verdict
+
+
+def _has_face(pil: Image.Image) -> bool:
+    """Return True when the shared face detector reports face landmarks in the frame."""
+    face_detector = get_model_loader().load_face_detector()
+    detection = face_detector.process(np.array(pil))
+    landmarks = getattr(detection, "multi_face_landmarks", None)
+    return bool(landmarks)
+
+
+def classify_frames(frames: List[Tuple[int, float, Image.Image]]) -> List[FrameAnalysis]:
+    """Classify each sampled frame and note whether it contains a face.
+
+    A frame's suspicious probability is the highest score among the labels
+    that _is_fake_label() recognises, or 0.0 if there are none. Only frames
+    with a face are marked as scored or suspicious.
+    """
+
+    def _analyze(frame_index: int, seconds: float, image: Image.Image) -> FrameAnalysis:
+        face_found = _has_face(image)
+        prediction = classify_image(image)
+        fake_scores = [
+            float(prob)
+            for name, prob in prediction.all_scores.items()
+            if _is_fake_label(name)
+        ]
+        suspicious = max([0.0] + fake_scores)
+        return FrameAnalysis(
+            index=frame_index,
+            timestamp_s=seconds,
+            label=prediction.label,
+            confidence=prediction.confidence,
+            suspicious_prob=suspicious,
+            is_suspicious=(suspicious >= 0.5) and face_found,
+            has_face=face_found,
+            scored=face_found,
+        )
+
+    return [_analyze(idx, ts, pil) for idx, ts, pil in frames]
+
+
+def aggregate(frames: List[FrameAnalysis]) -> VideoAggregation:
+    """Roll the per-frame results up into video-level statistics.
+
+    Only scored (face) frames count. With fewer than MIN_FACE_FRAMES of them
+    the statistics are zeroed and ``insufficient_faces`` is set.
+    """
+    if not frames:
+        return VideoAggregation(0, 0, 0, 0.0, 0.0, 0.0, True)
+
+    face_frames = [fr for fr in frames if fr.scored]
+    face_count = len(face_frames)
+    too_few = face_count < MIN_FACE_FRAMES
+
+    # Defaults describe the "not enough faces" outcome.
+    flagged: List[FrameAnalysis] = []
+    mean_prob = 0.0
+    max_prob = 0.0
+    ratio = 0.0
+    if not too_few:
+        probabilities = [fr.suspicious_prob for fr in face_frames]
+        flagged = [fr for fr in face_frames if fr.is_suspicious]
+        mean_prob = float(np.mean(probabilities))
+        max_prob = float(np.max(probabilities))
+        ratio = len(flagged) / len(face_frames)
+
+    flagged_times = [round(fr.timestamp_s, 2) for fr in flagged]
+    return VideoAggregation(
+        num_frames_sampled=len(frames),
+        num_face_frames=face_count,
+        num_suspicious_frames=len(flagged),
+        mean_suspicious_prob=mean_prob,
+        max_suspicious_prob=max_prob,
+        suspicious_ratio=ratio,
+        insufficient_faces=too_few,
+        suspicious_timestamps=flagged_times,
+        frames=frames,
+    )
+
+
+def analyze_video(video_path: str, num_frames: int = 16) -> VideoAggregation:
+    """Sample frames from the video, classify them and aggregate the results."""
+    sampled = extract_frames(video_path, num_frames=num_frames)
+    return aggregate(classify_frames(sampled))
diff --git a/vlm_breakdown.py b/vlm_breakdown.py
new file mode 100644
index 0000000000000000000000000000000000000000..50ab212b81579d5eeeb475d0191d603fa52809ca
--- /dev/null
+++ b/vlm_breakdown.py
@@ -0,0 +1,138 @@
+"""VLM Detailed Breakdown — Phase 14.1
+
+Calls a vision-capable LLM (Gemini or OpenAI) to score 6 perceptual
+components of an image for deepfake forensics. Cached per record_id.
+"""
+from __future__ import annotations
+
+import json
+from io import BytesIO
+from typing import Any
+
+from loguru import logger
+from PIL import Image
+
+from config import settings
+from schemas.common import VLMBreakdown, VLMComponentScore
+
+# In-process memo of finished breakdowns keyed by record_id. Entries are added in
+# generate_vlm_breakdown; nothing in this module ever evicts them.
+_cache: dict[str, VLMBreakdown] = {}
+
+# Instruction text sent to the vision model together with the image. The six JSON
+# keys it requests are the same six names _build_breakdown reads back, and the
+# "score 75" rule for non-visible components matches the fallback score that
+# _to_component uses when a score is missing.
+# NOTE(review): the "notes" placeholders in the template are empty strings —
+# confirm that is intended rather than a lost "<...>" hint.
+_PROMPT = """\
+You are DeepShield's deepfake forensics engine. Analyze this image and score \
+each component for visual authenticity.
+
+Output ONLY valid JSON (no markdown fences, no extra text):
+{
+ "facial_symmetry": {"score": <0-100>, "notes": ""},
+ "skin_texture": {"score": <0-100>, "notes": ""},
+ "lighting_consistency": {"score": <0-100>, "notes": ""},
+ "background_coherence": {"score": <0-100>, "notes": ""},
+ "anatomy_hands_eyes": {"score": <0-100>, "notes": ""},
+ "context_objects": {"score": <0-100>, "notes": ""}
+}
+
+Scoring rules:
+- 100 = perfectly natural/authentic for that component
+- 0 = clear manipulation artifact for that component
+- Score each independently based only on visual evidence in this image
+- If a component is not visible (e.g. no hands present), score 75 and note "not visible in image"
+"""
+
+
+def _parse_response(raw: str) -> dict[str, Any]:
+ text = raw.strip()
+ if text.startswith("```"):
+ lines = [ln for ln in text.split("\n") if not ln.strip().startswith("```")]
+ text = "\n".join(lines).strip()
+ return json.loads(text)
+
+
+def _to_component(d: Any) -> VLMComponentScore:
+ if isinstance(d, dict):
+ return VLMComponentScore(
+ score=max(0, min(100, int(d.get("score", 75)))),
+ notes=str(d.get("notes", ""))[:200],
+ )
+ return VLMComponentScore()
+
+
+def _build_breakdown(data: dict[str, Any]) -> VLMBreakdown:
+ return VLMBreakdown(
+ facial_symmetry=_to_component(data.get("facial_symmetry")),
+ skin_texture=_to_component(data.get("skin_texture")),
+ lighting_consistency=_to_component(data.get("lighting_consistency")),
+ background_coherence=_to_component(data.get("background_coherence")),
+ anatomy_hands_eyes=_to_component(data.get("anatomy_hands_eyes")),
+ context_objects=_to_component(data.get("context_objects")),
+ )
+
+
+def generate_vlm_breakdown(
+ image: Image.Image,
+ record_id: str | None = None,
+) -> VLMBreakdown | None:
+ """Score 6 perceptual components via vision LLM. Returns None when unconfigured."""
+ if record_id and record_id in _cache:
+ cached = _cache[record_id]
+ cached.cached = True
+ return cached
+
+ if not settings.LLM_API_KEY:
+ logger.debug("LLM_API_KEY not set — skipping VLM breakdown")
+ return None
+
+ provider = settings.LLM_PROVIDER.lower()
+ model_id = settings.LLM_MODEL
+
+ try:
+ if provider == "openai":
+ breakdown = _call_openai(image, model_id)
+ else:
+ breakdown = _call_gemini(image, model_id)
+
+ breakdown.model_used = f"{provider}/{model_id}"
+ if record_id:
+ _cache[record_id] = breakdown
+
+ logger.info(f"VLM breakdown generated via {provider}/{model_id}")
+ return breakdown
+
+ except json.JSONDecodeError as e:
+ logger.error(f"VLM breakdown: unparseable JSON from LLM: {e}")
+ return None
+ except Exception as e:
+ logger.error(f"VLM breakdown failed: {e}")
+ return None
+
+
+def _call_gemini(image: Image.Image, model_id: str) -> VLMBreakdown:
+ import google.generativeai as genai # type: ignore
+ genai.configure(api_key=settings.LLM_API_KEY)
+ model = genai.GenerativeModel(model_id)
+ response = model.generate_content([_PROMPT, image])
+ return _build_breakdown(_parse_response(response.text))
+
+
+def _call_openai(image: Image.Image, model_id: str) -> VLMBreakdown:
+ import base64
+ from openai import OpenAI # type: ignore
+
+ buf = BytesIO()
+ img = image.convert("RGB")
+ img.save(buf, format="JPEG", quality=85)
+ b64 = base64.b64encode(buf.getvalue()).decode()
+
+ client = OpenAI(api_key=settings.LLM_API_KEY)
+ response = client.chat.completions.create(
+ model=model_id,
+ messages=[{
+ "role": "user",
+ "content": [
+ {"type": "text", "text": _PROMPT},
+ {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}},
+ ],
+ }],
+ temperature=0.2,
+ max_tokens=400,
+ )
+ return _build_breakdown(_parse_response(response.choices[0].message.content))