| import networkx as nx |
| import matplotlib.pyplot as plt |
| import pandas as pd |
| import io |
| import logging |
|
|
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| class FraudGraph: |
| def __init__(self): |
| self.graph = nx.DiGraph() |
|
|
| def build_graph(self, df, source_col='VendorName', target_col='BuyerName', amount_col='Amount'): |
| """ |
| Builds a directed graph of transactions. |
| Assuming we have some buyer info, but in standard Books vs GST, |
| we usually have our company as the buyer and vendors as sellers. |
| To simulate circular trading, we might need a dataset that has multi-party transactions. |
| For demonstration, we'll try to find any cycles if they exist. |
| """ |
| self.graph.clear() |
| |
| |
| |
| |
| if target_col not in df.columns: |
| logger.warning(f"No '{target_col}' column. Assuming central company.") |
| buyer_col_actual = 'TargetEntity' |
| df[buyer_col_actual] = 'OurCompany' |
| else: |
| buyer_col_actual = target_col |
|
|
| for _, row in df.iterrows(): |
| source = str(row.get(source_col, 'Unknown')) |
| target = str(row.get(buyer_col_actual, 'Unknown')) |
| raw_w = row.get(amount_col, 0) |
| if pd.isna(raw_w): |
| raw_w = 0 |
| weight = float(raw_w) |
| |
| if self.graph.has_edge(source, target): |
| self.graph[source][target]['weight'] += weight |
| else: |
| self.graph.add_edge(source, target, weight=weight) |
| |
| def detect_cycles(self): |
| try: |
| cycles = list(nx.simple_cycles(self.graph)) |
| |
| cycles = [c for c in cycles if len(c) > 1] |
| return cycles |
| except Exception as e: |
| logger.error(f"Error detecting cycles: {e}") |
| return [] |
|
|
| def analyze_risk_nodes(self): |
| """ |
| Calculate centrality scores to find high-risk 'hub' vendors using PageRank. |
| Returns a dictionary mapping vendor names to risk scores. |
| """ |
| if len(self.graph.nodes) < 2: |
| return {} |
| |
| try: |
| pagerank = nx.pagerank(self.graph, weight='weight') |
| return pagerank |
| except Exception as e: |
| logger.error(f"Error calculating PageRank: {e}") |
| return {node: 0.0 for node in self.graph.nodes} |
|
|
|
|
| def visualize_graph(self, title="Transaction Network"): |
| plt.figure(figsize=(12, 8)) |
| |
| |
| pos = nx.spring_layout(self.graph, k=0.5, iterations=50) |
| |
| |
| node_sizes = [300 + 100 * self.graph.degree(n) for n in self.graph.nodes()] |
| |
| |
| nx.draw_networkx_nodes(self.graph, pos, node_size=node_sizes, node_color='skyblue', alpha=0.8) |
| |
| |
| edges = self.graph.edges(data=True) |
| weights = [d['weight'] / 1000 for u, v, d in edges] |
| nx.draw_networkx_edges(self.graph, pos, width=weights, alpha=0.5, edge_color='gray', arrows=True) |
| |
| |
| nx.draw_networkx_labels(self.graph, pos, font_size=10, font_family="sans-serif") |
| |
| |
| cycles = self.detect_cycles() |
| if cycles: |
| cycle_edges = [] |
| for cycle in cycles: |
| for i in range(len(cycle)): |
| cycle_edges.append((cycle[i], cycle[(i + 1) % len(cycle)])) |
| |
| |
| nx.draw_networkx_edges(self.graph, pos, edgelist=cycle_edges, width=2.0, edge_color='red', arrows=True) |
| plt.title(f"{title} - Alert: {len(cycles)} Potential Circular Trading Rings Detected!", color='red') |
| else: |
| plt.title(f"{title} - No obvious circular rings detected.") |
|
|
| plt.axis('off') |
| |
| |
| buf = io.BytesIO() |
| plt.savefig(buf, format='png', bbox_inches='tight') |
| plt.close() |
| buf.seek(0) |
| return buf |
|
|