Spaces:
Sleeping
Sleeping
| """ | |
| forensics.py - Document forensics core module | |
| Reusable analysis functions extracted from anomaly_detection_banking.ipynb. | |
| Imported by app.py (Streamlit) and the notebook. | |
| Public API: | |
| analyse_document(path) - end-to-end pipeline | |
| score_image(path) - image-only forensic score | |
| error_level_analysis(path) - ELA image + score | |
| copy_move_detect(path) - copy-move heatmap + match count | |
| noise_inconsistency(path) - noise heatmap + outlier ratio | |
| exif_sanity(path) - metadata flags | |
| pdf_structural_audit(path) - EOF count + producer/creator | |
| pdf_font_audit(path) - font count + flags | |
| ocr_text(path) - OCR (no-op if Tesseract missing) | |
| text_rule_checks(text) - date/amount/IFSC sanity | |
| extract_features(path) - feature vector for ML model | |
| predict_with_model(path) - run trained Random Forest if present | |
| generate_insights(score, sub, flags) - rule-based bullets | |
| band(score) - score -> LOW/MEDIUM/HIGH/CRITICAL | |
| """ | |
| import os | |
| import io | |
| import re | |
| import math | |
| import json | |
| import hashlib | |
| import shutil | |
| import warnings | |
| from pathlib import Path | |
| from datetime import datetime | |
| import numpy as np | |
| import pandas as pd | |
| from PIL import Image, ImageChops, ImageEnhance | |
| import cv2 | |
| import fitz # PyMuPDF | |
| import pytesseract | |
| try: | |
| import ai_detector | |
| AI_DETECTOR_OK = True | |
| except Exception: | |
| AI_DETECTOR_OK = False | |
| warnings.filterwarnings("ignore") | |
| # ------------------------------------------------------------- | |
| # Tesseract auto-detect (Windows-friendly) | |
| # ------------------------------------------------------------- | |
# Probe the PATH first, then the usual Windows install locations, and point
# pytesseract at the first candidate that is an existing file.
_TESSERACT_CANDIDATES = (
    shutil.which("tesseract"),
    r"C:\Program Files\Tesseract-OCR\tesseract.exe",
    r"C:\Program Files (x86)\Tesseract-OCR\tesseract.exe",
    os.path.expanduser(r"~\AppData\Local\Programs\Tesseract-OCR\tesseract.exe"),
)
TESSERACT_OK = False
for _c in _TESSERACT_CANDIDATES:
    # shutil.which() yields None when the binary is not on PATH.
    if not _c or not os.path.isfile(_c):
        continue
    pytesseract.pytesseract.tesseract_cmd = _c
    TESSERACT_OK = True
    break
| # ------------------------------------------------------------- | |
| # Image forensics | |
| # ------------------------------------------------------------- | |
def error_level_analysis(path, quality=90, scale=15):
    """Run Error Level Analysis (ELA) on an image.

    The image is decoded to RGB, re-encoded in memory as JPEG at ``quality``
    and compared pixel-wise with the decoded original.

    Args:
        path: Image file to analyse (anything ``PIL.Image.open`` accepts).
        quality: JPEG quality used for the in-memory re-save.
        scale: Multiplier applied when brightening the difference image.

    Returns:
        ``(ela, score)`` where ``ela`` is the brightness-enhanced difference
        image (PIL image) and ``score`` is the mean of the raw difference
        over all pixels and channels, as a float.
    """
    # Context managers release the file handle and the in-memory buffer
    # deterministically; the original left both to the garbage collector.
    with Image.open(path) as src:
        orig = src.convert("RGB")
    with io.BytesIO() as buf:
        orig.save(buf, "JPEG", quality=quality)
        buf.seek(0)
        with Image.open(buf) as resaved:
            # difference() forces the lazy JPEG decode while buf is still open.
            diff = ImageChops.difference(orig, resaved)
    # getextrema() yields one (min, max) pair per band; fall back to 1 so an
    # all-zero difference does not divide by zero.
    max_diff = max(hi for _, hi in diff.getextrema()) or 1
    ela = ImageEnhance.Brightness(diff).enhance(scale * 255 / max_diff)
    score = float(np.array(diff).mean())
    return ela, score
