# forensic-triage-intelligence / modules/digital_evidence.py
# Muthukumarank's picture
# Add modules/digital_evidence.py
# 57a5b21 verified
"""
Module 3: Digital Evidence Correlation
=======================================
Analyzes CCTV logs, mobile metadata, geolocation records.
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Any
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
from io import StringIO
class DigitalEvidenceCorrelator:
    """Correlates and analyzes digital evidence from multiple sources.

    The input is a CSV (file path or raw text) whose rows are individual
    evidence events.  Column names are matched case-insensitively after
    stripping whitespace.  The columns used are ``timestamp``, ``source``,
    ``event_type``, ``location_lat``, ``location_lon`` and ``details``; any
    that are absent are synthesised with neutral defaults so that every
    downstream step can rely on them being present.
    """

    # Columns guaranteed to exist after normalisation, in the order they are
    # appended when missing.
    _REQUIRED_COLUMNS = (
        "timestamp", "source", "event_type",
        "location_lat", "location_lon", "details",
    )
    # Free-text columns: missing cells are replaced with "unknown".
    _TEXT_COLUMNS = ("source", "event_type", "details")

    # Keyword heuristics.  Word boundaries matter: without them "one" matches
    # "phone", "1"/"2" match any digit inside a larger number, and "fast"
    # matches "breakfast".  Groups are non-capturing so that
    # ``Series.str.contains`` does not warn about match groups.
    _MULTI_PERSON_RE = r"\b(?:two|2|multiple)\b"
    _SINGLE_PERSON_RE = r"\b(?:single|one|alone|1)\b"
    _RAPID_RE = r"\b(?:high[ -]?speed|rapid(?:ly)?|fast|fleeing)\b"

    def analyze_from_file(self, filepath: str) -> Dict[str, Any]:
        """Read a CSV file from ``filepath`` and run the full analysis.

        Returns:
            The result dictionary described in :meth:`_analyze`.
        """
        df = pd.read_csv(filepath)
        return self._analyze(df)

    def analyze_from_text(self, csv_text: str) -> Dict[str, Any]:
        """Parse ``csv_text`` as CSV content and run the full analysis.

        Returns:
            The result dictionary described in :meth:`_analyze`.
        """
        df = pd.read_csv(StringIO(csv_text))
        return self._analyze(df)

    def _normalize_frame(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a cleaned copy of ``df`` ready for analysis.

        * Column labels are stringified, stripped and lower-cased.
        * If a ``timestamp`` column exists it is parsed (unparseable values
          become ``NaT``), rows without a valid timestamp are dropped and the
          rest are sorted chronologically.
        * Missing required columns are added: ``NaT`` for ``timestamp``,
          ``0.0`` for the location columns and ``"unknown"`` otherwise.
        * Missing cells in the text columns become ``"unknown"``.

        The caller's frame is never modified.
        """
        df = df.copy()
        # str() guards against non-string labels (e.g. integer headers).
        df.columns = [str(c).strip().lower() for c in df.columns]

        if "timestamp" in df.columns:
            df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce")
            df = df.dropna(subset=["timestamp"])
            # Stable sort keeps file order for identical timestamps, so the
            # output is deterministic when events tie.
            df = df.sort_values("timestamp", kind="mergesort").reset_index(drop=True)

        for col in self._REQUIRED_COLUMNS:
            if col in df.columns:
                continue
            if col == "timestamp":
                # A string placeholder here would make every later time
                # subtraction raise TypeError; NaT is skipped instead.
                df[col] = pd.Series(pd.NaT, index=df.index, dtype="datetime64[ns]")
            elif "location" in col:
                df[col] = 0.0
            else:
                df[col] = "unknown"

        for col in self._TEXT_COLUMNS:
            missing = df[col].isna()
            if missing.any():
                # NaN != NaN, so unfilled cells would look like distinct
                # sources and would also break string joins later on.
                df[col] = df[col].astype(object).where(~missing, "unknown")
        return df

    def _analyze(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Run every analysis step on ``df`` and bundle the results.

        Returns:
            A dict with the keys ``evidence_table`` (DataFrame with a string
            timestamp), ``correlation_plot`` (plotly figure),
            ``analysis_markdown`` (str), ``evidence_records`` (list of row
            dicts), ``timeline_events`` (list of dicts) and ``correlations``
            (list of dicts).
        """
        df = self._normalize_frame(df)

        correlations = self._find_correlations(df)
        gaps = self._detect_gaps(df)
        patterns = self._identify_patterns(df)

        evidence_table = df[["timestamp", "source", "event_type", "details"]].copy()
        evidence_table["timestamp"] = evidence_table["timestamp"].astype(str)

        correlation_plot = self._build_plot(df, correlations, gaps)
        analysis_md = self._build_markdown(df, correlations, gaps, patterns)

        timeline_events = [
            {
                "event": details,
                "category": "Digital Evidence",
                "source": source,
                "timestamp": str(stamp),
            }
            for details, source, stamp in zip(df["details"], df["source"], df["timestamp"])
        ]

        return {
            "evidence_table": evidence_table,
            "correlation_plot": correlation_plot,
            "analysis_markdown": analysis_md,
            "evidence_records": df.to_dict("records"),
            "timeline_events": timeline_events,
            "correlations": correlations,
        }

    def _find_correlations(self, df, max_gap_min=15.0, high_gap_min=5.0, lookahead=4):
        """Pair nearby events that come from different sources.

        Each row is compared with up to ``lookahead`` rows that follow it.
        A pair is reported when the sources differ and the second event is
        between 0 and ``max_gap_min`` minutes after the first (inclusive on
        both ends, so simultaneous events are reported).

        Args:
            df: Evidence rows; expected to be in chronological order.
            max_gap_min: Largest time difference, in minutes, still reported.
            high_gap_min: Differences up to this many minutes are ``"HIGH"``;
                larger ones are ``"MODERATE"``.
            lookahead: How many following rows each row is compared with.

        Returns:
            A list of dicts with keys ``event_1``, ``event_2``,
            ``time_diff_min``, ``correlation_type`` and ``significance``.
        """
        correlations = []
        total = len(df)
        if total < 2 or "timestamp" not in df.columns:
            return correlations

        # Pull columns into plain lists once instead of df.iloc[...] per cell.
        stamps = pd.to_datetime(df["timestamp"], errors="coerce").tolist()
        sources = df["source"].tolist() if "source" in df.columns else [""] * total
        details = df["details"].tolist() if "details" in df.columns else [""] * total
        labels = [f"{src}: {str(det)[:40]}" for src, det in zip(sources, details)]

        for i in range(total - 1):
            for j in range(i + 1, min(i + 1 + lookahead, total)):
                delta = stamps[j] - stamps[i]
                if pd.isna(delta):
                    # At least one side has no usable timestamp.
                    continue
                minutes = delta.total_seconds() / 60
                if not 0 <= minutes <= max_gap_min:
                    continue
                if sources[i] == sources[j]:
                    continue
                correlations.append({
                    "event_1": labels[i],
                    "event_2": labels[j],
                    "time_diff_min": round(minutes, 1),
                    "correlation_type": "temporal_proximity",
                    "significance": "HIGH" if minutes <= high_gap_min else "MODERATE",
                })
        return correlations

    def _detect_gaps(self, df, min_gap_min=30.0, moderate_gap_min=60.0, high_gap_min=120.0):
        """Find long silences between consecutive events.

        Args:
            df: Evidence rows; expected to be in chronological order.
            min_gap_min: Only gaps strictly longer than this are reported.
            moderate_gap_min: Gaps longer than this are ``"MODERATE"``.
            high_gap_min: Gaps longer than this are ``"HIGH"``.

        Returns:
            A list of dicts with keys ``start``, ``end``, ``duration_min``,
            ``before_event``, ``after_event`` and ``suspicion_level``
            (``"HIGH"``, ``"MODERATE"`` or ``"LOW"``).
        """
        gaps = []
        total = len(df)
        if total < 2 or "timestamp" not in df.columns:
            return gaps

        stamps = pd.to_datetime(df["timestamp"], errors="coerce").tolist()
        details = df["details"].tolist() if "details" in df.columns else ["N/A"] * total

        for earlier, later, before, after in zip(stamps, stamps[1:], details, details[1:]):
            delta = later - earlier
            if pd.isna(delta):
                continue
            minutes = delta.total_seconds() / 60
            if minutes <= min_gap_min:
                continue
            if minutes > high_gap_min:
                level = "HIGH"
            elif minutes > moderate_gap_min:
                level = "MODERATE"
            else:
                level = "LOW"
            gaps.append({
                "start": str(earlier),
                "end": str(later),
                "duration_min": round(minutes, 1),
                "before_event": str(before),
                "after_event": str(after),
                "suspicion_level": level,
            })
        return gaps

    def _identify_patterns(self, df):
        """Flag keyword-based patterns in the ``details``/``event_type`` text.

        These are text heuristics only; they do not verify who arrived or
        left, or in what order.

        * ``person_count_discrepancy`` - at least one row mentions a
          multi-person keyword and at least one row mentions a single-person
          keyword (whole words only).
        * ``rapid_departure`` - any row mentions a speed/flight keyword.
        * ``communication_cutoff`` - any ``event_type`` contains
          "disconnect" (case-insensitive); the first such row's timestamp is
          quoted in the description.

        Returns:
            A list of dicts with keys ``type``, ``description`` and
            ``significance``.
        """
        patterns = []
        if "details" not in df.columns:
            return patterns

        details_lower = df["details"].astype(str).str.lower()

        # Person count discrepancy
        has_multi = details_lower.str.contains(self._MULTI_PERSON_RE, na=False).any()
        has_single = details_lower.str.contains(self._SINGLE_PERSON_RE, na=False).any()
        if has_multi and has_single:
            patterns.append({
                "type": "person_count_discrepancy",
                "description": "Multiple individuals arrived but fewer departed",
                "significance": "CRITICAL",
            })

        # Rapid departure
        if details_lower.str.contains(self._RAPID_RE, na=False).any():
            patterns.append({
                "type": "rapid_departure",
                "description": "Vehicle/person departing at unusual speed",
                "significance": "HIGH",
            })

        # Communication cutoff
        if "event_type" in df.columns:
            is_disconnect = df["event_type"].astype(str).str.contains(
                "disconnect", case=False, na=False
            )
            disconnects = df[is_disconnect]
            if len(disconnects) > 0:
                first_stamp = (
                    disconnects.iloc[0]["timestamp"]
                    if "timestamp" in disconnects.columns
                    else "unknown"
                )
                patterns.append({
                    "type": "communication_cutoff",
                    "description": f"Device disconnected at {first_stamp}",
                    "significance": "HIGH",
                })
        return patterns

    def _build_plot(self, df, correlations, gaps):
        """Build a two-row figure: per-source timeline and event density.

        Row 1 holds one marker trace per source, with HIGH and MODERATE gaps
        shaded behind it; row 2 is a histogram of event timestamps.  An empty
        figure (titles only) is returned when there is nothing to plot.

        ``correlations`` is accepted for interface compatibility and is not
        drawn.
        """
        fig = make_subplots(
            rows=2, cols=1,
            subplot_titles=("📍 Evidence Timeline by Source", "📊 Event Density"),
            row_heights=[0.7, 0.3], vertical_spacing=0.15,
        )
        if "timestamp" not in df.columns or len(df) == 0 or df["timestamp"].isna().all():
            return fig

        has_source = "source" in df.columns
        sources = df["source"].unique() if has_source else ["Unknown"]
        colors = px.colors.qualitative.Set2

        for idx, source in enumerate(sources):
            sdf = df[df["source"] == source] if has_source else df
            fig.add_trace(go.Scatter(
                x=sdf["timestamp"], y=[source] * len(sdf),
                mode="markers", name=str(source),
                marker=dict(size=12, color=colors[idx % len(colors)], symbol="diamond"),
                hovertemplate="<b>%{y}</b><br>Time: %{x}<br><extra></extra>",
            ), row=1, col=1)

        for gap in gaps:
            level = gap["suspicion_level"]
            if level not in ("HIGH", "MODERATE"):
                continue
            color = "rgba(248, 81, 73, 0.2)" if level == "HIGH" else "rgba(255, 166, 87, 0.15)"
            fig.add_vrect(x0=gap["start"], x1=gap["end"], fillcolor=color,
                          layer="below", line_width=0, row=1, col=1)

        fig.add_trace(go.Histogram(x=df["timestamp"], nbinsx=20,
                                   marker_color="#79c0ff", name="Density", opacity=0.7),
                      row=2, col=1)

        fig.update_layout(template="plotly_dark", paper_bgcolor="#0d1117",
                          plot_bgcolor="#161b22", font=dict(color="#e6edf3"),
                          height=550, showlegend=True)
        fig.update_xaxes(gridcolor="#30363d")
        fig.update_yaxes(gridcolor="#30363d")
        return fig

    @staticmethod
    def _md_cell(text, limit):
        """Truncate ``text`` to ``limit`` chars and make it table-safe.

        Pipes are escaped and line breaks flattened so that free text cannot
        split a markdown table row.
        """
        clipped = str(text)[:limit]
        return clipped.replace("|", "\\|").replace("\r", " ").replace("\n", " ")

    def _build_markdown(self, df, correlations, gaps, patterns):
        """Render the analysis results as a markdown report.

        The report starts with the record count and the distinct sources,
        followed by optional sections: a table of the first eight
        correlations, one line per timeline gap, and one line per pattern.
        Sections with no entries are omitted.
        """
        if "source" in df.columns:
            # str() so numeric or otherwise non-string sources do not break join.
            source_list = ", ".join(str(s) for s in df["source"].unique())
        else:
            source_list = "N/A"

        parts = [
            "## 📱 Digital Evidence Correlation Report\n\n",
            f"**Total Records:** {len(df)} | ",
            f"**Sources:** {source_list}\n\n",
        ]

        if correlations:
            parts.append(f"### 🔗 Correlations ({len(correlations)})\n\n")
            parts.append(
                "| Event 1 | Event 2 | Time Diff | Significance |\n"
                "|---------|---------|-----------|-------------|\n"
            )
            for c in correlations[:8]:
                parts.append(
                    f"| {self._md_cell(c['event_1'], 35)} "
                    f"| {self._md_cell(c['event_2'], 35)} "
                    f"| {c['time_diff_min']}min | **{c['significance']}** |\n"
                )
            parts.append("\n")

        if gaps:
            parts.append(f"### ⏸️ Timeline Gaps ({len(gaps)})\n\n")
            for g in gaps:
                icon = "🔴" if g["suspicion_level"] == "HIGH" else "🟡"
                parts.append(
                    f"{icon} **{g['duration_min']:.0f} min gap** — "
                    f"{g['before_event'][:40]} → {g['after_event'][:40]}\n\n"
                )

        if patterns:
            parts.append(f"### 🔍 Patterns ({len(patterns)})\n\n")
            for p in patterns:
                icon = "🚨" if p["significance"] == "CRITICAL" else "⚠️"
                title = p["type"].replace("_", " ").title()
                parts.append(
                    f"{icon} **{title}** [{p['significance']}]: {p['description']}\n\n"
                )

        return "".join(parts)