# DocSentry / fraud_ring.py
# SpandanM110 | Round 2: fraud ring graph, AI-gen detector, provenance ledger, architecture doc | e97f963 | 13.1 kB
"""
fraud_ring.py - Cross-applicant fraud-ring detection.
Banks lose ~Rs 3,000 crore/year to organised application fraud
(RBI's own annual report). This module detects rings by:
1. Extracting identity signals from each applicant's documents
(name, DOB, address, phone, IFSC, account number, employer).
2. Building a similarity graph: nodes = applicants,
edges = shared signals (weighted by how many signals match).
3. Dropping edges below a similarity threshold and taking the connected
   components of what remains. Each component of >= 3 applicants is a
   suspected fraud ring.
4. Rendering the network as a matplotlib figure with ring members highlighted in red.
Public API:
    extract_applicant_fields(path)             - pulls identity fields from a doc
    compare_applicants(a, b)                   - similarity score + shared signals
    build_fraud_graph(applicants)              - returns nx.Graph
    detect_rings(G, min_size, edge_threshold)  - returns list of frozensets of applicant indices
    visualize_graph(G, rings, figsize, seed)   - returns matplotlib Figure
    fraud_summary(G, rings, applicants)        - structured summary for the UI
"""
import re
from pathlib import Path
from difflib import SequenceMatcher
from collections import defaultdict
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
# ============================================================
# Field extraction
# ============================================================
# Label-prefixed free-text fields ("Name: ...", "Address: ...", "Employer: ...").
# The captured value is restricted to ONE line: the character classes allow
# space, tab and non-breaking space but not "\n". With a plain \s the capture
# ran on into the next line's label, e.g. "Name: RAMESH KUMAR\nDOB: ..."
# produced "RAMESH KUMAR DOB", which corrupts the fuzzy name comparison.
# Trade-off: an address wrapped over several lines is captured up to the
# first line break only.
NAME_RE = re.compile(
    r"(?:Name|Owner|Borrower|Holder|Account Holder|Applicant)\s*[:\-]\s*"
    r"([A-Z][A-Z \t\xa0.]{2,40})",
    re.I,
)
DOB_RE = re.compile(
    r"(?:DOB|Date of Birth|Born)\s*[:\-]\s*(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})",
    re.I,
)
ADDR_RE = re.compile(
    r"(?:Address|Village|Residence|City)\s*[:\-]\s*"
    r"([A-Z0-9][A-Z0-9 \t\xa0,.\-/]{3,80})",
    re.I,
)
PHONE_RE = re.compile(r"\b[6-9]\d{9}\b")  # Indian mobile
IFSC_RE = re.compile(r"\b[A-Z]{4}0[A-Z0-9]{6}\b")
# NOTE: any 9-18 digit run matches, which includes 10-digit mobile numbers.
ACC_RE = re.compile(r"\b\d{9,18}\b")
EMP_RE = re.compile(
    r"(?:Employer|Company|Workplace)\s*[:\-]\s*([A-Z][A-Z0-9 \t\xa0.&]{2,50})",
    re.I,
)
def _norm(s):
    """Canonicalise a field value: trim, upper-case, collapse whitespace runs.

    Falsy input (None, "") yields the empty string.
    """
    if not s:
        return ""
    shouted = s.strip().upper()
    return re.sub(r"\s+", " ", shouted)
