Spaces:
Configuration error
Configuration error
"""
optimizer/schedule_optimizer.py
---------------------------------
Critical Path Method (CPM) + rule-based schedule optimization suggestions.
"""
| import pandas as pd | |
| import numpy as np | |
| import networkx as nx | |
| from datetime import datetime, timedelta | |
| from typing import Optional, List, Dict | |
| from data_loader import DataLoader | |
| from engine.dag_builder import build_dag, get_topological_order | |
# Fallback "today" used by ScheduleOptimizer when the caller passes no date.
REFERENCE_DATE = datetime(2024, 6, 1)
class ScheduleOptimizer:
    """
    CPM calculator + rule-based optimization suggestions.

    The optimizer holds the project's dependency graph (``self.G``, built by
    ``build_dag``) and reads activity data through ``self.loader``.

    * ``compute_cpm``          - forward/backward pass over the graph.
    * ``get_critical_path``    - ids of activities with float <= 0.
    * ``generate_suggestions`` - six rule-based suggestion generators.
    """

    # Duration assumed when an activity has no usable planned duration.
    DEFAULT_DURATION_DAYS = 14
    # Maximum number of suggestion cards returned by generate_suggestions().
    MAX_SUGGESTIONS = 15

    PRIORITY_CRITICAL = "🔴 CRITICAL"
    PRIORITY_HIGH = "🔴 HIGH"
    PRIORITY_MEDIUM = "🟡 MEDIUM"
    PRIORITY_OPPORTUNITY = "🟢 OPPORTUNITY"
    # Sort rank per priority label; unknown labels sort last (rank 9).
    _PRIORITY_RANK = {
        PRIORITY_CRITICAL: 0,
        PRIORITY_HIGH: 1,
        PRIORITY_MEDIUM: 2,
        PRIORITY_OPPORTUNITY: 3,
    }

    def __init__(self, project_id: str, loader: Optional["DataLoader"] = None,
                 today: Optional[datetime] = None):
        """
        Args:
            project_id: Project whose activities and graph are analysed.
            loader: Data access object; a new ``DataLoader()`` is created
                when omitted.
            today: Date treated as "now" for elapsed-time calculations;
                defaults to ``REFERENCE_DATE``.
        """
        self.project_id = project_id
        self.loader = loader if loader is not None else DataLoader()
        self.today = pd.Timestamp(today if today is not None else REFERENCE_DATE)
        self.G = build_dag(project_id, loader=self.loader)
        # Cache filled by compute_cpm(); None until the first computation.
        self._cpm_results: Optional[pd.DataFrame] = None

    # ──────────────────────────────────────────────────────────────────────────
    # Internal helpers
    # ──────────────────────────────────────────────────────────────────────────
    @staticmethod
    def _coalesce(value, default):
        """
        Return *default* when *value* is missing (None/NaN/NaT) or falsy.

        pandas represents missing cells as NaN, which is truthy, so a plain
        ``value or default`` does not catch it.
        """
        if value is None:
            return default
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return default
        return value or default

    def _duration_days(self, act: Dict) -> int:
        """Planned duration of *act* in whole days, never less than 1."""
        raw = self._coalesce(act.get("planned_duration_days"),
                             self.DEFAULT_DURATION_DAYS)
        return max(int(raw), 1)

    @staticmethod
    def _card(kind: str, priority: str, act_id, name, rule: str,
              text: str, savings) -> Dict:
        """Build one suggestion card with the fixed key set callers expect."""
        return {
            "type": kind,
            "priority": priority,
            "activity_id": act_id,
            "activity_name": name,
            "rule": rule,
            "suggestion": text,
            "estimated_savings_days": savings,
        }

    # ──────────────────────────────────────────────────────────────────────────
    # Critical Path Method (CPM)
    # ──────────────────────────────────────────────────────────────────────────
    def compute_cpm(self) -> pd.DataFrame:
        """
        Compute ES, EF, LS, LF, and Float for every activity.

        Returns a DataFrame with one row per activity + CPM columns, and
        caches it on the instance. Returns an empty DataFrame (without
        caching) when the project has no activities.
        """
        acts = self.loader.get_project_activities(self.project_id)
        if acts.empty:
            return pd.DataFrame()

        # Materialise so the order can be walked forwards and backwards.
        topo = list(get_topological_order(self.G))
        act_map = {row["id"]: row.to_dict() for _, row in acts.iterrows()}
        # Graph nodes without an activity row get the default duration.
        durations = {node: self._duration_days(act_map.get(node, {}))
                     for node in topo}

        # --- Forward pass (Early Start, Early Finish) ---
        early_start: Dict = {}
        early_finish: Dict = {}
        for node in topo:
            early_start[node] = max(
                (early_finish.get(p, 0) for p in self.G.predecessors(node)),
                default=0,
            )
            early_finish[node] = early_start[node] + durations[node]

        # Project duration = max EF across all nodes
        project_duration = max(early_finish.values(), default=0)

        # --- Backward pass (Late Start, Late Finish) ---
        late_start: Dict = {}
        late_finish: Dict = {}
        for node in reversed(topo):
            late_finish[node] = min(
                (late_start.get(s, project_duration)
                 for s in self.G.successors(node)),
                default=project_duration,
            )
            late_start[node] = late_finish[node] - durations[node]

        rows = []
        for node in topo:
            act = act_map.get(node)
            if act is None:
                continue
            # Float = LS - ES; zero (or negative) float marks the critical path.
            total_float = late_start[node] - early_start[node]
            rows.append({
                "activity_id": node,
                "activity_name": act.get("name", node),
                "status": act.get("status", "unknown"),
                "planned_duration_days": durations[node],
                "early_start_day": early_start[node],
                "early_finish_day": early_finish[node],
                "late_start_day": late_start[node],
                "late_finish_day": late_finish[node],
                "total_float_days": total_float,
                "is_critical_path": total_float <= 0,
                "progress": float(self._coalesce(act.get("progress"), 0)),
                "schedule_variance_days": int(
                    self._coalesce(act.get("schedule_variance_days"), 0)),
                "category": act.get("category", ""),
            })

        self._cpm_results = pd.DataFrame(rows)
        return self._cpm_results

    def get_critical_path(self) -> List[str]:
        """Return list of activity IDs on the critical path (float ≤ 0)."""
        if self._cpm_results is None:
            self.compute_cpm()
        cpm = self._cpm_results
        if cpm is None or cpm.empty:
            return []
        on_path = cpm["is_critical_path"].astype(bool)
        return cpm.loc[on_path, "activity_id"].tolist()

    # ──────────────────────────────────────────────────────────────────────────
    # Rule-Based Suggestions
    # ──────────────────────────────────────────────────────────────────────────
    def generate_suggestions(self, predictions_df: Optional[pd.DataFrame] = None) -> List[Dict]:
        """
        Apply 6 rule-based optimization rules and return suggestion cards.

        Args:
            predictions_df: Accepted for backward compatibility; none of the
                current rules read it.

        Returns:
            At most ``MAX_SUGGESTIONS`` cards, de-duplicated by
            (activity_id, rule) and ordered CRITICAL, HIGH, MEDIUM,
            OPPORTUNITY. Empty when there is no CPM data or no activities.
        """
        if self._cpm_results is None:
            self.compute_cpm()
        cpm = self._cpm_results
        if cpm is None or cpm.empty:
            return []
        acts = self.loader.get_project_activities(self.project_id)
        if acts.empty:
            return []

        act_map = {row["id"]: row.to_dict() for _, row in acts.iterrows()}
        cpm_rows = cpm.to_dict("records")
        # Computed once; shared by the downstream count (rule 2) and the
        # independence test (rule 4).
        descendants = {n: nx.descendants(self.G, n) for n in self.G.nodes}

        suggestions: List[Dict] = []
        suggestions += self._rule_slow_critical(cpm_rows, act_map)
        suggestions += self._rule_high_impact_delay(cpm_rows, act_map, descendants)
        suggestions += self._rule_material_delay(cpm_rows, act_map)
        suggestions += self._rule_parallelization(cpm_rows, act_map, descendants)
        suggestions += self._rule_stalled(acts)
        suggestions += self._rule_reallocation(cpm_rows, act_map)

        # De-duplicate by activity_id + rule, keeping the first occurrence.
        unique: Dict = {}
        for card in suggestions:
            unique.setdefault((card["activity_id"], card["rule"]), card)

        # Sort: CRITICAL first, then HIGH, then others (stable within a rank).
        ranked = sorted(
            unique.values(),
            key=lambda card: self._PRIORITY_RANK.get(card["priority"], 9),
        )
        return ranked[:self.MAX_SUGGESTIONS]

    def _rule_slow_critical(self, cpm_rows: List[Dict], act_map: Dict) -> List[Dict]:
        """Rule 1: slow critical activity → increase crew."""
        cards = []
        for row in cpm_rows:
            if not row["is_critical_path"]:
                continue
            act_id = row["activity_id"]
            act = act_map.get(act_id, {})
            if act.get("status") != "in_progress":
                continue
            progress = float(self._coalesce(act.get("progress"), 0))
            # Prefer the actual start, then the planned start, then "today";
            # missing (NaN/NaT) dates must not block the fallback.
            start = (self._coalesce(act.get("actual_start_date"), None)
                     or self._coalesce(act.get("planned_start_date"), None)
                     or self.today)
            elapsed = max(1, (self.today - pd.Timestamp(start)).days)
            actual_rate = progress / elapsed
            planned_dur = max(row["planned_duration_days"], 1)
            planned_rate = 100 / planned_dur
            # Flag only when running at less than half the planned pace.
            if actual_rate < 0.5 * planned_rate:
                cards.append(self._card(
                    "ACTION", self.PRIORITY_CRITICAL, act_id,
                    row["activity_name"], "Slow Critical Activity",
                    (
                        f"**{row['activity_name']}** is on the critical path and running at "
                        f"{actual_rate:.1f}%/day vs planned {planned_rate:.1f}%/day. "
                        "→ **Increase crew size or shift to overtime.**"
                    ),
                    int(planned_dur * 0.2),
                ))
        return cards

    def _rule_high_impact_delay(self, cpm_rows: List[Dict], act_map: Dict,
                                descendants: Dict) -> List[Dict]:
        """Rule 2: high schedule variance + many downstream → escalate."""
        cards = []
        for row in cpm_rows:
            act_id = row["activity_id"]
            act = act_map.get(act_id, {})
            var = int(self._coalesce(act.get("schedule_variance_days"), 0))
            down = len(descendants.get(act_id, ()))
            if var > 5 and down > 3:
                cards.append(self._card(
                    "ALERT", self.PRIORITY_HIGH, act_id,
                    row["activity_name"], "High Impact Delay",
                    (
                        f"**{row['activity_name']}** is {var} days late and has "
                        f"{down} downstream activities. "
                        "→ **Escalate immediately — cascading delay risk.**"
                    ),
                    0,
                ))
        return cards

    def _rule_material_delay(self, cpm_rows: List[Dict], act_map: Dict) -> List[Dict]:
        """Rule 3: material delay issue on upcoming activity → pre-order."""
        issues = self.loader.get_activity_issues(project_id=self.project_id)
        if issues.empty or "status" not in issues.columns:
            return []
        open_issues = issues[issues["status"] == "open"]
        if open_issues.empty or not {"activity_id", "category"}.issubset(open_issues.columns):
            return []
        # Activities that have at least one open material-delay issue.
        blocked_ids = set(
            open_issues.loc[open_issues["category"] == "material_delay", "activity_id"]
        )

        cards = []
        for row in cpm_rows:
            act_id = row["activity_id"]
            if act_map.get(act_id, {}).get("status") != "not_started":
                continue
            if act_id not in blocked_ids:
                continue
            cards.append(self._card(
                "PREVENTIVE", self.PRIORITY_MEDIUM, act_id,
                row["activity_name"], "Material Delay Risk",
                (
                    f"**{row['activity_name']}** (not started) has open material delay issues. "
                    "→ **Pre-order materials now to avoid blocking this activity.**"
                ),
                3,
            ))
        return cards

    def _rule_parallelization(self, cpm_rows: List[Dict], act_map: Dict,
                              descendants: Dict) -> List[Dict]:
        """Rule 4: two non-dependent activities → suggest parallel."""
        cpm_map = {row["activity_id"]: row for row in cpm_rows}
        nodes = list(descendants)  # same order as self.G.nodes

        def not_started(node) -> bool:
            return act_map.get(node, {}).get("status") == "not_started"

        cards = []
        for i, a1 in enumerate(nodes):
            if not not_started(a1):
                continue
            for a2 in nodes[i + 1:]:
                # Independent = neither node is reachable from the other.
                if a2 in descendants[a1] or a1 in descendants[a2]:
                    continue
                if not not_started(a2):
                    continue
                a1_name = cpm_map.get(a1, {}).get("activity_name", a1)
                a2_name = cpm_map.get(a2, {}).get("activity_name", a2)
                dur_saved = min(
                    cpm_map.get(a1, {}).get("planned_duration_days", 10),
                    cpm_map.get(a2, {}).get("planned_duration_days", 10),
                )
                cards.append(self._card(
                    "OPTIMIZATION", self.PRIORITY_OPPORTUNITY, f"{a1}+{a2}",
                    f"{a1_name} + {a2_name}", "Parallelization Opportunity",
                    (
                        f"**{a1_name}** and **{a2_name}** have no dependencies. "
                        f"→ **Run in parallel — potential savings: ~{dur_saved} days.**"
                    ),
                    dur_saved,
                ))
                break  # Only one parallelization suggestion per node
        return cards

    def _rule_stalled(self, acts: pd.DataFrame) -> List[Dict]:
        """Rule 5: stalled activity (last 3 updates show ~0 increment)."""
        updates = self.loader.daily_updates
        required = {"activity_id", "date", "daily_increment"}
        if updates.empty or not required.issubset(updates.columns):
            return []
        updates = updates.copy()  # do not mutate the loader's frame
        updates["date"] = pd.to_datetime(updates["date"], errors="coerce")

        cards = []
        for _, act_row in acts.iterrows():
            if act_row.get("status") != "in_progress":
                continue
            act_id = str(act_row["id"])
            history = updates[updates["activity_id"] == act_id].sort_values("date")
            if len(history) < 3:
                continue
            last3 = history.tail(3)["daily_increment"].astype(float)
            if (last3 <= 0.1).all():
                a_name = act_row.get("name", act_id)
                cards.append(self._card(
                    "ALERT", self.PRIORITY_HIGH, act_id, a_name,
                    "Stalled Activity",
                    (
                        f"**{a_name}** has shown zero progress for 3+ days. "
                        "→ **Investigate immediately — possible blockage.**"
                    ),
                    0,
                ))
        return cards

    def _rule_reallocation(self, cpm_rows: List[Dict], act_map: Dict) -> List[Dict]:
        """Rule 6: in-progress activity with large float → reallocate resources."""
        cards = []
        for row in cpm_rows:
            # Truthiness, not identity: the flag may be a numpy bool.
            if row["is_critical_path"] or not row["total_float_days"] > 10:
                continue
            act_id = row["activity_id"]
            if act_map.get(act_id, {}).get("status") != "in_progress":
                continue
            cards.append(self._card(
                "OPTIMIZATION", self.PRIORITY_OPPORTUNITY, act_id,
                row["activity_name"], "Resource Reallocation",
                (
                    f"**{row['activity_name']}** has {row['total_float_days']} days of float. "
                    "→ **Consider reallocating some resources to critical path activities.**"
                ),
                2,
            ))
        return cards
if __name__ == "__main__":
    # Manual smoke run: print the CPM table and the top suggestions for one project.
    from data_loader import DataLoader

    dl = DataLoader()
    opt = ScheduleOptimizer("proj_008", loader=dl)
    cpm = opt.compute_cpm()
    print("CPM Results:")
    if cpm.empty:
        # compute_cpm() returns a column-less frame when there are no
        # activities; selecting columns from it would raise KeyError.
        print("(no activities found)")
    else:
        print(cpm[["activity_name", "total_float_days", "is_critical_path"]].to_string())
    print("\nSuggestions:")
    for s in opt.generate_suggestions():
        print(f"[{s['priority']}] {s['activity_name']}: {s['suggestion'][:80]}...")