from __future__ import annotations
import base64
import html
import json
import uuid
from datetime import datetime, timedelta
from io import BytesIO
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse
from zoneinfo import ZoneInfo
from loguru import logger
from PIL import Image as PILImage
from reportlab.lib import colors
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_RIGHT
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet
from reportlab.lib.units import mm
from reportlab.platypus import (
Flowable,
Image,
Paragraph,
SimpleDocTemplate,
Spacer,
Table,
TableStyle,
)
from config import settings
from db.models import AnalysisRecord, Report
# Filesystem anchors derived from this module's own location.
REPO_ROOT = Path(__file__).resolve().parents[2]
BACKEND_ROOT = Path(__file__).resolve().parents[1]
LOGO_PATH = BACKEND_ROOT / "static" / "logo.png"
# Timezone used for the "Generated" timestamp printed on the report.
IST = ZoneInfo("Asia/Kolkata")
# Typography & Spacing Grid (base unit: 4pt)
BASE_SPACING = 4
# Font constants (ReportLab uses these exact names)
FONT_SANS = "Helvetica" # Primary: available on all systems
FONT_SANS_BOLD = "Helvetica-Bold"
FONT_SANS_OBLIQUE = "Helvetica-Oblique"
FONT_MONO = "Courier" # Monospace fallback
# Severity badge colors (high > medium > low)
SEVERITY_HIGH = colors.HexColor("#DC2626") # Red
SEVERITY_MEDIUM = colors.HexColor("#EA580C") # Orange
SEVERITY_LOW = colors.HexColor("#2563EB") # Blue
SEVERITY_NEUTRAL = colors.HexColor("#6B7280") # Gray
# Improved color palette with better contrast
SLATE = colors.HexColor("#0F1A2D") # Darker title color for contrast
TEXT = colors.HexColor("#1A202C") # Darker body text
MUTED = colors.HexColor("#4B5563") # Darker muted (was #667085)
LINE = colors.HexColor("#D9E0EA")
PANEL = colors.HexColor("#EDF0F7") # Slightly darker for better contrast (was #F7F9FC)
PANEL_2 = colors.HexColor("#E0E7F4") # Slightly darker (was #EEF3F8)
# Score-driven palette used by _severity_color (>=70 crimson, >=40 amber, else green).
CRIMSON = colors.HexColor("#C81E3A")
AMBER = colors.HexColor("#C77700")
GREEN = colors.HexColor("#168A4A")
BLUE = colors.HexColor("#2F6FED")
def _ensure_dir() -> Path:
    """Return the configured report directory, creating it (and parents) if missing."""
    report_dir = Path(settings.REPORT_DIR)
    report_dir.mkdir(parents=True, exist_ok=True)
    return report_dir
def _clamp(value: Any, lo: float = 0.0, hi: float = 100.0) -> float:
try:
number = float(value)
except (TypeError, ValueError):
number = lo
return max(lo, min(hi, number))
def _clean(value: Any, default: str = "") -> str:
if value is None:
return default
text = str(value).replace("\x00", "").strip()
return text or default
def _xml(value: Any, default: str = "") -> str:
    """Clean ``value`` (falling back to ``default``) and HTML-escape the result, quotes included."""
    cleaned = _clean(value, default)
    return html.escape(cleaned, quote=True)
def _shorten(value: Any, limit: int = 700) -> tuple[str, bool]:
    """Collapse whitespace and cap the text at ``limit`` characters.

    Args:
        value: Any value; it is cleaned and its whitespace runs are collapsed
            to single spaces.
        limit: Maximum length of the returned text, ellipsis included.

    Returns:
        ``(text, was_truncated)``. When truncated, the text ends with "..."
        and is at most ``limit`` characters long (for ``limit`` below 3 the
        result is just the ellipsis).
    """
    text = " ".join(_clean(value).split())
    if len(text) <= limit:
        return (text, False)
    ellipsis = "..."
    # Reserve room for the whole ellipsis so the result never exceeds ``limit``.
    keep = max(limit - len(ellipsis), 0)
    return (text[:keep].rstrip() + ellipsis, True)
def _severity_badge_color(severity: str) -> colors.Color:
    """Map a free-text severity onto its badge color.

    Keyword groups are checked in priority order (high/critical, then
    medium/warn, then low); anything else gets the neutral color.
    """
    level = _clean(severity).lower()
    keyword_palette = (
        (("high", "critical"), SEVERITY_HIGH),
        (("medium", "warn"), SEVERITY_MEDIUM),
        (("low",), SEVERITY_LOW),
    )
    for keywords, shade in keyword_palette:
        if any(word in level for word in keywords):
            return shade
    return SEVERITY_NEUTRAL
def _format_anomaly_score(score: float) -> str:
    """Render a 0-100 consistency score as its complement, e.g. ``"35% anomaly"``."""
    consistency = _clamp(score, 0, 100)
    return f"{100 - consistency:.0f}% anomaly"
def _placeholder_image(width: float = 78 * mm, height: float = 58 * mm) -> Image:
    """Build a flat gray PNG flowable to stand in for unavailable media."""
    pixel_size = (int(width * 2.83), int(height * 2.83))
    buffer = BytesIO()
    PILImage.new("RGB", pixel_size, color=(220, 224, 232)).save(buffer, format="PNG")
    # Rewind so the image reader starts at the first byte.
    buffer.seek(0)
    return Image(buffer, width=width, height=height)
def _as_dict(value: Any) -> dict[str, Any]:
return value if isinstance(value, dict) else {}
def _as_list(value: Any) -> list[Any]:
return value if isinstance(value, list) else []
def _deepfake_probability(analysis_json: dict[str, Any]) -> int:
    """Return the deepfake probability (0-100) as the complement of authenticity.

    Reads ``verdict.authenticity_score``. A missing *or explicitly null*
    score is treated as the neutral midpoint (50), so an absent measurement
    is not reported as a 100% deepfake.
    """
    verdict = _as_dict(analysis_json.get("verdict"))
    raw_score = verdict.get("authenticity_score")
    if raw_score is None:
        # ``dict.get(key, 50)`` only covers a missing key; a stored JSON null
        # would otherwise clamp to 0 authenticity.
        raw_score = 50
    return int(round(100 - _clamp(raw_score)))
def _confidence_percent(verdict: dict[str, Any]) -> float:
    """Return ``verdict["model_confidence"]`` as a percentage.

    The value is clamped to 0-100; results at or below 1 are treated as a
    fraction and multiplied by 100.
    """
    clamped = _clamp(verdict.get("model_confidence", 0), 0, 100)
    if clamped <= 1:
        return clamped * 100
    return clamped
def _severity_color(fake_probability: float):
    """Pick crimson (>= 70), amber (>= 40) or green for a fake-probability score."""
    for threshold, shade in ((70, CRIMSON), (40, AMBER)):
        if fake_probability >= threshold:
            return shade
    return GREEN
def _generated_at_ist() -> str:
    """Format the current time in the IST zone, e.g. ``"05 Jan 2025, 03:42 PM IST"``."""
    now_ist = datetime.now(tz=IST)
    return now_ist.strftime("%d %b %Y, %I:%M %p IST")
