Spaces:
Sleeping
Sleeping
| """ | |
| fraud_ring.py - Cross-applicant fraud-ring detection. | |
| Banks lose ~Rs 3,000 crore/year to organised application fraud | |
| (RBI's own annual report). This module detects rings by: | |
| 1. Extracting identity signals from each applicant's documents | |
| (name, DOB, address, phone, IFSC, account number, employer). | |
| 2. Building a similarity graph: nodes = applicants, | |
| edges = shared signals (weighted by how many signals match). | |
| 3. Finding connected components of the graph after dropping edges below a similarity threshold. | |
| Each subgraph of >= 3 applicants is a suspected fraud ring. | |
| 4. Rendering a static matplotlib network graph with ring members highlighted in red. | |
| Public API: | |
| extract_applicant_fields(path) - pulls identity fields from a doc | |
| compare_applicants(a, b) - similarity score + shared signals | |
| build_fraud_graph(applicants) - returns nx.Graph | |
| detect_rings(G, min_size, edge_threshold) - returns list of rings (frozensets of applicant indices) | |
| visualize_graph(G, rings, figsize, seed) - returns matplotlib Figure | |
| fraud_summary(G, rings, applicants) - structured summary for the UI | |
| """ | |
| import re | |
| from pathlib import Path | |
| from difflib import SequenceMatcher | |
| from collections import defaultdict | |
| import numpy as np | |
| import networkx as nx | |
| import matplotlib.pyplot as plt | |
| # ============================================================ | |
| # Field extraction | |
| # ============================================================ | |
# Label-anchored patterns capture the value that follows "<Label>:" or
# "<Label> -". The value character classes allow only spaces and tabs (not
# \s) so a capture ends with its own line instead of running into the next
# field's label, e.g. "Name: A B\nDOB: ..." must yield "A B", not "A B\nDOB".
NAME_RE = re.compile(
    r"(?:Name|Owner|Borrower|Holder|Account Holder|Applicant)\s*[:\-]\s*"
    r"([A-Z][A-Z \t.]{2,40})",
    re.I,
)
DOB_RE = re.compile(
    r"(?:DOB|Date of Birth|Born)\s*[:\-]\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})",
    re.I,
)
ADDR_RE = re.compile(
    r"(?:Address|Village|Residence|City)\s*[:\-]\s*"
    r"([A-Z0-9][A-Z0-9 \t,.\-/]{3,80})",
    re.I,
)
# Unlabelled patterns: matched anywhere in the text on word boundaries.
PHONE_RE = re.compile(r"\b[6-9]\d{9}\b")  # Indian mobile
IFSC_RE = re.compile(r"\b[A-Z]{4}0[A-Z0-9]{6}\b")
# NOTE: any 9-18 digit run qualifies, which includes 10-digit phone numbers.
ACC_RE = re.compile(r"\b\d{9,18}\b")
EMP_RE = re.compile(
    r"(?:Employer|Company|Workplace)\s*[:\-]\s*([A-Z][A-Z0-9 \t.&]{2,50})",
    re.I,
)
| def _norm(s): | |
| return re.sub(r"\s+", " ", (s or "").strip().upper()) | |
def extract_applicant_fields(path):
    """
    Extract identity signals from a single applicant document.

    Files with a ``.pdf`` suffix are read with ``fitz``; every other file is
    treated as an image and run through ``pytesseract`` OCR. Extraction is
    deliberately best-effort: if the file cannot be read or a backend is not
    installed, the text is treated as empty and every signal is None, which
    the graph builder tolerates.

    Args:
        path: Filesystem path (str or Path) of the document.

    Returns:
        dict with keys ``file`` (the path as a string), ``name``, ``dob``,
        ``address``, ``phone``, ``ifsc``, ``account``, ``employer`` (each a
        string, or None when not found) and ``text_sample`` (the first 500
        characters of the extracted text).
    """
    doc_path = Path(path)
    text = ""
    try:
        if doc_path.suffix.lower() == ".pdf":
            import fitz
            with fitz.open(doc_path) as pdf:
                text = "\n".join(page.get_text() for page in pdf)
        else:
            import pytesseract
            from PIL import Image
            # Context manager closes the underlying file handle after OCR.
            with Image.open(doc_path) as img:
                text = pytesseract.image_to_string(img)
    except Exception:
        # Best-effort by design: an unreadable document or missing optional
        # backend yields empty fields rather than aborting the whole batch.
        text = ""

    def labelled(rx):
        """Normalised first capture group of *rx* in the text, or None."""
        found = rx.search(text)
        return _norm(found.group(1)) if found else None

    def bare(rx):
        """First whole match of *rx* in the text, or None."""
        found = rx.search(text)
        return found.group(0) if found else None

    phone = bare(PHONE_RE)
    # ACC_RE accepts any 9-18 digit run, so it also matches the 10-digit
    # mobile number. Skip that value; otherwise the phone would be reported
    # as the account and one shared phone would count as two signals.
    account = next(
        (m.group(0) for m in ACC_RE.finditer(text) if m.group(0) != phone),
        None,
    )

    return {
        "file": str(doc_path),
        "name": labelled(NAME_RE),
        "dob": labelled(DOB_RE),
        "address": labelled(ADDR_RE),
        "phone": phone,
        "ifsc": bare(IFSC_RE),
        "account": account,
        "employer": labelled(EMP_RE),
        "text_sample": text[:500],
    }
