Muthukumarank commited on
Commit
e38efd1
·
verified ·
1 Parent(s): b6ab230

Add modules/timeline_builder.py

Browse files
Files changed (1) hide show
  1. modules/timeline_builder.py +140 -0
modules/timeline_builder.py ADDED
@@ -0,0 +1,140 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Module 5: Timeline Builder
3
+ ============================
4
+ Builds integrated investigation timelines from all evidence sources.
5
+ """
6
+
7
+ import pandas as pd
8
+ import numpy as np
9
+ from typing import Dict, List, Any
10
+ import plotly.graph_objects as go
11
+ import plotly.express as px
12
+
13
+
14
+ class TimelineBuilder:
15
+ """Builds integrated forensic investigation timelines."""
16
+
17
+ CATEGORY_COLORS = {
18
+ "Physical Evidence": "#f85149",
19
+ "Digital Evidence": "#79c0ff",
20
+ "TOD Window": "#ffa657",
21
+ "Witness/Report": "#56d364",
22
+ "Analysis": "#d2a8ff",
23
+ "Unknown": "#8b949e",
24
+ }
25
+
26
+ def build(self, state: Dict) -> Dict[str, Any]:
27
+ events = self._collect_events(state)
28
+ if not events:
29
+ return {"timeline_plot": self._empty_plot(), "timeline_table": pd.DataFrame(),
30
+ "summary_markdown": "## ⚠️ No timeline events available"}
31
+
32
+ events_df = self._build_df(events)
33
+ timeline_plot = self._build_plot(events_df)
34
+ summary = self._build_summary(events_df)
35
+ return {"timeline_plot": timeline_plot, "timeline_table": events_df, "summary_markdown": summary}
36
+
37
+ def _collect_events(self, state):
38
+ events = []
39
+ for ev in state.get("timeline_events", []):
40
+ if isinstance(ev, dict):
41
+ events.append({
42
+ "event": ev.get("event", "Unknown"),
43
+ "category": ev.get("category", "Unknown"),
44
+ "source": ev.get("source", "Unknown"),
45
+ "timestamp": ev.get("timestamp", ""),
46
+ })
47
+
48
+ tod = state.get("tod_estimate", {})
49
+ if tod and tod.get("estimated_pmi_hours"):
50
+ events.append({
51
+ "event": f"Estimated TOD (PMI: {tod['estimated_pmi_hours']}h)",
52
+ "category": "TOD Window", "source": "Henssge Model",
53
+ "timestamp": f"~{tod['estimated_pmi_hours']}h before discovery",
54
+ })
55
+
56
+ if state.get("risk_score"):
57
+ events.append({
58
+ "event": f"Risk Score: {state['risk_score']:.0f}/100",
59
+ "category": "Analysis", "source": "Risk Engine",
60
+ "timestamp": "Assessment",
61
+ })
62
+ return events
63
+
64
+ def _build_df(self, events):
65
+ df = pd.DataFrame(events)
66
+ df["sort_key"] = 0
67
+ for idx, row in df.iterrows():
68
+ try:
69
+ parsed = pd.to_datetime(row["timestamp"])
70
+ df.at[idx, "sort_key"] = parsed.timestamp()
71
+ except:
72
+ df.at[idx, "sort_key"] = 9999999999 + idx
73
+ df = df.sort_values("sort_key").reset_index(drop=True)
74
+ df = df.drop(columns=["sort_key"])
75
+ df.insert(0, "#", range(1, len(df) + 1))
76
+ return df
77
+
78
+ def _build_plot(self, df):
79
+ fig = go.Figure()
80
+ parseable = []
81
+ for _, row in df.iterrows():
82
+ try:
83
+ ts = pd.to_datetime(row["timestamp"])
84
+ parseable.append({**row.to_dict(), "parsed_time": ts})
85
+ except:
86
+ pass
87
+
88
+ if parseable:
89
+ pe_df = pd.DataFrame(parseable)
90
+ for category in pe_df["category"].unique():
91
+ cat_ev = pe_df[pe_df["category"] == category]
92
+ color = self.CATEGORY_COLORS.get(category, "#8b949e")
93
+ fig.add_trace(go.Scatter(
94
+ x=cat_ev["parsed_time"], y=[category] * len(cat_ev),
95
+ mode="markers+text", name=category,
96
+ marker=dict(size=14, color=color, symbol="diamond"),
97
+ text=cat_ev["event"].str[:35],
98
+ textposition="top center", textfont=dict(size=8, color=color),
99
+ ))
100
+ else:
101
+ for idx, row in df.iterrows():
102
+ cat = row.get("category", "Unknown")
103
+ color = self.CATEGORY_COLORS.get(cat, "#8b949e")
104
+ fig.add_trace(go.Scatter(
105
+ x=[idx], y=[cat], mode="markers", showlegend=False,
106
+ marker=dict(size=14, color=color, symbol="diamond"),
107
+ hovertemplate=f"<b>{row.get('event', '')}</b><extra></extra>",
108
+ ))
109
+
110
+ fig.update_layout(
111
+ title="🔬 Integrated Forensic Timeline",
112
+ template="plotly_dark", paper_bgcolor="#0d1117", plot_bgcolor="#161b22",
113
+ font=dict(color="#e6edf3"), height=450, showlegend=True,
114
+ )
115
+ fig.update_xaxes(gridcolor="#30363d")
116
+ fig.update_yaxes(gridcolor="#30363d")
117
+ return fig
118
+
119
+ def _build_summary(self, df):
120
+ md = "## 📅 Timeline Summary\n\n"
121
+ md += f"**Total Events:** {len(df)}\n\n"
122
+ if "category" in df.columns:
123
+ md += "### Distribution\n| Category | Count |\n|----------|-------|\n"
124
+ for cat, cnt in df["category"].value_counts().items():
125
+ md += f"| {cat} | {cnt} |\n"
126
+ md += "\n"
127
+ md += "### Event Sequence\n\n"
128
+ for _, row in df.iterrows():
129
+ md += f"{row['#']}. **[{row.get('category', '')}]** {row.get('event', '')} "
130
+ md += f"*(Source: {row.get('source', '')})* — `{row.get('timestamp', '')}`\n"
131
+ md += "\n---\n*Timeline ordered by timestamp where parseable.*\n"
132
+ return md
133
+
134
+ def _empty_plot(self):
135
+ fig = go.Figure()
136
+ fig.add_annotation(text="No timeline data yet.", xref="paper", yref="paper",
137
+ x=0.5, y=0.5, showarrow=False, font=dict(size=16, color="#8b949e"))
138
+ fig.update_layout(template="plotly_dark", paper_bgcolor="#0d1117",
139
+ plot_bgcolor="#161b22", height=400)
140
+ return fig