{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# DocSentry - Master Notebook\n", "\n", "**Single source of truth.** Everything lives here: detectors, training,\n", "evaluation, cross-doc check, PDF report generator, AND a cell that exports\n", "the supporting `.py` files for the Streamlit demo.\n", "\n", "**Use case:** real-time document anomaly detection for bank underwriting.\n", "Land records, legal documents, financial statements.\n", "\n", "**Pipeline:**\n", "```\n", " Document -> Image forensics (ELA, copy-move, noise, EXIF)\n", " -> PDF structure (EOF count, fonts, producer)\n", " -> OCR + text rules (date monotonicity, math, IFSC)\n", " -> Random Forest (forensic feature blend)\n", " -> CNN (MobileNetV2 on CASIA v2)\n", " -> Risk band + Insights + Audit JSON + PDF report\n", "```\n", "\n", "**100% open-source, no paid APIs, no LLM calls.** Runs on a laptop CPU.\n", "GPU only required for the optional CNN training section.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 0. 
Environment auto-detection\n", "\n", "Detects whether you are on Colab or local; auto-installs deps if Colab.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys, os, platform\n", "IS_COLAB = 'google.colab' in sys.modules\n", "IS_WINDOWS = platform.system() == 'Windows'\n", "print('Colab:', IS_COLAB, ' Windows:', IS_WINDOWS)\n", "\n", "# One-shot install (skip if you already pip-installed locally)\n", "if IS_COLAB:\n", " !apt-get -qq install -y tesseract-ocr\n", " %pip install --quiet \\\n", " numpy pandas matplotlib seaborn scikit-image scikit-learn joblib \\\n", " opencv-python-headless pillow pytesseract pdfplumber pymupdf pikepdf \\\n", " imagehash exifread python-dateutil kaggle reportlab\n", "print('Setup complete.')\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import io, json, re, math, hashlib, shutil, tempfile, warnings\n", "from pathlib import Path\n", "from datetime import datetime\n", "from collections import Counter\n", "\n", "import numpy as np\n", "import pandas as pd\n", "import matplotlib.pyplot as plt\n", "from PIL import Image, ImageChops, ImageEnhance, ImageDraw, ImageFont, ImageFilter\n", "import cv2\n", "import fitz # PyMuPDF\n", "import pdfplumber\n", "import pikepdf\n", "import pytesseract\n", "\n", "warnings.filterwarnings('ignore')\n", "plt.rcParams['figure.figsize'] = (10, 6)\n", "\n", "# Auto-detect Tesseract on Windows / Mac / Linux\n", "TESSERACT_OK = False\n", "for c in [shutil.which('tesseract'),\n", " r'C:\\\\Program Files\\\\Tesseract-OCR\\\\tesseract.exe',\n", " r'C:\\\\Program Files (x86)\\\\Tesseract-OCR\\\\tesseract.exe',\n", " os.path.expanduser(r'~\\\\AppData\\\\Local\\\\Programs\\\\Tesseract-OCR\\\\tesseract.exe')]:\n", " if c and os.path.isfile(c):\n", " pytesseract.pytesseract.tesseract_cmd = c\n", " TESSERACT_OK = True\n", " print('Tesseract:', c)\n", " break\n", "if not TESSERACT_OK:\n", " print('Tesseract 
not found. OCR-based checks will be skipped.')\n", " print('Windows install: https://github.com/UB-Mannheim/tesseract/wiki')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1. Datasets\n", "\n", "Folder layout the notebook expects:\n", "```\n", "data/\n", " images/originals/ <-- genuine scans\n", " images/tampered/ <-- forged scans\n", " pdfs/originals/\n", " pdfs/tampered/\n", " statements/\n", "```\n", "\n", "Three ways to populate `data/`:\n", "1. **Synthetic generator** (section 1.1) - 65 genuine + 65 tampered images with the default `COUNTS`, no downloads, runs in ~3 min\n", "2. **Kaggle CASIA v2** - the 12k-image industry benchmark (see section 1.3)\n", "3. **Manual datasets** - MICC-F220, CoMoFoD, ICDAR Find-It, Tobacco-3482 (see DATASETS.md)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "DATA = Path('data')\n", "for sub in ['images/originals', 'images/tampered',\n", " 'pdfs/originals', 'pdfs/tampered', 'statements']:\n", " (DATA / sub).mkdir(parents=True, exist_ok=True)\n", "print('Folders ready under', DATA.resolve())\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.1 Synthetic banking-document generator\n", "\n", "Produces realistic land records, loan agreements, and bank statements.\n", "Tampering variants: copy-move, text-edit, splice, compression-after-edit.\n", "Resumable - skips existing files.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import random\n", "random.seed(42); np.random.seed(42)\n", "\n", "FIRST = ['RAMESH','SURESH','AMIT','PRIYA','ANITA','VIKAS','POOJA','RAHUL',\n", " 'DEEPAK','SUNITA','ARJUN','MEENA','KIRAN','NEHA','SANJAY','GEETA']\n", "LAST = ['KUMAR','SHARMA','VERMA','SINGH','GUPTA','PATEL','REDDY','RAO',\n", " 'NAIR','JOSHI','MEHTA','AGGARWAL','BANERJEE','MISHRA']\n", "VILLAGES = ['NARAYANPUR','RAMGARH','BHIWANI','KISHANGARH','SITAPUR','JAGADHRI']\n", "BANKS = ['State Bank of India','HDFC Bank','ICICI Bank','Axis Bank',\n", " 'Punjab
National Bank','Bank of Baroda','Canara Bank']\n", "IFSC_PFX = ['SBIN','HDFC','ICIC','UTIB','PUNB','BARB','CNRB']\n", "\n", "def rand_name(): return f'{random.choice(FIRST)} {random.choice(LAST)}'\n", "def rand_date(): return f'{random.randint(1,28):02d}-{random.randint(1,12):02d}-{random.randint(2018,2024)}'\n", "def rand_amount(low=100000, high=10000000): return (random.randint(low,high)//1000)*1000\n", "def rand_account():return ''.join(str(random.randint(0,9)) for _ in range(random.randint(11,14)))\n", "def rand_ifsc(): return f\"{random.choice(IFSC_PFX)}0{''.join(random.choice('0123456789ABCDEF') for _ in range(6))}\"\n", "def fmt_inr(a):\n", " s = str(a)[::-1]; parts=[s[:3]]; s=s[3:]\n", " while s: parts.append(s[:2]); s=s[2:]\n", " return 'Rs ' + ','.join(parts)[::-1]\n", "\n", "def get_fonts():\n", " for p in ['/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf',\n", " 'DejaVuSans.ttf', 'arial.ttf']:\n", " try:\n", " return (ImageFont.truetype(p,22), ImageFont.truetype(p,16),\n", " ImageFont.truetype(p,14))\n", " except OSError: continue\n", " f = ImageFont.load_default(); return f,f,f\n", "BIG, MID, SMALL = get_fonts()\n", "print('Helpers ready.')\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def make_land_record():\n", " img = Image.new('RGB', (900,600), 'white'); d = ImageDraw.Draw(img)\n", " d.rectangle([20,20,880,580], outline='black', width=2)\n", " fields = dict(survey=f'{random.randint(50,300)}/{random.randint(1,9)}',\n", " owner=rand_name(), area=f'{random.uniform(0.1,5):.2f} hectares',\n", " village=random.choice(VILLAGES), date=rand_date(),\n", " amount=fmt_inr(rand_amount()))\n", " d.text((40,40),'GOVERNMENT OF INDIA - LAND RECORD',font=BIG,fill='black')\n", " for i,(k,v) in enumerate([('Survey No', fields['survey']),\n", " ('Owner', fields['owner']),\n", " ('Area', fields['area']),\n", " ('Village', fields['village']),\n", " ('Date', fields['date']),\n", " ('Stamp value', 
fields['amount'])]):\n", " d.text((40, 90+i*30), f'{k:11s}: {v}', font=MID, fill='black')\n", " sx,sy = random.randint(520,600), random.randint(370,410)\n", " d.rectangle([sx,sy,sx+250,sy+140], outline='black', width=1)\n", " d.text((sx+30,sy+30),'OFFICIAL SEAL',font=MID,fill='black')\n", " d.text((sx+30,sy+60),'Tehsildar / Patwari',font=MID,fill='black')\n", " fields['seal_box']=(sx,sy,sx+250,sy+140)\n", " fields['amount_pos']=(170,256,380,285)\n", " return img, fields\n", "\n", "def make_loan_agreement():\n", " img = Image.new('RGB',(900,700),'white'); d = ImageDraw.Draw(img)\n", " d.rectangle([20,20,880,680], outline='black', width=2)\n", " fields = dict(borrower=rand_name(), principal=fmt_inr(rand_amount(500000,8000000)),\n", " tenure=f'{random.choice([36,60,84,120,180])} months',\n", " rate=f'{random.uniform(6.5,12.5):.2f}% p.a.',\n", " date=rand_date(), bank=random.choice(BANKS),\n", " account=rand_account(), ifsc=rand_ifsc())\n", " d.text((40,40),'LOAN AGREEMENT',font=BIG,fill='black')\n", " for i,(k,v) in enumerate([('Borrower',fields['borrower']),\n", " ('Principal',fields['principal']),\n", " ('Tenure',fields['tenure']),\n", " ('Rate',fields['rate']),\n", " ('Date',fields['date']),\n", " ('Bank',fields['bank']),\n", " ('A/c No',fields['account']),\n", " ('IFSC',fields['ifsc'])]):\n", " d.text((40,110+i*35), f'{k:11s}: {v}', font=MID, fill='black')\n", " sx,sy=560,520\n", " d.rectangle([sx,sy,sx+260,sy+120], outline='black', width=1)\n", " d.text((sx+20,sy+20),'AUTHORISED SIGNATORY',font=SMALL,fill='black')\n", " fields['sig_box']=(sx,sy,sx+260,sy+120)\n", " fields['principal_pos']=(170,141,380,170)\n", " return img, fields\n", "\n", "def make_bank_statement():\n", " img = Image.new('RGB',(900,800),'white'); d = ImageDraw.Draw(img)\n", " d.rectangle([20,20,880,780], outline='black', width=2)\n", " d.text((40,40), random.choice(BANKS).upper(), font=BIG, fill='black')\n", " d.text((40,80), f'Account Holder : {rand_name()}', font=MID, fill='black')\n", " 
d.text((40,110), f'Account No : {rand_account()}', font=MID, fill='black')\n", " d.text((40,140), f'IFSC : {rand_ifsc()}', font=MID, fill='black')\n", " d.line([(40,215),(860,215)], fill='black', width=1)\n", " d.text((50,220),'Date',font=MID,fill='black')\n", " d.text((180,220),'Narration',font=MID,fill='black')\n", " d.text((500,220),'Debit',font=MID,fill='black')\n", " d.text((620,220),'Credit',font=MID,fill='black')\n", " d.line([(40,250),(860,250)], fill='black', width=1)\n", " bal=random.randint(20000,200000); y=260\n", " for _ in range(random.randint(8,14)):\n", " date=f'{random.randint(1,28):02d}-04-2024'\n", " narr=random.choice(['UPI Transfer','ATM Withdrawal','Salary Credit','Cheque Deposit','EMI Debit'])\n", " is_cr=random.random()>0.55; amt=random.randint(1000,50000)\n", " bal = bal+amt if is_cr else bal-amt\n", " d.text((50,y),date,font=SMALL,fill='black')\n", " d.text((180,y),narr,font=SMALL,fill='black')\n", " d.text((500,y),'' if is_cr else f'{amt:,}',font=SMALL,fill='black')\n", " d.text((620,y),f'{amt:,}' if is_cr else '',font=SMALL,fill='black')\n", " y += 28\n", " return img, {}\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def tamper_copy_move(img, fields, kind):\n", " arr = np.array(img)\n", " box = fields.get('seal_box') or fields.get('sig_box') or (600,400,850,540)\n", " x0,y0,x1,y1 = box\n", " patch = arr[y0:y1, x0:x1].copy()\n", " tx = max(40, x0 - 300); ty = y0\n", " if tx + (x1-x0) < arr.shape[1] and ty + (y1-y0) < arr.shape[0]:\n", " arr[ty:ty+(y1-y0), tx:tx+(x1-x0)] = patch\n", " return Image.fromarray(arr)\n", "\n", "def tamper_text_edit(img, fields, kind):\n", " pos = fields.get('amount_pos') or fields.get('principal_pos') or (600,80,800,110)\n", " d = ImageDraw.Draw(img)\n", " d.rectangle(pos, fill='white')\n", " d.text((pos[0]+5, pos[1]+5), fmt_inr(rand_amount(1000000,50000000)),\n", " font=MID, fill='black')\n", " return img\n", "\n", "def tamper_splice(img, fields, kind, 
donor):\n", " arr=np.array(img); donor_arr=np.array(donor.resize(img.size))\n", " h,w=arr.shape[:2]; bh,bw=80,200\n", " x=random.randint(40,w-bw-40); y=random.randint(300,h-bh-40)\n", " arr[y:y+bh, x:x+bw] = donor_arr[y:y+bh, x:x+bw]\n", " return Image.fromarray(arr)\n", "\n", "def tamper_compression(img):\n", " buf=io.BytesIO(); img.save(buf,'JPEG',quality=random.randint(35,60)); buf.seek(0)\n", " return Image.open(buf).convert('RGB')\n", "\n", "def add_scan_noise(img):\n", " arr=np.array(img).astype(np.float32)\n", " arr=np.clip(arr+np.random.normal(0,3,arr.shape),0,255).astype(np.uint8)\n", " return Image.fromarray(arr)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "GENERATORS = {'land': make_land_record, 'agreement': make_loan_agreement,\n", " 'statement': make_bank_statement}\n", "COUNTS = {'land': 30, 'agreement': 20, 'statement': 15} # adjust as needed\n", "\n", "def generate_dataset(counts=COUNTS):\n", " print('Generating genuine documents...')\n", " genuine = []\n", " for kind, n in counts.items():\n", " for i in range(n):\n", " p = DATA/'images/originals'/f'{kind}_{i:03d}.png'\n", " if p.exists():\n", " try: img=Image.open(p).convert('RGB')\n", " except Exception: img,fields=GENERATORS[kind](); img=add_scan_noise(img); img.save(p) # unreadable file: regenerate and overwrite it\n", " else: _,fields=GENERATORS[kind]()\n", " else:\n", " img,fields = GENERATORS[kind]()\n", " img = add_scan_noise(img); img.save(p)\n", " genuine.append((kind, img.copy(), fields))\n", " print(f' {sum(counts.values())} originals on disk.')\n", " print('Generating tampered documents...')\n", " tampers = ['copy_move','text_edit','splice','compression_after_edit']\n", " nt = 0\n", " for kind, n in counts.items():\n", " for i in range(n):\n", " if list((DATA/'images/tampered').glob(f'{kind}_{i:03d}_*.png')):\n", " continue\n", " img,fields = GENERATORS[kind]()\n", " t = random.choice(tampers)\n", " if t=='copy_move': out = tamper_copy_move(img, fields, kind)\n", " elif t=='text_edit': out =
tamper_text_edit(img, fields, kind)\n", " elif t=='splice': out = tamper_splice(img, fields, kind, random.choice(genuine)[1])\n", " else: out = tamper_compression(tamper_text_edit(img, fields, kind))\n", " out = add_scan_noise(out)\n", " out.save(DATA/'images/tampered'/f'{kind}_{i:03d}_{t}.png')\n", " nt += 1\n", " print(f' {nt} new tampered images written.')\n", "\n", "# Run generator (idempotent - skip cells you already ran)\n", "generate_dataset()\n", "\n", "# Show one genuine + one tampered\n", "samples = sorted((DATA/'images/originals').glob('land_*.png'))[:1]\n", "tsamples = sorted((DATA/'images/tampered').glob('land_*.png'))[:1]\n", "if samples and tsamples:\n", " fig, ax = plt.subplots(1,2, figsize=(14,5))\n", " ax[0].imshow(Image.open(samples[0])); ax[0].set_title('Genuine'); ax[0].axis('off')\n", " ax[1].imshow(Image.open(tsamples[0])); ax[1].set_title(f'Tampered ({tsamples[0].name.split(\"_\",2)[2]})'); ax[1].axis('off')\n", " plt.show()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.2 Demo PDFs (for the PDF forensic detectors)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def make_demo_pdfs(n=15):\n", " for i in range(n):\n", " op = DATA/'pdfs/originals'/f'agreement_{i:03d}.pdf'\n", " tp = DATA/'pdfs/tampered'/f'agreement_{i:03d}_tampered.pdf'\n", " if op.exists() and tp.exists(): continue\n", " doc = fitz.open(); page = doc.new_page()\n", " text = (f'LOAN AGREEMENT\\n\\nBorrower : {rand_name()}\\n'\n", " f'Principal: {fmt_inr(rand_amount())}\\nTenure : 60 months\\n'\n", " f'Rate : 8.5% p.a.\\nDate : {rand_date()}')\n", " page.insert_text((72,72), text, fontsize=14)\n", " doc.set_metadata({'producer':'PyMuPDF','creator':'PyMuPDF'})\n", " doc.save(op); doc.close()\n", " doc = fitz.open(op); page = doc[0]\n", " page.draw_rect(fitz.Rect(150,102,360,122), color=(1,1,1), fill=(1,1,1))\n", " page.insert_text((150,118), fmt_inr(rand_amount(10000000,90000000)),\n", " fontsize=14, 
fontname='helv')\n", " doc.set_metadata({'producer':random.choice(['iLovePDF','Smallpdf','PDFescape','Sejda']),\n", " 'creator':'PyMuPDF'})\n", " doc.save(tp, deflate=True); doc.close()\n", "make_demo_pdfs()\n", "print('PDFs ready.')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1.3 (Optional) Kaggle CASIA v2 download\n", "\n", "Adds 12,000 real tampered/genuine images on top of your synthetic ones.\n", "Requires `kaggle.json` (https://www.kaggle.com/settings -> Create New API Token).\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "USE_CASIA = False # flip True after placing kaggle.json\n", "\n", "if USE_CASIA:\n", " if IS_COLAB:\n", " from google.colab import files\n", " if not os.path.exists('/root/.kaggle/kaggle.json'):\n", " up = files.upload() # browse-and-select\n", " os.makedirs('/root/.kaggle', exist_ok=True)\n", " for n in up:\n", " if n.endswith('.json'):\n", " shutil.copy(n, '/root/.kaggle/kaggle.json')\n", " os.chmod('/root/.kaggle/kaggle.json', 0o600)\n", " break\n", " !kaggle datasets download -d divg07/casia-20-image-tampering-detection-dataset \\\n", " -p data/images --unzip --force\n", " # move Au/Tp inside the existing originals/ and tampered/ folders\n", " # (section 1 already created them, so shutil.move nests the CASIA folder)\n", " for src, dst in [('Au','originals'),('Tp','tampered')]:\n", " for cand in [f'data/images/{src}', f'data/images/CASIA2/{src}']:\n", " if os.path.isdir(cand) and not os.path.isdir(f'data/images/{dst}/{src}'):\n", " shutil.move(cand, f'data/images/{dst}'); break\n", " print('CASIA v2 ready.')\n", "else:\n", " print('USE_CASIA = False - skipping. Synthetic data only.')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2.
Image forensics detectors\n", "\n", "Four classical techniques, none requiring training:\n", "- **Error Level Analysis** - re-save at known JPEG quality; tampered regions diverge\n", "- **Copy-move** - ORB keypoint matching finds duplicated regions\n", "- **Noise inconsistency** - per-block Laplacian variance for splice detection\n", "- **EXIF sanity** - missing metadata, photo-editor fingerprints, time mismatches\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def error_level_analysis(path, quality=90, scale=15):\n", " orig = Image.open(path).convert('RGB')\n", " buf = io.BytesIO(); orig.save(buf,'JPEG',quality=quality); buf.seek(0)\n", " resaved = Image.open(buf)\n", " diff = ImageChops.difference(orig, resaved)\n", " max_diff = max([e[1] for e in diff.getextrema()]) or 1\n", " ela = ImageEnhance.Brightness(diff).enhance(scale * 255 / max_diff)\n", " return ela, float(np.array(diff).mean())\n", "\n", "def copy_move_detect(path, min_dist=40, max_matches=80):\n", " img = cv2.imread(str(path))\n", " if img is None: return None, 0, []\n", " gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)\n", " orb = cv2.ORB_create(nfeatures=2000)\n", " kp, des = orb.detectAndCompute(gray, None)\n", " if des is None or len(kp)<10: return img, 0, []\n", " matches = cv2.BFMatcher(cv2.NORM_HAMMING).knnMatch(des, des, k=10)\n", " good = []\n", " for ml in matches:\n", " for m in ml[1:]:\n", " p1,p2 = kp[m.queryIdx].pt, kp[m.trainIdx].pt\n", " d = math.hypot(p1[0]-p2[0], p1[1]-p2[1])\n", " if d > min_dist and m.distance < 40: good.append((p1,p2,d))\n", " good = good[:max_matches]\n", " out = img.copy()\n", " for p1,p2,_ in good:\n", " cv2.line(out, tuple(map(int,p1)), tuple(map(int,p2)), (0,0,255), 1)\n", " cv2.circle(out, tuple(map(int,p1)), 3, (0,255,0), -1)\n", " cv2.circle(out, tuple(map(int,p2)), 3, (0,255,0), -1)\n", " return out, len(good), good\n", "\n", "def noise_inconsistency(path, block=32):\n", " img = cv2.imread(str(path), 
cv2.IMREAD_GRAYSCALE)\n", " if img is None: return np.zeros((1,1)), 0.0\n", " H,W = img.shape; Hc,Wc = (H//block)*block, (W//block)*block\n", " if Hc==0 or Wc==0: return np.zeros((1,1)), 0.0\n", " img = img[:Hc,:Wc]\n", " lap = cv2.Laplacian(img, cv2.CV_64F)\n", " blocks = lap.reshape(Hc//block,block,Wc//block,block).transpose(0,2,1,3).reshape(-1,block*block)\n", " var = blocks.var(axis=1)\n", " z = (var - var.mean()) / (var.std() + 1e-9)\n", " return np.abs(z).reshape(Hc//block, Wc//block), float((np.abs(z)>2.5).sum()/max(1,len(z)))\n", "\n", "def exif_sanity(path):\n", " from PIL.ExifTags import TAGS\n", " try: exif = Image.open(path).getexif()\n", " except Exception: return ['cannot read image']\n", " if not exif: return ['no EXIF metadata (re-saved or stripped)']\n", " # DateTimeOriginal is stored in the Exif sub-IFD (0x8769), not the base IFD\n", " merged = {**dict(exif), **exif.get_ifd(0x8769)}\n", " tags = {TAGS.get(k,k):v for k,v in merged.items()}\n", " flags = []; sw = str(tags.get('Software','')).lower()\n", " for bad in ['photoshop','gimp','paint','snapseed','picsart']:\n", " if bad in sw: flags.append('edited with '+bad)\n", " if 'DateTimeOriginal' in tags and 'DateTime' in tags:\n", " if tags['DateTimeOriginal'] != tags['DateTime']:\n", " flags.append('modified-time differs from original-time')\n", " return flags or ['exif clean']\n", "\n", "print('Image forensics ready.')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 2.1 Visual smoke test\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "originals = sorted((DATA/'images/originals').glob('land_*.png'))[:1]\n", "tampered = sorted((DATA/'images/tampered').glob('land_*.png'))[:1]\n", "\n", "for label, path in [('Genuine', originals[0]), ('Tampered', tampered[0])]:\n", " fig, ax = plt.subplots(1, 3, figsize=(16, 4))\n", " ax[0].imshow(Image.open(path)); ax[0].set_title(f'{label} - source'); ax[0].axis('off')\n", " ela, s = error_level_analysis(path)\n", " ax[1].imshow(ela); ax[1].set_title(f'ELA (score={s:.2f})'); ax[1].axis('off')\n", " viz, n, _ = copy_move_detect(path)\n", "
ax[2].imshow(cv2.cvtColor(viz, cv2.COLOR_BGR2RGB))\n", " ax[2].set_title(f'Copy-move ({n} matches)'); ax[2].axis('off')\n", " plt.show()\n", " print(f'{label}: EXIF -> {exif_sanity(path)}')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3. PDF forensics detectors\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def pdf_structural_audit(path):\n", " raw = Path(path).read_bytes(); eofs = raw.count(b'%%EOF')\n", " with fitz.open(path) as d:\n", " info = d.metadata or {}; n_pages = d.page_count\n", " flags = []\n", " if eofs > 1: flags.append(f'{eofs} EOF markers (incremental updates)')\n", " prod = (info.get('producer') or '').lower()\n", " crt = (info.get('creator') or '').lower()\n", " if prod and crt and prod != crt:\n", " flags.append(f'producer/creator differ: {prod} vs {crt}')\n", " for t in ['ilovepdf','smallpdf','pdfescape','sejda','foxit phantom']:\n", " if t in prod or t in crt: flags.append('edited via consumer tool: '+t)\n", " return {'pages':n_pages, 'eof_markers':eofs, 'metadata':info,\n", " 'flags': flags or ['clean']}\n", "\n", "def pdf_font_audit(path):\n", " fonts = []\n", " with fitz.open(path) as d:\n", " for page in d: fonts.append({f[3] for f in page.get_fonts()})\n", " allf = set().union(*fonts) if fonts else set()\n", " return {'fonts': sorted(allf),\n", " 'flags': ['unusually high font count: '+str(len(allf))] if len(allf)>4 else ['ok']}\n", "\n", "import pprint\n", "for label, p in [('Genuine', DATA/'pdfs/originals/agreement_000.pdf'),\n", " ('Tampered', DATA/'pdfs/tampered/agreement_000_tampered.pdf')]:\n", " if p.exists():\n", " print(f'\\n=== {label} ==='); pprint.pp(pdf_structural_audit(p))\n", " print('Fonts:', pdf_font_audit(p))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 4. OCR + text-level rules\n", "\n", "Date monotonicity, amount sanity, IFSC format, account-without-IFSC,\n", "round-number anomalies. 
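Skipped gracefully if Tesseract isn't installed.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# NOTE: reconstructed cell head. The regexes and the two helper bodies are\n", "# assumptions inferred from the checks below and from analyse_document().\n", "AMT_RE = re.compile(r'(?<![\\d,])\\d{1,3}(?:,\\d{2,3})+(?![\\d,])') # comma-grouped amounts\n", "DATE_RE = re.compile(r'\\b\\d{1,2}[-/]\\d{1,2}[-/]\\d{4}\\b') # dd-mm-yyyy\n", "IFSC_RE = re.compile(r'\\b[A-Z]{4}0[A-Z0-9]{6}\\b')\n", "ACC_RE = re.compile(r'\\b\\d{9,18}\\b')\n", "\n", "def ocr_text(path):\n", "    if not TESSERACT_OK: return ''\n", "    return pytesseract.image_to_string(Image.open(path))\n", "\n", "def text_rule_checks(text):\n", "    flags = []\n", "    dates = DATE_RE.findall(text)\n", "    amts = [int(a.replace(',', '')) for a in AMT_RE.findall(text)]\n", "    ifsc = IFSC_RE.findall(text)\n", "    accs = ACC_RE.findall(text)\n", "    if len(dates) >= 2:\n", "        try:\n", "            ds = [datetime.strptime(d.replace('/', '-'), '%d-%m-%Y') for d in dates]\n", "            if any(ds[i] > ds[i+1] for i in range(len(ds)-1)):\n", "                flags.append('dates not monotonic')\n", "        except Exception: flags.append('unparseable dates')\n", "    if amts:\n", "        big = [a for a in amts if a >= 100000 and a % 100000 == 0]\n", "        if len(big) > 3: flags.append(f'{len(big)} suspiciously round large amounts')\n", "    if accs and not ifsc: flags.append('account number present but no IFSC')\n", "    return {'n_dates':len(dates), 'n_amounts':len(amts),\n", "            'n_ifsc':len(ifsc), 'n_accounts':len(accs),\n", "            'flags': flags or ['ok']}\n", "\n", "print('OCR + text rules ready.')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 5. Anomaly scoring + risk band + insights\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "WEIGHTS = {'ela':0.20, 'copy_move':0.25, 'noise':0.15, 'exif':0.10,\n", " 'pdf_struct':0.15, 'text_rules':0.10, 'math':0.05}\n", "\n", "def band(s):\n", " return ('LOW' if s<0.25 else 'MEDIUM' if s<0.50 else\n", " 'HIGH' if s<0.75 else 'CRITICAL')\n", "\n", "INSIGHT_RULES = [\n", " ('copy_move',0.4,'Possible copy-paste forgery: repeated visual region. Inspect seal/signature area.'),\n", " ('ela', 0.4,'Compression artefacts inconsistent with a single-source scan.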
Likely re-saved after edits.'),\n", " ('noise', 0.4,'Localised noise inconsistency - common in image splicing.'),\n", " ('exif', 0.4,'Image metadata indicates edits in a photo-editor or stripped EXIF.'),\n", " ('pdf_struct',0.4,'PDF structural anomalies (incremental edits or consumer-tool fingerprint).'),\n", "]\n", "ACTIONS = {'LOW':'Proceed with standard underwriting.',\n", " 'MEDIUM':'Request additional verification documents.',\n", " 'HIGH':'Escalate to fraud-risk team; manual review mandatory.',\n", " 'CRITICAL':'Block file; trigger investigation workflow.'}\n", "\n", "def score_image(path):\n", " _, ela_s = error_level_analysis(path)\n", " _, ncm,_ = copy_move_detect(path)\n", " _, nr = noise_inconsistency(path)\n", " ef = exif_sanity(path)\n", " sub = {'ela':min(ela_s/25.0,1.0),\n", " 'copy_move':min(ncm/50.0,1.0),\n", " 'noise':min(nr*4,1.0),\n", " 'exif':0.0 if ef==['exif clean'] else 0.6}\n", " return sum(WEIGHTS[k]*v for k,v in sub.items()), sub, ef\n", "\n", "def generate_insights(score, sub, extra=None):\n", " bullets = [m for k,t,m in INSIGHT_RULES if sub.get(k,0) >= t]\n", " if extra:\n", " bullets += ['Flag: '+str(f) for f in extra if f not in ('exif clean','ok','clean')]\n", " if not bullets: bullets = ['No anomaly indicators above threshold.']\n", " return {'risk_score':round(score,3), 'risk_band':band(score),\n", " 'recommended_action':ACTIONS[band(score)], 'evidence':bullets}\n", "\n", "for label, path in [('Genuine', sorted((DATA/'images/originals').glob('land_*.png'))[0]),\n", " ('Tampered', sorted((DATA/'images/tampered').glob('land_*.png'))[0])]:\n", " s, sub, ef = score_image(path)\n", " print(f'{label:9s} score={s:.3f} band={band(s)} sub={sub} exif={ef}')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 6. Random Forest training (auto-trains on data/images/)\n", "\n", "Extracts forensic features per image; trains a Random Forest;\n", "saves to `models/forgery_rf.joblib`. 
Loaded automatically by the pipeline.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from skimage.feature import graycomatrix, graycoprops\n", "from sklearn.ensemble import RandomForestClassifier\n", "from sklearn.model_selection import train_test_split\n", "from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score\n", "import joblib\n", "\n", "def extract_features(path):\n", " f = {}\n", " _, f['ela_mean'] = error_level_analysis(path)\n", " _, f['copy_move_matches'], _ = copy_move_detect(path)\n", " _, f['noise_outlier_ratio'] = noise_inconsistency(path)\n", " f['exif_clean'] = int(exif_sanity(path) == ['exif clean'])\n", " g = cv2.imread(str(path), cv2.IMREAD_GRAYSCALE)\n", " gs = cv2.resize(g, (256,256))\n", " glcm = graycomatrix(gs, [1], [0], 256, symmetric=True, normed=True)\n", " f['glcm_contrast'] = float(graycoprops(glcm,'contrast')[0,0])\n", " f['glcm_homogeneity'] = float(graycoprops(glcm,'homogeneity')[0,0])\n", " f['glcm_energy'] = float(graycoprops(glcm,'energy')[0,0])\n", " f['glcm_correlation'] = float(graycoprops(glcm,'correlation')[0,0])\n", " c = cv2.imread(str(path))\n", " if c is not None:\n", " for i,ch in enumerate(['b','g','r']):\n", " h = cv2.calcHist([c],[i],None,[32],[0,256]).flatten()\n", " h = h/(h.sum()+1e-9)\n", " f['hist_'+ch+'_entropy'] = float(-(h*np.log2(h+1e-9)).sum())\n", " return f\n", "\n", "def build_training_table(root=DATA/'images'):\n", " rows = []\n", " for label, sub in [(0,'originals'), (1,'tampered')]:\n", " for p in (root/sub).rglob('*'):\n", " if p.suffix.lower() in {'.png','.jpg','.jpeg'}:\n", " try:\n", " f = extract_features(p)\n", " f['label']=label; rows.append(f)\n", " except Exception as e:\n", " print('skip', p.name, '->', e)\n", " return pd.DataFrame(rows)\n", "\n", "train_df = build_training_table()\n", "print(f'Training rows: {len(train_df)} Classes: {train_df[\"label\"].value_counts().to_dict() if len(train_df) else 
\"none\"}')\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "MODEL_PATH = Path('models/forgery_rf.joblib')\n", "MODEL_PATH.parent.mkdir(exist_ok=True)\n", "\n", "if len(train_df) < 10:\n", " print('Not enough real data - skipping training. Run section 1.1 first.')\n", "else:\n", " FEATURES = [c for c in train_df.columns if c != 'label']\n", " X = train_df[FEATURES]; y = train_df['label']\n", " Xtr,Xte,ytr,yte = train_test_split(X,y, test_size=0.25, random_state=42, stratify=y)\n", " clf = RandomForestClassifier(n_estimators=300, max_depth=10,\n", " class_weight='balanced', random_state=42, n_jobs=-1)\n", " clf.fit(Xtr, ytr)\n", " pred = clf.predict(Xte); prob = clf.predict_proba(Xte)[:,1]\n", " print(classification_report(yte, pred, target_names=['genuine','tampered']))\n", " print('Confusion:'); print(confusion_matrix(yte, pred))\n", " try: print('ROC-AUC:', round(roc_auc_score(yte, prob), 3))\n", " except Exception: pass\n", " joblib.dump({'model':clf, 'features':FEATURES}, MODEL_PATH)\n", " # Feature importance plot\n", " imp = pd.Series(clf.feature_importances_, index=FEATURES).sort_values()\n", " plt.figure(figsize=(8,5))\n", " imp.plot.barh(color='steelblue'); plt.title('Forensic feature importance')\n", " plt.tight_layout(); plt.show()\n", " print(f'Model saved: {MODEL_PATH.resolve()}')\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def predict_with_model(path, model_path=MODEL_PATH):\n", " if not Path(model_path).exists(): return None\n", " b = joblib.load(model_path)\n", " f = extract_features(path)\n", " p = b['model'].predict_proba(pd.DataFrame([f])[b['features']])[0,1]\n", " return {'tamper_probability':round(float(p),3),\n", " 'verdict':'TAMPERED' if p>=0.5 else 'GENUINE'}\n", "\n", "if MODEL_PATH.exists():\n", " print('Genuine :', predict_with_model(sorted((DATA/'images/originals').glob('land_*.png'))[0]))\n", " print('Tampered:', 
predict_with_model(sorted((DATA/'images/tampered').glob('land_*.png'))[0]))\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 7. (Optional) CNN training on real CASIA v2\n", "\n", "Flip `TRAIN_CNN = True` once you have roughly 200 or more real images per class.\n", "Trains MobileNetV2 in two phases (head only, then the unfrozen top layers).\n", "Saves to `models/forgery_cnn.keras` + `forgery_cnn.meta.json`.\n", "Runs in ~25 min on Colab T4 GPU. Skip if running on a CPU laptop.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "TRAIN_CNN = False\n", "\n", "if TRAIN_CNN:\n", " if IS_COLAB:\n", " %pip install --quiet tensorflow\n", " import tensorflow as tf\n", " from tensorflow.keras import layers, Model\n", " print('TF:', tf.__version__, ' GPU:', tf.config.list_physical_devices('GPU'))\n", " IMG, BATCH = 224, 16\n", " train_ds = tf.keras.utils.image_dataset_from_directory(\n", " 'data/images', validation_split=0.2, subset='training', seed=42,\n", " image_size=(IMG,IMG), batch_size=BATCH, label_mode='binary')\n", " val_ds = tf.keras.utils.image_dataset_from_directory(\n", " 'data/images', validation_split=0.2, subset='validation', seed=42,\n", " image_size=(IMG,IMG), batch_size=BATCH, label_mode='binary')\n", " CLASS_NAMES = train_ds.class_names\n", " augment = tf.keras.Sequential([\n", " layers.RandomFlip('horizontal'), layers.RandomRotation(0.04),\n", " layers.RandomBrightness(0.15), layers.RandomContrast(0.15)])\n", " train_ds = train_ds.map(lambda x,y: (augment(x,training=True), y)).prefetch(tf.data.AUTOTUNE)\n", " val_ds = val_ds.cache().prefetch(tf.data.AUTOTUNE)\n", " base = tf.keras.applications.MobileNetV2(input_shape=(IMG,IMG,3),\n", " include_top=False, weights='imagenet')\n", " base.trainable = False\n", " inp = layers.Input(shape=(IMG,IMG,3))\n", " x = tf.keras.applications.mobilenet_v2.preprocess_input(inp)\n", " x = base(x, training=False)\n", " x = layers.GlobalAveragePooling2D()(x)\n", " x =
layers.Dropout(0.35)(x)\n", " x = layers.Dense(256, activation='relu')(x)\n", " x = layers.Dropout(0.25)(x)\n", " out = layers.Dense(1, activation='sigmoid')(x)\n", " cnn = Model(inp, out)\n", " cnn.compile(optimizer=tf.keras.optimizers.Adam(1e-3),\n", " loss='binary_crossentropy',\n", " metrics=['accuracy', tf.keras.metrics.AUC(name='auc')])\n", " print('Phase 1: head-only training...')\n", " cnn.fit(train_ds, validation_data=val_ds, epochs=5)\n", " print('Phase 2: fine-tuning top of backbone...')\n", " base.trainable = True\n", " for l in base.layers[:-40]: l.trainable = False\n", " cnn.compile(optimizer=tf.keras.optimizers.Adam(1e-5),\n", " loss='binary_crossentropy',\n", " metrics=['accuracy', tf.keras.metrics.AUC(name='auc')])\n", " cnn.fit(train_ds, validation_data=val_ds, epochs=5)\n", " loss, acc, auc = cnn.evaluate(val_ds, verbose=0)\n", " print(f'Val loss {loss:.3f} acc {acc:.3f} AUC {auc:.3f}')\n", " Path('models').mkdir(exist_ok=True)\n", " cnn.save('models/forgery_cnn.keras')\n", " json.dump({'class_names':CLASS_NAMES, 'image_size':IMG, 'val_auc':float(auc),\n", " 'val_accuracy':float(acc)},\n", " open('models/forgery_cnn.meta.json','w'), indent=2)\n", " print('Saved: models/forgery_cnn.keras')\n", "else:\n", " print('TRAIN_CNN = False - skipping. 
Flip True + Colab GPU runtime to train.')\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "CNN_MODEL_PATH = Path('models/forgery_cnn.keras')\n", "CNN_META_PATH = Path('models/forgery_cnn.meta.json')\n", "_CNN = {'model':None, 'meta':None, 'tried':False}\n", "\n", "def _load_cnn():\n", " if _CNN['tried']: return _CNN['model'], _CNN['meta']\n", " _CNN['tried'] = True\n", " if not CNN_MODEL_PATH.exists(): return None, None\n", " try:\n", " import tensorflow as tf\n", " _CNN['model'] = tf.keras.models.load_model(CNN_MODEL_PATH)\n", " _CNN['meta'] = (json.loads(CNN_META_PATH.read_text())\n", " if CNN_META_PATH.exists() else {'image_size':224})\n", " except Exception as e: print('CNN load failed:', e)\n", " return _CNN['model'], _CNN['meta']\n", "\n", "def predict_with_cnn(path):\n", " m, meta = _load_cnn()\n", " if m is None: return None\n", " sz = meta.get('image_size', 224)\n", " arr = np.array(Image.open(path).convert('RGB').resize((sz,sz)))[None].astype(np.float32)\n", " p = float(m.predict(arr, verbose=0)[0,0])\n", " return {'tamper_probability':round(p,3),\n", " 'verdict':'TAMPERED' if p>=0.5 else 'GENUINE',\n", " 'model':'MobileNetV2 (CASIA v2 fine-tuned)',\n", " 'val_auc': meta.get('val_auc')}\n", "\n", "print('CNN inference ready (uses model when present).')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 8. 
End-to-end pipeline\n", "\n", "Single `analyse_document(path)` call that:\n", "- Detects type (image vs PDF)\n", "- Runs all relevant detectors\n", "- Blends RF + CNN predictions if their models exist\n", "- Returns a complete audit dict\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def analyse_document(path):\n", " path = Path(path); ext = path.suffix.lower()\n", " r = {'file':str(path),\n", " 'analysed_at':datetime.utcnow().isoformat()+'Z',\n", " 'sha256':hashlib.sha256(path.read_bytes()).hexdigest()}\n", " if ext in ('.png','.jpg','.jpeg','.tif','.tiff','.bmp'):\n", " r['type'] = 'image'\n", " s, sub, ef = score_image(path)\n", " try:\n", " tr = text_rule_checks(ocr_text(path))\n", " sub['text_rules'] = 0.0 if tr['flags']==['ok'] else 0.5\n", " s = sum(WEIGHTS.get(k,0)*v for k,v in sub.items())\n", " except Exception as e: tr = {'error':str(e)}\n", " try:\n", " ml = predict_with_model(path)\n", " if ml is not None:\n", " s = 0.5*s + 0.5*ml['tamper_probability']\n", " r['ml_prediction'] = ml\n", " except Exception as e: r['ml_error'] = str(e)\n", " try:\n", " cnn = predict_with_cnn(path)\n", " if cnn is not None:\n", " w = max(0.4, min(0.7, (cnn.get('val_auc') or 0.85)))\n", " s = (1-w)*s + w*cnn['tamper_probability']\n", " r['cnn_prediction'] = cnn\n", " except Exception as e: r['cnn_error'] = str(e)\n", " r.update({'sub_scores':sub, 'exif_flags':ef, 'text_rules':tr,\n", " **generate_insights(s, sub, ef+tr.get('flags',[]))})\n", " elif ext == '.pdf':\n", " r['type'] = 'pdf'\n", " audit = pdf_structural_audit(path); fonts = pdf_font_audit(path)\n", " sub = {'pdf_struct':0.8 if audit['flags']!=['clean'] else 0.1,\n", " 'text_rules':0.6 if fonts['flags']!=['ok'] else 0.1}\n", " s = sum(WEIGHTS.get(k,0)*v for k,v in sub.items())\n", " r.update({'sub_scores':sub, 'pdf_audit':audit, 'font_audit':fonts,\n", " **generate_insights(s, sub, audit['flags']+fonts['flags'])})\n", " else: r['type']='unsupported'; 
r['error']='extension '+ext\n", " return r\n", "\n", "# Demo on 4 files\n", "for p in [sorted((DATA/'images/originals').glob('land_*.png'))[0],\n", " sorted((DATA/'images/tampered').glob('land_*.png'))[0],\n", " DATA/'pdfs/originals/agreement_000.pdf',\n", " DATA/'pdfs/tampered/agreement_000_tampered.pdf']:\n", " if p.exists():\n", " r = analyse_document(p)\n", " print(f'\\n--- {p.name} ---')\n", " print(f\" band: {r['risk_band']} score: {r['risk_score']} action: {r['recommended_action']}\")\n", " for e in r['evidence']: print(' *', e)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 9. Cross-document consistency check\n", "\n", "Upload 2+ docs for the same applicant; system extracts identity fields\n", "and flags mismatches in name, DOB, address, account, IFSC.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "NAME_RE = re.compile(r'(?:Name|Owner|Borrower|Holder|Account Holder)\\s*[:\\-]\\s*([A-Z][A-Z\\s.]{2,40})', re.I)\n", "DOB_RE = re.compile(r'(?:DOB|Date of Birth|Born)\\s*[:\\-]\\s*(\\d{1,2}[-/]\\d{1,2}[-/]\\d{2,4})', re.I)\n", "ADDR_RE = re.compile(r'(?:Address|Village|Residence)\\s*[:\\-]\\s*([A-Z0-9][A-Z0-9\\s,.\\-/]{3,80})', re.I)\n", "\n", "def _norm(s): return re.sub(r'\\s+', ' ', (s or '').strip().upper())\n", "\n", "def extract_identity_fields(path):\n", " if str(path).lower().endswith('.pdf'):\n", " with fitz.open(path) as d: text = '\\n'.join(p.get_text() for p in d)\n", " else: text = ocr_text(path)\n", " f = {k:None for k in ('name','dob','address','account','ifsc')}; f['amounts']=[]\n", " if not text: return f, text\n", " for k, rx in [('name',NAME_RE),('dob',DOB_RE),('address',ADDR_RE)]:\n", " m = rx.search(text)\n", " if m: f[k] = _norm(m.group(1))\n", " accs = ACC_RE.findall(text); ifsc = IFSC_RE.findall(text)\n", " if accs: f['account'] = accs[0]\n", " if ifsc: f['ifsc'] = ifsc[0]\n", " f['amounts'] = parse_amounts(text)\n", " return f, text\n", "\n", "def 
cross_doc_consistency(paths):\n", " if len(paths) < 2: return {'error':'need >=2 documents'}\n", " extracts = [{'file':str(p), 'fields':extract_identity_fields(p)[0]} for p in paths]\n", " field_results = {}\n", " from difflib import SequenceMatcher\n", " for field in ['name','dob','address','account','ifsc']:\n", " vals = [e['fields'].get(field) for e in extracts]\n", " present = [v for v in vals if v]\n", " if len(present) < 2:\n", " field_results[field] = {'status':'insufficient_data','values':vals,'similarity':None}\n", " continue\n", " sims = [SequenceMatcher(None, a, b).ratio()\n", " for i,a in enumerate(present) for b in present[i+1:]]\n", " ms = min(sims)\n", " status = 'match' if ms>=0.95 else 'likely_match' if ms>=0.75 else 'mismatch'\n", " field_results[field] = {'status':status,'values':vals,'similarity':round(ms,3)}\n", " mm = sum(1 for r in field_results.values() if r['status']=='mismatch')\n", " lm = sum(1 for r in field_results.values() if r['status']=='likely_match')\n", " rs = min(1.0, mm*0.5 + lm*0.2)\n", " return {'documents':extracts, 'field_results':field_results,\n", " 'mismatches':mm, 'likely_mismatches':lm,\n", " 'consistency_risk_score':round(rs,3), 'consistency_band':band(rs)}\n", "\n", "files = [sorted((DATA/'images/originals').glob('land_*.png'))[0],\n", " sorted((DATA/'images/originals').glob('agreement_*.png'))[0]]\n", "r = cross_doc_consistency(files)\n", "print('Band:', r['consistency_band'], ' Mismatches:', r['mismatches'])\n", "for f, v in r['field_results'].items():\n", " print(f' {f:10s} {v[\"status\"]:20s} sim={v[\"similarity\"]}')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 10. 
Underwriter dashboard + batch audit\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def render_dashboard(report):\n", " fig, axes = plt.subplots(1, 2, figsize=(14, 5))\n", " sub = report.get('sub_scores', {})\n", " if sub:\n", " keys = list(sub.keys()); vals = list(sub.values())\n", " bars = axes[0].barh(keys, vals)\n", " for b,v in zip(bars, vals):\n", " b.set_color('green' if v<0.4 else 'orange' if v<0.7 else 'red')\n", " axes[0].set_xlim(0,1); axes[0].set_title('Sub-scores')\n", " axes[1].axis('off')\n", " cmap = {'LOW':'green','MEDIUM':'gold','HIGH':'orange','CRITICAL':'red'}\n", " risk = report.get('risk_band','N/A')\n", " axes[1].text(0.05, 0.85, f'RISK: {risk}', fontsize=22,\n", " color=cmap.get(risk,'black'), weight='bold')\n", " axes[1].text(0.05, 0.70, f'Score: {report.get(\"risk_score\",\"-\")}', fontsize=14)\n", " axes[1].text(0.05, 0.60, f'Action: {report.get(\"recommended_action\",\"-\")}', fontsize=11)\n", " y = 0.45\n", " for e in report.get('evidence', []):\n", " axes[1].text(0.05, y, '- '+e, fontsize=10, wrap=True); y -= 0.07\n", " plt.tight_layout(); plt.show()\n", "\n", "render_dashboard(analyse_document(sorted((DATA/'images/tampered').glob('land_*.png'))[0]))\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def batch_analyse(folder, out_csv='audit_log.csv'):\n", " folder = Path(folder); reports = []\n", " for p in folder.rglob('*'):\n", " if p.suffix.lower() in {'.png','.jpg','.jpeg','.pdf'}:\n", " try: reports.append(analyse_document(p))\n", " except Exception as e: reports.append({'file':str(p),'error':str(e)})\n", " df = pd.DataFrame([{'file':r.get('file'), 'type':r.get('type'),\n", " 'risk_score':r.get('risk_score'),\n", " 'risk_band':r.get('risk_band'),\n", " 'action':r.get('recommended_action')} for r in reports])\n", " df.to_csv(out_csv, index=False)\n", " return df\n", "\n", "audit = batch_analyse(DATA)\n", "audit.head(10)\n" ] }, { 
"cell_type": "markdown", "metadata": {}, "source": [ "## 11. PDF audit report generator\n", "\n", "Bank-letterhead PDF with risk verdict, evidence, embedded heatmaps.\n", "Uses ReportLab.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%pip install --quiet reportlab\n", "from reportlab.lib.pagesizes import A4\n", "from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle\n", "from reportlab.lib.units import cm\n", "from reportlab.lib import colors\n", "from reportlab.platypus import (SimpleDocTemplate, Paragraph, Spacer, Table,\n", " TableStyle, Image as RLImage, PageBreak)\n", "from reportlab.lib.enums import TA_CENTER\n", "\n", "BAND_C = {'LOW':colors.HexColor('#16a34a'),'MEDIUM':colors.HexColor('#ca8a04'),\n", " 'HIGH':colors.HexColor('#ea580c'),'CRITICAL':colors.HexColor('#dc2626')}\n", "\n", "def build_pdf_report(report, source_path):\n", " source_path = Path(source_path); s = getSampleStyleSheet()\n", " s.add(ParagraphStyle('Title2', parent=s['Title'], fontSize=22,\n", " textColor=colors.HexColor('#1e3a8a')))\n", " s.add(ParagraphStyle('Mono', parent=s['Normal'], fontName='Courier',\n", " fontSize=8, textColor=colors.dimgray))\n", " buf = io.BytesIO()\n", " doc = SimpleDocTemplate(buf, pagesize=A4,\n", " leftMargin=2*cm, rightMargin=2*cm,\n", " topMargin=1.5*cm, bottomMargin=1.5*cm)\n", " story = [\n", " Paragraph('DOCSENTRY - DOCUMENT FORENSICS REPORT', s['Title2']),\n", " Paragraph('Confidential - For Underwriting Use Only', s['Normal']),\n", " Spacer(1, 0.4*cm),\n", " ]\n", " # metadata table\n", " meta = [['Field','Value'],\n", " ['Document', source_path.name],\n", " ['Type', report.get('type','-')],\n", " ['Analysed at', report.get('analysed_at','-')[:19].replace('T',' ')],\n", " ['SHA-256', report.get('sha256','-')[:32]+'...']]\n", " t = Table(meta, colWidths=[4*cm, 13*cm])\n", " t.setStyle(TableStyle([\n", " ('BACKGROUND',(0,0),(-1,0), colors.HexColor('#1e3a8a')),\n", " 
('TEXTCOLOR',(0,0),(-1,0), colors.white),\n", " ('GRID',(0,0),(-1,-1), 0.4, colors.grey),\n", " ('FONTSIZE',(0,0),(-1,-1), 9)]))\n", " story += [t, Spacer(1, 0.4*cm)]\n", " # verdict box\n", " band_str = report.get('risk_band','UNKNOWN')\n", " bc = BAND_C.get(band_str, colors.grey)\n", " vt = Table([[Paragraph(f'{band_str}', s['Normal']),\n", " Paragraph(f'Risk score: {report.get(\"risk_score\",\"-\")}<br/>'\n", " f'Action: {report.get(\"recommended_action\",\"-\")}', s['Normal'])]],\n", " colWidths=[5*cm, 12*cm])\n", " vt.setStyle(TableStyle([\n", " ('BACKGROUND',(0,0),(0,0), bc),\n", " ('BACKGROUND',(1,0),(1,0), colors.HexColor('#f1f5f9')),\n", " ('VALIGN',(0,0),(-1,-1),'MIDDLE'),\n", " ('TOPPADDING',(0,0),(-1,-1), 12),\n", " ('BOTTOMPADDING',(0,0),(-1,-1), 12),\n", " ('LEFTPADDING',(0,0),(-1,-1), 12),\n", " ('RIGHTPADDING',(0,0),(-1,-1), 12)]))\n", " story.append(vt)\n", " story.append(Spacer(1, 0.4*cm))\n", " # evidence\n", " story.append(Paragraph('Forensic evidence', s['Heading3']))\n", " for e in report.get('evidence', []):\n", " story.append(Paragraph('• '+e, s['Normal']))\n", " story.append(Spacer(1, 0.3*cm))\n", " story.append(Paragraph('Generated by DocSentry. Heuristic + ML ensemble. '\n", " 'Manual review required for HIGH/CRITICAL.', s['Mono']))\n", " doc.build(story)\n", " buf.seek(0); return buf.read()\n", "\n", "# Generate a sample report\n", "sample = sorted((DATA/'images/tampered').glob('land_*.png'))[0]\n", "r = analyse_document(sample)\n", "pdf_bytes = build_pdf_report(r, sample)\n", "Path('reports').mkdir(exist_ok=True)\n", "(Path('reports')/f'audit_{sample.stem}.pdf').write_bytes(pdf_bytes)\n", "print(f'Wrote: reports/audit_{sample.stem}.pdf ({len(pdf_bytes)} bytes)')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 12. Export Streamlit demo files (forensics.py, app.py, audit_report.py)\n", "\n", "**This is the cell that links the notebook to the live web demo.**\n", "Run it once. It writes 3 files at the repo root with the same logic\n", "as above. After that:\n", "```\n", "streamlit run app.py\n", "```\n", "\n", "Re-run this cell any time you change the detector logic and want the app\n", "to pick up your changes. 
The cell is idempotent.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Section 12 - Generate forensics.py from in-notebook functions\n", "import inspect\n", "\n", "FORENSICS_HEADER = \"\"\"# Auto-generated from docsentry_master.ipynb. Edit notebook, not this file.\n", "import os, io, re, math, json, hashlib, shutil, warnings\n", "from pathlib import Path\n", "from datetime import datetime\n", "import numpy as np, pandas as pd\n", "from PIL import Image, ImageChops, ImageEnhance\n", "import cv2, fitz, pytesseract, joblib\n", "from skimage.feature import graycomatrix, graycoprops\n", "from difflib import SequenceMatcher\n", "warnings.filterwarnings('ignore')\n", "\n", "TESSERACT_OK = False\n", "for _c in [shutil.which('tesseract'),\n", " r'C:\\\\Program Files\\\\Tesseract-OCR\\\\tesseract.exe',\n", " r'C:\\\\Program Files (x86)\\\\Tesseract-OCR\\\\tesseract.exe',\n", " os.path.expanduser(r'~\\\\AppData\\\\Local\\\\Programs\\\\Tesseract-OCR\\\\tesseract.exe')]:\n", " if _c and os.path.isfile(_c):\n", " pytesseract.pytesseract.tesseract_cmd = _c\n", " TESSERACT_OK = True; break\n", "\n", "AMT_RE = re.compile(r'(?.big-risk{font-size:48px;font-weight:800;padding:14px 28px;\n", " border-radius:12px;color:white;text-align:center}.low{background:#16a34a}\n", " .medium{background:#ca8a04}.high{background:#ea580c}.critical{background:#dc2626}\n", " \"\"\", unsafe_allow_html=True)\n", "st.title(\":shield: DocSentry - Document Forensics\")\n", "st.caption(\"Real-time anomaly detection for underwriting.\")\n", "if not forensics.TESSERACT_OK:\n", " st.warning(\"Tesseract not installed - text-rule checks skipped.\")\n", "\n", "def risk_badge(b): st.markdown(f\"<div class=\\'big-risk {b.lower()}\\'>{b}</div>\", unsafe_allow_html=True)\n", "def save(u):\n", " t = tempfile.NamedTemporaryFile(delete=False, suffix=Path(u.name).suffix)\n", " t.write(u.getbuffer()); t.close(); return Path(t.name)\n", "\n", "tab1, tab2, tab3 = st.tabs([\":mag: Single doc\", \":busts_in_silhouette: Cross-doc\", \":file_folder: Batch\"])\n", "\n", "with tab1:\n", " sd = Path(\"sample_data\")\n", " samples = [p for sub in (\"originals\",\"tampered\",\"pdfs\") for p in sorted((sd/sub).glob(\"*\")) if sd.exists()]\n", " opts = [\"(upload)\"] + [str(p.relative_to(sd)) for p in samples]\n", " pick = st.selectbox(\"Try a sample, or upload:\", opts)\n", " path = None\n", " if pick != \"(upload)\": path = sd / pick\n", " else:\n", " u = st.file_uploader(\"Upload\", type=[\"png\",\"jpg\",\"jpeg\",\"pdf\"])\n", " if u: path = save(u)\n", " if path:\n", " r = forensics.analyse_document(path)\n", " c1, c2 = st.columns([1,2])\n", " with c1: risk_badge(r[\"risk_band\"]); st.metric(\"Score\", f'{r[\"risk_score\"]:.3f}')\n", " with c2:\n", " st.info(r[\"recommended_action\"])\n", " for e in r[\"evidence\"]: st.markdown(\"- \" + e)\n", " st.image(str(path), use_container_width=True) if r[\"type\"]==\"image\" else None\n", " if r[\"type\"] == \"image\":\n", " ela, _ = forensics.error_level_analysis(path)\n", " viz, n, _ = forensics.copy_move_detect(path)\n", " t1, t2 = st.tabs([\"ELA\", f\"Copy-move ({n})\"])\n", " with t1: st.image(ela)\n", " with t2: st.image(cv2.cvtColor(viz, cv2.COLOR_BGR2RGB))\n", " if \"ml_prediction\" in r:\n", " ml = r[\"ml_prediction\"]; st.metric(\"RF verdict\", f\"{ml[\\'tamper_probability\\']:.1%}\")\n", " if \"cnn_prediction\" in r:\n", " cnn = r[\"cnn_prediction\"]; st.metric(\"CNN verdict\", f\"{cnn[\\'tamper_probability\\']:.1%}\")\n", " st.download_button(\"Audit JSON\", json.dumps(r, indent=2, default=str),\n", " file_name=f\"audit_{path.stem}.json\")\n", " try:\n", " pdf = build_pdf_report(r, path)\n", " st.download_button(\"Audit PDF\", pdf, 
file_name=f\"audit_{path.stem}.pdf\")\n", " except Exception as e: st.caption(f\"PDF report: {e}\")\n", "\n", "with tab2:\n", " ups = st.file_uploader(\"Upload 2+ docs\", type=[\"png\",\"jpg\",\"pdf\"], accept_multiple_files=True)\n", " if ups and len(ups) >= 2:\n", " paths = [save(u) for u in ups]\n", " r = forensics.cross_doc_consistency(paths)\n", " risk_badge(r[\"consistency_band\"])\n", " st.metric(\"Mismatches\", r[\"mismatches\"])\n", " rows = []\n", " for f, v in r[\"field_results\"].items():\n", " rows.append({\"Field\":f, \"Status\":v[\"status\"], \"Similarity\":v[\"similarity\"]})\n", " st.dataframe(pd.DataFrame(rows), use_container_width=True)\n", "\n", "with tab3:\n", " default = Path.cwd() / (\"sample_data\" if not (Path.cwd()/\"data\").exists() else \"data\")\n", " folder = st.text_input(\"Folder\", value=str(default))\n", " if st.button(\"Audit\"):\n", " root = Path(folder); reports = []\n", " for p in root.rglob(\"*\"):\n", " if p.suffix.lower() in {\".png\",\".jpg\",\".jpeg\",\".pdf\"}:\n", " try: reports.append(forensics.analyse_document(p))\n", " except Exception as e: reports.append({\"file\":str(p),\"error\":str(e)})\n", " df = pd.DataFrame([{\"file\":r.get(\"file\"), \"band\":r.get(\"risk_band\"),\n", " \"score\":r.get(\"risk_score\")} for r in reports])\n", " st.dataframe(df, use_container_width=True)\n", " st.download_button(\"CSV\", df.to_csv(index=False), file_name=\"audit_log.csv\")\n", "'''\n", "Path('app.py').write_text(APP_PY)\n", "print('Wrote app.py')\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "AUDIT_PY = '''\"\"\"audit_report.py - PDF report (auto-generated from notebook)\"\"\"\n", "import io\n", "from pathlib import Path\n", "from reportlab.lib.pagesizes import A4\n", "from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle\n", "from reportlab.lib.units import cm\n", "from reportlab.lib import colors\n", "from reportlab.platypus import SimpleDocTemplate, 
Paragraph, Spacer, Table, TableStyle\n", "\n", "BAND_C = {\"LOW\":colors.HexColor(\"#16a34a\"),\"MEDIUM\":colors.HexColor(\"#ca8a04\"),\n", " \"HIGH\":colors.HexColor(\"#ea580c\"),\"CRITICAL\":colors.HexColor(\"#dc2626\")}\n", "\n", "def build_pdf_report(report, source_path):\n", " source_path = Path(source_path); s = getSampleStyleSheet()\n", " s.add(ParagraphStyle(\"Title2\", parent=s[\"Title\"], fontSize=22,\n", " textColor=colors.HexColor(\"#1e3a8a\")))\n", " buf = io.BytesIO()\n", " doc = SimpleDocTemplate(buf, pagesize=A4,\n", " leftMargin=2*cm, rightMargin=2*cm,\n", " topMargin=1.5*cm, bottomMargin=1.5*cm)\n", " story = [Paragraph(\"DOCSENTRY - DOCUMENT FORENSICS REPORT\", s[\"Title2\"]),\n", " Spacer(1, 0.3*cm)]\n", " meta = [[\"Document\", source_path.name],\n", " [\"Type\", report.get(\"type\",\"-\")],\n", " [\"Analysed at\", report.get(\"analysed_at\",\"-\")[:19].replace(\"T\",\" \")],\n", " [\"SHA-256\", report.get(\"sha256\",\"-\")[:32]+\"...\"]]\n", " t = Table(meta, colWidths=[4*cm, 13*cm])\n", " t.setStyle(TableStyle([(\"GRID\",(0,0),(-1,-1),0.4,colors.grey),(\"FONTSIZE\",(0,0),(-1,-1),9)]))\n", " story += [t, Spacer(1, 0.4*cm)]\n", " band_str = report.get(\"risk_band\",\"UNKNOWN\")\n", " bc = BAND_C.get(band_str, colors.grey)\n", " vt = Table([[Paragraph(f\\'{band_str}\\', s[\"Normal\"]),\n", " Paragraph(f\\'Risk score: {report.get(\"risk_score\",\"-\")}<br/>Action: {report.get(\"recommended_action\",\"-\")}\\', s[\"Normal\"])]],\n", " colWidths=[5*cm, 12*cm])\n", " vt.setStyle(TableStyle([(\"BACKGROUND\",(0,0),(0,0), bc),\n", " (\"BACKGROUND\",(1,0),(1,0), colors.HexColor(\"#f1f5f9\")),\n", " (\"VALIGN\",(0,0),(-1,-1),\"MIDDLE\"),\n", " (\"TOPPADDING\",(0,0),(-1,-1),12),(\"BOTTOMPADDING\",(0,0),(-1,-1),12)]))\n", " story.append(vt); story.append(Spacer(1, 0.4*cm))\n", " story.append(Paragraph(\"Forensic evidence\", s[\"Heading3\"]))\n", " for e in report.get(\"evidence\",[]): story.append(Paragraph(\"• \"+e, s[\"Normal\"]))\n", " doc.build(story)\n", " buf.seek(0); return buf.read()\n", "'''\n", "Path('audit_report.py').write_text(AUDIT_PY)\n", "# requirements + packages.txt for Streamlit Cloud (one entry per line)\n", "Path('requirements.txt').write_text('\\n'.join([\n", " 'numpy','pandas','matplotlib','scikit-image','scikit-learn','joblib',\n", " 'opencv-python-headless','Pillow','pytesseract','pdfplumber','pymupdf',\n", " 'pikepdf','python-dateutil','streamlit','reportlab','tensorflow-cpu']))\n", "Path('packages.txt').write_text('tesseract-ocr\\nlibtesseract-dev\\n')\n", "print('Wrote audit_report.py, requirements.txt, packages.txt')\n", "print()\n", "print('Streamlit demo files are ready. Launch with:')\n", "print(' streamlit run app.py')\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 13. Launch the live app\n", "\n", "After running section 12, the supporting files exist at the repo root.\n", "Open a terminal in this folder and run:\n", "\n", "```\n", "streamlit run app.py\n", "```\n", "\n", "Or to deploy to Streamlit Community Cloud (free public URL):\n", "1. Push this folder to a public GitHub repo\n", "2. Connect at https://share.streamlit.io\n", "3. Pick the repo, main file `app.py`, click Deploy\n", "\n", "## 14. 
Where to go next\n", "\n", "- **Train CNN on real CASIA v2** - section 7, flip `TRAIN_CNN=True` on Colab GPU\n", "- **Add signature verification** - Siamese network for borrower signatures\n", "- **Wrap as FastAPI** - turn `analyse_document` into an HTTP endpoint\n", "- **Grad-CAM overlays** - show which pixels the CNN flagged\n", "\n", "**Everything in this notebook is free, runs CPU-only by default, and demos end-to-end without any paid API call.**\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.10" }, "colab": { "provenance": [], "toc_visible": true } }, "nbformat": 4, "nbformat_minor": 5 }