"""Advanced graph analytics service with a Neo4j backend and an in-memory NetworkX fallback."""
# Advanced Graph Analytics Service with Neo4j Integration
import asyncio
import json
import os
from collections import defaultdict
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any

import networkx as nx

from core.logging import logger
# Try to import Neo4j driver, fallback to in-memory if not available.
# NEO4J_AVAILABLE is checked by Neo4jGraphService.__init__ and by
# AdvancedGraphAnalyticsService._initialize_graph_service when picking a backend.
try:
    from neo4j import GraphDatabase

    NEO4J_AVAILABLE = True
except ImportError:
    NEO4J_AVAILABLE = False
    logger.warning("Neo4j driver not available, using in-memory graph fallback")
@dataclass
class GraphNode:
    """Graph node representation.

    Bare annotations alone generate no __init__; the @dataclass decorator is
    required because instances are built with keyword arguments elsewhere in
    this module (e.g. AdvancedGraphAnalyticsService.add_entity).
    """

    id: str                      # unique node identifier
    label: str                   # human-readable display label
    properties: dict[str, Any]   # arbitrary node attributes
    node_type: str               # 'entity', 'transaction', 'alert', etc.
@dataclass
class GraphEdge:
    """Graph edge representation.

    The @dataclass decorator is required: without it the `weight` default is
    just a class attribute and keyword construction (used throughout this
    module) raises TypeError.
    """

    source: str                  # id of the source node
    target: str                  # id of the target node
    relationship: str            # relationship type, e.g. 'INITIATED'
    properties: dict[str, Any]   # arbitrary edge attributes
    weight: float = 1.0          # edge weight; defaults to unweighted
@dataclass
class Community:
    """Community detection result.

    @dataclass is required: instances are constructed with keyword arguments
    by both graph services' run_community_detection methods.
    """

    id: str              # community identifier
    nodes: list[str]     # ids of member nodes
    size: int            # number of member nodes
    density: float       # ratio of actual to possible intra-community edges
    central_node: str    # most central member ("" when community is empty)
@dataclass
class CentralityResult:
    """Centrality analysis result.

    @dataclass is required: instances are constructed with keyword arguments
    in InMemoryGraphService.calculate_centrality.
    """

    node_id: str              # id of the ranked node
    centrality_score: float   # score from the chosen centrality measure
    ranking: int              # 1-based rank (1 = most central)