def _extract_llm_summary(analysis_json: dict[str, Any]) -> dict[str, Any] | None:
    """Find the LLM summary dict, preferring the top-level key over the nested one.

    A candidate only counts when it is a dict with a truthy ``paragraph`` or
    ``bullets`` entry; otherwise ``None`` is returned.
    """
    candidates = (
        analysis_json.get("llm_summary"),
        _as_dict(analysis_json.get("explainability")).get("llm_summary"),
    )
    for candidate in candidates:
        if not isinstance(candidate, dict):
            continue
        if candidate.get("paragraph") or candidate.get("bullets"):
            return candidate
    return None
def _media_label(media_type: str) -> str:
return "SCREENSHOT" if media_type == "screenshot" else media_type.upper()
def _resolve_media_path(value: Any) -> Path | None:
    """Resolve a stored media reference to an existing local file.

    Data URIs and http(s) URLs are not local files and yield ``None``.
    Otherwise candidate locations are tried in order: the path itself when
    absolute, then the reference relative to the repo root, the backend root
    and ``<repo>/backend``; references starting with ``media/`` are also
    tried under the configured media root and the two ``media`` folders.

    Returns:
        The first candidate that resolves to an existing regular file, or
        ``None`` when none does.
    """
    raw = _clean(value)
    if not raw:
        return None
    if raw.startswith("data:"):
        return None
    if urlparse(raw).scheme in {"http", "https"}:
        return None

    direct = Path(raw)
    relative = raw.lstrip("/\\")

    search_order: list[Path] = []
    if direct.is_absolute():
        search_order.append(direct)
    search_order += [
        REPO_ROOT / relative,
        BACKEND_ROOT / relative,
        REPO_ROOT / "backend" / relative,
    ]
    if relative.startswith("media/"):
        tail = relative[len("media/") :]
        search_order += [
            Path(settings.MEDIA_ROOT) / tail,
            BACKEND_ROOT / "media" / tail,
            REPO_ROOT / "media" / tail,
        ]

    for option in search_order:
        try:
            target = option.resolve()
            if target.exists() and target.is_file():
                return target
        except OSError:
            # An unreadable or invalid candidate should not stop the search.
            continue
    return None
def _image_from_base64(data: Any, max_width: float, max_height: float) -> Image | None:
    """Decode a base64 image (optionally a ``data:`` URI) into a scaled flowable.

    Returns ``None`` when no data is supplied or when decoding or opening the
    image fails; failures are logged as warnings.
    """
    payload = _clean(data)
    if not payload:
        logger.debug("No base64 image data provided")
        return None
    try:
        # Drop a "data:image/...;base64," style prefix if one is present.
        _prefix, comma, tail = payload.partition(",")
        encoded = tail if comma else payload
        decoded = base64.b64decode(encoded)
        # PIL is used only to read the pixel dimensions.
        with PILImage.open(BytesIO(decoded)) as probe:
            px_width, px_height = probe.size
        # A fresh buffer starts at offset 0, ready to be read when the PDF is built.
        return _scaled_image(BytesIO(decoded), px_width, px_height, max_width, max_height)
    except Exception as exc: # noqa: BLE001
        logger.warning(f"Base64 image decode failed: {exc}")
        return None
import urllib.request
def _image_from_url(url: str | None, max_width: float, max_height: float) -> Image | None:
"""Download image from HTTP/HTTPS URL and embed in PDF."""
if not url or not str(url).startswith("http"):
return None
try:
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req, timeout=10) as response:
image_bytes = response.read()
with PILImage.open(BytesIO(image_bytes)) as pil:
width, height = pil.size
stream = BytesIO(image_bytes)
stream.seek(0)
return _scaled_image(stream, width, height, max_width, max_height)
except Exception as exc:
logger.warning(f"Failed to fetch image from URL {url}: {exc}")
return None
def _image_from_path(path: Path | None, max_width: float, max_height: float) -> Image | None:
    """Read an image file into memory and return it as a scaled flowable.

    Returns ``None`` when ``path`` is ``None`` or when the file cannot be read
    or opened as an image; failures are logged as warnings.
    """
    if path is None:
        logger.debug("No image path provided")
        return None
    try:
        # Load the whole file so the PDF embeds bytes rather than a file reference.
        with open(path, 'rb') as handle:
            payload = handle.read()
        # PIL is used only to read the pixel dimensions.
        with PILImage.open(BytesIO(payload)) as probe:
            px_width, px_height = probe.size
        return _scaled_image(BytesIO(payload), px_width, px_height, max_width, max_height)
    except Exception as exc: # noqa: BLE001
        logger.warning(f"Image not found at {path}: {exc}")
        return None
def _scaled_image(source: Any, width: int, height: int, max_width: float, max_height: float) -> Image:
    """Wrap ``source`` in an ``Image`` shrunk to fit the given box.

    The aspect ratio is preserved and the image is never enlarged (the scale
    factor is capped at 1.0).
    """
    safe_width = max(width, 1)
    safe_height = max(height, 1)
    factor = min(max_width / safe_width, max_height / safe_height, 1.0)
    flowable = Image(source)
    flowable.drawWidth = width * factor
    flowable.drawHeight = height * factor
    return flowable
def _styles() -> dict[str, ParagraphStyle]:
    """Build the report's paragraph styles, keyed by role.

    Every style derives from ReportLab's sample stylesheet: "title" from
    ``Title``, "section" from ``Heading2`` and all others from ``BodyText``.
    """
    sheet = getSampleStyleSheet()

    def derive(name: str, parent_key: str = "BodyText", **attrs: Any) -> ParagraphStyle:
        """Create a named style inheriting from the given sample style."""
        return ParagraphStyle(name, parent=sheet[parent_key], **attrs)

    return {
        "title": derive(
            "DeepShieldTitle",
            "Title",
            fontName=FONT_SANS_BOLD,
            fontSize=20,
            leading=24,  # 1.2x leading
            textColor=SLATE,
            alignment=TA_LEFT,
            spaceAfter=BASE_SPACING,
        ),
        "section": derive(
            "DeepShieldSection",
            "Heading2",
            fontName=FONT_SANS_BOLD,
            fontSize=13,
            leading=16,
            textColor=SLATE,
            spaceBefore=BASE_SPACING,
            spaceAfter=BASE_SPACING,
            keepWithNext=True,  # keep a heading with the content that follows it
        ),
        "body": derive(
            "DeepShieldBody",
            fontName=FONT_SANS,
            fontSize=10,
            leading=15,  # 1.5x leading
            textColor=TEXT,
            spaceAfter=BASE_SPACING,
        ),
        "small": derive(
            "DeepShieldSmall",
            fontName=FONT_SANS,
            fontSize=9,
            leading=13.5,  # 1.5x leading
            textColor=MUTED,
            spaceAfter=3,
        ),
        "meta": derive(
            "DeepShieldMeta",
            fontName=FONT_SANS,
            fontSize=9,
            leading=13.5,  # 1.5x leading
            textColor=MUTED,
            alignment=TA_RIGHT,
        ),
        "badge": derive(
            "DeepShieldBadge",
            fontName=FONT_SANS_BOLD,
            fontSize=9,
            leading=13,
            textColor=colors.white,
            alignment=TA_CENTER,
        ),
        "quote": derive(
            "DeepShieldQuote",
            fontName=FONT_SANS,
            fontSize=10,
            leading=15,  # 1.5x leading
            textColor=SLATE,
            leftIndent=12,
            rightIndent=12,
        ),
        "caption": derive(
            "DeepShieldCaption",
            fontName=FONT_SANS_OBLIQUE,
            fontSize=8.5,
            leading=12.75,  # 1.5x leading
            textColor=MUTED,
            alignment=TA_CENTER,
        ),
        "link": derive(
            "DeepShieldLink",
            fontName=FONT_SANS,
            fontSize=9,
            leading=13.5,  # 1.5x leading
            textColor=BLUE,
        ),
    }
