DocSentry / forensics.py
SpandanM110's picture
Round 2: fraud ring graph, AI-gen detector, provenance ledger, architecture doc
e97f963
Raw
History Blame Contribute Delete
21 kB
"""
forensics.py - Document forensics core module
Reusable analysis functions extracted from anomaly_detection_banking.ipynb.
Imported by app.py (Streamlit) and the notebook.
Public API:
analyse_document(path) - end-to-end pipeline
score_image(path) - image-only forensic score
error_level_analysis(path) - ELA image + score
copy_move_detect(path) - annotated image + match count + match list
noise_inconsistency(path) - noise heatmap + outlier ratio
exif_sanity(path) - metadata flags
pdf_structural_audit(path) - EOF count + producer/creator
pdf_font_audit(path) - font count + flags
ocr_text(path) - OCR (no-op if Tesseract missing)
text_rule_checks(text) - date/amount/IFSC sanity
extract_features(path) - feature vector for ML model
predict_with_model(path) - run trained Random Forest if present
generate_insights(score, sub, flags) - rule-based bullets
band(score) - score -> LOW/MEDIUM/HIGH/CRITICAL
"""
import os
import io
import re
import math
import json
import hashlib
import shutil
import warnings
from pathlib import Path
from datetime import datetime
import numpy as np
import pandas as pd
from PIL import Image, ImageChops, ImageEnhance
import cv2
import fitz # PyMuPDF
import pytesseract
try:
import ai_detector
AI_DETECTOR_OK = True
except Exception:
AI_DETECTOR_OK = False
warnings.filterwarnings("ignore")
# -------------------------------------------------------------
# Tesseract auto-detect (Windows-friendly)
# -------------------------------------------------------------
# Candidate locations for the Tesseract binary: whatever is on PATH first,
# then the usual Windows install directories.
_TESSERACT_CANDIDATES = (
    shutil.which("tesseract"),
    r"C:\Program Files\Tesseract-OCR\tesseract.exe",
    r"C:\Program Files (x86)\Tesseract-OCR\tesseract.exe",
    os.path.expanduser(r"~\AppData\Local\Programs\Tesseract-OCR\tesseract.exe"),
)

# TESSERACT_OK stays False unless one candidate exists on disk; the first
# hit is handed to pytesseract and the search stops.
TESSERACT_OK = False
for _c in _TESSERACT_CANDIDATES:
    if not _c or not os.path.isfile(_c):
        continue
    pytesseract.pytesseract.tesseract_cmd = _c
    TESSERACT_OK = True
    break
# -------------------------------------------------------------
# Image forensics
# -------------------------------------------------------------
def error_level_analysis(path, quality=90, scale=15):
    """Compute an Error Level Analysis (ELA) image and score for an image.

    The image is converted to RGB, re-encoded as JPEG in memory at
    ``quality`` and compared with itself pixel by pixel.

    Args:
        path: Image file to analyse (anything ``PIL.Image.open`` accepts).
        quality: JPEG quality used for the in-memory re-save.
        scale: Extra multiplier applied to the brightness boost of the
            difference image.

    Returns:
        ``(ela, score)`` where ``ela`` is the brightness-enhanced difference
        image and ``score`` is the mean of the raw difference image as a
        float.
    """
    # Context managers release the underlying file handle as soon as the
    # pixel data has been copied, instead of waiting for garbage collection.
    with Image.open(path) as src:
        orig = src.convert("RGB")

    buf = io.BytesIO()
    orig.save(buf, "JPEG", quality=quality)
    buf.seek(0)
    with Image.open(buf) as resaved:
        diff = ImageChops.difference(orig, resaved)

    # Largest per-channel difference; fall back to 1 so an identical re-save
    # does not cause a division by zero below.
    extrema = diff.getextrema()
    max_diff = max(hi for _, hi in extrema) or 1
    ela = ImageEnhance.Brightness(diff).enhance(scale * 255 / max_diff)
    score = float(np.array(diff).mean())
    return ela, score
