from __future__ import annotations import base64 import html import json import uuid from datetime import datetime, timedelta from io import BytesIO from pathlib import Path from typing import Any, Optional from urllib.parse import urlparse from zoneinfo import ZoneInfo from loguru import logger from PIL import Image as PILImage from reportlab.lib import colors from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_RIGHT from reportlab.lib.pagesizes import A4 from reportlab.lib.styles import ParagraphStyle, getSampleStyleSheet from reportlab.lib.units import mm from reportlab.platypus import ( Flowable, Image, Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle, ) from config import settings from db.models import AnalysisRecord, Report REPO_ROOT = Path(__file__).resolve().parents[2] BACKEND_ROOT = Path(__file__).resolve().parents[1] LOGO_PATH = BACKEND_ROOT / "static" / "logo.png" IST = ZoneInfo("Asia/Kolkata") # Typography & Spacing Grid (base unit: 6pt) BASE_SPACING = 4 # Font constants (ReportLab uses these exact names; fallbacks handled by OS) FONT_SANS = "Helvetica" # Primary: available on all systems FONT_SANS_BOLD = "Helvetica-Bold" FONT_SANS_OBLIQUE = "Helvetica-Oblique" FONT_MONO = "Courier" # Monospace fallback # Severity badge colors (high > medium > low) SEVERITY_HIGH = colors.HexColor("#DC2626") # Red SEVERITY_MEDIUM = colors.HexColor("#EA580C") # Orange SEVERITY_LOW = colors.HexColor("#2563EB") # Blue SEVERITY_NEUTRAL = colors.HexColor("#6B7280") # Gray # Improved color palette with better contrast SLATE = colors.HexColor("#0F1A2D") # Darker title color for contrast TEXT = colors.HexColor("#1A202C") # Darker body text MUTED = colors.HexColor("#4B5563") # Darker muted (was #667085) LINE = colors.HexColor("#D9E0EA") PANEL = colors.HexColor("#EDF0F7") # Slightly darker for better contrast (was #F7F9FC) PANEL_2 = colors.HexColor("#E0E7F4") # Slightly darker (was #EEF3F8) CRIMSON = colors.HexColor("#C81E3A") AMBER = colors.HexColor("#C77700") GREEN = colors.HexColor("#168A4A") BLUE = colors.HexColor("#2F6FED") def _ensure_dir() -> Path: path = Path(settings.REPORT_DIR) path.mkdir(parents=True, exist_ok=True) return path def _clamp(value: Any, lo: float = 0.0, hi: float = 100.0) -> float: try: number = float(value) except (TypeError, ValueError): number = lo return max(lo, min(hi, number)) def _clean(value: Any, default: str = "") -> str: if value is None: return default text = str(value).replace("\x00", "").strip() return text or default def _xml(value: Any, default: str = "") -> str: return html.escape(_clean(value, default), quote=True) def _shorten(value: Any, limit: int = 700) -> tuple[str, bool]: """Shorten text and return (text, was_truncated).""" text = " ".join(_clean(value).split()) if len(text) <= limit: return (text, False) return (text[: limit - 1].rstrip() + "...", True) def _severity_badge_color(severity: str) -> colors.Color: """Return color for severity level (high/medium/low).""" sev_lower = _clean(severity).lower() if "high" in sev_lower or "critical" in sev_lower: return SEVERITY_HIGH if "medium" in sev_lower or "warn" in sev_lower: return SEVERITY_MEDIUM if "low" in sev_lower: return SEVERITY_LOW return SEVERITY_NEUTRAL def _format_anomaly_score(score: float) -> str: """Format anomaly score consistently (always as % anomaly).""" anomaly_pct = 100 - _clamp(score, 0, 100) return f"{anomaly_pct:.0f}% anomaly" def _placeholder_image(width: float = 78 * mm, height: float = 58 * mm) -> Image: """Return a gray placeholder image when media is unavailable.""" placeholder_pil = PILImage.new("RGB", (int(width * 2.83), int(height * 2.83)), color=(220, 224, 232)) stream = BytesIO() placeholder_pil.save(stream, format="PNG") stream.seek(0) img = Image(stream, width=width, height=height) return img def _as_dict(value: Any) -> dict[str, Any]: return value if isinstance(value, dict) else {} def _as_list(value: Any) -> list[Any]: return value if isinstance(value, list) else [] def _deepfake_probability(analysis_json: dict[str, Any]) -> int: verdict = _as_dict(analysis_json.get("verdict")) authenticity = _clamp(verdict.get("authenticity_score", 50)) return int(round(100 - authenticity)) def _confidence_percent(verdict: dict[str, Any]) -> float: confidence = _clamp(verdict.get("model_confidence", 0), 0, 100) return confidence * 100 if confidence <= 1 else confidence def _severity_color(fake_probability: float): if fake_probability >= 70: return CRIMSON if fake_probability >= 40: return AMBER return GREEN def _generated_at_ist() -> str: return datetime.now(tz=IST).strftime("%d %b %Y, %I:%M %p IST") def _extract_llm_summary(analysis_json: dict[str, Any]) -> dict[str, Any] | None: top = analysis_json.get("llm_summary") if isinstance(top, dict) and (top.get("paragraph") or top.get("bullets")): return top nested = _as_dict(analysis_json.get("explainability")).get("llm_summary") if isinstance(nested, dict) and (nested.get("paragraph") or nested.get("bullets")): return nested return None def _media_label(media_type: str) -> str: return "SCREENSHOT" if media_type == "screenshot" else media_type.upper() def _resolve_media_path(value: Any) -> Path | None: raw = _clean(value) if not raw or raw.startswith("data:") or urlparse(raw).scheme in {"http", "https"}: return None path = Path(raw) candidates: list[Path] = [] if path.is_absolute(): candidates.append(path) stripped = raw.lstrip("/\\") candidates.extend( [ REPO_ROOT / stripped, BACKEND_ROOT / stripped, REPO_ROOT / "backend" / stripped, ] ) if stripped.startswith("media/"): suffix = stripped[len("media/") :] candidates.extend( [ Path(settings.MEDIA_ROOT) / suffix, BACKEND_ROOT / "media" / suffix, REPO_ROOT / "media" / suffix, ] ) for candidate in candidates: try: resolved = candidate.resolve() if resolved.exists() and resolved.is_file(): return resolved except OSError: continue return None def _image_from_base64(data: Any, max_width: float, max_height: float) -> Image | None: """Decode base64 image, embed as bytes in PDF, or return None with error logging.""" raw = _clean(data) if not raw: logger.debug("No base64 image data provided") return None try: encoded = raw.split(",", 1)[1] if "," in raw else raw blob = base64.b64decode(encoded) # Get dimensions from PIL with PILImage.open(BytesIO(blob)) as pil: width, height = pil.size # Create stream and ensure it's at position 0 for ReportLab to read stream = BytesIO(blob) stream.seek(0) return _scaled_image(stream, width, height, max_width, max_height) except Exception as exc: # noqa: BLE001 logger.warning(f"Base64 image decode failed: {exc}") return None import urllib.request def _image_from_url(url: str | None, max_width: float, max_height: float) -> Image | None: """Download image from HTTP/HTTPS URL and embed in PDF.""" if not url or not str(url).startswith("http"): return None try: req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) with urllib.request.urlopen(req, timeout=10) as response: image_bytes = response.read() with PILImage.open(BytesIO(image_bytes)) as pil: width, height = pil.size stream = BytesIO(image_bytes) stream.seek(0) return _scaled_image(stream, width, height, max_width, max_height) except Exception as exc: logger.warning(f"Failed to fetch image from URL {url}: {exc}") return None def _image_from_path(path: Path | None, max_width: float, max_height: float) -> Image | None: """Load image from path, embed as bytes in PDF, or return None with error logging.""" if path is None: logger.debug("No image path provided") return None try: # Read the file as bytes and wrap in BytesIO for embedding in PDF with open(path, 'rb') as f: image_bytes = f.read() # Get dimensions from PIL with PILImage.open(BytesIO(image_bytes)) as pil: width, height = pil.size # Create stream and ensure it's at position 0 for ReportLab to read stream = BytesIO(image_bytes) stream.seek(0) return _scaled_image(stream, width, height, max_width, max_height) except Exception as exc: # noqa: BLE001 logger.warning(f"Image not found at {path}: {exc}") return None def _scaled_image(source: Any, width: int, height: int, max_width: float, max_height: float) -> Image: scale = min(max_width / max(width, 1), max_height / max(height, 1), 1.0) img = Image(source) img.drawWidth = width * scale img.drawHeight = height * scale return img def _styles() -> dict[str, ParagraphStyle]: """Typography system with improved readability (10pt+ body, 1.5x leading).""" base = getSampleStyleSheet() return { "title": ParagraphStyle( "DeepShieldTitle", parent=base["Title"], fontName=FONT_SANS_BOLD, fontSize=20, leading=24, # 1.2x leading for titles textColor=SLATE, alignment=TA_LEFT, spaceAfter=BASE_SPACING, # 4pt after (reduced from 12pt) ), "section": ParagraphStyle( "DeepShieldSection", parent=base["Heading2"], fontName=FONT_SANS_BOLD, fontSize=13, leading=16, # 1.23x leading textColor=SLATE, spaceBefore=BASE_SPACING, # 4pt before section (reduced from 12pt) spaceAfter=BASE_SPACING, # 4pt after (reduced from 7pt) keepWithNext=True, ), "body": ParagraphStyle( "DeepShieldBody", parent=base["BodyText"], fontName=FONT_SANS, fontSize=10, # Increased from 9.1pt leading=15, # 1.5x leading (was 13.2) textColor=TEXT, spaceAfter=BASE_SPACING, # 6pt ), "small": ParagraphStyle( "DeepShieldSmall", parent=base["BodyText"], fontName=FONT_SANS, fontSize=9, # Increased from 7.8pt leading=13.5, # 1.5x leading textColor=MUTED, spaceAfter=3, ), "meta": ParagraphStyle( "DeepShieldMeta", parent=base["BodyText"], fontName=FONT_SANS, fontSize=9, # Increased from 8.2pt leading=13.5, # 1.5x leading textColor=MUTED, alignment=TA_RIGHT, ), "badge": ParagraphStyle( "DeepShieldBadge", parent=base["BodyText"], fontName=FONT_SANS_BOLD, fontSize=9, # Increased from 8.5pt leading=13, textColor=colors.white, alignment=TA_CENTER, ), "quote": ParagraphStyle( "DeepShieldQuote", parent=base["BodyText"], fontName=FONT_SANS, fontSize=10, # Increased from 9.2pt leading=15, # 1.5x leading textColor=SLATE, leftIndent=12, # Increased from 8 rightIndent=12, ), "caption": ParagraphStyle( "DeepShieldCaption", parent=base["BodyText"], fontName=FONT_SANS_OBLIQUE, fontSize=8.5, # Increased from 7.7pt leading=12.75, # 1.5x leading textColor=MUTED, alignment=TA_CENTER, ), "link": ParagraphStyle( "DeepShieldLink", parent=base["BodyText"], fontName=FONT_SANS, fontSize=9, # Increased from 8.4pt leading=13.5, # 1.5x leading textColor=BLUE, ), } class ScoreGauge(Flowable): def __init__(self, score: int, label: str, width: float = 174, height: float = 104): super().__init__() self.score = int(_clamp(score)) self.label = label self.width = width self.height = height self.color = _severity_color(self.score) def draw(self) -> None: c = self.canv cx = self.width / 2 cy = 28 radius = 60 bbox = (cx - radius, cy - radius, cx + radius, cy + radius) c.saveState() c.setLineCap(1) c.setStrokeColor(colors.HexColor("#E5EAF1")) c.setLineWidth(13) c.arc(*bbox, startAng=0, extent=180) c.setStrokeColor(self.color) c.arc(*bbox, startAng=180 - (180 * self.score / 100), extent=180 * self.score / 100) c.setFillColor(SLATE) c.setFont(FONT_SANS_BOLD, 25) c.drawCentredString(cx, cy + 9, f"{self.score}") c.setFont(FONT_SANS, 7.5) c.setFillColor(MUTED) c.drawCentredString(cx, cy - 4, "DEEPFAKE PROBABILITY") c.setFont(FONT_SANS_BOLD, 8.5) c.setFillColor(self.color) c.drawCentredString(cx, cy - 18, self.label[:34]) c.restoreState() class BarChart(Flowable): def __init__(self, metrics: list[tuple[str, float, str]], width: float = 470, row_height: float = 21): super().__init__() self.metrics = metrics self.width = width self.row_height = row_height self.height = max(1, len(metrics)) * row_height + 7 def draw(self) -> None: c = self.canv label_w = 132 value_w = 54 bar_w = self.width - label_w - value_w - 18 y = self.height - self.row_height for label, value, value_text in self.metrics: pct = _clamp(value) color = _severity_color(pct) c.setFillColor(SLATE) c.setFont(FONT_SANS, 8.2) c.drawString(0, y + 5, label[:35]) c.setFillColor(colors.HexColor("#E8EDF3")) c.roundRect(label_w, y + 5, bar_w, 7, 3, fill=1, stroke=0) c.setFillColor(color) c.roundRect(label_w, y + 5, max(2, bar_w * pct / 100), 7, 3, fill=1, stroke=0) c.setFillColor(MUTED) c.setFont(FONT_SANS_BOLD, 8) c.drawRightString(label_w + bar_w + value_w, y + 4, value_text) y -= self.row_height class PipelineFlow(Flowable): def __init__(self, stages: list[str], width: float = 470): super().__init__() self.stages = [s for s in stages if s][:8] self.width = width self.height = 58 if self.stages else 20 def draw(self) -> None: c = self.canv if not self.stages: c.setFillColor(MUTED) c.setFont(FONT_SANS, 8) c.drawString(0, 4, "No pipeline stages were recorded.") return gap = 11 box_w = min(83, (self.width - gap * (len(self.stages) - 1)) / len(self.stages)) y = 18 for idx, stage in enumerate(self.stages): x = idx * (box_w + gap) c.setFillColor(PANEL_2) c.setStrokeColor(LINE) c.roundRect(x, y, box_w, 26, 5, fill=1, stroke=1) c.setFillColor(SLATE) c.setFont(FONT_SANS_BOLD, 6.5) c.drawCentredString(x + box_w / 2, y + 15, stage.replace("_", " ")[:18]) if idx < len(self.stages) - 1: ax = x + box_w + 2 ay = y + 13 c.setStrokeColor(MUTED) c.line(ax, ay, ax + gap - 5, ay) c.line(ax + gap - 5, ay, ax + gap - 8, ay + 3) c.line(ax + gap - 5, ay, ax + gap - 8, ay - 3) def _section(title: str, styles: dict[str, ParagraphStyle]) -> Paragraph: return Paragraph(_xml(title), styles["section"]) def _panel(rows: list[list[Any]], col_widths: list[float] | None = None) -> Table: """Detail panel with improved spacing (10pt padding).""" table = Table(rows, colWidths=col_widths, hAlign="LEFT") table.setStyle( TableStyle( [ ("BACKGROUND", (0, 0), (-1, -1), PANEL), ("BOX", (0, 0), (-1, -1), 0.5, LINE), ("INNERGRID", (0, 0), (-1, -1), 0.25, colors.HexColor("#D9DFE8")), # Darker grid ("VALIGN", (0, 0), (-1, -1), "TOP"), ("LEFTPADDING", (0, 0), (-1, -1), 6), # Reduced from 10 ("RIGHTPADDING", (0, 0), (-1, -1), 6), ("TOPPADDING", (0, 0), (-1, -1), 6), # Reduced from 10 ("BOTTOMPADDING", (0, 0), (-1, -1), 6), # Reduced from 10 ] ) ) return table def _header(analysis_json: dict[str, Any], generated_at: str, styles: dict[str, ParagraphStyle]) -> list[Any]: media_type = _media_label(_clean(analysis_json.get("media_type"), "unknown")) report_id = _clean(analysis_json.get("record_id")) or _clean(analysis_json.get("analysis_id"), "N/A") logo = _image_from_path(LOGO_PATH if LOGO_PATH.exists() else None, 34 * mm, 16 * mm) logo_cell: Any if logo: logo_cell = logo else: logo_cell = Paragraph("DeepShield", styles["title"]) meta = Paragraph( f"Report ID: {_xml(report_id)}
" f"Generated: {_xml(generated_at)}
" f"Media Type: {_xml(media_type)}", styles["meta"], ) table = Table([[logo_cell, meta]], colWidths=[85 * mm, 91 * mm]) table.setStyle( TableStyle( [ ("VALIGN", (0, 0), (-1, -1), "MIDDLE"), ("LINEBELOW", (0, 0), (-1, -1), 1.1, SLATE), ("BOTTOMPADDING", (0, 0), (-1, -1), 4), # Reduced from 8 ] ) ) return [table, Spacer(1, 4)] def _badge(text: str, color, styles: dict[str, ParagraphStyle]) -> Table: table = Table([[Paragraph(_xml(text), styles["badge"])]], colWidths=[54 * mm]) table.setStyle( TableStyle( [ ("BACKGROUND", (0, 0), (-1, -1), color), ("BOX", (0, 0), (-1, -1), 0, color), ("TOPPADDING", (0, 0), (-1, -1), 4), ("BOTTOMPADDING", (0, 0), (-1, -1), 4), ] ) ) return table def _executive_summary(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[Any]: verdict = _as_dict(analysis_json.get("verdict")) fake_score = _deepfake_probability(analysis_json) label = _clean(verdict.get("label"), "Inconclusive") severity = verdict.get("severity") color = _severity_badge_color(severity) if severity else _severity_color(fake_score) confidence = _confidence_percent(verdict) llm = _extract_llm_summary(analysis_json) summary_text = _clean( _as_dict(llm).get("paragraph"), "No Gemini explanation summary was saved for this analysis.", ) bullets = _as_list(_as_dict(llm).get("bullets")) bullet_html = "" if bullets: bullet_html = "
" + "
".join(f"- {_xml(b)}" for b in bullets[:4]) if len(bullets) > 4: more = len(bullets) - 4 bullet_html += f"
(+{more} more insight{'s' if more != 1 else ''} available)" detail = [ _badge(label, color, styles), Spacer(1, 6), Paragraph( f"Model confidence: {confidence:.1f}%
" f"Model label: {_xml(verdict.get('model_label'), 'unknown')}
" f"AI explanation summary:
{_xml(summary_text)}{bullet_html}", styles["body"], ), ] table = Table( [[ScoreGauge(fake_score, label), detail]], colWidths=[64 * mm, 110 * mm], ) table.setStyle( TableStyle( [ ("BACKGROUND", (0, 0), (-1, -1), PANEL), ("BOX", (0, 0), (-1, -1), 0.6, LINE), ("VALIGN", (0, 0), (-1, -1), "MIDDLE"), ("LEFTPADDING", (0, 0), (-1, -1), 6), # Reduced from 10 ("RIGHTPADDING", (0, 0), (-1, -1), 6), ("TOPPADDING", (0, 0), (-1, -1), 6), # Reduced from 10 ("BOTTOMPADDING", (0, 0), (-1, -1), 6), # Reduced from 10 ] ) ) return [_section("Executive Summary: The Verdict", styles), table] def _media_context(analysis_json: dict[str, Any], record: AnalysisRecord, styles: dict[str, ParagraphStyle]) -> list[Any]: media_type = _clean(analysis_json.get("media_type"), record.media_type).lower() expl = _as_dict(analysis_json.get("explainability")) story: list[Any] = [_section("Analyzed Media Context", styles)] if media_type == "text": snippet, was_truncated = _shorten(expl.get("original_text"), 950) truncation_note = "
[...more text not shown]" if was_truncated else "" story.append( _panel( [[Paragraph(f"“{_xml(snippet, 'No text snippet was stored.')}”{truncation_note}", styles["quote"])]], [176 * mm], ) ) return story if media_type in {"image", "screenshot", "video"}: thumb_url = analysis_json.get("thumbnail_url") or record.thumbnail_url thumb = ( _image_from_url(thumb_url, 72 * mm, 48 * mm) or _image_from_path(_resolve_media_path(thumb_url), 72 * mm, 48 * mm) ) media_url = analysis_json.get("media_path") or record.media_path original = ( _image_from_url(media_url, 72 * mm, 48 * mm) or _image_from_path(_resolve_media_path(media_url), 72 * mm, 48 * mm) ) image_cell: Any = thumb or original or Paragraph("Original thumbnail unavailable", styles["small"]) text_value, was_truncated = _shorten(expl.get("extracted_text") or expl.get("transcript"), 800) truncation_note = " [+more not shown]" if was_truncated else "" text_label = "Extracted OCR text" if media_type == "screenshot" else "Context notes" text_cell = Paragraph( f"{text_label}
{_xml(text_value, 'No OCR or transcript text was recorded.')}{truncation_note}", styles["body"], ) table = Table([[image_cell, text_cell]], colWidths=[78 * mm, 98 * mm]) table.setStyle( TableStyle( [ ("BACKGROUND", (0, 0), (-1, -1), PANEL), ("BOX", (0, 0), (-1, -1), 0.5, LINE), ("VALIGN", (0, 0), (-1, -1), "TOP"), ("LEFTPADDING", (0, 0), (-1, -1), 6), ("RIGHTPADDING", (0, 0), (-1, -1), 6), ("TOPPADDING", (0, 0), (-1, -1), 6), # Reduced from 8 ("BOTTOMPADDING", (0, 0), (-1, -1), 6), # Reduced from 8 ] ) ) story.append(table) return story if media_type == "audio": transcript, was_truncated = _shorten(expl.get("transcript") or expl.get("extracted_transcript"), 850) truncation_note = " [+more not shown]" if was_truncated else "" duration = _clamp(expl.get("duration_s"), 0, 10_000_000) fmt = _clean(analysis_json.get("audio_format") or analysis_json.get("format"), "not recorded") story.append( _panel( [ [ Paragraph("Duration", styles["small"]), Paragraph(f"{duration:.1f} seconds", styles["body"]), ], [ Paragraph("Format", styles["small"]), Paragraph(_xml(fmt), styles["body"]), ], [ Paragraph("Transcript", styles["small"]), Paragraph(_xml(transcript, "No transcript was recorded.") + truncation_note, styles["body"]), ], ], [42 * mm, 134 * mm], ) ) return story story.append(Paragraph("No media context was recorded for this analysis.", styles["small"])) return story ANOMALY_LABELS = { "facial_symmetry": ("Face alignment", "Facial landmarks do not line up as naturally as expected."), "skin_texture": ("Skin texture", "Skin detail appears unusually smooth, noisy, or inconsistent."), "lighting_consistency": ("Lighting consistency", "The face lighting does not match the surrounding scene."), "background_coherence": ("Background coherence", "Edges or background objects look inconsistent with the subject."), "anatomy_hands_eyes": ("Eyes and anatomy", "Eye glare, hands, or anatomy show unnatural structure."), "context_objects": ("Scene context", "Objects or scene details conflict with the claimed context."), } def _xai_rows(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[list[Any]]: media_type = _clean(analysis_json.get("media_type")).lower() expl = _as_dict(analysis_json.get("explainability")) rows: list[list[Any]] = [ [ Paragraph("Signal", styles["small"]), Paragraph("Strength", styles["small"]), Paragraph("Plain-language reason", styles["small"]), ] ] for indicator in _as_list(expl.get("artifact_indicators"))[:8]: item = _as_dict(indicator) confidence = _clamp(item.get("confidence"), 0, 1) * 100 rows.append( [ Paragraph(_xml(item.get("type"), "Artifact"), styles["body"]), Paragraph(f"{confidence:.0f}%
{_xml(item.get('severity'), 'signal')}", styles["small"]), Paragraph(_xml(item.get("description"), "The visual evidence contains an unusual pattern."), styles["body"]), ] ) vlm = _as_dict(expl.get("vlm_breakdown")) for key, (label, fallback) in ANOMALY_LABELS.items(): comp = _as_dict(vlm.get(key)) if not comp: continue consistency = _clamp(comp.get("score"), 0, 100) anomaly = 100 - consistency if anomaly < 18 and not comp.get("notes"): continue reason = _clean(comp.get("notes"), fallback) rows.append( [ Paragraph(_xml(label), styles["body"]), Paragraph(_format_anomaly_score(consistency), styles["small"]), Paragraph(_xml(reason), styles["body"]), ] ) if media_type in {"text", "screenshot"}: for indicator in _as_list(expl.get("manipulation_indicators"))[:6]: item = _as_dict(indicator) rows.append( [ Paragraph(_xml(item.get("pattern_type"), "Text signal"), styles["body"]), Paragraph(_xml(item.get("severity"), "medium"), styles["small"]), Paragraph(_xml(item.get("description"), "The wording may be manipulative or misleading."), styles["body"]), ] ) for phrase in _as_list(expl.get("suspicious_phrases"))[:6]: item = _as_dict(phrase) rows.append( [ Paragraph(_xml(item.get("pattern_type"), "Suspicious phrase"), styles["body"]), Paragraph(_xml(item.get("severity"), "medium"), styles["small"]), Paragraph(_xml(item.get("description"), item.get("text", "OCR text was flagged.")), styles["body"]), ] ) for anomaly in _as_list(expl.get("layout_anomalies"))[:5]: item = _as_dict(anomaly) rows.append( [ Paragraph(_xml(item.get("type"), "Layout anomaly"), styles["body"]), Paragraph(f"{_clamp(item.get('confidence'), 0, 1) * 100:.0f}%", styles["small"]), Paragraph(_xml(item.get("description"), "The screenshot layout is visually inconsistent."), styles["body"]), ] ) if media_type in {"audio", "video"}: audio = _as_dict(expl.get("audio") or expl) if audio: rows.append( [ Paragraph("Audio consistency", styles["body"]), Paragraph(f"{100 - _clamp(audio.get('audio_authenticity_score')):.0f}% anomaly", styles["small"]), Paragraph(_xml(audio.get("notes"), "Audio signal features were compared for voice consistency."), styles["body"]), ] ) if len(rows) == 1: rows.append( [ Paragraph("No strong anomaly", styles["body"]), Paragraph("Low", styles["small"]), Paragraph("The saved model output did not include detailed anomaly markers.", styles["body"]), ] ) return rows def _xai_breakdown(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[Any]: rows = _xai_rows(analysis_json, styles) table = Table(rows, colWidths=[44 * mm, 30 * mm, 102 * mm], repeatRows=1) table.setStyle( TableStyle( [ ("BACKGROUND", (0, 0), (-1, 0), PANEL_2), ("TEXTCOLOR", (0, 0), (-1, 0), SLATE), ("FONTNAME", (0, 0), (-1, 0), FONT_SANS_BOLD), ("FONTSIZE", (0, 0), (-1, 0), 9), ("BACKGROUND", (0, 1), (-1, -1), colors.white), ("BOX", (0, 0), (-1, -1), 0.5, LINE), ("INNERGRID", (0, 0), (-1, -1), 0.25, colors.HexColor("#D9DFE8")), ("VALIGN", (0, 0), (-1, -1), "TOP"), ("NOSPLIT", (0, 0), (-1, 1)), ("LEFTPADDING", (0, 0), (-1, -1), 6), # Reduced from 8 ("RIGHTPADDING", (0, 0), (-1, -1), 6), ("TOPPADDING", (0, 0), (-1, -1), 6), # Reduced from 8 ("BOTTOMPADDING", (0, 0), (-1, -1), 6), # Reduced from 8 ] ) ) return [_section("XAI Detailed Breakdown", styles), table] def _forensic_visuals(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[Any]: media_type = _clean(analysis_json.get("media_type")).lower() if media_type not in {"image", "screenshot", "video"}: return [] expl = _as_dict(analysis_json.get("explainability")) candidates = [ ( "AI Activation Heatmap", "Grad-CAM++ activation showing regions the AI model focused on when making its decision.", expl.get("heatmap_base64"), expl.get("heatmap_url"), ), ( "Error Level Analysis (ELA)", "Heatmap indicating areas of high compression loss, often associated with digital splicing.", expl.get("ela_base64"), expl.get("ela_url"), ), ( "Manipulation Region Overlay", "Bounding boxes highlight regions the visual model treated as suspicious or manipulated.", expl.get("boxes_base64"), expl.get("boxes_url"), ), ] visuals: list[tuple[str, str, Image]] = [] for title, caption, b64_data, url_data in candidates: img = ( _image_from_base64(b64_data, 78 * mm, 58 * mm) or _image_from_url(url_data, 78 * mm, 58 * mm) or _image_from_path(_resolve_media_path(url_data), 78 * mm, 58 * mm) or _placeholder_image(78 * mm, 58 * mm) ) visuals.append((title, caption, img)) rows = [] current_row = [] for title, caption, image in visuals: current_row.append([ Paragraph(f"{_xml(title)}", styles["body"]), image, Paragraph(_xml(caption), styles["caption"]), ]) if len(current_row) == 2: rows.append(current_row) current_row = [] if current_row: current_row.append([]) rows.append(current_row) table = Table(rows, colWidths=[88 * mm, 88 * mm]) table.setStyle( TableStyle( [ ("BACKGROUND", (0, 0), (-1, -1), PANEL), ("BOX", (0, 0), (-1, -1), 0.5, LINE), ("VALIGN", (0, 0), (-1, -1), "TOP"), ("LEFTPADDING", (0, 0), (-1, -1), 6), ("RIGHTPADDING", (0, 0), (-1, -1), 6), ("TOPPADDING", (0, 0), (-1, -1), 6), ("BOTTOMPADDING", (0, 0), (-1, -1), 6), ] ) ) return [_section("Forensic Visual Evidence", styles), table] def _text_metric_chart(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[Any]: media_type = _clean(analysis_json.get("media_type")).lower() if media_type not in {"text", "screenshot"}: return [] expl = _as_dict(analysis_json.get("explainability")) sens = _as_dict(expl.get("sensationalism")) metrics = [ ("Deepfake probability", _clamp(expl.get("fake_probability"), 0, 1) * 100, f"{_clamp(expl.get('fake_probability'), 0, 1) * 100:.0f}%"), ("Sensationalism score", _clamp(sens.get("score")), f"{_clamp(sens.get('score')):.0f}/100"), ("Exclamations", min(_clamp(sens.get("exclamation_count"), 0, 20) * 5, 100), _clean(sens.get("exclamation_count"), "0")), ("ALL CAPS words", min(_clamp(sens.get("caps_word_count"), 0, 25) * 4, 100), _clean(sens.get("caps_word_count"), "0")), ("Emotional words", min(_clamp(sens.get("emotional_word_count"), 0, 20) * 5, 100), _clean(sens.get("emotional_word_count"), "0")), ("Clickbait matches", min(_clamp(sens.get("clickbait_matches"), 0, 10) * 10, 100), _clean(sens.get("clickbait_matches"), "0")), ] return [ _section("Text & Metadata Analysis", styles), _panel([[BarChart(metrics)]], [176 * mm]), ] def _exif_metadata(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[Any]: exif = _as_dict(_as_dict(analysis_json.get("explainability")).get("exif")) if not exif: return [] rows = [[Paragraph("Field", styles["small"]), Paragraph("Value", styles["small"])]] for key in ["make", "model", "datetime_original", "software", "lens_model", "gps_info", "trust_reason"]: value = _clean(exif.get(key)) if value: rows.append([Paragraph(key.replace("_", " ").title(), styles["body"]), Paragraph(_xml(value), styles["body"])]) rows.append( [ Paragraph("Trust Adjustment", styles["body"]), Paragraph(_xml(exif.get("trust_adjustment"), "0"), styles["body"]), ] ) table = Table(rows, colWidths=[48 * mm, 128 * mm], repeatRows=1) table.setStyle( TableStyle( [ ("BACKGROUND", (0, 0), (-1, 0), PANEL_2), ("FONTNAME", (0, 0), (-1, 0), FONT_SANS_BOLD), ("FONTSIZE", (0, 0), (-1, 0), 9), ("BOX", (0, 0), (-1, -1), 0.5, LINE), ("INNERGRID", (0, 0), (-1, -1), 0.25, colors.HexColor("#D9DFE8")), ("VALIGN", (0, 0), (-1, -1), "TOP"), ("NOSPLIT", (0, 0), (-1, 1)), ("LEFTPADDING", (0, 0), (-1, -1), 8), # Increased from 7 ("RIGHTPADDING", (0, 0), (-1, -1), 8), ("TOPPADDING", (0, 0), (-1, -1), 7), # Increased from 5 ("BOTTOMPADDING", (0, 0), (-1, -1), 7), # Increased from 5 ] ) ) return [_section("Image Metadata Signals", styles), table] def _trusted_sources(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[Any]: sources = [_as_dict(s) for s in _as_list(analysis_json.get("trusted_sources")) if _as_dict(s).get("url")] if not sources: return [ _section("Trusted Source Cross-Reference", styles), Paragraph("No trusted news sources were returned for this analysis.", styles["small"]), ] rows = [ [ Paragraph("Source", styles["small"]), Paragraph("Title", styles["small"]), Paragraph("Relevance Score", styles["small"]), ] ] for source in sources[:10]: url = _xml(source.get("url")) source_name = _xml(source.get("source_name"), "Source") title = _xml(source.get("title"), source.get("url")) rows.append( [ Paragraph(f'{source_name}', styles["link"]), Paragraph(f'{title}', styles["link"]), Paragraph(f"{_clamp(source.get('relevance_score'), 0, 1) * 100:.0f}%", styles["body"]), ] ) table = Table(rows, colWidths=[40 * mm, 104 * mm, 32 * mm], repeatRows=1) table.setStyle( TableStyle( [ ("BACKGROUND", (0, 0), (-1, 0), PANEL_2), ("FONTNAME", (0, 0), (-1, 0), FONT_SANS_BOLD), ("FONTSIZE", (0, 0), (-1, 0), 9), ("BOX", (0, 0), (-1, -1), 0.5, LINE), ("INNERGRID", (0, 0), (-1, -1), 0.25, colors.HexColor("#D9DFE8")), ("VALIGN", (0, 0), (-1, -1), "TOP"), ("NOSPLIT", (0, 0), (-1, 1)), ("LEFTPADDING", (0, 0), (-1, -1), 8), # Increased from 6 ("RIGHTPADDING", (0, 0), (-1, -1), 8), ("TOPPADDING", (0, 0), (-1, -1), 8), # Increased from 6 ("BOTTOMPADDING", (0, 0), (-1, -1), 8), # Increased from 6 ] ) ) return [_section("Trusted Source Cross-Reference", styles), table] def _processing_pipeline(analysis_json: dict[str, Any], styles: dict[str, ParagraphStyle]) -> list[Any]: summary = _as_dict(analysis_json.get("processing_summary")) stages = [_clean(s) for s in _as_list(summary.get("stages_completed")) if _clean(s)] duration = _clamp(summary.get("total_duration_ms"), 0, 10_000_000) return [ _section("Processing Pipeline", styles), _panel( [ [PipelineFlow(stages)], [ Paragraph( f"Total duration: {duration / 1000:.2f}s    " f"Model: {_xml(summary.get('model_used'), 'not recorded')}
" f"Stages: {_xml(' -> '.join(stages), 'not recorded')}", styles["body"], ) ], ], [176 * mm], ), ] def _footer_notice(analysis_json: dict[str, Any]) -> str: return _clean( analysis_json.get("responsible_ai_notice"), "DeepShield Responsible-AI Notice: AI analysis can be wrong; verify before sharing.", ) def _draw_footer(canvas, doc, notice: str) -> None: canvas.saveState() width, _height = A4 y = 13 * mm canvas.setStrokeColor(LINE) canvas.setLineWidth(0.4) canvas.line(doc.leftMargin, y + 8, width - doc.rightMargin, y + 8) canvas.setFont(FONT_SANS, 7) canvas.setFillColor(MUTED) canvas.drawString(doc.leftMargin, y, "Expiry Notice: report links expire according to the configured retention policy.") canvas.drawRightString(width - doc.rightMargin, y, f"Page {doc.page}") canvas.setFont(FONT_SANS_BOLD, 7.2) canvas.drawCentredString(width / 2, y + 10, "DeepShield Responsible-AI Notice") canvas.setFont(FONT_SANS, 6.6) canvas.drawCentredString(width / 2, y + 1, notice[:128]) canvas.restoreState() def _build_story(record: AnalysisRecord, analysis_json: dict[str, Any], generated_at: str) -> list[Any]: styles = _styles() story: list[Any] = [] story.extend(_header(analysis_json, generated_at, styles)) story.extend(_executive_summary(analysis_json, styles)) story.extend(_media_context(analysis_json, record, styles)) story.extend(_xai_breakdown(analysis_json, styles)) story.extend(_forensic_visuals(analysis_json, styles)) story.extend(_text_metric_chart(analysis_json, styles)) story.extend(_exif_metadata(analysis_json, styles)) story.extend(_trusted_sources(analysis_json, styles)) story.extend(_processing_pipeline(analysis_json, styles)) return story def render_html(analysis_json: dict[str, Any]) -> str: """Compatibility shim for older callers. PDF generation now uses ReportLab directly so hyperlinks, footers, charts, and images are reliable. This compact HTML preview is intentionally not the source of truth for report rendering. """ verdict = _as_dict(analysis_json.get("verdict")) return ( "" f"

DeepShield Report

" f"

Media: {_xml(analysis_json.get('media_type'), 'unknown')}

" f"

Verdict: {_xml(verdict.get('label'), 'Inconclusive')}

" f"

Deepfake probability: {_deepfake_probability(analysis_json)}/100

" "" ) def html_to_pdf(html: str, out_path: Path) -> None: """Deprecated compatibility entrypoint. The modern report pipeline renders from structured analysis JSON. This method remains so imports do not break, but it is no longer used internally. """ doc = SimpleDocTemplate(str(out_path), pagesize=A4, pageCompression=0) styles = _styles() doc.build([Paragraph(_xml(html), styles["body"])]) def _fallback_pdf(record: AnalysisRecord, analysis_json: dict[str, Any], out_path: Path) -> None: styles = _styles() notice = _footer_notice(analysis_json) doc = SimpleDocTemplate( str(out_path), pagesize=A4, rightMargin=18 * mm, leftMargin=18 * mm, topMargin=16 * mm, bottomMargin=24 * mm, pageCompression=0, ) story = [ Paragraph("DeepShield Analysis Report", styles["title"]), Paragraph(f"Record #{record.id} - {_xml(record.media_type)}", styles["body"]), Paragraph(f"Verdict: {_xml(record.verdict)}", styles["body"]), Paragraph(f"Deepfake probability: {_deepfake_probability(analysis_json)}/100", styles["body"]), ] doc.build(story, onFirstPage=lambda c, d: _draw_footer(c, d, notice), onLaterPages=lambda c, d: _draw_footer(c, d, notice)) def generate_report(record: AnalysisRecord) -> Path: out_dir = _ensure_dir() filename = f"deepshield_{record.id}_{uuid.uuid4().hex[:8]}.pdf" out_path = out_dir / filename data = json.loads(record.result_json) generated_at = _generated_at_ist() notice = _footer_notice(data) doc = SimpleDocTemplate( str(out_path), pagesize=A4, rightMargin=17 * mm, leftMargin=17 * mm, topMargin=14 * mm, bottomMargin=25 * mm, title=f"DeepShield Threat Intelligence Report {record.id}", author="DeepShield", pageCompression=0, ) try: story = _build_story(record, data, generated_at) doc.build( story, onFirstPage=lambda c, d: _draw_footer(c, d, notice), onLaterPages=lambda c, d: _draw_footer(c, d, notice), ) except Exception as exc: # noqa: BLE001 logger.warning(f"ReportLab renderer failed for report {record.id}, using minimal fallback: {exc}") _fallback_pdf(record, data, out_path) logger.info(f"Report generated id={record.id} path={out_path} size={out_path.stat().st_size}B") return out_path def create_report_row(analysis_id: int, path: Path) -> Report: return Report( analysis_id=analysis_id, file_path=str(path), expires_at=datetime.utcnow() + timedelta(seconds=settings.REPORT_TTL_SECONDS), ) def cleanup_expired(now: Optional[datetime] = None) -> int: """Delete expired PDFs from disk. Returns count deleted.""" now = now or datetime.utcnow() directory = Path(settings.REPORT_DIR) if not directory.exists(): return 0 deleted = 0 ttl = timedelta(seconds=settings.REPORT_TTL_SECONDS) for path in directory.glob("*.pdf"): try: mtime = datetime.utcfromtimestamp(path.stat().st_mtime) if now - mtime > ttl: path.unlink() deleted += 1 except OSError as exc: logger.warning(f"Cleanup failed for {path}: {exc}") if deleted: logger.info(f"Cleaned up {deleted} expired reports") return deleted