import io
import logging

import matplotlib.pyplot as plt
import networkx as nx
import pandas as pd

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class FraudGraph:
    """Directed transaction graph used to look for circular-trading rings.

    Nodes are entity names, edges point from the source entity to the target
    entity, and each edge carries a ``weight`` attribute holding the summed
    transaction amount between that ordered pair.
    """

    # Node label used when a row has no usable source/target name.
    _UNKNOWN_ENTITY = 'Unknown'
    # Target node used for every row when the input has no buyer column.
    _DEFAULT_BUYER = 'OurCompany'
    # Edge widths are ``weight / _WIDTH_SCALE`` clamped to these bounds so
    # zero-amount edges stay visible and huge amounts do not swamp the plot.
    _WIDTH_SCALE = 1000
    _MIN_EDGE_WIDTH = 0.5
    _MAX_EDGE_WIDTH = 8.0

    def __init__(self):
        self.graph = nx.DiGraph()

    @staticmethod
    def _is_missing(value) -> bool:
        """Return True when *value* is a scalar null (None, NaN, NaT, pd.NA)."""
        # The is_scalar guard keeps pd.isna from returning an array (whose
        # truth value is ambiguous) if a cell happens to hold a list/array.
        return bool(pd.api.types.is_scalar(value) and pd.isna(value))

    def _entity_label(self, value) -> str:
        """Convert a raw cell into a node label, mapping nulls to 'Unknown'."""
        if self._is_missing(value):
            return self._UNKNOWN_ENTITY
        return str(value)

    def _parse_amount(self, value):
        """Convert a raw amount cell to ``float``.

        Returns ``0.0`` for null/NaN cells and ``None`` when the cell cannot
        be interpreted as a number, so the caller can count bad cells.
        """
        if self._is_missing(value):
            return 0.0
        try:
            amount = float(value)
        except (TypeError, ValueError):
            return None
        # float("nan") parses successfully; treat it like any other null.
        return 0.0 if pd.isna(amount) else amount

    def _edge_width(self, weight) -> float:
        """Scale an edge weight to a drawable, bounded line width."""
        scaled = weight / self._WIDTH_SCALE
        return min(max(scaled, self._MIN_EDGE_WIDTH), self._MAX_EDGE_WIDTH)

    def build_graph(self, df, source_col='VendorName', target_col='BuyerName', amount_col='Amount'):
        """Rebuild ``self.graph`` from a DataFrame of transactions.

        Every row adds (or increases the weight of) the edge
        ``source -> target``. Any previously built graph is discarded first.

        In a plain Books-vs-GST data set there is usually only one buyer (our
        own company), so when ``target_col`` is absent every row is pointed at
        the single node ``'OurCompany'``. Cycles can only appear when the data
        really contains multi-party transactions.

        Args:
            df: pandas DataFrame of transactions. It is only read; no column
                is added to or changed in the caller's frame.
            source_col: Column holding the selling entity's name.
            target_col: Column holding the buying entity's name.
            amount_col: Column holding the transaction amount.

        Notes:
            * Missing names (None/NaN) and a missing ``source_col`` map to the
              node ``'Unknown'``.
            * Missing amounts, a missing ``amount_col`` and cells that cannot
              be parsed as numbers contribute ``0`` to the edge weight;
              unparseable cells are reported once via a warning.
        """
        self.graph.clear()
        row_count = len(df)

        # Read whole columns instead of using iterrows(): it is faster and it
        # avoids iterrows' per-row dtype upcasting (e.g. int IDs -> '101.0').
        if source_col in df.columns:
            sources = df[source_col].tolist()
        else:
            sources = [self._UNKNOWN_ENTITY] * row_count

        if target_col in df.columns:
            targets = df[target_col].tolist()
        else:
            logger.warning("No '%s' column. Assuming central company.", target_col)
            targets = [self._DEFAULT_BUYER] * row_count

        if amount_col in df.columns:
            amounts = df[amount_col].tolist()
        else:
            amounts = [0] * row_count

        unparseable = 0
        for raw_source, raw_target, raw_amount in zip(sources, targets, amounts):
            source = self._entity_label(raw_source)
            target = self._entity_label(raw_target)

            weight = self._parse_amount(raw_amount)
            if weight is None:
                unparseable += 1
                weight = 0.0

            # Repeated (source, target) pairs accumulate into a single edge.
            if self.graph.has_edge(source, target):
                self.graph[source][target]['weight'] += weight
            else:
                self.graph.add_edge(source, target, weight=weight)

        if unparseable:
            logger.warning(
                "%d non-numeric value(s) in '%s' were treated as 0.",
                unparseable, amount_col,
            )

    def detect_cycles(self) -> list:
        """Return every simple cycle that involves at least two nodes.

        Returns:
            A list of cycles, each a list of node names in traversal order.
            An empty list is returned when there are none or when cycle
            enumeration fails (the failure is logged).
        """
        try:
            # Self-loops (source == target) come back as one-node cycles; a
            # trading ring needs at least two parties, so they are dropped
            # while streaming instead of materialising them first.
            return [cycle for cycle in nx.simple_cycles(self.graph) if len(cycle) > 1]
        except Exception as e:
            logger.error("Error detecting cycles: %s", e)
            return []

    def analyze_risk_nodes(self) -> dict:
        """Score nodes with weighted PageRank to find high-risk 'hub' entities.

        Returns:
            A dict mapping node name to PageRank score. An empty dict is
            returned for graphs with fewer than two nodes, and a dict of
            all-zero scores when PageRank itself fails (the failure is logged).
        """
        if self.graph.number_of_nodes() < 2:
            return {}
        try:
            return nx.pagerank(self.graph, weight='weight')
        except Exception as e:
            logger.error("Error calculating PageRank: %s", e)
            return {node: 0.0 for node in self.graph.nodes}

    def visualize_graph(self, title="Transaction Network", seed=None) -> io.BytesIO:
        """Render the graph to a PNG image held in memory.

        Node size grows with node degree, edge width grows with edge weight
        (bounded), and every edge that belongs to a detected cycle is
        re-drawn in red.

        Args:
            title: Prefix of the plot title.
            seed: Optional seed forwarded to ``nx.spring_layout`` to make the
                layout reproducible. ``None`` keeps the layout random.

        Returns:
            An ``io.BytesIO`` positioned at offset 0 containing the PNG data.
        """
        graph = self.graph
        # Work on an explicit figure/axes rather than pyplot's implicit
        # "current figure" so exactly this figure is drawn on and closed.
        fig, ax = plt.subplots(figsize=(12, 8))
        try:
            pos = nx.spring_layout(graph, k=0.5, iterations=50, seed=seed)

            # Node sizes based on degree.
            node_sizes = [300 + 100 * graph.degree(node) for node in graph.nodes()]
            nx.draw_networkx_nodes(
                graph, pos, node_size=node_sizes, node_color='skyblue',
                alpha=0.8, ax=ax,
            )

            # Build edge list and widths together so they stay aligned.
            weighted_edges = list(graph.edges(data='weight', default=0.0))
            edge_list = [(u, v) for u, v, _ in weighted_edges]
            widths = [self._edge_width(w) for _, _, w in weighted_edges]
            # node_size is passed so arrowheads stop at the border of the
            # (enlarged) nodes instead of being hidden underneath them.
            nx.draw_networkx_edges(
                graph, pos, edgelist=edge_list, width=widths, alpha=0.5,
                edge_color='gray', arrows=True, node_size=node_sizes, ax=ax,
            )

            nx.draw_networkx_labels(
                graph, pos, font_size=10, font_family="sans-serif", ax=ax,
            )

            # Highlight cycles if any.
            cycles = self.detect_cycles()
            if cycles:
                # Pair each node with its successor, wrapping around to close
                # the ring; dict.fromkeys removes edges shared by several
                # cycles while preserving order.
                cycle_edges = list(dict.fromkeys(
                    edge
                    for cycle in cycles
                    for edge in zip(cycle, cycle[1:] + cycle[:1])
                ))
                nx.draw_networkx_edges(
                    graph, pos, edgelist=cycle_edges, width=2.0,
                    edge_color='red', arrows=True, node_size=node_sizes, ax=ax,
                )
                ax.set_title(
                    f"{title} - Alert: {len(cycles)} Potential Circular Trading Rings Detected!",
                    color='red',
                )
            else:
                ax.set_title(f"{title} - No obvious circular rings detected.")

            ax.axis('off')

            # Save to an in-memory buffer.
            buf = io.BytesIO()
            fig.savefig(buf, format='png', bbox_inches='tight')
        finally:
            # Always release the figure, even when drawing or saving fails.
            plt.close(fig)

        buf.seek(0)
        return buf