Spaces:
Sleeping
Sleeping
"""
app.py - Streamlit demo for real-time document anomaly detection

Run with:
    streamlit run app.py

Tabs:
    1. Single-document analysis - drag-drop one file, see forensic verdict
    2. Cross-document check     - upload 2+ docs, check identity consistency
    3. Batch audit              - point at a folder, get an audit CSV
    4. Compliance & Audit Pack  - KYC field validation, PII redaction, RBI-style report
    5. Tamper Forge Studio      - forge a document live, see which detector catches it
    6. Fraud Ring Network       - link applicants that share identity signals
"""
import html
import io
import json
import tempfile
from pathlib import Path

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import streamlit as st
from PIL import Image

import forensics
import compliance
import tampering
import fraud_ring
from audit_report import build_pdf_report  # Sprint 3 module
# -------------------------------------------------------------
# Page config + global CSS
# -------------------------------------------------------------
# Browser-tab title, icon and wide layout for the whole app.
st.set_page_config(page_title="BankShield - Document Forensics",
                   page_icon=":lock:", layout="wide")
# App-wide stylesheet. `.big-risk` plus the low/medium/high/critical colour
# classes are consumed by risk_badge(); `.metric-card` is not referenced in
# the visible part of this file.
st.markdown("""
<style>
.big-risk {font-size: 48px; font-weight: 800; padding: 14px 28px;
border-radius: 12px; color: white; text-align: center;
letter-spacing: 1px;}
.low {background: #16a34a;}
.medium {background: #ca8a04;}
.high {background: #ea580c;}
.critical {background: #dc2626;}
.metric-card {background: #f8fafc; padding: 14px; border-radius: 8px;
border-left: 4px solid #2563eb;}
</style>
""", unsafe_allow_html=True)
# -------------------------------------------------------------
# Header
# -------------------------------------------------------------
st.title(":shield: BankShield - Document Forensics")
st.caption("Real-time anomaly detection for underwriting. "
           "Land records | Legal documents | Financial statements.")
# OCR is optional: warn up front when the forensics module reports that
# Tesseract is unavailable, because text-rule checks are skipped in that case.
if not forensics.TESSERACT_OK:
    st.warning("Tesseract OCR is not installed - text-rule checks will be skipped. "
               "Install from https://github.com/UB-Mannheim/tesseract/wiki for full functionality.")
| # ------------------------------------------------------------- | |
| # Helpers | |
| # ------------------------------------------------------------- | |
def risk_badge(band_str):
    """Render a risk band as a large coloured badge.

    The lower-cased band is used as the CSS class, so it is only coloured
    when it matches one of the classes in the global stylesheet
    (``low`` / ``medium`` / ``high`` / ``critical``).

    Args:
        band_str: Band label to display, e.g. ``"HIGH"``.
    """
    text = str(band_str)
    # The markup is rendered with unsafe_allow_html=True, so escape both the
    # visible label and the class attribute: an unexpected band value must not
    # be able to inject tags or break out of the attribute.
    label = html.escape(text)
    css_class = html.escape(text.lower(), quote=True)
    st.markdown(f"<div class='big-risk {css_class}'>{label}</div>",
                unsafe_allow_html=True)
def save_uploaded(uploaded_file):
    """Persist an uploaded file to a temp path; return Path.

    Args:
        uploaded_file: Object exposing ``.name`` (used only for its file
            extension) and ``.getbuffer()`` (the raw bytes to write).

    Returns:
        Path of the newly created temporary file. The file is created with
        ``delete=False``, so it is NOT removed automatically; cleanup is the
        caller's responsibility.
    """
    suffix = Path(uploaded_file.name).suffix
    # The context manager guarantees the handle is closed even if the write
    # raises; delete=False keeps the file on disk for downstream analysis.
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        tmp.write(uploaded_file.getbuffer())
    return Path(tmp.name)
def sub_score_chart(sub_scores):
    """Build a horizontal bar chart of the forensic sub-scores.

    Args:
        sub_scores: Mapping of sub-score name -> numeric value. Values are
            drawn on a fixed 0-1 axis and coloured green below 0.4, orange
            below 0.7 and red otherwise.

    Returns:
        The matplotlib Figure. It stays registered with pyplot, so the caller
        should close it once it has been rendered.
    """
    def bar_colour(value):
        # Traffic-light thresholds for a single bar.
        if value < 0.4:
            return "#16a34a"
        if value < 0.7:
            return "#ea580c"
        return "#dc2626"

    names = list(sub_scores)
    values = list(sub_scores.values())

    fig, ax = plt.subplots(figsize=(7, 3.2))
    ax.barh(names, values, color=[bar_colour(v) for v in values])
    ax.set_xlim(0, 1)
    ax.set_xlabel("score (0 = clean, 1 = suspicious)")
    ax.set_title("Sub-score breakdown")
    ax.invert_yaxis()  # first sub-score at the top
    # Lay out this figure explicitly instead of pyplot's global "current
    # figure", which may be a different one when several sessions render at once.
    fig.tight_layout()
    return fig
# -------------------------------------------------------------
# TABS
# -------------------------------------------------------------
# Labels in display order; st.tabs returns one container per label.
_tab_labels = [
    ":mag: Single-document analysis",
    ":busts_in_silhouette: Cross-document check",
    ":file_folder: Batch audit",
    ":scales: Compliance & Audit Pack",
    ":fire: Tamper Forge Studio",
    ":spider_web: Fraud Ring Network",
]
(tab1, tab2, tab3,
 tab4, tab5, tab6) = st.tabs(_tab_labels)
