# File size: 5,644 Bytes | e38efd1
"""
Module 5: Timeline Builder
============================
Builds integrated investigation timelines from all evidence sources.
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Any
import plotly.graph_objects as go
import plotly.express as px
class TimelineBuilder:
    """Builds integrated forensic investigation timelines.

    ``build`` reads events from a state dict, orders them by timestamp where
    the timestamp can be parsed, and returns a Plotly figure, a DataFrame
    table and a Markdown summary describing the same events.
    """

    # Marker/text colour per event category; categories that are not listed
    # here are drawn with _FALLBACK_COLOR.
    CATEGORY_COLORS = {
        "Physical Evidence": "#f85149",
        "Digital Evidence": "#79c0ff",
        "TOD Window": "#ffa657",
        "Witness/Report": "#56d364",
        "Analysis": "#d2a8ff",
        "Unknown": "#8b949e",
    }
    _FALLBACK_COLOR = "#8b949e"

    def build(self, state: Dict) -> Dict[str, Any]:
        """Build the timeline artefacts for *state*.

        Args:
            state: Mapping read via ``.get``. The keys consulted are
                ``"timeline_events"``, ``"tod_estimate"`` and ``"risk_score"``.

        Returns:
            A dict with ``"timeline_plot"`` (Plotly figure),
            ``"timeline_table"`` (DataFrame) and ``"summary_markdown"`` (str).
            When no events are collected the table is an empty DataFrame and
            the plot is the placeholder from ``_empty_plot``.
        """
        events = self._collect_events(state)
        if not events:
            return {
                "timeline_plot": self._empty_plot(),
                "timeline_table": pd.DataFrame(),
                "summary_markdown": "## ⚠️ No timeline events available",
            }
        events_df = self._build_df(events)
        return {
            "timeline_plot": self._build_plot(events_df),
            "timeline_table": events_df,
            "summary_markdown": self._build_summary(events_df),
        }

    @staticmethod
    def _parse_timestamp(value: Any) -> "pd.Timestamp | None":
        """Return *value* as a ``pd.Timestamp``, or ``None`` if it is not one.

        ``pd.to_datetime`` does not raise for every non-timestamp: an empty
        string comes back as ``NaT`` and ``None`` comes back as ``None``.
        Both are reported as unparseable here so callers need only one check.
        """
        try:
            parsed = pd.to_datetime(value)
        except (ValueError, TypeError, OverflowError):
            # Free-form text such as "Assessment" lands here.
            return None
        if parsed is pd.NaT or not isinstance(parsed, pd.Timestamp):
            return None
        return parsed

    def _collect_events(self, state):
        """Gather event dicts from *state*.

        Each returned dict has the keys ``event``, ``category``, ``source``
        and ``timestamp``. Entries of ``state["timeline_events"]`` that are
        not dicts are ignored; missing fields default to ``"Unknown"``
        (``""`` for the timestamp). One synthetic event is appended for the
        time-of-death estimate and one for the risk score when present.
        """
        events = []
        # `or []` tolerates an explicit None stored under the key.
        for ev in state.get("timeline_events") or []:
            if isinstance(ev, dict):
                events.append({
                    "event": ev.get("event", "Unknown"),
                    "category": ev.get("category", "Unknown"),
                    "source": ev.get("source", "Unknown"),
                    "timestamp": ev.get("timestamp", ""),
                })

        tod = state.get("tod_estimate", {})
        pmi_hours = tod.get("estimated_pmi_hours") if tod else None
        # NOTE(review): truthiness test, so a PMI of 0 adds no event —
        # confirm that is intended.
        if pmi_hours:
            events.append({
                "event": f"Estimated TOD (PMI: {pmi_hours}h)",
                "category": "TOD Window",
                "source": "Henssge Model",
                "timestamp": f"~{pmi_hours}h before discovery",
            })

        risk_score = state.get("risk_score")
        # NOTE(review): truthiness test, so a risk score of 0 adds no event —
        # confirm that is intended.
        if risk_score:
            events.append({
                "event": f"Risk Score: {risk_score:.0f}/100",
                "category": "Analysis",
                "source": "Risk Engine",
                "timestamp": "Assessment",
            })
        return events

    def _build_df(self, events):
        """Return *events* as a DataFrame ordered by timestamp.

        Rows whose timestamp parses come first in chronological order; the
        remaining rows follow in their original relative order. A 1-based
        ``"#"`` column is inserted as the first column.
        """
        df = pd.DataFrame(events)
        if "timestamp" in df.columns:
            raw_timestamps = df["timestamp"]
        else:
            raw_timestamps = [None] * len(df)

        # Sort keys are kept outside the frame as floats: writing float
        # seconds cell-by-cell into an int column is an incompatible-dtype
        # assignment in pandas.
        sort_keys = []
        for raw in raw_timestamps:
            parsed = self._parse_timestamp(raw)
            sort_keys.append(parsed.timestamp() if parsed is not None else np.inf)

        # inf places undated rows last; the stable sort keeps their input
        # order and the input order of rows with equal timestamps.
        order = np.argsort(np.asarray(sort_keys, dtype=float), kind="stable")
        df = df.iloc[order].reset_index(drop=True)
        df.insert(0, "#", range(1, len(df) + 1))
        return df

    def _build_plot(self, df):
        """Return a Plotly figure for the events in *df*.

        If at least one row has a parseable timestamp, only those rows are
        drawn: one scatter trace per category, positioned by time. Otherwise
        every row is drawn at its row index with the event text shown on
        hover.
        """
        fig = go.Figure()

        dated = []
        for _, row in df.iterrows():
            ts = self._parse_timestamp(row.get("timestamp"))
            if ts is not None:
                dated.append({**row.to_dict(), "parsed_time": ts})

        if dated:
            dated_df = pd.DataFrame(dated)
            for category in dated_df["category"].unique():
                cat_ev = dated_df[dated_df["category"] == category]
                color = self.CATEGORY_COLORS.get(category, self._FALLBACK_COLOR)
                fig.add_trace(go.Scatter(
                    x=cat_ev["parsed_time"],
                    y=[category] * len(cat_ev),
                    mode="markers+text",
                    name=category,
                    marker=dict(size=14, color=color, symbol="diamond"),
                    # Labels are truncated to 35 characters.
                    text=cat_ev["event"].str[:35],
                    textposition="top center",
                    textfont=dict(size=8, color=color),
                ))
        else:
            for idx, row in df.iterrows():
                cat = row.get("category", "Unknown")
                color = self.CATEGORY_COLORS.get(cat, self._FALLBACK_COLOR)
                fig.add_trace(go.Scatter(
                    x=[idx],
                    y=[cat],
                    mode="markers",
                    showlegend=False,
                    marker=dict(size=14, color=color, symbol="diamond"),
                    hovertemplate=f"<b>{row.get('event', '')}</b><extra></extra>",
                ))

        fig.update_layout(
            title="🔬 Integrated Forensic Timeline",
            template="plotly_dark",
            paper_bgcolor="#0d1117",
            plot_bgcolor="#161b22",
            font=dict(color="#e6edf3"),
            height=450,
            showlegend=True,
        )
        fig.update_xaxes(gridcolor="#30363d")
        fig.update_yaxes(gridcolor="#30363d")
        return fig

    def _build_summary(self, df):
        """Return a Markdown summary of *df*.

        The summary contains the total event count, a per-category count
        table (only when *df* has a ``category`` column) and a numbered list
        of events in table order. *df* must carry the ``"#"`` column.
        """
        parts = [
            "## 📅 Timeline Summary\n\n",
            f"**Total Events:** {len(df)}\n\n",
        ]
        if "category" in df.columns:
            parts.append("### Distribution\n| Category | Count |\n|----------|-------|\n")
            parts.extend(
                f"| {cat} | {cnt} |\n"
                for cat, cnt in df["category"].value_counts().items()
            )
            parts.append("\n")

        parts.append("### Event Sequence\n\n")
        for _, row in df.iterrows():
            parts.append(
                f"{row['#']}. **[{row.get('category', '')}]** {row.get('event', '')} "
                f"*(Source: {row.get('source', '')})* — `{row.get('timestamp', '')}`\n"
            )
        parts.append("\n---\n*Timeline ordered by timestamp where parseable.*\n")
        return "".join(parts)

    def _empty_plot(self):
        """Return a placeholder figure carrying a 'No timeline data yet.' note."""
        fig = go.Figure()
        fig.add_annotation(
            text="No timeline data yet.",
            xref="paper",
            yref="paper",
            x=0.5,
            y=0.5,
            showarrow=False,
            font=dict(size=16, color="#8b949e"),
        )
        fig.update_layout(
            template="plotly_dark",
            paper_bgcolor="#0d1117",
            plot_bgcolor="#161b22",
            height=400,
        )
        return fig
|