def extract_applicant_fields(path):
    """
    Extract identity signals from a document.

    PDFs are read with PyMuPDF (``fitz``); every other file type is treated
    as an image and passed through Tesseract OCR. Both libraries are imported
    lazily. Any failure (missing library, unreadable file, OCR error) is
    swallowed on purpose and leaves the text empty, so the caller gets a
    record whose fields are mostly None instead of an exception.

    Args:
        path: Path (str or PathLike) of the document to read.

    Returns:
        dict with keys ``file``, ``name``, ``dob``, ``address``, ``phone``,
        ``ifsc``, ``account``, ``employer`` (each None when not found) and
        ``text_sample`` (the first 500 characters of the extracted text).
    """
    p = Path(path)
    text = ""
    try:
        if p.suffix.lower() == ".pdf":
            import fitz
            with fitz.open(p) as doc:
                text = "\n".join(page.get_text() for page in doc)
        else:
            import pytesseract
            from PIL import Image
            # Context manager closes the underlying file handle after OCR.
            with Image.open(p) as img:
                text = pytesseract.image_to_string(img)
    except Exception:
        # Deliberate best-effort: extraction problems degrade to empty text.
        text = ""

    def first(rx):
        """Return the normalised first match of *rx* in the text, or None."""
        m = rx.search(text)
        if m is None:
            return None
        # Labelled patterns carry the value in group 1; bare patterns have
        # no group, so the whole match is the value.
        return _norm(m.group(1) if rx.groups else m.group(0))

    phone_match = PHONE_RE.search(text)
    phone = phone_match.group(0) if phone_match else None

    ifsc_match = IFSC_RE.search(text)
    ifsc = ifsc_match.group(0) if ifsc_match else None

    # ACC_RE accepts any 9-18 digit run, which includes the 10-digit mobile
    # number. Prefer the first candidate that is not the extracted phone so
    # a shared phone is not also counted as a shared account; fall back to
    # the first candidate when nothing else is available.
    candidates = ACC_RE.findall(text)
    account = next(
        (c for c in candidates if c != phone),
        candidates[0] if candidates else None,
    )

    return {
        "file": str(p),
        "name": first(NAME_RE),
        "dob": first(DOB_RE),
        "address": first(ADDR_RE),
        "phone": phone,
        "ifsc": ifsc,
        "account": account,
        "employer": first(EMP_RE),
        "text_sample": text[:500],
    }
# ============================================================
# Similarity scoring between two applicants
# ============================================================
def _string_sim(a, b):
    """Fuzzy similarity in [0, 1] of two normalised strings.

    Returns 0.0 when either value is missing or empty.
    """
    if a and b:
        matcher = SequenceMatcher(None, _norm(a), _norm(b))
        return matcher.ratio()
    return 0.0
def _exact(a, b):
    """True when both values are present and equal after normalisation."""
    if not a or not b:
        return False
    return _norm(a) == _norm(b)
def compare_applicants(a, b, name_thresh=0.85, addr_thresh=0.80, emp_thresh=0.85):
    """
    Compare two applicants' identity fields.

    Name, address and employer are compared fuzzily against their
    thresholds; DOB, phone, IFSC and account must match exactly after
    normalisation. A field that is missing on either side never matches.

    Args:
        a, b: Applicant dicts (as produced by extract_applicant_fields).
        name_thresh: Minimum similarity ratio for a name match.
        addr_thresh: Minimum similarity ratio for an address match.
        emp_thresh: Minimum similarity ratio for an employer match.

    Returns:
        {
          'score':      float in [0, 1] - sum of the weights of matched fields
          'shared':     dict {field: bool} - which signals matched
          'highlights': list of str - human-readable matched signals
                        (shows applicant a's value)
          'n_matches':  int - number of matched signals
        }
    """
    # Fields compared fuzzily, with their thresholds; all others are exact.
    fuzzy_thresholds = {
        "name": name_thresh,
        "address": addr_thresh,
        "employer": emp_thresh,
    }
    # (field, highlight label) in reporting order.
    checks = (
        ("name", "Name"),
        ("dob", "DOB"),
        ("address", "Address"),
        ("phone", "Phone"),
        ("ifsc", "IFSC"),        # same branch is only mildly suspicious
        ("account", "Account"),  # same bank account is very suspicious
        ("employer", "Employer"),
    )
    # Different fields carry different fraud significance; weights sum to 1.
    weights = {"name": 0.10, "dob": 0.15, "address": 0.20, "phone": 0.20,
               "ifsc": 0.05, "account": 0.25, "employer": 0.05}

    shared = {}
    highlights = []
    for field, label in checks:
        left, right = a.get(field), b.get(field)
        if field in fuzzy_thresholds:
            # Presence guard: with a threshold <= 0 the 0.0 similarity of a
            # missing field would otherwise count as a match.
            matched = (bool(left) and bool(right)
                       and _string_sim(left, right) >= fuzzy_thresholds[field])
        else:
            matched = _exact(left, right)
        shared[field] = matched
        if matched:
            highlights.append(f"{label}: {left}")

    score = sum(weights[field] for field, hit in shared.items() if hit)
    return {
        "score": round(score, 3),
        "shared": shared,
        "highlights": highlights,
        "n_matches": sum(1 for hit in shared.values() if hit),
    }
