Spaces:
Running
Running
| """Schedule-grounded Monte-Carlo EVM simulator. | |
| Generates grounded PV/EV/AC trajectories from the REAL DSLIB baseline schedules: | |
| * PV - time-phase each real leaf activity's real cost across its real baseline dates | |
| (the planned S-curve EMERGES from the actual schedule, not a logistic formula). | |
| * EV/AC - Monte-Carlo execution: sample per-activity duration and cost performance, | |
| cascade delays through the REAL finish-to-start precedence network (CPM forward | |
| pass on sampled durations), earn baseline cost over each activity's actual window | |
| (EV) and apply a cost factor (AC). | |
| Outcome stats (final CPI, slippage) are calibrated to DSLIB; the SHAPE comes from real | |
| structure - so it is far less "predictable" than parametric S-curves. Output: a | |
| `synthetic.Project`, so it flows through the same baselines / forecaster / eval harness. | |
| """ | |
| from __future__ import annotations | |
| import glob | |
| import os | |
| import re | |
| import warnings | |
| from dataclasses import dataclass | |
| import numpy as np | |
| import pandas as pd | |
| from .synthetic import Project | |
| PERIOD_DAYS = 30.44 # monthly bins (DSLIB cadence) | |
| _LAG_UNIT = {"d": 1.0, "w": 7.0, "m": 30.44} | |
@dataclass
class Activity:
    """One leaf activity of a baseline schedule.

    Declared as a dataclass so it can be built with keyword arguments
    (``Activity(aid=..., cost=..., ...)``); instances stay mutable because
    ``preds`` is reassigned after construction when dependencies are filtered.
    """

    aid: str      # activity identifier, compared against predecessor ids
    cost: float   # baseline cost of the activity
    start: float  # baseline start, days from project start
    end: float    # baseline end, days from project start
    preds: list   # [(pred_id, lag_days), ...] (finish-to-start)
@dataclass
class Schedule:
    """A parsed baseline schedule: its leaf activities plus project-level totals.

    Declared as a dataclass so it can be built with keyword arguments
    (``Schedule(name=..., acts=..., bac=..., horizon_days=...)``).
    """

    name: str             # schedule name
    acts: list            # leaf Activities (costs sum to bac)
    bac: float            # budget at completion
    horizon_days: float   # baseline project duration
# One predecessor token: "<id>[relation][signed lag][unit]", e.g. "14FS", "16FS-6w",
# "10FS + 2d". Compiled once instead of on every token. Whitespace is allowed between
# the sign and the magnitude so a spaced lag is not silently read as zero.
_PRED_RE = re.compile(
    r"\s*(\d+)\s*(FS|SS|FF|SF)?\s*([+-]\s*\d+(?:\.\d+)?)?\s*(d|w|m)?",
    re.I,
)


def _parse_preds(s) -> list:
    """Parse a predecessor string into ``[(pred_id, lag_days), ...]``.

    Examples: ``'14FS;13FS'``, ``'16FS-6w'``, ``'10FS+2d'``.

    Tokens are separated by ``;``. The relation type (FS/SS/FF/SF) is recognised
    but not returned, so every dependency is treated as finish-to-start by callers.
    The lag is converted to days through ``_LAG_UNIT``; a missing unit means days
    and a missing lag means 0. Tokens that do not start with an activity number
    are skipped. Any non-string input (e.g. NaN / None from a blank cell) yields
    an empty list.
    """
    if not isinstance(s, str):
        return []
    out = []
    for tok in s.split(";"):
        m = _PRED_RE.match(tok)
        if m is None:
            continue
        pid, _relation, lag_txt, unit = m.groups()
        # Drop any whitespace between sign and digits before converting ("+ 2" -> "+2").
        magnitude = float("".join(lag_txt.split())) if lag_txt else 0.0
        out.append((pid, magnitude * _LAG_UNIT[(unit or "d").lower()]))
    return out
def _id_text(value) -> str:
    """Render an activity id as text, writing integral floats without the '.0' suffix."""
    if isinstance(value, (float, np.floating)) and float(value).is_integer():
        return str(int(value))
    return str(value)