# =============================================================
# TAB 1 - Single document
# =============================================================
# Flow: pick a bundled sample or upload a file -> run forensics.analyse_document
# -> render verdict, preview, sub-scores, detector visualisations, optional
# PDF / ML sections, then offer JSON and PDF downloads of the report.
with tab1:
    # Sample picker: lets cloud demos work without an upload
    sample_dir = Path("sample_data")
    sample_paths = []
    if sample_dir.exists():
        for sub in ("originals", "tampered", "pdfs"):
            sample_paths.extend(sorted((sample_dir / sub).glob("*")))
    sample_options = ["(upload your own)"] + [str(p.relative_to(sample_dir)) for p in sample_paths]
    pick = st.selectbox("Try a sample document, or upload your own:", sample_options, key="sample_pick")

    path = None
    if pick != "(upload your own)":
        path = sample_dir / pick
        st.caption(f"Loaded sample: `{pick}`")
    else:
        uploaded = st.file_uploader(
            "Upload a document (PNG / JPG / PDF)",
            type=["png", "jpg", "jpeg", "pdf", "tif", "tiff"],
            key="single",
        )
        if uploaded:
            path = save_uploaded(uploaded)

    if path is not None:
        with st.spinner("Analyzing forensic signals..."):
            report = forensics.analyse_document(path)

        # --- top row: risk badge + action ---
        c1, c2 = st.columns([1, 2])
        with c1:
            risk_badge(report["risk_band"])
            st.metric("Risk score", f"{report['risk_score']:.3f}")
        with c2:
            st.markdown("**Recommended action**")
            st.info(report["recommended_action"])
            st.markdown("**Evidence**")
            for e in report["evidence"]:
                st.markdown(f"- {e}")
        st.divider()

        # --- detail row: image preview + sub-scores ---
        left, right = st.columns([1, 1])
        with left:
            st.markdown("#### Document preview")
            if report["type"] == "image":
                st.image(str(path), use_column_width=True)
            elif report["type"] == "pdf":
                # Imported lazily so PyMuPDF is only needed for PDF previews.
                import fitz
                with fitz.open(path) as d:
                    # Preview shows the first page only.
                    pix = d[0].get_pixmap(dpi=110)
                    img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
                st.image(img, use_column_width=True)
        with right:
            st.markdown("#### Sub-score breakdown")
            sub = report.get("sub_scores")
            if sub:
                sub_fig = sub_score_chart(sub)
                st.pyplot(sub_fig)
                # pyplot keeps every figure alive until it is closed; the
                # script reruns on each interaction, so close after rendering.
                plt.close(sub_fig)

        # --- forensic visualizations (images only) ---
        if report["type"] == "image":
            st.divider()
            st.markdown("#### Forensic visualizations")
            tabs_viz = st.tabs(["Error Level Analysis", "Copy-move matches", "Noise heatmap"])
            with tabs_viz[0]:
                ela_img, ela_s = forensics.error_level_analysis(path)
                st.image(ela_img, caption=f"ELA score: {ela_s:.2f}",
                         use_column_width=True)
                st.caption("Bright regions = compression-inconsistent areas (likely edited).")
            with tabs_viz[1]:
                viz, n, _ = forensics.copy_move_detect(path)
                # The result is converted BGR -> RGB before display.
                st.image(cv2.cvtColor(viz, cv2.COLOR_BGR2RGB),
                         caption=f"Copy-move matches: {n}",
                         use_column_width=True)
                st.caption("Red lines connect duplicated regions within the image.")
            with tabs_viz[2]:
                heat, ratio = forensics.noise_inconsistency(path)
                fig, ax = plt.subplots(figsize=(6, 4))
                ax.imshow(heat, cmap="hot")
                ax.set_title(f"Noise outlier ratio: {ratio:.2%}")
                ax.axis("off")
                st.pyplot(fig)
                plt.close(fig)  # release the figure once rendered
                st.caption("Hotspots = local noise inconsistencies (splicing signal).")

        # AI-generated content detector - separate expander
        # NOTE(review): the original indentation was lost; this section is
        # placed at report level and relies on .get() to skip reports that do
        # not carry an "ai_detector" entry. Confirm against the original file.
        ai = report.get("ai_detector")
        if ai:
            st.divider()
            # Map each decorated label to "is this the verdict?"; fall back to
            # the raw verdict string when none of the known values matches.
            badge = {":robot_face: likely_ai_generated": ai["verdict"] == "likely_ai_generated",
                     ":warning: suspicious": ai["verdict"] == "suspicious",
                     ":white_check_mark: likely_real": ai["verdict"] == "likely_real"}
            label = next((k for k, v in badge.items() if v), ai["verdict"])
            with st.expander(f"AI-generated forgery detector — {label} (prob {ai['probability']:.2f})", expanded=False):
                a1, a2, a3 = st.columns(3)
                a1.metric("High-freq suppression", f"{ai['sub']['high_freq_suppression']:.2f}")
                a2.metric("Spectral peakiness", f"{ai['sub']['spectral_peakiness']:.2f}")
                a3.metric("JPEG artefact score", f"{ai['sub']['jpeg_artefact_score']:.2f}")
                # FFT profile plot
                profile = ai.get("profile", [])
                if profile:
                    fig, ax = plt.subplots(figsize=(7, 2.6))
                    ax.plot(profile, color="#2563eb", linewidth=2)
                    ax.fill_between(range(len(profile)), profile, alpha=0.2, color="#2563eb")
                    ax.set_xlabel("Radial frequency bin (low -> high)")
                    ax.set_ylabel("log-magnitude")
                    ax.set_title("Radial FFT profile (real scans show gradual 1/f decay; AI outputs drop sharply)")
                    ax.grid(alpha=0.3)
                    st.pyplot(fig)
                    plt.close(fig)  # release the figure once rendered
                for f in ai.get("flags", []):
                    st.markdown(f"- {f}")

        # --- PDF-specific audit details ---
        if report["type"] == "pdf":
            st.divider()
            st.markdown("#### PDF structural audit")
            audit = report.get("pdf_audit", {})
            fonts = report.get("font_audit", {})
            cc1, cc2 = st.columns(2)
            with cc1:
                st.metric("EOF markers", audit.get("eof_markers", "-"))
                st.metric("Pages", audit.get("pages", "-"))
                st.markdown("**Metadata flags:**")
                for f in audit.get("flags", []):
                    st.markdown(f"- {f}")
            with cc2:
                meta = audit.get("metadata", {}) or {}
                st.markdown("**Producer:** " + str(meta.get("producer", "-")))
                st.markdown("**Creator:** " + str(meta.get("creator", "-")))
                st.markdown("**Fonts used:** " + ", ".join(fonts.get("fonts", []) or ["-"]))
                st.markdown("**Font flags:**")
                for f in fonts.get("flags", []):
                    st.markdown(f"- {f}")

        # --- ML predictions (RF + CNN side-by-side if available) ---
        has_rf = "ml_prediction" in report
        has_cnn = "cnn_prediction" in report
        if has_rf or has_cnn:
            st.divider()
            st.markdown("#### Trained model verdicts")
            # Two columns only when both models reported; `ci` tracks the next
            # free column so a lone CNN verdict lands in column 0.
            cols = st.columns(2 if (has_rf and has_cnn) else 1)
            ci = 0
            if has_rf:
                ml = report["ml_prediction"]
                with cols[ci]:
                    st.markdown("**Random Forest** (forensic features)")
                    cc1, cc2 = st.columns(2)
                    cc1.metric("Tamper probability", f"{ml['tamper_probability']:.1%}")
                    cc2.metric("Verdict", ml["verdict"])
                ci += 1
            if has_cnn:
                cnn = report["cnn_prediction"]
                with cols[ci]:
                    st.markdown("**CNN** (MobileNetV2 on CASIA v2)")
                    cc1, cc2 = st.columns(2)
                    cc1.metric("Tamper probability", f"{cnn['tamper_probability']:.1%}")
                    cc2.metric("Verdict", cnn["verdict"])
                    if cnn.get("val_auc"):
                        st.caption(f"Model val ROC-AUC: {cnn['val_auc']:.3f}")

        # --- downloads ---
        st.divider()
        dl1, dl2 = st.columns(2)
        with dl1:
            st.download_button(
                "Download audit JSON",
                # default=str stringifies anything json cannot encode natively.
                data=json.dumps(report, indent=2, default=str),
                file_name=f"audit_{path.stem}.json",
                mime="application/json",
            )
        with dl2:
            # Best effort: a PDF-generation failure must not take down the tab.
            try:
                pdf_bytes = build_pdf_report(report, path)
                st.download_button(
                    "Download audit PDF report",
                    data=pdf_bytes,
                    file_name=f"audit_report_{path.stem}.pdf",
                    mime="application/pdf",
                )
            except Exception as e:
                st.warning(f"PDF report generation skipped: {e}")
