"""Schedule-grounded Monte-Carlo EVM simulator.

Generates grounded PV/EV/AC trajectories from the REAL DSLIB baseline schedules:

  * PV    - time-phase each real leaf activity's real cost across its real
            baseline dates (the planned S-curve EMERGES from the actual
            schedule, not a logistic formula).
  * EV/AC - Monte-Carlo execution: sample per-activity duration and cost
            performance, cascade delays through the REAL finish-to-start
            precedence network (CPM forward pass on sampled durations), earn
            baseline cost over each activity's actual window (EV) and apply a
            cost factor (AC).

Outcome stats (final CPI, slippage) are calibrated to DSLIB; the SHAPE comes
from real structure - so it is far less "predictable" than parametric S-curves.

Output: a `synthetic.Project`, so it flows through the same baselines /
forecaster / eval harness.
"""
from __future__ import annotations

import glob
import os
import re
import warnings
from dataclasses import dataclass

import numpy as np
import pandas as pd

from .synthetic import Project

PERIOD_DAYS = 30.44  # monthly bins (DSLIB cadence)
_LAG_UNIT = {"d": 1.0, "w": 7.0, "m": 30.44}

# One predecessor token: <id>[relation][signed lag][unit], e.g. "16FS-6w".
# Compiled once so the per-token loop in _parse_preds does not re-resolve it.
_PRED_RE = re.compile(
    r"\s*(\d+)\s*(FS|SS|FF|SF)?\s*([+-]\d+(?:\.\d+)?)?\s*(d|w|m)?", re.I
)


@dataclass
class Activity:
    """One leaf activity of a baseline schedule."""

    aid: str
    cost: float
    start: float  # baseline start, days from project start
    end: float  # baseline end, days from project start
    preds: list[tuple[str, float]]  # [(pred_id, lag_days), ...] (finish-to-start)


@dataclass
class Schedule:
    """A parsed baseline schedule reduced to its leaf activities."""

    name: str
    acts: list[Activity]  # leaf Activities (costs sum to bac)
    bac: float
    horizon_days: float  # baseline project duration


def _parse_preds(s) -> list:
    """Parse a predecessor cell into ``[(id, lag_days), ...]``.

    Examples of accepted input: ``'14FS;13FS'``, ``'16FS-6w'``, ``'10FS+2d'``.
    The relation type (FS/SS/FF/SF) is recognised but ignored: every link is
    treated as finish-to-start. Lags are converted to days via ``_LAG_UNIT``
    (unit defaults to days). Non-string input and tokens that do not start
    with an integer id yield nothing.
    """
    if not isinstance(s, str):
        return []
    out = []
    for tok in s.split(";"):
        m = _PRED_RE.match(tok)
        if not m:
            continue
        lag = float(m.group(3) or 0.0) * _LAG_UNIT[(m.group(4) or "d").lower()]
        out.append((m.group(1), lag))
    return out


def _norm_id(value) -> str:
    """Render an ID cell as text, writing integer-valued floats without '.0'.

    pandas reads an integer column that contains a blank cell as float64, so a
    plain ``str()`` would give '14.0' - which never matches the '14' parsed
    out of the Predecessors column.
    """
    if (isinstance(value, (float, np.floating)) and np.isfinite(value)
            and float(value).is_integer()):
        return str(int(value))
    return str(value)