def copy_move_detect(path, min_dist=40, max_matches=80):
    """Look for duplicated regions by matching ORB descriptors of an image
    against themselves.

    Args:
        path: Image file to analyse.
        min_dist: Minimum pixel distance between two keypoints for a match
            to be kept.
        max_matches: Upper bound on the number of matches retained.

    Returns:
        ``(annotated, count, matches)``: a copy of the image with the kept
        matches drawn on it, how many were kept, and the list of
        ``(point_a, point_b, pixel_distance)`` tuples. When the file cannot
        be read the result is ``(None, 0, [])``; when too few features are
        found the unmodified image is returned with a zero count.
    """
    image = cv2.imread(str(path))
    if image is None:
        return None, 0, []

    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    detector = cv2.ORB_create(nfeatures=2000)
    keypoints, descriptors = detector.detectAndCompute(gray, None)
    if descriptors is None or len(keypoints) < 10:
        return image, 0, []

    matcher = cv2.BFMatcher(cv2.NORM_HAMMING)
    pairs = []
    for neighbours in matcher.knnMatch(descriptors, descriptors, k=10):
        # The first neighbour is skipped, as in the original slice [1:].
        for match in neighbours[1:]:
            src = keypoints[match.queryIdx].pt
            dst = keypoints[match.trainIdx].pt
            gap = math.hypot(src[0] - dst[0], src[1] - dst[1])
            if gap > min_dist and match.distance < 40:
                pairs.append((src, dst, gap))
    pairs = pairs[:max_matches]

    annotated = image.copy()
    for src, dst, _ in pairs:
        src_px = tuple(map(int, src))
        dst_px = tuple(map(int, dst))
        cv2.line(annotated, src_px, dst_px, (0, 0, 255), 1)
        cv2.circle(annotated, src_px, 3, (0, 255, 0), -1)
        cv2.circle(annotated, dst_px, 3, (0, 255, 0), -1)
    return annotated, len(pairs), pairs
def noise_inconsistency(path, block=32):
    """Measure how unevenly Laplacian variance is spread over image tiles.

    Args:
        path: Image file to analyse (read as grayscale).
        block: Side length, in pixels, of the square tiles.

    Returns:
        ``(heat, ratio)``: a 2-D array of absolute z-scores (one per tile)
        and the fraction of tiles whose absolute z-score exceeds 2.5.
        ``(zeros((1, 1)), 0.0)`` is returned when the image is unreadable or
        smaller than one tile.
    """
    gray = cv2.imread(str(path), cv2.IMREAD_GRAYSCALE)
    if gray is None:
        return np.zeros((1, 1)), 0.0

    height, width = gray.shape
    rows, cols = height // block, width // block
    if rows == 0 or cols == 0:
        return np.zeros((1, 1)), 0.0

    # Crop to a whole number of tiles so the reshape below is exact.
    cropped = gray[:rows * block, :cols * block]
    laplacian = cv2.Laplacian(cropped, cv2.CV_64F)
    tiles = laplacian.reshape(rows, block, cols, block)
    tiles = tiles.transpose(0, 2, 1, 3).reshape(-1, block * block)

    variances = tiles.var(axis=1)
    z = (variances - variances.mean()) / (variances.std() + 1e-9)
    abs_z = np.abs(z)
    suspicious = (abs_z > 2.5).sum() / max(1, len(z))
    return abs_z.reshape(rows, cols), float(suspicious)
def exif_sanity(path):
    """Inspect image metadata for signs of editing.

    Args:
        path: Image file to inspect.

    Returns:
        A non-empty list of flag strings: ``["cannot read image"]`` when the
        file or its metadata cannot be read, ``["no EXIF metadata (re-saved
        or stripped)"]`` when there is no EXIF at all, one entry per
        detected issue otherwise, or ``["exif clean"]`` when nothing is
        suspicious.
    """
    # Import ExifTags explicitly: reaching it through ``Image.ExifTags``
    # only works on Pillow versions whose Image module happens to import it.
    from PIL import ExifTags

    exif_ifd_pointer = 0x8769  # tag in IFD0 that points at the Exif sub-IFD

    try:
        with Image.open(path) as img:
            exif = img.getexif()
            has_exif = bool(exif)
            tags = {ExifTags.TAGS.get(k, k): v for k, v in exif.items()}
            # getexif() exposes only the base IFD. DateTimeOriginal is
            # stored in the Exif sub-IFD, so without this merge the
            # time-mismatch check below could never fire.
            get_ifd = getattr(exif, "get_ifd", None)
            if get_ifd is not None:
                for k, v in get_ifd(exif_ifd_pointer).items():
                    tags.setdefault(ExifTags.TAGS.get(k, k), v)
    except Exception:
        return ["cannot read image"]

    if not has_exif:
        return ["no EXIF metadata (re-saved or stripped)"]

    flags = []
    software = str(tags.get("Software", "")).lower()
    for bad in ["photoshop", "gimp", "paint", "snapseed", "picsart"]:
        if bad in software:
            flags.append("edited with " + bad)
    if "DateTimeOriginal" in tags and "DateTime" in tags:
        if tags["DateTimeOriginal"] != tags["DateTime"]:
            flags.append("modified-time differs from original-time")
    return flags or ["exif clean"]