class Neo4jGraphService:
    """Neo4j-based graph service.

    Wraps the synchronous Neo4j Python driver; every blocking driver call is
    pushed onto a worker thread with asyncio.to_thread so the async API never
    blocks the event loop.
    """

    def __init__(self, uri: str, user: str, password: str):
        """Open a driver connection to *uri* with the given credentials.

        Raises:
            RuntimeError: if the neo4j driver package is not installed.
        """
        if not NEO4J_AVAILABLE:
            raise RuntimeError("Neo4j driver not available")
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.database = "neo4j"

    async def close(self):
        """Close the driver connection without blocking the event loop."""
        await asyncio.to_thread(self.driver.close)

    async def create_node(self, node: GraphNode) -> bool:
        """Create a node in Neo4j. Returns True on success, False on error."""

        def _create_node(tx, node):
            # NOTE: Cypher cannot parameterize labels, so node_type is
            # interpolated into the query text. Callers must supply trusted
            # label values only (injection-sensitive).
            query = f"""
            CREATE (n:{node.node_type} {{
                id: $id,
                label: $label,
                properties: $properties
            }})
            """
            tx.run(
                query,
                id=node.id,
                label=node.label,
                properties=json.dumps(node.properties),
            )
            return True

        def _run():
            # FIX: the previous code passed this transaction function to
            # driver.execute_query(), which expects a Cypher string and would
            # fail at runtime. execute_write() is the correct, retry-aware API
            # for transaction functions.
            with self.driver.session(database=self.database) as session:
                session.execute_write(_create_node, node)

        try:
            await asyncio.to_thread(_run)
            return True
        except Exception as e:
            logger.error(f"Failed to create node {node.id}: {e}")
            return False

    async def create_relationship(self, edge: GraphEdge) -> bool:
        """Create a relationship between two existing nodes.

        Returns True on success, False on error (e.g. endpoints missing).
        """

        def _create_relationship(tx, edge):
            # NOTE: relationship types, like labels, cannot be parameterized;
            # edge.relationship must come from trusted code paths.
            query = f"""
            MATCH (a {{id: $source_id}}), (b {{id: $target_id}})
            CREATE (a)-[r:{edge.relationship} {{
                properties: $properties,
                weight: $weight
            }}]->(b)
            """
            tx.run(
                query,
                source_id=edge.source,
                target_id=edge.target,
                properties=json.dumps(edge.properties),
                weight=edge.weight,
            )
            return True

        def _run():
            # Same fix as create_node: use a managed write transaction instead
            # of the misused driver.execute_query().
            with self.driver.session(database=self.database) as session:
                session.execute_write(_create_relationship, edge)

        try:
            await asyncio.to_thread(_run)
            return True
        except Exception as e:
            logger.error(
                f"Failed to create relationship {edge.source}->{edge.target}: {e}"
            )
            return False

    async def run_community_detection(self) -> list[Community]:
        """Run Louvain community detection via the GDS library.

        Assumes a GDS in-memory graph named 'fraudGraph' has already been
        projected — TODO confirm projection happens elsewhere.
        Returns an empty list on any failure.
        """

        def _community_detection(tx):
            # Stream (nodeId, communityId) pairs from the Louvain algorithm.
            louvain_query = """
            CALL gds.louvain.stream('fraudGraph')
            YIELD nodeId, communityId, intermediateCommunityIds
            RETURN gds.util.asNode(nodeId).id AS nodeId,
                   communityId,
                   intermediateCommunityIds
            """
            communities = defaultdict(list)
            results = tx.run(louvain_query)
            for record in results:
                communities[record["communityId"]].append(record["nodeId"])
            return communities

        def _run():
            # Read-only transaction: this query mutates nothing.
            with self.driver.session(database=self.database) as session:
                return session.execute_read(_community_detection)

        try:
            grouped = await asyncio.to_thread(_run)
            return [
                Community(
                    id=str(comm_id),
                    nodes=nodes,
                    size=len(nodes),
                    density=self._calculate_density(nodes),
                    # Proper centrality isn't computed here; fall back to the
                    # first member as a representative node.
                    central_node=nodes[0] if nodes else "",
                )
                for comm_id, nodes in grouped.items()
            ]
        except Exception as e:
            logger.error(f"Community detection failed: {e}")
            return []

    def _calculate_density(self, nodes: list[str]) -> float:
        """Return a heuristic density in [0, 1].

        Simplified placeholder: scales linearly with community size and
        saturates at 100 nodes; not a true edge-density computation.
        """
        return min(1.0, len(nodes) / 100.0)