# ============================================================
# Graph builder
# ============================================================
def build_fraud_graph(applicants, name_thresh=0.85, addr_thresh=0.80):
    """
    Build the applicant similarity graph.

    applicants: list[dict] returned by extract_applicant_fields
    Returns: nx.Graph whose nodes are the applicant positions 0..N-1
    (carrying the identity fields as attributes) and whose edges join
    every pair with at least one matching signal, weighted by the
    pair's similarity score.
    """
    graph = nx.Graph()

    # One node per applicant, keyed by its position in the input list.
    for idx, person in enumerate(applicants):
        attrs = {
            "file": person.get("file"),
            "name": person.get("name") or "(unknown)",
            "dob": person.get("dob"),
            "address": person.get("address"),
            "phone": person.get("phone"),
            "ifsc": person.get("ifsc"),
            "account": person.get("account"),
            "employer": person.get("employer"),
        }
        graph.add_node(idx, **attrs)

    # Compare every unordered pair exactly once.
    total = len(applicants)
    pairs = ((i, j) for i in range(total) for j in range(i + 1, total))
    for i, j in pairs:
        result = compare_applicants(
            applicants[i],
            applicants[j],
            name_thresh=name_thresh,
            addr_thresh=addr_thresh,
        )
        if result["n_matches"] <= 0:
            continue
        graph.add_edge(
            i,
            j,
            weight=result["score"],
            highlights=result["highlights"],
            shared=result["shared"],
            n_matches=result["n_matches"],
        )
    return graph
def detect_rings(G, min_size=3, edge_threshold=0.30):
    """
    Find suspected fraud rings.

    Edges whose weight is below edge_threshold are discarded from a copy
    of G (G itself is left untouched); every connected component of the
    remainder with at least min_size applicants is reported as a ring.
    Returns: list of frozensets (applicant indices), largest first.
    """
    pruned = G.copy()

    weak_edges = []
    for u, v, data in G.edges(data=True):
        if data.get("weight", 0) < edge_threshold:
            weak_edges.append((u, v))
    pruned.remove_edges_from(weak_edges)

    found = []
    for component in nx.connected_components(pruned):
        if len(component) >= min_size:
            found.append(frozenset(component))
    return sorted(found, key=len, reverse=True)
# ============================================================
# Visualization
# ============================================================
def visualize_graph(G, rings=None, figsize=(11, 7), seed=42):
    """
    Render the fraud graph. Ring members in red, others in green.

    Args:
        G: Graph to draw; a node's ``name`` attribute is used as its label
           (truncated to 15 characters), falling back to ``#<node id>``.
        rings: Iterable of node-id sets (e.g. from detect_rings), or None.
        figsize: Matplotlib figure size in inches.
        seed: Seed for the spring layout, so repeated calls give the same
              node positions.

    Returns:
        The matplotlib Figure. For an empty graph the figure only contains
        a "No applicants to plot." message.
    """
    rings = rings or []
    ring_members = set().union(*rings) if rings else set()

    fig, ax = plt.subplots(figsize=figsize)
    if len(G.nodes) == 0:
        ax.text(0.5, 0.5, "No applicants to plot.",
                ha="center", va="center", fontsize=14)
        ax.axis("off")
        return fig

    pos = nx.spring_layout(G, seed=seed, k=0.8)

    # Ring members are drawn red and larger; everyone else green.
    node_colors = ["#dc2626" if n in ring_members else "#16a34a" for n in G.nodes]
    node_sizes = [900 if n in ring_members else 600 for n in G.nodes]

    # Edge width grows with similarity weight; an edge is red only when
    # both of its endpoints are ring members.
    edges = list(G.edges(data=True))
    edge_widths = [1 + 5 * d.get("weight", 0) for _, _, d in edges]
    edge_colors = [
        "#dc2626" if (u in ring_members and v in ring_members) else "#94a3b8"
        for u, v, _ in edges
    ]

    nx.draw_networkx_nodes(G, pos, node_color=node_colors, node_size=node_sizes,
                           edgecolors="black", linewidths=1.5, ax=ax)
    nx.draw_networkx_edges(G, pos, width=edge_widths, edge_color=edge_colors,
                           alpha=0.7, ax=ax)
    labels = {n: (G.nodes[n].get("name") or f"#{n}")[:15] for n in G.nodes}
    nx.draw_networkx_labels(G, pos, labels, font_size=9, font_color="white",
                            font_weight="bold", ax=ax)

    # Title bar
    n_rings = len(rings)
    largest = max((len(r) for r in rings), default=0)
    title = (f"Cross-applicant fraud network | {len(G.nodes)} applicants "
             f"| {n_rings} suspected ring{'s' if n_rings != 1 else ''}")
    if largest:
        title += f" | largest ring: {largest} applicants"
    ax.set_title(title, fontsize=12, weight="bold")
    ax.axis("off")

    # Lay out THIS figure explicitly; plt.tight_layout() would act on
    # pyplot's global "current figure", which is not guaranteed to be fig.
    fig.tight_layout()
    return fig
