"""
app.py - Streamlit demo for real-time document anomaly detection

Run with:
    streamlit run app.py

Tabs:
  1. Single-document analysis - pick a sample or upload one file, see the
     forensic verdict
  2. Cross-document check     - upload 2+ docs, check identity consistency
  3. Batch audit              - point at a folder, get an audit CSV
  4. Compliance & Audit Pack  - KYC validation, PII redaction, compliance
     PDF, provenance ledger
  5. Tamper Forge Studio      - forge a clean document and watch the
     detectors respond
  6. Fraud Ring Network       - link applicants by shared identity signals
"""
import html
import io
import json
import tempfile
import time
from pathlib import Path

import streamlit as st
import numpy as np
import pandas as pd
import cv2
from PIL import Image
import matplotlib.pyplot as plt

import forensics
import compliance
import tampering
import fraud_ring
from audit_report import build_pdf_report  # Sprint 3 module

# -------------------------------------------------------------
# Page config + global CSS
# -------------------------------------------------------------
st.set_page_config(page_title="BankShield - Document Forensics",
                   page_icon=":lock:", layout="wide")

# NOTE(review): the stylesheet body was blank in the reviewed copy of this
# file. The rules below are a minimal reconstruction that styles the classes
# emitted by risk_badge(); confirm against the intended design.
st.markdown("""
<style>
.risk-badge {
    display: inline-block;
    padding: 0.35rem 1rem;
    border-radius: 0.5rem;
    font-weight: 700;
    font-size: 1.25rem;
    color: #ffffff;
    background: #6b7280;
}
.risk-badge.low { background: #16a34a; }
.risk-badge.medium { background: #ea580c; }
.risk-badge.high { background: #dc2626; }
.risk-badge.critical { background: #7f1d1d; }
</style>
""", unsafe_allow_html=True)

# -------------------------------------------------------------
# Header
# -------------------------------------------------------------
st.title(":shield: BankShield - Document Forensics")
st.caption("Real-time anomaly detection for underwriting. "
           "Land records | Legal documents | Financial statements.")

if not forensics.TESSERACT_OK:
    st.warning("Tesseract OCR is not installed - text-rule checks will be skipped. "
               "Install from https://github.com/UB-Mannheim/tesseract/wiki for full functionality.")

# Sentinel entry shared by the sample pickers on Tab 1 and Tab 5.
UPLOAD_OWN = "(upload your own)"


# -------------------------------------------------------------
# Helpers
# -------------------------------------------------------------
def risk_badge(band_str):
    """Render a risk/consistency band as a coloured badge.

    The lower-cased band is used as a CSS class and the band itself as the
    visible label. Both are HTML-escaped because the markup is rendered with
    ``unsafe_allow_html=True``.
    """
    label = html.escape(str(band_str))
    klass = html.escape(str(band_str).lower(), quote=True)
    st.markdown(f"<div class='risk-badge {klass}'>{label}</div>",
                unsafe_allow_html=True)


def save_uploaded(uploaded_file):
    """Persist an uploaded file to a temp path; return Path.

    The temp file keeps the upload's suffix and is created with
    ``delete=False``, so it is not removed automatically.
    """
    suffix = Path(uploaded_file.name).suffix
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        tmp.write(uploaded_file.getbuffer())
    return Path(tmp.name)


def sub_score_chart(sub_scores):
    """Build a horizontal bar chart of sub-scores and return the figure.

    Bars are green below 0.4, orange below 0.7 and red otherwise.
    """
    fig, ax = plt.subplots(figsize=(7, 3.2))
    keys = list(sub_scores.keys())
    vals = list(sub_scores.values())
    colours = ["#16a34a" if v < 0.4 else "#ea580c" if v < 0.7 else "#dc2626"
               for v in vals]
    ax.barh(keys, vals, color=colours)
    ax.set_xlim(0, 1)
    ax.set_xlabel("score (0 = clean, 1 = suspicious)")
    ax.set_title("Sub-score breakdown")
    ax.invert_yaxis()
    fig.tight_layout()
    return fig


def show_figure(fig):
    """Render a Matplotlib figure, then close it.

    Streamlit re-executes this script on every interaction; figures created
    through pyplot stay registered until closed, so leaving them open
    accumulates memory across reruns.
    """
    st.pyplot(fig)
    plt.close(fig)


# -------------------------------------------------------------
# TABS
# -------------------------------------------------------------
tab1, tab2, tab3, tab4, tab5, tab6 = st.tabs([
    ":mag: Single-document analysis",
    ":busts_in_silhouette: Cross-document check",
    ":file_folder: Batch audit",
    ":scales: Compliance & Audit Pack",
    ":fire: Tamper Forge Studio",
    ":spider_web: Fraud Ring Network",
])