# =============================================================
# TAB 2 - Cross-document consistency
# =============================================================
# Flow: upload 2+ documents -> forensics.cross_doc_consistency -> show the
# overall band, a field-by-field comparison table and per-document extracts.
with tab2:
    st.markdown("Upload 2 or more documents for the **same applicant** "
                "(e.g. land record + bank statement + ID). The system will "
                "extract identity fields and flag any mismatches.")
    uploads = st.file_uploader(
        "Upload 2-4 documents",
        type=["png", "jpg", "jpeg", "pdf"],
        accept_multiple_files=True,
        key="multi",
    )
    if uploads and len(uploads) >= 2:
        paths = [save_uploaded(u) for u in uploads]

        # Label documents by the name the user uploaded, not by the random
        # temp-file name. Repeated names get a " (n)" suffix so that each
        # document still maps to its own table column.
        files = []
        name_counts = {}
        for u in uploads:
            name_counts[u.name] = name_counts.get(u.name, 0) + 1
            nth = name_counts[u.name]
            files.append(u.name if nth == 1 else f"{u.name} ({nth})")
        name_by_temp_path = {str(p): label for p, label in zip(paths, files)}

        with st.spinner("Extracting identity fields from each document..."):
            result = forensics.cross_doc_consistency(paths)

        # --- header ---
        c1, c2 = st.columns([1, 2])
        with c1:
            risk_badge(result["consistency_band"])
            st.metric("Consistency risk", f"{result['consistency_risk_score']:.3f}")
        with c2:
            st.metric("Mismatches", result["mismatches"])
            st.metric("Likely mismatches", result["likely_mismatches"])
        st.divider()

        st.markdown("#### Field-by-field comparison")
        # Build a comparison table: one row per field, one column per document.
        # Assumes res["values"] is ordered like `paths` - TODO confirm.
        field_rows = []
        for field, res in result["field_results"].items():
            row = {"Field": field, "Status": res["status"],
                   "Similarity": res.get("similarity")}
            for fn, val in zip(files, res["values"]):
                row[fn] = val or "(not found)"
            field_rows.append(row)
        df = pd.DataFrame(field_rows)

        status_colours = {
            "match": "background-color: #dcfce7",
            "likely_match": "background-color: #fef3c7",
            "mismatch": "background-color: #fecaca",
        }

        def colour_status(val):
            """Return the cell CSS for a status value ('' when unknown)."""
            return status_colours.get(val, "")

        if df.empty:
            # Without rows there is no "Status" column to style.
            st.info("No comparable fields were extracted from these documents.")
        else:
            styler = df.style
            # pandas 2.1 renamed Styler.applymap to Styler.map and deprecated
            # the old name; use whichever this pandas version provides.
            apply_cellwise = getattr(styler, "map", None) or styler.applymap
            st.dataframe(apply_cellwise(colour_status, subset=["Status"]),
                         use_container_width=True)
        st.divider()

        st.markdown("#### Per-document extracts")
        for doc in result["documents"]:
            # Presumably doc["file"] echoes the temp path that was passed in;
            # fall back to its basename when it is not one of ours.
            title = name_by_temp_path.get(str(doc["file"]), Path(doc["file"]).name)
            with st.expander(title):
                st.json(doc["fields"])

        st.download_button(
            "Download consistency report JSON",
            data=json.dumps(result, indent=2, default=str),
            file_name="cross_doc_consistency.json",
            mime="application/json",
        )
    elif uploads:
        st.info("Upload at least 2 documents to run the cross-check.")
