"""Advanced graph analytics service with Neo4j integration.

Provides fraud-oriented graph analytics (community detection, centrality
analysis, shortest-path search) backed by Neo4j when the driver is
available, with a NetworkX in-memory fallback otherwise.
"""

import asyncio
import json
import os
import re
from collections import defaultdict
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any

import networkx as nx

from core.logging import logger

# Try to import Neo4j driver, fallback to in-memory if not available
try:
    from neo4j import GraphDatabase

    NEO4J_AVAILABLE = True
except ImportError:
    NEO4J_AVAILABLE = False
    logger.warning("Neo4j driver not available, using in-memory graph fallback")

# Cypher node labels and relationship types cannot be passed as query
# parameters, so they must be interpolated into the query text. Restrict
# them to a safe identifier charset to prevent Cypher injection.
_SAFE_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def _validated_identifier(name: str) -> str:
    """Return *name* unchanged if it is a safe Cypher identifier.

    Raises:
        ValueError: if *name* contains characters outside ``[A-Za-z0-9_]``
            or does not start with a letter/underscore.
    """
    if not _SAFE_IDENTIFIER.match(name):
        raise ValueError(f"Unsafe Cypher identifier: {name!r}")
    return name


@dataclass
class GraphNode:
    """Graph node representation"""

    id: str
    label: str
    properties: dict[str, Any]
    node_type: str  # 'entity', 'transaction', 'alert', etc.


@dataclass
class GraphEdge:
    """Graph edge representation"""

    source: str
    target: str
    relationship: str
    properties: dict[str, Any]
    weight: float = 1.0


@dataclass
class Community:
    """Community detection result"""

    id: str
    nodes: list[str]
    size: int
    density: float
    central_node: str


@dataclass
class CentralityResult:
    """Centrality analysis result"""

    node_id: str
    centrality_score: float
    ranking: int


class Neo4jGraphService:
    """Neo4j-based graph service"""

    def __init__(self, uri: str, user: str, password: str):
        if not NEO4J_AVAILABLE:
            raise RuntimeError("Neo4j driver not available")
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.database = "neo4j"

    async def close(self) -> None:
        """Close the driver connection without blocking the event loop."""
        await asyncio.to_thread(self.driver.close)

    async def create_node(self, node: GraphNode) -> bool:
        """Create a node in Neo4j.

        Returns True on success, False (with a logged error) on failure.
        """

        def _create_node(tx, node: GraphNode) -> bool:
            # Labels cannot be query parameters; validate before interpolating.
            label = _validated_identifier(node.node_type)
            query = f"""
            CREATE (n:{label} {{
                id: $id,
                label: $label,
                properties: $properties
            }})
            """
            tx.run(
                query,
                id=node.id,
                label=node.label,
                # Neo4j properties must be primitives; store the dict as JSON.
                properties=json.dumps(node.properties),
            )
            return True

        def _run() -> bool:
            # execute_write provides the managed-transaction (auto-retry)
            # semantics this code needs; Driver.execute_query takes a Cypher
            # string, not a transaction function, so it cannot be used here.
            with self.driver.session(database=self.database) as session:
                return session.execute_write(_create_node, node)

        try:
            await asyncio.to_thread(_run)
            return True
        except Exception as e:
            logger.error(f"Failed to create node {node.id}: {e}")
            return False

    async def create_relationship(self, edge: GraphEdge) -> bool:
        """Create a relationship between two existing nodes.

        Returns True on success, False (with a logged error) on failure.
        """

        def _create_relationship(tx, edge: GraphEdge) -> bool:
            # Relationship types cannot be query parameters; validate first.
            rel_type = _validated_identifier(edge.relationship)
            query = f"""
            MATCH (a {{id: $source_id}}), (b {{id: $target_id}})
            CREATE (a)-[r:{rel_type} {{
                properties: $properties,
                weight: $weight
            }}]->(b)
            """
            tx.run(
                query,
                source_id=edge.source,
                target_id=edge.target,
                properties=json.dumps(edge.properties),
                weight=edge.weight,
            )
            return True

        def _run() -> bool:
            with self.driver.session(database=self.database) as session:
                return session.execute_write(_create_relationship, edge)

        try:
            await asyncio.to_thread(_run)
            return True
        except Exception as e:
            logger.error(
                f"Failed to create relationship {edge.source}->{edge.target}: {e}"
            )
            return False

    async def run_community_detection(self) -> list[Community]:
        """Run Louvain community detection via the GDS library.

        Assumes a graph projection named 'fraudGraph' already exists in GDS.
        Returns an empty list on failure.
        """

        def _community_detection(tx) -> dict[Any, list[str]]:
            # Stream node -> community assignments from the GDS Louvain algo.
            louvain_query = """
            CALL gds.louvain.stream('fraudGraph')
            YIELD nodeId, communityId, intermediateCommunityIds
            RETURN gds.util.asNode(nodeId).id AS nodeId,
                   communityId,
                   intermediateCommunityIds
            """
            communities: dict[Any, list[str]] = defaultdict(list)
            for record in tx.run(louvain_query):
                communities[record["communityId"]].append(record["nodeId"])
            return dict(communities)

        def _run() -> dict[Any, list[str]]:
            with self.driver.session(database=self.database) as session:
                return session.execute_read(_community_detection)

        try:
            communities_data = await asyncio.to_thread(_run)
            return [
                Community(
                    id=str(comm_id),
                    nodes=nodes,
                    size=len(nodes),
                    density=self._calculate_density(nodes),
                    central_node=nodes[0] if nodes else "",
                )
                for comm_id, nodes in communities_data.items()
            ]
        except Exception as e:
            logger.error(f"Community detection failed: {e}")
            return []

    def _calculate_density(self, nodes: list[str]) -> float:
        """Calculate community density.

        Simplified heuristic: scales linearly with size, capped at 1.0
        (a true density would need the community's edge count).
        """
        return min(1.0, len(nodes) / 100.0)