# =============================================================
# TAB 1 - Single document
# =============================================================
with tab1:
    # Sample picker: lets cloud demos work without an upload
    sample_dir = Path("sample_data")
    sample_paths = []
    if sample_dir.exists():
        for sub in ("originals", "tampered", "pdfs"):
            sample_paths.extend(sorted((sample_dir / sub).glob("*")))
    sample_options = [UPLOAD_OWN] + [str(p.relative_to(sample_dir)) for p in sample_paths]
    pick = st.selectbox("Try a sample document, or upload your own:",
                        sample_options, key="sample_pick")

    path = None
    if pick != UPLOAD_OWN:
        path = sample_dir / pick
        st.caption(f"Loaded sample: `{pick}`")
    else:
        uploaded = st.file_uploader(
            "Upload a document (PNG / JPG / PDF)",
            type=["png", "jpg", "jpeg", "pdf", "tif", "tiff"],
            key="single",
        )
        if uploaded:
            path = save_uploaded(uploaded)

    if path is not None:
        with st.spinner("Analyzing forensic signals..."):
            report = forensics.analyse_document(path)

        # --- top row: risk badge + action ---
        # NOTE(review): the evidence list is placed in the wide column;
        # confirm this matches the intended layout.
        c1, c2 = st.columns([1, 2])
        with c1:
            risk_badge(report["risk_band"])
            st.metric("Risk score", f"{report['risk_score']:.3f}")
        with c2:
            st.markdown("**Recommended action**")
            st.info(report["recommended_action"])
            st.markdown("**Evidence**")
            for e in report["evidence"]:
                st.markdown(f"- {e}")

        st.divider()

        # --- detail row: image preview + sub-scores ---
        left, right = st.columns([1, 1])
        with left:
            st.markdown("#### Document preview")
            if report["type"] == "image":
                st.image(str(path), use_column_width=True)
            elif report["type"] == "pdf":
                # Imported lazily so image-only use does not need PyMuPDF.
                import fitz
                with fitz.open(path) as d:
                    pix = d[0].get_pixmap(dpi=110)
                    img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
                st.image(img, use_column_width=True)
        with right:
            st.markdown("#### Sub-score breakdown")
            sub = report.get("sub_scores")
            if sub:
                show_figure(sub_score_chart(sub))

        # --- forensic visualizations (images only) ---
        if report["type"] == "image":
            st.divider()
            st.markdown("#### Forensic visualizations")
            tabs_viz = st.tabs(["Error Level Analysis", "Copy-move matches", "Noise heatmap"])
            with tabs_viz[0]:
                ela_img, ela_s = forensics.error_level_analysis(path)
                st.image(ela_img, caption=f"ELA score: {ela_s:.2f}", use_column_width=True)
                st.caption("Bright regions = compression-inconsistent areas (likely edited).")
            with tabs_viz[1]:
                viz, n, _ = forensics.copy_move_detect(path)
                st.image(cv2.cvtColor(viz, cv2.COLOR_BGR2RGB),
                         caption=f"Copy-move matches: {n}", use_column_width=True)
                st.caption("Red lines connect duplicated regions within the image.")
            with tabs_viz[2]:
                heat, ratio = forensics.noise_inconsistency(path)
                fig, ax = plt.subplots(figsize=(6, 4))
                ax.imshow(heat, cmap="hot")
                ax.set_title(f"Noise outlier ratio: {ratio:.2%}")
                ax.axis("off")
                show_figure(fig)
                st.caption("Hotspots = local noise inconsistencies (splicing signal).")

        # AI-generated content detector - separate expander
        ai = report.get("ai_detector")
        if ai:
            st.divider()
            verdict_labels = {
                "likely_ai_generated": ":robot_face: likely_ai_generated",
                "suspicious": ":warning: suspicious",
                "likely_real": ":white_check_mark: likely_real",
            }
            # Unknown verdicts are shown as-is, without an icon.
            label = verdict_labels.get(ai["verdict"], ai["verdict"])
            with st.expander(f"AI-generated forgery detector — {label} (prob {ai['probability']:.2f})",
                             expanded=False):
                a1, a2, a3 = st.columns(3)
                a1.metric("High-freq suppression", f"{ai['sub']['high_freq_suppression']:.2f}")
                a2.metric("Spectral peakiness", f"{ai['sub']['spectral_peakiness']:.2f}")
                a3.metric("JPEG artefact score", f"{ai['sub']['jpeg_artefact_score']:.2f}")
                # FFT profile plot
                profile = ai.get("profile", [])
                if profile:
                    fig, ax = plt.subplots(figsize=(7, 2.6))
                    ax.plot(profile, color="#2563eb", linewidth=2)
                    ax.fill_between(range(len(profile)), profile, alpha=0.2, color="#2563eb")
                    ax.set_xlabel("Radial frequency bin (low -> high)")
                    ax.set_ylabel("log-magnitude")
                    ax.set_title("Radial FFT profile (real scans show gradual 1/f decay; AI outputs drop sharply)")
                    ax.grid(alpha=0.3)
                    show_figure(fig)
                for f in ai.get("flags", []):
                    st.markdown(f"- {f}")

        # --- PDF-specific audit details ---
        if report["type"] == "pdf":
            st.divider()
            st.markdown("#### PDF structural audit")
            audit = report.get("pdf_audit", {})
            fonts = report.get("font_audit", {})
            cc1, cc2 = st.columns(2)
            with cc1:
                st.metric("EOF markers", audit.get("eof_markers", "-"))
                st.metric("Pages", audit.get("pages", "-"))
                st.markdown("**Metadata flags:**")
                for f in audit.get("flags", []):
                    st.markdown(f"- {f}")
            with cc2:
                meta = audit.get("metadata", {}) or {}
                st.markdown("**Producer:** " + str(meta.get("producer", "-")))
                st.markdown("**Creator:** " + str(meta.get("creator", "-")))
                st.markdown("**Fonts used:** " + ", ".join(fonts.get("fonts", []) or ["-"]))
                st.markdown("**Font flags:**")
                for f in fonts.get("flags", []):
                    st.markdown(f"- {f}")

        # --- ML predictions (RF + CNN side-by-side if available) ---
        has_rf = "ml_prediction" in report
        has_cnn = "cnn_prediction" in report
        if has_rf or has_cnn:
            st.divider()
            st.markdown("#### Trained model verdicts")
            cols = st.columns(2 if (has_rf and has_cnn) else 1)
            ci = 0
            if has_rf:
                ml = report["ml_prediction"]
                with cols[ci]:
                    st.markdown("**Random Forest** (forensic features)")
                    cc1, cc2 = st.columns(2)
                    cc1.metric("Tamper probability", f"{ml['tamper_probability']:.1%}")
                    cc2.metric("Verdict", ml["verdict"])
                ci += 1
            if has_cnn:
                cnn = report["cnn_prediction"]
                with cols[ci]:
                    st.markdown("**CNN** (MobileNetV2 on CASIA v2)")
                    cc1, cc2 = st.columns(2)
                    cc1.metric("Tamper probability", f"{cnn['tamper_probability']:.1%}")
                    cc2.metric("Verdict", cnn["verdict"])
                    if cnn.get("val_auc"):
                        st.caption(f"Model val ROC-AUC: {cnn['val_auc']:.3f}")

        # --- downloads ---
        st.divider()
        dl1, dl2 = st.columns(2)
        with dl1:
            st.download_button(
                "Download audit JSON",
                data=json.dumps(report, indent=2, default=str),
                file_name=f"audit_{path.stem}.json",
                mime="application/json",
            )
        with dl2:
            # Best-effort: a failed PDF build must not take down the page.
            try:
                pdf_bytes = build_pdf_report(report, path)
                st.download_button(
                    "Download audit PDF report",
                    data=pdf_bytes,
                    file_name=f"audit_report_{path.stem}.pdf",
                    mime="application/pdf",
                )
            except Exception as e:
                st.warning(f"PDF report generation skipped: {e}")

