"""
Module 3: Digital Evidence Correlation
=======================================
Analyzes CCTV logs, mobile metadata, geolocation records.
"""

import pandas as pd
import numpy as np
from typing import Dict, List, Any
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
from io import StringIO


class DigitalEvidenceCorrelator:
    """Correlates and analyzes digital evidence from multiple sources.

    Input is a CSV (file path or raw text). The columns ``timestamp``,
    ``source``, ``event_type``, ``location_lat``, ``location_lon`` and
    ``details`` are used when present (matched case-insensitively); missing
    ones are filled with placeholders so the analysis never fails on a
    sparse export.

    The numeric thresholds below are class attributes so a subclass can tune
    them without re-implementing the analysis.
    """

    # Columns the analysis relies on, in the order they are back-filled.
    REQUIRED_COLUMNS = (
        "timestamp", "source", "event_type",
        "location_lat", "location_lon", "details",
    )
    # Free-text columns whose empty cells are normalised to "unknown".
    TEXT_COLUMNS = ("source", "event_type", "details")

    # Two events from different sources correlate when they are at most this
    # many minutes apart; at or below HIGH_CORRELATION_MIN they rate "HIGH".
    CORRELATION_WINDOW_MIN = 15
    HIGH_CORRELATION_MIN = 5
    # Each event is compared only with this many following events.
    CORRELATION_LOOKAHEAD = 4

    # Silence between consecutive events longer than GAP_MIN minutes is
    # reported; the other two bounds grade it MODERATE / HIGH.
    GAP_MIN = 30
    GAP_MODERATE_MIN = 60
    GAP_HIGH_MIN = 120

    # Maximum number of correlation rows rendered in the markdown table.
    MAX_CORRELATION_ROWS = 8

    # Keyword patterns for _identify_patterns. Word boundaries keep "2" from
    # matching inside "12:30" and "one" from matching inside "phone".
    _MULTIPLE_PEOPLE_RE = r"\b(?:two|2|multiple)\b"
    _SINGLE_PERSON_RE = r"\b(?:single|one|alone|1)\b"
    # Leading boundary only, so "rapidly"/"faster" match but "breakfast" does not.
    _RAPID_RE = r"\b(?:high speed|rapid|fast|fleeing)"

    def analyze_from_file(self, filepath: str) -> Dict[str, Any]:
        """Read the CSV at *filepath* and run the full analysis on it."""
        return self._analyze(pd.read_csv(filepath))

    def analyze_from_text(self, csv_text: str) -> Dict[str, Any]:
        """Parse *csv_text* as CSV and run the full analysis on it."""
        return self._analyze(pd.read_csv(StringIO(csv_text)))

    def _analyze(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Normalise *df* and compute every analysis artefact.

        Args:
            df: Raw evidence records. The caller's frame is not modified.

        Returns:
            A dict with the keys ``evidence_table`` (DataFrame),
            ``correlation_plot`` (plotly figure), ``analysis_markdown`` (str),
            ``evidence_records`` (list of row dicts), ``timeline_events``
            (list of dicts) and ``correlations`` (list of dicts).
        """
        df = df.copy()  # never mutate the caller's frame
        df.columns = [str(c).strip().lower() for c in df.columns]

        if "timestamp" in df.columns:
            df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
            df = df.dropna(subset=["timestamp"])
            # Stable sort keeps the file order of events sharing a timestamp.
            df = df.sort_values("timestamp", kind="stable").reset_index(drop=True)

        for col in self.REQUIRED_COLUMNS:
            if col not in df.columns:
                if col == "timestamp":
                    # NaT (not a string) keeps the time arithmetic below
                    # well-defined: every delta is NaN and matches nothing.
                    df[col] = pd.NaT
                elif "location" in col:
                    df[col] = 0.0
                else:
                    df[col] = "unknown"
            elif col in self.TEXT_COLUMNS:
                # Empty CSV cells arrive as NaN; NaN != NaN would fake a
                # cross-source correlation and breaks string joins later.
                values = df[col].astype(object)
                df[col] = values.where(values.notna(), "unknown")

        correlations = self._find_correlations(df)
        gaps = self._detect_gaps(df)
        patterns = self._identify_patterns(df)

        evidence_table = df[["timestamp", "source", "event_type", "details"]].copy()
        evidence_table["timestamp"] = evidence_table["timestamp"].astype(str)

        correlation_plot = self._build_plot(df, correlations, gaps)
        analysis_md = self._build_markdown(df, correlations, gaps, patterns)

        records = df.to_dict("records")
        timeline_events = [
            {
                "event": rec["details"],
                "category": "Digital Evidence",
                "source": rec["source"],
                "timestamp": str(rec["timestamp"]),
            }
            for rec in records
        ]

        return {
            "evidence_table": evidence_table,
            "correlation_plot": correlation_plot,
            "analysis_markdown": analysis_md,
            "evidence_records": records,
            "timeline_events": timeline_events,
            "correlations": correlations,
        }

    @staticmethod
    def _column_values(df, column, default):
        """Return *column* of *df* as a list, or *default* repeated if absent."""
        if column in df.columns:
            return df[column].tolist()
        return [default] * len(df)

    @staticmethod
    def _escape_cell(text):
        """Escape ``|`` so *text* cannot break out of a markdown table cell."""
        return str(text).replace("|", "\\|")

    def _find_correlations(self, df):
        """Pair nearby events that were recorded by different sources.

        Each event is compared with the next ``CORRELATION_LOOKAHEAD`` rows.
        A pair is reported when the later event follows the earlier one by
        more than 0 and at most ``CORRELATION_WINDOW_MIN`` minutes and the
        two ``source`` values differ.

        Args:
            df: Evidence records with a datetime ``timestamp`` column, in the
                order they should be scanned.

        Returns:
            A list of dicts with the keys ``event_1``, ``event_2``,
            ``time_diff_min``, ``correlation_type`` and ``significance``.
        """
        if len(df) < 2:
            return []

        # Pull the columns out once instead of building a row per df.iloc[i].
        rows = list(zip(
            df["timestamp"].tolist(),
            self._column_values(df, "source", ""),
            self._column_values(df, "details", ""),
        ))

        correlations = []
        for i, (t1, src1, det1) in enumerate(rows):
            for t2, src2, det2 in rows[i + 1:i + 1 + self.CORRELATION_LOOKAHEAD]:
                minutes = (t2 - t1).total_seconds() / 60
                # A NaN delta (NaT timestamp) fails this comparison and is skipped.
                if not 0 < minutes <= self.CORRELATION_WINDOW_MIN:
                    continue
                if src1 == src2:
                    continue
                correlations.append({
                    "event_1": f"{src1}: {str(det1)[:40]}",
                    "event_2": f"{src2}: {str(det2)[:40]}",
                    "time_diff_min": round(minutes, 1),
                    "correlation_type": "temporal_proximity",
                    "significance": (
                        "HIGH" if minutes <= self.HIGH_CORRELATION_MIN else "MODERATE"
                    ),
                })
        return correlations

    def _detect_gaps(self, df):
        """Report silences longer than ``GAP_MIN`` minutes between neighbours.

        Args:
            df: Evidence records with a datetime ``timestamp`` column, in
                chronological order.

        Returns:
            A list of dicts with the keys ``start``, ``end``,
            ``duration_min``, ``before_event``, ``after_event`` and
            ``suspicion_level`` (``"HIGH"``, ``"MODERATE"`` or ``"LOW"``).
        """
        if len(df) < 2:
            return []

        rows = list(zip(
            df["timestamp"].tolist(),
            self._column_values(df, "details", "N/A"),
        ))

        gaps = []
        for (t_before, det_before), (t_after, det_after) in zip(rows, rows[1:]):
            minutes = (t_after - t_before).total_seconds() / 60
            # A NaN delta (NaT timestamp) fails this comparison and is skipped.
            if not minutes > self.GAP_MIN:
                continue
            if minutes > self.GAP_HIGH_MIN:
                level = "HIGH"
            elif minutes > self.GAP_MODERATE_MIN:
                level = "MODERATE"
            else:
                level = "LOW"
            gaps.append({
                "start": str(t_before),
                "end": str(t_after),
                "duration_min": round(minutes, 1),
                "before_event": str(det_before),
                "after_event": str(det_after),
                "suspicion_level": level,
            })
        return gaps

    def _identify_patterns(self, df):
        """Flag keyword-based behavioural patterns in the evidence.

        Three heuristics are applied: a head-count discrepancy (some record
        mentions several people and some record mentions a single one), a
        rapid departure, and a device disconnect in ``event_type``.

        Args:
            df: Evidence records; without a ``details`` column no pattern is
                reported.

        Returns:
            A list of dicts with the keys ``type``, ``description`` and
            ``significance``.
        """
        patterns = []
        if "details" not in df.columns:
            return patterns

        details_lower = df["details"].astype(str).str.lower()

        # Person count discrepancy
        mentions_many = details_lower.str.contains(self._MULTIPLE_PEOPLE_RE, na=False)
        mentions_one = details_lower.str.contains(self._SINGLE_PERSON_RE, na=False)
        if mentions_many.any() and mentions_one.any():
            patterns.append({
                "type": "person_count_discrepancy",
                "description": "Multiple individuals arrived but fewer departed",
                "significance": "CRITICAL",
            })

        # Rapid departure
        if details_lower.str.contains(self._RAPID_RE, na=False).any():
            patterns.append({
                "type": "rapid_departure",
                "description": "Vehicle/person departing at unusual speed",
                "significance": "HIGH",
            })

        # Communication cutoff
        if "event_type" in df.columns:
            is_disconnect = df["event_type"].astype(str).str.contains(
                "disconnect", case=False, na=False
            )
            disconnects = df[is_disconnect]
            if len(disconnects) > 0:
                patterns.append({
                    "type": "communication_cutoff",
                    "description": f"Device disconnected at {disconnects.iloc[0]['timestamp']}",
                    "significance": "HIGH",
                })

        return patterns

    def _build_plot(self, df, correlations, gaps):
        """Build the two-row figure: per-source timeline and event density.

        Args:
            df: Evidence records.
            correlations: Accepted for interface compatibility; not drawn.
            gaps: Output of ``_detect_gaps``; HIGH and MODERATE gaps are
                shaded behind the timeline.

        Returns:
            A plotly figure. It carries no traces when *df* has no usable
            timestamps.
        """
        fig = make_subplots(
            rows=2,
            cols=1,
            subplot_titles=("📍 Evidence Timeline by Source", "📊 Event Density"),
            row_heights=[0.7, 0.3],
            vertical_spacing=0.15,
        )

        # isna().all() is also True for an empty frame.
        has_data = "timestamp" in df.columns and not df["timestamp"].isna().all()

        if has_data:
            sources = df["source"].unique() if "source" in df.columns else ["Unknown"]
            colors = px.colors.qualitative.Set2

            for idx, source in enumerate(sources):
                sdf = df[df["source"] == source] if "source" in df.columns else df
                fig.add_trace(
                    go.Scatter(
                        x=sdf["timestamp"],
                        y=[source] * len(sdf),
                        mode="markers",
                        name=str(source),
                        marker=dict(
                            size=12,
                            color=colors[idx % len(colors)],
                            symbol="diamond",
                        ),
                        # NOTE(review): line breaks restored as <br>; confirm
                        # against the intended hover layout.
                        hovertemplate="%{y}<br>Time: %{x}<br>",
                    ),
                    row=1,
                    col=1,
                )

            for gap in gaps:
                if gap["suspicion_level"] not in ("HIGH", "MODERATE"):
                    continue
                if gap["suspicion_level"] == "HIGH":
                    color = "rgba(248, 81, 73, 0.2)"
                else:
                    color = "rgba(255, 166, 87, 0.15)"
                fig.add_vrect(
                    x0=gap["start"],
                    x1=gap["end"],
                    fillcolor=color,
                    layer="below",
                    line_width=0,
                    row=1,
                    col=1,
                )

            fig.add_trace(
                go.Histogram(
                    x=df["timestamp"],
                    nbinsx=20,
                    marker_color="#79c0ff",
                    name="Density",
                    opacity=0.7,
                ),
                row=2,
                col=1,
            )

        # Theme is applied even to an empty figure so it matches the rest of the UI.
        fig.update_layout(
            template="plotly_dark",
            paper_bgcolor="#0d1117",
            plot_bgcolor="#161b22",
            font=dict(color="#e6edf3"),
            height=550,
            showlegend=True,
        )
        fig.update_xaxes(gridcolor="#30363d")
        fig.update_yaxes(gridcolor="#30363d")
        return fig

    def _build_markdown(self, df, correlations, gaps, patterns):
        """Render the analysis results as a markdown report.

        Args:
            df: Evidence records (used for the record count and source list).
            correlations: Output of ``_find_correlations``; at most
                ``MAX_CORRELATION_ROWS`` are listed.
            gaps: Output of ``_detect_gaps``.
            patterns: Output of ``_identify_patterns``.

        Returns:
            The report as a single markdown string.
        """
        if "source" in df.columns:
            # str() because source values are not guaranteed to be strings.
            source_list = ", ".join(str(s) for s in df["source"].unique())
        else:
            source_list = "N/A"

        parts = [
            "## 📱 Digital Evidence Correlation Report\n\n",
            f"**Total Records:** {len(df)} | ",
            f"**Sources:** {source_list}\n\n",
        ]

        if correlations:
            parts.append(f"### 🔗 Correlations ({len(correlations)})\n\n")
            parts.append(
                "| Event 1 | Event 2 | Time Diff | Significance |\n"
                "|---------|---------|-----------|-------------|\n"
            )
            for c in correlations[:self.MAX_CORRELATION_ROWS]:
                first = self._escape_cell(c["event_1"][:35])
                second = self._escape_cell(c["event_2"][:35])
                parts.append(
                    f"| {first} | {second} | {c['time_diff_min']}min "
                    f"| **{c['significance']}** |\n"
                )
            parts.append("\n")

        if gaps:
            parts.append(f"### ⏸️ Timeline Gaps ({len(gaps)})\n\n")
            for g in gaps:
                icon = "🔴" if g["suspicion_level"] == "HIGH" else "🟡"
                parts.append(
                    f"{icon} **{g['duration_min']:.0f} min gap** — "
                    f"{g['before_event'][:40]} → {g['after_event'][:40]}\n\n"
                )

        if patterns:
            parts.append(f"### 🔍 Patterns ({len(patterns)})\n\n")
            for p in patterns:
                icon = "🚨" if p["significance"] == "CRITICAL" else "⚠️"
                title = p["type"].replace("_", " ").title()
                parts.append(
                    f"{icon} **{title}** [{p['significance']}]: {p['description']}\n\n"
                )

        return "".join(parts)