Spaces:
Configuration error
Configuration error
| """ | |
| engine/ripple_engine.py | |
| ------------------------ | |
| Propagates delays through the dependency DAG using BFS. | |
| Computes cascade impact on all downstream activities. | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import networkx as nx | |
| from datetime import datetime, timedelta | |
| from typing import Optional, Dict, List | |
| from data_loader import DataLoader | |
| from engine.dag_builder import build_dag, get_descendants | |
| class RippleEngine: | |
| """ | |
| Given a delay delta on one activity, propagate the effect to all | |
| downstream activities and compute the new project end date. | |
| """ | |
| def __init__(self, G: nx.DiGraph, loader: Optional[DataLoader] = None): | |
| self.G = G | |
| self.loader = loader or DataLoader() | |
| def _get_activity(self, activity_id: str) -> Dict: | |
| """Return node attribute dict for an activity.""" | |
| if activity_id in self.G.nodes: | |
| return dict(self.G.nodes[activity_id]) | |
| return {} | |
| def _to_ts(self, val) -> Optional[pd.Timestamp]: | |
| if val is None or (isinstance(val, float) and np.isnan(val)): | |
| return None | |
| try: | |
| return pd.Timestamp(val) | |
| except Exception: | |
| return None | |
| def propagate_delay(self, affected_activity_id: str, delta_days: int, | |
| reference_date: Optional[datetime] = None) -> Dict: | |
| """ | |
| Simulate delaying `affected_activity_id` by `delta_days` and compute | |
| the cascade effect on all downstream activities. | |
| Returns | |
| ------- | |
| dict with: | |
| - affected_activity_id | |
| - delta_days | |
| - cascade_table : DataFrame of impacted activities | |
| - new_project_end : pd.Timestamp or None | |
| - original_project_end : pd.Timestamp or None | |
| - total_project_delay_days : int | |
| """ | |
| if reference_date is None: | |
| reference_date = datetime(2024, 6, 1) | |
| today = pd.Timestamp(reference_date) | |
| # Get all descendants (will be affected) | |
| downstream = get_descendants(self.G, affected_activity_id) | |
| # Compute original project end (max of all leaf node ends) | |
| leaf_nodes = [n for n in self.G.nodes if self.G.out_degree(n) == 0] | |
| original_project_end = None | |
| for node in leaf_nodes: | |
| end = self._to_ts(self.G.nodes[node].get("planned_end_date")) | |
| if end and (original_project_end is None or end > original_project_end): | |
| original_project_end = end | |
| # Compute shifted dates using topological traversal | |
| # shifted_ends dict: activity_id → new_end_date | |
| shifted_ends = {} | |
| shifted_starts = {} | |
| # The directly affected activity shifts by delta_days in its end | |
| act_data = self._get_activity(affected_activity_id) | |
| orig_end = self._to_ts( | |
| act_data.get("forecasted_end_date") or act_data.get("planned_end_date") | |
| ) | |
| if orig_end is None: | |
| orig_end = self._to_ts(act_data.get("planned_end_date")) | |
| if orig_end: | |
| shifted_ends[affected_activity_id] = orig_end + timedelta(days=delta_days) | |
| else: | |
| shifted_ends[affected_activity_id] = today + timedelta(days=delta_days) | |
| # BFS propagation | |
| try: | |
| topo_order = list(nx.topological_sort(self.G)) | |
| except Exception: | |
| topo_order = [affected_activity_id] + downstream | |
| cascade_rows = [] | |
| for node_id in topo_order: | |
| if node_id not in downstream and node_id != affected_activity_id: | |
| continue | |
| node_data = self._get_activity(node_id) | |
| original_start = self._to_ts( | |
| node_data.get("planned_start_date") | |
| ) | |
| original_end = self._to_ts( | |
| node_data.get("forecasted_end_date") or node_data.get("planned_end_date") | |
| ) | |
| planned_dur = node_data.get( | |
| "planned_duration_days", | |
| (original_end - original_start).days if original_start and original_end else 14 | |
| ) | |
| # New start = max(original planned start, max of all predecessor new ends) | |
| pred_ends = [] | |
| for pred_id in self.G.predecessors(node_id): | |
| if pred_id in shifted_ends: | |
| pred_ends.append(shifted_ends[pred_id]) | |
| else: | |
| pred_data = self._get_activity(pred_id) | |
| pred_end = self._to_ts( | |
| pred_data.get("forecasted_end_date") or pred_data.get("planned_end_date") | |
| ) | |
| if pred_end: | |
| pred_ends.append(pred_end) | |
| if pred_ends: | |
| new_start = max(pred_ends) | |
| if original_start: | |
| new_start = max(new_start, original_start) | |
| else: | |
| new_start = original_start or today | |
| new_end = new_start + timedelta(days=int(planned_dur or 14)) | |
| shifted_starts[node_id] = new_start | |
| shifted_ends[node_id] = new_end | |
| cascade_delay = 0 | |
| if original_end: | |
| cascade_delay = (new_end - original_end).days | |
| if node_id != affected_activity_id: | |
| cascade_rows.append({ | |
| "activity_id": node_id, | |
| "activity_name": node_data.get("name", node_id), | |
| "original_start": original_start, | |
| "original_end": original_end, | |
| "new_start": new_start, | |
| "new_end": new_end, | |
| "cascade_delay_days": cascade_delay, | |
| "has_open_issues": node_data.get("issue_count", 0) > 0, | |
| }) | |
| # New project end | |
| new_project_end = None | |
| for node in leaf_nodes: | |
| end = shifted_ends.get(node) or self._to_ts( | |
| self.G.nodes[node].get("planned_end_date") | |
| ) | |
| if end and (new_project_end is None or end > new_project_end): | |
| new_project_end = end | |
| total_project_delay = 0 | |
| if original_project_end and new_project_end: | |
| total_project_delay = (new_project_end - original_project_end).days | |
| cascade_df = pd.DataFrame(cascade_rows) if cascade_rows else pd.DataFrame( | |
| columns=["activity_id", "activity_name", "original_start", "original_end", | |
| "new_start", "new_end", "cascade_delay_days", "has_open_issues"] | |
| ) | |
| return { | |
| "affected_activity_id": affected_activity_id, | |
| "delta_days": delta_days, | |
| "cascade_table": cascade_df, | |
| "new_project_end": new_project_end, | |
| "original_project_end": original_project_end, | |
| "total_project_delay_days": total_project_delay, | |
| "num_activities_affected": len(cascade_rows), | |
| } | |
| def get_high_impact_activities(self, top_n: int = 5) -> pd.DataFrame: | |
| """ | |
| Find activities whose 1-day delay causes the most downstream impact. | |
| """ | |
| rows = [] | |
| for node_id in self.G.nodes: | |
| descendants = get_descendants(self.G, node_id) | |
| rows.append({ | |
| "activity_id": node_id, | |
| "activity_name": self.G.nodes[node_id].get("name", node_id), | |
| "downstream_count": len(descendants), | |
| "status": self.G.nodes[node_id].get("status", "unknown"), | |
| }) | |
| df = pd.DataFrame(rows).sort_values("downstream_count", ascending=False) | |
| return df.head(top_n) | |