# Dynamic-What-if-scheduling / optimizer / schedule_optimizer.py
# suvradeepp's picture
# Upload 63 files
# 03e7fda verified
"""
optimizer/schedule_optimizer.py
---------------------------------
Critical Path Method (CPM) + rule-based schedule optimization suggestions.
"""
import pandas as pd
import numpy as np
import networkx as nx
from datetime import datetime, timedelta
from typing import Optional, List, Dict
from data_loader import DataLoader
from engine.dag_builder import build_dag, get_topological_order
# Fixed fallback for "today": ScheduleOptimizer uses this date whenever the
# caller does not pass an explicit `today`.
REFERENCE_DATE = datetime(2024, 6, 1)
class ScheduleOptimizer:
    """
    CPM calculator + rule-based optimization suggestions.

    The dependency graph is built once in ``__init__`` via ``build_dag``.
    ``compute_cpm`` derives early/late start and finish days plus total float
    for every activity; ``generate_suggestions`` turns those numbers (and the
    loader's activity, issue and daily-update tables) into suggestion cards.
    """

    # Duration (days) assumed when an activity has no usable planned duration.
    DEFAULT_DURATION_DAYS = 14
    # Upper bound on the number of suggestion cards returned.
    MAX_SUGGESTIONS = 15

    # Priority labels shown on suggestion cards.
    PRIORITY_CRITICAL = "🔴 CRITICAL"
    PRIORITY_HIGH = "🔴 HIGH"
    PRIORITY_MEDIUM = "🟡 MEDIUM"
    PRIORITY_OPPORTUNITY = "🟢 OPPORTUNITY"

    # Sort rank per priority label (lower sorts first; unknown labels get 9).
    _PRIORITY_RANK = {
        PRIORITY_CRITICAL: 0,
        PRIORITY_HIGH: 1,
        PRIORITY_MEDIUM: 2,
        PRIORITY_OPPORTUNITY: 3,
    }

    def __init__(self, project_id: str, loader: Optional[DataLoader] = None,
                 today: Optional[datetime] = None):
        """
        Args:
            project_id: Identifier of the project to analyse.
            loader: Data source; a fresh ``DataLoader()`` is created if omitted.
            today: Date treated as "now"; defaults to ``REFERENCE_DATE``.
        """
        self.project_id = project_id
        self.loader = loader or DataLoader()
        self.today = pd.Timestamp(today or REFERENCE_DATE)
        self.G = build_dag(project_id, loader=self.loader)
        # Cache of the last non-empty compute_cpm() result.
        self._cpm_results: Optional[pd.DataFrame] = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _coalesce(value, default):
        """
        NaN-safe version of ``value or default``.

        A plain ``or`` treats NaN/NaT as truthy, so missing pandas cells would
        leak through; here None, NaN, NaT and pd.NA all yield ``default``.
        """
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return default
        return value or default

    @staticmethod
    def _card(kind, priority, activity_id, activity_name, rule, text,
              savings) -> Dict:
        """Build one suggestion card with the fixed key layout."""
        return {
            "type": kind,
            "priority": priority,
            "activity_id": activity_id,
            "activity_name": activity_name,
            "rule": rule,
            "suggestion": text,
            "estimated_savings_days": savings,
        }

    def _duration_days(self, activity: Dict) -> int:
        """Planned duration of ``activity`` in whole days (at least 1)."""
        raw = self._coalesce(activity.get("planned_duration_days"),
                             self.DEFAULT_DURATION_DAYS)
        return max(int(raw), 1)

    # ------------------------------------------------------------------
    # Critical Path Method (CPM)
    # ------------------------------------------------------------------
    def compute_cpm(self) -> pd.DataFrame:
        """
        Compute ES, EF, LS, LF, and Float for every activity.

        Days are counted from 0 (the earliest possible project start).
        An activity is flagged critical when its total float is <= 0.

        Returns:
            A DataFrame with one row per activity + CPM columns, or an empty
            DataFrame when the project has no activities. A non-empty result
            is also cached on the instance.
        """
        acts = self.loader.get_project_activities(self.project_id)
        if acts.empty:
            return pd.DataFrame()

        topo = list(get_topological_order(self.G))
        act_map = {row["id"]: row.to_dict() for _, row in acts.iterrows()}
        # Graph nodes without an activity record get the default duration.
        durations = {
            node: self._duration_days(act_map.get(node, {})) for node in topo
        }

        # --- Forward pass (Early Start, Early Finish) ---
        early_start: Dict = {}
        early_finish: Dict = {}
        for node in topo:
            early_start[node] = max(
                (early_finish.get(p, 0) for p in self.G.predecessors(node)),
                default=0,
            )
            early_finish[node] = early_start[node] + durations[node]

        # Project duration = latest early finish over all activities.
        project_duration = max(early_finish.values(), default=0)

        # --- Backward pass (Late Start, Late Finish) ---
        late_start: Dict = {}
        late_finish: Dict = {}
        for node in reversed(topo):
            late_finish[node] = min(
                (late_start.get(s, project_duration)
                 for s in self.G.successors(node)),
                default=project_duration,
            )
            late_start[node] = late_finish[node] - durations[node]

        rows = []
        for node in topo:
            if node not in act_map:
                continue
            activity = act_map[node]
            total_float = late_start[node] - early_start[node]
            rows.append({
                "activity_id": node,
                "activity_name": activity.get("name", node),
                "status": activity.get("status", "unknown"),
                "planned_duration_days": durations[node],
                "early_start_day": early_start[node],
                "early_finish_day": early_finish[node],
                "late_start_day": late_start[node],
                "late_finish_day": late_finish[node],
                "total_float_days": total_float,
                "is_critical_path": total_float <= 0,
                "progress": float(self._coalesce(activity.get("progress"), 0)),
                "schedule_variance_days": int(
                    self._coalesce(activity.get("schedule_variance_days"), 0)
                ),
                "category": activity.get("category", ""),
            })

        self._cpm_results = pd.DataFrame(rows)
        return self._cpm_results

    def get_critical_path(self) -> List[str]:
        """Return list of activity IDs on the critical path (float <= 0)."""
        if self._cpm_results is None:
            self.compute_cpm()
        cpm = self._cpm_results
        if cpm is None or cpm.empty:
            return []
        return cpm.loc[cpm["is_critical_path"], "activity_id"].tolist()

    # ------------------------------------------------------------------
    # Rule-Based Suggestions
    # ------------------------------------------------------------------
    def generate_suggestions(self, predictions_df: Optional[pd.DataFrame] = None) -> List[Dict]:
        """
        Apply 6 rule-based optimization rules and return suggestion cards.

        Rules, in evaluation order:
            1. Slow Critical Activity
            2. High Impact Delay
            3. Material Delay Risk
            4. Parallelization Opportunity
            5. Stalled Activity
            6. Resource Reallocation

        Args:
            predictions_df: Accepted for API compatibility; none of the
                current rules read it.

        Returns:
            At most ``MAX_SUGGESTIONS`` cards, de-duplicated by
            (activity_id, rule) and sorted by priority. Each card has the
            keys ``type``, ``priority``, ``activity_id``, ``activity_name``,
            ``rule``, ``suggestion`` and ``estimated_savings_days``.
        """
        if self._cpm_results is None:
            self.compute_cpm()
        cpm = self._cpm_results
        if cpm is None or cpm.empty:
            return []
        acts = self.loader.get_project_activities(self.project_id)
        if acts.empty:
            return []

        act_map = {row["id"]: row.to_dict() for _, row in acts.iterrows()}
        cpm_rows = cpm.to_dict("records")
        cpm_map = {row["activity_id"]: row for row in cpm_rows}
        # Computed once; reused for downstream counts (rule 2) and for the
        # independence test (rule 4).
        descendants = {n: nx.descendants(self.G, n) for n in self.G.nodes}

        suggestions: List[Dict] = []
        suggestions += self._rule_slow_critical(cpm_rows, act_map)
        suggestions += self._rule_high_impact_delay(cpm_rows, act_map, descendants)
        suggestions += self._rule_material_delay(cpm_rows, act_map)
        suggestions += self._rule_parallelization(act_map, cpm_map, descendants)
        suggestions += self._rule_stalled(acts)
        suggestions += self._rule_reallocation(cpm_rows, act_map)

        # De-duplicate by activity_id + rule, keeping the first occurrence.
        seen = set()
        unique_suggestions = []
        for card in suggestions:
            key = (card["activity_id"], card["rule"])
            if key not in seen:
                seen.add(key)
                unique_suggestions.append(card)

        # Stable sort: CRITICAL first, then HIGH, then others.
        unique_suggestions.sort(
            key=lambda card: self._PRIORITY_RANK.get(card["priority"], 9)
        )
        return unique_suggestions[:self.MAX_SUGGESTIONS]

    def _rule_slow_critical(self, cpm_rows: List[Dict], act_map: Dict) -> List[Dict]:
        """Rule 1: in-progress critical activity below half its planned rate."""
        cards = []
        for row in cpm_rows:
            if not row["is_critical_path"]:
                continue
            act_id = row["activity_id"]
            act = act_map.get(act_id, {})
            if act.get("status") != "in_progress":
                continue
            progress = float(self._coalesce(act.get("progress"), 0))
            # Prefer the actual start, then the planned start, then "today".
            start = self._coalesce(
                act.get("actual_start_date"),
                self._coalesce(act.get("planned_start_date"), self.today),
            )
            elapsed = max(1, (self.today - pd.Timestamp(start)).days)
            actual_rate = progress / elapsed
            planned_dur = max(row["planned_duration_days"], 1)
            # The 100 here means progress is being treated as a percentage.
            planned_rate = 100 / planned_dur
            if actual_rate < 0.5 * planned_rate:
                cards.append(self._card(
                    "ACTION",
                    self.PRIORITY_CRITICAL,
                    act_id,
                    row["activity_name"],
                    "Slow Critical Activity",
                    (
                        f"**{row['activity_name']}** is on the critical path and running at "
                        f"{actual_rate:.1f}%/day vs planned {planned_rate:.1f}%/day. "
                        "→ **Increase crew size or shift to overtime.**"
                    ),
                    int(planned_dur * 0.2),
                ))
        return cards

    def _rule_high_impact_delay(self, cpm_rows: List[Dict], act_map: Dict,
                                descendants: Dict) -> List[Dict]:
        """Rule 2: more than 5 days late with more than 3 downstream activities."""
        cards = []
        for row in cpm_rows:
            act_id = row["activity_id"]
            act = act_map.get(act_id, {})
            variance = int(self._coalesce(act.get("schedule_variance_days"), 0))
            downstream = len(descendants.get(act_id, ()))
            if variance > 5 and downstream > 3:
                cards.append(self._card(
                    "ALERT",
                    self.PRIORITY_HIGH,
                    act_id,
                    row["activity_name"],
                    "High Impact Delay",
                    (
                        f"**{row['activity_name']}** is {variance} days late and has "
                        f"{downstream} downstream activities. "
                        "→ **Escalate immediately — cascading delay risk.**"
                    ),
                    0,
                ))
        return cards

    def _rule_material_delay(self, cpm_rows: List[Dict], act_map: Dict) -> List[Dict]:
        """Rule 3: not-started activity with an open material_delay issue."""
        issues = self.loader.get_activity_issues(project_id=self.project_id)
        required = {"status", "activity_id", "category"}
        # Without these columns no issue can be matched to an activity.
        if issues.empty or not required.issubset(issues.columns):
            return []
        flagged = (issues["status"] == "open") & (issues["category"] == "material_delay")
        blocked_ids = set(issues.loc[flagged, "activity_id"])
        if not blocked_ids:
            return []

        cards = []
        for row in cpm_rows:
            act_id = row["activity_id"]
            if act_map.get(act_id, {}).get("status") != "not_started":
                continue
            if act_id not in blocked_ids:
                continue
            cards.append(self._card(
                "PREVENTIVE",
                self.PRIORITY_MEDIUM,
                act_id,
                row["activity_name"],
                "Material Delay Risk",
                (
                    f"**{row['activity_name']}** (not started) has open material delay issues. "
                    "→ **Pre-order materials now to avoid blocking this activity.**"
                ),
                3,
            ))
        return cards

    def _rule_parallelization(self, act_map: Dict, cpm_map: Dict,
                              descendants: Dict) -> List[Dict]:
        """Rule 4: pair of not-started activities with no path between them."""
        cards = []
        nodes = list(self.G.nodes)
        for i, a1 in enumerate(nodes):
            if act_map.get(a1, {}).get("status") != "not_started":
                continue
            for a2 in nodes[i + 1:]:
                # Independent means neither node is reachable from the other.
                if a2 in descendants[a1] or a1 in descendants[a2]:
                    continue
                if act_map.get(a2, {}).get("status") != "not_started":
                    continue
                a1_name = cpm_map.get(a1, {}).get("activity_name", a1)
                a2_name = cpm_map.get(a2, {}).get("activity_name", a2)
                dur_saved = min(
                    cpm_map.get(a1, {}).get("planned_duration_days", 10),
                    cpm_map.get(a2, {}).get("planned_duration_days", 10),
                )
                cards.append(self._card(
                    "OPTIMIZATION",
                    self.PRIORITY_OPPORTUNITY,
                    f"{a1}+{a2}",
                    f"{a1_name} + {a2_name}",
                    "Parallelization Opportunity",
                    (
                        f"**{a1_name}** and **{a2_name}** have no dependencies. "
                        f"→ **Run in parallel — potential savings: ~{dur_saved} days.**"
                    ),
                    dur_saved,
                ))
                break  # Only one parallelization suggestion per first node
        return cards

    def _rule_stalled(self, acts: pd.DataFrame) -> List[Dict]:
        """Rule 5: in-progress activity whose last 3 updates each added <= 0.1."""
        updates = self.loader.daily_updates
        required = {"activity_id", "date", "daily_increment"}
        if updates.empty or not required.issubset(updates.columns):
            return []
        # Work on a copy so the loader's table is not modified.
        updates = updates.copy()
        updates["date"] = pd.to_datetime(updates["date"], errors="coerce")

        cards = []
        for _, act_row in acts.iterrows():
            if act_row.get("status") != "in_progress":
                continue
            act_id = str(act_row["id"])
            history = updates[updates["activity_id"] == act_id].sort_values("date")
            if len(history) < 3:
                continue
            # Looks at the three most recent update records, not calendar days.
            last3 = history["daily_increment"].tail(3).astype(float)
            if not (last3 <= 0.1).all():
                continue
            a_name = act_row.get("name", act_id)
            cards.append(self._card(
                "ALERT",
                self.PRIORITY_HIGH,
                act_id,
                a_name,
                "Stalled Activity",
                (
                    f"**{a_name}** has shown zero progress for 3+ days. "
                    "→ **Investigate immediately — possible blockage.**"
                ),
                0,
            ))
        return cards

    def _rule_reallocation(self, cpm_rows: List[Dict], act_map: Dict) -> List[Dict]:
        """Rule 6: in-progress, non-critical activity with more than 10 days of float."""
        cards = []
        for row in cpm_rows:
            # Truthiness test, not `is False`: values coming out of pandas may
            # be numpy booleans, for which an identity check never matches.
            if row["total_float_days"] <= 10 or row["is_critical_path"]:
                continue
            act_id = row["activity_id"]
            if act_map.get(act_id, {}).get("status") != "in_progress":
                continue
            cards.append(self._card(
                "OPTIMIZATION",
                self.PRIORITY_OPPORTUNITY,
                act_id,
                row["activity_name"],
                "Resource Reallocation",
                (
                    f"**{row['activity_name']}** has {row['total_float_days']} days of float. "
                    "→ **Consider reallocating some resources to critical path activities.**"
                ),
                2,
            ))
        return cards
if __name__ == "__main__":
    # Demo run against a sample project. DataLoader is already imported at
    # module level, so no second import is needed here.
    optimizer = ScheduleOptimizer("proj_008", loader=DataLoader())
    cpm_table = optimizer.compute_cpm()
    print("CPM Results:")
    if cpm_table.empty:
        # compute_cpm() returns a column-less frame when there are no
        # activities; selecting columns from it would raise KeyError.
        print("(no activities found for this project)")
    else:
        summary_cols = ["activity_name", "total_float_days", "is_critical_path"]
        print(cpm_table[summary_cols].to_string())
    print("\nSuggestions:")
    for s in optimizer.generate_suggestions():
        print(f"[{s['priority']}] {s['activity_name']}: {s['suggestion'][:80]}...")