"""
engine/ripple_engine.py
------------------------
Propagates delays through the dependency DAG using BFS.
Computes cascade impact on all downstream activities.
"""
import pandas as pd
import numpy as np
import networkx as nx
from datetime import datetime, timedelta
from typing import Optional, Dict, List
from data_loader import DataLoader
from engine.dag_builder import build_dag, get_descendants
class RippleEngine:
    """
    Given a delay delta on one activity, propagate the effect to all
    downstream activities and compute the new project end date.

    Activities are visited in topological order so every node is
    re-scheduled only after all of its predecessors have been resolved.
    """

    def __init__(self, G: nx.DiGraph, loader: Optional[DataLoader] = None):
        # G: dependency DAG; node attributes carry planned/forecasted dates.
        self.G = G
        self.loader = loader or DataLoader()

    def _get_activity(self, activity_id: str) -> Dict:
        """Return a copy of the node attribute dict, or {} for unknown ids."""
        if activity_id in self.G.nodes:
            return dict(self.G.nodes[activity_id])
        return {}

    def _to_ts(self, val) -> Optional[pd.Timestamp]:
        """Coerce a raw attribute to pd.Timestamp; None for missing/NaN/bad."""
        if val is None or (isinstance(val, float) and np.isnan(val)):
            return None
        try:
            return pd.Timestamp(val)
        except Exception:
            # Unparseable date strings are treated as "no date available".
            return None

    def _duration_days(self, node_data: Dict,
                       start: Optional[pd.Timestamp],
                       end: Optional[pd.Timestamp]) -> int:
        """
        Planned duration in days for an activity.

        Falls back to the planned start/end window when
        `planned_duration_days` is missing *or NaN* (pandas-sourced attrs
        often carry NaN, and `int(nan)` would raise), and finally to a
        14-day default.
        """
        dur = node_data.get("planned_duration_days")
        if dur is None or (isinstance(dur, float) and np.isnan(dur)):
            if start is not None and end is not None:
                return (end - start).days
            return 14
        return int(dur)

    def propagate_delay(self, affected_activity_id: str, delta_days: int,
                        reference_date: Optional[datetime] = None) -> Dict:
        """
        Simulate delaying `affected_activity_id` by `delta_days` and compute
        the cascade effect on all downstream activities.

        Parameters
        ----------
        affected_activity_id : str
            Node id of the activity that slips.
        delta_days : int
            Size of the slip in days (may be negative for an early finish).
        reference_date : datetime, optional
            "Today" anchor used when an activity has no usable dates.
            Defaults to 2024-06-01.

        Returns
        -------
        dict with:
            - affected_activity_id
            - delta_days
            - cascade_table : DataFrame of impacted activities
            - new_project_end : pd.Timestamp or None
            - original_project_end : pd.Timestamp or None
            - total_project_delay_days : int
            - num_activities_affected : int
        """
        if reference_date is None:
            reference_date = datetime(2024, 6, 1)
        today = pd.Timestamp(reference_date)

        # Every descendant of the delayed activity is potentially affected.
        downstream = get_descendants(self.G, affected_activity_id)

        # Original project end = latest planned end over all leaf nodes.
        leaf_nodes = [n for n in self.G.nodes if self.G.out_degree(n) == 0]
        original_project_end = None
        for node in leaf_nodes:
            end = self._to_ts(self.G.nodes[node].get("planned_end_date"))
            if end and (original_project_end is None or end > original_project_end):
                original_project_end = end

        # activity_id -> re-scheduled dates after applying the delay.
        shifted_ends = {}
        shifted_starts = {}

        # Seed: the directly affected activity ends delta_days later than its
        # forecasted (or planned) end; with no usable date, anchor on "today".
        act_data = self._get_activity(affected_activity_id)
        orig_end = self._to_ts(
            act_data.get("forecasted_end_date") or act_data.get("planned_end_date")
        )
        if orig_end is None:
            # forecasted_end_date may be present but unparseable (e.g. NaN);
            # retry with the planned date alone.
            orig_end = self._to_ts(act_data.get("planned_end_date"))
        if orig_end:
            shifted_ends[affected_activity_id] = orig_end + timedelta(days=delta_days)
        else:
            shifted_ends[affected_activity_id] = today + timedelta(days=delta_days)

        # Propagate in topological order so predecessors resolve first.
        # Fall back to a naive order if the graph unexpectedly has a cycle.
        try:
            topo_order = list(nx.topological_sort(self.G))
        except Exception:
            topo_order = [affected_activity_id] + downstream

        cascade_rows = []
        for node_id in topo_order:
            if node_id not in downstream and node_id != affected_activity_id:
                continue
            if node_id == affected_activity_id:
                # BUG FIX: previously this node fell through to the generic
                # re-scheduling below, which overwrote the delta-shifted end
                # seeded above with a value derived from its (unshifted)
                # predecessors — silently discarding the simulated delay.
                # The seeded end must stand; descendants read it from
                # shifted_ends.
                continue

            node_data = self._get_activity(node_id)
            original_start = self._to_ts(node_data.get("planned_start_date"))
            original_end = self._to_ts(
                node_data.get("forecasted_end_date") or node_data.get("planned_end_date")
            )
            planned_dur = self._duration_days(node_data, original_start, original_end)

            # New start = max(original planned start, latest predecessor end).
            pred_ends = []
            for pred_id in self.G.predecessors(node_id):
                if pred_id in shifted_ends:
                    pred_ends.append(shifted_ends[pred_id])
                else:
                    # Predecessor outside the cascade: use its current end.
                    pred_data = self._get_activity(pred_id)
                    pred_end = self._to_ts(
                        pred_data.get("forecasted_end_date")
                        or pred_data.get("planned_end_date")
                    )
                    if pred_end:
                        pred_ends.append(pred_end)
            if pred_ends:
                new_start = max(pred_ends)
                if original_start:
                    new_start = max(new_start, original_start)
            else:
                new_start = original_start or today

            new_end = new_start + timedelta(days=planned_dur)
            shifted_starts[node_id] = new_start
            shifted_ends[node_id] = new_end

            cascade_delay = (new_end - original_end).days if original_end else 0
            cascade_rows.append({
                "activity_id": node_id,
                "activity_name": node_data.get("name", node_id),
                "original_start": original_start,
                "original_end": original_end,
                "new_start": new_start,
                "new_end": new_end,
                "cascade_delay_days": cascade_delay,
                "has_open_issues": node_data.get("issue_count", 0) > 0,
            })

        # New project end: shifted end where available, else the planned end.
        new_project_end = None
        for node in leaf_nodes:
            end = shifted_ends.get(node) or self._to_ts(
                self.G.nodes[node].get("planned_end_date")
            )
            if end and (new_project_end is None or end > new_project_end):
                new_project_end = end

        total_project_delay = 0
        if original_project_end and new_project_end:
            total_project_delay = (new_project_end - original_project_end).days

        cascade_df = pd.DataFrame(cascade_rows) if cascade_rows else pd.DataFrame(
            columns=["activity_id", "activity_name", "original_start", "original_end",
                     "new_start", "new_end", "cascade_delay_days", "has_open_issues"]
        )
        return {
            "affected_activity_id": affected_activity_id,
            "delta_days": delta_days,
            "cascade_table": cascade_df,
            "new_project_end": new_project_end,
            "original_project_end": original_project_end,
            "total_project_delay_days": total_project_delay,
            "num_activities_affected": len(cascade_rows),
        }

    def get_high_impact_activities(self, top_n: int = 5) -> pd.DataFrame:
        """
        Find activities whose 1-day delay causes the most downstream impact.

        Ranks every node by its number of descendants and returns the top
        `top_n` rows. Returns an empty (but correctly-columned) DataFrame
        for an empty graph instead of raising KeyError in sort_values.
        """
        rows = []
        for node_id in self.G.nodes:
            descendants = get_descendants(self.G, node_id)
            rows.append({
                "activity_id": node_id,
                "activity_name": self.G.nodes[node_id].get("name", node_id),
                "downstream_count": len(descendants),
                "status": self.G.nodes[node_id].get("status", "unknown"),
            })
        if not rows:
            # Empty graph: sort_values on a column-less frame would raise.
            return pd.DataFrame(
                columns=["activity_id", "activity_name",
                         "downstream_count", "status"]
            )
        df = pd.DataFrame(rows).sort_values("downstream_count", ascending=False)
        return df.head(top_n)
|