def parse_schedule(path: str) -> Schedule | None:
    """Parse a DSLIB workbook's 'Baseline Schedule' sheet into leaf activities.

    Args:
        path: Path of the Excel workbook.

    Returns:
        A ``Schedule`` whose activities are the WBS leaves with positive cost and
        valid baseline dates, or ``None`` when the sheet cannot be read, required
        columns are missing, or fewer than 3 usable leaf activities remain.
    """
    try:
        bs = pd.read_excel(path, sheet_name="Baseline Schedule", header=1)
    except Exception:
        # Deliberate best-effort: an unreadable workbook or missing sheet means "skip it".
        return None
    if not {"WBS", "Total Cost", "Baseline Start", "Baseline End"}.issubset(bs.columns):
        return None

    wbs = [str(w) if pd.notna(w) else "" for w in bs["WBS"]]
    # A row is a parent when some other WBS code extends it with ".<suffix>". Collect
    # every dotted prefix once, so leaf detection is a set lookup rather than a
    # pairwise scan over all rows.
    parents = set()
    for code in wbs:
        dot = code.find(".")
        while dot != -1:
            parents.add(code[:dot])
            dot = code.find(".", dot + 1)
    is_leaf = np.array([w != "" and w not in parents for w in wbs], dtype=bool)

    cost = pd.to_numeric(bs["Total Cost"], errors="coerce")
    start = pd.to_datetime(bs["Baseline Start"], errors="coerce")
    end = pd.to_datetime(bs["Baseline End"], errors="coerce")
    keep = is_leaf & (cost > 0) & start.notna() & end.notna()
    if keep.sum() < 3:
        return None

    proj_start = start[keep].min()
    # An integer ID column containing blanks is read as float, which would stringify
    # as "14.0" and never match the "14" found in predecessor strings.
    if "ID" in bs.columns:
        ids = [_id_text(v) for v in bs["ID"]]
    else:
        ids = [str(k) for k in range(len(bs))]
    preds_col = bs["Predecessors"] if "Predecessors" in bs.columns else pd.Series([None] * len(bs))

    acts = []
    for i in np.where(keep.to_numpy())[0]:
        s = (start.iloc[i] - proj_start).days
        e = (end.iloc[i] - proj_start).days
        acts.append(Activity(aid=ids[i], cost=float(cost.iloc[i]),
                             start=float(s), end=float(max(e, s + 1)),  # at least one day long
                             preds=_parse_preds(preds_col.iloc[i])))

    leaf_ids = {a.aid for a in acts}
    for a in acts:  # keep only dependencies on other leaves
        a.preds = [(p, lag) for (p, lag) in a.preds if p in leaf_ids and p != a.aid]

    horizon = max(a.end for a in acts)
    return Schedule(name=os.path.splitext(os.path.basename(path))[0],
                    acts=acts, bac=float(sum(a.cost for a in acts)), horizon_days=horizon)
