suvradeepp's picture
Upload 63 files
03e7fda verified
"""
engine/ripple_engine.py
------------------------
Propagates delays through the dependency DAG using BFS.
Computes cascade impact on all downstream activities.
"""
import pandas as pd
import numpy as np
import networkx as nx
from datetime import datetime, timedelta
from typing import Optional, Dict, List
from data_loader import DataLoader
from engine.dag_builder import build_dag, get_descendants
class RippleEngine:
"""
Given a delay delta on one activity, propagate the effect to all
downstream activities and compute the new project end date.
"""
def __init__(self, G: nx.DiGraph, loader: Optional[DataLoader] = None):
self.G = G
self.loader = loader or DataLoader()
def _get_activity(self, activity_id: str) -> Dict:
"""Return node attribute dict for an activity."""
if activity_id in self.G.nodes:
return dict(self.G.nodes[activity_id])
return {}
def _to_ts(self, val) -> Optional[pd.Timestamp]:
if val is None or (isinstance(val, float) and np.isnan(val)):
return None
try:
return pd.Timestamp(val)
except Exception:
return None
def propagate_delay(self, affected_activity_id: str, delta_days: int,
reference_date: Optional[datetime] = None) -> Dict:
"""
Simulate delaying `affected_activity_id` by `delta_days` and compute
the cascade effect on all downstream activities.
Returns
-------
dict with:
- affected_activity_id
- delta_days
- cascade_table : DataFrame of impacted activities
- new_project_end : pd.Timestamp or None
- original_project_end : pd.Timestamp or None
- total_project_delay_days : int
"""
if reference_date is None:
reference_date = datetime(2024, 6, 1)
today = pd.Timestamp(reference_date)
# Get all descendants (will be affected)
downstream = get_descendants(self.G, affected_activity_id)
# Compute original project end (max of all leaf node ends)
leaf_nodes = [n for n in self.G.nodes if self.G.out_degree(n) == 0]
original_project_end = None
for node in leaf_nodes:
end = self._to_ts(self.G.nodes[node].get("planned_end_date"))
if end and (original_project_end is None or end > original_project_end):
original_project_end = end
# Compute shifted dates using topological traversal
# shifted_ends dict: activity_id → new_end_date
shifted_ends = {}
shifted_starts = {}
# The directly affected activity shifts by delta_days in its end
act_data = self._get_activity(affected_activity_id)
orig_end = self._to_ts(
act_data.get("forecasted_end_date") or act_data.get("planned_end_date")
)
if orig_end is None:
orig_end = self._to_ts(act_data.get("planned_end_date"))
if orig_end:
shifted_ends[affected_activity_id] = orig_end + timedelta(days=delta_days)
else:
shifted_ends[affected_activity_id] = today + timedelta(days=delta_days)
# BFS propagation
try:
topo_order = list(nx.topological_sort(self.G))
except Exception:
topo_order = [affected_activity_id] + downstream
cascade_rows = []
for node_id in topo_order:
if node_id not in downstream and node_id != affected_activity_id:
continue
node_data = self._get_activity(node_id)
original_start = self._to_ts(
node_data.get("planned_start_date")
)
original_end = self._to_ts(
node_data.get("forecasted_end_date") or node_data.get("planned_end_date")
)
planned_dur = node_data.get(
"planned_duration_days",
(original_end - original_start).days if original_start and original_end else 14
)
# New start = max(original planned start, max of all predecessor new ends)
pred_ends = []
for pred_id in self.G.predecessors(node_id):
if pred_id in shifted_ends:
pred_ends.append(shifted_ends[pred_id])
else:
pred_data = self._get_activity(pred_id)
pred_end = self._to_ts(
pred_data.get("forecasted_end_date") or pred_data.get("planned_end_date")
)
if pred_end:
pred_ends.append(pred_end)
if pred_ends:
new_start = max(pred_ends)
if original_start:
new_start = max(new_start, original_start)
else:
new_start = original_start or today
new_end = new_start + timedelta(days=int(planned_dur or 14))
shifted_starts[node_id] = new_start
shifted_ends[node_id] = new_end
cascade_delay = 0
if original_end:
cascade_delay = (new_end - original_end).days
if node_id != affected_activity_id:
cascade_rows.append({
"activity_id": node_id,
"activity_name": node_data.get("name", node_id),
"original_start": original_start,
"original_end": original_end,
"new_start": new_start,
"new_end": new_end,
"cascade_delay_days": cascade_delay,
"has_open_issues": node_data.get("issue_count", 0) > 0,
})
# New project end
new_project_end = None
for node in leaf_nodes:
end = shifted_ends.get(node) or self._to_ts(
self.G.nodes[node].get("planned_end_date")
)
if end and (new_project_end is None or end > new_project_end):
new_project_end = end
total_project_delay = 0
if original_project_end and new_project_end:
total_project_delay = (new_project_end - original_project_end).days
cascade_df = pd.DataFrame(cascade_rows) if cascade_rows else pd.DataFrame(
columns=["activity_id", "activity_name", "original_start", "original_end",
"new_start", "new_end", "cascade_delay_days", "has_open_issues"]
)
return {
"affected_activity_id": affected_activity_id,
"delta_days": delta_days,
"cascade_table": cascade_df,
"new_project_end": new_project_end,
"original_project_end": original_project_end,
"total_project_delay_days": total_project_delay,
"num_activities_affected": len(cascade_rows),
}
def get_high_impact_activities(self, top_n: int = 5) -> pd.DataFrame:
"""
Find activities whose 1-day delay causes the most downstream impact.
"""
rows = []
for node_id in self.G.nodes:
descendants = get_descendants(self.G, node_id)
rows.append({
"activity_id": node_id,
"activity_name": self.G.nodes[node_id].get("name", node_id),
"downstream_count": len(descendants),
"status": self.G.nodes[node_id].get("status", "unknown"),
})
df = pd.DataFrame(rows).sort_values("downstream_count", ascending=False)
return df.head(top_n)