Spaces:
Paused
Paused
File size: 8,816 Bytes
4a2ab42 4ae946d 4a2ab42 4ae946d 4a2ab42 4ae946d 4a2ab42 4ae946d 4a2ab42 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 | # backend/services/metadata_correlation.py
import logging
from collections import defaultdict
from itertools import combinations

from sqlalchemy.orm import Session

from core.database import Entity
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class MetadataCorrelationEngine:
    """
    Detects relationships between entities via shared metadata.

    Examples:
    - Same phone number: Person A and Person B share +1-555-0123
    - Same email: Person C and Company D both registered to john@example.com
    - Same IP: User E and Merchant F both accessed from 192.168.1.100
    - Same address: Individual G and Shell Corp H both at 123 Main St

    Every correlation is a dict with the keys ``entity_a`` and ``entity_b``
    (entity ids), ``metadata_type``, ``metadata_value`` (the shared value
    after normalization), ``confidence`` (taken from
    ``correlation_strength_weights``) and ``reasoning`` (a human-readable
    explanation).
    """

    def __init__(self, session: "Session"):
        """Store the session used by ``find_all_correlations`` to load entities."""
        self.session = session
        # Confidence reported for a correlation of each metadata type.
        # "name_similarity" is not used by any detector in this class yet.
        self.correlation_strength_weights = {
            "phone": 0.8,  # High weight - phone is unique
            "email": 0.85,  # Very high weight
            "address": 0.7,  # Medium weight - shared addresses are common
            "ip_address": 0.6,  # Lower weight - IP can be shared
            "name_similarity": 0.5,  # Fuzzy match on names
        }

    def find_all_correlations(self, case_id: str) -> list[dict]:
        """Find all metadata correlations within a case.

        Args:
            case_id: Value matched against ``Entity.case_id`` to select the
                entities that are compared with each other.

        Returns:
            At most one correlation per unordered entity pair. When a pair
            is linked by several shared values, the correlation with the
            highest ``confidence`` is kept (ties keep the one found first).
            Pairs are listed in the order they were first detected.
        """
        entities = (
            self.session.query(Entity).filter(Entity.case_id == case_id).all()
        )
        # Check each metadata type
        correlations = [
            *self._find_phone_correlations(entities),
            *self._find_email_correlations(entities),
            *self._find_address_correlations(entities),
            *self._find_ip_correlations(entities),
        ]
        # Deduplicate (avoid reporting same pair twice)
        return self._deduplicate_pairs(correlations)

    @staticmethod
    def _deduplicate_pairs(correlations: list[dict]) -> list[dict]:
        """Keep only the strongest correlation for each unordered entity pair.

        The position of a pair in the result is where it was first seen;
        its content is the highest-confidence correlation for that pair.
        """
        best: dict[tuple, dict] = {}
        for corr in correlations:
            pair_key = tuple(sorted((corr["entity_a"], corr["entity_b"])))
            current = best.get(pair_key)
            # Detection order (phone before email, ...) says nothing about
            # strength, so compare confidences instead of keeping the first hit.
            if current is None or corr["confidence"] > current["confidence"]:
                best[pair_key] = corr
        return list(best.values())

    def _find_phone_correlations(self, entities: "list[Entity]") -> list[dict]:
        """Find entities sharing phone numbers (exact match after trimming)."""
        return self._correlate_shared_values(
            entities,
            self._extract_phones,
            metadata_type="phone",
            reasoning_prefix="Both entities share phone",
        )

    def _find_email_correlations(self, entities: "list[Entity]") -> list[dict]:
        """Find entities sharing email addresses (case-insensitive)."""
        return self._correlate_shared_values(
            entities,
            self._extract_emails,
            metadata_type="email",
            reasoning_prefix="Both entities share email",
            normalize=str.lower,
        )

    def _find_address_correlations(self, entities: "list[Entity]") -> list[dict]:
        """Find entities sharing physical addresses (see ``_normalize_address``)."""
        return self._correlate_shared_values(
            entities,
            self._extract_addresses,
            metadata_type="address",
            reasoning_prefix="Both entities share address",
            normalize=self._normalize_address,
        )

    def _find_ip_correlations(self, entities: "list[Entity]") -> list[dict]:
        """Find entities sharing IP addresses (exact match after trimming)."""
        return self._correlate_shared_values(
            entities,
            self._extract_ips,
            metadata_type="ip_address",
            reasoning_prefix="Both entities accessed from IP",
        )

    def _correlate_shared_values(
        self,
        entities,
        extract,
        metadata_type: str,
        reasoning_prefix: str,
        normalize=None,
    ) -> list[dict]:
        """Pair every two distinct entities that share an extracted value.

        Args:
            entities: Entities to compare; each needs ``id`` and
                ``entity_metadata`` attributes.
            extract: Callable returning the set of raw values for one entity.
            metadata_type: Key into ``correlation_strength_weights``; also
                reported as ``metadata_type``.
            reasoning_prefix: Text placed before the shared value in
                ``reasoning``.
            normalize: Optional callable mapping a raw value to its
                comparison key, which is also reported as ``metadata_value``.

        Returns:
            One correlation per (shared value, entity pair). Within a value,
            pairs follow the order of ``entities``.
        """
        value_map = defaultdict(list)
        for entity in entities:
            values = extract(entity)
            # Normalize into a set *per entity* so an entity holding two
            # spellings of one value ("A@x.com" and "a@x.com") is listed
            # once and is never correlated with itself.
            keys = (
                {normalize(value) for value in values}
                if normalize is not None
                else values
            )
            for key in keys:
                value_map[key].append(entity)

        confidence = self.correlation_strength_weights[metadata_type]
        return [
            {
                "entity_a": first.id,
                "entity_b": second.id,
                "metadata_type": metadata_type,
                "metadata_value": value,
                "confidence": confidence,
                "reasoning": f"{reasoning_prefix} {value}",
            }
            for value, group in value_map.items()
            for first, second in combinations(group, 2)
        ]

    @staticmethod
    def _extract_metadata_values(entity, single_key: str, plural_key: str) -> set[str]:
        """Collect the values stored under ``single_key`` and ``plural_key``.

        ``None`` and blank values are skipped so that entities with an empty
        field are not correlated with each other. A bare (non-list) value
        under ``plural_key`` is treated as one value rather than being split
        into characters. Values are returned as whitespace-stripped strings.
        """
        metadata = entity.entity_metadata
        if not metadata:
            return set()

        raw_values = [metadata.get(single_key)]
        plural = metadata.get(plural_key)
        if isinstance(plural, (list, tuple, set, frozenset)):
            raw_values.extend(plural)
        else:
            raw_values.append(plural)

        values = set()
        for raw in raw_values:
            if raw is None:
                continue
            text = str(raw).strip()
            if text:
                values.add(text)
        return values

    def _extract_phones(self, entity: "Entity") -> set[str]:
        """Extract phone numbers from the "phone" / "phones" metadata keys."""
        return self._extract_metadata_values(entity, "phone", "phones")

    def _extract_emails(self, entity: "Entity") -> set[str]:
        """Extract emails from the "email" / "emails" metadata keys."""
        return self._extract_metadata_values(entity, "email", "emails")

    def _extract_addresses(self, entity: "Entity") -> set[str]:
        """Extract addresses from the "address" / "addresses" metadata keys."""
        return self._extract_metadata_values(entity, "address", "addresses")

    def _extract_ips(self, entity: "Entity") -> set[str]:
        """Extract IPs from the "ip_address" / "ip_addresses" metadata keys."""
        return self._extract_metadata_values(entity, "ip_address", "ip_addresses")

    def _normalize_address(self, address: str) -> str:
        """Normalize address for comparison.

        Lowercases, strips surrounding whitespace and collapses internal
        whitespace runs, so "123  Main St" and "123 main st" compare equal.
        """
        return " ".join(address.lower().split())
|