def parse_schedule(path: str) -> Schedule | None:
    """Parse a DSLIB workbook's 'Baseline Schedule' into leaf activities.

    Returns ``None`` when the workbook cannot be read, lacks one of the
    required columns, or has fewer than three usable leaf rows (a leaf is a
    row whose WBS code is not a dotted prefix of any other row's code, with
    positive cost and valid baseline dates). Dependencies on anything other
    than another kept leaf are dropped.
    """
    try:
        bs = pd.read_excel(path, sheet_name="Baseline Schedule", header=1)
    except Exception:  # deliberate best-effort: unreadable workbooks are skipped
        return None
    if not {"WBS", "Total Cost", "Baseline Start", "Baseline End"}.issubset(bs.columns):
        return None

    wbs = [str(w) if pd.notna(w) else "" for w in bs["WBS"]]
    # Every proper dotted prefix of a code is a parent; one pass over the
    # codes replaces the all-pairs startswith() scan.
    parents = {w[:k] for w in wbs for k, ch in enumerate(w) if ch == "."}
    # dtype=bool keeps the mask combinable with the Series below even when
    # the sheet has no rows.
    is_leaf = np.array([w != "" and w not in parents for w in wbs], dtype=bool)

    cost = pd.to_numeric(bs["Total Cost"], errors="coerce")
    start = pd.to_datetime(bs["Baseline Start"], errors="coerce")
    end = pd.to_datetime(bs["Baseline End"], errors="coerce")
    keep = is_leaf & (cost > 0) & start.notna() & end.notna()
    if keep.sum() < 3:
        return None

    proj_start = start[keep].min()
    ids = bs["ID"] if "ID" in bs.columns else pd.Series(range(len(bs)))
    preds_col = (bs["Predecessors"] if "Predecessors" in bs.columns
                 else pd.Series([None] * len(bs)))

    acts = []
    for i in np.where(keep.to_numpy())[0]:
        start_day = (start.iloc[i] - proj_start).days
        end_day = (end.iloc[i] - proj_start).days
        acts.append(Activity(
            aid=_norm_id(ids.iloc[i]),
            cost=float(cost.iloc[i]),
            start=float(start_day),
            end=float(max(end_day, start_day + 1)),  # at least one day long
            preds=_parse_preds(preds_col.iloc[i]),
        ))

    leaf_ids = {a.aid for a in acts}
    for a in acts:  # keep only dependencies on other leaves
        a.preds = [(p, lag) for (p, lag) in a.preds if p in leaf_ids and p != a.aid]

    return Schedule(
        name=os.path.splitext(os.path.basename(path))[0],
        acts=acts,
        bac=float(sum(a.cost for a in acts)),
        horizon_days=max(a.end for a in acts),
    )


def _topo_order(acts) -> list:
    """Return activity indices in Kahn topological order by FS predecessors.

    Predecessor ids that do not belong to ``acts`` are ignored. If the network
    contains a cycle, the activities that could not be ordered are appended
    sorted by baseline start.
    """
    idx = {a.aid: i for i, a in enumerate(acts)}
    indeg = [0] * len(acts)
    children = [[] for _ in acts]
    for i, a in enumerate(acts):
        for p, _ in a.preds:
            j = idx.get(p)
            if j is None:  # dangling reference: no such activity
                continue
            indeg[i] += 1
            children[j].append(i)

    ready = [i for i, d in enumerate(indeg) if d == 0]
    order = []
    while ready:
        i = ready.pop()
        order.append(i)
        for c in children[i]:
            indeg[c] -= 1
            if indeg[c] == 0:
                ready.append(c)

    if len(order) < len(acts):  # cycle: append the rest by baseline start
        placed = set(order)
        order += sorted((i for i in range(len(acts)) if i not in placed),
                        key=lambda i: acts[i].start)
    return order


def _phase(s: float, e: float, edges: np.ndarray) -> np.ndarray:
    """Fraction of [s, e] falling in each [edges[i], edges[i+1]) bin.

    The fractions sum to ~1 when ``edges`` spans the whole interval. A
    zero-length or inverted interval is widened to 1e-6 so the division is
    defined.
    """
    if e <= s:
        e = s + 1e-6
    lo = np.maximum(edges[:-1], s)
    hi = np.minimum(edges[1:], e)
    return np.clip(hi - lo, 0.0, None) / (e - s)


def _lumpify(ev_inc, ac_inc, rng, p_lag=0.4, defer=0.7):
    """Progress-certification lag on per-period EV/AC increments.

    With probability ``p_lag`` a period defers the fraction ``defer`` of its
    earned value (and of its cost) into the next period, producing low months
    followed by catch-up spikes. The last period never defers, so totals are
    preserved; the intent is to lift increment CV from the over-smooth linear
    value to ~real (0.9). Inputs are not modified; empty input yields empty
    output.

    Returns the pair ``(ev, ac)`` of adjusted increment arrays.
    """
    ev = np.array(ev_inc, dtype=float)  # np.array copies, inputs stay intact
    ac = np.array(ac_inc, dtype=float)
    carry_ev = carry_ac = 0.0
    last = len(ev) - 1
    for t in range(len(ev)):
        ev[t] += carry_ev
        ac[t] += carry_ac
        carry_ev = carry_ac = 0.0
        # `t < last` is tested first so the final period draws no random number.
        if t < last and rng.random() < p_lag:
            carry_ev, carry_ac = ev[t] * defer, ac[t] * defer
            ev[t] -= carry_ev
            ac[t] -= carry_ac
    return ev, ac


