Spaces:
Running
Running
| """ | |
| VERIDEX — Visualization Engine | |
| ================================ | |
| Generates heatmaps, module score graphs, ELA overlays, | |
| and forensic annotation images for the frontend. | |
| All outputs are returned as base64-encoded PNG strings. | |
| """ | |
| import io | |
| import base64 | |
| import logging | |
| import numpy as np | |
| from PIL import Image, ImageDraw, ImageFont | |
| logger = logging.getLogger(__name__) | |
def _to_b64(img: Image.Image, fmt: str = "PNG") -> str:
    """Encode a PIL image as a bare base64 string.

    The image is serialised in memory using ``fmt`` (PNG by default) and the
    resulting bytes are base64-encoded.  Only the base64 payload is returned:
    no ``data:image/...;base64,`` prefix is added, so a caller that needs a
    data URI has to prepend it.

    Args:
        img: Image to serialise.
        fmt: PIL format name passed to ``Image.save``.

    Returns:
        The encoded image as an ASCII ``str``.
    """
    with io.BytesIO() as buf:
        img.save(buf, format=fmt, optimize=True)
        payload = buf.getvalue()
    # base64 output is pure ASCII, so the codec is named explicitly.
    return base64.b64encode(payload).decode("ascii")
def _font(size: int = 12):
    """Return bold DejaVu Sans at ``size``, or PIL's default font if it cannot be loaded."""
    dejavu_bold = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf"
    try:
        face = ImageFont.truetype(dejavu_bold, size)
    except Exception:
        # Any load problem (missing file, unusable font) ends up on the built-in font.
        face = ImageFont.load_default()
    return face
| # ── ELA Heatmap ────────────────────────────────────────────────── | |
def ela_heatmap(content: bytes, quality: int = 75) -> str | None:
    """
    Error Level Analysis heatmap.

    Re-saves the image as JPEG at ``quality``, takes the absolute per-pixel
    difference against the decoded original, amplifies it and paints the
    result on a black -> red -> yellow ramp (the blue channel stays at zero).

    Args:
        content: Raw bytes of an image in a format PIL can open.
        quality: JPEG quality used for the re-save.

    Returns:
        Base64 PNG string with the same pixel dimensions as the input,
        or None on failure (the error is logged as a warning).
    """
    try:
        original = Image.open(io.BytesIO(content)).convert("RGB")
        arr_orig = np.array(original, dtype=np.float32)

        buf = io.BytesIO()
        original.save(buf, format="JPEG", quality=quality)
        buf.seek(0)  # rewind explicitly instead of depending on the decoder to do it
        reloaded = np.array(Image.open(buf).convert("RGB"), dtype=np.float32)

        # Per-channel error level, amplified 10x and saturated to 8 bits.
        ela = (np.abs(arr_orig - reloaded) * 10.0).clip(0, 255).astype(np.uint8)
        mag = ela.mean(axis=2)

        # Red rises twice as fast as green; blue is never set, so the ramp
        # tops out at yellow rather than white.
        heat = np.zeros((*mag.shape, 3), dtype=np.uint8)
        heat[:, :, 0] = np.clip(mag * 3, 0, 255).astype(np.uint8)    # R
        heat[:, :, 1] = np.clip(mag * 1.5, 0, 255).astype(np.uint8)  # G

        # `heat` is built from the original's own pixel grid, so it already
        # has the right size and needs no resampling.
        return _to_b64(Image.fromarray(heat))
    except Exception as e:
        logger.warning("[Viz] ela_heatmap failed: %s", e)
        return None
| # ── Noise Map ──────────────────────────────────────────────────── | |
def noise_heatmap(content: bytes, tile: int = 16) -> str | None:
    """
    Laplacian noise variance heatmap.

    Highlights regions with inconsistent noise (splicing indicator): the
    absolute Laplacian of the grey image is split into ``tile`` x ``tile``
    blocks, each block is filled with its own variance, and the resulting
    map is min-max normalised and drawn on a blue-cyan ramp.

    Args:
        content: Raw bytes of an image in a format PIL can open.
        tile: Edge length in pixels of the square blocks (blocks on the
            right/bottom border may be smaller).

    Returns:
        Base64 PNG string with the same pixel dimensions as the input,
        or None on failure (including scipy being unavailable).
    """
    try:
        from scipy.ndimage import laplace

        img = Image.open(io.BytesIO(content)).convert("RGB")
        gray = np.array(img, dtype=np.float32).mean(axis=2)
        lap = np.abs(laplace(gray))

        # Tile-wise variance: every pixel of a block carries the block's variance.
        rows, cols = gray.shape
        var_map = np.zeros_like(gray)
        for y in range(0, rows, tile):
            for x in range(0, cols, tile):
                var_map[y:y + tile, x:x + tile] = np.var(lap[y:y + tile, x:x + tile])

        # Normalise to [0, 255]; the epsilon keeps a flat map from dividing by zero.
        lo, hi = var_map.min(), var_map.max()
        vm = ((var_map - lo) / (hi - lo + 1e-9) * 255).astype(np.uint8)

        # Blue-cyan colourmap (red stays at zero).
        heat = np.zeros((*vm.shape, 3), dtype=np.uint8)
        heat[:, :, 1] = (vm * 0.8).astype(np.uint8)
        heat[:, :, 2] = vm

        # `heat` shares the source image's pixel grid, so no resize is needed.
        return _to_b64(Image.fromarray(heat))
    except Exception as e:
        logger.warning("[Viz] noise_heatmap failed: %s", e)
        return None
