"""
forensics.py - Document forensics core module

Reusable analysis functions extracted from anomaly_detection_banking.ipynb.
Imported by app.py (Streamlit) and the notebook.

Public API:
    analyse_document(path)                - end-to-end pipeline
    score_image(path)                     - image-only forensic score
    error_level_analysis(path)            - ELA image + score
    copy_move_detect(path)                - copy-move heatmap + match count
    noise_inconsistency(path)             - noise heatmap + outlier ratio
    exif_sanity(path)                     - metadata flags
    pdf_structural_audit(path)            - EOF count + producer/creator
    pdf_font_audit(path)                  - font count + flags
    ocr_text(path)                        - OCR (no-op if Tesseract missing)
    text_rule_checks(text)                - date/amount/IFSC sanity
    extract_features(path)                - feature vector for ML model
    predict_with_model(path)              - run trained Random Forest if present
    generate_insights(score, sub, flags)  - rule-based bullets
    band(score)                           - score -> LOW/MEDIUM/HIGH/CRITICAL
"""
import os
import io
import re
import math
import json
import hashlib
import shutil
import warnings
from pathlib import Path
from datetime import datetime

import numpy as np
import pandas as pd
from PIL import ExifTags, Image, ImageChops, ImageEnhance
import cv2
import fitz  # PyMuPDF
import pytesseract

# ai_detector is optional; any failure to import it simply disables that step.
try:
    import ai_detector
    AI_DETECTOR_OK = True
except Exception:
    AI_DETECTOR_OK = False

warnings.filterwarnings("ignore")

# -------------------------------------------------------------
# Tesseract auto-detect (Windows-friendly)
# -------------------------------------------------------------
TESSERACT_OK = False
for _c in [
    shutil.which("tesseract"),
    r"C:\Program Files\Tesseract-OCR\tesseract.exe",
    r"C:\Program Files (x86)\Tesseract-OCR\tesseract.exe",
    os.path.expanduser(r"~\AppData\Local\Programs\Tesseract-OCR\tesseract.exe"),
]:
    # The first candidate that is an existing file wins.
    if _c and os.path.isfile(_c):
        pytesseract.pytesseract.tesseract_cmd = _c
        TESSERACT_OK = True
        break


# -------------------------------------------------------------
# Image forensics
# -------------------------------------------------------------
def error_level_analysis(path, quality=90, scale=15):
    """Re-save the image as JPEG and measure how much it changes.

    Args:
        path: image file to analyse.
        quality: JPEG quality used for the in-memory re-save.
        scale: brightness multiplier applied to the difference image.

    Returns:
        (ela, score): ``ela`` is the brightness-enhanced difference image,
        ``score`` is the mean pixel value of the raw difference.
    """
    # Close the file handle as soon as the RGB copy has been made.
    with Image.open(path) as src:
        orig = src.convert("RGB")

    buf = io.BytesIO()
    orig.save(buf, "JPEG", quality=quality)
    buf.seek(0)
    resaved = Image.open(buf)

    diff = ImageChops.difference(orig, resaved)
    extrema = diff.getextrema()
    # "or 1" avoids dividing by zero when the two images are identical.
    max_diff = max(hi for _, hi in extrema) or 1
    ela = ImageEnhance.Brightness(diff).enhance(scale * 255 / max_diff)
    score = float(np.array(diff).mean())
    return ela, score


def copy_move_detect(path, min_dist=40, max_matches=80):
    """Look for duplicated regions by matching ORB keypoints against themselves.

    Args:
        path: image file to analyse.
        min_dist: minimum pixel distance between two matched keypoints.
        max_matches: cap on the number of matches kept (and drawn).

    Returns:
        (out, count, matches): ``out`` is a copy of the image with the kept
        matches drawn on it, ``count`` is ``len(matches)`` and ``matches`` is
        a list of ``(p1, p2, distance)`` tuples. If the image cannot be read
        the result is ``(None, 0, [])``; if there are no descriptors or fewer
        than 10 keypoints it is ``(img, 0, [])``.
    """
    img = cv2.imread(str(path))
    if img is None:
        return None, 0, []

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    orb = cv2.ORB_create(nfeatures=2000)
    kp, des = orb.detectAndCompute(gray, None)
    if des is None or len(kp) < 10:
        return img, 0, []

    bf = cv2.BFMatcher(cv2.NORM_HAMMING)
    matches = bf.knnMatch(des, des, k=10)

    good = []
    for m_list in matches:
        # Skip the first neighbour: the descriptors are matched against
        # themselves, so the nearest hit is presumably the keypoint itself.
        for m in m_list[1:]:
            p1 = kp[m.queryIdx].pt
            p2 = kp[m.trainIdx].pt
            d = math.hypot(p1[0] - p2[0], p1[1] - p2[1])
            if d > min_dist and m.distance < 40:
                good.append((p1, p2, d))
    good = good[:max_matches]

    out = img.copy()
    for p1, p2, _ in good:
        a = tuple(map(int, p1))
        b = tuple(map(int, p2))
        cv2.line(out, a, b, (0, 0, 255), 1)
        cv2.circle(out, a, 3, (0, 255, 0), -1)
        cv2.circle(out, b, 3, (0, 255, 0), -1)
    return out, len(good), good


