# zenith-backend/app/services/intelligence/metadata_correlation_service.py
# Uploaded by teoat using huggingface_hub (commit 4ae946d, verified)
# backend/services/metadata_correlation.py
import logging
from collections import defaultdict
from itertools import combinations

from sqlalchemy.orm import Session

from core.database import Entity
# Module-level logger named after this module.
# NOTE(review): not referenced by MetadataCorrelationEngine as shown here — confirm before removing.
logger = logging.getLogger(__name__)
class MetadataCorrelationEngine:
    """
    Detects relationships between entities via shared metadata.

    Examples:
    - Same phone number: Person A and Person B share +1-555-0123
    - Same email: Person C and Company D both registered to john@example.com
    - Same IP: User E and Merchant F both accessed from 192.168.1.100
    - Same address: Individual G and Shell Corp H both at 123 Main St

    Every correlation is a dict with the keys ``entity_a``, ``entity_b``
    (entity ids), ``metadata_type``, ``metadata_value`` (the shared value,
    after normalization), ``confidence`` and ``reasoning``.
    """

    def __init__(self, session: Session):
        """Store the SQLAlchemy session used to load a case's entities."""
        self.session = session
        # Confidence reported for a correlation of each metadata type.
        self.correlation_strength_weights = {
            "phone": 0.8,  # High weight - phone is unique
            "email": 0.85,  # Very high weight
            "address": 0.7,  # Medium weight - shared addresses are common
            "ip_address": 0.6,  # Lower weight - IP can be shared
            # NOTE(review): no matcher in this class uses this weight yet.
            "name_similarity": 0.5,  # Fuzzy match on names
        }

    def find_all_correlations(self, case_id: str) -> list[dict]:
        """Find all metadata correlations within a case.

        Args:
            case_id: Case whose entities are compared against each other.

        Returns:
            At most one correlation per unordered pair of entities. When a
            pair shares several kinds of metadata, the highest-confidence
            correlation is kept; ties go to the one found first (phone,
            email, address, then IP).
        """
        entities = self.session.query(Entity).filter(Entity.case_id == case_id).all()
        correlations = [
            *self._find_phone_correlations(entities),
            *self._find_email_correlations(entities),
            *self._find_address_correlations(entities),
            *self._find_ip_correlations(entities),
        ]
        return self._deduplicate_correlations(correlations)

    @staticmethod
    def _deduplicate_correlations(correlations: list[dict]) -> list[dict]:
        """Keep the strongest correlation for each unordered entity pair.

        Pairs are returned in the order in which they were first seen.
        """
        best_by_pair: dict[tuple, dict] = {}
        for corr in correlations:
            pair_key = tuple(sorted([corr["entity_a"], corr["entity_b"]]))
            current = best_by_pair.get(pair_key)
            # Strict ">" so that, on equal confidence, the earlier finding wins.
            if current is None or corr["confidence"] > current["confidence"]:
                best_by_pair[pair_key] = corr
        return list(best_by_pair.values())

    def _correlate_by_value(
        self,
        entities: list[Entity],
        metadata_type: str,
        extract,
        reasoning_template: str,
        normalize=None,
    ) -> list[dict]:
        """Pair up every two distinct entities that share a metadata value.

        Args:
            entities: Entities to compare.
            metadata_type: Key into ``correlation_strength_weights``; also
                reported as the correlation's ``metadata_type``.
            extract: Callable returning one entity's raw values.
            reasoning_template: ``str.format`` template with a ``{value}``
                placeholder, used for the ``reasoning`` text.
            normalize: Optional callable mapping a raw value to the key that
                is compared (and reported as ``metadata_value``).

        Returns:
            One correlation per pair of distinct entities sharing a value.
        """
        # value -> {entity id: entity}. Keying members by id means an entity
        # whose values normalize to the same key (e.g. "A@x.com" and
        # "a@x.com") is recorded once and is never paired with itself.
        members_by_value = defaultdict(dict)
        for entity in entities:
            for raw_value in extract(entity):
                value = normalize(raw_value) if normalize is not None else raw_value
                members_by_value[value].setdefault(entity.id, entity)

        confidence = self.correlation_strength_weights[metadata_type]
        return [
            {
                "entity_a": first.id,
                "entity_b": second.id,
                "metadata_type": metadata_type,
                "metadata_value": value,
                "confidence": confidence,
                "reasoning": reasoning_template.format(value=value),
            }
            for value, members in members_by_value.items()
            for first, second in combinations(members.values(), 2)
        ]

    def _find_phone_correlations(self, entities: list[Entity]) -> list[dict]:
        """Find entities sharing phone numbers"""
        return self._correlate_by_value(
            entities,
            "phone",
            self._extract_phones,
            "Both entities share phone {value}",
        )

    def _find_email_correlations(self, entities: list[Entity]) -> list[dict]:
        """Find entities sharing email addresses (compared case-insensitively)"""
        return self._correlate_by_value(
            entities,
            "email",
            self._extract_emails,
            "Both entities share email {value}",
            normalize=str.lower,
        )

    def _find_address_correlations(self, entities: list[Entity]) -> list[dict]:
        """Find entities sharing physical addresses (after normalization)"""
        return self._correlate_by_value(
            entities,
            "address",
            self._extract_addresses,
            "Both entities share address {value}",
            normalize=self._normalize_address,
        )

    def _find_ip_correlations(self, entities: list[Entity]) -> list[dict]:
        """Find entities sharing IP addresses"""
        return self._correlate_by_value(
            entities,
            "ip_address",
            self._extract_ips,
            "Both entities accessed from IP {value}",
        )

    @staticmethod
    def _extract_values(entity: Entity, single_key: str, plural_key: str) -> set[str]:
        """Collect the values stored under ``single_key`` and ``plural_key``.

        The singular key holds one value; the plural key normally holds a
        list/tuple/set, but a bare scalar there is treated as one value
        rather than being iterated character by character.

        Only ``str`` and ``int`` (not ``bool``) values are kept. They are
        converted to ``str`` and stripped, and blank results are dropped, so
        missing data (``None``, ``""``) never makes unrelated entities
        "share" a value.
        """
        metadata = entity.entity_metadata or {}
        raw_values = [metadata.get(single_key)]
        plural = metadata.get(plural_key)
        if isinstance(plural, (list, tuple, set, frozenset)):
            raw_values.extend(plural)
        elif plural is not None:
            raw_values.append(plural)

        values = set()
        for raw in raw_values:
            if isinstance(raw, bool) or not isinstance(raw, (str, int)):
                continue
            text = str(raw).strip()
            if text:
                values.add(text)
        return values

    def _extract_phones(self, entity: Entity) -> set[str]:
        """Extract phone numbers from entity metadata ("phone" / "phones")"""
        return self._extract_values(entity, "phone", "phones")

    def _extract_emails(self, entity: Entity) -> set[str]:
        """Extract emails from entity metadata ("email" / "emails")"""
        return self._extract_values(entity, "email", "emails")

    def _extract_addresses(self, entity: Entity) -> set[str]:
        """Extract addresses from entity metadata ("address" / "addresses")"""
        return self._extract_values(entity, "address", "addresses")

    def _extract_ips(self, entity: Entity) -> set[str]:
        """Extract IP addresses from entity metadata ("ip_address" / "ip_addresses")"""
        return self._extract_values(entity, "ip_address", "ip_addresses")

    def _normalize_address(self, address: str) -> str:
        """Normalize address for comparison.

        Lowercases the address and collapses every run of whitespace
        (including tabs and newlines) into a single space, trimming both ends.
        """
        return " ".join(address.split()).lower()