# =============================================================
# TAB 3 - Batch audit
# =============================================================
# Flow: take a folder path -> analyse every supported file beneath it ->
# show one row per file and offer the table as a CSV download.
with tab3:
    st.markdown("Point at a folder on your machine to run a batch audit. "
                "Produces a CSV with risk band per file.")
    # Default to ./data when it exists, otherwise the bundled ./sample_data.
    default = Path.cwd() / ("sample_data" if not (Path.cwd() / "data").exists() else "data")
    folder = st.text_input("Folder path", value=str(default))
    if st.button("Run batch audit"):
        root = Path(folder)
        # is_dir() also rejects a path that exists but points at a file.
        if not root.is_dir():
            st.error(f"Folder not found: {root}")
        else:
            # Same extensions the single-document uploader accepts.
            supported = {".png", ".jpg", ".jpeg", ".pdf", ".tif", ".tiff"}
            files = sorted(p for p in root.rglob("*")
                           if p.is_file() and p.suffix.lower() in supported)
            if not files:
                st.warning("No supported files found in folder.")
            else:
                results = []
                progress = st.progress(0.0)
                for i, p in enumerate(files):
                    rel_name = str(p.relative_to(root))
                    # One unreadable file must not abort the batch: record the
                    # error in that file's row and carry on.
                    try:
                        r = forensics.analyse_document(p)
                        results.append({
                            "file": rel_name,
                            "type": r.get("type"),
                            "risk_score": r.get("risk_score"),
                            "risk_band": r.get("risk_band"),
                            "action": r.get("recommended_action"),
                        })
                    except Exception as e:
                        results.append({"file": rel_name, "error": str(e)})
                    progress.progress((i + 1) / len(files))
                df = pd.DataFrame(results)
                st.success(f"Analysed {len(files)} files.")
                st.dataframe(df, use_container_width=True)
                csv = df.to_csv(index=False).encode("utf-8")
                st.download_button("Download audit CSV", data=csv,
                                   file_name="audit_log.csv", mime="text/csv")