| # ── Module Score Bar Chart ─────────────────────────────────────── | |
def module_score_chart(scores: dict[int | str, float], width: int = 900, height: int = 420) -> str | None:
    """
    Horizontal bar chart of all module scores.

    Green = authentic (>= 0.65), Red = suspicious (<= 0.35), Grey = neutral.
    Rows are ordered by numeric module id and labelled from ``MODULE_NAMES``
    (falling back to "Module <id>").

    Args:
        scores: Mapping of module id (int or numeric string) to score.
            Bars are drawn on a 0..1 scale; an out-of-range score keeps its
            printed value but its bar is clamped to the track.
        width: Image width in pixels.
        height: Only used to choose the per-row height (kept within 8..14 px);
            the actual image height follows from the number of rows.

    Returns:
        Base64 PNG string, or None when ``scores`` is empty or on failure.
    """
    try:
        from utils.module_runner import MODULE_NAMES

        # Convert ids/scores once; the id drives both ordering and label lookup.
        rows = sorted(
            ((int(mid), float(score)) for mid, score in scores.items()),
            key=lambda row: row[0],
        )
        n = len(rows)
        if n == 0:
            return None

        bar_h = max(8, min(14, height // n))
        h_actual = bar_h * n + 60
        img = Image.new("RGB", (width, h_actual), (18, 18, 30))
        draw = ImageDraw.Draw(img)
        font_sm = _font(9)
        font_ti = _font(11)

        # \u2014 is an em dash, written as an escape so a wrong file encoding
        # cannot mangle the rendered title.
        draw.text((10, 8), "VERIDEX \u2014 Module Score Overview", fill=(200, 200, 220), font=font_ti)
        draw.text((10, 22), f"{n} modules analysed", fill=(120, 120, 140), font=font_sm)

        label_w = 220
        bar_max = max(0, width - label_w - 80)

        for i, (mid_i, sc) in enumerate(rows):
            y = 45 + i * bar_h
            name = MODULE_NAMES.get(mid_i, f"Module {mid_i}")[:35]

            # Colour
            if sc >= 0.65:
                colour = (40, 200, 100)
            elif sc <= 0.35:
                colour = (220, 60, 60)
            else:
                colour = (120, 120, 160)

            # Clamp so a score outside 0..1 (or NaN) cannot produce a bar that
            # leaves the image or an inverted rectangle that fails to draw.
            ratio = min(1.0, max(0.0, sc))
            bar_w = int(ratio * bar_max)

            draw.rectangle([label_w, y, label_w + bar_w, y + bar_h - 2], fill=colour)
            draw.text((4, y + 1),
                      f"{mid_i:>2}. {name}", fill=(180, 180, 200), font=font_sm)
            draw.text((label_w + bar_w + 4, y + 1),
                      f"{sc:.2f}", fill=colour, font=font_sm)

        return _to_b64(img)
    except Exception as e:
        logger.warning("[Viz] module_score_chart failed: %s", e)
        return None
| # ── Verdict Summary Card ───────────────────────────────────────── | |
def verdict_card(
    verdict: str,
    confidence: float,
    risk_score: float,
    case_id: str,
    filename: str,
    width: int = 600,
    height: int = 200,
) -> str | None:
    """
    Compact verdict summary image, embeddable in PDF or frontend.

    Args:
        verdict: Verdict label; matched case-insensitively against the known
            labels to pick the accent colour (grey for anything else).
        confidence: Rendered with ``:.1%`` formatting.
        risk_score: Rendered as ``<value>/100``; the risk bar fill is
            clamped to the 0..100 range.
        case_id: Case identifier shown on the card.
        filename: File name shown on the card (first 50 characters).
        width: Card width in pixels.
        height: Card height in pixels (text and bar positions are fixed).

    Returns:
        Base64 PNG string, or None on failure.
    """
    try:
        COLOURS = {
            "AUTHENTIC": (30, 200, 90),
            "LIKELY AUTHENTIC": (80, 180, 80),
            "INCONCLUSIVE": (160, 160, 60),
            "LIKELY SYNTHETIC": (220, 120, 30),
            "SYNTHETIC": (220, 40, 40),
        }
        col = COLOURS.get(verdict.upper(), (120, 120, 120))
        bg = (18, 18, 30)

        img = Image.new("RGB", (width, height), bg)
        draw = ImageDraw.Draw(img)

        # Left colour bar
        draw.rectangle([0, 0, 8, height], fill=col)

        font_lg = _font(28)
        font_md = _font(14)
        font_sm = _font(10)

        draw.text((20, 15), "VERIDEX VERDICT", fill=(140, 140, 180), font=font_sm)
        draw.text((20, 32), verdict, fill=col, font=font_lg)
        draw.text((20, 80), f"Confidence : {confidence:.1%}", fill=(200, 200, 210), font=font_md)
        draw.text((20, 102), f"Risk Score : {risk_score:.1f}/100", fill=(200, 200, 210), font=font_md)
        draw.text((20, 130), f"Case : {case_id}", fill=(120, 120, 150), font=font_sm)
        draw.text((20, 145), f"File : {filename[:50]}", fill=(120, 120, 150), font=font_sm)

        # Risk bar
        bar_x, bar_y, bar_w, bar_h2 = 20, 168, width - 40, 12
        draw.rectangle([bar_x, bar_y, bar_x + bar_w, bar_y + bar_h2], fill=(40, 40, 55))
        # Clamp the fill so an out-of-range score can neither overrun the
        # track nor yield an inverted rectangle that fails to draw.
        risk_ratio = min(1.0, max(0.0, risk_score / 100))
        risk_fill = int(risk_ratio * bar_w)
        draw.rectangle([bar_x, bar_y, bar_x + risk_fill, bar_y + bar_h2], fill=col)
        draw.text((bar_x, bar_y - 12), "Risk Level", fill=(100, 100, 130), font=font_sm)

        return _to_b64(img)
    except Exception as e:
        logger.warning("[Viz] verdict_card failed: %s", e)
        return None
| # ── Overlay: annotate image ────────────────────────────────────── | |
def annotate_image(content: bytes, findings: list[str], verdict: str) -> str | None:
    """
    Returns the original image with a forensic annotation overlay
    (verdict badge + key findings text strip).

    The strip is appended below the image, so the output is taller than the
    input by 28 px plus 16 px per listed finding.

    Args:
        content: Raw bytes of an image in a format PIL can open.
        findings: Finding texts; at most the first four are shown, each cut
            to 90 characters.  ``None`` is treated as no findings.
        verdict: Verdict label; matched case-insensitively against the known
            labels to pick the accent colour (grey for anything else).

    Returns:
        Base64 PNG string, or None on failure.
    """
    try:
        COLOURS = {
            "AUTHENTIC": (30, 200, 90),
            "LIKELY AUTHENTIC": (80, 180, 80),
            "INCONCLUSIVE": (200, 180, 40),
            "LIKELY SYNTHETIC": (220, 120, 30),
            "SYNTHETIC": (220, 40, 40),
        }
        col = COLOURS.get(verdict.upper(), (160, 160, 160))

        # Slice once so the strip height and the drawn lines always agree.
        shown = list(findings or [])[:4]

        img = Image.open(io.BytesIO(content)).convert("RGB")
        w, h = img.size
        strip_h = 28 + len(shown) * 16

        out = Image.new("RGB", (w, h + strip_h), (18, 18, 30))
        out.paste(img, (0, 0))
        draw = ImageDraw.Draw(out)
        font_vd = _font(13)
        font_sm = _font(10)

        # Verdict bar
        draw.rectangle([0, h, w, h + 24], fill=(28, 28, 45))
        draw.rectangle([0, h, 8, h + 24], fill=col)
        draw.text((14, h + 5), f"VERIDEX: {verdict}", fill=col, font=font_vd)

        # Findings; \u2022 is a bullet, escaped so a wrong file encoding
        # cannot mangle the rendered text.
        for i, finding in enumerate(shown):
            fy = h + 28 + i * 16
            draw.text((10, fy), f"\u2022 {str(finding)[:90]}", fill=(180, 180, 200), font=font_sm)

        return _to_b64(out)
    except Exception as e:
        logger.warning("[Viz] annotate_image failed: %s", e)
        return None
| # ── Public convenience function ────────────────────────────────── | |
def generate_all_visuals(
    content: bytes,
    content_type: str,
    scores: dict,
    verdict: str,
    confidence: float,
    risk_score: float,
    case_id: str,
    filename: str,
    findings: list[str],
) -> dict[str, str | None]:
    """
    Generate all visuals for a VERIDEX analysis result.

    The content-based visuals (``ela_heatmap``, ``noise_heatmap``,
    ``annotated``) are only produced when ``content_type`` starts with
    ``image/``; ``score_chart`` and ``verdict_card`` are always produced.

    Returns:
        A dict of { visual_name: base64_png | None }; a value is None when
        the corresponding generator failed.
    """
    visuals: dict[str, str | None] = {}

    # A missing or non-string content type simply means "not an image".
    is_image = isinstance(content_type, str) and content_type.startswith("image/")
    if is_image:
        visuals["ela_heatmap"] = ela_heatmap(content)
        visuals["noise_heatmap"] = noise_heatmap(content)
        visuals["annotated"] = annotate_image(content, findings, verdict)

    # NOTE(review): the chart and the card do not read `content`, so they are
    # assumed to sit outside the image-only branch - confirm against the
    # original layout.
    visuals["score_chart"] = module_score_chart(scores)
    visuals["verdict_card"] = verdict_card(
        verdict, confidence, risk_score, case_id, filename
    )

    generated = sum(1 for v in visuals.values() if v is not None)
    logger.info("[Viz] Generated %d/%d visuals for %s", generated, len(visuals), case_id)
    return visuals