def copy_move_detect(path, min_dist=40, max_matches=80):
    """Look for duplicated regions by matching ORB keypoints against themselves.

    Args:
        path: Image file; read with ``cv2.imread``.
        min_dist: Minimum pixel distance between two matched keypoints for
            the pair to count.
        max_matches: Upper bound on the number of pairs kept and drawn.

    Returns:
        ``(image, count, pairs)``. ``image`` is ``None`` when the file cannot
        be read, the untouched input when too few keypoints are found, and
        otherwise a copy with every kept pair drawn on it. ``pairs`` is a
        list of ``(point_a, point_b, distance)`` tuples and ``count`` is its
        length.
    """
    image = cv2.imread(str(path))
    if image is None:
        return None, 0, []

    grey = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    keypoints, descriptors = cv2.ORB_create(nfeatures=2000).detectAndCompute(grey, None)
    if descriptors is None or len(keypoints) < 10:
        return image, 0, []

    matcher = cv2.BFMatcher(cv2.NORM_HAMMING)
    candidates = []
    for neighbours in matcher.knnMatch(descriptors, descriptors, k=10):
        # The first neighbour is skipped - presumably the keypoint matching
        # itself, since the descriptor set is matched against itself.
        for match in neighbours[1:]:
            src_pt = keypoints[match.queryIdx].pt
            dst_pt = keypoints[match.trainIdx].pt
            separation = math.hypot(src_pt[0] - dst_pt[0], src_pt[1] - dst_pt[1])
            if separation <= min_dist or match.distance >= 40:
                continue
            candidates.append((src_pt, dst_pt, separation))

    kept = candidates[:max_matches]
    annotated = image.copy()
    for src_pt, dst_pt, _ in kept:
        start = tuple(int(c) for c in src_pt)
        end = tuple(int(c) for c in dst_pt)
        cv2.line(annotated, start, end, (0, 0, 255), 1)
        cv2.circle(annotated, start, 3, (0, 255, 0), -1)
        cv2.circle(annotated, end, 3, (0, 255, 0), -1)
    return annotated, len(kept), kept
def noise_inconsistency(path, block=32):
    """Measure how unevenly Laplacian variance is spread over image blocks.

    The grayscale image is cropped to a whole number of ``block`` x ``block``
    tiles, the variance of the Laplacian is computed per tile, and each
    tile's variance is z-scored against all tiles.

    Args:
        path: Image file; read as grayscale with ``cv2.imread``.
        block: Tile edge length in pixels.

    Returns:
        ``(heat, ratio)``: a 2-D array of absolute z-scores (one per tile)
        and the fraction of tiles with ``|z| > 2.5``. An unreadable image or
        one smaller than a single tile yields ``(np.zeros((1, 1)), 0.0)``.
    """
    gray = cv2.imread(str(path), cv2.IMREAD_GRAYSCALE)
    if gray is None:
        return np.zeros((1, 1)), 0.0

    height, width = gray.shape
    rows, cols = height // block, width // block
    if rows == 0 or cols == 0:
        return np.zeros((1, 1)), 0.0

    cropped = gray[:rows * block, :cols * block]
    laplacian = cv2.Laplacian(cropped, cv2.CV_64F)
    # (rows, block, cols, block) -> (rows, cols, block, block) -> one row per tile
    tiles = laplacian.reshape(rows, block, cols, block)
    tiles = tiles.transpose(0, 2, 1, 3).reshape(-1, block * block)

    variances = tiles.var(axis=1)
    abs_z = np.abs((variances - variances.mean()) / (variances.std() + 1e-9))
    outlier_share = (abs_z > 2.5).sum() / max(1, len(abs_z))
    return abs_z.reshape(rows, cols), float(outlier_share)