| # ============================================================ | |
| # Similarity scoring between two applicants | |
| # ============================================================ | |
def _string_sim(a, b):
    """Fuzzy similarity of two strings as a SequenceMatcher ratio in [0, 1].

    Both inputs are normalised first; if either one is missing or empty the
    similarity is 0.0.
    """
    if a and b:
        matcher = SequenceMatcher(None, _norm(a), _norm(b))
        return matcher.ratio()
    return 0.0
def _exact(a, b):
    """True when both values are present and equal after normalisation."""
    if not a or not b:
        return False
    return _norm(a) == _norm(b)
def compare_applicants(a, b, name_thresh=0.85, addr_thresh=0.80, emp_thresh=0.85):
    """
    Compare two applicants' identity fields.

    Name, address and employer are matched fuzzily (similarity ratio at or
    above the corresponding threshold); DOB, phone, IFSC and account must be
    equal after normalisation. A field that is missing on either side never
    matches an exact comparison and has fuzzy similarity 0.0.

    Args:
        a, b: Applicant dicts as returned by extract_applicant_fields.
        name_thresh: Minimum similarity ratio for names to count as shared.
        addr_thresh: Minimum similarity ratio for addresses.
        emp_thresh: Minimum similarity ratio for employers.

    Returns:
        {
            'score': float in [0, 1] - sum of the weights of matched fields,
                     rounded to 3 decimals
            'shared': dict {field: bool} - which signals matched
            'highlights': list of str - "<Label>: <value from a>" per match
            'n_matches': int - number of matched fields
        }
    """
    fuzzy_thresholds = {
        "name": name_thresh,
        "address": addr_thresh,
        "employer": emp_thresh,
    }
    # (field, highlight label, weight). Weights sum to 1.0 and reflect how
    # strongly a shared value suggests fraud: a shared bank account is the
    # strongest signal, a shared branch (IFSC) or employer only a mild one.
    signals = (
        ("name", "Name", 0.10),
        ("dob", "DOB", 0.15),
        ("address", "Address", 0.20),
        ("phone", "Phone", 0.20),
        ("ifsc", "IFSC", 0.05),
        ("account", "Account", 0.25),
        ("employer", "Employer", 0.05),
    )

    shared = {}
    highlights = []
    score = 0.0
    for field, label, weight in signals:
        left, right = a.get(field), b.get(field)
        if field in fuzzy_thresholds:
            matched = _string_sim(left, right) >= fuzzy_thresholds[field]
        else:
            matched = _exact(left, right)
        shared[field] = matched
        if matched:
            highlights.append(f"{label}: {left}")
            score += weight

    return {
        "score": round(score, 3),
        "shared": shared,
        "highlights": highlights,
        "n_matches": len(highlights),
    }