# =============================================================
# TAB 4 - Compliance & Audit Pack (KYC + PII redaction + RBI report)
# =============================================================
# Three sub-tabs (KYC validation, PII redaction, compliance report) followed
# by the provenance-ledger panel.
with tab4:
    st.markdown("**Three regulatory tools in one tab** - KYC field validation, "
                "PII auto-redaction, and RBI-style compliance reports.")
    sub_a, sub_b, sub_c = st.tabs([
        ":id: KYC Field Validation",
        ":lock: PII Auto-Redaction",
        ":scroll: RBI Compliance Report",
    ])

    # -------- 4A: KYC validators (manual input) --------
    with sub_a:
        st.markdown("#### Validate KYC fields against RBI rules")
        st.caption("IFSC: format + RBI bank-code list | PAN: format + entity-type "
                   "char | Aadhaar: 12-digit + UIDAI Verhoeff checksum.")
        c1, c2, c3 = st.columns(3)
        ifsc_in = c1.text_input("IFSC code", value="SBIN0001234")
        pan_in = c2.text_input("PAN", value="ABCPQ1234F")
        aad_in = c3.text_input("Aadhaar number (12 digits)", value="234567890124")
        if st.button("Validate all", key="kyc_validate"):
            r_ifsc = compliance.validate_ifsc(ifsc_in)
            r_pan = compliance.validate_pan(pan_in)
            r_aad = compliance.validate_aadhaar(aad_in)
            for label, r in [("IFSC", r_ifsc), ("PAN", r_pan), ("Aadhaar", r_aad)]:
                if r["ok"]:
                    flags_text = " ".join(r["flags"])
                    st.success(f"**{label}**: VALID. {flags_text}")
                    # f-strings (not "+") so a non-string detail coming back
                    # from a validator cannot raise TypeError here.
                    if label == "IFSC":
                        bn = r.get("bank_name", "-")
                        bc = r.get("branch_code", "-")
                        st.caption(f"Bank: {bn}, branch code: {bc}")
                    if label == "PAN":
                        et = r.get("entity_type", "-")
                        st.caption(f"Entity type: {et}")
                    if label == "Aadhaar":
                        mk = r.get("masked", "-")
                        st.caption(f"Masked: {mk}")
                else:
                    flags_text = " | ".join(r["flags"])
                    st.error(f"**{label}**: INVALID. {flags_text}")
        st.divider()

        st.markdown("#### Or: extract & validate from a document")
        kyc_file = st.file_uploader("Upload doc to scan for KYC fields",
                                    type=["pdf", "png", "jpg", "jpeg"], key="kyc_doc")
        if kyc_file:
            kyc_path = save_uploaded(kyc_file)
            with st.spinner("Extracting KYC fields..."):
                fields, _ = compliance.extract_pii_fields(kyc_path)
            n_ifsc = len(fields["ifsc"])
            n_pan = len(fields["pan"])
            n_aad = len(fields["aadhaar"])
            n_acc = len(fields["accounts"])
            st.markdown(f"**Found in document:** {n_ifsc} IFSC, {n_pan} PAN, "
                        f"{n_aad} Aadhaar candidates, {n_acc} account numbers")
            # Validate unique IFSCs (first 5). dict.fromkeys de-duplicates
            # while keeping document order, so the same five codes are shown
            # on every run (set() ordering is arbitrary).
            uniq_ifsc = list(dict.fromkeys(fields["ifsc"]))[:5]
            if uniq_ifsc:
                st.markdown("**IFSC validation (first 5 unique):**")
                rows = [compliance.validate_ifsc(c) for c in uniq_ifsc]
                st.dataframe(pd.DataFrame(rows), use_container_width=True)
            if fields["pan"]:
                st.markdown("**PAN validation:**")
                rows = [compliance.validate_pan(c) for c in fields["pan"][:5]]
                st.dataframe(pd.DataFrame(rows), use_container_width=True)

    # -------- 4B: PII redaction --------
    with sub_b:
        st.markdown("#### Auto-redact PII for safe sharing")
        st.caption("Masks IFSC, PAN, Aadhaar, and account numbers. Use before "
                   "forwarding to external vendors / for DPDP Act compliance.")
        rd_file = st.file_uploader("Upload document to redact",
                                   type=["pdf", "png", "jpg", "jpeg"], key="rd")
        if rd_file:
            src_path = save_uploaded(rd_file)
            if str(src_path).lower().endswith(".pdf"):
                out_path = Path(tempfile.gettempdir()) / f"redacted_{src_path.stem}.pdf"
                with st.spinner("Redacting PDF..."):
                    found = compliance.redact_pdf(str(src_path), str(out_path))
                total = sum(len(v) for v in found.values())
                st.success(f"Redacted {total} PII items.")
                # Show only per-category counts, never the PII values.
                summary = {k: len(v) for k, v in found.items()}
                st.json(summary)
                with open(out_path, "rb") as f:
                    st.download_button("Download redacted PDF", f.read(),
                                       file_name=out_path.name, mime="application/pdf")
            else:
                # image - just OCR + redact text
                fields, text = compliance.extract_pii_fields(src_path)
                red_text, _ = compliance.redact_text(text)
                st.markdown("**Original (OCR):**")
                st.code(text[:600], language=None)
                st.markdown("**Redacted:**")
                st.code(red_text[:600], language=None)
                st.download_button("Download redacted text", red_text,
                                   file_name=f"redacted_{src_path.stem}.txt")

    # -------- 4C: RBI compliance report --------
    with sub_c:
        st.markdown("#### Generate an RBI Master-Direction-style audit PDF")
        st.caption("Runs full forensic analysis + KYC verification + RBI risk-treatment "
                   "recommendation, then produces a regulator-ready PDF.")
        cr_file = st.file_uploader("Upload document for compliance audit",
                                   type=["pdf", "png", "jpg", "jpeg"], key="cr")
        if cr_file:
            src_path = save_uploaded(cr_file)
            with st.spinner("Running forensic analysis..."):
                f_report = forensics.analyse_document(src_path)
            with st.spinner("Validating KYC fields..."):
                fields, _ = compliance.extract_pii_fields(src_path)
                # Only the first hit of each kind is validated for the report.
                kyc_results = {}
                if fields["ifsc"]:
                    kyc_results["ifsc"] = compliance.validate_ifsc(fields["ifsc"][0])
                if fields["pan"]:
                    kyc_results["pan"] = compliance.validate_pan(fields["pan"][0])
                if fields["aadhaar"]:
                    kyc_results["aadhaar"] = compliance.validate_aadhaar(fields["aadhaar"][0])
            # Summary cards
            cc1, cc2, cc3 = st.columns(3)
            cc1.metric("Forensic risk", f_report.get("risk_band", "-"))
            cc2.metric("KYC fields found",
                       sum(len(fields[k]) for k in ("ifsc", "pan", "aadhaar")))
            cc3.metric("KYC checks passed",
                       sum(1 for r in kyc_results.values() if r.get("ok")))
            # KYC table
            if kyc_results:
                rows = [{"Field": k.upper(), "Value": r.get("code", "-"),
                         "Status": "PASS" if r.get("ok") else "FAIL",
                         "Notes": "; ".join(r.get("flags", []))[:60]}
                        for k, r in kyc_results.items()]
                st.dataframe(pd.DataFrame(rows), use_container_width=True)
            else:
                st.info("No KYC fields found in this document to validate.")
            # Generate the report
            with st.spinner("Building RBI compliance PDF..."):
                pdf_bytes = compliance.build_compliance_report(
                    f_report, src_path, kyc_results)
            st.success(f"Generated compliance audit ({len(pdf_bytes)//1024} KB)")
            st.download_button("Download RBI Compliance Report (PDF)", pdf_bytes,
                               file_name=f"compliance_{src_path.stem}.pdf",
                               mime="application/pdf")

    # ---------- Provenance Ledger (tamper-evident audit trail) ----------
    # NOTE(review): the original indentation was lost; this panel is placed at
    # tab level, below the three sub-tabs. Confirm against the original file.
    st.divider()
    st.markdown("### :link: Provenance Ledger — tamper-evident audit chain")
    st.caption("Every analysis is logged as a SHA-256 hash-chained record. "
               "Editing any past entry breaks the chain, satisfying RBI Master "
               "Direction on KYC, 2016 (Para 67) record-retention requirements.")
    # The ledger is optional: any failure (including a missing `provenance`
    # module) degrades to a warning instead of breaking the tab.
    try:
        import provenance as _prov
        stats = _prov.chain_stats()
        pc1, pc2, pc3, pc4 = st.columns(4)
        pc1.metric("Records", stats["n_records"])
        # Timestamps are trimmed to 19 chars and the "T" separator replaced
        # with a space (presumably ISO-8601 strings - TODO confirm).
        pc2.metric("First entry", (stats["first_ts"] or "-")[:19].replace("T", " "))
        pc3.metric("Last entry", (stats["last_ts"] or "-")[:19].replace("T", " "))
        pc4.metric("Chain status", ":white_check_mark: INTACT" if stats["chain_intact"]
                   else f":x: BROKEN @ #{stats['broken_at']}")
        if stats["n_records"]:
            df = _prov.ledger_dataframe(limit=20)
            df_show = df[["id", "ts", "doc_name", "risk_band", "risk_score",
                          "prev_hash", "record_hash"]].copy()
            # Hashes are shortened for display only; the download has them in full.
            df_show["prev_hash"] = df_show["prev_hash"].str[:14] + "..."
            df_show["record_hash"] = df_show["record_hash"].str[:14] + "..."
            df_show["ts"] = df_show["ts"].str[:19].str.replace("T", " ")
            st.dataframe(df_show, use_container_width=True, hide_index=True)
            cdl, cv = st.columns([1, 1])
            with cdl:
                import json as _j
                st.download_button(":inbox_tray: Download full ledger (JSON)",
                                   _j.dumps(_prov.fetch_ledger(limit=10000),
                                            indent=2, default=str),
                                   file_name="docsentry_provenance.json",
                                   mime="application/json")
            with cv:
                if st.button(":mag: Re-verify chain integrity", key="prov_verify"):
                    ok, where = _prov.verify_chain()
                    if ok:
                        st.success(":white_check_mark: Chain integrity verified across all records.")
                    else:
                        st.error(f":x: Chain broken at record #{where}.")
        else:
            st.info("Ledger is empty. Run an analysis on Tab 1 to create the first record.")
    except Exception as e:
        st.warning(f"Provenance ledger unavailable: {e}")