# =============================================================
# TAB 2 - Cross-document consistency
# =============================================================
with tab2:
    st.markdown("Upload 2 or more documents for the **same applicant** "
                "(e.g. land record + bank statement + ID). The system will "
                "extract identity fields and flag any mismatches.")
    uploads = st.file_uploader(
        "Upload 2-4 documents",
        type=["png", "jpg", "jpeg", "pdf"],
        accept_multiple_files=True,
        key="multi",
    )

    if uploads and len(uploads) >= 2:
        paths = [save_uploaded(u) for u in uploads]
        with st.spinner("Extracting identity fields from each document..."):
            result = forensics.cross_doc_consistency(paths)

        # --- header ---
        c1, c2 = st.columns([1, 2])
        with c1:
            risk_badge(result["consistency_band"])
            st.metric("Consistency risk", f"{result['consistency_risk_score']:.3f}")
        with c2:
            st.metric("Mismatches", result["mismatches"])
            st.metric("Likely mismatches", result["likely_mismatches"])

        st.divider()
        st.markdown("#### Field-by-field comparison")

        # Build a comparison table: one row per field, one column per file
        field_rows = []
        files = [Path(p).name for p in paths]
        for field, res in result["field_results"].items():
            row = {"Field": field, "Status": res["status"],
                   "Similarity": res.get("similarity")}
            for fn, val in zip(files, res["values"]):
                row[fn] = val or "(not found)"
            field_rows.append(row)
        df = pd.DataFrame(field_rows)

        status_colours = {
            "match": "background-color: #dcfce7",
            "likely_match": "background-color: #fef3c7",
            "mismatch": "background-color: #fecaca",
        }

        def colour_status(val):
            """Return the cell CSS for a status value ("" when unknown)."""
            return status_colours.get(val, "")

        if "Status" in df.columns:
            styler = df.style
            # Styler.applymap is deprecated in pandas >= 2.1 in favour of
            # Styler.map; fall back to applymap on older versions.
            style_cells = getattr(styler, "map", None) or styler.applymap
            st.dataframe(style_cells(colour_status, subset=["Status"]),
                         use_container_width=True)
        else:
            # No field results: an empty frame has no "Status" column to style.
            st.dataframe(df, use_container_width=True)

        st.divider()
        st.markdown("#### Per-document extracts")
        for doc in result["documents"]:
            with st.expander(Path(doc["file"]).name):
                st.json(doc["fields"])

        st.download_button(
            "Download consistency report JSON",
            data=json.dumps(result, indent=2, default=str),
            file_name="cross_doc_consistency.json",
            mime="application/json",
        )
    elif uploads:
        st.info("Upload at least 2 documents to run the cross-check.")

# =============================================================
# TAB 3 - Batch audit
# =============================================================
with tab3:
    st.markdown("Point at a folder on your machine to run a batch audit. "
                "Produces a CSV with risk band per file.")
    # Prefer ./data when present, otherwise fall back to ./sample_data.
    cwd = Path.cwd()
    default = cwd / ("data" if (cwd / "data").exists() else "sample_data")
    folder = st.text_input("Folder path", value=str(default))

    if st.button("Run batch audit"):
        root = Path(folder)
        # is_dir() also rejects a path that exists but is a regular file.
        if not root.is_dir():
            st.error(f"Folder not found: {root}")
        else:
            results = []
            # ".tiff" is included so the filter matches the Tab 1 uploader.
            supported = {".png", ".jpg", ".jpeg", ".pdf", ".tif", ".tiff"}
            files = [p for p in root.rglob("*") if p.suffix.lower() in supported]
            if not files:
                st.warning("No supported files found in folder.")
            else:
                progress = st.progress(0.0)
                for i, p in enumerate(files):
                    rel_name = str(p.relative_to(root))
                    # One bad file is recorded as an error row rather than
                    # aborting the whole batch.
                    try:
                        r = forensics.analyse_document(p)
                        results.append({
                            "file": rel_name,
                            "type": r.get("type"),
                            "risk_score": r.get("risk_score"),
                            "risk_band": r.get("risk_band"),
                            "action": r.get("recommended_action"),
                        })
                    except Exception as e:
                        results.append({"file": rel_name, "error": str(e)})
                    progress.progress((i + 1) / len(files))

                df = pd.DataFrame(results)
                st.success(f"Analysed {len(files)} files.")
                st.dataframe(df, use_container_width=True)

                csv = df.to_csv(index=False).encode("utf-8")
                st.download_button("Download audit CSV", data=csv,
                                   file_name="audit_log.csv", mime="text/csv")