| # ------------------------------------------------------------- | |
| # PDF forensics | |
| # ------------------------------------------------------------- | |
def pdf_structural_audit(path):
    """Audit a PDF's raw structure and document metadata.

    Counts ``%%EOF`` markers in the raw bytes and compares the producer and
    creator metadata fields, also checking them against a short list of
    consumer editing tools.

    Args:
        path: PDF file to audit.

    Returns:
        Dict with ``pages``, ``eof_markers``, ``metadata`` and ``flags``
        (``["clean"]`` when nothing was flagged).
    """
    eof_count = Path(path).read_bytes().count(b"%%EOF")
    with fitz.open(path) as doc:
        meta = doc.metadata or {}
        page_total = doc.page_count

    producer = (meta.get("producer") or "").lower()
    creator = (meta.get("creator") or "").lower()

    findings = []
    if eof_count > 1:
        findings.append(f"{eof_count} EOF markers (incremental updates)")
    if producer and creator and producer != creator:
        findings.append(f"producer/creator differ: {producer} vs {creator}")
    consumer_tools = ["ilovepdf", "smallpdf", "pdfescape", "sejda", "foxit phantom"]
    findings.extend(
        "edited via consumer tool: " + tool
        for tool in consumer_tools
        if tool in producer or tool in creator
    )

    return {
        "pages": page_total,
        "eof_markers": eof_count,
        "metadata": meta,
        "flags": findings or ["clean"],
    }
def pdf_font_audit(path):
    """Collect the distinct font names used across all pages of a PDF.

    Args:
        path: PDF file to audit.

    Returns:
        Dict with ``fonts`` (sorted distinct names, taken from index 3 of
        each ``page.get_fonts()`` entry) and ``flags`` (``["ok"]`` unless
        more than four distinct fonts are present).
    """
    font_names = set()
    with fitz.open(path) as doc:
        for page in doc:
            font_names.update(entry[3] for entry in page.get_fonts())

    findings = []
    if len(font_names) > 4:
        findings.append("unusually high font count: " + str(len(font_names)))
    return {"fonts": sorted(font_names), "flags": findings or ["ok"]}
| # ------------------------------------------------------------- | |
| # OCR + text rules | |
| # ------------------------------------------------------------- | |
# Monetary amount: optional minus sign, then either comma-grouped digits
# (Western "1,234,567" or Indian "1,00,000" grouping) or a plain digit run,
# with an optional 1-2 digit decimal part.
# The earlier pattern only allowed 1-3 leading digits, so an un-grouped
# number such as "1234567" was split into "123", "456", "7". The lookbehind
# now also rejects a preceding digit, so a match cannot start in the middle
# of a number or inside an alphanumeric code.
AMT_RE = re.compile(r"(?<![A-Za-z\d])-?(?:\d{1,3}(?:,\d{2,3})+|\d+)(?:\.\d{1,2})?")
# Numeric date with "-" or "/" separators, e.g. 12/05/2023.
DATE_RE = re.compile(r"(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})")
# IFSC code: four letters, a literal zero, six alphanumerics.
IFSC_RE = re.compile(r"\b[A-Z]{4}0[A-Z0-9]{6}\b")
# Account number: a standalone run of 9 to 18 digits.
ACC_RE = re.compile(r"\b\d{9,18}\b")
def ocr_text(path):
    """Return the OCR text of an image, or ``""`` when OCR is unavailable.

    Best-effort by design: a missing Tesseract binary (``TESSERACT_OK`` is
    false) or any failure while opening or recognising the image yields an
    empty string instead of raising.

    Args:
        path: Image file to run OCR on.

    Returns:
        The recognised text, or an empty string.
    """
    if not TESSERACT_OK:
        return ""
    try:
        # Close the image handle as soon as OCR is done instead of leaking it.
        with Image.open(path) as img:
            return pytesseract.image_to_string(img)
    except Exception:
        return ""
