# ReconAI / fraud_graph.py
# ACA050's picture
# Upload 14 files
# 64e5ee2 verified
import networkx as nx
import matplotlib.pyplot as plt
import pandas as pd
import io
import logging
# Configure logging at INFO level as an import-time side effect of this module.
# NOTE(review): basicConfig is a no-op when the root logger already has
# handlers, so an application that configures logging first keeps its own setup.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module; used by FraudGraph below.
logger = logging.getLogger(__name__)
class FraudGraph:
    """Directed transaction graph used to look for circular-trading patterns.

    Nodes are entity names (always converted to ``str``). An edge
    ``source -> target`` carries a ``weight`` attribute holding the sum of
    all transaction amounts seen for that pair.
    """

    def __init__(self):
        """Create an empty directed graph."""
        self.graph = nx.DiGraph()

    @staticmethod
    def _aggregate_edges(df, source_col, target_col, amount_col):
        """Sum amounts per ``(source, target)`` pair without modifying *df*.

        Missing columns fall back to constants: ``'Unknown'`` for the source,
        ``'OurCompany'`` for the target and ``0`` for the amount. Missing
        (NaN/None) amounts count as ``0``.

        Returns:
            dict mapping ``(source, target)`` string tuples to the float
            total, in order of first appearance in *df*.
        """
        row_count = len(df)
        available = df.columns

        def column_values(name, fallback):
            # Read the column directly instead of going through iterrows():
            # iterrows() builds one Series per row and upcasts dtypes
            # (e.g. an int id next to a float amount would become '1.0').
            if name in available:
                return df[name].tolist()
            return [fallback] * row_count

        sources = column_values(source_col, 'Unknown')
        targets = column_values(target_col, 'OurCompany')
        amounts = column_values(amount_col, 0)

        totals = {}
        for raw_source, raw_target, raw_amount in zip(sources, targets, amounts):
            edge = (str(raw_source), str(raw_target))
            amount = 0.0 if pd.isna(raw_amount) else float(raw_amount)
            totals[edge] = totals.get(edge, 0.0) + amount
        return totals

    def build_graph(self, df, source_col='VendorName', target_col='BuyerName', amount_col='Amount'):
        """Rebuild the graph from a DataFrame of transactions.

        Any previously built graph is discarded. Each row contributes its
        amount to the edge ``source -> target``; repeated pairs are summed.

        In a plain "Books vs GST" dataset there is usually no buyer column
        because our own company is the buyer for every row. In that case
        every vendor is linked to a single ``'OurCompany'`` node. Cycles can
        only show up when the data contains multi-party transactions.

        Args:
            df: DataFrame of transactions. It is not modified.
            source_col: Column holding the seller/vendor name.
            target_col: Column holding the buyer name.
            amount_col: Column holding the transaction amount.

        Raises:
            ValueError, TypeError: If an amount cannot be converted to float.
        """
        self.graph.clear()

        if target_col not in df.columns:
            logger.warning("No '%s' column. Assuming central company.", target_col)

        edge_totals = self._aggregate_edges(df, source_col, target_col, amount_col)
        for (source, target), weight in edge_totals.items():
            self.graph.add_edge(source, target, weight=weight)

    def detect_cycles(self):
        """Return all simple cycles that involve more than one node.

        Returns:
            A list of cycles, each a list of node names. Self-loops (an
            entity trading with itself) are excluded because they are not
            multi-party rings. An empty list is returned if detection fails.
        """
        try:
            return [cycle for cycle in nx.simple_cycles(self.graph) if len(cycle) > 1]
        except Exception as e:
            # Best effort: callers (e.g. the visualisation) should still work.
            logger.error("Error detecting cycles: %s", e)
            return []

    def analyze_risk_nodes(self):
        """Score nodes with weighted PageRank to surface transaction hubs.

        NOTE(review): PageRank rewards nodes with heavy *incoming* edges.
        Since edges run vendor -> buyer, buyers tend to score highest;
        confirm this matches the intended notion of a "high-risk vendor".

        Returns:
            dict mapping node name to its PageRank score. Empty when the
            graph has fewer than two nodes; all zeros if PageRank fails.
        """
        if len(self.graph.nodes) < 2:
            return {}
        try:
            return nx.pagerank(self.graph, weight='weight')
        except Exception as e:
            logger.error("Error calculating PageRank: %s", e)
            return {node: 0.0 for node in self.graph.nodes}

    def visualize_graph(self, title="Transaction Network"):
        """Render the graph as a PNG and return it in an in-memory buffer.

        Node size grows with degree, edge width with weight, and edges that
        belong to a detected cycle are redrawn in red.

        Args:
            title: Prefix for the plot title.

        Returns:
            ``io.BytesIO`` positioned at the start of the PNG data.
        """
        fig = plt.figure(figsize=(12, 8))
        try:
            ax = fig.gca()
            graph = self.graph

            # Force-directed layout (positions vary between calls: no fixed seed).
            pos = nx.spring_layout(graph, k=0.5, iterations=50)

            # Larger nodes for better-connected entities.
            node_sizes = [300 + 100 * graph.degree(node) for node in graph.nodes()]
            nx.draw_networkx_nodes(
                graph, pos, node_size=node_sizes, node_color='skyblue', alpha=0.8, ax=ax
            )

            # Edge width proportional to weight, scaled down for readability.
            widths = [data['weight'] / 1000 for _, _, data in graph.edges(data=True)]
            nx.draw_networkx_edges(
                graph, pos, width=widths, alpha=0.5, edge_color='gray', arrows=True, ax=ax
            )

            nx.draw_networkx_labels(graph, pos, font_size=10, font_family="sans-serif", ax=ax)

            cycles = self.detect_cycles()
            if cycles:
                # Pair every node with its successor, wrapping around to close the ring.
                cycle_edges = [
                    edge
                    for cycle in cycles
                    for edge in zip(cycle, cycle[1:] + cycle[:1])
                ]
                nx.draw_networkx_edges(
                    graph, pos, edgelist=cycle_edges, width=2.0, edge_color='red',
                    arrows=True, ax=ax
                )
                ax.set_title(
                    f"{title} - Alert: {len(cycles)} Potential Circular Trading Rings Detected!",
                    color='red',
                )
            else:
                ax.set_title(f"{title} - No obvious circular rings detected.")

            ax.axis('off')

            buf = io.BytesIO()
            fig.savefig(buf, format='png', bbox_inches='tight')
        finally:
            # Always release the figure, even if layout/drawing/saving failed.
            plt.close(fig)

        buf.seek(0)
        return buf