"""
Module 5: Timeline Builder
============================
Builds integrated investigation timelines from all evidence sources.
"""
|
|
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
|
|
|
|
class TimelineBuilder:
    """Builds integrated forensic investigation timelines.

    The builder reads three keys from an investigation ``state`` mapping
    (``timeline_events``, ``tod_estimate`` and ``risk_score``), merges them
    into one event list, orders the events chronologically where their
    timestamps can be parsed, and renders a table, a Plotly figure and a
    Markdown summary.
    """

    # Marker/text colour per event category. The "Unknown" entry is also used
    # as the fallback for categories that are not listed here.
    CATEGORY_COLORS = {
        "Physical Evidence": "#f85149",
        "Digital Evidence": "#79c0ff",
        "TOD Window": "#ffa657",
        "Witness/Report": "#56d364",
        "Analysis": "#d2a8ff",
        "Unknown": "#8b949e",
    }

    def build(self, state: Dict) -> Dict[str, Any]:
        """Build the timeline artefacts for ``state``.

        Args:
            state: Investigation state mapping; see ``_collect_events`` for
                the keys that are read.

        Returns:
            A dict with ``timeline_plot`` (Plotly figure), ``timeline_table``
            (``pandas.DataFrame``) and ``summary_markdown`` (``str``). When no
            events can be collected, the plot is a placeholder figure, the
            table is an empty frame and the summary is a warning heading.
        """
        events = self._collect_events(state)
        if not events:
            return {
                "timeline_plot": self._empty_plot(),
                "timeline_table": pd.DataFrame(),
                "summary_markdown": "## ⚠️ No timeline events available",
            }

        events_df = self._build_df(events)
        return {
            "timeline_plot": self._build_plot(events_df),
            "timeline_table": events_df,
            "summary_markdown": self._build_summary(events_df),
        }

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[pd.Timestamp]:
        """Parse ``value`` into a single timestamp, or return ``None``.

        ``pd.to_datetime`` returns ``NaT`` (without raising) for blank or
        missing input, so "did not raise" is not enough to call a value
        parseable; ``NaT`` and non-scalar results are rejected here as well.
        """
        try:
            parsed = pd.to_datetime(value)
        except (ValueError, TypeError, OverflowError):
            # Free-text timestamps such as "Assessment" end up here.
            return None
        if not isinstance(parsed, pd.Timestamp) or pd.isna(parsed):
            return None
        return parsed

    def _collect_events(self, state: Dict) -> List[Dict[str, Any]]:
        """Gather raw events from every evidence source found in ``state``.

        Reads:
            * ``timeline_events`` - iterable of event dicts; non-dict entries
              are ignored and missing fields default to ``"Unknown"``
              (``""`` for the timestamp).
            * ``tod_estimate`` - dict; contributes a "TOD Window" event when
              ``estimated_pmi_hours`` is truthy.
            * ``risk_score`` - number; contributes an "Analysis" event when
              truthy (so a score of 0 is skipped, as before).

        Returns:
            A list of dicts with the keys ``event``, ``category``, ``source``
            and ``timestamp``. Empty when ``state`` is empty or ``None``.
        """
        if not state:
            return []

        # ``or []`` also covers an explicit ``None`` stored under the key.
        events = [
            {
                "event": ev.get("event", "Unknown"),
                "category": ev.get("category", "Unknown"),
                "source": ev.get("source", "Unknown"),
                "timestamp": ev.get("timestamp", ""),
            }
            for ev in (state.get("timeline_events") or [])
            if isinstance(ev, dict)
        ]

        tod = state.get("tod_estimate", {})
        if isinstance(tod, dict) and tod.get("estimated_pmi_hours"):
            pmi_hours = tod["estimated_pmi_hours"]
            events.append({
                "event": f"Estimated TOD (PMI: {pmi_hours}h)",
                "category": "TOD Window",
                "source": "Henssge Model",
                "timestamp": f"~{pmi_hours}h before discovery",
            })

        if state.get("risk_score"):
            events.append({
                "event": f"Risk Score: {state['risk_score']:.0f}/100",
                "category": "Analysis",
                "source": "Risk Engine",
                "timestamp": "Assessment",
            })
        return events

    def _build_df(self, events: List[Dict[str, Any]]) -> pd.DataFrame:
        """Turn ``events`` into a chronologically ordered, numbered table.

        Events whose timestamp parses come first, in ascending time order.
        Events whose timestamp does not parse follow, in their input order.
        Ties keep their input order because the sort is stable.

        Returns:
            A frame with the event columns preceded by a 1-based ``#`` column.
        """
        df = pd.DataFrame(events)
        if "timestamp" in df.columns:
            raw_timestamps = df["timestamp"].tolist()
        else:
            raw_timestamps = [None] * len(df)

        # Two-part key: (0, epoch seconds) for parseable timestamps and
        # (1, input position) otherwise. This avoids writing floats into an
        # integer column and needs no magic "far future" sentinel value.
        sort_keys = []
        for position, raw in enumerate(raw_timestamps):
            parsed = self._parse_timestamp(raw)
            if parsed is None:
                sort_keys.append((1, float(position)))
            else:
                sort_keys.append((0, parsed.timestamp()))

        order = sorted(range(len(df)), key=sort_keys.__getitem__)
        df = df.iloc[order].reset_index(drop=True)
        df.insert(0, "#", range(1, len(df) + 1))
        return df

    def _build_plot(self, df: pd.DataFrame) -> "go.Figure":
        """Render the timeline as a Plotly scatter figure.

        When at least one event has a parseable timestamp, only those events
        are drawn, on a time axis with one trace per category. Otherwise
        every event is drawn against its row index so the sequence is still
        visible.
        """
        fig = go.Figure()
        fallback_color = self.CATEGORY_COLORS["Unknown"]

        dated = []
        for _, row in df.iterrows():
            parsed = self._parse_timestamp(row.get("timestamp"))
            if parsed is not None:
                dated.append({**row.to_dict(), "parsed_time": parsed})

        if dated:
            dated_df = pd.DataFrame(dated)
            for category in dated_df["category"].unique():
                cat_ev = dated_df[dated_df["category"] == category]
                color = self.CATEGORY_COLORS.get(category, fallback_color)
                fig.add_trace(go.Scatter(
                    x=cat_ev["parsed_time"],
                    y=[category] * len(cat_ev),
                    mode="markers+text",
                    name=category,
                    marker=dict(size=14, color=color, symbol="diamond"),
                    # astype(str) keeps the label truncation working when the
                    # event column holds non-string values.
                    text=cat_ev["event"].astype(str).str[:35],
                    textposition="top center",
                    textfont=dict(size=8, color=color),
                ))
        else:
            for idx, row in df.iterrows():
                cat = row.get("category", "Unknown")
                color = self.CATEGORY_COLORS.get(cat, fallback_color)
                fig.add_trace(go.Scatter(
                    x=[idx],
                    y=[cat],
                    mode="markers",
                    showlegend=False,
                    marker=dict(size=14, color=color, symbol="diamond"),
                    hovertemplate=f"<b>{row.get('event', '')}</b><extra></extra>",
                ))

        fig.update_layout(
            title="🔬 Integrated Forensic Timeline",
            template="plotly_dark",
            paper_bgcolor="#0d1117",
            plot_bgcolor="#161b22",
            font=dict(color="#e6edf3"),
            height=450,
            showlegend=True,
        )
        fig.update_xaxes(gridcolor="#30363d")
        fig.update_yaxes(gridcolor="#30363d")
        return fig

    def _build_summary(self, df: pd.DataFrame) -> str:
        """Return a Markdown summary of the ordered timeline.

        The summary holds the total event count, a per-category count table
        (only when a ``category`` column exists) and the numbered event
        sequence. ``df`` must carry the ``#`` column added by ``_build_df``.
        """
        parts = [
            "## 📅 Timeline Summary\n\n",
            f"**Total Events:** {len(df)}\n\n",
        ]
        if "category" in df.columns:
            parts.append("### Distribution\n| Category | Count |\n|----------|-------|\n")
            parts.extend(
                f"| {cat} | {cnt} |\n"
                for cat, cnt in df["category"].value_counts().items()
            )
            parts.append("\n")

        parts.append("### Event Sequence\n\n")
        for _, row in df.iterrows():
            parts.append(
                f"{row['#']}. **[{row.get('category', '')}]** {row.get('event', '')} "
                f"*(Source: {row.get('source', '')})* — `{row.get('timestamp', '')}`\n"
            )
        parts.append("\n---\n*Timeline ordered by timestamp where parseable.*\n")
        return "".join(parts)

    def _empty_plot(self) -> "go.Figure":
        """Return a placeholder figure shown when there are no events."""
        fig = go.Figure()
        fig.add_annotation(
            text="No timeline data yet.",
            xref="paper",
            yref="paper",
            x=0.5,
            y=0.5,
            showarrow=False,
            font=dict(size=16, color="#8b949e"),
        )
        fig.update_layout(
            template="plotly_dark",
            paper_bgcolor="#0d1117",
            plot_bgcolor="#161b22",
            height=400,
        )
        return fig
|
|