File size: 5,100 Bytes
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
4a2ab42
 
 
 
 
4ae946d
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import logging
import time
from dataclasses import dataclass
from typing import Any

from core.plugin_system import PluginContext, PluginInterface, PluginMetadata

logger = logging.getLogger(__name__)


@dataclass
class EntityLinkageConfig:
    """Typed configuration for :class:`EntityLinkagePlugin`."""

    # Minimum number of distinct counterparties an entity must have
    # before it is flagged as a potential hub in the network.
    connection_threshold: int


class EntityLinkagePlugin(PluginInterface):
    """
    Analyzes connections between entities to find clusters and hubs.

    Builds an undirected graph from transaction sender/receiver pairs,
    flags highly connected entities ("hubs"), and counts connected
    components ("clusters"). Component discovery runs under a wall-clock
    budget so a pathological graph cannot stall the pipeline; on timeout
    partial results are returned with an explanatory insight.
    """

    @property
    def metadata(self) -> PluginMetadata:
        """Static descriptor consumed by the plugin registry."""
        return PluginMetadata(
            name="entity_linkage",
            version="1.0.0",
            namespace="zenith/intelligence/entity_linkage",
            author="Zenith Team",
            description="Analyzes relationships and connections between entities in a case",
            dependencies={},
            capabilities=["intelligence", "case_analysis"],
            security_level="official",
            api_version="v1",
        )

    async def initialize(self, context: PluginContext) -> bool:
        """Store the context and build the typed config.

        Only ``connection_threshold`` is read from ``context.config``
        (default 3). The previous ``EntityLinkageConfig(**config_dict)``
        expansion raised ``TypeError`` on any unknown key, so a stale or
        over-broad config dict could crash plugin start-up.
        """
        self.context = context
        config_dict = context.config or {}
        self.config = EntityLinkageConfig(
            connection_threshold=config_dict.get("connection_threshold", 3)
        )
        return True

    async def execute(self, inputs: dict[str, Any]) -> dict[str, Any]:
        """
        Expects {"case_data": {...}}

        Returns an analysis dict (insights / recommendations / confidence /
        risk_score) or {"error": ...} when no case data was supplied.
        """
        case_data = inputs.get("case_data")
        if not case_data:
            return {"error": "No case data provided"}

        return await self._analyze_entity_linkage(case_data)

    async def _analyze_entity_linkage(self, case_data: dict[str, Any]) -> dict[str, Any]:
        """Analyze entity relationships and linkages.

        Expects optional ``entities`` and ``transactions`` lists in
        *case_data*; each transaction may carry ``sender``/``receiver``
        keys. Confidence accumulates per finding and is capped at 1.0.
        """
        insights: list[str] = []
        recommendations: list[str] = []
        confidence = 0.0

        entities = case_data.get("entities", [])
        transactions = case_data.get("transactions", [])

        if entities and transactions:
            # Build an undirected adjacency map from transaction pairs.
            entity_connections: dict[str, set] = {}

            for transaction in transactions:
                sender = transaction.get("sender")
                receiver = transaction.get("receiver")

                # Skip transactions missing either endpoint.
                if sender and receiver:
                    entity_connections.setdefault(sender, set()).add(receiver)
                    entity_connections.setdefault(receiver, set()).add(sender)

            # Hubs: entities whose degree meets the configured threshold.
            for entity, connections in entity_connections.items():
                if len(connections) >= self.config.connection_threshold:
                    insights.append(f"Entity '{entity}' connected to {len(connections)} other entities")
                    recommendations.append(f"Investigate entity '{entity}' for central role in network")
                    confidence += 0.5

            # Connected components, bounded by a wall-clock budget so a
            # huge graph degrades to partial results instead of hanging.
            start_time = time.time()
            timeout_seconds = 2.0

            visited: set = set()
            clusters: list[set] = []

            for entity in entity_connections:
                if time.time() - start_time > timeout_seconds:
                    insights.append("Entity analysis timed out - partial results shown")
                    break

                if entity not in visited:
                    cluster: set = set()
                    self._dfs(
                        entity,
                        entity_connections,
                        visited,
                        cluster,
                        start_time,
                        timeout_seconds,
                    )
                    clusters.append(cluster)

            if len(clusters) > 1:
                insights.append(f"Found {len(clusters)} separate entity clusters")
                recommendations.append("Analyze each cluster for independent fraud schemes")
                confidence += 0.4

        return {
            "insights": insights,
            "recommendations": recommendations,
            "confidence": min(confidence, 1.0),
            "risk_score": 50 if confidence > 0.5 else 20,
        }

    def _dfs(
        self,
        entity: str,
        connections: dict[str, set],
        visited: set,
        cluster: set,
        start_time: float = 0,
        timeout: float = 0,
    ) -> None:
        """Collect the connected component containing *entity* into *cluster*.

        Iterative (explicit stack) traversal: the previous recursive
        version raised ``RecursionError`` on components deeper than the
        interpreter recursion limit (~1000 nodes in a chain). A timeout
        of 0 disables the time budget, matching the old defaults.
        """
        stack = [entity]
        while stack:
            # Same per-node budget check the recursive version performed
            # at the top of each call.
            if timeout > 0 and (time.time() - start_time > timeout):
                return
            node = stack.pop()
            if node in visited:
                continue
            visited.add(node)
            cluster.add(node)
            for neighbor in connections.get(node, set()):
                if neighbor not in visited:
                    stack.append(neighbor)

    async def cleanup(self) -> None:
        """No resources to release."""
        pass

    def validate_config(self, config: dict[str, Any]) -> list[str]:
        """All configs accepted; returns an empty error list."""
        return []