# zenith-backend/app/services/intelligence/timeline_reconstruction.py
# (Hugging Face upload artifact header removed: "teoat's picture /
#  Upload folder using huggingface_hub / 4ae946d verified")
"""
Timeline Reconstruction Service - Automated investigation timeline builder
"""
import logging
from datetime import datetime, timedelta
from typing import Any
from pydantic import BaseModel
logger = logging.getLogger(__name__)
class TimelineEvent(BaseModel):
    """Individual timeline event in investigation"""

    id: str  # unique event id (e.g. "alert_<id>", "insight_<id>", "tx_<id>")
    timestamp: datetime  # when the event occurred
    event_type: str  # e.g. "alert_triggered", "ai_insight", "transaction_analysis"
    title: str  # short human-readable headline
    description: str  # longer narrative for investigators
    evidence_ids: list[str]  # ids of supporting evidence/transaction records
    confidence_score: float  # 0.0-1.0 confidence in the event's accuracy
    risk_level: str  # low, medium, high, critical
    ai_persona: str  # AI persona that produced the event (e.g. "aml_analyst")
    # NOTE: mutable {} default is safe under pydantic — field defaults are
    # copied per instance, not shared across models.
    metadata: dict[str, Any] = {}
class InvestigationTimeline(BaseModel):
    """Complete investigation timeline"""

    case_id: str  # investigation case identifier
    title: str  # timeline headline (e.g. "Fraud Investigation Timeline - <case_id>")
    description: str  # how/why this timeline was produced
    start_date: datetime  # timestamp of the earliest event
    end_date: datetime | None = None  # latest event timestamp; None when no events
    events: list[TimelineEvent]  # events, sorted chronologically by the builder
    summary: str  # generated pipe-delimited summary text
    total_duration: timedelta | None = None  # end_date - start_date when known
