Spaces:
Paused
Paused
File size: 5,100 Bytes
4a2ab42 4ae946d 4a2ab42 4ae946d 4a2ab42 4ae946d 4a2ab42 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 | import logging
from dataclasses import dataclass
from typing import Any
from core.plugin_system import PluginContext, PluginInterface, PluginMetadata
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class EntityLinkageConfig:
    """Configuration for the entity linkage analysis."""

    # Minimum number of distinct counterparties an entity needs in order to be
    # reported as highly connected. Defaults to 3, the same value the plugin
    # falls back to when it is initialized without any config.
    connection_threshold: int = 3
class EntityLinkagePlugin(PluginInterface):
    """
    Analyzes connections between entities to find clusters and hubs.

    Each transaction is treated as an undirected edge between its "sender"
    and its "receiver". Entities whose number of distinct counterparties
    reaches ``config.connection_threshold`` are reported as hubs, and the
    connected components of the resulting graph are reported as clusters.
    """

    # Time budget, in seconds, for the connected-component scan.
    _CLUSTER_TIMEOUT_SECONDS = 2.0

    @property
    def metadata(self) -> PluginMetadata:
        """Return the static descriptor (name, version, capabilities, ...) of this plugin."""
        return PluginMetadata(
            name="entity_linkage",
            version="1.0.0",
            namespace="zenith/intelligence/entity_linkage",
            author="Zenith Team",
            description="Analyzes relationships and connections between entities in a case",
            dependencies={},
            capabilities=["intelligence", "case_analysis"],
            security_level="official",
            api_version="v1",
        )

    async def initialize(self, context: PluginContext) -> bool:
        """
        Store the plugin context and build ``self.config`` from it.

        ``context.config`` is passed to ``EntityLinkageConfig`` as keyword
        arguments; when it is missing or empty a connection threshold of 3
        is used instead. Always returns True.
        """
        self.context = context
        config_dict = context.config or {"connection_threshold": 3}
        self.config = EntityLinkageConfig(**config_dict)
        return True

    async def execute(self, inputs: dict[str, Any]) -> dict[str, Any]:
        """
        Run the linkage analysis.

        Expects {"case_data": {...}}. Returns the analysis result, or
        {"error": "No case data provided"} when "case_data" is missing or
        empty.
        """
        case_data = inputs.get("case_data")
        if not case_data:
            return {"error": "No case data provided"}
        return await self._analyze_entity_linkage(case_data)

    async def _analyze_entity_linkage(self, case_data: dict[str, Any]) -> dict[str, Any]:
        """
        Analyze entity relationships and linkages.

        Reads "entities" and "transactions" from ``case_data``; the graph is
        only built when both are non-empty. Returns a dict with "insights",
        "recommendations", "confidence" (capped at 1.0) and "risk_score"
        (50 when the confidence exceeds 0.5, otherwise 20).
        """
        insights: list[str] = []
        recommendations: list[str] = []
        confidence = 0.0

        entities = case_data.get("entities", [])
        transactions = case_data.get("transactions", [])

        if entities and transactions:
            graph = self._build_connection_graph(transactions)

            # Find highly connected entities
            threshold = self.config.connection_threshold
            for entity, neighbours in graph.items():
                if len(neighbours) >= threshold:
                    insights.append(f"Entity '{entity}' connected to {len(neighbours)} other entities")
                    recommendations.append(f"Investigate entity '{entity}' for central role in network")
                    confidence += 0.5

            # Find isolated clusters (time-boxed, the scan can be expensive)
            clusters, timed_out = self._find_clusters(graph)
            if timed_out:
                insights.append("Entity analysis timed out - partial results shown")
            if len(clusters) > 1:
                insights.append(f"Found {len(clusters)} separate entity clusters")
                recommendations.append("Analyze each cluster for independent fraud schemes")
                confidence += 0.4

        return {
            "insights": insights,
            "recommendations": recommendations,
            "confidence": min(confidence, 1.0),
            "risk_score": 50 if confidence > 0.5 else 20,
        }

    @staticmethod
    def _build_connection_graph(transactions: list[dict[str, Any]]) -> dict[Any, set]:
        """Build an undirected adjacency map from the sender/receiver pairs of the transactions."""
        graph: dict[Any, set] = {}
        for transaction in transactions:
            sender = transaction.get("sender")
            receiver = transaction.get("receiver")
            if not (sender and receiver):
                continue
            sender_links = graph.setdefault(sender, set())
            receiver_links = graph.setdefault(receiver, set())
            # A self-transaction registers the entity as a node but is not a
            # link to another entity, so it must not count towards hub size.
            if sender != receiver:
                sender_links.add(receiver)
                receiver_links.add(sender)
        return graph

    def _find_clusters(self, graph: dict[Any, set]) -> tuple[list[set], bool]:
        """Collect connected components of ``graph``; return (clusters, timed_out)."""
        import time

        start_time = time.monotonic()
        timeout_seconds = self._CLUSTER_TIMEOUT_SECONDS
        visited: set = set()
        clusters: list[set] = []

        for entity in graph:
            if entity in visited:
                continue
            if time.monotonic() - start_time > timeout_seconds:
                return clusters, True
            cluster: set = set()
            completed = self._dfs(entity, graph, visited, cluster, start_time, timeout_seconds)
            # The traversal can give up before recording anything; an empty
            # set is not a cluster.
            if cluster:
                clusters.append(cluster)
            if completed is False:
                return clusters, True
        return clusters, False

    def _dfs(
        self,
        entity: str,
        connections: dict[str, set],
        visited: set,
        cluster: set,
        start_time: float = 0,
        timeout: float = 0,
    ) -> bool:
        """
        Depth-first search for connected components with timeout.

        Adds every entity reachable from ``entity`` to both ``visited`` and
        ``cluster``. Uses an explicit stack instead of recursion so that long
        chains of entities cannot exceed the interpreter's recursion limit.

        ``start_time`` must be a ``time.monotonic()`` reading; a ``timeout``
        of 0 (the default) disables the time limit. Returns True when the
        traversal finished and False when it stopped because of the timeout.
        """
        import time

        def out_of_time() -> bool:
            return timeout > 0 and (time.monotonic() - start_time > timeout)

        if out_of_time():
            return False

        visited.add(entity)
        cluster.add(entity)
        pending = [entity]
        while pending:
            current = pending.pop()
            for neighbor in connections.get(current, ()):
                if neighbor in visited:
                    continue
                if out_of_time():
                    return False
                visited.add(neighbor)
                cluster.add(neighbor)
                pending.append(neighbor)
        return True

    async def cleanup(self) -> None:
        """Release plugin resources; this plugin holds none, so nothing is done."""
        pass

    def validate_config(self, config: dict[str, Any]) -> list[str]:
        """Return an empty list; no checks are performed on ``config``."""
        return []
|