def exif_sanity(path):
    """Inspect EXIF metadata for signs of editing.

    Args:
        path: Image file to inspect.

    Returns:
        A non-empty list of strings: ``["cannot read image"]`` when the file
        or its EXIF block cannot be opened, ``["no EXIF metadata (re-saved or
        stripped)"]`` when there is no EXIF data, one entry per finding
        otherwise, or ``["exif clean"]`` when nothing was flagged.
    """
    try:
        img = Image.open(path)
    except Exception:
        return ["cannot read image"]

    # Keep the file open only while metadata is being read, then release it.
    with img:
        try:
            exif = img.getexif()
        except Exception:
            return ["cannot read image"]
        if not exif:
            return ["no EXIF metadata (re-saved or stripped)"]
        tags = {Image.ExifTags.TAGS.get(k, k): v for k, v in exif.items()}

        # getexif() only exposes the base IFD. DateTimeOriginal is stored in
        # the Exif sub-IFD (pointer tag 0x8769), so merge that in as well;
        # otherwise the timestamp comparison below can never trigger.
        sub_ifd = {}
        if hasattr(exif, "get_ifd"):  # not available on very old Pillow
            try:
                sub_ifd = exif.get_ifd(0x8769)
            except Exception:
                sub_ifd = {}
        for k, v in sub_ifd.items():
            # Base-IFD values win if a tag appears in both places.
            tags.setdefault(Image.ExifTags.TAGS.get(k, k), v)

    flags = []
    sw = str(tags.get("Software", "")).lower()
    for bad in ["photoshop", "gimp", "paint", "snapseed", "picsart"]:
        if bad in sw:
            flags.append("edited with " + bad)
    if "DateTimeOriginal" in tags and "DateTime" in tags:
        if tags["DateTimeOriginal"] != tags["DateTime"]:
            flags.append("modified-time differs from original-time")
    return flags or ["exif clean"]
# -------------------------------------------------------------
# PDF forensics
# -------------------------------------------------------------
def pdf_structural_audit(path):
    """Check a PDF's raw bytes and metadata for signs of post-hoc editing.

    Args:
        path: PDF file to audit.

    Returns:
        Dict with ``pages`` (page count), ``eof_markers`` (number of
        ``%%EOF`` occurrences in the raw bytes), ``metadata`` (the document
        metadata dict, or ``{}``) and ``flags`` (findings, or ``["clean"]``).
    """
    eof_count = Path(path).read_bytes().count(b"%%EOF")
    with fitz.open(path) as doc:
        meta = doc.metadata or {}
        page_total = doc.page_count

    producer = (meta.get("producer") or "").lower()
    creator = (meta.get("creator") or "").lower()

    findings = []
    if eof_count > 1:
        findings.append(f"{eof_count} EOF markers (incremental updates)")
    if producer and creator and producer != creator:
        findings.append(f"producer/creator differ: {producer} vs {creator}")
    # Substring match against either metadata field.
    consumer_tools = ["ilovepdf", "smallpdf", "pdfescape", "sejda", "foxit phantom"]
    findings.extend(
        "edited via consumer tool: " + tool
        for tool in consumer_tools
        if tool in producer or tool in creator
    )

    return {
        "pages": page_total,
        "eof_markers": eof_count,
        "metadata": meta,
        "flags": findings or ["clean"],
    }
def pdf_font_audit(path):
    """List the distinct fonts used in a PDF and flag an unusually high count.

    Args:
        path: PDF file to audit.

    Returns:
        Dict with ``fonts`` (sorted distinct values of element 3 of every
        ``page.get_fonts()`` entry) and ``flags`` (a warning when more than
        four distinct fonts are present, otherwise ``["ok"]``).
    """
    font_names = set()
    with fitz.open(path) as doc:
        for page in doc:
            font_names.update(entry[3] for entry in page.get_fonts())

    flags = []
    if len(font_names) > 4:
        flags.append("unusually high font count: " + str(len(font_names)))
    return {"fonts": sorted(font_names), "flags": flags or ["ok"]}
# -------------------------------------------------------------
# OCR + text rules
# -------------------------------------------------------------
# Amounts: either comma-grouped digits ("1,00,000", "2,500.75") or a plain
# digit run of any length ("1234567.50"), with an optional leading minus and
# up to two decimals. The lookbehind rejects a start directly after a letter
# or digit so that a long number is not chopped into 3-digit pieces, the
# digit tail of an alphanumeric code is not read as an amount, and the
# separators of a date such as 12-05-2023 are not taken for minus signs.
AMT_RE = re.compile(r"(?<![A-Za-z\d])-?(?:\d{1,3}(?:,\d{2,3})+|\d+)(?:\.\d{1,2})?")
# Dates written as d/m/y or d-m-y with a 2-4 digit year.
DATE_RE = re.compile(r"(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})")
# IFSC code shape: four capitals, a literal 0, six alphanumerics.
IFSC_RE = re.compile(r"\b[A-Z]{4}0[A-Z0-9]{6}\b")
# Account number candidates: a standalone run of 9-18 digits.
ACC_RE = re.compile(r"\b\d{9,18}\b")
def ocr_text(path):
    """Return the OCR text of an image, or ``""`` when OCR is unavailable.

    An empty string is returned both when no Tesseract binary was detected
    at import time and when opening the image or running OCR raises.
    """
    if TESSERACT_OK:
        try:
            return pytesseract.image_to_string(Image.open(path))
        except Exception:
            # Best effort: any failure is reported as "no text".
            pass
    return ""