| # ============================================================ | |
| # Graph builder | |
| # ============================================================ | |
def build_fraud_graph(applicants, name_thresh=0.85, addr_thresh=0.80):
    """
    Build the applicant similarity graph.

    applicants: list[dict] as returned by extract_applicant_fields.
    Returns: nx.Graph whose nodes are the list positions 0..N-1 (carrying the
    applicant's identity fields as attributes) and whose edges join every pair
    with at least one shared signal, weighted by the similarity score.
    """
    graph = nx.Graph()
    passthrough = ("dob", "address", "phone", "ifsc", "account", "employer")

    for idx, applicant in enumerate(applicants):
        extra = {key: applicant.get(key) for key in passthrough}
        graph.add_node(
            idx,
            file=applicant.get("file"),
            name=applicant.get("name") or "(unknown)",
            **extra,
        )

    total = len(applicants)
    for left in range(total):
        for right in range(left + 1, total):
            result = compare_applicants(
                applicants[left],
                applicants[right],
                name_thresh=name_thresh,
                addr_thresh=addr_thresh,
            )
            if result["n_matches"] <= 0:
                continue  # nothing in common: no edge
            graph.add_edge(
                left,
                right,
                weight=result["score"],
                highlights=result["highlights"],
                shared=result["shared"],
                n_matches=result["n_matches"],
            )
    return graph
def detect_rings(G, min_size=3, edge_threshold=0.30):
    """
    Find suspected fraud rings.

    Edges whose weight is below edge_threshold (a missing weight counts as 0)
    are dropped from a copy of G; every remaining connected component with at
    least min_size applicants is reported as a ring.
    Returns: list of frozensets (applicant indices), largest ring first.
    """
    weak_links = []
    for u, v, attrs in G.edges(data=True):
        if attrs.get("weight", 0) < edge_threshold:
            weak_links.append((u, v))

    pruned = G.copy()  # leave the caller's graph untouched
    pruned.remove_edges_from(weak_links)

    suspected = []
    for component in nx.connected_components(pruned):
        if len(component) >= min_size:
            suspected.append(frozenset(component))
    return sorted(suspected, key=len, reverse=True)
| # ============================================================ | |
| # Visualization | |
| # ============================================================ | |
def visualize_graph(G, rings=None, figsize=(11, 7), seed=42):
    """Draw the fraud graph and return the matplotlib Figure.

    Ring members are drawn as larger red nodes, everyone else as green nodes.
    Edges between two ring members are red, other edges grey, and edge width
    grows with the edge weight. An empty graph yields a placeholder figure.
    """
    rings = rings or []
    flagged = set()
    for ring in rings:
        flagged.update(ring)

    fig, ax = plt.subplots(figsize=figsize)

    if not len(G.nodes):
        ax.text(0.5, 0.5, "No applicants to plot.",
                ha="center", va="center", fontsize=14)
        ax.axis("off")
        return fig

    layout = nx.spring_layout(G, seed=seed, k=0.8)

    # Per-node styling, in G.nodes iteration order.
    node_colors = []
    node_sizes = []
    for node in G.nodes:
        if node in flagged:
            node_colors.append("#dc2626")
            node_sizes.append(900)
        else:
            node_colors.append("#16a34a")
            node_sizes.append(600)

    # Per-edge styling, in G.edges iteration order.
    edge_widths = []
    edge_colors = []
    for u, v, attrs in G.edges(data=True):
        edge_widths.append(1 + 5 * attrs.get("weight", 0))
        inside_ring = u in flagged and v in flagged
        edge_colors.append("#dc2626" if inside_ring else "#94a3b8")

    nx.draw_networkx_nodes(G, layout, node_color=node_colors,
                           node_size=node_sizes, edgecolors="black",
                           linewidths=1.5, ax=ax)
    nx.draw_networkx_edges(G, layout, width=edge_widths,
                           edge_color=edge_colors, alpha=0.7, ax=ax)

    # Labels: applicant name (or "#<index>" when absent), cut to 15 chars.
    labels = {}
    for node, name in G.nodes(data="name"):
        labels[node] = (name or f"#{node}")[:15]
    nx.draw_networkx_labels(G, layout, labels, font_size=9,
                            font_color="white", font_weight="bold", ax=ax)

    # Title bar
    n_rings = len(rings)
    largest = max(map(len, rings), default=0)
    title = (f"Cross-applicant fraud network | {len(G.nodes)} applicants "
             f"| {n_rings} suspected ring{'s' if n_rings != 1 else ''}")
    if largest:
        title += f" | largest ring: {largest} applicants"
    ax.set_title(title, fontsize=12, weight="bold")
    ax.axis("off")
    plt.tight_layout()
    return fig
