Spaces:
Paused
Paused
| import logging | |
| from dataclasses import dataclass | |
| from typing import Any | |
| from core.plugin_system import PluginContext, PluginInterface, PluginMetadata | |
| logger = logging.getLogger(__name__) | |
| class EntityLinkageConfig: | |
| connection_threshold: int | |
| class EntityLinkagePlugin(PluginInterface): | |
| """ | |
| Analyzes connections between entities to find clusters and hubs. | |
| """ | |
| def metadata(self) -> PluginMetadata: | |
| return PluginMetadata( | |
| name="entity_linkage", | |
| version="1.0.0", | |
| namespace="zenith/intelligence/entity_linkage", | |
| author="Zenith Team", | |
| description="Analyzes relationships and connections between entities in a case", | |
| dependencies={}, | |
| capabilities=["intelligence", "case_analysis"], | |
| security_level="official", | |
| api_version="v1", | |
| ) | |
| async def initialize(self, context: PluginContext) -> bool: | |
| self.context = context | |
| config_dict = context.config if context.config else {"connection_threshold": 3} | |
| self.config = EntityLinkageConfig(**config_dict) | |
| return True | |
| async def execute(self, inputs: dict[str, Any]) -> dict[str, Any]: | |
| """ | |
| Expects {"case_data": {...}} | |
| """ | |
| case_data = inputs.get("case_data") | |
| if not case_data: | |
| return {"error": "No case data provided"} | |
| return await self._analyze_entity_linkage(case_data) | |
| async def _analyze_entity_linkage(self, case_data: dict[str, Any]) -> dict[str, Any]: | |
| """Analyze entity relationships and linkages""" | |
| insights = [] | |
| recommendations = [] | |
| confidence = 0.0 | |
| entities = case_data.get("entities", []) | |
| transactions = case_data.get("transactions", []) | |
| if entities and transactions: | |
| # Build entity graph | |
| entity_connections = {} | |
| for transaction in transactions: | |
| sender = transaction.get("sender") | |
| receiver = transaction.get("receiver") | |
| if sender and receiver: | |
| if sender not in entity_connections: | |
| entity_connections[sender] = set() | |
| if receiver not in entity_connections: | |
| entity_connections[receiver] = set() | |
| entity_connections[sender].add(receiver) | |
| entity_connections[receiver].add(sender) | |
| # Find highly connected entities | |
| for entity, connections in entity_connections.items(): | |
| if len(connections) >= self.config.connection_threshold: | |
| insights.append(f"Entity '{entity}' connected to {len(connections)} other entities") | |
| recommendations.append(f"Investigate entity '{entity}' for central role in network") | |
| confidence += 0.5 | |
| # Find isolated clusters | |
| # Start timer for expensive DFS | |
| import time | |
| start_time = time.time() | |
| timeout_seconds = 2.0 | |
| visited = set() | |
| clusters = [] | |
| for entity in entity_connections: | |
| if time.time() - start_time > timeout_seconds: | |
| insights.append("Entity analysis timed out - partial results shown") | |
| break | |
| if entity not in visited: | |
| cluster = set() | |
| # Pass context to avoid using global start_time if strict, | |
| # but simple closure works here | |
| self._dfs( | |
| entity, | |
| entity_connections, | |
| visited, | |
| cluster, | |
| start_time, | |
| timeout_seconds, | |
| ) | |
| clusters.append(cluster) | |
| if len(clusters) > 1: | |
| insights.append(f"Found {len(clusters)} separate entity clusters") | |
| recommendations.append("Analyze each cluster for independent fraud schemes") | |
| confidence += 0.4 | |
| return { | |
| "insights": insights, | |
| "recommendations": recommendations, | |
| "confidence": min(confidence, 1.0), | |
| "risk_score": 50 if confidence > 0.5 else 20, | |
| } | |
| def _dfs( | |
| self, | |
| entity: str, | |
| connections: dict[str, set], | |
| visited: set, | |
| cluster: set, | |
| start_time: float = 0, | |
| timeout: float = 0, | |
| ): | |
| """Depth-first search for connected components with timeout""" | |
| import time | |
| if timeout > 0 and (time.time() - start_time > timeout): | |
| return | |
| visited.add(entity) | |
| cluster.add(entity) | |
| for neighbor in connections.get(entity, set()): | |
| if neighbor not in visited: | |
| self._dfs(neighbor, connections, visited, cluster, start_time, timeout) | |
| async def cleanup(self) -> None: | |
| pass | |
| def validate_config(self, config: dict[str, Any]) -> list[str]: | |
| return [] | |