def parse_amounts(text):
    """Extract every ``AMT_RE`` match from ``text`` as a float.

    Thousands separators (commas) are removed before conversion; a match
    that still fails ``float()`` is skipped.
    """
    amounts = []
    for token in AMT_RE.findall(text):
        try:
            number = float(token.replace(",", ""))
        except ValueError:
            continue
        amounts.append(number)
    return amounts
def text_rule_checks(text):
    """Run simple sanity rules over OCR text.

    Args:
        text: Text to inspect; a falsy value means OCR produced nothing.

    Returns:
        Dict with the counts ``n_dates``, ``n_amounts``, ``n_ifsc`` and
        ``n_accounts`` plus ``flags``: ``["ocr_skipped"]`` for empty input,
        the list of triggered rules, or ``["ok"]`` when none triggered.
    """
    if not text:
        return {"n_dates": 0, "n_amounts": 0, "n_ifsc": 0,
                "n_accounts": 0, "flags": ["ocr_skipped"]}

    found_dates = DATE_RE.findall(text)
    found_ifsc = IFSC_RE.findall(text)
    found_accounts = ACC_RE.findall(text)
    amounts = parse_amounts(text)
    flags = []

    if found_dates:
        try:
            from dateutil import parser as date_parser
            parsed = [date_parser.parse(d, dayfirst=True) for d in found_dates]
            # Each date must be no later than the one that follows it.
            if any(earlier > later for earlier, later in zip(parsed, parsed[1:])):
                flags.append("dates not monotonic")
        except Exception:
            flags.append("unparseable dates")

    # Exact multiples of 100000; more than three of them is flagged.
    round_large = [a for a in amounts if a >= 100000 and a % 100000 == 0]
    if len(round_large) > 3:
        flags.append(f"{len(round_large)} suspiciously round large amounts")

    if found_accounts and not found_ifsc:
        flags.append("account number present but no IFSC")

    return {
        "n_dates": len(found_dates),
        "n_amounts": len(amounts),
        "n_ifsc": len(found_ifsc),
        "n_accounts": len(found_accounts),
        "flags": flags or ["ok"],
    }
# -------------------------------------------------------------
# Scoring & insights
# -------------------------------------------------------------
# Contribution of each sub-score to the weighted risk total.
# NOTE(review): no sub-score named "math" is produced anywhere in the part of
# the file visible here - confirm whether that weight is still needed.
WEIGHTS = {
    "ela": 0.20,
    "copy_move": 0.25,
    "noise": 0.15,
    "exif": 0.10,
    "pdf_struct": 0.15,
    "text_rules": 0.10,
    "math": 0.05,
}

# (sub-score key, threshold, message): the message becomes an evidence bullet
# when that sub-score reaches the threshold.
INSIGHT_RULES = [
    ("copy_move", 0.4,
     "Possible copy-paste forgery: repeated visual region. Inspect seal/signature area."),
    ("ela", 0.4,
     "Compression artefacts inconsistent with a single-source scan. Likely re-saved after edits."),
    ("noise", 0.4,
     "Localised noise inconsistency - common in image splicing."),
    ("exif", 0.4,
     "Image metadata indicates edits in a photo-editor or stripped EXIF."),
    ("pdf_struct", 0.4,
     "PDF structural anomalies detected (incremental edits / consumer-tool fingerprint)."),
]

# Recommended next step for each risk band.
ACTIONS = {
    "LOW": "Proceed with standard underwriting.",
    "MEDIUM": "Request additional verification documents.",
    "HIGH": "Escalate to fraud-risk team; manual review mandatory.",
    "CRITICAL": "Block file; trigger investigation workflow.",
}
def band(score):
    """Map a numeric risk score to LOW / MEDIUM / HIGH / CRITICAL.

    Cut-offs are exclusive upper bounds: below 0.25 is LOW, below 0.50 is
    MEDIUM, below 0.75 is HIGH, everything else is CRITICAL.
    """
    for upper_bound, label in ((0.25, "LOW"), (0.50, "MEDIUM"), (0.75, "HIGH")):
        if score < upper_bound:
            return label
    return "CRITICAL"