class InMemoryGraphService:
    """Fallback in-memory graph service backed by a NetworkX undirected graph."""

    def __init__(self):
        self.graph = nx.Graph()
        # Keep the original dataclass instances alongside the NetworkX graph
        # so callers can recover full node/edge records by id.
        self.node_data = {}
        self.edge_data = {}

    async def create_node(self, node: GraphNode) -> bool:
        """Add *node* to the in-memory graph; returns True on success."""
        try:
            self.graph.add_node(node.id, **node.properties)
            self.node_data[node.id] = node
        except Exception as e:
            logger.error(f"Failed to create node {node.id}: {e}")
            return False
        return True

    async def create_relationship(self, edge: GraphEdge) -> bool:
        """Add *edge* between two nodes; returns True on success."""
        try:
            attrs = dict(edge.properties)
            attrs["relationship"] = edge.relationship
            attrs["weight"] = edge.weight
            self.graph.add_edge(edge.source, edge.target, **attrs)
            self.edge_data[(edge.source, edge.target)] = edge
        except Exception as e:
            logger.error(
                f"Failed to create relationship {edge.source}->{edge.target}: {e}"
            )
            return False
        return True

    async def run_community_detection(self) -> list[Community]:
        """Detect communities via greedy modularity maximization."""
        try:
            detected = list(
                nx.algorithms.community.greedy_modularity_communities(self.graph)
            )
            summaries = []
            for index, members in enumerate(detected):
                member_ids = list(members)
                piece = self.graph.subgraph(member_ids)
                # Density = ratio of actual to possible edges in the community.
                piece_density = nx.density(piece)
                # Representative node: highest degree centrality, or the sole
                # member for singleton communities.
                if len(member_ids) > 1:
                    by_degree = nx.degree_centrality(piece)
                    hub = max(by_degree, key=by_degree.get)
                else:
                    hub = member_ids[0] if member_ids else ""
                summaries.append(
                    Community(
                        id=f"comm_{index}",
                        nodes=member_ids,
                        size=len(member_ids),
                        density=piece_density,
                        central_node=hub,
                    )
                )
            return summaries
        except Exception as e:
            logger.error(f"In-memory community detection failed: {e}")
            return []

    async def calculate_centrality(
        self, method: str = "degree"
    ) -> list[CentralityResult]:
        """Rank the top 20 nodes by the chosen centrality measure.

        Unknown *method* values fall back to degree centrality.
        """
        try:
            # Dispatch table of lazy computations; evaluated only once chosen.
            measures = {
                "degree": lambda: nx.degree_centrality(self.graph),
                "betweenness": lambda: nx.betweenness_centrality(self.graph),
                "closeness": lambda: nx.closeness_centrality(self.graph),
                "eigenvector": lambda: nx.eigenvector_centrality(
                    self.graph, max_iter=100
                ),
            }
            scores = measures.get(method, measures["degree"])()
            ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)
            return [
                CentralityResult(
                    node_id=node_id, centrality_score=score, ranking=position + 1
                )
                for position, (node_id, score) in enumerate(ranked[:20])  # Top 20
            ]
        except Exception as e:
            logger.error(f"Centrality calculation failed: {e}")
            return []

    async def find_shortest_paths(
        self, source: str, target: str, max_paths: int = 5
    ) -> list[list[str]]:
        """Return up to *max_paths* shortest paths from *source* to *target*."""
        try:
            every_path = [p for p in nx.all_shortest_paths(self.graph, source, target)]
            return every_path[:max_paths]
        except Exception as e:
            logger.error(f"Shortest path finding failed: {e}")
            return []