def parse_amounts(text):
    """Extract every ``AMT_RE`` match from ``text`` as a float.

    Thousands separators (commas) are removed before conversion; tokens that
    still fail to convert are skipped.

    Args:
        text: Text to scan.

    Returns:
        List of floats in order of appearance.
    """
    amounts = []
    for token in AMT_RE.findall(text):
        digits = token.replace(",", "")
        try:
            number = float(digits)
        except ValueError:
            continue
        amounts.append(number)
    return amounts
def text_rule_checks(text):
    """Apply simple sanity rules to OCR text.

    Checks that dates appear in non-decreasing order, that there are not
    more than three large amounts that are exact multiples of 100000, and
    that an account number is accompanied by an IFSC code.

    Args:
        text: OCR output; an empty value short-circuits with the single
            flag ``"ocr_skipped"``.

    Returns:
        Dict with the counts ``n_dates``, ``n_amounts``, ``n_ifsc``,
        ``n_accounts`` and a ``flags`` list (``["ok"]`` when no rule fired).
    """
    if not text:
        return {"n_dates": 0, "n_amounts": 0, "n_ifsc": 0,
                "n_accounts": 0, "flags": ["ocr_skipped"]}

    found_dates = DATE_RE.findall(text)
    found_ifsc = IFSC_RE.findall(text)
    found_accounts = ACC_RE.findall(text)
    found_amounts = parse_amounts(text)

    findings = []
    if found_dates:
        try:
            from dateutil import parser as date_parser
            parsed = [date_parser.parse(raw, dayfirst=True) for raw in found_dates]
            out_of_order = any(
                earlier > later for earlier, later in zip(parsed, parsed[1:])
            )
            if out_of_order:
                findings.append("dates not monotonic")
        except Exception:
            findings.append("unparseable dates")

    if found_amounts:
        round_lakhs = [
            amount for amount in found_amounts
            if amount >= 100000 and amount % 100000 == 0
        ]
        if len(round_lakhs) > 3:
            findings.append(f"{len(round_lakhs)} suspiciously round large amounts")

    if found_accounts and not found_ifsc:
        findings.append("account number present but no IFSC")

    return {
        "n_dates": len(found_dates),
        "n_amounts": len(found_amounts),
        "n_ifsc": len(found_ifsc),
        "n_accounts": len(found_accounts),
        "flags": findings or ["ok"],
    }
| # ------------------------------------------------------------- | |
| # Scoring & insights | |
| # ------------------------------------------------------------- | |
# Contribution of each sub-score to the overall risk score.
WEIGHTS = dict(
    ela=0.20,
    copy_move=0.25,
    noise=0.15,
    exif=0.10,
    pdf_struct=0.15,
    text_rules=0.10,
    math=0.05,
)

# (sub-score key, threshold, evidence message): the message is reported when
# the sub-score reaches the threshold.
INSIGHT_RULES = [
    (key, 0.4, message)
    for key, message in (
        ("copy_move", "Possible copy-paste forgery: repeated visual region. Inspect seal/signature area."),
        ("ela", "Compression artefacts inconsistent with a single-source scan. Likely re-saved after edits."),
        ("noise", "Localised noise inconsistency - common in image splicing."),
        ("exif", "Image metadata indicates edits in a photo-editor or stripped EXIF."),
        ("pdf_struct", "PDF structural anomalies detected (incremental edits / consumer-tool fingerprint)."),
    )
]

# Recommended follow-up for each risk band.
ACTIONS = dict(
    LOW="Proceed with standard underwriting.",
    MEDIUM="Request additional verification documents.",
    HIGH="Escalate to fraud-risk team; manual review mandatory.",
    CRITICAL="Block file; trigger investigation workflow.",
)
def band(score):
    """Map a numeric risk score to its band label.

    Scores below 0.25 are ``"LOW"``, below 0.50 ``"MEDIUM"``, below 0.75
    ``"HIGH"``; everything else is ``"CRITICAL"``.
    """
    for upper_bound, label in ((0.25, "LOW"), (0.50, "MEDIUM"), (0.75, "HIGH")):
        if score < upper_bound:
            return label
    return "CRITICAL"
def score_image(path):
    """Compute the image-only forensic score.

    Runs ELA, copy-move detection, noise analysis and the EXIF check, maps
    each result to a sub-score capped at 1.0, and combines them with
    ``WEIGHTS``.

    Args:
        path: Image file to score.

    Returns:
        ``(total, sub, exif_flags)``: the weighted total, the dict of
        sub-scores (``ela``, ``copy_move``, ``noise``, ``exif``) and the
        flag list returned by ``exif_sanity``.
    """
    ela_score = error_level_analysis(path)[1]
    match_count = copy_move_detect(path)[1]
    outlier_ratio = noise_inconsistency(path)[1]
    exif_flags = exif_sanity(path)

    exif_component = 0.6 if exif_flags != ["exif clean"] else 0.0
    sub = {
        "ela": min(ela_score / 25.0, 1.0),
        "copy_move": min(match_count / 50.0, 1.0),
        "noise": min(outlier_ratio * 4, 1.0),
        "exif": exif_component,
    }
    total = sum(WEIGHTS[name] * value for name, value in sub.items())
    return total, sub, exif_flags
