File size: 8,816 Bytes
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4ae946d
 
 
4a2ab42
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# backend/services/metadata_correlation.py
import logging
from collections import defaultdict
from collections.abc import Callable, Mapping
from itertools import combinations

from sqlalchemy.orm import Session

from core.database import Entity

logger = logging.getLogger(__name__)


class MetadataCorrelationEngine:
    """
    Detects relationships between entities via shared metadata.

    Examples:
    - Same phone number: Person A and Person B share +1-555-0123
    - Same email: Person C and Company D both registered to john@example.com
    - Same IP: User E and Merchant F both accessed from 192.168.1.100
    - Same address: Individual G and Shell Corp H both at 123 Main St

    Each entity must expose ``id`` and ``entity_metadata``. The metadata is
    read as a mapping (or ``None``) that may hold one value under ``phone``,
    ``email``, ``address`` or ``ip_address`` and/or several values under the
    plural key (``phones``, ``emails``, ``addresses``, ``ip_addresses``).

    Every correlation is a dict with the keys ``entity_a``, ``entity_b``,
    ``metadata_type``, ``metadata_value``, ``confidence`` and ``reasoning``.
    """

    def __init__(self, session: Session):
        """Store the database session and the per-type confidence weights."""
        self.session = session
        self.correlation_strength_weights = {
            "phone": 0.8,  # High weight - phone is unique
            "email": 0.85,  # Very high weight
            "address": 0.7,  # Medium weight - shared addresses are common
            "ip_address": 0.6,  # Lower weight - IP can be shared
            "name_similarity": 0.5,  # Fuzzy match on names
        }

    def find_all_correlations(self, case_id: str) -> list[dict]:
        """Find all metadata correlations within a case.

        Args:
            case_id: Value matched against ``Entity.case_id``.

        Returns:
            At most one correlation per unordered entity pair. When a pair
            shares several pieces of metadata, the correlation with the
            highest confidence is the one reported.
        """
        entities = self.session.query(Entity).filter(Entity.case_id == case_id).all()
        return self._correlate_entities(entities)

    def _correlate_entities(self, entities: list[Entity]) -> list[dict]:
        """Run every metadata check on ``entities`` and keep one result per pair.

        Kept separate from the database query so the correlation logic can be
        exercised with plain in-memory objects.
        """
        correlations = []

        # Check each metadata type
        correlations.extend(self._find_phone_correlations(entities))
        correlations.extend(self._find_email_correlations(entities))
        correlations.extend(self._find_address_correlations(entities))
        correlations.extend(self._find_ip_correlations(entities))

        # Deduplicate (avoid reporting same pair twice). Keeping the first hit
        # would let a phone match (0.8) hide an email match (0.85) purely
        # because phones are checked first, so the strongest one wins instead.
        # Re-assigning an existing key keeps its original position, so pairs
        # stay in first-seen order.
        strongest = {}
        for corr in correlations:
            pair_key = frozenset((corr["entity_a"], corr["entity_b"]))
            kept = strongest.get(pair_key)
            if kept is None or corr["confidence"] > kept["confidence"]:
                strongest[pair_key] = corr

        return list(strongest.values())

    def _find_phone_correlations(self, entities: list[Entity]) -> list[dict]:
        """Find entities sharing phone numbers"""
        groups = self._group_entities(entities, self._extract_phones)
        return self._pair_up(groups, "phone", "Both entities share phone")

    def _find_email_correlations(self, entities: list[Entity]) -> list[dict]:
        """Find entities sharing email addresses (compared case-insensitively)"""
        groups = self._group_entities(entities, self._extract_emails, str.lower)
        return self._pair_up(groups, "email", "Both entities share email")

    def _find_address_correlations(self, entities: list[Entity]) -> list[dict]:
        """Find entities sharing physical addresses (compared after normalization)"""
        groups = self._group_entities(
            entities, self._extract_addresses, self._normalize_address
        )
        return self._pair_up(groups, "address", "Both entities share address")

    def _find_ip_correlations(self, entities: list[Entity]) -> list[dict]:
        """Find entities sharing IP addresses"""
        groups = self._group_entities(entities, self._extract_ips)
        return self._pair_up(groups, "ip_address", "Both entities accessed from IP")

    def _group_entities(
        self,
        entities: list[Entity],
        extract: Callable[[Entity], set[str]],
        normalize: Callable[[str], str] | None = None,
    ) -> dict[str, list[Entity]]:
        """Bucket entities by each (optionally normalized) value they carry.

        Args:
            entities: Entities to inspect.
            extract: Returns the raw values of one metadata type for an entity.
            normalize: Optional mapping from a raw value to its comparison key.

        Returns:
            Mapping from comparison key to the entities carrying it, in the
            order the entities were given.
        """
        groups = defaultdict(list)
        for entity in entities:
            keys = extract(entity)
            if normalize is not None:
                # Normalize before de-duplicating: two spellings of the same
                # value on one entity must not list that entity twice, which
                # would later pair the entity with itself.
                keys = {normalize(value) for value in keys}
            # Sorted so the result does not depend on set iteration order.
            for key in sorted(keys):
                if key:
                    groups[key].append(entity)
        return groups

    def _pair_up(
        self,
        groups: dict[str, list[Entity]],
        metadata_type: str,
        reasoning_prefix: str,
    ) -> list[dict]:
        """Emit one correlation for every pair of entities sharing a value.

        Args:
            groups: Mapping from shared value to the entities carrying it.
            metadata_type: Reported type; also the key used to look up the
                confidence in ``correlation_strength_weights``.
            reasoning_prefix: Text placed before the value in ``reasoning``.
        """
        confidence = self.correlation_strength_weights[metadata_type]
        correlations = []
        for value, members in groups.items():
            # combinations() yields nothing for groups of fewer than two.
            for first, second in combinations(members, 2):
                correlations.append(
                    {
                        "entity_a": first.id,
                        "entity_b": second.id,
                        "metadata_type": metadata_type,
                        "metadata_value": value,
                        "confidence": confidence,
                        "reasoning": f"{reasoning_prefix} {value}",
                    }
                )
        return correlations

    def _extract_values(
        self, entity: Entity, single_key: str, plural_key: str
    ) -> set[str]:
        """Collect the usable values stored under a singular and a plural key.

        Either key may hold a single value or a list/tuple/set of values.
        Strings are stripped; ``None`` and blank strings are dropped so that
        missing data never counts as a shared identifier. Integers are
        converted to text so a number stored as ``5550123`` matches
        ``"5550123"``. Any other value type is ignored.
        """
        metadata = entity.entity_metadata
        if not isinstance(metadata, Mapping):
            return set()

        raw_values = []
        for key in (single_key, plural_key):
            raw = metadata.get(key)
            if isinstance(raw, (list, tuple, set, frozenset)):
                raw_values.extend(raw)
            else:
                # A bare string must stay one value, not be split into chars.
                raw_values.append(raw)

        cleaned = set()
        for raw in raw_values:
            # bool is an int subclass but is never a meaningful identifier.
            if isinstance(raw, int) and not isinstance(raw, bool):
                raw = str(raw)
            if not isinstance(raw, str):
                continue
            text = raw.strip()
            if text:
                cleaned.add(text)
        return cleaned

    def _extract_phones(self, entity: Entity) -> set[str]:
        """Extract phone numbers from entity metadata (``phone`` / ``phones``)"""
        return self._extract_values(entity, "phone", "phones")

    def _extract_emails(self, entity: Entity) -> set[str]:
        """Extract emails from entity metadata (``email`` / ``emails``)"""
        return self._extract_values(entity, "email", "emails")

    def _extract_addresses(self, entity: Entity) -> set[str]:
        """Extract addresses from entity metadata (``address`` / ``addresses``)"""
        return self._extract_values(entity, "address", "addresses")

    def _extract_ips(self, entity: Entity) -> set[str]:
        """Extract IP addresses from entity metadata (``ip_address`` / ``ip_addresses``)"""
        return self._extract_values(entity, "ip_address", "ip_addresses")

    def _normalize_address(self, address: str) -> str:
        """Normalize address for comparison (lowercase, trim and collapse whitespace)"""
        return " ".join(address.lower().split())