class ScoreGauge(Flowable):
    """Half-circle gauge flowable showing a 0-100 deepfake probability.

    The score is clamped to 0-100 and truncated to an int; the arc and label
    color follow ``_severity_color`` of that score.
    """

    def __init__(self, score: int, label: str, width: float = 174, height: float = 104):
        super().__init__()
        self.score = int(_clamp(score))
        self.label = label
        self.width = width
        self.height = height
        self.color = _severity_color(self.score)

    def draw(self) -> None:
        """Paint the track, the filled arc, the numeric score and captions."""
        canvas = self.canv
        center_x = self.width / 2
        center_y = 28
        arc_radius = 60
        # Degrees of the 180-degree track covered by the score.
        sweep = 180 * self.score / 100
        bounds = (
            center_x - arc_radius,
            center_y - arc_radius,
            center_x + arc_radius,
            center_y + arc_radius,
        )

        canvas.saveState()
        canvas.setLineCap(1)

        # Full background track (upper half circle).
        canvas.setStrokeColor(colors.HexColor("#E5EAF1"))
        canvas.setLineWidth(13)
        canvas.arc(*bounds, startAng=0, extent=180)

        # Score arc, starting at the left end of the track.
        canvas.setStrokeColor(self.color)
        canvas.arc(*bounds, startAng=180 - sweep, extent=sweep)

        canvas.setFillColor(SLATE)
        canvas.setFont(FONT_SANS_BOLD, 25)
        canvas.drawCentredString(center_x, center_y + 9, f"{self.score}")

        canvas.setFont(FONT_SANS, 7.5)
        canvas.setFillColor(MUTED)
        canvas.drawCentredString(center_x, center_y - 4, "DEEPFAKE PROBABILITY")

        canvas.setFont(FONT_SANS_BOLD, 8.5)
        canvas.setFillColor(self.color)
        # The label is cut to 34 characters before drawing.
        canvas.drawCentredString(center_x, center_y - 18, self.label[:34])
        canvas.restoreState()
class BarChart(Flowable):
    """Horizontal bar chart flowable, one row per metric.

    Each metric is ``(label, value, value_text)``: ``value`` (clamped to
    0-100) sets the bar length and color, ``value_text`` is printed to the
    right of the bar.
    """

    def __init__(self, metrics: list[tuple[str, float, str]], width: float = 470, row_height: float = 21):
        super().__init__()
        self.metrics = metrics
        self.width = width
        self.row_height = row_height
        # Reserve at least one row of height even when there are no metrics.
        self.height = max(1, len(metrics)) * row_height + 7

    def draw(self) -> None:
        """Draw the rows top-down: label, track, filled bar and value text."""
        canvas = self.canv
        label_width = 132
        value_width = 54
        track_width = self.width - label_width - value_width - 18
        track_color = colors.HexColor("#E8EDF3")

        for index, (label, value, value_text) in enumerate(self.metrics):
            row_y = self.height - self.row_height * (index + 1)
            percent = _clamp(value)

            canvas.setFillColor(SLATE)
            canvas.setFont(FONT_SANS, 8.2)
            canvas.drawString(0, row_y + 5, label[:35])

            canvas.setFillColor(track_color)
            canvas.roundRect(label_width, row_y + 5, track_width, 7, 3, fill=1, stroke=0)

            canvas.setFillColor(_severity_color(percent))
            # The filled bar is never narrower than 2 points.
            filled = max(2, track_width * percent / 100)
            canvas.roundRect(label_width, row_y + 5, filled, 7, 3, fill=1, stroke=0)

            canvas.setFillColor(MUTED)
            canvas.setFont(FONT_SANS_BOLD, 8)
            canvas.drawRightString(label_width + track_width + value_width, row_y + 4, value_text)
class PipelineFlow(Flowable):
    """Flowable drawing up to eight pipeline stages as boxes joined by arrows."""

    def __init__(self, stages: list[str], width: float = 470):
        super().__init__()
        # Drop empty names and keep at most the first eight stages.
        self.stages = [name for name in stages if name][:8]
        self.width = width
        self.height = 58 if self.stages else 20

    def draw(self) -> None:
        """Draw the stage boxes left to right, or a note when there are none."""
        canvas = self.canv
        count = len(self.stages)
        if count == 0:
            canvas.setFillColor(MUTED)
            canvas.setFont(FONT_SANS, 8)
            canvas.drawString(0, 4, "No pipeline stages were recorded.")
            return

        spacing = 11
        # Boxes share the available width but are capped at 83 points each.
        box_width = min(83, (self.width - spacing * (count - 1)) / count)
        base_y = 18
        last_index = count - 1

        for position, stage in enumerate(self.stages):
            left = position * (box_width + spacing)

            canvas.setFillColor(PANEL_2)
            canvas.setStrokeColor(LINE)
            canvas.roundRect(left, base_y, box_width, 26, 5, fill=1, stroke=1)

            canvas.setFillColor(SLATE)
            canvas.setFont(FONT_SANS_BOLD, 6.5)
            caption = stage.replace("_", " ")[:18]
            canvas.drawCentredString(left + box_width / 2, base_y + 15, caption)

            if position == last_index:
                continue

            # Arrow to the next box: a shaft plus two short head strokes.
            shaft_x = left + box_width + 2
            shaft_y = base_y + 13
            tip_x = shaft_x + spacing - 5
            canvas.setStrokeColor(MUTED)
            canvas.line(shaft_x, shaft_y, tip_x, shaft_y)
            canvas.line(tip_x, shaft_y, shaft_x + spacing - 8, shaft_y + 3)
            canvas.line(tip_x, shaft_y, shaft_x + spacing - 8, shaft_y - 3)
def _section(title: str, styles: dict[str, ParagraphStyle]) -> Paragraph:
    """Return a section-heading paragraph for ``title`` (escaped)."""
    heading_markup = _xml(title)
    return Paragraph(heading_markup, styles["section"])