# =============================================================
# TAB 5 - Live Tamper Forge Studio (rich version)
# =============================================================
# Flow: choose a clean source image -> choose a forgery (single technique,
# chain, or a hand-drawn region) -> forge it -> run the detectors on the
# forged image -> show verdict, before/after, scorecard, heatmaps, download.
with tab5:
    import time as _time
    st.markdown("**Live forgery demo.** Pick a clean document, choose a "
                "technique and intensity, and watch DocSentry localise the "
                "tampered region and tell you *which* detector caught it.")

    # ---- top controls row ----
    ctl1, ctl2, ctl3 = st.columns([2, 1, 1])
    with ctl1:
        fs_sample_dir = Path("sample_data")
        fs_options = []
        if fs_sample_dir.exists():
            for sub in ("originals",):
                fs_options.extend(sorted((fs_sample_dir / sub).glob("*.png")))
                fs_options.extend(sorted((fs_sample_dir / sub).glob("*.jpg")))
        fs_opts_str = ["(upload your own)"] + [str(p.relative_to(fs_sample_dir)) for p in fs_options]
        fs_pick = st.selectbox("Source document", fs_opts_str, key="fs_pick")
    with ctl2:
        fs_intensity = st.select_slider("Tamper intensity",
                                        options=["subtle", "moderate", "aggressive"],
                                        value="moderate", key="fs_intensity")
    with ctl3:
        fs_mode = st.radio("Mode",
                           ["Single technique", "Chain (multi-step)", "Adversarial canvas"],
                           key="fs_mode")

    src_path = None
    if fs_pick != "(upload your own)":
        src_path = fs_sample_dir / fs_pick
    else:
        fs_up = st.file_uploader("Upload PNG/JPG to forge",
                                 type=["png", "jpg", "jpeg"], key="fs_up")
        if fs_up:
            src_path = save_uploaded(fs_up)

    if src_path is None:
        st.info(":point_up: Pick a sample document or upload one to begin.")
    else:
        from PIL import Image as _PILImg
        # convert() returns an independent in-memory copy, so the file handle
        # can be released as soon as the conversion is done.
        with _PILImg.open(src_path) as _src_img:
            orig_img = _src_img.convert("RGB")
        st.image(orig_img, caption=f"Source: {src_path.name}", width=420)

        # ---- mode-specific controls ----
        # The selection lives in st.session_state["fs_chosen"] so it survives
        # the rerun triggered by a button click. It holds either a technique
        # key or one of the sentinels "__chain__" / "__custom__".
        chosen, chosen_chain, custom_box = None, [], None
        if fs_mode == "Single technique":
            st.markdown("#### Pick a forgery technique")
            cols = st.columns(5)
            techniques = [
                ("copy_move", ":scissors:", "Copy-move"),
                ("text_edit", ":pencil2:", "Text edit"),
                ("splice", ":jigsaw:", "Splice"),
                ("compression", ":package:", "Re-save"),
                ("metadata", ":wastebasket:", "Strip EXIF"),
            ]
            for (key, icon, label), col in zip(techniques, cols):
                if col.button(f"{icon} {label}", key="fs_tbtn_" + key):
                    st.session_state["fs_chosen"] = key
            # Ignore a sentinel left behind by Chain / Canvas mode: acting on
            # it here would run an empty chain or a custom forgery with no box.
            remembered = st.session_state.get("fs_chosen")
            if remembered in {key for key, _, _ in techniques}:
                chosen = remembered
        elif fs_mode == "Chain (multi-step)":
            st.markdown("#### Build a chain (applied in order)")
            cc = st.columns(5)
            chain_keys = ["copy_move", "text_edit", "splice", "compression", "metadata"]
            chain_picks = []
            for k, c in zip(chain_keys, cc):
                if c.checkbox(k, key="fs_chain_" + k):
                    chain_picks.append(k)
            if st.button(":fire: Run chain", key="fs_chain_run"):
                if not chain_picks:
                    st.warning("Pick at least one technique to chain.")
                else:
                    st.session_state["fs_chain"] = chain_picks
                    st.session_state["fs_chosen"] = "__chain__"
            chosen_chain = st.session_state.get("fs_chain", [])
            if st.session_state.get("fs_chosen") == "__chain__":
                chosen = "__chain__"
        elif fs_mode == "Adversarial canvas":
            st.markdown("#### Draw a region to tamper (adversarial mode)")
            # The canvas widget is an optional dependency.
            try:
                from streamlit_drawable_canvas import st_canvas
                canvas_max_w = 700
                # Shrink wide images to the canvas width; never upscale.
                cw_scale = min(canvas_max_w, orig_img.width) / orig_img.width
                cw, ch = int(orig_img.width * cw_scale), int(orig_img.height * cw_scale)
                canvas_res = st_canvas(
                    fill_color="rgba(255, 0, 0, 0.25)",
                    stroke_width=2, stroke_color="#dc2626",
                    background_image=orig_img,
                    update_streamlit=True, height=ch, width=cw,
                    drawing_mode="rect", key="fs_canvas",
                )
                if canvas_res.json_data and canvas_res.json_data.get("objects"):
                    # Use the most recently drawn rectangle, mapped from canvas
                    # pixels back to image pixels and clamped to the image.
                    obj = canvas_res.json_data["objects"][-1]
                    x = obj["left"] / cw_scale
                    y = obj["top"] / cw_scale
                    w = obj["width"] / cw_scale
                    h = obj["height"] / cw_scale
                    custom_box = (max(0, int(x)), max(0, int(y)),
                                  min(orig_img.width, int(x + w)),
                                  min(orig_img.height, int(y + h)))
                    if st.button(":boom: Tamper this region", key="fs_custom_run"):
                        st.session_state["fs_custom_box"] = custom_box
                        st.session_state["fs_chosen"] = "__custom__"
            except ImportError:
                st.error("Install `streamlit-drawable-canvas` to enable adversarial mode.")
            if st.session_state.get("fs_chosen") == "__custom__":
                chosen = "__custom__"
                custom_box = st.session_state.get("fs_custom_box")

        # ---- run + render ----
        if chosen is not None:
            st.divider()
            # staged reveal
            progress = st.progress(0, text="Forging document...")
            _time.sleep(0.4)
            progress.progress(35, text="Applying tampering...")
            t0 = _time.time()
            if chosen == "__chain__":
                meta = tampering.tamper_chain(orig_img, chosen_chain, intensity=fs_intensity)
            elif chosen == "__custom__":
                meta = tampering.tamper_dispatch("custom", orig_img,
                                                 intensity=fs_intensity,
                                                 custom_box=custom_box)
            else:
                meta = tampering.tamper_dispatch(chosen, orig_img, intensity=fs_intensity)
            forge_ms = int((_time.time() - t0) * 1000)
            progress.progress(60, text="DocSentry analysing...")

            import tempfile as _tmp
            # Reserve a temp path and close the handle BEFORE saving: an open
            # NamedTemporaryFile cannot be reopened by name on Windows, and an
            # unclosed one leaks a file descriptor on every rerun.
            with _tmp.NamedTemporaryFile(delete=False, suffix=".png") as _forged_file:
                tmp_path = Path(_forged_file.name)
            meta["image"].save(tmp_path)
            t1 = _time.time()
            report = forensics.analyse_document(tmp_path)
            detect_ms = int((_time.time() - t1) * 1000)
            progress.progress(85, text="Computing detector scorecard...")
            scorecard = tampering.detector_scorecard(tmp_path)
            progress.progress(100, text="Done")
            _time.sleep(0.2)
            progress.empty()

            # --- BIG VERDICT ROW ---
            # Any band other than the lowest counts as "detected".
            detected = report["risk_band"] in ("MEDIUM", "HIGH", "CRITICAL")
            v1, v2 = st.columns([1, 2])
            with v1:
                risk_badge(report["risk_band"])
                st.metric("Detection latency", f"{detect_ms} ms")
                st.metric("Forge latency", f"{forge_ms} ms")
            with v2:
                if detected:
                    st.success(f":white_check_mark: **FORGERY DETECTED** in {detect_ms} ms")
                else:
                    st.error(":x: **Forgery slipped past detectors.** Try a more aggressive intensity or different technique.")
                st.markdown(f"**Technique:** {meta.get('description','')}")
                st.markdown(f"**Intensity:** `{fs_intensity}`")
            st.divider()

            # --- ANNOTATED BEFORE/AFTER ---
            st.markdown("#### Where is the forgery?")
            ann_orig, ann_tamp = tampering.annotate_before_after(orig_img, meta)
            ab1, ab2 = st.columns(2)
            with ab1:
                st.image(ann_orig, caption=":large_green_circle: Original (green = source region)", use_column_width=True)
            with ab2:
                st.image(ann_tamp, caption=":red_circle: Tampered (red = where the change is)", use_column_width=True)
            st.divider()

            # --- DETECTOR SCORECARD ---
            st.markdown("#### Per-detector scorecard")
            st.caption("Which forensic signal caught the tampering, and how confidently.")
            sc_rows = []
            for name, info in scorecard.items():
                sc_rows.append({
                    "Detector": name,
                    "Confidence": round(info["score"], 3),
                    "Status": ":red_circle: CAUGHT" if info["caught"] else ":large_green_circle: clean",
                    "Reading": str(info["raw"])[:40],
                    "Signal": info["what"],
                })
            sc_df = pd.DataFrame(sc_rows)
            st.dataframe(sc_df, use_container_width=True, hide_index=True)
            # bar chart
            fig_sc, ax_sc = plt.subplots(figsize=(9, 0.5 * len(sc_rows) + 1.5))
            colors_sc = ["#dc2626" if r["Status"].startswith(":red") else "#16a34a" for r in sc_rows]
            ax_sc.barh([r["Detector"] for r in sc_rows],
                       [r["Confidence"] for r in sc_rows], color=colors_sc)
            ax_sc.set_xlim(0, 1)
            ax_sc.axvline(0.4, color="grey", linestyle="--", alpha=0.5, label="threshold")
            ax_sc.set_xlabel("confidence (0 = clean, 1 = certain tampering)")
            ax_sc.set_title("Detector confidence per signal")
            ax_sc.invert_yaxis()
            ax_sc.legend(loc="lower right")
            # Lay out and close this specific figure: pyplot keeps figures
            # alive until closed, and the script reruns on every interaction.
            fig_sc.tight_layout()
            st.pyplot(fig_sc)
            plt.close(fig_sc)
            st.divider()

            # --- LOCALIZATION HEATMAP OVERLAYS ---
            st.markdown("#### Forensic localization (heatmap overlays)")
            st.caption("Where each detector thinks the tampering is, painted on the tampered image.")
            ela_img_raw, _ = forensics.error_level_analysis(tmp_path)
            heat_noise, _ = forensics.noise_inconsistency(tmp_path)
            tabs_loc = st.tabs(["ELA overlay", "Noise overlay"])
            with tabs_loc[0]:
                # The ELA image is reduced to a single-channel array first.
                ela_arr = np.array(ela_img_raw.convert("L"))
                composite_ela = tampering.overlay_heatmap_on_image(meta["image"], ela_arr, alpha=0.55, cmap="hot")
                st.image(composite_ela, caption="ELA hotspots overlaid on tampered image",
                         use_column_width=True)
            with tabs_loc[1]:
                composite_noise = tampering.overlay_heatmap_on_image(meta["image"], heat_noise, alpha=0.55, cmap="jet")
                st.image(composite_noise, caption="Noise inconsistency hotspots",
                         use_column_width=True)
            st.divider()

            # --- DOWNLOAD ---
            import io as _io
            buf = _io.BytesIO()
            meta["image"].save(buf, "PNG")
            st.download_button(":inbox_tray: Download the forged image",
                               buf.getvalue(),
                               file_name=f"forged_{chosen}_{fs_intensity}_{src_path.stem}.png",
                               mime="image/png")