def score_image(path):
    """Combine the four image checks into one weighted forensic score.

    Args:
        path: Image file to score.

    Returns:
        ``(total, sub, exif_flags)``: the ``WEIGHTS``-weighted sum of the
        sub-scores, the sub-score dict (``ela``, ``copy_move``, ``noise``,
        ``exif``; the first three capped at 1.0) and the raw EXIF flag list.
    """
    ela_score = error_level_analysis(path)[1]
    match_count = copy_move_detect(path)[1]
    noise_ratio = noise_inconsistency(path)[1]
    exif_flags = exif_sanity(path)

    # Anything other than a clean EXIF verdict gets a fixed 0.6 penalty.
    exif_component = 0.6 if exif_flags != ["exif clean"] else 0.0
    sub = {
        "ela": min(ela_score / 25.0, 1.0),
        "copy_move": min(match_count / 50.0, 1.0),
        "noise": min(noise_ratio * 4, 1.0),
        "exif": exif_component,
    }
    total = sum(WEIGHTS[name] * value for name, value in sub.items())
    return total, sub, exif_flags
def generate_insights(score, sub_scores, extra_flags=None):
    """Turn a score, its sub-scores and raw flags into a reviewer summary.

    Args:
        score: Overall risk score.
        sub_scores: Mapping of sub-score name to value; missing keys count
            as 0.
        extra_flags: Optional iterable of flag strings. The benign markers
            ``"exif clean"``, ``"ok"`` and ``"clean"`` are ignored.

    Returns:
        Dict with ``risk_score`` (rounded to 3 decimals), ``risk_band``,
        ``recommended_action`` and ``evidence`` (list of bullet strings,
        never empty).
    """
    benign = ("exif clean", "ok", "clean")

    evidence = [
        message
        for key, threshold, message in INSIGHT_RULES
        if sub_scores.get(key, 0) >= threshold
    ]
    evidence.extend(
        "Flag: " + str(flag)
        for flag in (extra_flags or ())
        if flag not in benign
    )
    if not evidence:
        evidence.append("No anomaly indicators above threshold.")

    risk_band = band(score)
    return {
        "risk_score": round(score, 3),
        "risk_band": risk_band,
        "recommended_action": ACTIONS[risk_band],
        "evidence": evidence,
    }
# -------------------------------------------------------------
# ML feature extraction + prediction
# -------------------------------------------------------------
# All model artefacts live under the relative "models" directory.
_MODELS_DIR = Path("models")
MODEL_PATH = _MODELS_DIR / "forgery_rf.joblib"
CNN_MODEL_PATH = _MODELS_DIR / "forgery_cnn.keras"
CNN_META_PATH = _MODELS_DIR / "forgery_cnn.meta.json"

# Memo for the lazily loaded CNN; "tried" records that a load was already
# attempted so it is not repeated.
_CNN_CACHE = {
    "model": None,
    "meta": None,
    "tried": False,
}
def _load_cnn():
    """Load the CNN and its metadata once and memoise the result.

    TensorFlow is imported inside the function so the import cost is only
    paid when a model file actually exists.

    Returns:
        ``(model, meta)``. Both are ``None`` when the model file is missing;
        a load failure is printed and whatever was cached up to that point
        is returned.
    """
    cache = _CNN_CACHE
    if not cache["tried"]:
        # Mark first so a failed attempt is not retried on every call.
        cache["tried"] = True
        if not CNN_MODEL_PATH.exists():
            return None, None
        try:
            import tensorflow as tf  # local import - heavy
            cache["model"] = tf.keras.models.load_model(CNN_MODEL_PATH)
            cache["meta"] = (
                json.loads(CNN_META_PATH.read_text())
                if CNN_META_PATH.exists()
                else {"image_size": 224, "class_names": ["originals", "tampered"]}
            )
        except Exception as e:
            print("CNN load failed:", e)
    return cache["model"], cache["meta"]
