"""
optimizer/schedule_optimizer.py
---------------------------------
Critical Path Method (CPM) + rule-based schedule optimization suggestions.
"""
import pandas as pd
import numpy as np
import networkx as nx
from datetime import datetime, timedelta
from typing import Optional, List, Dict
from data_loader import DataLoader
from engine.dag_builder import build_dag, get_topological_order
REFERENCE_DATE = datetime(2024, 6, 1)
class ScheduleOptimizer:
    """
    Critical Path Method (CPM) calculator plus rule-based schedule
    optimization suggestions for a single project.

    Typical usage::

        opt = ScheduleOptimizer("proj_001")
        cpm_df = opt.compute_cpm()
        cards = opt.generate_suggestions()
    """

    def __init__(self, project_id: str, loader: Optional[DataLoader] = None,
                 today: Optional[datetime] = None):
        """
        Args:
            project_id: ID of the project to analyze.
            loader: data source; a fresh ``DataLoader`` is created when omitted.
            today: "current" date used for progress-rate calculations;
                defaults to ``REFERENCE_DATE`` so demo output is reproducible.
        """
        self.project_id = project_id
        self.loader = loader or DataLoader()
        self.today = pd.Timestamp(today or REFERENCE_DATE)
        self.G = build_dag(project_id, loader=self.loader)
        # Cached result of compute_cpm(); None until first computed.
        self._cpm_results: Optional[pd.DataFrame] = None

    # ──────────────────────────────────────────────────────────────────────
    # Critical Path Method (CPM)
    # ──────────────────────────────────────────────────────────────────────
    def compute_cpm(self) -> pd.DataFrame:
        """
        Compute ES, EF, LS, LF, and total float for every activity.

        Returns:
            DataFrame with one row per activity and CPM columns (early/late
            start/finish as project-relative day offsets, ``total_float_days``,
            ``is_critical_path``). Empty DataFrame when the project has no
            activities.
        """
        acts = self.loader.get_project_activities(self.project_id)
        if acts.empty:
            return pd.DataFrame()
        topo = get_topological_order(self.G)
        act_map = {row["id"]: row.to_dict() for _, row in acts.iterrows()}

        def dur(act_id) -> int:
            # Planned duration in days; fall back to 14 when missing/zero,
            # and never allow a duration below 1 day.
            d = act_map.get(act_id, {}).get("planned_duration_days", 14)
            return max(int(d or 14), 1)

        # --- Forward pass: Early Start / Early Finish ---
        ES: Dict = {}
        EF: Dict = {}
        for node in topo:
            preds = list(self.G.predecessors(node))
            # Sources start at day 0; otherwise start when the last
            # predecessor finishes.
            ES[node] = max((EF.get(p, 0) for p in preds), default=0)
            EF[node] = ES[node] + dur(node)

        # Project duration = latest early finish over all activities.
        project_duration = max(EF.values()) if EF else 0

        # --- Backward pass: Late Start / Late Finish ---
        LS: Dict = {}
        LF: Dict = {}
        for node in reversed(topo):
            succs = list(self.G.successors(node))
            if not succs:
                LF[node] = project_duration
            else:
                LF[node] = min(LS.get(s, project_duration) for s in succs)
            LS[node] = LF[node] - dur(node)

        # Total float = LS - ES; float <= 0 marks the critical path.
        rows = []
        for node in topo:
            if node not in act_map:
                continue  # graph node with no matching activity record
            a = act_map[node]
            float_val = LS.get(node, 0) - ES.get(node, 0)
            rows.append({
                "activity_id": node,
                "activity_name": a.get("name", node),
                "status": a.get("status", "unknown"),
                "planned_duration_days": dur(node),
                "early_start_day": ES.get(node, 0),
                "early_finish_day": EF.get(node, 0),
                "late_start_day": LS.get(node, 0),
                "late_finish_day": LF.get(node, 0),
                "total_float_days": float_val,
                "is_critical_path": float_val <= 0,
                "progress": float(a.get("progress", 0) or 0),
                "schedule_variance_days": int(a.get("schedule_variance_days", 0) or 0),
                "category": a.get("category", ""),
            })
        self._cpm_results = pd.DataFrame(rows)
        return self._cpm_results

    def get_critical_path(self) -> List[str]:
        """Return list of activity IDs on the critical path (float ≤ 0)."""
        if self._cpm_results is None:
            self.compute_cpm()
        if self._cpm_results is None or self._cpm_results.empty:
            return []
        return self._cpm_results[self._cpm_results["is_critical_path"]]["activity_id"].tolist()

    # ──────────────────────────────────────────────────────────────────────
    # Rule-Based Suggestions
    # ──────────────────────────────────────────────────────────────────────
    def generate_suggestions(self, predictions_df: Optional[pd.DataFrame] = None) -> List[Dict]:
        """
        Apply 6 rule-based optimization rules and return suggestion cards.

        Args:
            predictions_df: optional ML predictions DataFrame; accepted for
                API compatibility but not currently consumed by any rule.

        Returns:
            Up to 15 suggestion dicts, de-duplicated by (activity, rule) and
            sorted CRITICAL → HIGH → MEDIUM → OPPORTUNITY.
        """
        if self._cpm_results is None:
            self.compute_cpm()
        cpm = self._cpm_results
        if cpm is None or cpm.empty:
            return []
        acts = self.loader.get_project_activities(self.project_id)
        if acts.empty:
            return []

        all_issues = self.loader.get_activity_issues(project_id=self.project_id)
        if not all_issues.empty and "status" in all_issues.columns:
            open_issues = all_issues[all_issues["status"] == "open"]
        else:
            open_issues = pd.DataFrame()

        act_map = {row["id"]: row.to_dict() for _, row in acts.iterrows()}
        cpm_map = {row["activity_id"]: row.to_dict() for _, row in cpm.iterrows()}
        # Descendant sets, computed once and shared by rules 2 and 4.
        desc = {n: nx.descendants(self.G, n) for n in self.G.nodes}

        suggestions: List[Dict] = []
        suggestions += self._rule_slow_critical(cpm, act_map)
        suggestions += self._rule_high_impact_delay(cpm, act_map, desc)
        suggestions += self._rule_material_delay(cpm, act_map, open_issues)
        suggestions += self._rule_parallelization(act_map, cpm_map, desc)
        suggestions += self._rule_stalled(acts)
        suggestions += self._rule_reallocation(cpm, act_map)

        # De-duplicate by (activity_id, rule), keeping first occurrence.
        seen = set()
        unique_suggestions: List[Dict] = []
        for s in suggestions:
            key = (s["activity_id"], s["rule"])
            if key not in seen:
                seen.add(key)
                unique_suggestions.append(s)

        # Stable sort: CRITICAL first, then HIGH, MEDIUM, OPPORTUNITY.
        priority_order = {"🔴 CRITICAL": 0, "🔴 HIGH": 1, "🟡 MEDIUM": 2, "🟢 OPPORTUNITY": 3}
        unique_suggestions.sort(key=lambda x: priority_order.get(x["priority"], 9))
        return unique_suggestions[:15]  # cap at 15

    # --- individual rules ---------------------------------------------------
    def _rule_slow_critical(self, cpm: pd.DataFrame, act_map: Dict) -> List[Dict]:
        """Rule 1: critical-path activity running < 50% of planned rate → add crew."""
        out: List[Dict] = []
        for _, cpm_row in cpm.iterrows():
            if not cpm_row["is_critical_path"]:
                continue
            act_id = cpm_row["activity_id"]
            act = act_map.get(act_id, {})
            if act.get("status") != "in_progress":
                continue
            progress = float(act.get("progress", 0) or 0)
            start = act.get("actual_start_date") or act.get("planned_start_date") or self.today
            elapsed = max(1, (self.today - pd.Timestamp(start)).days)
            actual_rate = progress / elapsed
            planned_dur = max(cpm_row["planned_duration_days"], 1)
            planned_rate = 100 / planned_dur
            if actual_rate < 0.5 * planned_rate:
                out.append({
                    "type": "ACTION",
                    "priority": "🔴 CRITICAL",
                    "activity_id": act_id,
                    "activity_name": cpm_row["activity_name"],
                    "rule": "Slow Critical Activity",
                    "suggestion": (
                        f"**{cpm_row['activity_name']}** is on the critical path and running at "
                        f"{actual_rate:.1f}%/day vs planned {planned_rate:.1f}%/day. "
                        "→ **Increase crew size or shift to overtime.**"
                    ),
                    "estimated_savings_days": int(planned_dur * 0.2),
                })
        return out

    def _rule_high_impact_delay(self, cpm: pd.DataFrame, act_map: Dict,
                                desc: Dict) -> List[Dict]:
        """Rule 2: > 5 days late with > 3 downstream activities → escalate."""
        out: List[Dict] = []
        for _, cpm_row in cpm.iterrows():
            act_id = cpm_row["activity_id"]
            act = act_map.get(act_id, {})
            var = int(act.get("schedule_variance_days", 0) or 0)
            down = len(desc.get(act_id, ()))
            if var > 5 and down > 3:
                out.append({
                    "type": "ALERT",
                    "priority": "🔴 HIGH",
                    "activity_id": act_id,
                    "activity_name": cpm_row["activity_name"],
                    "rule": "High Impact Delay",
                    "suggestion": (
                        f"**{cpm_row['activity_name']}** is {var} days late and has "
                        f"{down} downstream activities. "
                        "→ **Escalate immediately — cascading delay risk.**"
                    ),
                    "estimated_savings_days": 0,
                })
        return out

    def _rule_material_delay(self, cpm: pd.DataFrame, act_map: Dict,
                             open_issues: pd.DataFrame) -> List[Dict]:
        """Rule 3: open material-delay issue on a not-started activity → pre-order."""
        out: List[Dict] = []
        if open_issues.empty:
            return out
        for _, cpm_row in cpm.iterrows():
            act_id = cpm_row["activity_id"]
            if act_map.get(act_id, {}).get("status") != "not_started":
                continue
            act_issues = open_issues[open_issues["activity_id"] == act_id]
            if act_issues.empty:
                continue
            material_issues = act_issues[act_issues.get("category", pd.Series()) == "material_delay"]
            if material_issues.empty:
                continue
            out.append({
                "type": "PREVENTIVE",
                "priority": "🟡 MEDIUM",
                "activity_id": act_id,
                "activity_name": cpm_row["activity_name"],
                "rule": "Material Delay Risk",
                "suggestion": (
                    f"**{cpm_row['activity_name']}** (not started) has open material delay issues. "
                    "→ **Pre-order materials now to avoid blocking this activity.**"
                ),
                "estimated_savings_days": 3,
            })
        return out

    def _rule_parallelization(self, act_map: Dict, cpm_map: Dict,
                              desc: Dict) -> List[Dict]:
        """Rule 4: two mutually unreachable, not-started activities → run in parallel."""
        out: List[Dict] = []
        nodes = list(self.G.nodes)
        for i, a1 in enumerate(nodes):
            # No pair involving a started a1 can ever qualify — skip early.
            if act_map.get(a1, {}).get("status") != "not_started":
                continue
            for a2 in nodes[i + 1:]:
                # Independent iff neither can reach the other in the DAG
                # (reachability subsumes the direct-edge check).
                if a2 in desc[a1] or a1 in desc[a2]:
                    continue
                if act_map.get(a2, {}).get("status") != "not_started":
                    continue
                a1_name = cpm_map.get(a1, {}).get("activity_name", a1)
                a2_name = cpm_map.get(a2, {}).get("activity_name", a2)
                dur_saved = min(
                    cpm_map.get(a1, {}).get("planned_duration_days", 10),
                    cpm_map.get(a2, {}).get("planned_duration_days", 10),
                )
                out.append({
                    "type": "OPTIMIZATION",
                    "priority": "🟢 OPPORTUNITY",
                    "activity_id": f"{a1}+{a2}",
                    "activity_name": f"{a1_name} + {a2_name}",
                    "rule": "Parallelization Opportunity",
                    "suggestion": (
                        f"**{a1_name}** and **{a2_name}** have no dependencies. "
                        f"→ **Run in parallel — potential savings: ~{dur_saved} days.**"
                    ),
                    "estimated_savings_days": dur_saved,
                })
                break  # Only one parallelization suggestion per node
        return out

    def _rule_stalled(self, acts: pd.DataFrame) -> List[Dict]:
        """Rule 5: in-progress activity with ~zero increment in last 3 updates."""
        out: List[Dict] = []
        all_updates = self.loader.daily_updates
        if all_updates.empty or "daily_increment" not in all_updates.columns:
            return out
        all_updates = all_updates.copy()
        all_updates["date"] = pd.to_datetime(all_updates["date"], errors="coerce")
        for _, act_row in acts.iterrows():
            if act_row.get("status") != "in_progress":
                continue
            act_id = str(act_row["id"])
            upd = all_updates[all_updates["activity_id"] == act_id].sort_values("date")
            if len(upd) < 3:
                continue
            # "Zero progress" tolerates tiny increments up to 0.1%/day.
            last3 = upd.tail(3)["daily_increment"].astype(float)
            if not (last3 <= 0.1).all():
                continue
            a_name = act_row.get("name", act_id)
            out.append({
                "type": "ALERT",
                "priority": "🔴 HIGH",
                "activity_id": act_id,
                "activity_name": a_name,
                "rule": "Stalled Activity",
                "suggestion": (
                    f"**{a_name}** has shown zero progress for 3+ days. "
                    "→ **Investigate immediately — possible blockage.**"
                ),
                "estimated_savings_days": 0,
            })
        return out

    def _rule_reallocation(self, cpm: pd.DataFrame, act_map: Dict) -> List[Dict]:
        """Rule 6: in-progress, non-critical activity with > 10 days float → shift resources."""
        out: List[Dict] = []
        for _, cpm_row in cpm.iterrows():
            # BUGFIX: the original tested `is_critical_path is False`, which
            # never matches the numpy.bool_ values a DataFrame row yields
            # (np.False_ is not the False singleton); use truthiness instead.
            if cpm_row["total_float_days"] <= 10 or cpm_row["is_critical_path"]:
                continue
            act_id = cpm_row["activity_id"]
            if act_map.get(act_id, {}).get("status") != "in_progress":
                continue
            out.append({
                "type": "OPTIMIZATION",
                "priority": "🟢 OPPORTUNITY",
                "activity_id": act_id,
                "activity_name": cpm_row["activity_name"],
                "rule": "Resource Reallocation",
                "suggestion": (
                    f"**{cpm_row['activity_name']}** has {cpm_row['total_float_days']} days of float. "
                    "→ **Consider reallocating some resources to critical path activities.**"
                ),
                "estimated_savings_days": 2,
            })
        return out
if __name__ == "__main__":
    # Smoke-test entry point: run CPM + suggestions for a sample project.
    from data_loader import DataLoader

    dl = DataLoader()
    opt = ScheduleOptimizer("proj_008", loader=dl)
    cpm = opt.compute_cpm()
    print("CPM Results:")
    # BUGFIX: compute_cpm() returns an empty DataFrame for a project with no
    # activities; column selection on it would raise KeyError, so guard first.
    if cpm.empty:
        print("(no activities found)")
    else:
        print(cpm[["activity_name", "total_float_days", "is_critical_path"]].to_string())
    print("\nSuggestions:")
    for s in opt.generate_suggestions():
        print(f"[{s['priority']}] {s['activity_name']}: {s['suggestion'][:80]}...")
|