| """ |
| Module 3: Digital Evidence Correlation |
| ======================================= |
| Analyzes CCTV logs, mobile metadata, geolocation records. |
| """ |
|
|
| import pandas as pd |
| import numpy as np |
| from typing import Dict, List, Any |
| import plotly.graph_objects as go |
| from plotly.subplots import make_subplots |
| import plotly.express as px |
| from io import StringIO |
|
|
|
|
class DigitalEvidenceCorrelator:
    """Correlates and analyzes digital evidence from multiple sources.

    Input is a CSV (file path or raw text). Column names are matched
    case-insensitively after stripping whitespace. The columns used are
    ``timestamp``, ``source``, ``event_type``, ``location_lat``,
    ``location_lon`` and ``details``; any that are absent are filled with a
    placeholder so the rest of the pipeline can run.
    """

    # Columns the analysis relies on; missing ones are synthesised in _analyze.
    _EXPECTED_COLUMNS = (
        "timestamp", "source", "event_type",
        "location_lat", "location_lon", "details",
    )

    # Two events from different sources are "correlated" when they are at
    # most this many minutes apart ...
    _CORRELATION_WINDOW_MIN = 15
    # ... and the correlation is rated HIGH at or below this many minutes.
    _HIGH_CORRELATION_MIN = 5
    # Each event is only compared against this many following events.
    _CORRELATION_LOOKAHEAD = 4

    # Silence between consecutive events longer than this is reported as a gap.
    _GAP_MIN = 30
    _GAP_MODERATE_MIN = 60
    _GAP_HIGH_MIN = 120

    # Keyword patterns, applied to lower-cased ``details`` text. Word
    # boundaries keep "2" from matching "12:30", "one" from matching "phone"
    # and "fast" from matching "breakfast". Groups are non-capturing so pandas
    # does not warn about match groups in ``str.contains``.
    _MULTI_PERSON_RE = r"\b(?:two|2|multiple)\b"
    _SINGLE_PERSON_RE = r"\b(?:single|one|alone|1)\b"
    # Leading boundary only, so inflections such as "rapidly" still match.
    _RAPID_RE = r"\b(?:high speed|rapid|fast|fleeing)"

    def analyze_from_file(self, filepath: str) -> Dict[str, Any]:
        """Read a CSV file from *filepath* and analyze it.

        Returns the same dictionary as :meth:`_analyze`. Errors raised by
        ``pandas.read_csv`` (missing file, empty data, ...) propagate.
        """
        df = pd.read_csv(filepath)
        return self._analyze(df)

    def analyze_from_text(self, csv_text: str) -> Dict[str, Any]:
        """Parse *csv_text* as CSV content and analyze it.

        Returns the same dictionary as :meth:`_analyze`. Errors raised by
        ``pandas.read_csv`` propagate.
        """
        df = pd.read_csv(StringIO(csv_text))
        return self._analyze(df)

    def _analyze(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Normalise *df* and run every analysis step on it.

        Rows whose timestamp cannot be parsed are dropped, and the remaining
        rows are sorted chronologically. The caller's frame is not modified.

        Returns a dict with keys ``evidence_table`` (DataFrame),
        ``correlation_plot`` (plotly figure), ``analysis_markdown`` (str),
        ``evidence_records`` (list of row dicts), ``timeline_events`` (list of
        dicts) and ``correlations`` (list of dicts).
        """
        # Work on a copy so renaming/adding columns never leaks to the caller.
        df = df.copy()
        # str() guards against non-string headers (e.g. numeric column names).
        df.columns = [str(c).strip().lower() for c in df.columns]

        if "timestamp" in df.columns:
            df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
            df = df.dropna(subset=["timestamp"])
            # Stable sort: events sharing a timestamp keep their input order.
            df = df.sort_values("timestamp", kind="mergesort").reset_index(drop=True)

        for col in self._EXPECTED_COLUMNS:
            if col not in df.columns:
                df[col] = 0.0 if "location" in col else "unknown"

        correlations = self._find_correlations(df)
        gaps = self._detect_gaps(df)
        patterns = self._identify_patterns(df)

        evidence_table = df[["timestamp", "source", "event_type", "details"]].copy()
        evidence_table["timestamp"] = evidence_table["timestamp"].astype(str)

        correlation_plot = self._build_plot(df, correlations, gaps)
        analysis_md = self._build_markdown(df, correlations, gaps, patterns)

        # All three columns are guaranteed to exist at this point.
        timeline_events = [
            {
                "event": detail,
                "category": "Digital Evidence",
                "source": source,
                "timestamp": str(stamp),
            }
            for detail, source, stamp in zip(df["details"], df["source"], df["timestamp"])
        ]

        return {
            "evidence_table": evidence_table,
            "correlation_plot": correlation_plot,
            "analysis_markdown": analysis_md,
            "evidence_records": df.to_dict("records"),
            "timeline_events": timeline_events,
            "correlations": correlations,
        }

    @staticmethod
    def _has_usable_timestamps(df) -> bool:
        """Return True when ``df['timestamp']`` holds values that can be subtracted.

        False when the column is absent or contains text, which is what
        :meth:`_analyze` stores ("unknown") when the input had no timestamp
        column at all.
        """
        if "timestamp" not in df.columns:
            return False
        column = df["timestamp"]
        if pd.api.types.is_datetime64_any_dtype(column):
            return True
        return not any(isinstance(value, str) for value in column)

    def _find_correlations(self, df):
        """Find pairs of near-simultaneous events reported by different sources.

        *df* is expected to be sorted by ``timestamp``. Each event is compared
        with the next ``_CORRELATION_LOOKAHEAD`` events. A pair is reported
        when the sources differ and the events are 0 to
        ``_CORRELATION_WINDOW_MIN`` minutes apart (inclusive, so events logged
        at the same instant count).

        Returns a list of dicts with keys ``event_1``, ``event_2``,
        ``time_diff_min``, ``correlation_type`` and ``significance``. The list
        is empty when fewer than two rows or no usable timestamps exist.
        """
        if len(df) < 2 or not self._has_usable_timestamps(df):
            return []

        count = len(df)
        # Pull columns out once; per-row df.iloc[...] access is very slow.
        times = df["timestamp"].tolist()
        sources = df["source"].tolist() if "source" in df.columns else [""] * count
        details = df["details"].tolist() if "details" in df.columns else [""] * count

        def label(pos):
            return f"{sources[pos]}: {str(details[pos])[:40]}"

        correlations = []
        for i in range(count - 1):
            last = min(i + 1 + self._CORRELATION_LOOKAHEAD, count)
            for j in range(i + 1, last):
                td = (times[j] - times[i]).total_seconds() / 60
                # Written as a positive range test so NaN (from NaT) is rejected.
                if not 0 <= td <= self._CORRELATION_WINDOW_MIN:
                    continue
                if sources[i] == sources[j]:
                    continue
                correlations.append({
                    "event_1": label(i),
                    "event_2": label(j),
                    "time_diff_min": round(td, 1),
                    "correlation_type": "temporal_proximity",
                    "significance": "HIGH" if td <= self._HIGH_CORRELATION_MIN else "MODERATE",
                })
        return correlations

    def _detect_gaps(self, df):
        """Report silent periods longer than ``_GAP_MIN`` minutes between consecutive events.

        *df* is expected to be sorted by ``timestamp``.

        Returns a list of dicts with keys ``start``, ``end``, ``duration_min``,
        ``before_event``, ``after_event`` and ``suspicion_level`` (HIGH above
        120 minutes, MODERATE above 60, otherwise LOW). The list is empty when
        fewer than two rows or no usable timestamps exist.
        """
        if len(df) < 2 or not self._has_usable_timestamps(df):
            return []

        count = len(df)
        times = df["timestamp"].tolist()
        details = df["details"].tolist() if "details" in df.columns else ["N/A"] * count

        gaps = []
        for pos in range(count - 1):
            td = (times[pos + 1] - times[pos]).total_seconds() / 60
            if td > self._GAP_MIN:
                if td > self._GAP_HIGH_MIN:
                    level = "HIGH"
                elif td > self._GAP_MODERATE_MIN:
                    level = "MODERATE"
                else:
                    level = "LOW"
                gaps.append({
                    "start": str(times[pos]),
                    "end": str(times[pos + 1]),
                    "duration_min": round(td, 1),
                    "before_event": str(details[pos]),
                    "after_event": str(details[pos + 1]),
                    "suspicion_level": level,
                })
        return gaps

    def _identify_patterns(self, df):
        """Flag keyword-based patterns in the ``details`` / ``event_type`` text.

        Detects, in this order:

        * ``person_count_discrepancy`` - some record mentions several people
          and some record mentions a single person;
        * ``rapid_departure`` - some record mentions speed or fleeing;
        * ``communication_cutoff`` - some ``event_type`` contains "disconnect".

        This is keyword presence only; it does not check ordering or that the
        records describe the same people.

        Returns a list of dicts with keys ``type``, ``description`` and
        ``significance``; empty when there is no ``details`` column.
        """
        patterns = []
        if "details" not in df.columns:
            return patterns
        details_lower = df["details"].astype(str).str.lower()

        has_multi = details_lower.str.contains(self._MULTI_PERSON_RE, na=False).any()
        has_single = details_lower.str.contains(self._SINGLE_PERSON_RE, na=False).any()
        if has_multi and has_single:
            patterns.append({
                "type": "person_count_discrepancy",
                "description": "Multiple individuals arrived but fewer departed",
                "significance": "CRITICAL",
            })

        if details_lower.str.contains(self._RAPID_RE, na=False).any():
            patterns.append({
                "type": "rapid_departure",
                "description": "Vehicle/person departing at unusual speed",
                "significance": "HIGH",
            })

        if "event_type" in df.columns:
            is_disconnect = df["event_type"].astype(str).str.contains(
                "disconnect", case=False, na=False
            )
            disconnects = df[is_disconnect]
            if len(disconnects) > 0:
                # Only the first matching row (in frame order) is reported.
                patterns.append({
                    "type": "communication_cutoff",
                    "description": f"Device disconnected at {disconnects.iloc[0]['timestamp']}",
                    "significance": "HIGH",
                })

        return patterns

    def _build_plot(self, df, correlations, gaps):
        """Build a two-row plotly figure: per-source event timeline and event histogram.

        HIGH and MODERATE gaps are shaded on the timeline row. *correlations*
        is accepted but not used by the current drawing code. When *df* has no
        ``timestamp`` column or no rows, the empty two-row figure is returned.
        """
        # NOTE(review): the leading symbol in both subplot titles looks like a
        # mis-encoded emoji; kept as found because the original is not recoverable.
        fig = make_subplots(
            rows=2, cols=1,
            subplot_titles=("π Evidence Timeline by Source", "π Event Density"),
            row_heights=[0.7, 0.3], vertical_spacing=0.15,
        )

        if "timestamp" not in df.columns or len(df) == 0:
            return fig

        has_source = "source" in df.columns
        sources = df["source"].unique() if has_source else ["Unknown"]
        colors = px.colors.qualitative.Set2

        for idx, source in enumerate(sources):
            sdf = df[df["source"] == source] if has_source else df
            fig.add_trace(go.Scatter(
                x=sdf["timestamp"], y=[source] * len(sdf),
                mode="markers", name=source,
                # Cycle through the palette when there are more sources than colors.
                marker=dict(size=12, color=colors[idx % len(colors)], symbol="diamond"),
                hovertemplate="<b>%{y}</b><br>Time: %{x}<br><extra></extra>",
            ), row=1, col=1)

        for gap in gaps:
            # LOW gaps are deliberately not shaded.
            if gap["suspicion_level"] in ["HIGH", "MODERATE"]:
                if gap["suspicion_level"] == "HIGH":
                    color = "rgba(248, 81, 73, 0.2)"
                else:
                    color = "rgba(255, 166, 87, 0.15)"
                fig.add_vrect(x0=gap["start"], x1=gap["end"], fillcolor=color,
                              layer="below", line_width=0, row=1, col=1)

        fig.add_trace(go.Histogram(
            x=df["timestamp"], nbinsx=20,
            marker_color="#79c0ff", name="Density", opacity=0.7,
        ), row=2, col=1)

        fig.update_layout(template="plotly_dark", paper_bgcolor="#0d1117",
                          plot_bgcolor="#161b22", font=dict(color="#e6edf3"),
                          height=550, showlegend=True)
        fig.update_xaxes(gridcolor="#30363d")
        fig.update_yaxes(gridcolor="#30363d")
        return fig

    @staticmethod
    def _md_cell(text: str) -> str:
        """Escape pipe characters so *text* cannot break a Markdown table row."""
        return text.replace("|", "\\|")

    def _build_markdown(self, df, correlations, gaps, patterns):
        """Render the analysis results as a Markdown report string.

        The report has a summary line followed by optional sections for
        correlations (first 8 only, as a table), timeline gaps and patterns.
        Sections with no entries are omitted.
        """
        # NOTE(review): the symbols in the headings and icons below look like
        # mis-encoded emoji/punctuation; kept as found because the originals
        # are not recoverable from this file.
        if "source" in df.columns:
            # str() so numeric or missing source values cannot break the join.
            source_list = ", ".join(str(s) for s in df["source"].unique())
        else:
            source_list = "N/A"

        parts = [
            "## π± Digital Evidence Correlation Report\n\n",
            f"**Total Records:** {len(df)} | ",
            f"**Sources:** {source_list}\n\n",
        ]

        if correlations:
            parts.append(f"### π Correlations ({len(correlations)})\n\n")
            parts.append(
                "| Event 1 | Event 2 | Time Diff | Significance |\n"
                "|---------|---------|-----------|-------------|\n"
            )
            for c in correlations[:8]:
                # Truncate first, then escape, so the visible width stays bounded.
                first = self._md_cell(c["event_1"][:35])
                second = self._md_cell(c["event_2"][:35])
                parts.append(
                    f"| {first} | {second} | {c['time_diff_min']}min | **{c['significance']}** |\n"
                )
            parts.append("\n")

        if gaps:
            parts.append(f"### βΈοΈ Timeline Gaps ({len(gaps)})\n\n")
            for g in gaps:
                icon = "π΄" if g["suspicion_level"] == "HIGH" else "π‘"
                parts.append(
                    f"{icon} **{g['duration_min']:.0f} min gap** β {g['before_event'][:40]} β {g['after_event'][:40]}\n\n"
                )

        if patterns:
            parts.append(f"### π Patterns ({len(patterns)})\n\n")
            for p in patterns:
                icon = "π¨" if p["significance"] == "CRITICAL" else "β οΈ"
                parts.append(
                    f"{icon} **{p['type'].replace('_', ' ').title()}** [{p['significance']}]: {p['description']}\n\n"
                )

        return "".join(parts)
|
|