def predict_with_cnn(path):
    """Run the trained CNN on an image, if a model is available.

    Args:
        path: Image file to classify.

    Returns:
        ``None`` when no CNN could be loaded; otherwise a dict with
        ``tamper_probability`` (rounded to 3 decimals), ``verdict``
        (``"TAMPERED"`` at probability >= 0.5, else ``"GENUINE"``),
        ``model`` (a fixed label) and ``val_auc`` (taken from the metadata,
        ``None`` when absent).
    """
    model, meta = _load_cnn()
    if model is None:
        return None
    # The loader can hand back a model with meta=None (e.g. the metadata
    # file failed to parse after the model loaded); fall back to defaults
    # instead of raising AttributeError on meta.get().
    meta = meta or {}
    img_size = meta.get("image_size", 224)
    # Release the file handle as soon as the resized copy exists.
    with Image.open(path) as src:
        img = src.convert("RGB").resize((img_size, img_size))
    # Add a leading batch axis and feed float32 pixels to the model.
    arr = np.array(img)[None, ...].astype(np.float32)
    prob = float(model.predict(arr, verbose=0)[0, 0])
    return {
        "tamper_probability": round(prob, 3),
        "verdict": "TAMPERED" if prob >= 0.5 else "GENUINE",
        "model": "MobileNetV2 (CASIA v2 fine-tuned)",
        "val_auc": meta.get("val_auc"),
    }
def extract_features(path):
    """Build the feature dict consumed by the Random Forest model.

    Features, in insertion order: ELA mean, copy-move match count, noise
    outlier ratio, an EXIF-clean indicator (0/1), four GLCM texture
    properties of the 256x256 grayscale image, and one 32-bin histogram
    entropy per colour channel (only when the colour read succeeds).

    Args:
        path: Image file to describe.

    Returns:
        Dict mapping feature name to a numeric value.
    """
    from skimage.feature import graycomatrix, graycoprops

    source = str(path)
    feats = {
        "ela_mean": error_level_analysis(path)[1],
        "copy_move_matches": copy_move_detect(path)[1],
        "noise_outlier_ratio": noise_inconsistency(path)[1],
        "exif_clean": int(exif_sanity(path) == ["exif clean"]),
    }

    # NOTE(review): the grayscale read is not checked for None before the
    # resize, unlike the colour read below.
    gray_small = cv2.resize(cv2.imread(source, cv2.IMREAD_GRAYSCALE), (256, 256))
    glcm = graycomatrix(gray_small, [1], [0], 256, symmetric=True, normed=True)
    for prop in ("contrast", "homogeneity", "energy", "correlation"):
        feats["glcm_" + prop] = float(graycoprops(glcm, prop)[0, 0])

    colour = cv2.imread(source)
    if colour is None:
        return feats
    for idx, channel in enumerate("bgr"):
        hist = cv2.calcHist([colour], [idx], None, [32], [0, 256]).flatten()
        hist = hist / (hist.sum() + 1e-9)
        # Entropy of the normalised histogram; epsilon avoids log2(0).
        feats["hist_" + channel + "_entropy"] = float(-(hist * np.log2(hist + 1e-9)).sum())
    return feats
def predict_with_model(path, model_path=MODEL_PATH):
    """Score an image with the trained Random Forest bundle, if present.

    Args:
        path: Image file to classify.
        model_path: Location of the joblib bundle; it is indexed with the
            keys ``"features"`` (column order) and ``"model"``.

    Returns:
        ``None`` when ``model_path`` does not exist; otherwise a dict with
        ``file``, ``tamper_probability`` (rounded to 3 decimals), ``verdict``
        (``"TAMPERED"`` at probability >= 0.5, else ``"GENUINE"``) and the
        extracted ``features``.
    """
    # Check for the model first so a missing optional joblib install does
    # not raise when there is nothing to load anyway.
    if not Path(model_path).exists():
        return None
    import joblib

    # NOTE: joblib.load unpickles the file - only point this at trusted
    # model bundles.
    bundle = joblib.load(model_path)
    feats = extract_features(path)
    # Select and order columns exactly as the bundle expects.
    x = pd.DataFrame([feats])[bundle["features"]]
    p = bundle["model"].predict_proba(x)[0, 1]
    return {"file": str(path), "tamper_probability": round(float(p), 3),
            "verdict": "TAMPERED" if p >= 0.5 else "GENUINE",
            "features": feats}