def simulate(sched: Schedule, rng: np.random.Generator) -> Project:
    """One Monte-Carlo execution of a baseline schedule -> grounded PV/EV/AC Project.

    Draws a project-level CPI target and duration stretch plus per-activity
    noise, runs a forward pass over the finish-to-start network with the
    sampled durations, then bins baseline cost into ``PERIOD_DAYS`` periods:
    over baseline dates for PV and over simulated dates for EV/AC. Returned
    series are cumulative.
    """
    acts = sched.acts
    idx = {a.aid: i for i, a in enumerate(acts)}
    n = len(acts)

    # Sample performance, calibrated to DSLIB (final CPI ~0.77-1.09; schedule stretch).
    cpi_target = float(np.clip(rng.normal(0.95, 0.12), 0.6, 1.25))
    dur_mean = float(np.clip(rng.normal(1.08, 0.18), 0.7, 2.2))
    cost_factor = (1.0 / cpi_target) * np.exp(rng.normal(0.0, 0.10, n))  # AC = cost x factor
    base_dur = np.array([max(a.end - a.start, 1.0) for a in acts])
    act_dur = base_dur * dur_mean * np.exp(rng.normal(0.0, 0.25, n))  # per-activity stretch

    # Occasional disruption: a contiguous burst of activities stalls (durations inflate).
    if n >= 8 and rng.random() < 0.4:
        j = int(rng.integers(0, n - 4))
        width = int(rng.integers(2, 5))
        act_dur[j:j + width] *= rng.uniform(1.5, 3.0)

    # CPM forward pass on sampled durations: delays cascade through FS precedence.
    astart = np.full(n, np.nan)
    aend = np.full(n, np.nan)
    for i in _topo_order(acts):
        a = acts[i]
        # Skip predecessors that are unknown or (cycle fallback only) not yet
        # scheduled: their finish is NaN, and max() over NaN is order-dependent.
        finishes = [aend[idx[p]] + lag for p, lag in a.preds if p in idx]
        gates = [f for f in finishes if not np.isnan(f)]
        astart[i] = max([a.start, *gates])  # never earlier than baseline start
        aend[i] = astart[i] + act_dur[i]

    end_day = max(float(np.nanmax(aend)), sched.horizon_days)
    edges = np.arange(0.0, end_day + PERIOD_DAYS, PERIOD_DAYS)
    nper = len(edges) - 1
    pv = np.zeros(nper)
    ev = np.zeros(nper)
    ac = np.zeros(nper)
    for i, a in enumerate(acts):
        pv += a.cost * _phase(a.start, a.end, edges)
        share = a.cost * _phase(astart[i], aend[i], edges)
        ev += share
        ac += share * cost_factor[i]

    ev, ac = _lumpify(ev, ac, rng)  # certification-lag lumpiness -> realistic CV

    pv_cum = np.cumsum(pv)
    np.minimum(pv_cum, sched.bac, out=pv_cum)  # cap cumulative PV at BAC
    return Project(
        name=f"sim::{sched.name}",
        period=np.arange(1, nper + 1),
        pv=pv_cum,
        ev=np.cumsum(ev),
        ac=np.cumsum(ac),
        bac=sched.bac,
        planned_finish=int(np.ceil(sched.horizon_days / PERIOD_DAYS)),
        meta={"source": "grounded_sim", "base": sched.name, "cpi_target": cpi_target,
              "dur_mean": dur_mean, "n_acts": n},
    )


def load_schedules(excel_dir: str = "data/DSLIB/Excel", min_acts: int = 5) -> list[Schedule]:
    """Parse every ``*.xlsx`` in ``excel_dir`` (sorted by path) into a Schedule.

    Workbooks that fail to parse or have fewer than ``min_acts`` leaf
    activities are skipped. Warnings raised while parsing are suppressed only
    for the duration of this call; the caller's warning filters are restored
    on return.
    """
    out = []
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        for f in sorted(glob.glob(os.path.join(excel_dir, "*.xlsx"))):
            s = parse_schedule(f)
            if s is not None and len(s.acts) >= min_acts:
                out.append(s)
    return out


def generate_grounded_corpus(excel_dir: str = "data/DSLIB/Excel", runs_per_project: int = 20,
                             seed: int = 0, min_acts: int = 5) -> list[Project]:
    """Monte-Carlo a corpus from all real DSLIB schedules.

    Runs ``simulate`` ``runs_per_project`` times per loaded schedule, all
    sharing one generator seeded with ``seed``, so the corpus is reproducible
    for a fixed set of workbooks.
    """
    rng = np.random.default_rng(seed)
    return [
        simulate(s, rng)
        for s in load_schedules(excel_dir, min_acts)
        for _ in range(runs_per_project)
    ]