def generate_insights(score, sub_scores, extra_flags=None):
    """Turn scores and flags into a report fragment.

    Args:
        score: Overall risk score.
        sub_scores: Mapping of sub-score name to value; each
            ``INSIGHT_RULES`` entry fires when its sub-score reaches the
            rule's threshold.
        extra_flags: Optional flag strings; every flag other than the
            benign markers ``"exif clean"``, ``"ok"`` and ``"clean"`` is
            reported as ``"Flag: <flag>"``.

    Returns:
        Dict with ``risk_score`` (rounded to 3 decimals), ``risk_band``,
        ``recommended_action`` and ``evidence`` (list of bullet strings).
    """
    evidence = [
        message
        for key, threshold, message in INSIGHT_RULES
        if sub_scores.get(key, 0) >= threshold
    ]

    benign = ("exif clean", "ok", "clean")
    if extra_flags:
        evidence.extend(
            "Flag: " + str(flag) for flag in extra_flags if flag not in benign
        )

    if not evidence:
        evidence.append("No anomaly indicators above threshold.")

    level = band(score)
    return {
        "risk_score": round(score, 3),
        "risk_band": level,
        "recommended_action": ACTIONS[level],
        "evidence": evidence,
    }
| # ------------------------------------------------------------- | |
| # ML feature extraction + prediction | |
| # ------------------------------------------------------------- | |
| MODEL_PATH = Path("models/forgery_rf.joblib") | |
| CNN_MODEL_PATH = Path("models/forgery_cnn.keras") | |
| CNN_META_PATH = Path("models/forgery_cnn.meta.json") | |
| _CNN_CACHE = {"model": None, "meta": None, "tried": False} | |
def _load_cnn():
    """Lazy-load the CNN model only when first needed (avoids TF import cost).

    The load is attempted at most once per process; the outcome, including
    a failure, is cached in ``_CNN_CACHE``.

    Returns:
        ``(model, meta)``. Both are ``None`` when the model file is missing
        or loading failed. When the model loads, ``meta`` is always a dict:
        the contents of the meta JSON file if it is readable and holds a
        JSON object, otherwise the built-in defaults.
    """
    if _CNN_CACHE["tried"]:
        return _CNN_CACHE["model"], _CNN_CACHE["meta"]
    _CNN_CACHE["tried"] = True
    if not CNN_MODEL_PATH.exists():
        return None, None

    try:
        import tensorflow as tf  # local import - heavy
        model = tf.keras.models.load_model(CNN_MODEL_PATH)
    except Exception as e:
        print("CNN load failed:", e)
        return _CNN_CACHE["model"], _CNN_CACHE["meta"]

    # Metadata is loaded separately so that a corrupt or unreadable meta file
    # cannot leave the cache holding a model with meta=None.
    meta = {"image_size": 224, "class_names": ["originals", "tampered"]}
    if CNN_META_PATH.exists():
        try:
            loaded = json.loads(CNN_META_PATH.read_text())
            if isinstance(loaded, dict):
                meta = loaded
        except (OSError, ValueError) as e:
            print("CNN meta load failed, using defaults:", e)

    _CNN_CACHE["model"] = model
    _CNN_CACHE["meta"] = meta
    return _CNN_CACHE["model"], _CNN_CACHE["meta"]
def predict_with_cnn(path):
    """Run the trained CNN if forgery_cnn.keras exists. Returns dict or None.

    Args:
        path: Image file to classify.

    Returns:
        ``None`` when no CNN model is available, otherwise a dict with
        ``tamper_probability`` (rounded to 3 decimals), ``verdict``
        (``"TAMPERED"`` at probability >= 0.5, else ``"GENUINE"``),
        ``model`` and ``val_auc`` (``None`` if the metadata has no such key).
    """
    model, meta = _load_cnn()
    if model is None:
        return None
    # The loader can hand back a model without metadata; normalise once so
    # every lookup below is safe (the original guarded only the last one).
    meta = meta or {}
    img_size = meta.get("image_size", 224)
    # Close the file handle once the pixels have been copied out.
    with Image.open(path) as src:
        img = src.convert("RGB").resize((img_size, img_size))
    arr = np.array(img)[None, ...].astype(np.float32)
    # NOTE(review): reads output [0, 0] as the tamper probability, which
    # presumes a single-unit output head - confirm against the trained model.
    prob = float(model.predict(arr, verbose=0)[0, 0])
    return {
        "tamper_probability": round(prob, 3),
        "verdict": "TAMPERED" if prob >= 0.5 else "GENUINE",
        "model": "MobileNetV2 (CASIA v2 fine-tuned)",
        "val_auc": meta.get("val_auc"),
    }