# -------------------------------------------------------------
# End-to-end pipeline
# -------------------------------------------------------------
def analyse_document(path):
    """Run the end-to-end forensic pipeline on one file.

    Images (.png/.jpg/.jpeg/.tif/.tiff/.bmp) get the rule-based image score,
    OCR text rules and, when available, the Random Forest, CNN and
    AI-generation detector blended in. PDFs get the structural and font
    audits. Any other extension is reported as unsupported. Every optional
    stage is best effort: its failure is recorded in the report under an
    ``*_error`` key instead of being raised.

    Args:
        path: File to analyse (str or ``Path``).

    Returns:
        Report dict. Always contains ``file``, ``analysed_at``, ``sha256``
        and ``type``; supported types also carry ``sub_scores``,
        ``risk_score``, ``risk_band``, ``recommended_action`` and
        ``evidence``.
    """
    # Local import: the file-level import only brings in `datetime` itself.
    from datetime import timezone

    path = Path(path)
    ext = path.suffix.lower()
    # Same "...Z" text that utcnow().isoformat() + "Z" produced, without the
    # deprecated utcnow() call.
    stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    report = {"file": str(path),
              "analysed_at": stamp,
              "sha256": hashlib.sha256(path.read_bytes()).hexdigest()}
    if ext in (".png", ".jpg", ".jpeg", ".tif", ".tiff", ".bmp"):
        report["type"] = "image"
        s, sub, flags = score_image(path)
        try:
            txt = ocr_text(path)
            text_rules = text_rule_checks(txt)
            sub["text_rules"] = 0.0 if text_rules["flags"] == ["ok"] else 0.5
            s = sum(WEIGHTS.get(k, 0) * v for k, v in sub.items())
        except Exception as e:
            text_rules = {"error": str(e)}
        # Blend in RF prediction if model exists
        try:
            ml = predict_with_model(path)
            if ml is not None:
                s = 0.5 * s + 0.5 * ml["tamper_probability"]
                report["ml_prediction"] = ml
        except Exception as e:
            report["ml_error"] = str(e)
        # Blend in CNN prediction if model exists (weight rises with val_auc)
        try:
            cnn = predict_with_cnn(path)
            if cnn is not None:
                # If CNN AUC is known and high, give it more weight than rule-score
                w = max(0.4, min(0.7, (cnn.get("val_auc") or 0.85)))
                s = (1 - w) * s + w * cnn["tamper_probability"]
                report["cnn_prediction"] = cnn
        except Exception as e:
            report["cnn_error"] = str(e)
        # AI-generated content detector (FFT spectral analysis)
        try:
            if AI_DETECTOR_OK:
                ai = ai_detector.detect_ai_generated(path)
                report["ai_detector"] = ai
                sub["ai_generated"] = ai["probability"]
                # Blend lightly: AI-gen prob bumps risk up to +20%.
                # 0.9*s + 0.2*p can exceed 1.0 when both are high, so cap
                # the result to keep risk_score within [.., 1.0].
                s = min(1.0, 0.9 * s + 0.1 * ai["probability"] * 2.0)
                if ai["probability"] >= 0.6:
                    flags = flags + [f"AI-generated content suspected (prob {ai['probability']:.2f})"]
        except Exception as e:
            report["ai_detector_error"] = str(e)
        insights = generate_insights(s, sub, flags + text_rules.get("flags", []))
        report.update({"sub_scores": sub, "exif_flags": flags,
                       "text_rules": text_rules, **insights})
    elif ext == ".pdf":
        report["type"] = "pdf"
        audit = pdf_structural_audit(path)
        fonts = pdf_font_audit(path)
        # The font-audit verdict is carried under the "text_rules" key.
        sub = {"pdf_struct": 0.8 if audit["flags"] != ["clean"] else 0.1,
               "text_rules": 0.6 if fonts["flags"] != ["ok"] else 0.1}
        # NOTE(review): with the weights defined in this file the PDF score
        # tops out at 0.15*0.8 + 0.10*0.6 = 0.18, i.e. always the LOW band -
        # confirm whether the score should be normalised for PDFs.
        s = sum(WEIGHTS.get(k, 0) * v for k, v in sub.items())
        insights = generate_insights(s, sub, audit["flags"] + fonts["flags"])
        report.update({"sub_scores": sub, "pdf_audit": audit,
                       "font_audit": fonts, **insights})
    else:
        report["type"] = "unsupported"
        report["error"] = "extension " + ext + " not handled"
    # Log to provenance ledger (tamper-evident hash chain)
    try:
        import provenance
        provenance.log_analysis(report.get("file", "unknown"),
                                report.get("sha256", "-"),
                                report.get("risk_band", "UNKNOWN"),
                                report.get("risk_score", -1.0),
                                extra={"type": report.get("type")})
    except Exception as _e:
        report["provenance_error"] = str(_e)
    return report