class TimelineReconstructionEngine:
    """AI-powered timeline reconstruction for fraud investigations.

    Merges initial alerts, AI insights and per-transaction analyses into a
    single chronological ``InvestigationTimeline``, and offers post-processing
    helpers: integrity validation, de-duplication, gap imputation ("ghost"
    events) and redaction resolution via amount triangulation.
    """

    def __init__(self):
        # Milestone templates keyed by case type; get_case_template() falls
        # back to "fraud_detection" for unknown types.
        self.case_templates = {
            "aml_investigation": {
                "title": "AML Investigation Timeline",
                "events": [
                    "Initial Alert",
                    "Transaction Analysis",
                    "Pattern Detection",
                    "SAR Filing",
                ],
            },
            "fraud_detection": {
                "title": "Fraud Detection Timeline",
                "events": [
                    "Alert Triggered",
                    "Evidence Collection",
                    "Risk Assessment",
                    "Investigation Findings",
                ],
            },
            "network_analysis": {
                "title": "Network Analysis Timeline",
                "events": [
                    "Entity Mapping",
                    "Relationship Discovery",
                    "Transaction Flow Analysis",
                    "Network Disruption",
                ],
            },
        }

    async def reconstruct_timeline(
        self,
        case_id: str,
        evidence_data: list[dict[str, Any]],
        transaction_data: list[dict[str, Any]],
        ai_insights: list[dict[str, Any]],
    ) -> InvestigationTimeline:
        """Reconstruct an investigation timeline from multiple data sources.

        Args:
            case_id: Identifier of the investigation case.
            evidence_data: Raw evidence records; entries whose "type" is
                "initial_alert" become alert events.
            transaction_data: Transactions, one analysis event each.
            ai_insights: AI findings; only "risk_assessment" and
                "pattern_detection" types are included.

        Returns:
            A populated ``InvestigationTimeline``; on any failure a stub
            timeline describing the error is returned instead of raising.
        """
        try:
            events: list[TimelineEvent] = []

            # Initial fraud alerts from the evidence stream.
            for alert in evidence_data:
                if alert.get("type") == "initial_alert":
                    events.append(
                        TimelineEvent(
                            id=f"alert_{alert['id']}",
                            timestamp=alert["timestamp"],
                            event_type="alert_triggered",
                            title="Initial Fraud Alert",
                            description=alert["description"],
                            evidence_ids=[alert["id"]],
                            confidence_score=0.95,
                            risk_level=alert.get("risk_level", "high"),
                            ai_persona="aml_analyst",
                            metadata={"source": "transaction_monitoring"},
                        )
                    )

            # AI insights of interest (risk assessments / pattern detections).
            for insight in ai_insights:
                if insight.get("type") in ["risk_assessment", "pattern_detection"]:
                    events.append(
                        TimelineEvent(
                            id=f"insight_{insight['id']}",
                            timestamp=insight["timestamp"],
                            event_type="ai_insight",
                            title=insight["title"],
                            description=insight["description"],
                            evidence_ids=insight.get("evidence_ids", []),
                            confidence_score=insight.get("confidence", 0.85),
                            risk_level=insight.get("risk_level", "medium"),
                            ai_persona=insight.get("persona", "risk_quantifier"),
                            metadata={"confidence_interval": insight.get("confidence_interval")},
                        )
                    )

            # One analysis event per transaction, risk-levelled from its score.
            for tx in transaction_data:
                events.append(
                    TimelineEvent(
                        id=f"tx_{tx['id']}",
                        timestamp=tx["timestamp"],
                        event_type="transaction_analysis",
                        title=f"Transaction Analysis: {tx['id']}",
                        description="Analyzed transaction for risk indicators",
                        evidence_ids=[tx["id"]],
                        confidence_score=tx.get("risk_score", 0.5),
                        risk_level=self._calculate_risk_level(tx.get("risk_score", 0.5)),
                        ai_persona="risk_quantifier",
                        metadata={
                            "amount": tx.get("amount"),
                            "risk_score": tx.get("risk_score"),
                        },
                    )
                )

            events.sort(key=lambda e: e.timestamp)

            # After sorting, the first/last events bound the investigation
            # window. NOTE(review): datetime.now() is naive — confirm callers
            # supply naive timestamps too, or comparisons may raise.
            start_date = events[0].timestamp if events else datetime.now()
            end_date = events[-1].timestamp if events else None
            total_duration = end_date - start_date if end_date else None

            return InvestigationTimeline(
                case_id=case_id,
                title=f"Fraud Investigation Timeline - {case_id}",
                description="AI-reconstructed investigation timeline from multiple data sources",
                start_date=start_date,
                end_date=end_date,
                events=events,
                summary=self._generate_timeline_summary(events),
                total_duration=total_duration,
            )
        except Exception as e:
            # Deliberate best-effort boundary: never propagate, return a stub.
            logger.error(f"Timeline reconstruction failed for case {case_id}: {e}")
            return InvestigationTimeline(
                case_id=case_id,
                title="Timeline Reconstruction Failed",
                description=f"Error: {e!s}",
                start_date=datetime.now(),
                events=[],
                summary="Failed to reconstruct timeline due to technical error",
                total_duration=None,
            )

    def _calculate_risk_level(self, risk_score: float) -> str:
        """Map a numeric risk score in [0, 1] onto a categorical risk level."""
        if risk_score >= 0.8:
            return "critical"
        if risk_score >= 0.6:
            return "high"
        if risk_score >= 0.4:
            return "medium"
        return "low"

    def _generate_timeline_summary(self, events: list[TimelineEvent]) -> str:
        """Generate a pipe-delimited textual summary of the timeline events."""
        if not events:
            return "No events found for timeline reconstruction."
        high_risk_events = [e for e in events if e.risk_level in ["critical", "high"]]
        total_events = len(events)
        # Inclusive day span between earliest and latest events; 0 for a
        # single-event timeline.
        duration_days = (
            (max(e.timestamp for e in events) - min(e.timestamp for e in events)).days + 1 if len(events) > 1 else 0
        )
        summary_parts = [
            f"AI-powered analysis of {total_events} events over {duration_days} days.",
            f"Identified {len(high_risk_events)} high-risk events requiring immediate attention.",
            "Timeline reconstructed with confidence scoring and risk assessment.",
            "Key patterns: Transaction structuring and behavioral anomalies detected.",
        ]
        return " | ".join(summary_parts)

    async def get_case_template(self, case_type: str) -> dict[str, Any]:
        """Return the milestone template for *case_type* ("fraud_detection" fallback)."""
        return self.case_templates.get(case_type, self.case_templates["fraud_detection"])

    async def validate_timeline_integrity(self, timeline: InvestigationTimeline) -> dict[str, Any]:
        """Validate a reconstructed timeline for ordering and evidence coverage.

        Returns a dict with keys: is_valid, issues, confidence_score,
        missing_events, inconsistent_data.
        """
        validation_result: dict[str, Any] = {
            "is_valid": True,
            "issues": [],
            "confidence_score": 0.0,
            "missing_events": [],
            "inconsistent_data": [],
        }
        try:
            # BUGFIX: the previous implementation compared a *sorted* copy of
            # the events against itself, so the chronological check could
            # never fail. Check the timeline's stored order instead.
            for prev, event in zip(timeline.events, timeline.events[1:]):
                if event.timestamp < prev.timestamp:
                    validation_result["issues"].append(f"Event {event.id} has inconsistent timestamp")
                    validation_result["is_valid"] = False

            # Every event should cite at least one piece of evidence.
            for event in timeline.events:
                if not event.evidence_ids:
                    validation_result["missing_events"].append(f"Event {event.id} lacks supporting evidence")

            # Coarse confidence: penalize any ordering issue.
            validation_result["confidence_score"] = 0.5 if validation_result["issues"] else 0.9
        except Exception as e:
            validation_result["is_valid"] = False
            validation_result["issues"].append(f"Validation failed: {e!s}")
        return validation_result

    async def optimize_timeline(self, timeline: InvestigationTimeline) -> InvestigationTimeline:
        """De-duplicate events sharing a timestamp and refresh the summary (in place)."""
        try:
            original_count = len(timeline.events)
            unique_events: list[TimelineEvent] = []
            seen_timestamps: set[datetime] = set()
            for event in sorted(timeline.events, key=lambda e: e.timestamp):
                if event.timestamp not in seen_timestamps:
                    unique_events.append(event)
                    seen_timestamps.add(event.timestamp)

            timeline.events = unique_events
            timeline.summary = self._generate_timeline_summary(unique_events)
            # BUGFIX: the removed-count was previously computed *after*
            # timeline.events had been reassigned, so it always logged 0.
            logger.info(f"Timeline optimized: {original_count - len(unique_events)} redundancies removed")
        except Exception as e:
            logger.error(f"Timeline optimization failed: {e}")
        return timeline

    async def impute_missing_windows(
        self, transactions: list[dict[str, Any]], window_size_days: int = 30
    ) -> list[dict[str, Any]]:
        """Timeline Interpolator (forensic imputation).

        Detects gaps longer than *window_size_days* between consecutive
        transactions and fills them with low-confidence "ghost" events
        projected from recurring-looking transactions (RENT, SALARY, ...).

        Returns the list of imputed event dicts (may be empty).
        """
        if not transactions:
            return []

        def _as_datetime(raw: Any) -> datetime:
            # Accept datetime objects or ISO-8601 strings (trailing "Z" ok).
            if isinstance(raw, str):
                return datetime.fromisoformat(raw.replace("Z", "+00:00"))
            return raw

        # BUGFIX: normalize timestamps *before* sorting; the previous code
        # sorted on the raw values, which raises TypeError when datetimes and
        # ISO strings are mixed in the same batch.
        dated = sorted(
            ((_as_datetime(tx.get("timestamp") or tx.get("date")), tx) for tx in transactions),
            key=lambda pair: pair[0],
        )

        # Candidate recurring transactions are loop-invariant; hoist them out
        # of the gap scan. In production, a real frequency analysis belongs here.
        recurring_candidates = [
            tx
            for _, tx in dated
            if any(
                term in tx.get("description", "").upper()
                for term in [
                    "RENT",
                    "SALARY",
                    "INTEREST",
                    "SUBSCRIPTION",
                    "UTILITY",
                ]
            )
        ]

        imputed_events: list[dict[str, Any]] = []
        for (curr_date, _), (next_date, _) in zip(dated, dated[1:]):
            delta = (next_date - curr_date).days
            if delta <= window_size_days:
                continue
            logger.info(f"Gap detected between {curr_date} and {next_date} ({delta} days)")

            # Project ghost events at a monthly cadence through the gap.
            gap_start = curr_date + timedelta(days=window_size_days)
            while gap_start < next_date:
                for rec in recurring_candidates[:2]:  # fill with top 2 recurring patterns
                    imputed_events.append(
                        {
                            "id": f"imputed_{gap_start.strftime('%Y%m')}_{rec['id']}",
                            "timestamp": gap_start,
                            "event_type": "forensic_imputation",
                            "title": f"Inferred Detail: {rec.get('description', 'Recurring Transaction')}",
                            "description": "Ghost transaction projected from historical patterns to fill data gap.",
                            "confidence_score": 0.4,  # deliberately low: inferred, not observed
                            "risk_level": "low",
                            "ai_persona": "forensic_accountant",
                            "metadata": {
                                "imputed": True,
                                "source_pattern_id": rec["id"],
                                "gap_days": delta,
                            },
                        }
                    )
                gap_start += timedelta(days=30)
        return imputed_events

    async def unmask_redacted_fields(self, transactions: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Triangulation Engine (redaction resolution).

        Infers redacted merchant names by matching transaction amounts
        against a (mocked) Global Vendor Graph of known recurring price
        points. Only transactions whose description contains "REDACTED" or
        "****" produce a result entry.
        """
        # Mock Global Vendor Graph: known recurring amount -> merchant name.
        vendor_graph = {
            15.99: "NETFLIX_STANDARD",
            12.99: "SPOTIFY_FAMILY",
            99.99: "AWS_T3_MICRO_RESERVED",
            49.00: "OPENAI_API_CREDITS",
            20.00: "CHATGPT_PLUS",
            2500.00: "WEWORK_DESK_RENTAL",
        }
        results: list[dict[str, Any]] = []
        for tx in transactions:
            # BUGFIX: dropped the dead `description in vendor_graph` branch —
            # the graph is keyed by float amounts, so a string description
            # could never match and that append was unreachable.
            description = tx.get("description", "").upper()
            if "REDACTED" not in description and "****" not in description:
                continue
            # Attempt to unmask via amount triangulation.
            amount = abs(tx.get("amount", 0))
            inferred_name = vendor_graph.get(amount)
            if inferred_name:
                logger.info(f"Unmasked redacted field for amount {amount}: {inferred_name}")
                results.append(
                    {
                        "original_id": tx.get("id"),
                        "redacted_field": "description/merchant",
                        "inferred_value": inferred_name,
                        "confidence": 0.85,
                        "inference_method": "AMOUNT_TRIANGULATION",
                        "reasoning": f"Amount ${amount} matches known recurring tier for {inferred_name} in Global Vendor Graph.",
                    }
                )
            else:
                results.append(
                    {
                        "original_id": tx.get("id"),
                        "redacted_field": "description/merchant",
                        "inferred_value": "UNKNOWN_VENDOR",
                        "confidence": 0.1,
                        "inference_method": "FAILED",
                        "reasoning": "No unique match found in Global Vendor Graph for this amount.",
                    }
                )
        return results