def extract_features(path):
    """Build the feature dict consumed by the ML model.

    Combines the forensic measurements (ELA mean, copy-move match count,
    noise outlier ratio, EXIF-clean indicator) with GLCM texture properties
    of the 256x256 grayscale image and, when the colour image is readable,
    the entropy of a 32-bin histogram per B/G/R channel.

    Args:
        path: Image file to describe.

    Returns:
        Dict mapping feature name to a numeric value.
    """
    from skimage.feature import graycomatrix, graycoprops

    features = {}
    features["ela_mean"] = error_level_analysis(path)[1]
    features["copy_move_matches"] = copy_move_detect(path)[1]
    features["noise_outlier_ratio"] = noise_inconsistency(path)[1]
    features["exif_clean"] = int(exif_sanity(path) == ["exif clean"])

    gray = cv2.imread(str(path), cv2.IMREAD_GRAYSCALE)
    gray_small = cv2.resize(gray, (256, 256))
    glcm = graycomatrix(gray_small, [1], [0], 256, symmetric=True, normed=True)
    for prop in ("contrast", "homogeneity", "energy", "correlation"):
        features["glcm_" + prop] = float(graycoprops(glcm, prop)[0, 0])

    colour = cv2.imread(str(path))
    if colour is None:
        return features
    for index, channel in enumerate(["b", "g", "r"]):
        counts = cv2.calcHist([colour], [index], None, [32], [0, 256]).flatten()
        probs = counts / (counts.sum() + 1e-9)
        entropy = -(probs * np.log2(probs + 1e-9)).sum()
        features["hist_" + channel + "_entropy"] = float(entropy)
    return features
def predict_with_model(path, model_path=MODEL_PATH):
    """Run the trained Random Forest on ``path`` if a model bundle exists.

    Args:
        path: Image file to classify.
        model_path: Location of the joblib bundle. The bundle is indexed
            with the keys ``"model"`` and ``"features"`` (the column order
            handed to the model).

    Returns:
        ``None`` when ``model_path`` does not exist, otherwise a dict with
        ``file``, ``tamper_probability`` (rounded to 3 decimals),
        ``verdict`` (``"TAMPERED"`` at probability >= 0.5, else
        ``"GENUINE"``) and the raw ``features``.
    """
    # Check for the model before importing joblib so that a missing joblib
    # install is not reported as an error when no model is deployed anyway.
    if not Path(model_path).exists():
        return None
    import joblib

    # NOTE: joblib.load unpickles the file and can therefore execute
    # arbitrary code - only point model_path at trusted files.
    bundle = joblib.load(model_path)
    feats = extract_features(path)
    x = pd.DataFrame([feats])[bundle["features"]]
    p = bundle["model"].predict_proba(x)[0, 1]
    return {"file": str(path), "tamper_probability": round(float(p), 3),
            "verdict": "TAMPERED" if p >= 0.5 else "GENUINE",
            "features": feats}
| # ------------------------------------------------------------- | |
| # End-to-end pipeline | |
| # ------------------------------------------------------------- | |
_IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg", ".tif", ".tiff", ".bmp")


def _weighted_sum(sub):
    """Return the WEIGHTS-weighted sum of ``sub``; unknown keys weigh 0."""
    return sum(WEIGHTS.get(k, 0) * v for k, v in sub.items())


