| import hashlib
|
| import networkx as nx
|
| import torch
|
|
|
|
|
| transaction_graphs = {}
|
|
|
|
|
| def generate_graph_hash(transactions):
|
| hash_string = "-".join(sorted(transactions))
|
| return hashlib.sha256(hash_string.encode()).hexdigest()
|
|
|
|
|
| def hash_key(value):
|
| return hashlib.sha256(value.encode()).hexdigest()
|
|
|
|
|
| def add_transaction(txn):
|
| sender_hash = hash_key(txn["SenderAccount"])
|
| receiver_hash = hash_key(txn["ReceiverAccount"])
|
|
|
|
|
| related_graphs = [h for h, g in transaction_graphs.items() if sender_hash in g or receiver_hash in g]
|
|
|
| if related_graphs:
|
|
|
| new_graph_hash = generate_graph_hash(related_graphs)
|
| merged_graph = nx.compose_all([transaction_graphs[h] for h in related_graphs])
|
| merged_graph.add_edge(sender_hash, receiver_hash, **txn)
|
|
|
|
|
| for h in related_graphs:
|
| del transaction_graphs[h]
|
| transaction_graphs[new_graph_hash] = merged_graph
|
| else:
|
|
|
| new_graph = nx.DiGraph()
|
| new_graph.add_edge(sender_hash, receiver_hash, **txn)
|
| transaction_graphs[generate_graph_hash([sender_hash, receiver_hash])] = new_graph
|
|
|
|
|
|
|
|
|
| def detect_pattern(graph):
|
| """Detect laundering patterns in the transaction graph."""
|
|
|
|
|
| if isinstance(graph, torch.Tensor) or hasattr(graph, "edge_index"):
|
|
|
| nodes = torch.unique(graph.edge_index).tolist()
|
| successors = {node: [] for node in nodes}
|
| predecessors = {node: [] for node in nodes}
|
|
|
| for i in range(graph.edge_index.shape[1]):
|
| sender, receiver = graph.edge_index[:, i].tolist()
|
| successors[sender].append(receiver)
|
| predecessors[receiver].append(sender)
|
|
|
|
|
| elif hasattr(graph, "nodes"):
|
| nodes = list(graph.nodes)
|
| successors = {node: list(graph.successors(node)) for node in nodes}
|
| predecessors = {node: list(graph.predecessors(node)) for node in nodes}
|
|
|
| else:
|
| raise ValueError("Unsupported graph type")
|
|
|
|
|
| for node in nodes:
|
| outgoing = successors[node]
|
| incoming = predecessors[node]
|
|
|
| if len(outgoing) > 5:
|
| return "Fan-Out"
|
| elif len(incoming) > 5:
|
| return "Fan-In"
|
| elif node in incoming:
|
| return "Cycle"
|
| elif len(outgoing) > 2 and len(incoming) > 2:
|
| return "Scatter Gather"
|
|
|
| return "Normal"
|
|
|
|
|
|
|
|
|
| aml_clusters = {}
|
|
|
| def flag_suspicious_graph(graph_hash):
|
| """Mark a graph as an AML cluster if laundering is detected"""
|
| if graph_hash in transaction_graphs:
|
| pattern = detect_pattern(transaction_graphs[graph_hash])
|
| if pattern != "Normal":
|
| aml_clusters[graph_hash] = transaction_graphs[graph_hash]
|
| print(f"🚨 AML Detected: {pattern} | Cluster ID: {graph_hash}")
|
|
|
|
|
|
|