# -------------------------------------------------------------
# Cross-document consistency (Sprint 2)
# -------------------------------------------------------------
# Labelled identity fields ("Name: ...", "DOB - ...", "Address: ..."),
# matched case-insensitively; group 1 is the value after the separator.
# The name value is limited to letters, spaces/tabs and dots so the capture
# stops at the end of the line instead of running into the next line's label.
NAME_RE = re.compile(r"(?:Name|Owner|Borrower|Holder|Account Holder)\s*[:\-]\s*([A-Z][A-Z \t.]{2,40})", re.IGNORECASE)
DOB_RE = re.compile(r"(?:DOB|Date of Birth|Born)\s*[:\-]\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})", re.IGNORECASE)
# NOTE(review): the address value still allows any whitespace, so it can span
# several lines - confirm that multi-line addresses are intended.
ADDR_RE = re.compile(r"(?:Address|Village|Residence)\s*[:\-]\s*([A-Z0-9][A-Z0-9\s,.\-/]{3,80})", re.IGNORECASE)
def _norm(s):
return re.sub(r"\s+", " ", (s or "").strip().upper())
def extract_identity_fields(path):
    """Pull name, DOB, address, account, IFSC and amounts from a document.

    Text comes from the PDF's own text layer for ``.pdf`` paths and from OCR
    for everything else.

    Args:
        path: Document to read.

    Returns:
        ``(fields, text)``. ``fields`` has the keys ``name``, ``dob``,
        ``address``, ``account``, ``ifsc`` (each ``None`` when not found)
        and ``amounts`` (a list); ``text`` is the raw extracted text.
    """
    if str(path).lower().endswith(".pdf"):
        with fitz.open(path) as doc:
            text = "\n".join(page.get_text() for page in doc)
    else:
        text = ocr_text(path)

    fields = dict.fromkeys(("name", "dob", "address", "account", "ifsc"))
    fields["amounts"] = []
    if not text:
        return fields, text

    # Labelled fields: first match, normalised.
    for key, pattern in (("name", NAME_RE), ("dob", DOB_RE), ("address", ADDR_RE)):
        hit = pattern.search(text)
        if hit:
            fields[key] = _norm(hit.group(1))

    # Unlabelled codes: first occurrence, kept verbatim.
    for key, pattern in (("account", ACC_RE), ("ifsc", IFSC_RE)):
        found = pattern.findall(text)
        if found:
            fields[key] = found[0]

    fields["amounts"] = parse_amounts(text)
    return fields, text
def _similarity(a, b):
"""Simple ratio-based string similarity."""
if not a or not b:
return 0.0
from difflib import SequenceMatcher
return SequenceMatcher(None, a, b).ratio()
def cross_doc_consistency(file_paths):
"""Compare identity fields across 2+ documents. Return per-field verdict."""
if len(file_paths) < 2:
return {"error": "need at least 2 documents"}
extracts = []
for p in file_paths:
fields, _ = extract_identity_fields(p)
extracts.append({"file": str(p), "fields": fields})
# Compare each field across docs
field_results = {}
for field in ["name", "dob", "address", "account", "ifsc"]:
values = [e["fields"].get(field) for e in extracts]
present = [v for v in values if v]
if len(present) < 2:
field_results[field] = {
"status": "insufficient_data",
"values": values,
"similarity": None,
}
continue
# All-pairs similarity
sims = []
for i in range(len(present)):
for j in range(i + 1, len(present)):
sims.append(_similarity(present[i], present[j]))
min_sim = min(sims)
if min_sim >= 0.95:
status = "match"
elif min_sim >= 0.75:
status = "likely_match"
else:
status = "mismatch"
field_results[field] = {
"status": status,
"values": values,
"similarity": round(min_sim, 3),
}
# Aggregate risk
mismatches = sum(1 for r in field_results.values() if r["status"] == "mismatch")
li