def fraud_summary(G, rings, applicants):
    """
    Build a structured summary of the detected rings for the UI.

    Args:
        G: Graph whose ``edges(data=True)`` yields ``(u, v, attrs)``; each
            edge's ``shared`` dict ({field: bool}) is read when present.
        rings: list of sets/frozensets of applicant indices.
        applicants: list of applicant dicts, indexed by those indices.

    Returns:
        dict with ``n_applicants``, ``n_rings``, ``n_applicants_in_rings``,
        ``largest_ring_size``, ``fraud_risk_percentage`` (share of applicants
        that belong to any ring, one decimal) and ``rings``: one dict per ring
        with ``ring_id`` (1-based), ``size``, ``applicant_indices``,
        ``applicant_names``, ``applicant_files`` (base names), ``n_links``,
        ``top_shared_signals`` (up to three ``(field, count)`` pairs) and
        ``risk_band``.
    """
    ring_data = []
    for ring_no, ring in enumerate(rings, start=1):
        # Sorted so the parallel index/name/file lists come out in a stable
        # order instead of set-iteration order.
        members = sorted(ring)
        internal_edges = [
            (u, v, attrs)
            for u, v, attrs in G.edges(data=True)
            if u in ring and v in ring
        ]

        # Count how many links inside the ring share each signal.
        signal_counts = defaultdict(int)
        for _, _, attrs in internal_edges:
            for signal, matched in attrs.get("shared", {}).items():
                if matched:
                    signal_counts[signal] += 1
        top_signals = sorted(signal_counts.items(), key=lambda kv: -kv[1])[:3]

        size = len(ring)
        if size >= 5:
            risk_band = "CRITICAL"
        elif size >= 3:
            risk_band = "HIGH"
        else:
            risk_band = "MEDIUM"

        ring_data.append({
            "ring_id": ring_no,
            "size": size,
            "applicant_indices": members,
            "applicant_names": [
                applicants[m].get("name") or f"#{m}" for m in members
            ],
            # "or ''" also covers a "file" key that is present but None,
            # which Path() would reject with a TypeError.
            "applicant_files": [
                Path(applicants[m].get("file") or "").name for m in members
            ],
            "n_links": len(internal_edges),
            "top_shared_signals": top_signals,
            "risk_band": risk_band,
        })

    n_in_rings = len(set().union(*rings)) if rings else 0
    return {
        "n_applicants": len(applicants),
        "n_rings": len(rings),
        "n_applicants_in_rings": n_in_rings,
        "largest_ring_size": max((len(r) for r in rings), default=0),
        "rings": ring_data,
        # max(1, ...) guards the division when there are no applicants.
        "fraud_risk_percentage": round(
            100 * n_in_rings / max(1, len(applicants)), 1
        ),
    }
| # ============================================================ | |
| # Smoke test | |
| # ============================================================ | |
if __name__ == "__main__":
    # Hand-written applicants: the first three share an address and phone
    # number (the first two also a DOB and account); the fourth shares
    # nothing with anyone.
    columns = ("file", "name", "dob", "address", "phone",
               "ifsc", "account", "employer")
    rows = [
        ("a.pdf", "RAMESH KUMAR", "14-03-1985", "D-234 ALIPORE KOLKATA",
         "9876543210", "SBIN0001234", "78439336112", "ACME LTD"),
        ("b.pdf", "SURESH KUMAR", "14-03-1985", "D-234 ALIPORE KOLKATA",
         "9876543210", "HDFC0009999", "78439336112", None),
        ("c.pdf", "AMIT SHARMA", "22-07-1990", "D-234 ALIPORE KOLKATA",
         "9876543210", None, None, None),
        ("d.pdf", "POOJA VERMA", "01-01-1995", "FLAT 12 BANDRA MUMBAI",
         "8765432100", "ICIC0008888", "12345678901", "XYZ CORP"),
    ]
    fake = [dict(zip(columns, row)) for row in rows]

    G = build_fraud_graph(fake)
    rings = detect_rings(G)
    summary = fraud_summary(G, rings, fake)

    print(f"Built graph: {len(G.nodes)} nodes, {len(G.edges)} edges")
    print(f"Detected {len(rings)} rings")
    for ring_info in summary["rings"]:
        print(f" Ring #{ring_info['ring_id']} ({ring_info['risk_band']}): {ring_info['applicant_names']}")
        print(f" Top signals: {ring_info['top_shared_signals']}")
    print(f"Fraud-risk percentage: {summary['fraud_risk_percentage']}%")