def _analyse_image(path, report):
    """Run the image pipeline on ``path`` and merge the results into ``report``."""
    s, sub, flags = score_image(path)
    try:
        text_rules = text_rule_checks(ocr_text(path))
        # "ocr_skipped" means no text was available (e.g. Tesseract is not
        # installed). That is not a rule violation, so it must not add risk.
        neutral = text_rules["flags"] in (["ok"], ["ocr_skipped"])
        sub["text_rules"] = 0.0 if neutral else 0.5
        s = _weighted_sum(sub)
    except Exception as e:
        text_rules = {"error": str(e)}

    # Blend in RF prediction if model exists
    try:
        ml = predict_with_model(path)
        if ml is not None:
            s = 0.5 * s + 0.5 * ml["tamper_probability"]
            report["ml_prediction"] = ml
    except Exception as e:
        report["ml_error"] = str(e)

    # Blend in CNN prediction if model exists. The weight is val_auc
    # (0.85 when unknown) clamped to the range [0.4, 0.7].
    try:
        cnn = predict_with_cnn(path)
        if cnn is not None:
            w = max(0.4, min(0.7, (cnn.get("val_auc") or 0.85)))
            s = (1 - w) * s + w * cnn["tamper_probability"]
            report["cnn_prediction"] = cnn
    except Exception as e:
        report["cnn_error"] = str(e)

    # AI-generated content detector
    try:
        if AI_DETECTOR_OK:
            ai = ai_detector.detect_ai_generated(path)
            report["ai_detector"] = ai
            sub["ai_generated"] = ai["probability"]
            # Blend lightly: AI-gen prob bumps risk up to +20%
            s = 0.9 * s + 0.1 * ai["probability"] * 2.0
            if ai["probability"] >= 0.6:
                flags = flags + [f"AI-generated content suspected (prob {ai['probability']:.2f})"]
    except Exception as e:
        report["ai_detector_error"] = str(e)

    # The AI bump can push the blend past 1.0; keep the score in [0, 1].
    s = min(1.0, max(0.0, s))

    insights = generate_insights(s, sub, flags + text_rules.get("flags", []))
    report.update({"sub_scores": sub, "exif_flags": flags,
                   "text_rules": text_rules, **insights})


def _analyse_pdf(path, report):
    """Run the PDF pipeline on ``path`` and merge the results into ``report``."""
    audit = pdf_structural_audit(path)
    fonts = pdf_font_audit(path)
    sub = {"pdf_struct": 0.8 if audit["flags"] != ["clean"] else 0.1,
           "text_rules": 0.6 if fonts["flags"] != ["ok"] else 0.1}
    # Only two of the weighted signals exist for PDFs. Without normalising
    # by the weight actually in play the score tops out at 0.18, so every
    # PDF would land in the LOW band no matter what was flagged.
    weight_in_play = sum(WEIGHTS.get(k, 0) for k in sub)
    s = _weighted_sum(sub) / weight_in_play if weight_in_play else 0.0
    insights = generate_insights(s, sub, audit["flags"] + fonts["flags"])
    report.update({"sub_scores": sub, "pdf_audit": audit,
                   "font_audit": fonts, **insights})


def _log_provenance(report):
    """Best-effort write of ``report`` to the provenance ledger.

    Any failure, including a missing ``provenance`` module, is recorded
    under ``report["provenance_error"]`` instead of being raised.
    """
    try:
        import provenance
        provenance.log_analysis(report.get("file", "unknown"),
                                report.get("sha256", "-"),
                                report.get("risk_band", "UNKNOWN"),
                                report.get("risk_score", -1.0),
                                extra={"type": report.get("type")})
    except Exception as _e:
        report["provenance_error"] = str(_e)


def analyse_document(path):
    """Run the end-to-end forensic pipeline on one document.

    Images go through the image forensics, OCR text rules and any available
    ML / CNN / AI-detector models; PDFs go through the structural and font
    audits. Other extensions are reported as unsupported. Every analysis is
    then logged to the provenance ledger on a best-effort basis.

    Args:
        path: Document to analyse. The file must be readable, because its
            SHA-256 is computed up front.

    Returns:
        Report dict. It always contains ``file``, ``analysed_at`` (UTC,
        ISO-8601 with a trailing ``Z``), ``sha256`` and ``type``. Supported
        types add ``sub_scores``, ``risk_score``, ``risk_band``,
        ``recommended_action`` and ``evidence`` plus type-specific details;
        optional stages that failed are reported under ``*_error`` keys.
    """
    # datetime.utcnow() is deprecated; take an aware UTC timestamp and drop
    # the tzinfo so the text keeps its "...Z" form.
    from datetime import timezone

    path = Path(path)
    ext = path.suffix.lower()
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    report = {"file": str(path),
              "analysed_at": now_utc.isoformat() + "Z",
              "sha256": hashlib.sha256(path.read_bytes()).hexdigest()}

    if ext in _IMAGE_EXTENSIONS:
        report["type"] = "image"
        _analyse_image(path, report)
    elif ext == ".pdf":
        report["type"] = "pdf"
        _analyse_pdf(path, report)
    else:
        report["type"] = "unsupported"
        report["error"] = "extension " + ext + " not handled"

    # Log to provenance ledger
    _log_provenance(report)
    return report