| def _topo_order(acts) -> list: | |
| """Kahn topological order by FS predecessors; cycles fall back to start-date order.""" | |
| idx = {a.aid: i for i, a in enumerate(acts)} | |
| indeg = [0] * len(acts) | |
| children = [[] for _ in acts] | |
| for i, a in enumerate(acts): | |
| for p, _ in a.preds: | |
| indeg[i] += 1 | |
| children[idx[p]].append(i) | |
| queue = [i for i in range(len(acts)) if indeg[i] == 0] | |
| order = [] | |
| while queue: | |
| i = queue.pop() | |
| order.append(i) | |
| for c in children[i]: | |
| indeg[c] -= 1 | |
| if indeg[c] == 0: | |
| queue.append(c) | |
| if len(order) < len(acts): # cycle: append the rest by baseline start | |
| order += sorted(set(range(len(acts))) - set(order), key=lambda i: acts[i].start) | |
| return order | |
| def _phase(s: float, e: float, edges: np.ndarray) -> np.ndarray: | |
| """Fraction of [s, e] falling in each [edges[i], edges[i+1]) bin (sums to ~1).""" | |
| if e <= s: | |
| e = s + 1e-6 | |
| lo = np.maximum(edges[:-1], s) | |
| hi = np.minimum(edges[1:], e) | |
| return np.clip(hi - lo, 0.0, None) / (e - s) | |
| def _lumpify(ev_inc, ac_inc, rng, p_lag=0.4, defer=0.7): | |
| """Progress-certification lag: defer part of a month's earned value (and its cost) into | |
| the next month, producing low months followed by catch-up spikes. Preserves totals and | |
| monotonicity, and lifts increment CV from the over-smooth linear value to ~real (0.9).""" | |
| ev = ev_inc.astype(float).copy() | |
| ac = ac_inc.astype(float).copy() | |
| ce = ca = 0.0 | |
| for t in range(len(ev)): | |
| ev[t] += ce | |
| ac[t] += ca | |
| ce = ca = 0.0 | |
| if t < len(ev) - 1 and rng.random() < p_lag: | |
| ce, ca = ev[t] * defer, ac[t] * defer | |
| ev[t] -= ce | |
| ac[t] -= ca | |
| ev[-1] += ce | |
| ac[-1] += ca | |
| return ev, ac | |
def simulate(sched: Schedule, rng: np.random.Generator) -> Project:
    """Run one Monte-Carlo execution of a baseline schedule -> grounded PV/EV/AC Project.

    Steps:
      1. Sample a project-level CPI target and duration stretch, then per-activity
         cost factors and durations (plus an occasional multi-activity stall).
      2. Forward pass in topological order: each activity starts at the later of
         its baseline start and its predecessors' sampled finishes plus lag.
      3. Spread baseline cost over baseline dates (PV) and over sampled dates
         (EV, and AC = EV share x cost factor) in ``PERIOD_DAYS`` bins.
      4. Apply certification-lag lumpiness to EV/AC and accumulate.

    Args:
        sched: Parsed baseline schedule.
        rng: Random generator; all randomness is drawn from it.

    Returns:
        A ``Project`` with cumulative ``pv`` (capped at BAC), ``ev`` and ``ac``.
    """
    acts = sched.acts
    idx = {a.aid: i for i, a in enumerate(acts)}
    n = len(acts)

    # Sample performance, calibrated to DSLIB (final CPI ~0.77-1.09; schedule stretch).
    cpi_target = float(np.clip(rng.normal(0.95, 0.12), 0.6, 1.25))
    dur_mean = float(np.clip(rng.normal(1.08, 0.18), 0.7, 2.2))
    cost_factor = (1.0 / cpi_target) * np.exp(rng.normal(0.0, 0.10, n))  # AC = cost x factor
    base_dur = np.array([max(a.end - a.start, 1.0) for a in acts])
    act_dur = base_dur * dur_mean * np.exp(rng.normal(0.0, 0.25, n))  # per-activity stretch

    # Occasional disruption: a contiguous burst of activities stalls (durations inflate).
    if n >= 8 and rng.random() < 0.4:
        j = int(rng.integers(0, n - 4))
        act_dur[j:j + int(rng.integers(2, 5))] *= rng.uniform(1.5, 3.0)

    # CPM forward pass on sampled durations: delays cascade through FS precedence.
    astart = np.full(n, np.nan)
    aend = np.full(n, np.nan)
    for i in _topo_order(acts):
        a = acts[i]
        pred_finish = [aend[idx[p]] + lag for p, lag in a.preds]
        # On a cyclic network the fallback order can visit an activity before one of
        # its predecessors, whose finish is then still NaN. Ignore those: max() over a
        # list containing NaN depends on element order and may hide a real constraint.
        resolved = [f for f in pred_finish if not np.isnan(f)]
        pc = max(resolved, default=a.start)
        astart[i] = max(a.start, pc)  # no-pred activities anchor at baseline start
        aend[i] = astart[i] + act_dur[i]

    # Bins must cover both the simulated finish and the baseline horizon.
    end_day = max(float(np.nanmax(aend)), sched.horizon_days)
    edges = np.arange(0.0, end_day + PERIOD_DAYS, PERIOD_DAYS)
    pv = np.zeros(len(edges) - 1)
    ev = np.zeros(len(edges) - 1)
    ac = np.zeros(len(edges) - 1)
    for i, a in enumerate(acts):
        pv += a.cost * _phase(a.start, a.end, edges)
        share = a.cost * _phase(astart[i], aend[i], edges)
        ev += share
        ac += share * cost_factor[i]
    ev, ac = _lumpify(ev, ac, rng)  # certification-lag lumpiness -> realistic CV

    pv_cum = np.cumsum(pv)
    np.minimum(pv_cum, sched.bac, out=pv_cum)  # cap the planned curve at BAC
    nper = len(pv)
    return Project(
        name=f"sim::{sched.name}",
        period=np.arange(1, nper + 1),
        pv=pv_cum,
        ev=np.cumsum(ev),
        ac=np.cumsum(ac),
        bac=sched.bac,
        planned_finish=int(np.ceil(sched.horizon_days / PERIOD_DAYS)),
        meta={"source": "grounded_sim", "base": sched.name,
              "cpi_target": cpi_target, "dur_mean": dur_mean, "n_acts": n},
    )
def load_schedules(excel_dir: str = "data/DSLIB/Excel", min_acts: int = 5) -> list[Schedule]:
    """Parse every ``*.xlsx`` workbook in ``excel_dir`` into a ``Schedule``.

    Files are visited in sorted name order. Workbooks that fail to parse, or
    that yield fewer than ``min_acts`` leaf activities, are skipped.

    Warnings raised while parsing are suppressed only for the duration of this
    call; the caller's warning filters are restored on return.
    """
    out = []
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        for f in sorted(glob.glob(os.path.join(excel_dir, "*.xlsx"))):
            s = parse_schedule(f)
            if s is not None and len(s.acts) >= min_acts:
                out.append(s)
    return out
def generate_grounded_corpus(excel_dir: str = "data/DSLIB/Excel", runs_per_project: int = 20,
                             seed: int = 0, min_acts: int = 5) -> list[Project]:
    """Build a simulated corpus: ``runs_per_project`` runs for each loadable schedule.

    A single generator seeded with ``seed`` is shared across all runs, consumed
    schedule by schedule, so the output is reproducible for a fixed input set.
    """
    generator = np.random.default_rng(seed)
    schedules = load_schedules(excel_dir, min_acts)
    return [
        simulate(schedule, generator)
        for schedule in schedules
        for _run in range(runs_per_project)
    ]