| # ============================================================= | |
| # TAB 6 - Fraud Ring Network Detector | |
| # ============================================================= | |
with tab6:
    # Fraud-ring tab: upload several applicants' documents, extract identity
    # fields from each, build a similarity graph over them and report any
    # groups of linked applicants as suspected fraud rings.
    st.markdown("**Detect organised application fraud.** "
                "Upload documents from multiple applicants. The system extracts "
                "their identity signals (name, DOB, address, phone, IFSC, account, "
                "employer) and builds a similarity graph. Cliques of >=3 applicants "
                "linked by shared signals are flagged as suspected fraud rings.")
    st.caption("Banks lose ~Rs 3,000 crore/year to organised application fraud (RBI Annual Report).")
    fr_files = st.file_uploader(
        "Upload 3 or more applicant documents (PNG / JPG / PDF):",
        type=["png", "jpg", "jpeg", "pdf"],
        accept_multiple_files=True,
        key="fr_files",
    )

    # Detection parameters, side by side.
    fr_col1, fr_col2 = st.columns([1, 1])
    with fr_col1:
        fr_min_size = st.slider("Minimum ring size", 2, 6, 3, key="fr_min_size",
                                help="A 'ring' must have at least this many linked applicants.")
    with fr_col2:
        fr_threshold = st.slider("Link threshold", 0.10, 1.00, 0.30, 0.05, key="fr_thresh",
                                 help="Pair similarity required to count as a link.")

    if fr_files and len(fr_files) >= 2:
        if st.button(":spider_web: Build fraud network", key="fr_run", type="primary"):
            with st.spinner("Extracting identity fields from each document..."):
                applicants = []
                for f in fr_files:
                    p_tmp = save_uploaded(f)
                    fields = fraud_ring.extract_applicant_fields(p_tmp)
                    # Remember the user-facing file name for the report tables.
                    fields["upload_name"] = f.name
                    applicants.append(fields)
            with st.spinner("Building similarity graph..."):
                G = fraud_ring.build_fraud_graph(applicants)
                rings = fraud_ring.detect_rings(G, min_size=fr_min_size,
                                                edge_threshold=fr_threshold)
            # Publish all three results together, only after every step has
            # succeeded, so a failure part-way through can never leave a new
            # applicant list paired with a stale graph from an earlier run.
            st.session_state["fr_applicants"] = applicants
            st.session_state["fr_graph"] = G
            st.session_state["fr_rings"] = rings

        # Display from session state so re-runs don't lose results
        if "fr_graph" in st.session_state:
            G = st.session_state["fr_graph"]
            rings = st.session_state["fr_rings"]
            applicants = st.session_state["fr_applicants"]
            summary = fraud_ring.fraud_summary(G, rings, applicants)

            # KPI cards
            kc1, kc2, kc3, kc4 = st.columns(4)
            kc1.metric("Applicants", summary["n_applicants"])
            kc2.metric("Suspected rings", summary["n_rings"])
            kc3.metric("Largest ring", summary["largest_ring_size"])
            kc4.metric("Fraud risk", f"{summary['fraud_risk_percentage']}%")
            st.divider()

            # Graph visualisation
            st.markdown("#### Fraud network graph")
            st.caption(":red_circle: Red nodes are members of suspected fraud rings. "
                       ":large_green_circle: Green nodes look clean. "
                       "Edge thickness = similarity strength.")
            fig = fraud_ring.visualize_graph(G, rings)
            st.pyplot(fig)
            # Streamlit re-executes the script on every interaction; close the
            # figure once rendered so pyplot does not accumulate open figures.
            plt.close(fig)
            st.divider()

            # Ring breakdown
            if summary["rings"]:
                st.markdown("#### Detected fraud rings")
                # Risk band -> status emoji; unknown bands fall back to white.
                band_icons = {"CRITICAL": ":red_circle:",
                              "HIGH": ":large_orange_circle:",
                              "MEDIUM": ":large_yellow_circle:"}
                for r in summary["rings"]:
                    band_color = band_icons.get(r["risk_band"], ":white_circle:")
                    with st.expander(f"{band_color} **Ring #{r['ring_id']}** "
                                     f"({r['risk_band']}, {r['size']} applicants, "
                                     f"{r['n_links']} links)"):
                        st.markdown("**Members:**")
                        for nm, fn in zip(r["applicant_names"], r["applicant_files"]):
                            st.markdown(f"- **{nm}** (`{fn}`)")
                        st.markdown("**Top shared signals:**")
                        for sig, count in r["top_shared_signals"]:
                            st.markdown(f"- `{sig}`: appears in {count} pairwise links")
            else:
                st.success(":white_check_mark: No fraud rings detected at the current threshold.")
            st.divider()

            # Per-applicant extracts
            with st.expander("View extracted identity fields per applicant"):
                # Missing fields are shown as "-"; long values are truncated
                # to keep the table readable.
                rows = [
                    {
                        "#": i,
                        "File": a.get("upload_name"),
                        "Name": a.get("name") or "-",
                        "DOB": a.get("dob") or "-",
                        "Address": (a.get("address") or "-")[:40],
                        "Phone": a.get("phone") or "-",
                        "IFSC": a.get("ifsc") or "-",
                        "Account": (a.get("account") or "-")[:14],
                    }
                    for i, a in enumerate(applicants)
                ]
                st.dataframe(pd.DataFrame(rows), use_container_width=True, hide_index=True)

            # Download network analysis
            export = {
                "summary": summary,
                # The "text_sample" field is left out of the exported report.
                "applicants": [{k: v for k, v in a.items() if k != "text_sample"}
                               for a in applicants],
                "edges": [{"a": u, "b": v, **d} for u, v, d in G.edges(data=True)],
            }
            st.download_button(":inbox_tray: Download fraud network report (JSON)",
                               # default=str stringifies any value json cannot
                               # serialise natively instead of raising.
                               json.dumps(export, indent=2, default=str),
                               file_name="fraud_network_report.json",
                               mime="application/json")
    elif fr_files:
        # Something was uploaded, but not enough to compare pair-wise.
        st.warning("Upload at least 2 applicants (3+ recommended) to detect a ring.")
    else:
        st.info(":point_up: Upload multiple applicants' documents to begin. "
                "The system will pair-wise compare their identity fields and "
                "show which applicants are linked.")
| # ------------------------------------------------------------- | |
| # Footer | |
| # ------------------------------------------------------------- | |
st.divider()
# Footer caption. The product name is kept in line with the "BankShield"
# branding used by st.set_page_config and st.title at the top of the file.
st.caption("BankShield prototype - rule-based + trainable RF + CNN ensemble - "
           "100% open source, runs locally or on Streamlit Cloud / HF Spaces.")