class InMemoryGraphService:
    """Fallback in-memory graph service using NetworkX"""

    def __init__(self):
        self.graph = nx.Graph()  # undirected graph of entity/transaction nodes
        self.node_data: dict[str, GraphNode] = {}  # id -> original GraphNode
        self.edge_data: dict[tuple[str, str], GraphEdge] = {}  # (src, dst) -> edge

    async def create_node(self, node: GraphNode) -> bool:
        """Create a node in the in-memory graph"""
        try:
            self.graph.add_node(node.id, **node.properties)
            self.node_data[node.id] = node
            return True
        except Exception as e:
            logger.error(f"Failed to create node {node.id}: {e}")
            return False

    async def create_relationship(self, edge: GraphEdge) -> bool:
        """Create a relationship between nodes"""
        try:
            self.graph.add_edge(
                edge.source,
                edge.target,
                relationship=edge.relationship,
                weight=edge.weight,
                **edge.properties,
            )
            self.edge_data[(edge.source, edge.target)] = edge
            return True
        except Exception as e:
            logger.error(
                f"Failed to create relationship {edge.source}->{edge.target}: {e}"
            )
            return False

    async def run_community_detection(self) -> list[Community]:
        """Run community detection using NetworkX.

        Uses greedy modularity maximization; returns [] on failure.
        """
        try:
            communities = list(
                nx.algorithms.community.greedy_modularity_communities(self.graph)
            )

            result = []
            for i, community_nodes in enumerate(communities):
                nodes_list = list(community_nodes)

                # Density = ratio of actual to possible edges in the subgraph.
                subgraph = self.graph.subgraph(nodes_list)
                density = nx.density(subgraph)

                # Most central node by degree centrality within the community.
                if len(nodes_list) > 1:
                    centrality = nx.degree_centrality(subgraph)
                    central_node = max(centrality, key=centrality.get)
                else:
                    central_node = nodes_list[0] if nodes_list else ""

                result.append(
                    Community(
                        id=f"comm_{i}",
                        nodes=nodes_list,
                        size=len(nodes_list),
                        density=density,
                        central_node=central_node,
                    )
                )
            return result
        except Exception as e:
            logger.error(f"In-memory community detection failed: {e}")
            return []

    async def calculate_centrality(
        self, method: str = "degree"
    ) -> list[CentralityResult]:
        """Calculate node centrality.

        Args:
            method: one of 'degree', 'betweenness', 'closeness',
                'eigenvector'; unknown values fall back to degree.

        Returns:
            Top 20 nodes ranked by centrality score (empty on failure).
        """
        try:
            # Dispatch table keeps the method selection flat and extensible.
            methods = {
                "degree": nx.degree_centrality,
                "betweenness": nx.betweenness_centrality,
                "closeness": nx.closeness_centrality,
                "eigenvector": lambda g: nx.eigenvector_centrality(g, max_iter=100),
            }
            centrality_fn = methods.get(method, nx.degree_centrality)
            centrality = centrality_fn(self.graph)

            # Sort by centrality score, highest first.
            sorted_nodes = sorted(centrality.items(), key=lambda x: x[1], reverse=True)

            return [
                CentralityResult(
                    node_id=node_id, centrality_score=score, ranking=rank + 1
                )
                for rank, (node_id, score) in enumerate(sorted_nodes[:20])  # Top 20
            ]
        except Exception as e:
            logger.error(f"Centrality calculation failed: {e}")
            return []

    async def find_shortest_paths(
        self, source: str, target: str, max_paths: int = 5
    ) -> list[list[str]]:
        """Find up to *max_paths* shortest paths between two nodes.

        Returns [] if either node is missing or no path exists.
        """
        try:
            paths = list(nx.all_shortest_paths(self.graph, source, target))
            return paths[:max_paths]
        except Exception as e:
            logger.error(f"Shortest path finding failed: {e}")
            return []