| # ------------------------------------------------------------- | |
| # Cross-document consistency (Sprint 2) | |
| # ------------------------------------------------------------- | |
# Labelled identity fields as they appear in OCR / PDF text.
# The name capture is restricted to a single line: it previously used \s,
# which also matches newlines, so "Name: JOHN DOE\nDOB: ..." was captured
# as "JOHN DOE\nDOB".
NAME_RE = re.compile(r"(?:Name|Owner|Borrower|Holder|Account Holder)\s*[:\-]\s*([A-Z][A-Z \t.]{2,40})", re.IGNORECASE)
DOB_RE = re.compile(r"(?:DOB|Date of Birth|Born)\s*[:\-]\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})", re.IGNORECASE)
# NOTE(review): the address capture still spans newlines (left unchanged so
# wrapped addresses keep matching); it can run into a following label.
ADDR_RE = re.compile(r"(?:Address|Village|Residence)\s*[:\-]\s*([A-Z0-9][A-Z0-9\s,.\-/]{3,80})", re.IGNORECASE)
| def _norm(s): | |
| return re.sub(r"\s+", " ", (s or "").strip().upper()) | |
def extract_identity_fields(path):
    """Pull name, DOB, address, account number, IFSC and amounts from a document.

    Paths ending in ``.pdf`` are read through their embedded text; anything
    else goes through ``ocr_text``.

    Args:
        path: Document to read.

    Returns:
        ``(fields, text)``: ``fields`` holds ``name``, ``dob``, ``address``,
        ``account``, ``ifsc`` (each ``None`` when not found) and ``amounts``
        (a list); ``text`` is the raw extracted text.
    """
    is_pdf = str(path).lower().endswith(".pdf")
    if is_pdf:
        with fitz.open(path) as doc:
            text = "\n".join(page.get_text() for page in doc)
    else:
        text = ocr_text(path)

    fields = {
        "name": None, "dob": None, "address": None,
        "account": None, "ifsc": None, "amounts": [],
    }
    if not text:
        return fields, text

    # Labelled fields: first match wins, stored in normalised form.
    for key, pattern in (("name", NAME_RE), ("dob", DOB_RE), ("address", ADDR_RE)):
        hit = pattern.search(text)
        if hit:
            fields[key] = _norm(hit.group(1))

    # Unlabelled fields: first occurrence, kept verbatim.
    for key, pattern in (("account", ACC_RE), ("ifsc", IFSC_RE)):
        hits = pattern.findall(text)
        if hits:
            fields[key] = hits[0]

    fields["amounts"] = parse_amounts(text)
    return fields, text
| def _similarity(a, b): | |
| """Simple ratio-based string similarity.""" | |
| if not a or not b: | |
| return 0.0 | |
| from difflib import SequenceMatcher | |
| return SequenceMatcher(None, a, b).ratio() | |
| def cross_doc_consistency(file_paths): | |
| """Compare identity fields across 2+ documents. Return per-field verdict.""" | |
| if len(file_paths) < 2: | |
| return {"error": "need at least 2 documents"} | |
| extracts = [] | |
| for p in file_paths: | |
| fields, _ = extract_identity_fields(p) | |
| extracts.append({"file": str(p), "fields": fields}) | |
| # Compare each field across docs | |
| field_results = {} | |
| for field in ["name", "dob", "address", "account", "ifsc"]: | |
| values = [e["fields"].get(field) for e in extracts] | |
| present = [v for v in values if v] | |
| if len(present) < 2: | |
| field_results[field] = { | |
| "status": "insufficient_data", | |
| "values": values, | |
| "similarity": None, | |
| } | |
| continue | |
| # All-pairs similarity | |
| sims = [] | |
| for i in range(len(present)): | |
| for j in range(i + 1, len(present)): | |
| sims.append(_similarity(present[i], present[j])) | |
| min_sim = min(sims) | |
| if min_sim >= 0.95: | |
| status = "match" | |
| elif min_sim >= 0.75: | |
| status = "likely_match" | |
| else: | |
| status = "mismatch" | |
| field_results[field] = { | |
| "status": status, | |
| "values": values, | |
| "similarity": round(min_sim, 3), | |
| } | |
| # Aggregate risk | |
| mismatches = sum(1 for r in field_results.values() if r["status"] == "mismatch") | |
| li |