File size: 7,783 Bytes
03e7fda
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""

engine/ripple_engine.py

------------------------

Propagates delays through the dependency DAG using BFS.

Computes cascade impact on all downstream activities.

"""

import pandas as pd
import numpy as np
import networkx as nx
from datetime import datetime, timedelta
from typing import Optional, Dict, List
from data_loader import DataLoader
from engine.dag_builder import build_dag, get_descendants


class RippleEngine:
    """Propagate a delay on one activity through the dependency DAG.

    Given a delay delta (in days) on a single activity, shift every
    downstream activity in topological order and report the cascade
    impact, including the new overall project end date.
    """

    def __init__(self, G: nx.DiGraph, loader: Optional[DataLoader] = None):
        # G: dependency DAG whose node attributes are expected to carry
        # planned/forecasted dates, durations, names, statuses, and issue
        # counts — TODO confirm against engine.dag_builder.build_dag.
        self.G = G
        self.loader = loader or DataLoader()

    def _get_activity(self, activity_id: str) -> Dict:
        """Return a copy of the node attribute dict, or {} for unknown ids."""
        if activity_id in self.G.nodes:
            return dict(self.G.nodes[activity_id])
        return {}

    def _to_ts(self, val) -> Optional[pd.Timestamp]:
        """Coerce `val` to a pd.Timestamp; return None for missing values.

        Uses pd.isna so None, float NaN, and pd.NaT are all treated as
        missing. (The previous check only caught float NaN, letting NaT
        leak into date comparisons and truthiness tests downstream.)
        """
        try:
            if val is None or pd.isna(val):
                return None
            return pd.Timestamp(val)
        except Exception:
            # Unparseable value (or array-like where pd.isna is ambiguous):
            # treat as missing rather than crash the simulation.
            return None

    def propagate_delay(self, affected_activity_id: str, delta_days: int,
                        reference_date: Optional[datetime] = None) -> Dict:
        """Simulate delaying `affected_activity_id` by `delta_days`.

        Shifts the affected activity's end date, then walks the graph in
        topological order recomputing start/end dates for every downstream
        activity (new start = max of predecessors' new ends and the
        original planned start; new end = new start + planned duration).

        Parameters
        ----------
        affected_activity_id : str
            Node id of the delayed activity.
        delta_days : int
            Number of days of delay to inject.
        reference_date : datetime, optional
            "Today" for activities with no usable dates; defaults to the
            fixed demo date 2024-06-01.

        Returns
        -------
        dict with keys:
            - affected_activity_id
            - delta_days
            - cascade_table : DataFrame of impacted activities
            - new_project_end : pd.Timestamp or None
            - original_project_end : pd.Timestamp or None
            - total_project_delay_days : int
            - num_activities_affected : int
        """
        if reference_date is None:
            reference_date = datetime(2024, 6, 1)
        today = pd.Timestamp(reference_date)

        # All transitive successors of the delayed activity will be affected.
        downstream = get_descendants(self.G, affected_activity_id)

        # Original project end = latest planned end among leaf (sink) nodes.
        leaf_nodes = [n for n in self.G.nodes if self.G.out_degree(n) == 0]
        original_project_end = None
        for node in leaf_nodes:
            end = self._to_ts(self.G.nodes[node].get("planned_end_date"))
            if end is not None and (original_project_end is None or end > original_project_end):
                original_project_end = end

        # Shifted schedule accumulated during the traversal:
        # activity_id -> recomputed start/end timestamps.
        shifted_ends = {}
        shifted_starts = {}

        # Seed: the directly affected activity ends delta_days later than its
        # forecasted end (falling back to planned end, then to `today`).
        act_data = self._get_activity(affected_activity_id)
        orig_end = self._to_ts(
            act_data.get("forecasted_end_date") or act_data.get("planned_end_date")
        )
        if orig_end is None:
            # A NaN forecasted_end_date is truthy for `or`, so _to_ts can
            # return None above even when a planned end exists — retry it.
            orig_end = self._to_ts(act_data.get("planned_end_date"))
        if orig_end is not None:
            shifted_ends[affected_activity_id] = orig_end + timedelta(days=delta_days)
        else:
            shifted_ends[affected_activity_id] = today + timedelta(days=delta_days)

        # Propagate in topological order so every predecessor is processed
        # before its successors. Fall back to a rough order if the graph
        # unexpectedly contains a cycle.
        try:
            topo_order = list(nx.topological_sort(self.G))
        except Exception:
            # list() guards against get_descendants returning a set, which
            # would make `+` raise TypeError.
            topo_order = [affected_activity_id] + list(downstream)

        cascade_rows = []

        for node_id in topo_order:
            if node_id not in downstream and node_id != affected_activity_id:
                continue

            node_data = self._get_activity(node_id)
            original_start = self._to_ts(node_data.get("planned_start_date"))
            original_end = self._to_ts(
                node_data.get("forecasted_end_date") or node_data.get("planned_end_date")
            )

            # Planned duration: prefer the stored attribute, but a present-
            # but-NaN value would make int() raise, so validate it and fall
            # back to the date span, then to a 14-day default.
            planned_dur = node_data.get("planned_duration_days")
            if planned_dur is None or (isinstance(planned_dur, float) and np.isnan(planned_dur)):
                if original_start is not None and original_end is not None:
                    planned_dur = (original_end - original_start).days
                else:
                    planned_dur = 14

            # New start = max(original planned start, latest new end among
            # predecessors). Predecessors outside the cascade keep their
            # original (forecasted or planned) ends.
            pred_ends = []
            for pred_id in self.G.predecessors(node_id):
                if pred_id in shifted_ends:
                    pred_ends.append(shifted_ends[pred_id])
                else:
                    pred_data = self._get_activity(pred_id)
                    pred_end = self._to_ts(
                        pred_data.get("forecasted_end_date") or pred_data.get("planned_end_date")
                    )
                    if pred_end is not None:
                        pred_ends.append(pred_end)

            if pred_ends:
                new_start = max(pred_ends)
                if original_start is not None:
                    new_start = max(new_start, original_start)
            else:
                new_start = original_start or today

            # `planned_dur or 14` preserves the original behavior of mapping
            # a zero duration to the 14-day default.
            new_end = new_start + timedelta(days=int(planned_dur or 14))
            shifted_starts[node_id] = new_start
            shifted_ends[node_id] = new_end

            cascade_delay = 0
            if original_end is not None:
                cascade_delay = (new_end - original_end).days

            # The seed activity itself is excluded from the cascade table;
            # only downstream impact is reported.
            if node_id != affected_activity_id:
                cascade_rows.append({
                    "activity_id": node_id,
                    "activity_name": node_data.get("name", node_id),
                    "original_start": original_start,
                    "original_end": original_end,
                    "new_start": new_start,
                    "new_end": new_end,
                    "cascade_delay_days": cascade_delay,
                    "has_open_issues": node_data.get("issue_count", 0) > 0,
                })

        # New project end = latest end among leaf nodes, preferring shifted
        # dates where the cascade reached them.
        new_project_end = None
        for node in leaf_nodes:
            end = shifted_ends.get(node) or self._to_ts(
                self.G.nodes[node].get("planned_end_date")
            )
            if end is not None and (new_project_end is None or end > new_project_end):
                new_project_end = end

        total_project_delay = 0
        if original_project_end is not None and new_project_end is not None:
            total_project_delay = (new_project_end - original_project_end).days

        cascade_df = pd.DataFrame(cascade_rows) if cascade_rows else pd.DataFrame(
            columns=["activity_id", "activity_name", "original_start", "original_end",
                     "new_start", "new_end", "cascade_delay_days", "has_open_issues"]
        )

        return {
            "affected_activity_id": affected_activity_id,
            "delta_days": delta_days,
            "cascade_table": cascade_df,
            "new_project_end": new_project_end,
            "original_project_end": original_project_end,
            "total_project_delay_days": total_project_delay,
            "num_activities_affected": len(cascade_rows),
        }

    def get_high_impact_activities(self, top_n: int = 5) -> pd.DataFrame:
        """Rank activities by downstream reach.

        Activities with the most transitive descendants cause the widest
        cascade when delayed.

        Parameters
        ----------
        top_n : int
            Number of rows to return (default 5).

        Returns
        -------
        pd.DataFrame with columns activity_id, activity_name,
        downstream_count, status — sorted by downstream_count descending.
        """
        rows = []
        for node_id in self.G.nodes:
            descendants = get_descendants(self.G, node_id)
            rows.append({
                "activity_id": node_id,
                "activity_name": self.G.nodes[node_id].get("name", node_id),
                "downstream_count": len(descendants),
                "status": self.G.nodes[node_id].get("status", "unknown"),
            })
        # Guard the empty-graph case: sort_values would raise KeyError on a
        # DataFrame built from an empty row list (no columns exist).
        if not rows:
            return pd.DataFrame(
                columns=["activity_id", "activity_name", "downstream_count", "status"]
            )
        df = pd.DataFrame(rows).sort_values("downstream_count", ascending=False)
        return df.head(top_n)