# =============================================================
# TAB 4 - Compliance & Audit Pack (KYC + PII redaction + RBI report)
# =============================================================
with tab4:
    st.markdown("**Three regulatory tools in one tab** - KYC field validation, "
                "PII auto-redaction, and RBI-style compliance reports.")
    sub_a, sub_b, sub_c = st.tabs([
        ":id: KYC Field Validation",
        ":lock: PII Auto-Redaction",
        ":scroll: RBI Compliance Report",
    ])

    # -------- 4A: KYC validators (manual input) --------
    with sub_a:
        st.markdown("#### Validate KYC fields against RBI rules")
        st.caption("IFSC: format + RBI bank-code list | PAN: format + entity-type "
                   "char | Aadhaar: 12-digit + UIDAI Verhoeff checksum.")
        c1, c2, c3 = st.columns(3)
        ifsc_in = c1.text_input("IFSC code", value="SBIN0001234")
        pan_in = c2.text_input("PAN", value="ABCPQ1234F")
        aad_in = c3.text_input("Aadhaar number (12 digits)", value="234567890124")

        if st.button("Validate all", key="kyc_validate"):
            checks = [
                ("IFSC", compliance.validate_ifsc(ifsc_in)),
                ("PAN", compliance.validate_pan(pan_in)),
                ("Aadhaar", compliance.validate_aadhaar(aad_in)),
            ]
            for label, r in checks:
                if r["ok"]:
                    st.success(f"**{label}**: VALID. " + " ".join(r["flags"]))
                    # f-strings keep the captions working even when a
                    # validator returns a non-string (e.g. None) value.
                    if label == "IFSC":
                        bn = r.get("bank_name", "-")
                        bc = r.get("branch_code", "-")
                        st.caption(f"Bank: {bn}, branch code: {bc}")
                    if label == "PAN":
                        st.caption(f"Entity type: {r.get('entity_type', '-')}")
                    if label == "Aadhaar":
                        st.caption(f"Masked: {r.get('masked', '-')}")
                else:
                    st.error(f"**{label}**: INVALID. " + " | ".join(r["flags"]))

        st.divider()
        st.markdown("#### Or: extract & validate from a document")
        kyc_file = st.file_uploader("Upload doc to scan for KYC fields",
                                    type=["pdf", "png", "jpg"], key="kyc_doc")
        if kyc_file:
            kyc_path = save_uploaded(kyc_file)
            with st.spinner("Extracting KYC fields..."):
                fields, _ = compliance.extract_pii_fields(kyc_path)
            n_ifsc = len(fields["ifsc"])
            n_pan = len(fields["pan"])
            n_aad = len(fields["aadhaar"])
            n_acc = len(fields["accounts"])
            st.markdown(f"**Found in document:** {n_ifsc} IFSC, {n_pan} PAN, "
                        f"{n_aad} Aadhaar candidates, {n_acc} account numbers")

            # Validate unique IFSCs (first 5). dict.fromkeys de-duplicates
            # while keeping first-seen order, so the selection is stable.
            uniq_ifsc = list(dict.fromkeys(fields["ifsc"]))[:5]
            if uniq_ifsc:
                st.markdown("**IFSC validation (first 5 unique):**")
                rows = [compliance.validate_ifsc(c) for c in uniq_ifsc]
                st.dataframe(pd.DataFrame(rows), use_container_width=True)
            if fields["pan"]:
                st.markdown("**PAN validation:**")
                rows = [compliance.validate_pan(c) for c in fields["pan"][:5]]
                st.dataframe(pd.DataFrame(rows), use_container_width=True)

    # -------- 4B: PII redaction --------
    with sub_b:
        st.markdown("#### Auto-redact PII for safe sharing")
        st.caption("Masks IFSC, PAN, Aadhaar, and account numbers. Use before "
                   "forwarding to external vendors / for DPDP Act compliance.")
        rd_file = st.file_uploader("Upload document to redact",
                                   type=["pdf", "png", "jpg"], key="rd")
        if rd_file:
            src_path = save_uploaded(rd_file)
            if src_path.suffix.lower() == ".pdf":
                out_path = Path(tempfile.gettempdir()) / f"redacted_{src_path.stem}.pdf"
                with st.spinner("Redacting PDF..."):
                    found = compliance.redact_pdf(str(src_path), str(out_path))
                total = sum(len(v) for v in found.values())
                st.success(f"Redacted {total} PII items.")
                summary = {k: len(v) for k, v in found.items()}
                st.json(summary)
                st.download_button("Download redacted PDF", out_path.read_bytes(),
                                   file_name=out_path.name, mime="application/pdf")
            else:
                # image - just OCR + redact text
                _, text = compliance.extract_pii_fields(src_path)
                red_text, _ = compliance.redact_text(text)
                st.markdown("**Original (OCR):**")
                st.code(text[:600], language=None)
                st.markdown("**Redacted:**")
                st.code(red_text[:600], language=None)
                st.download_button("Download redacted text", red_text,
                                   file_name=f"redacted_{src_path.stem}.txt")

    # -------- 4C: RBI compliance report --------
    with sub_c:
        st.markdown("#### Generate an RBI Master-Direction-style audit PDF")
        st.caption("Runs full forensic analysis + KYC verification + RBI risk-treatment "
                   "recommendation, then produces a regulator-ready PDF.")
        cr_file = st.file_uploader("Upload document for compliance audit",
                                   type=["pdf", "png", "jpg"], key="cr")
        if cr_file:
            src_path = save_uploaded(cr_file)
            with st.spinner("Running forensic analysis..."):
                f_report = forensics.analyse_document(src_path)
            with st.spinner("Validating KYC fields..."):
                fields, _ = compliance.extract_pii_fields(src_path)
                # Only the first candidate of each kind is validated.
                kyc_results = {}
                if fields["ifsc"]:
                    kyc_results["ifsc"] = compliance.validate_ifsc(fields["ifsc"][0])
                if fields["pan"]:
                    kyc_results["pan"] = compliance.validate_pan(fields["pan"][0])
                if fields["aadhaar"]:
                    kyc_results["aadhaar"] = compliance.validate_aadhaar(fields["aadhaar"][0])

            # Summary cards
            cc1, cc2, cc3 = st.columns(3)
            cc1.metric("Forensic risk", f_report.get("risk_band", "-"))
            cc2.metric("KYC fields found",
                       sum(len(fields[k]) for k in ("ifsc", "pan", "aadhaar")))
            cc3.metric("KYC checks passed",
                       sum(1 for r in kyc_results.values() if r.get("ok")))

            # KYC table
            if kyc_results:
                rows = [{"Field": k.upper(),
                         "Value": r.get("code", "-"),
                         "Status": "PASS" if r.get("ok") else "FAIL",
                         "Notes": "; ".join(r.get("flags", []))[:60]}
                        for k, r in kyc_results.items()]
                st.dataframe(pd.DataFrame(rows), use_container_width=True)
            else:
                st.info("No KYC fields found in this document to validate.")

            # Generate the report
            with st.spinner("Building RBI compliance PDF..."):
                pdf_bytes = compliance.build_compliance_report(
                    f_report, src_path, kyc_results)
            st.success(f"Generated compliance audit ({len(pdf_bytes)//1024} KB)")
            st.download_button("Download RBI Compliance Report (PDF)",
                               pdf_bytes,
                               file_name=f"compliance_{src_path.stem}.pdf",
                               mime="application/pdf")

    # ---------- Provenance Ledger (tamper-evident audit trail) ----------
    st.divider()
    st.markdown("### :link: Provenance Ledger — tamper-evident audit chain")
    st.caption("Every analysis is logged as a SHA-256 hash-chained record. "
               "Editing any past entry breaks the chain, satisfying RBI Master "
               "Direction on KYC, 2016 (Para 67) record-retention requirements.")
    # Best-effort panel: a missing or failing ledger module only produces a
    # warning, so the rest of the app keeps working.
    try:
        import provenance

        stats = provenance.chain_stats()
        pc1, pc2, pc3, pc4 = st.columns(4)
        pc1.metric("Records", stats["n_records"])
        pc2.metric("First entry", (stats["first_ts"] or "-")[:19].replace("T", " "))
        pc3.metric("Last entry", (stats["last_ts"] or "-")[:19].replace("T", " "))
        pc4.metric("Chain status",
                   ":white_check_mark: INTACT" if stats["chain_intact"]
                   else f":x: BROKEN @ #{stats['broken_at']}")

        if stats["n_records"]:
            df = provenance.ledger_dataframe(limit=20)
            df_show = df[["id", "ts", "doc_name", "risk_band", "risk_score",
                          "prev_hash", "record_hash"]].copy()
            # Hashes are shortened for display only; the download is complete.
            df_show["prev_hash"] = df_show["prev_hash"].str[:14] + "..."
            df_show["record_hash"] = df_show["record_hash"].str[:14] + "..."
            df_show["ts"] = df_show["ts"].str[:19].str.replace("T", " ")
            st.dataframe(df_show, use_container_width=True, hide_index=True)

            col_dl, col_verify = st.columns([1, 1])
            with col_dl:
                st.download_button(":inbox_tray: Download full ledger (JSON)",
                                   json.dumps(provenance.fetch_ledger(limit=10000),
                                              indent=2, default=str),
                                   file_name="docsentry_provenance.json",
                                   mime="application/json")
            with col_verify:
                if st.button(":mag: Re-verify chain integrity", key="prov_verify"):
                    ok, where = provenance.verify_chain()
                    if ok:
                        st.success(":white_check_mark: Chain integrity verified across all records.")
                    else:
                        st.error(f":x: Chain broken at record #{where}.")
        else:
            st.info("Ledger is empty. Run an analysis on Tab 1 to create the first record.")
    except Exception as e:
        st.warning(f"Provenance ledger unavailable: {e}")