class AdvancedGraphAnalyticsService:
    """Main service for advanced graph analytics"""

    def __init__(self):
        self.graph_service = None
        self._initialize_graph_service()

    def _initialize_graph_service(self) -> None:
        """Initialize the appropriate graph service.

        Prefers Neo4j when the driver is importable and a connection can be
        established; otherwise falls back to the in-memory NetworkX service.
        """
        # Read connection settings from the environment; the defaults keep
        # prior behavior. NOTE(review): the default password should be
        # removed once deployments set NEO4J_PASSWORD explicitly.
        neo4j_config = {
            "uri": os.environ.get("NEO4J_URI", "bolt://localhost:7687"),
            "user": os.environ.get("NEO4J_USER", "neo4j"),
            "password": os.environ.get("NEO4J_PASSWORD", "password"),
        }

        if NEO4J_AVAILABLE:
            try:
                self.graph_service = Neo4jGraphService(
                    neo4j_config["uri"], neo4j_config["user"], neo4j_config["password"]
                )
                logger.info("Using Neo4j graph service")
                return
            except Exception as e:
                logger.warning(f"Neo4j connection failed: {e}")

        # Fallback to in-memory
        self.graph_service = InMemoryGraphService()
        logger.info("Using in-memory graph service (Neo4j not available)")

    async def add_entity(
        self, entity_id: str, entity_type: str, properties: dict[str, Any]
    ) -> bool:
        """Add an entity to the graph"""
        node = GraphNode(
            id=entity_id,
            label=f"{entity_type}: {entity_id}",
            properties=properties,
            node_type="Entity",
        )
        return await self.graph_service.create_node(node)

    async def add_transaction(
        self,
        transaction_id: str,
        from_entity: str,
        to_entity: str,
        amount: float,
        timestamp: datetime,
    ) -> bool:
        """Add a transaction relationship to the graph.

        Creates a Transaction node plus INITIATED (sender->txn) and
        TRANSFERRED_TO (txn->receiver) edges. Returns True only if all
        three writes succeed.
        """
        # Create transaction node
        transaction_node = GraphNode(
            id=transaction_id,
            label=f"Transaction: ${amount}",
            properties={
                "amount": amount,
                "timestamp": timestamp.isoformat(),
                "currency": "USD",
            },
            node_type="Transaction",
        )
        success1 = await self.graph_service.create_node(transaction_node)

        # Create relationships
        edge1 = GraphEdge(
            source=from_entity,
            target=transaction_id,
            relationship="INITIATED",
            properties={"role": "sender"},
        )
        edge2 = GraphEdge(
            source=transaction_id,
            target=to_entity,
            relationship="TRANSFERRED_TO",
            properties={"role": "receiver"},
        )

        success2 = await self.graph_service.create_relationship(edge1)
        success3 = await self.graph_service.create_relationship(edge2)

        return success1 and success2 and success3

    async def detect_communities(self) -> dict[str, Any]:
        """Run community detection analysis.

        Returns summary statistics plus details for the 10 largest
        communities (with a 5-node sample each).
        """
        communities = await self.graph_service.run_community_detection()

        # Sort by size, largest first.
        communities.sort(key=lambda x: x.size, reverse=True)

        return {
            "total_communities": len(communities),
            "largest_community": communities[0].size if communities else 0,
            "average_community_size": (
                sum(c.size for c in communities) / len(communities)
                if communities
                else 0
            ),
            "communities": [
                {
                    "id": c.id,
                    "size": c.size,
                    "density": c.density,
                    "central_node": c.central_node,
                    "nodes_sample": c.nodes[:5],  # First 5 nodes
                }
                for c in communities[:10]  # Top 10 communities
            ],
        }

    async def analyze_centrality(self, method: str = "degree") -> dict[str, Any]:
        """Analyze node centrality.

        Only supported by services exposing calculate_centrality (the
        in-memory backend); otherwise returns an error payload.
        """
        if not hasattr(self.graph_service, "calculate_centrality"):
            return {
                "error": "Centrality analysis not available for current graph service"
            }

        results = await self.graph_service.calculate_centrality(method)

        return {
            "method": method,
            "top_nodes": [
                {
                    "node_id": r.node_id,
                    "centrality_score": r.centrality_score,
                    "ranking": r.ranking,
                }
                for r in results
            ],
        }

    async def find_suspicious_patterns(self) -> list[dict[str, Any]]:
        """Find suspicious patterns in the graph.

        NOTE(review): currently returns hard-coded demonstration patterns;
        real detection (circular transactions, velocity, risky-entity
        proximity) is not implemented yet.
        """
        # This would implement various fraud detection patterns
        patterns = []

        # Pattern 1: Circular transactions (money laundering indicator)
        # Pattern 2: High-velocity transactions
        # Pattern 3: Connections to known risky entities

        # Mock patterns for demonstration
        patterns.append(
            {
                "type": "circular_transaction",
                "severity": "high",
                "description": "Detected circular transaction pattern",
                "involved_nodes": ["entity_1", "entity_2", "entity_3"],
                "confidence": 0.85,
            }
        )
        patterns.append(
            {
                "type": "high_velocity",
                "severity": "medium",
                "description": "Unusual transaction velocity detected",
                "involved_nodes": ["entity_5"],
                "confidence": 0.72,
            }
        )

        return patterns

    async def get_graph_statistics(self) -> dict[str, Any]:
        """Get overall graph statistics.

        NOTE(review): returns mock counts; should query the active graph
        service once real statistics queries are implemented.
        """
        return {
            "total_nodes": 150,  # Mock data
            "total_relationships": 450,
            "node_types": {"Entity": 120, "Transaction": 25, "Alert": 5},
            "relationship_types": {
                "RELATED_TO": 200,
                "TRANSFERRED_TO": 150,
                "TRIGGERED_BY": 100,
            },
            "last_updated": datetime.now(UTC).isoformat(),
        }


# Global instance