def _panel(rows: list[list[Any]], col_widths: list[float] | None = None) -> Table:
    """Wrap ``rows`` in a left-aligned, shaded, bordered table with 6pt cell padding."""
    whole_table = ((0, 0), (-1, -1))
    commands: list[tuple] = [
        ("BACKGROUND", *whole_table, PANEL),
        ("BOX", *whole_table, 0.5, LINE),
        ("INNERGRID", *whole_table, 0.25, colors.HexColor("#D9DFE8")),
        ("VALIGN", *whole_table, "TOP"),
    ]
    commands.extend(
        (side, *whole_table, 6)
        for side in ("LEFTPADDING", "RIGHTPADDING", "TOPPADDING", "BOTTOMPADDING")
    )
    panel = Table(rows, colWidths=col_widths, hAlign="LEFT")
    panel.setStyle(TableStyle(commands))
    return panel
def _header(analysis_json: dict[str, Any], generated_at: str, styles: dict[str, ParagraphStyle]) -> list[Any]:
    """Build the report header: logo (or text fallback) plus right-aligned metadata.

    Args:
        analysis_json: Analysis payload; ``media_type`` and ``record_id`` /
            ``analysis_id`` are read from it.
        generated_at: Pre-formatted generation timestamp.
        styles: Paragraph styles keyed by role ("title" and "meta" are used).

    Returns:
        A two-element story fragment: the header table and a small spacer.
    """
    media_type = _media_label(_clean(analysis_json.get("media_type"), "unknown"))
    report_id = _clean(analysis_json.get("record_id")) or _clean(analysis_json.get("analysis_id"), "N/A")
    logo = _image_from_path(LOGO_PATH if LOGO_PATH.exists() else None, 34 * mm, 16 * mm)
    # Fall back to a text wordmark when the logo file is missing or unreadable.
    logo_cell: Any = logo if logo else Paragraph("DeepShield", styles["title"])
    # Paragraph markup collapses raw newlines, so the line breaks are written
    # explicitly as <br/> tags.
    meta = Paragraph(
        f"Report ID: {_xml(report_id)}<br/>"
        f"Generated: {_xml(generated_at)}<br/>"
        f"Media Type: {_xml(media_type)}",
        styles["meta"],
    )
    table = Table([[logo_cell, meta]], colWidths=[85 * mm, 91 * mm])
    table.setStyle(
        TableStyle(
            [
                ("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
                ("LINEBELOW", (0, 0), (-1, -1), 1.1, SLATE),
                ("BOTTOMPADDING", (0, 0), (-1, -1), 4),
            ]
        )
    )
    return [table, Spacer(1, 4)]
def _badge(text: str, color, styles: dict[str, ParagraphStyle]) -> Table:
    """Return a one-cell, 54mm-wide table acting as a colored verdict badge."""
    caption = Paragraph(_xml(text), styles["badge"])
    everything = ((0, 0), (-1, -1))
    badge = Table([[caption]], colWidths=[54 * mm])
    badge.setStyle(
        TableStyle(
            [
                ("BACKGROUND", *everything, color),
                ("BOX", *everything, 0, color),
                ("TOPPADDING", *everything, 4),
                ("BOTTOMPADDING", *everything, 4),
            ]
        )
    )
    return badge
def _executive_summary(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[Any]:
    """Build the "Executive Summary" section: score gauge plus verdict details.

    The right-hand cell holds the verdict badge, model confidence, model label
    and the saved LLM summary (paragraph plus up to four bullets).

    Args:
        analysis_json: Analysis payload; ``verdict`` and the LLM summary are read.
        styles: Paragraph styles keyed by role.

    Returns:
        A story fragment: the section heading followed by the summary table.
    """
    verdict = _as_dict(analysis_json.get("verdict"))
    fake_score = _deepfake_probability(analysis_json)
    label = _clean(verdict.get("label"), "Inconclusive")
    severity = verdict.get("severity")
    # Prefer the explicit severity; otherwise derive the color from the score.
    color = _severity_badge_color(severity) if severity else _severity_color(fake_score)
    confidence = _confidence_percent(verdict)

    llm = _as_dict(_extract_llm_summary(analysis_json))
    summary_text = _clean(
        llm.get("paragraph"),
        "No Gemini explanation summary was saved for this analysis.",
    )
    bullets = _as_list(llm.get("bullets"))

    # Paragraph markup collapses raw newlines, so every line break is an
    # explicit <br/> tag.
    bullet_html = ""
    if bullets:
        shown = bullets[:4]
        bullet_html = "<br/>" + "<br/>".join(f"- {_xml(b)}" for b in shown)
        more = len(bullets) - len(shown)
        if more > 0:
            bullet_html += f"<br/>(+{more} more insight{'s' if more != 1 else ''} available)"

    detail = [
        _badge(label, color, styles),
        Spacer(1, 6),
        Paragraph(
            f"Model confidence: {confidence:.1f}%<br/>"
            f"Model label: {_xml(verdict.get('model_label'), 'unknown')}<br/>"
            f"AI explanation summary:<br/>{_xml(summary_text)}{bullet_html}",
            styles["body"],
        ),
    ]
    table = Table(
        [[ScoreGauge(fake_score, label), detail]],
        colWidths=[64 * mm, 110 * mm],
    )
    table.setStyle(
        TableStyle(
            [
                ("BACKGROUND", (0, 0), (-1, -1), PANEL),
                ("BOX", (0, 0), (-1, -1), 0.6, LINE),
                ("VALIGN", (0, 0), (-1, -1), "MIDDLE"),
                ("LEFTPADDING", (0, 0), (-1, -1), 6),
                ("RIGHTPADDING", (0, 0), (-1, -1), 6),
                ("TOPPADDING", (0, 0), (-1, -1), 6),
                ("BOTTOMPADDING", (0, 0), (-1, -1), 6),
            ]
        )
    )
    return [_section("Executive Summary: The Verdict", styles), table]
def _media_context(analysis_json: dict[str, Any], record: AnalysisRecord, styles: dict[str, ParagraphStyle]) -> list[Any]:
    """Build the "Analyzed Media Context" section for the analyzed media type.

    * text: a quoted snippet of the original text.
    * image / screenshot / video: a thumbnail beside OCR or transcript text.
    * audio: duration, format and transcript rows.
    * anything else: a short "not recorded" note.

    Args:
        analysis_json: Analysis payload.
        record: Database record, used as a fallback for media type and paths.
        styles: Paragraph styles keyed by role.

    Returns:
        A story fragment starting with the section heading.
    """
    # Fall back to the record's media type; tolerate it being None.
    media_type = (_clean(analysis_json.get("media_type")) or _clean(record.media_type)).lower()
    expl = _as_dict(analysis_json.get("explainability"))
    story: list[Any] = [_section("Analyzed Media Context", styles)]

    if media_type == "text":
        snippet, was_truncated = _shorten(expl.get("original_text"), 950)
        # <br/> is required: Paragraph markup collapses raw newlines.
        truncation_note = "<br/>[...more text not shown]" if was_truncated else ""
        story.append(
            _panel(
                [[Paragraph(f"“{_xml(snippet, 'No text snippet was stored.')}”{truncation_note}", styles["quote"])]],
                [176 * mm],
            )
        )
        return story

    if media_type in {"image", "screenshot", "video"}:

        def load_image(reference: Any) -> Image | None:
            """Try the reference as a remote URL first, then as a local file."""
            return (
                _image_from_url(reference, 72 * mm, 48 * mm)
                or _image_from_path(_resolve_media_path(reference), 72 * mm, 48 * mm)
            )

        thumb_url = analysis_json.get("thumbnail_url") or record.thumbnail_url
        media_url = analysis_json.get("media_path") or record.media_path
        # ``or`` short-circuits, so the original media is only fetched when no
        # thumbnail could be loaded.
        image_cell: Any = (
            load_image(thumb_url)
            or load_image(media_url)
            or Paragraph("Original thumbnail unavailable", styles["small"])
        )

        text_value, was_truncated = _shorten(expl.get("extracted_text") or expl.get("transcript"), 800)
        truncation_note = " [+more not shown]" if was_truncated else ""
        text_label = "Extracted OCR text" if media_type == "screenshot" else "Context notes"
        text_cell = Paragraph(
            f"{text_label}<br/>"
            f"{_xml(text_value, 'No OCR or transcript text was recorded.')}{truncation_note}",
            styles["body"],
        )
        table = Table([[image_cell, text_cell]], colWidths=[78 * mm, 98 * mm])
        table.setStyle(
            TableStyle(
                [
                    ("BACKGROUND", (0, 0), (-1, -1), PANEL),
                    ("BOX", (0, 0), (-1, -1), 0.5, LINE),
                    ("VALIGN", (0, 0), (-1, -1), "TOP"),
                    ("LEFTPADDING", (0, 0), (-1, -1), 6),
                    ("RIGHTPADDING", (0, 0), (-1, -1), 6),
                    ("TOPPADDING", (0, 0), (-1, -1), 6),
                    ("BOTTOMPADDING", (0, 0), (-1, -1), 6),
                ]
            )
        )
        story.append(table)
        return story

    if media_type == "audio":
        transcript, was_truncated = _shorten(expl.get("transcript") or expl.get("extracted_transcript"), 850)
        truncation_note = " [+more not shown]" if was_truncated else ""
        duration = _clamp(expl.get("duration_s"), 0, 10_000_000)
        fmt = _clean(analysis_json.get("audio_format") or analysis_json.get("format"), "not recorded")
        story.append(
            _panel(
                [
                    [
                        Paragraph("Duration", styles["small"]),
                        Paragraph(f"{duration:.1f} seconds", styles["body"]),
                    ],
                    [
                        Paragraph("Format", styles["small"]),
                        Paragraph(_xml(fmt), styles["body"]),
                    ],
                    [
                        Paragraph("Transcript", styles["small"]),
                        Paragraph(_xml(transcript, "No transcript was recorded.") + truncation_note, styles["body"]),
                    ],
                ],
                [42 * mm, 134 * mm],
            )
        )
        return story

    story.append(Paragraph("No media context was recorded for this analysis.", styles["small"]))
    return story
# Maps each ``vlm_breakdown`` component key to a pair of
# (display label, fallback explanation). The fallback explanation is used by
# _xai_rows when the component carries no ``notes`` of its own.
ANOMALY_LABELS = {
    "facial_symmetry": ("Face alignment", "Facial landmarks do not line up as naturally as expected."),
    "skin_texture": ("Skin texture", "Skin detail appears unusually smooth, noisy, or inconsistent."),
    "lighting_consistency": ("Lighting consistency", "The face lighting does not match the surrounding scene."),
    "background_coherence": ("Background coherence", "Edges or background objects look inconsistent with the subject."),
    "anatomy_hands_eyes": ("Eyes and anatomy", "Eye glare, hands, or anatomy show unnatural structure."),
    "context_objects": ("Scene context", "Objects or scene details conflict with the claimed context."),
}
def _xai_rows(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[list[Any]]:
    """Collect the rows of the XAI breakdown table.

    The first row is the header. Data rows are gathered, in order, from
    artifact indicators, the VLM component breakdown, text/screenshot signals
    (manipulation indicators, suspicious phrases, layout anomalies) and the
    audio consistency score. If nothing was found, one "No strong anomaly"
    row is added.

    Args:
        analysis_json: Analysis payload; ``media_type`` and ``explainability``
            are read.
        styles: Paragraph styles keyed by role ("body" and "small" are used).

    Returns:
        A list of three-cell rows (signal, strength, plain-language reason).
    """
    media_type = _clean(analysis_json.get("media_type")).lower()
    expl = _as_dict(analysis_json.get("explainability"))

    def row(signal: str, strength: str, reason: str) -> list[Any]:
        """Build one data row from already-escaped paragraph markup."""
        return [
            Paragraph(signal, styles["body"]),
            Paragraph(strength, styles["small"]),
            Paragraph(reason, styles["body"]),
        ]

    rows: list[list[Any]] = [
        [
            Paragraph("Signal", styles["small"]),
            Paragraph("Strength", styles["small"]),
            Paragraph("Plain-language reason", styles["small"]),
        ]
    ]

    for indicator in _as_list(expl.get("artifact_indicators"))[:8]:
        item = _as_dict(indicator)
        confidence = _clamp(item.get("confidence"), 0, 1) * 100
        rows.append(
            row(
                _xml(item.get("type"), "Artifact"),
                # <br/> is required: Paragraph markup collapses raw newlines.
                f"{confidence:.0f}%<br/>{_xml(item.get('severity'), 'signal')}",
                _xml(item.get("description"), "The visual evidence contains an unusual pattern."),
            )
        )

    vlm = _as_dict(expl.get("vlm_breakdown"))
    for key, (label, fallback) in ANOMALY_LABELS.items():
        comp = _as_dict(vlm.get(key))
        if not comp:
            continue
        consistency = _clamp(comp.get("score"), 0, 100)
        # Skip weak signals (anomaly below 18) unless the model left notes.
        if 100 - consistency < 18 and not comp.get("notes"):
            continue
        rows.append(
            row(
                _xml(label),
                _format_anomaly_score(consistency),
                _xml(_clean(comp.get("notes"), fallback)),
            )
        )

    if media_type in {"text", "screenshot"}:
        for indicator in _as_list(expl.get("manipulation_indicators"))[:6]:
            item = _as_dict(indicator)
            rows.append(
                row(
                    _xml(item.get("pattern_type"), "Text signal"),
                    _xml(item.get("severity"), "medium"),
                    _xml(item.get("description"), "The wording may be manipulative or misleading."),
                )
            )
        for phrase in _as_list(expl.get("suspicious_phrases"))[:6]:
            item = _as_dict(phrase)
            # Clean the fallback too: a null or non-string ``text`` would
            # otherwise reach html.escape() unconverted and raise.
            phrase_fallback = _clean(item.get("text"), "OCR text was flagged.")
            rows.append(
                row(
                    _xml(item.get("pattern_type"), "Suspicious phrase"),
                    _xml(item.get("severity"), "medium"),
                    _xml(item.get("description"), phrase_fallback),
                )
            )
        for anomaly in _as_list(expl.get("layout_anomalies"))[:5]:
            item = _as_dict(anomaly)
            rows.append(
                row(
                    _xml(item.get("type"), "Layout anomaly"),
                    f"{_clamp(item.get('confidence'), 0, 1) * 100:.0f}%",
                    _xml(item.get("description"), "The screenshot layout is visually inconsistent."),
                )
            )

    if media_type in {"audio", "video"}:
        audio = _as_dict(expl.get("audio") or expl)
        audio_score = audio.get("audio_authenticity_score")
        audio_notes = audio.get("notes")
        # Add the row only when there is something to report: a missing score
        # must not be rendered as "100% anomaly".
        if audio_score is not None or _clean(audio_notes):
            strength = _format_anomaly_score(audio_score) if audio_score is not None else "Not scored"
            rows.append(
                row(
                    "Audio consistency",
                    strength,
                    _xml(audio_notes, "Audio signal features were compared for voice consistency."),
                )
            )

    if len(rows) == 1:
        rows.append(
            row(
                "No strong anomaly",
                "Low",
                "The saved model output did not include detailed anomaly markers.",
            )
        )
    return rows
def _xai_breakdown(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[Any]:
    """Build the "XAI Detailed Breakdown" section: heading plus the signal table."""
    header_row = ((0, 0), (-1, 0))
    every_cell = ((0, 0), (-1, -1))
    commands: list[tuple] = [
        ("BACKGROUND", *header_row, PANEL_2),
        ("TEXTCOLOR", *header_row, SLATE),
        ("FONTNAME", *header_row, FONT_SANS_BOLD),
        ("FONTSIZE", *header_row, 9),
        ("BACKGROUND", (0, 1), (-1, -1), colors.white),
        ("BOX", *every_cell, 0.5, LINE),
        ("INNERGRID", *every_cell, 0.25, colors.HexColor("#D9DFE8")),
        ("VALIGN", *every_cell, "TOP"),
        ("NOSPLIT", (0, 0), (-1, 1)),
    ]
    commands.extend(
        (side, *every_cell, 6)
        for side in ("LEFTPADDING", "RIGHTPADDING", "TOPPADDING", "BOTTOMPADDING")
    )
    # repeatRows=1 repeats the header row when the table spans pages.
    breakdown = Table(
        _xai_rows(analysis_json, styles),
        colWidths=[44 * mm, 30 * mm, 102 * mm],
        repeatRows=1,
    )
    breakdown.setStyle(TableStyle(commands))
    return [_section("XAI Detailed Breakdown", styles), breakdown]
def _forensic_visuals(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[Any]:
    """Build the "Forensic Visual Evidence" section for visual media.

    Returns an empty list for non-visual media types. Otherwise three visuals
    (activation heatmap, ELA, region overlay) are laid out two per row; each
    falls back to a gray placeholder when no image can be loaded.
    """
    if _clean(analysis_json.get("media_type")).lower() not in {"image", "screenshot", "video"}:
        return []

    expl = _as_dict(analysis_json.get("explainability"))
    specs = (
        (
            "AI Activation Heatmap",
            "Grad-CAM++ activation showing regions the AI model focused on when making its decision.",
            "heatmap",
        ),
        (
            "Error Level Analysis (ELA)",
            "Heatmap indicating areas of high compression loss, often associated with digital splicing.",
            "ela",
        ),
        (
            "Manipulation Region Overlay",
            "Bounding boxes highlight regions the visual model treated as suspicious or manipulated.",
            "boxes",
        ),
    )
    box_w, box_h = 78 * mm, 58 * mm

    cells: list[list[Any]] = []
    for heading, caption, prefix in specs:
        encoded = expl.get(f"{prefix}_base64")
        location = expl.get(f"{prefix}_url")
        # Source priority: inline base64, remote URL, local file, placeholder.
        picture = (
            _image_from_base64(encoded, box_w, box_h)
            or _image_from_url(location, box_w, box_h)
            or _image_from_path(_resolve_media_path(location), box_w, box_h)
            or _placeholder_image(box_w, box_h)
        )
        cells.append(
            [
                Paragraph(_xml(heading), styles["body"]),
                picture,
                Paragraph(_xml(caption), styles["caption"]),
            ]
        )

    # Two visuals per table row; pad an odd final row with an empty cell.
    grid: list[list[Any]] = []
    for start in range(0, len(cells), 2):
        pair = cells[start : start + 2]
        if len(pair) == 1:
            pair.append([])
        grid.append(pair)

    every_cell = ((0, 0), (-1, -1))
    gallery = Table(grid, colWidths=[88 * mm, 88 * mm])
    gallery.setStyle(
        TableStyle(
            [
                ("BACKGROUND", *every_cell, PANEL),
                ("BOX", *every_cell, 0.5, LINE),
                ("VALIGN", *every_cell, "TOP"),
                ("LEFTPADDING", *every_cell, 6),
                ("RIGHTPADDING", *every_cell, 6),
                ("TOPPADDING", *every_cell, 6),
                ("BOTTOMPADDING", *every_cell, 6),
            ]
        )
    )
    return [_section("Forensic Visual Evidence", styles), gallery]
def _text_metric_chart(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[Any]:
    """Build the "Text & Metadata Analysis" bar chart for text and screenshot media.

    Returns an empty list for other media types. Count-based metrics are
    scaled onto 0-100 by a per-metric cap and weight.
    """
    if _clean(analysis_json.get("media_type")).lower() not in {"text", "screenshot"}:
        return []

    expl = _as_dict(analysis_json.get("explainability"))
    sens = _as_dict(expl.get("sensationalism"))

    def count_metric(label: str, key: str, cap: float, weight: float) -> tuple[str, float, str]:
        """Scale a raw count to a bar length and keep the raw count as its text."""
        raw = sens.get(key)
        return (label, min(_clamp(raw, 0, cap) * weight, 100), _clean(raw, "0"))

    fake_pct = _clamp(expl.get("fake_probability"), 0, 1) * 100
    sensational = _clamp(sens.get("score"))
    metrics = [
        ("Deepfake probability", fake_pct, f"{fake_pct:.0f}%"),
        ("Sensationalism score", sensational, f"{sensational:.0f}/100"),
        count_metric("Exclamations", "exclamation_count", 20, 5),
        count_metric("ALL CAPS words", "caps_word_count", 25, 4),
        count_metric("Emotional words", "emotional_word_count", 20, 5),
        count_metric("Clickbait matches", "clickbait_matches", 10, 10),
    ]
    return [
        _section("Text & Metadata Analysis", styles),
        _panel([[BarChart(metrics)]], [176 * mm]),
    ]
def _exif_metadata(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[Any]:
    """Build the "Image Metadata Signals" table from ``explainability.exif``.

    Returns an empty list when no EXIF dict was stored. Only non-empty known
    fields are listed; a "Trust Adjustment" row is always appended.
    """
    exif = _as_dict(_as_dict(analysis_json.get("explainability")).get("exif"))
    if not exif:
        return []

    body = styles["body"]
    rows = [[Paragraph("Field", styles["small"]), Paragraph("Value", styles["small"])]]
    known_fields = ("make", "model", "datetime_original", "software", "lens_model", "gps_info", "trust_reason")
    for field in known_fields:
        text = _clean(exif.get(field))
        if not text:
            continue
        # "datetime_original" is displayed as "Datetime Original", etc.
        rows.append([Paragraph(field.replace("_", " ").title(), body), Paragraph(_xml(text), body)])
    rows.append(
        [
            Paragraph("Trust Adjustment", body),
            Paragraph(_xml(exif.get("trust_adjustment"), "0"), body),
        ]
    )

    header_row = ((0, 0), (-1, 0))
    every_cell = ((0, 0), (-1, -1))
    metadata = Table(rows, colWidths=[48 * mm, 128 * mm], repeatRows=1)
    metadata.setStyle(
        TableStyle(
            [
                ("BACKGROUND", *header_row, PANEL_2),
                ("FONTNAME", *header_row, FONT_SANS_BOLD),
                ("FONTSIZE", *header_row, 9),
                ("BOX", *every_cell, 0.5, LINE),
                ("INNERGRID", *every_cell, 0.25, colors.HexColor("#D9DFE8")),
                ("VALIGN", *every_cell, "TOP"),
                ("NOSPLIT", (0, 0), (-1, 1)),
                ("LEFTPADDING", *every_cell, 8),
                ("RIGHTPADDING", *every_cell, 8),
                ("TOPPADDING", *every_cell, 7),
                ("BOTTOMPADDING", *every_cell, 7),
            ]
        )
    )
    return [_section("Image Metadata Signals", styles), metadata]
def _source_link(text_markup: str, raw_url: str) -> str:
    """Wrap escaped ``text_markup`` in a hyperlink when ``raw_url`` is http(s).

    Other schemes (e.g. ``javascript:``) and unparseable URLs are returned as
    plain text, so no unsafe link target ends up in the PDF.
    """
    try:
        scheme = urlparse(raw_url).scheme.lower()
    except ValueError:
        return text_markup
    if scheme not in {"http", "https"}:
        return text_markup
    return f'<a href="{html.escape(raw_url, quote=True)}">{text_markup}</a>'


def _trusted_sources(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[Any]:
    """Build the "Trusted Source Cross-Reference" section.

    Lists up to ten sources that carry a URL. Source name and title are
    rendered as hyperlinks to that URL, next to a relevance percentage.

    Args:
        analysis_json: Analysis payload; ``trusted_sources`` is read.
        styles: Paragraph styles keyed by role.

    Returns:
        A story fragment: the heading followed by the table, or by a short
        note when no source with a URL is available.
    """
    sources = [_as_dict(s) for s in _as_list(analysis_json.get("trusted_sources")) if _as_dict(s).get("url")]
    if not sources:
        return [
            _section("Trusted Source Cross-Reference", styles),
            Paragraph("No trusted news sources were returned for this analysis.", styles["small"]),
        ]
    rows = [
        [
            Paragraph("Source", styles["small"]),
            Paragraph("Title", styles["small"]),
            Paragraph("Relevance Score", styles["small"]),
        ]
    ]
    for source in sources[:10]:
        raw_url = _clean(source.get("url"))
        source_name = _xml(source.get("source_name"), "Source")
        # Fall back to the cleaned URL text; passing the raw value through as
        # a default would break on non-string URLs.
        title = _xml(source.get("title"), raw_url)
        rows.append(
            [
                Paragraph(_source_link(source_name, raw_url), styles["link"]),
                Paragraph(_source_link(title, raw_url), styles["link"]),
                Paragraph(f"{_clamp(source.get('relevance_score'), 0, 1) * 100:.0f}%", styles["body"]),
            ]
        )
    table = Table(rows, colWidths=[40 * mm, 104 * mm, 32 * mm], repeatRows=1)
    table.setStyle(
        TableStyle(
            [
                ("BACKGROUND", (0, 0), (-1, 0), PANEL_2),
                ("FONTNAME", (0, 0), (-1, 0), FONT_SANS_BOLD),
                ("FONTSIZE", (0, 0), (-1, 0), 9),
                ("BOX", (0, 0), (-1, -1), 0.5, LINE),
                ("INNERGRID", (0, 0), (-1, -1), 0.25, colors.HexColor("#D9DFE8")),
                ("VALIGN", (0, 0), (-1, -1), "TOP"),
                ("NOSPLIT", (0, 0), (-1, 1)),
                ("LEFTPADDING", (0, 0), (-1, -1), 8),
                ("RIGHTPADDING", (0, 0), (-1, -1), 8),
                ("TOPPADDING", (0, 0), (-1, -1), 8),
                ("BOTTOMPADDING", (0, 0), (-1, -1), 8),
            ]
        )
    )
    return [_section("Trusted Source Cross-Reference", styles), table]
def _processing_pipeline(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[Any]:
    """Build the "Processing Pipeline" section from ``processing_summary``.

    Shows the stage flow diagram and, below it, total duration, model name
    and the stage list as text.

    Args:
        analysis_json: Analysis payload; ``processing_summary`` is read.
        styles: Paragraph styles keyed by role.

    Returns:
        A story fragment: the heading followed by the panel.
    """
    summary = _as_dict(analysis_json.get("processing_summary"))
    stages = [_clean(s) for s in _as_list(summary.get("stages_completed")) if _clean(s)]
    # ``total_duration_ms`` is in milliseconds; it is shown in seconds.
    duration = _clamp(summary.get("total_duration_ms"), 0, 10_000_000)
    return [
        _section("Processing Pipeline", styles),
        _panel(
            [
                [PipelineFlow(stages)],
                [
                    Paragraph(
                        f"Total duration: {duration / 1000:.2f}s "
                        # <br/> is required: Paragraph markup collapses raw newlines.
                        f"Model: {_xml(summary.get('model_used'), 'not recorded')}<br/>"
                        f"Stages: {_xml(' -> '.join(stages), 'not recorded')}",
                        styles["body"],
                    )
                ],
            ],
            [176 * mm],
        ),
    ]
def _footer_notice(analysis_json: dict[str, Any]) -> str:
    """Return the responsible-AI notice from the payload, or the default text."""
    default_notice = "DeepShield Responsible-AI Notice: AI analysis can be wrong; verify before sharing."
    custom_notice = analysis_json.get("responsible_ai_notice")
    return _clean(custom_notice, default_notice)
def _draw_footer(canvas, doc, notice: str) -> None:
    """Draw the page footer: rule, expiry note, page number and the AI notice.

    Used as the page callback for ``doc.build``; ``notice`` is cut to 128
    characters before drawing.
    """
    page_width = A4[0]
    baseline = 13 * mm
    left_edge = doc.leftMargin
    right_edge = page_width - doc.rightMargin
    middle = page_width / 2

    canvas.saveState()

    # Thin rule separating the footer from the page body.
    canvas.setStrokeColor(LINE)
    canvas.setLineWidth(0.4)
    canvas.line(left_edge, baseline + 8, right_edge, baseline + 8)

    canvas.setFont(FONT_SANS, 7)
    canvas.setFillColor(MUTED)
    canvas.drawString(
        left_edge,
        baseline,
        "Expiry Notice: report links expire according to the configured retention policy.",
    )
    canvas.drawRightString(right_edge, baseline, f"Page {doc.page}")

    canvas.setFont(FONT_SANS_BOLD, 7.2)
    canvas.drawCentredString(middle, baseline + 10, "DeepShield Responsible-AI Notice")
    canvas.setFont(FONT_SANS, 6.6)
    canvas.drawCentredString(middle, baseline + 1, notice[:128])

    canvas.restoreState()
def _build_story(record: AnalysisRecord, analysis_json: dict[str, Any], generated_at: str) -> list[Any]:
    """Assemble the full report story, section by section, in display order."""
    styles = _styles()
    sections = (
        _header(analysis_json, generated_at, styles),
        _executive_summary(analysis_json, styles),
        _media_context(analysis_json, record, styles),
        _xai_breakdown(analysis_json, styles),
        _forensic_visuals(analysis_json, styles),
        _text_metric_chart(analysis_json, styles),
        _exif_metadata(analysis_json, styles),
        _trusted_sources(analysis_json, styles),
        _processing_pipeline(analysis_json, styles),
    )
    # Each builder returns a list of flowables; flatten them into one story.
    return [flowable for section in sections for flowable in section]
def render_html(analysis_json: dict[str, Any]) -> str:
    """Compatibility shim for older callers.

    PDF generation now uses ReportLab directly so hyperlinks, footers, charts,
    and images are reliable. This compact HTML preview is intentionally not the
    source of truth for report rendering.

    Returns:
        A minimal HTML document with the media type, verdict label and
        deepfake probability.
    """
    verdict = _as_dict(analysis_json.get("verdict"))
    return (
        "<html><body>"
        f"<p>Media: {_xml(analysis_json.get('media_type'), 'unknown')}</p>"
        f"<p>Verdict: {_xml(verdict.get('label'), 'Inconclusive')}</p>"
        f"<p>Deepfake probability: {_deepfake_probability(analysis_json)}/100</p>"
        "</body></html>"
    )


def html_to_pdf(html: str, out_path: Path) -> None:
    """Deprecated compatibility entrypoint.

    The modern report pipeline renders from structured analysis JSON. This
    method remains so imports do not break, but it is no longer used
    internally. The given text is escaped and written as a single paragraph.
    """
    # NOTE: the ``html`` parameter shadows the ``html`` module inside this
    # function only; it is kept because renaming it would break keyword callers.
    doc = SimpleDocTemplate(str(out_path), pagesize=A4, pageCompression=0)
    styles = _styles()
    doc.build([Paragraph(_xml(html), styles["body"])])


def _fallback_pdf(record: AnalysisRecord, analysis_json: dict[str, Any], out_path: Path) -> None:
    """Write a minimal report (title, record, verdict, probability) to ``out_path``.

    Used when the full renderer fails, so the caller still receives a PDF.
    """
    styles = _styles()
    notice = _footer_notice(analysis_json)
    doc = SimpleDocTemplate(
        str(out_path),
        pagesize=A4,
        rightMargin=18 * mm,
        leftMargin=18 * mm,
        topMargin=16 * mm,
        bottomMargin=24 * mm,
        pageCompression=0,
    )
    story = [
        Paragraph("DeepShield Analysis Report", styles["title"]),
        Paragraph(f"Record #{record.id} - {_xml(record.media_type)}", styles["body"]),
        Paragraph(f"Verdict: {_xml(record.verdict)}", styles["body"]),
        Paragraph(f"Deepfake probability: {_deepfake_probability(analysis_json)}/100", styles["body"]),
    ]
    doc.build(
        story,
        onFirstPage=lambda c, d: _draw_footer(c, d, notice),
        onLaterPages=lambda c, d: _draw_footer(c, d, notice),
    )


def generate_report(record: AnalysisRecord) -> Path:
    """Render the PDF report for ``record`` and return the path of the file.

    The full layout is attempted first; if it raises, a minimal fallback PDF
    is written to the same path instead.

    Raises:
        json.JSONDecodeError: If ``record.result_json`` is not valid JSON.
    """
    out_dir = _ensure_dir()
    filename = f"deepshield_{record.id}_{uuid.uuid4().hex[:8]}.pdf"
    out_path = out_dir / filename
    # A payload that is valid JSON but not an object is treated as empty, so
    # the ``.get`` calls in the section builders cannot fail on it.
    data = _as_dict(json.loads(record.result_json))
    generated_at = _generated_at_ist()
    notice = _footer_notice(data)
    doc = SimpleDocTemplate(
        str(out_path),
        pagesize=A4,
        rightMargin=17 * mm,
        leftMargin=17 * mm,
        topMargin=14 * mm,
        bottomMargin=25 * mm,
        title=f"DeepShield Threat Intelligence Report {record.id}",
        author="DeepShield",
        pageCompression=0,
    )
    try:
        story = _build_story(record, data, generated_at)
        doc.build(
            story,
            onFirstPage=lambda c, d: _draw_footer(c, d, notice),
            onLaterPages=lambda c, d: _draw_footer(c, d, notice),
        )
    except Exception as exc: # noqa: BLE001
        logger.warning(f"ReportLab renderer failed for report {record.id}, using minimal fallback: {exc}")
        _fallback_pdf(record, data, out_path)
    logger.info(f"Report generated id={record.id} path={out_path} size={out_path.stat().st_size}B")
    return out_path


def _utc_naive(moment: Optional[datetime] = None) -> datetime:
    """Return ``moment`` (default: the current time) as a naive UTC datetime.

    Aware datetimes are converted to UTC and stripped of tzinfo; naive ones
    are returned unchanged on the assumption that they are already UTC.
    """
    # Local import: the module's top-level imports only bring in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    if moment is None:
        moment = datetime.now(tz=timezone.utc)
    if moment.tzinfo is not None:
        moment = moment.astimezone(timezone.utc).replace(tzinfo=None)
    return moment


def create_report_row(analysis_id: int, path: Path) -> Report:
    """Create (without persisting) the ``Report`` row for a generated PDF.

    ``expires_at`` is a naive UTC timestamp ``settings.REPORT_TTL_SECONDS``
    from now.
    """
    return Report(
        analysis_id=analysis_id,
        file_path=str(path),
        expires_at=_utc_naive() + timedelta(seconds=settings.REPORT_TTL_SECONDS),
    )


def cleanup_expired(now: Optional[datetime] = None) -> int:
    """Delete expired PDFs from disk. Returns count deleted.

    A PDF in ``settings.REPORT_DIR`` is expired when its modification time is
    more than ``settings.REPORT_TTL_SECONDS`` before ``now``.

    Args:
        now: Reference time; defaults to the current UTC time. An aware
            datetime is converted to naive UTC before comparing.
    """
    now = _utc_naive(now)
    directory = Path(settings.REPORT_DIR)
    if not directory.exists():
        return 0
    deleted = 0
    ttl = timedelta(seconds=settings.REPORT_TTL_SECONDS)
    epoch = datetime(1970, 1, 1)
    for path in directory.glob("*.pdf"):
        try:
            # Naive UTC modification time, computed without the deprecated
            # datetime.utcfromtimestamp().
            mtime = epoch + timedelta(seconds=path.stat().st_mtime)
            if now - mtime > ttl:
                path.unlink()
                deleted += 1
        except OSError as exc:
            logger.warning(f"Cleanup failed for {path}: {exc}")
    if deleted:
        logger.info(f"Cleaned up {deleted} expired reports")
    return deleted