| class AdvancedGraphAnalyticsService: | |
| """Main service for advanced graph analytics""" | |
| def __init__(self): | |
| self.graph_service = None | |
| self._initialize_graph_service() | |
| def _initialize_graph_service(self): | |
| """Initialize the appropriate graph service""" | |
| # Try Neo4j first | |
| neo4j_config = { | |
| "uri": "bolt://localhost:7687", | |
| "user": "neo4j", | |
| "password": "password", | |
| } | |
| if NEO4J_AVAILABLE: | |
| try: | |
| self.graph_service = Neo4jGraphService( | |
| neo4j_config["uri"], neo4j_config["user"], neo4j_config["password"] | |
| ) | |
| logger.info("Using Neo4j graph service") | |
| return | |
| except Exception as e: | |
| logger.warning(f"Neo4j connection failed: {e}") | |
| # Fallback to in-memory | |
| self.graph_service = InMemoryGraphService() | |
| logger.info("Using in-memory graph service (Neo4j not available)") | |
| async def add_entity( | |
| self, entity_id: str, entity_type: str, properties: dict[str, Any] | |
| ) -> bool: | |
| """Add an entity to the graph""" | |
| node = GraphNode( | |
| id=entity_id, | |
| label=f"{entity_type}: {entity_id}", | |
| properties=properties, | |
| node_type="Entity", | |
| ) | |
| return await self.graph_service.create_node(node) | |
| async def add_transaction( | |
| self, | |
| transaction_id: str, | |
| from_entity: str, | |
| to_entity: str, | |
| amount: float, | |
| timestamp: datetime, | |
| ) -> bool: | |
| """Add a transaction relationship to the graph""" | |
| # Create transaction node | |
| transaction_node = GraphNode( | |
| id=transaction_id, | |
| label=f"Transaction: ${amount}", | |
| properties={ | |
| "amount": amount, | |
| "timestamp": timestamp.isoformat(), | |
| "currency": "USD", | |
| }, | |
| node_type="Transaction", | |
| ) | |
| success1 = await self.graph_service.create_node(transaction_node) | |
| # Create relationships | |
| edge1 = GraphEdge( | |
| source=from_entity, | |
| target=transaction_id, | |
| relationship="INITIATED", | |
| properties={"role": "sender"}, | |
| ) | |
| edge2 = GraphEdge( | |
| source=transaction_id, | |
| target=to_entity, | |
| relationship="TRANSFERRED_TO", | |
| properties={"role": "receiver"}, | |
| ) | |
| success2 = await self.graph_service.create_relationship(edge1) | |
| success3 = await self.graph_service.create_relationship(edge2) | |
| return success1 and success2 and success3 | |
| async def detect_communities(self) -> dict[str, Any]: | |
| """Run community detection analysis""" | |
| communities = await self.graph_service.run_community_detection() | |
| # Sort by size | |
| communities.sort(key=lambda x: x.size, reverse=True) | |
| return { | |
| "total_communities": len(communities), | |
| "largest_community": communities[0].size if communities else 0, | |
| "average_community_size": ( | |
| sum(c.size for c in communities) / len(communities) | |
| if communities | |
| else 0 | |
| ), | |
| "communities": [ | |
| { | |
| "id": c.id, | |
| "size": c.size, | |
| "density": c.density, | |
| "central_node": c.central_node, | |
| "nodes_sample": c.nodes[:5], # First 5 nodes | |
| } | |
| for c in communities[:10] # Top 10 communities | |
| ], | |
| } | |
| async def analyze_centrality(self, method: str = "degree") -> dict[str, Any]: | |
| """Analyze node centrality""" | |
| if not hasattr(self.graph_service, "calculate_centrality"): | |
| return { | |
| "error": "Centrality analysis not available for current graph service" | |
| } | |
| results = await self.graph_service.calculate_centrality(method) | |
| return { | |
| "method": method, | |
| "top_nodes": [ | |
| { | |
| "node_id": r.node_id, | |
| "centrality_score": r.centrality_score, | |
| "ranking": r.ranking, | |
| } | |
| for r in results | |
| ], | |
| } | |
| async def find_suspicious_patterns(self) -> list[dict[str, Any]]: | |
| """Find suspicious patterns in the graph""" | |
| # This would implement various fraud detection patterns | |
| patterns = [] | |
| # Pattern 1: Circular transactions (money laundering indicator) | |
| # Pattern 2: High-velocity transactions | |
| # Pattern 3: Connections to known risky entities | |
| # Mock patterns for demonstration | |
| patterns.append( | |
| { | |
| "type": "circular_transaction", | |
| "severity": "high", | |
| "description": "Detected circular transaction pattern", | |
| "involved_nodes": ["entity_1", "entity_2", "entity_3"], | |
| "confidence": 0.85, | |
| } | |
| ) | |
| patterns.append( | |
| { | |
| "type": "high_velocity", | |
| "severity": "medium", | |
| "description": "Unusual transaction velocity detected", | |
| "involved_nodes": ["entity_5"], | |
| "confidence": 0.72, | |
| } | |
| ) | |
| return patterns | |
| async def get_graph_statistics(self) -> dict[str, Any]: | |
| """Get overall graph statistics""" | |
| # This would query the graph for statistics | |
| return { | |
| "total_nodes": 150, # Mock data | |
| "total_relationships": 450, | |
| "node_types": {"Entity": 120, "Transaction": 25, "Alert": 5}, | |
| "relationship_types": { | |
| "RELATED_TO": 200, | |
| "TRANSFERRED_TO": 150, | |
| "TRIGGERED_BY": 100, | |
| }, | |
| "last_updated": datetime.now(UTC).isoformat(), | |
| } | |
| # Global instance | |