# =============================================================
# TAB 5 - Live Tamper Forge Studio (rich version)
# =============================================================
with tab5:
    st.markdown("**Live forgery demo.** Pick a clean document, choose a "
                "technique and intensity, and watch DocSentry localise the "
                "tampered region and tell you *which* detector caught it.")

    # ---- top controls row ----
    ctl1, ctl2, ctl3 = st.columns([2, 1, 1])
    with ctl1:
        fs_sample_dir = Path("sample_data")
        fs_options = []
        if fs_sample_dir.exists():
            for sub in ("originals",):
                fs_options.extend(sorted((fs_sample_dir / sub).glob("*.png")))
                fs_options.extend(sorted((fs_sample_dir / sub).glob("*.jpg")))
        fs_opts_str = [UPLOAD_OWN] + [str(p.relative_to(fs_sample_dir)) for p in fs_options]
        fs_pick = st.selectbox("Source document", fs_opts_str, key="fs_pick")
    with ctl2:
        fs_intensity = st.select_slider("Tamper intensity",
                                        options=["subtle", "moderate", "aggressive"],
                                        value="moderate", key="fs_intensity")
    with ctl3:
        fs_mode = st.radio("Mode",
                           ["Single technique", "Chain (multi-step)", "Adversarial canvas"],
                           key="fs_mode")

    src_path = None
    if fs_pick != UPLOAD_OWN:
        src_path = fs_sample_dir / fs_pick
    else:
        fs_up = st.file_uploader("Upload PNG/JPG to forge",
                                 type=["png", "jpg", "jpeg"], key="fs_up")
        if fs_up:
            src_path = save_uploaded(fs_up)

    if src_path is None:
        st.info(":point_up: Pick a sample document or upload one to begin.")
    else:
        # convert() returns an independent image, so the source file handle
        # can be released straight away.
        with Image.open(src_path) as opened:
            orig_img = opened.convert("RGB")
        st.image(orig_img, caption=f"Source: {src_path.name}", width=420)

        # ---- mode-specific controls ----
        chosen, chosen_chain, custom_box = None, [], None

        if fs_mode == "Single technique":
            st.markdown("#### Pick a forgery technique")
            cols = st.columns(5)
            techniques = [
                ("copy_move", ":scissors:", "Copy-move"),
                ("text_edit", ":pencil2:", "Text edit"),
                ("splice", ":jigsaw:", "Splice"),
                ("compression", ":package:", "Re-save"),
                ("metadata", ":wastebasket:", "Strip EXIF"),
            ]
            for (key, icon, label), col in zip(techniques, cols):
                if col.button(f"{icon} {label}", key="fs_tbtn_" + key):
                    st.session_state["fs_chosen"] = key
            chosen = st.session_state.get("fs_chosen")
            # "fs_chosen" is shared with the other modes; ignore a leftover
            # "__chain__" / "__custom__" so this mode never runs a chain or
            # custom tamper it has no inputs for.
            if chosen not in {key for key, _, _ in techniques}:
                chosen = None

        elif fs_mode == "Chain (multi-step)":
            st.markdown("#### Build a chain (applied in order)")
            cc = st.columns(5)
            chain_keys = ["copy_move", "text_edit", "splice", "compression", "metadata"]
            chain_picks = []
            for k, c in zip(chain_keys, cc):
                if c.checkbox(k, key="fs_chain_" + k):
                    chain_picks.append(k)
            if st.button(":fire: Run chain", key="fs_chain_run"):
                if not chain_picks:
                    st.warning("Pick at least one technique to chain.")
                else:
                    st.session_state["fs_chain"] = chain_picks
                    st.session_state["fs_chosen"] = "__chain__"
            chosen_chain = st.session_state.get("fs_chain", [])
            if st.session_state.get("fs_chosen") == "__chain__":
                chosen = "__chain__"

        elif fs_mode == "Adversarial canvas":
            st.markdown("#### Draw a region to tamper (adversarial mode)")
            # Optional dependency: only the import is guarded so that errors
            # raised by the canvas itself are not mislabelled.
            try:
                from streamlit_drawable_canvas import st_canvas
            except ImportError:
                st_canvas = None
                st.error("Install `streamlit-drawable-canvas` to enable adversarial mode.")

            if st_canvas is not None:
                canvas_max_w = 700
                # Scale the canvas down to at most 700 px wide; drawn
                # coordinates are divided by the same factor to map back to
                # image pixels.
                cw_scale = min(canvas_max_w, orig_img.width) / orig_img.width
                cw = int(orig_img.width * cw_scale)
                ch = int(orig_img.height * cw_scale)
                canvas_res = st_canvas(
                    fill_color="rgba(255, 0, 0, 0.25)",
                    stroke_width=2, stroke_color="#dc2626",
                    background_image=orig_img,
                    update_streamlit=True,
                    height=ch, width=cw,
                    drawing_mode="rect",
                    key="fs_canvas",
                )
                if canvas_res.json_data and canvas_res.json_data.get("objects"):
                    # Use the most recently drawn rectangle.
                    obj = canvas_res.json_data["objects"][-1]
                    x = obj["left"] / cw_scale
                    y = obj["top"] / cw_scale
                    w = obj["width"] / cw_scale
                    h = obj["height"] / cw_scale
                    # Clamp the box to the image bounds.
                    custom_box = (max(0, int(x)), max(0, int(y)),
                                  min(orig_img.width, int(x + w)),
                                  min(orig_img.height, int(y + h)))
                    # NOTE(review): the button is placed inside the
                    # rectangle check so it can never store an empty box;
                    # confirm this matches the intended layout.
                    if st.button(":boom: Tamper this region", key="fs_custom_run"):
                        st.session_state["fs_custom_box"] = custom_box
                        st.session_state["fs_chosen"] = "__custom__"

            if st.session_state.get("fs_chosen") == "__custom__":
                chosen = "__custom__"
                custom_box = st.session_state.get("fs_custom_box")

        # ---- run + render ----
        if chosen is not None:
            st.divider()

            # staged reveal
            progress = st.progress(0, text="Forging document...")
            time.sleep(0.4)
            progress.progress(35, text="Applying tampering...")

            t0 = time.perf_counter()
            if chosen == "__chain__":
                meta = tampering.tamper_chain(orig_img, chosen_chain, intensity=fs_intensity)
            elif chosen == "__custom__":
                meta = tampering.tamper_dispatch("custom", orig_img,
                                                 intensity=fs_intensity,
                                                 custom_box=custom_box)
            else:
                meta = tampering.tamper_dispatch(chosen, orig_img, intensity=fs_intensity)
            forge_ms = int((time.perf_counter() - t0) * 1000)

            progress.progress(60, text="DocSentry analysing...")
            # Reserve a temp path and close the handle before saving to it;
            # an open handle leaks and blocks the write on Windows.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp:
                tmp_path = Path(tmp.name)
            meta["image"].save(tmp_path)

            t1 = time.perf_counter()
            report = forensics.analyse_document(tmp_path)
            detect_ms = int((time.perf_counter() - t1) * 1000)

            progress.progress(85, text="Computing detector scorecard...")
            scorecard = tampering.detector_scorecard(tmp_path)
            progress.progress(100, text="Done")
            time.sleep(0.2)
            progress.empty()

            # --- BIG VERDICT ROW ---
            detected = report["risk_band"] in ("MEDIUM", "HIGH", "CRITICAL")
            v1, v2 = st.columns([1, 2])
            with v1:
                risk_badge(report["risk_band"])
                st.metric("Detection latency", f"{detect_ms} ms")
                st.metric("Forge latency", f"{forge_ms} ms")
            with v2:
                if detected:
                    st.success(f":white_check_mark: **FORGERY DETECTED** in {detect_ms} ms")
                else:
                    st.error(":x: **Forgery slipped past detectors.** Try a more aggressive intensity or different technique.")
                st.markdown(f"**Technique:** {meta.get('description','')}")
                st.markdown(f"**Intensity:** `{fs_intensity}`")

            st.divider()

            # --- ANNOTATED BEFORE/AFTER ---
            st.markdown("#### Where is the forgery?")
            ann_orig, ann_tamp = tampering.annotate_before_after(orig_img, meta)
            ab1, ab2 = st.columns(2)
            with ab1:
                st.image(ann_orig, caption=":large_green_circle: Original (green = source region)",
                         use_column_width=True)
            with ab2:
                st.image(ann_tamp, caption=":red_circle: Tampered (red = where the change is)",
                         use_column_width=True)

            st.divider()

            # --- DETECTOR SCORECARD ---
            st.markdown("#### Per-detector scorecard")
            st.caption("Which forensic signal caught the tampering, and how confidently.")
            sc_rows = [
                {
                    "Detector": name,
                    "Confidence": round(info["score"], 3),
                    "Status": ":red_circle: CAUGHT" if info["caught"] else ":large_green_circle: clean",
                    "Reading": str(info["raw"])[:40],
                    "Signal": info["what"],
                }
                for name, info in scorecard.items()
            ]
            sc_df = pd.DataFrame(sc_rows)
            st.dataframe(sc_df, use_container_width=True, hide_index=True)

            # bar chart (height grows with the number of detectors)
            fig_sc, ax_sc = plt.subplots(figsize=(9, 0.5 * len(sc_rows) + 1.5))
            colors_sc = ["#dc2626" if r["Status"].startswith(":red") else "#16a34a"
                         for r in sc_rows]
            ax_sc.barh([r["Detector"] for r in sc_rows],
                       [r["Confidence"] for r in sc_rows],
                       color=colors_sc)
            ax_sc.set_xlim(0, 1)
            ax_sc.axvline(0.4, color="grey", linestyle="--", alpha=0.5, label="threshold")
            ax_sc.set_xlabel("confidence (0 = clean, 1 = certain tampering)")
            ax_sc.set_title("Detector confidence per signal")
            ax_sc.invert_yaxis()
            ax_sc.legend(loc="lower right")
            fig_sc.tight_layout()
            show_figure(fig_sc)

            st.divider()

            # --- LOCALIZATION HEATMAP OVERLAYS ---
            st.markdown("#### Forensic localization (heatmap overlays)")
            st.caption("Where each detector thinks the tampering is, painted on the tampered image.")
            ela_img_raw, _ = forensics.error_level_analysis(tmp_path)
            heat_noise, _ = forensics.noise_inconsistency(tmp_path)

            tabs_loc = st.tabs(["ELA overlay", "Noise overlay"])
            with tabs_loc[0]:
                ela_arr = np.array(ela_img_raw.convert("L"))
                composite_ela = tampering.overlay_heatmap_on_image(meta["image"], ela_arr,
                                                                   alpha=0.55, cmap="hot")
                st.image(composite_ela, caption="ELA hotspots overlaid on tampered image",
                         use_column_width=True)
            with tabs_loc[1]:
                composite_noise = tampering.overlay_heatmap_on_image(meta["image"], heat_noise,
                                                                     alpha=0.55, cmap="jet")
                st.image(composite_noise, caption="Noise inconsistency hotspots",
                         use_column_width=True)

            st.divider()

            # --- DOWNLOAD ---
            buf = io.BytesIO()
            meta["image"].save(buf, "PNG")
            st.download_button(":inbox_tray: Download the forged image",
                               buf.getvalue(),
                               file_name=f"forged_{chosen}_{fs_intensity}_{src_path.stem}.png",
                               mime="image/png")