def fraud_summary(G, rings, applicants):
    """
    Structured summary for the Streamlit UI.

    Args:
        G: Graph whose edges may carry a ``shared`` dict {signal: bool}.
        rings: List of node-id sets (e.g. from detect_rings); ids index
               into ``applicants``.
        applicants: List of applicant dicts.

    Returns:
        dict with overall counts (``n_applicants``, ``n_rings``,
        ``n_applicants_in_rings``, ``largest_ring_size``,
        ``fraud_risk_percentage``) and ``rings``, a list with one entry per
        ring: id, size, member indices/names/file names, number of internal
        links, the three most frequent shared signals and a risk band.
    """
    ring_data = []
    for ring_id, ring in enumerate(rings, start=1):
        # Sorted so the per-member lists are stable and ascending rather
        # than following set iteration order.
        members = sorted(ring)
        edges = [(u, v, d) for u, v, d in G.edges(data=True)
                 if u in ring and v in ring]

        # Count how often each signal is shared on the ring's internal edges.
        signal_counts = defaultdict(int)
        for _, _, d in edges:
            for signal, matched in d.get("shared", {}).items():
                if matched:
                    signal_counts[signal] += 1
        top_signals = sorted(signal_counts.items(), key=lambda kv: -kv[1])[:3]

        size = len(ring)
        if size >= 5:
            risk_band = "CRITICAL"
        elif size >= 3:
            risk_band = "HIGH"
        else:
            risk_band = "MEDIUM"

        ring_data.append({
            "ring_id": ring_id,
            "size": size,
            "applicant_indices": members,
            "applicant_names": [applicants[m].get("name") or f"#{m}" for m in members],
            # `or ""` also covers a "file" key that is present but None,
            # which Path() would reject with a TypeError.
            "applicant_files": [Path(applicants[m].get("file") or "").name
                                for m in members],
            "n_links": len(edges),
            "top_shared_signals": top_signals,
            "risk_band": risk_band,
        })

    n_in_rings = len(set().union(*rings)) if rings else 0
    return {
        "n_applicants": len(applicants),
        "n_rings": len(rings),
        "n_applicants_in_rings": n_in_rings,
        "largest_ring_size": max((len(r) for r in rings), default=0),
        "rings": ring_data,
        # max(1, ...) guards against division by zero for an empty list.
        "fraud_risk_percentage": round(100 * n_in_rings / max(1, len(applicants)), 1),
    }
# ============================================================
# Smoke test
# ============================================================
if __name__ == "__main__":
    # Four synthetic applicants: a, b and c share an address and phone
    # (a and b also share DOB and account); d is unrelated.
    columns = ("file", "name", "dob", "address", "phone",
               "ifsc", "account", "employer")
    rows = [
        ("a.pdf", "RAMESH KUMAR", "14-03-1985", "D-234 ALIPORE KOLKATA",
         "9876543210", "SBIN0001234", "78439336112", "ACME LTD"),
        ("b.pdf", "SURESH KUMAR", "14-03-1985", "D-234 ALIPORE KOLKATA",
         "9876543210", "HDFC0009999", "78439336112", None),
        ("c.pdf", "AMIT SHARMA", "22-07-1990", "D-234 ALIPORE KOLKATA",
         "9876543210", None, None, None),
        ("d.pdf", "POOJA VERMA", "01-01-1995", "FLAT 12 BANDRA MUMBAI",
         "8765432100", "ICIC0008888", "12345678901", "XYZ CORP"),
    ]
    fake = [dict(zip(columns, row)) for row in rows]

    graph = build_fraud_graph(fake)
    found_rings = detect_rings(graph)
    report = fraud_summary(graph, found_rings, fake)

    n_nodes = graph.number_of_nodes()
    n_edges = graph.number_of_edges()
    print(f"Built graph: {n_nodes} nodes, {n_edges} edges")
    print(f"Detected {len(found_rings)} rings")
    for entry in report["rings"]:
        print(f" Ring #{entry['ring_id']} ({entry['risk_band']}): {entry['applicant_names']}")
        print(f" Top signals: {entry['top_shared_signals']}")
    print(f"Fraud-risk percentage: {report['fraud_risk_percentage']}%")