def noise_inconsistency(path, block=32):
    """Score how unevenly Laplacian variance is spread over image blocks.

    Args:
        path: image file to analyse (read as grayscale).
        block: side length, in pixels, of the square blocks.

    Returns:
        (heat, ratio): ``heat`` holds ``|z|`` of each block's Laplacian
        variance, shaped (rows, cols) of blocks; ``ratio`` is the fraction of
        blocks with ``|z| > 2.5``. Returns ``(np.zeros((1, 1)), 0.0)`` when
        the image is unreadable or smaller than one block.
    """
    img = cv2.imread(str(path), cv2.IMREAD_GRAYSCALE)
    if img is None:
        return np.zeros((1, 1)), 0.0

    H, W = img.shape
    # Crop to a whole number of blocks in each direction.
    Hc, Wc = (H // block) * block, (W // block) * block
    if Hc == 0 or Wc == 0:
        return np.zeros((1, 1)), 0.0
    img = img[:Hc, :Wc]

    lap = cv2.Laplacian(img, cv2.CV_64F)
    lap_blocks = (lap.reshape(Hc // block, block, Wc // block, block)
                     .transpose(0, 2, 1, 3)
                     .reshape(-1, block * block))
    var = lap_blocks.var(axis=1)
    z = (var - var.mean()) / (var.std() + 1e-9)
    suspicious = (np.abs(z) > 2.5).sum() / max(1, len(z))
    heat = np.abs(z).reshape(Hc // block, Wc // block)
    return heat, float(suspicious)


# Tag in IFD0 that points at the Exif sub-IFD, where DateTimeOriginal lives.
_EXIF_IFD_POINTER = 0x8769


def exif_sanity(path):
    """Return a list of human-readable flags derived from EXIF metadata.

    Possible results:
        ["cannot read image"]                        - opening/parsing failed
        ["no EXIF metadata (re-saved or stripped)"]  - no EXIF present
        ["exif clean"]                               - nothing suspicious
        otherwise one entry per finding (editor named in the Software tag,
        DateTime differing from DateTimeOriginal).
    """
    try:
        with Image.open(path) as img:
            exif = img.getexif()
            if not exif:
                return ["no EXIF metadata (re-saved or stripped)"]
            base_ifd = dict(exif.items())
            # getexif() only exposes IFD0 directly; DateTimeOriginal is
            # stored in the Exif sub-IFD and must be fetched separately,
            # otherwise the timestamp comparison below can never trigger.
            try:
                sub_ifd = dict(exif.get_ifd(_EXIF_IFD_POINTER))
            except Exception:
                sub_ifd = {}
    except Exception:
        return ["cannot read image"]

    # IFD0 entries are applied last so they win on any name clash.
    tags = {ExifTags.TAGS.get(k, k): v for k, v in sub_ifd.items()}
    tags.update({ExifTags.TAGS.get(k, k): v for k, v in base_ifd.items()})

    flags = []
    sw = str(tags.get("Software", "")).lower()
    for bad in ["photoshop", "gimp", "paint", "snapseed", "picsart"]:
        if bad in sw:
            flags.append("edited with " + bad)
    if "DateTimeOriginal" in tags and "DateTime" in tags:
        if tags["DateTimeOriginal"] != tags["DateTime"]:
            flags.append("modified-time differs from original-time")
    return flags or ["exif clean"]


# -------------------------------------------------------------
# PDF forensics
# -------------------------------------------------------------
def pdf_structural_audit(path):
    """Inspect a PDF's raw bytes and metadata for signs of editing.

    Returns:
        dict with ``pages`` (page count), ``eof_markers`` (number of
        ``%%EOF`` occurrences in the raw file), ``metadata`` (the document
        metadata dict, ``{}`` if absent) and ``flags`` (list of findings, or
        ``["clean"]`` when there are none).
    """
    raw = Path(path).read_bytes()
    eofs = raw.count(b"%%EOF")
    with fitz.open(path) as d:
        info = d.metadata or {}
        n_pages = d.page_count

    flags = []
    if eofs > 1:
        flags.append(f"{eofs} EOF markers (incremental updates)")

    prod = (info.get("producer") or "").lower()
    crt = (info.get("creator") or "").lower()
    if prod and crt and prod != crt:
        flags.append(f"producer/creator differ: {prod} vs {crt}")
    for t in ["ilovepdf", "smallpdf", "pdfescape", "sejda", "foxit phantom"]:
        if t in prod or t in crt:
            flags.append("edited via consumer tool: " + t)

    return {"pages": n_pages, "eof_markers": eofs,
            "metadata": info, "flags": flags or ["clean"]}


def pdf_font_audit(path):
    """Collect the distinct fonts used across all pages of a PDF.

    Returns:
        dict with ``fonts`` (sorted list of distinct values taken from
        element 3 of each ``page.get_fonts()`` entry) and ``flags``
        (``["ok"]`` unless more than 4 distinct fonts were found).
    """
    fonts_per_page = []
    with fitz.open(path) as d:
        for page in d:
            fonts_per_page.append({f[3] for f in page.get_fonts()})
    all_fonts = set().union(*fonts_per_page) if fonts_per_page else set()

    flags = []
    if len(all_fonts) > 4:
        flags.append("unusually high font count: " + str(len(all_fonts)))
    return {"fonts": sorted(all_fonts), "flags": flags or ["ok"]}


# -------------------------------------------------------------
# OCR + text rules
# -------------------------------------------------------------
# NOTE(review): the reviewed text is cut off here, in the middle of the first
# pattern of this section. The surviving fragment is kept verbatim below and
# has NOT been reconstructed:
#
#   AMT_RE = re.compile(r"(?
# NOTE(review): the reviewed text resumes here part-way through a function
# (apparently the tail of text_rule_checks); everything before it is missing.
# The surviving fragment is kept verbatim below and has NOT been rewritten:
#
#   ds[i + 1] for i in range(len(ds) - 1)): flags.append("dates not monotonic")
#   except Exception: flags.append("unparseable dates")
#   if amts:
#       big_round = [a for a in amts if a >= 100000 and a % 100000 == 0]
#       if len(big_round) > 3:
#           flags.append(f"{len(big_round)} suspiciously round large amounts")
#   if accs and not ifsc:
#       flags.append("account number present but no IFSC")
#   return {"n_dates": len(dates), "n_amounts": len(amts), "n_ifsc": len(ifsc),
#           "n_accounts": len(accs), "flags": flags or ["ok"]}


# -------------------------------------------------------------
# Scoring & insights
# -------------------------------------------------------------
# Weight of each sub-score in the combined rule-based risk score.
WEIGHTS = {"ela": 0.20, "copy_move": 0.25, "noise": 0.15, "exif": 0.10,
           "pdf_struct": 0.15, "text_rules": 0.10, "math": 0.05}

# (sub-score key, threshold, message): the message is reported when the
# sub-score is at or above the threshold.
INSIGHT_RULES = [
    ("copy_move", 0.4, "Possible copy-paste forgery: repeated visual region. Inspect seal/signature area."),
    ("ela", 0.4, "Compression artefacts inconsistent with a single-source scan. Likely re-saved after edits."),
    ("noise", 0.4, "Localised noise inconsistency - common in image splicing."),
    ("exif", 0.4, "Image metadata indicates edits in a photo-editor or stripped EXIF."),
    ("pdf_struct", 0.4, "PDF structural anomalies detected (incremental edits / consumer-tool fingerprint)."),
]

# Recommended action for each risk band returned by band().
ACTIONS = {
    "LOW": "Proceed with standard underwriting.",
    "MEDIUM": "Request additional verification documents.",
    "HIGH": "Escalate to fraud-risk team; manual review mandatory.",
    "CRITICAL": "Block file; trigger investigation workflow.",
}


def band(score):
    """Map a numeric risk score to LOW / MEDIUM / HIGH / CRITICAL.

    Cut-offs are 0.25, 0.50 and 0.75; each boundary value belongs to the
    higher band.
    """
    if score < 0.25:
        return "LOW"
    if score < 0.50:
        return "MEDIUM"
    if score < 0.75:
        return "HIGH"
    return "CRITICAL"


def score_image(path):
    """Run the four image checks and combine them into a weighted score.

    Returns:
        (total, sub, exif_flags): ``sub`` maps "ela", "copy_move", "noise"
        and "exif" to values in [0, 1]; ``total`` is their WEIGHTS-weighted
        sum; ``exif_flags`` is the raw output of exif_sanity().
    """
    _, ela_s = error_level_analysis(path)
    _, n_cm, _ = copy_move_detect(path)
    _, noise_r = noise_inconsistency(path)
    exif_flags = exif_sanity(path)

    sub = {
        "ela": min(ela_s / 25.0, 1.0),
        "copy_move": min(n_cm / 50.0, 1.0),
        "noise": min(noise_r * 4, 1.0),
        # Anything other than a clean EXIF result counts as 0.6.
        "exif": 0.0 if exif_flags == ["exif clean"] else 0.6,
    }
    total = sum(WEIGHTS[k] * v for k, v in sub.items())
    return total, sub, exif_flags


def generate_insights(score, sub_scores, extra_flags=None):
    """Turn a score, its sub-scores and raw flags into a report fragment.

    Args:
        score: overall risk score.
        sub_scores: mapping of sub-score key -> value; keys missing from it
            are treated as 0.
        extra_flags: optional iterable of flag strings; the benign markers
            "exif clean", "ok" and "clean" are ignored.

    Returns:
        dict with ``risk_score`` (rounded to 3 decimals), ``risk_band``,
        ``recommended_action`` and ``evidence`` (list of bullet strings).
    """
    bullets = [msg for key, thresh, msg in INSIGHT_RULES
               if sub_scores.get(key, 0) >= thresh]

    benign = ("exif clean", "ok", "clean")
    bullets.extend("Flag: " + str(f)
                   for f in (extra_flags or []) if f not in benign)

    if not bullets:
        bullets.append("No anomaly indicators above threshold.")

    risk_band = band(score)
    return {"risk_score": round(score, 3),
            "risk_band": risk_band,
            "recommended_action": ACTIONS[risk_band],
            "evidence": bullets}


# -------------------------------------------------------------
# ML feature extraction + prediction
# -------------------------------------------------------------
MODEL_PATH = Path("models/forgery_rf.joblib")
CNN_MODEL_PATH = Path("models/forgery_cnn.keras")
CNN_META_PATH = Path("models/forgery_cnn.meta.json")

# "tried" makes the load attempt happen at most once per process, even when
# it fails or the model file is absent.
_CNN_CACHE = {"model": None, "meta": None, "tried": False}


def _load_cnn():
    """Lazy-load the CNN model only when first needed (avoids TF import cost).

    Returns:
        (model, meta). Both are None when the model file does not exist or
        loading failed. ``meta`` alone can be None if the model loaded but
        reading the metadata JSON raised.
    """
    if _CNN_CACHE["tried"]:
        return _CNN_CACHE["model"], _CNN_CACHE["meta"]
    _CNN_CACHE["tried"] = True

    if not CNN_MODEL_PATH.exists():
        return None, None

    try:
        import tensorflow as tf  # local import - heavy
        _CNN_CACHE["model"] = tf.keras.models.load_model(CNN_MODEL_PATH)
        if CNN_META_PATH.exists():
            _CNN_CACHE["meta"] = json.loads(CNN_META_PATH.read_text())
        else:
            _CNN_CACHE["meta"] = {"image_size": 224,
                                  "class_names": ["originals", "tampered"]}
    except Exception as e:
        print("CNN load failed:", e)
    return _CNN_CACHE["model"], _CNN_CACHE["meta"]


def predict_with_cnn(path):
    """Run the trained CNN if forgery_cnn.keras exists. Returns dict or None.

    The dict holds ``tamper_probability`` (rounded to 3 decimals),
    ``verdict`` ("TAMPERED" when the probability is >= 0.5, else "GENUINE"),
    ``model`` and ``val_auc`` (taken from the metadata, None if unknown).
    """
    model, meta = _load_cnn()
    if model is None:
        return None

    # meta is None when the model loaded but its metadata could not be read.
    meta = meta or {}
    img_size = meta.get("image_size", 224)

    with Image.open(path) as src:
        img = src.convert("RGB").resize((img_size, img_size))
    # Add a leading batch axis; pixel values are passed through unscaled.
    arr = np.array(img)[None, ...].astype(np.float32)
    prob = float(model.predict(arr, verbose=0)[0, 0])

    return {
        "tamper_probability": round(prob, 3),
        "verdict": "TAMPERED" if prob >= 0.5 else "GENUINE",
        "model": "MobileNetV2 (CASIA v2 fine-tuned)",
        "val_auc": meta.get("val_auc"),
    }


def extract_features(path):
    """Build the feature dict consumed by the Random Forest model.

    Features: ELA mean, copy-move match count, noise outlier ratio, an
    "exif clean" indicator, four GLCM texture properties computed on a
    256x256 grayscale resize, and (when the colour read succeeds) the
    entropy of a 32-bin histogram for each of the b/g/r channels.

    Raises:
        ValueError: if the image cannot be read as grayscale.
    """
    from skimage.feature import graycomatrix, graycoprops

    feats = {}
    _, ela_score = error_level_analysis(path)
    feats["ela_mean"] = ela_score
    _, cm_count, _ = copy_move_detect(path)
    feats["copy_move_matches"] = cm_count
    _, noise_ratio = noise_inconsistency(path)
    feats["noise_outlier_ratio"] = noise_ratio
    feats["exif_clean"] = int(exif_sanity(path) == ["exif clean"])

    img = cv2.imread(str(path), cv2.IMREAD_GRAYSCALE)
    if img is None:
        # cv2.imread signals failure with None; fail with a clear message
        # instead of an opaque error from cv2.resize.
        raise ValueError("cannot read image: " + str(path))
    img_s = cv2.resize(img, (256, 256))
    glcm = graycomatrix(img_s, [1], [0], 256, symmetric=True, normed=True)
    for prop in ("contrast", "homogeneity", "energy", "correlation"):
        feats["glcm_" + prop] = float(graycoprops(glcm, prop)[0, 0])

    col = cv2.imread(str(path))
    if col is not None:
        for i, ch in enumerate(["b", "g", "r"]):
            hist = cv2.calcHist([col], [i], None, [32], [0, 256]).flatten()
            hist = hist / (hist.sum() + 1e-9)
            # The 1e-9 terms guard against division by zero and log2(0).
            feats["hist_" + ch + "_entropy"] = float(-(hist * np.log2(hist + 1e-9)).sum())
    return feats


def predict_with_model(path, model_path=MODEL_PATH):
    """Score an image with the saved Random Forest bundle, if one exists.

    Args:
        path: image file to score.
        model_path: location of the joblib bundle; it is indexed with the
            keys "model" and "features".

    Returns:
        None when ``model_path`` does not exist, otherwise a dict with
        ``file``, ``tamper_probability`` (rounded to 3 decimals),
        ``verdict`` and the raw ``features``.
    """
    # Check for the file first so that a missing (optional) model returns
    # None even on machines where joblib is not installed.
    if not Path(model_path).exists():
        return None

    import joblib

    # NOTE: joblib.load unpickles the file - only load bundles you trust.
    bundle = joblib.load(model_path)
    feats = extract_features(path)
    x = pd.DataFrame([feats])[bundle["features"]]
    p = bundle["model"].predict_proba(x)[0, 1]
    return {"file": str(path),
            "tamper_probability": round(float(p), 3),
            "verdict": "TAMPERED" if p >= 0.5 else "GENUINE",
            "features": feats}
# -------------------------------------------------------------
# End-to-end pipeline
# -------------------------------------------------------------
_IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".tif", ".tiff", ".bmp")


def _utc_timestamp():
    """Return the current UTC time as a naive ISO-8601 string ending in 'Z'."""
    from datetime import timezone

    # datetime.utcnow() is deprecated; take an aware "now" and drop tzinfo so
    # the text keeps the original "...Z" shape instead of gaining "+00:00".
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"


def _analyse_image(path, report):
    """Run every image check on *path* and write the results into *report*."""
    report["type"] = "image"
    s, sub, flags = score_image(path)

    # OCR-based text rules; a failure is recorded rather than raised.
    try:
        txt = ocr_text(path)
        text_rules = text_rule_checks(txt)
        sub["text_rules"] = 0.0 if text_rules["flags"] == ["ok"] else 0.5
        s = sum(WEIGHTS.get(k, 0) * v for k, v in sub.items())
    except Exception as e:
        text_rules = {"error": str(e)}

    # Blend in RF prediction if model exists
    try:
        ml = predict_with_model(path)
        if ml is not None:
            s = 0.5 * s + 0.5 * ml["tamper_probability"]
            report["ml_prediction"] = ml
    except Exception as e:
        report["ml_error"] = str(e)

    # Blend in CNN prediction if model exists (weight rises with val_auc)
    try:
        cnn = predict_with_cnn(path)
        if cnn is not None:
            # Weight is val_auc clamped to [0.4, 0.7]; an unknown val_auc
            # falls back to 0.85, which clamps to 0.7.
            w = max(0.4, min(0.7, (cnn.get("val_auc") or 0.85)))
            s = (1 - w) * s + w * cnn["tamper_probability"]
            report["cnn_prediction"] = cnn
    except Exception as e:
        report["cnn_error"] = str(e)

    # AI-generated content detector (FFT spectral analysis)
    try:
        if AI_DETECTOR_OK:
            ai = ai_detector.detect_ai_generated(path)
            report["ai_detector"] = ai
            sub["ai_generated"] = ai["probability"]
            # Blend lightly: AI-gen prob bumps risk up to +20%. The blend
            # can exceed 1.0 (e.g. s=1, prob=1 gives 1.1), so clamp it.
            s = min(1.0, max(0.0, 0.9 * s + 0.1 * ai["probability"] * 2.0))
            if ai["probability"] >= 0.6:
                flags = flags + [f"AI-generated content suspected (prob {ai['probability']:.2f})"]
    except Exception as e:
        report["ai_detector_error"] = str(e)

    insights = generate_insights(s, sub, flags + text_rules.get("flags", []))
    report.update({"sub_scores": sub, "exif_flags": flags,
                   "text_rules": text_rules, **insights})


def _analyse_pdf(path, report):
    """Run the structural and font audits on *path* and fill in *report*."""
    report["type"] = "pdf"
    audit = pdf_structural_audit(path)
    fonts = pdf_font_audit(path)
    # The font-audit result is stored under the "text_rules" sub-score key.
    sub = {"pdf_struct": 0.8 if audit["flags"] != ["clean"] else 0.1,
           "text_rules": 0.6 if fonts["flags"] != ["ok"] else 0.1}
    # NOTE(review): with WEIGHTS of 0.15 and 0.10 this sum cannot exceed
    # 0.15*0.8 + 0.10*0.6 = 0.18, so a PDF always lands in the LOW band.
    # Confirm whether normalising by the applicable weights was intended.
    s = sum(WEIGHTS.get(k, 0) * v for k, v in sub.items())
    insights = generate_insights(s, sub, audit["flags"] + fonts["flags"])
    report.update({"sub_scores": sub, "pdf_audit": audit,
                   "font_audit": fonts, **insights})


def analyse_document(path):
    """Analyse one document end to end and return a report dict.

    Images (.png/.jpg/.jpeg/.tif/.tiff/.bmp) and PDFs are dispatched on the
    lower-cased file extension; any other extension produces a report with
    ``type == "unsupported"`` and an ``error`` message.

    The report always contains ``file``, ``analysed_at`` (UTC, ISO format
    with a trailing "Z") and ``sha256`` of the file bytes. Logging to the
    provenance ledger is best-effort: any failure (including the module
    being unavailable) is stored under ``provenance_error``.
    """
    path = Path(path)
    ext = path.suffix.lower()
    report = {"file": str(path),
              "analysed_at": _utc_timestamp(),
              "sha256": hashlib.sha256(path.read_bytes()).hexdigest()}

    if ext in _IMAGE_EXTENSIONS:
        _analyse_image(path, report)
    elif ext == ".pdf":
        _analyse_pdf(path, report)
    else:
        report["type"] = "unsupported"
        report["error"] = "extension " + ext + " not handled"

    # Log to provenance ledger (tamper-evident hash chain)
    try:
        import provenance
        provenance.log_analysis(report.get("file", "unknown"),
                                report.get("sha256", "-"),
                                report.get("risk_band", "UNKNOWN"),
                                report.get("risk_score", -1.0),
                                extra={"type": report.get("type")})
    except Exception as _e:
        report["provenance_error"] = str(_e)
    return report


# -------------------------------------------------------------
# Cross-document consistency (Sprint 2)
# -------------------------------------------------------------
NAME_RE = re.compile(r"(?:Name|Owner|Borrower|Holder|Account Holder)\s*[:\-]\s*([A-Z][A-Z\s.]{2,40})", re.IGNORECASE)
DOB_RE = re.compile(r"(?:DOB|Date of Birth|Born)\s*[:\-]\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})", re.IGNORECASE)
ADDR_RE = re.compile(r"(?:Address|Village|Residence)\s*[:\-]\s*([A-Z0-9][A-Z0-9\s,.\-/]{3,80})", re.IGNORECASE)


def _norm(s):
    """Upper-case *s*, trim it and collapse whitespace runs; None becomes ''."""
    return re.sub(r"\s+", " ", (s or "").strip().upper())


def extract_identity_fields(path):
    """Pull name, DOB, address, account, IFSC and amounts from a document.

    PDFs (paths ending in ".pdf", case-insensitive) are read through their
    embedded text; every other file goes through ocr_text().

    Returns:
        (fields, text): ``fields`` has the keys "name", "dob", "address",
        "account", "ifsc" (each None when not found) and "amounts" (a list);
        ``text`` is the extracted text. When the text is empty, ``fields``
        is returned with its defaults.
    """
    if str(path).lower().endswith(".pdf"):
        with fitz.open(path) as d:
            text = "\n".join(page.get_text() for page in d)
    else:
        text = ocr_text(path)

    fields = {
        "name": None, "dob": None, "address": None,
        "account": None, "ifsc": None, "amounts": [],
    }
    if not text:
        return fields, text

    # Only the first match of each pattern is used.
    for key, pattern in (("name", NAME_RE), ("dob", DOB_RE), ("address", ADDR_RE)):
        m = pattern.search(text)
        if m:
            fields[key] = _norm(m.group(1))

    accs = ACC_RE.findall(text)
    if accs:
        fields["account"] = accs[0]
    ifsc = IFSC_RE.findall(text)
    if ifsc:
        fields["ifsc"] = ifsc[0]
    fields["amounts"] = parse_amounts(text)
    return fields, text


def _similarity(a, b):
    """Simple ratio-based string similarity (0.0 if either side is empty)."""
    if not a or not b:
        return 0.0
    from difflib import SequenceMatcher
    return SequenceMatcher(None, a, b).ratio()


def cross_doc_consistency(file_paths):
    """Compare identity fields across 2+ documents. Return per-field verdict.

    With fewer than two paths this returns ``{"error": "need at least 2
    documents"}``.
    """
    if len(file_paths) < 2:
        return {"error": "need at least 2 documents"}

    extracts = []
    for p in file_paths:
        fields, _ = extract_identity_fields(p)
        extracts.append({"file": str(p), "fields": fields})

    # Compare each field across docs
    field_results = {}
    for field in ["name", "dob", "address", "account", "ifsc"]:
        values = [e["fields"].get(field) for e in extracts]
        present = [v for v in values if v]
        if len(present) < 2:
            field_results[field] = {
                "status": "insufficient_data",
                "values": values,
                "similarity": None,
            }
            continue

        # All-pairs similarity
        sims = []
        for i in range(len(present)):
            for j in range(i + 1, len(present)):
                sims.append(_similarity(present[i], present[j]))
        # The worst pair decides the verdict.
        min_sim = min(sims)
        if min_sim >= 0.95:
            status = "match"
        elif min_sim >= 0.75:
            status = "likely_match"
        else:
            status = "mismatch"
        field_results[field] = {
            "status": status,
            "values": values,
            "similarity": round(min_sim, 3),
        }

    # Aggregate risk
    mismatches = sum(1 for r in field_results.values() if r["status"] == "mismatch")
    # NOTE(review): the reviewed text ends here mid-statement (the next token
    # begins "li"). The rest of this function, including its return value, is
    # not visible and has NOT been reconstructed.