# =============================================================
# TAB 6 - Fraud Ring Network Detector
# =============================================================
with tab6:
    st.markdown("**Detect organised application fraud.** "
                "Upload documents from multiple applicants. The system extracts "
                "their identity signals (name, DOB, address, phone, IFSC, account, "
                "employer) and builds a similarity graph. Cliques of >=3 applicants "
                "linked by shared signals are flagged as suspected fraud rings.")
    st.caption("Banks lose ~Rs 3,000 crore/year to organised application fraud (RBI Annual Report).")

    fr_files = st.file_uploader(
        "Upload 3 or more applicant documents (PNG / JPG / PDF):",
        type=["png", "jpg", "jpeg", "pdf"],
        accept_multiple_files=True,
        key="fr_files",
    )

    fr_col1, fr_col2 = st.columns([1, 1])
    with fr_col1:
        fr_min_size = st.slider("Minimum ring size", 2, 6, 3, key="fr_min_size",
                                help="A 'ring' must have at least this many linked applicants.")
    with fr_col2:
        fr_threshold = st.slider("Link threshold", 0.10, 1.00, 0.30, 0.05, key="fr_thresh",
                                 help="Pair similarity required to count as a link.")

    if fr_files and len(fr_files) >= 2:
        if st.button(":spider_web: Build fraud network", key="fr_run", type="primary"):
            with st.spinner("Extracting identity fields from each document..."):
                applicants = []
                for f in fr_files:
                    p_tmp = save_uploaded(f)
                    fields = fraud_ring.extract_applicant_fields(p_tmp)
                    fields["upload_name"] = f.name
                    applicants.append(fields)
                st.session_state["fr_applicants"] = applicants

            with st.spinner("Building similarity graph..."):
                G = fraud_ring.build_fraud_graph(applicants)
                rings = fraud_ring.detect_rings(G, min_size=fr_min_size,
                                                edge_threshold=fr_threshold)
                st.session_state["fr_graph"] = G
                st.session_state["fr_rings"] = rings

        # Display from session state so re-runs don't lose results
        if "fr_graph" in st.session_state:
            G = st.session_state["fr_graph"]
            rings = st.session_state["fr_rings"]
            applicants = st.session_state["fr_applicants"]
            summary = fraud_ring.fraud_summary(G, rings, applicants)

            # KPI cards
            kc1, kc2, kc3, kc4 = st.columns(4)
            kc1.metric("Applicants", summary["n_applicants"])
            kc2.metric("Suspected rings", summary["n_rings"])
            kc3.metric("Largest ring", summary["largest_ring_size"])
            kc4.metric("Fraud risk", f"{summary['fraud_risk_percentage']}%")

            st.divider()

            # Graph visualisation
            st.markdown("#### Fraud network graph")
            st.caption(":red_circle: Red nodes are members of suspected fraud rings. "
                       ":large_green_circle: Green nodes look clean. "
                       "Edge thickness = similarity strength.")
            show_figure(fraud_ring.visualize_graph(G, rings))

            st.divider()

            # Ring breakdown
            if summary["rings"]:
                st.markdown("#### Detected fraud rings")
                band_icons = {"CRITICAL": ":red_circle:",
                              "HIGH": ":large_orange_circle:",
                              "MEDIUM": ":large_yellow_circle:"}
                for r in summary["rings"]:
                    band_color = band_icons.get(r["risk_band"], ":white_circle:")
                    with st.expander(f"{band_color} **Ring #{r['ring_id']}** "
                                     f"({r['risk_band']}, {r['size']} applicants, "
                                     f"{r['n_links']} links)"):
                        st.markdown("**Members:**")
                        for nm, fn in zip(r["applicant_names"], r["applicant_files"]):
                            st.markdown(f"- **{nm}** (`{fn}`)")
                        st.markdown("**Top shared signals:**")
                        for sig, count in r["top_shared_signals"]:
                            st.markdown(f"- `{sig}`: appears in {count} pairwise links")
            else:
                st.success(":white_check_mark: No fraud rings detected at the current threshold.")

            st.divider()

            # Per-applicant extracts
            with st.expander("View extracted identity fields per applicant"):
                rows = [
                    {
                        "#": i,
                        "File": a.get("upload_name"),
                        "Name": a.get("name") or "-",
                        "DOB": a.get("dob") or "-",
                        "Address": (a.get("address") or "-")[:40],
                        "Phone": a.get("phone") or "-",
                        "IFSC": a.get("ifsc") or "-",
                        "Account": (a.get("account") or "-")[:14],
                    }
                    for i, a in enumerate(applicants)
                ]
                st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)

            # Download network analysis ("text_sample" is left out of the export)
            export = {
                "summary": summary,
                "applicants": [{k: v for k, v in a.items() if k != "text_sample"}
                               for a in applicants],
                "edges": [{"a": u, "b": v, **d} for u, v, d in G.edges(data=True)],
            }
            st.download_button(":inbox_tray: Download fraud network report (JSON)",
                               json.dumps(export, indent=2, default=str),
                               file_name="fraud_network_report.json",
                               mime="application/json")
    elif fr_files:
        st.warning("Upload at least 2 applicants (3+ recommended) to detect a ring.")
    else:
        st.info(":point_up: Upload multiple applicants' documents to begin. "
                "The system will pair-wise compare their identity fields and "
                "show which applicants are linked.")

# -------------------------------------------------------------
# Footer
# -------------------------------------------------------------
st.divider()
st.caption("DocSentry prototype - rule-based + trainable RF + CNN ensemble - "
           "100% open source, runs locally or on Streamlit Cloud / HF Spaces.")