# slipstream / src / simulate.py
# ashaibani's picture
# Slipstream: gr.Server + Preact SPA, MiniCPM-1B agent + TimesFM 2.5
# 16eaf84 verified
"""Schedule-grounded Monte-Carlo EVM simulator.
Generates grounded PV/EV/AC trajectories from the REAL DSLIB baseline schedules:
* PV - time-phase each real leaf activity's real cost across its real baseline dates
(the planned S-curve EMERGES from the actual schedule, not a logistic formula).
* EV/AC - Monte-Carlo execution: sample per-activity duration and cost performance,
cascade delays through the REAL finish-to-start precedence network (CPM forward
pass on sampled durations), earn baseline cost over each activity's actual window
(EV) and apply a cost factor (AC).
Outcome stats (final CPI, slippage) are calibrated to DSLIB; the SHAPE comes from real
structure - so it is far less "predictable" than parametric S-curves. Output: a
`synthetic.Project`, so it flows through the same baselines / forecaster / eval harness.
"""
from __future__ import annotations
import glob
import os
import re
import warnings
from dataclasses import dataclass
import numpy as np
import pandas as pd
from .synthetic import Project
PERIOD_DAYS = 30.44 # monthly bins (DSLIB cadence)
_LAG_UNIT = {"d": 1.0, "w": 7.0, "m": 30.44}
@dataclass
class Activity:
    """One leaf activity of a baseline schedule."""

    # Activity identifier; predecessor links refer to activities by this id.
    aid: str
    # Baseline cost of the activity.
    cost: float
    # Baseline start, in days from project start.
    start: float
    # Baseline end, in days from project start.
    end: float
    # Finish-to-start predecessors as [(pred_id, lag_days), ...].
    preds: list
@dataclass
class Schedule:
    """A parsed baseline schedule: its leaf activities plus project totals."""

    # Schedule name.
    name: str
    # Leaf Activity objects; their costs sum to `bac`.
    acts: list
    # Budget at completion.
    bac: float
    # Baseline project duration, in days.
    horizon_days: float
def _parse_preds(s) -> list:
    """Turn a predecessor cell into a list of (pred_id, lag_days) pairs.

    Examples of accepted cells: '14FS;13FS', '16FS-6w', '10FS+2d'. Tokens are
    separated by ';'. The relation type (FS/SS/FF/SF) is recognised but not used:
    every link is treated as finish-to-start. A lag without a unit suffix is in
    days. Non-string input (e.g. an empty cell) yields an empty list, and tokens
    that do not start with a numeric id are skipped.
    """
    if not isinstance(s, str):
        return []

    token_re = re.compile(
        r"\s*(\d+)\s*(FS|SS|FF|SF)?\s*([+-]\d+(?:\.\d+)?)?\s*(d|w|m)?", re.I
    )
    parsed = []
    for token in s.split(";"):
        hit = token_re.match(token)
        if hit is None:
            continue
        pred_id, _relation, amount, unit = hit.groups()
        # Signed lag magnitude, scaled from its unit (default: days) to days.
        lag_days = float(amount or 0.0) * _LAG_UNIT[(unit or "d").lower()]
        parsed.append((pred_id, lag_days))
    return parsed
def parse_schedule(path: str) -> Schedule | None:
    """Parse a DSLIB workbook's 'Baseline Schedule' sheet into leaf activities.

    A row is kept when its WBS code is a leaf (no other row's WBS code extends it
    with a further ".segment"), its cost is positive and both baseline dates parse.
    Dates become whole-day offsets from the earliest kept baseline start, and every
    activity gets a baseline duration of at least one day. Predecessor links are
    kept only when they point at another kept leaf (and not at the activity itself).

    Args:
        path: Path to the workbook.

    Returns:
        The parsed Schedule, or None when the sheet cannot be read, lacks a required
        column, or yields fewer than three usable leaf activities.
    """

    def _id_text(value) -> str:
        # pandas upcasts an integer column that contains blanks to float64, so a
        # bare str() would give "14.0" and never match the "14" found in a
        # predecessor token - silently dropping every dependency.
        if isinstance(value, float) and value.is_integer():
            return str(int(value))
        return str(value).strip()

    try:
        bs = pd.read_excel(path, sheet_name="Baseline Schedule", header=1)
    except Exception:
        # Deliberate best-effort: an unreadable workbook is simply skipped.
        return None
    required = {"WBS", "Total Cost", "Baseline Start", "Baseline End"}
    if not required.issubset(bs.columns):
        return None

    wbs = [str(w) if pd.notna(w) else "" for w in bs["WBS"]]
    # dtype=bool keeps the mask usable with `&` even when the sheet has no rows.
    is_leaf = np.array(
        [w != "" and not any(o != w and o.startswith(w + ".") for o in wbs) for w in wbs],
        dtype=bool,
    )
    cost = pd.to_numeric(bs["Total Cost"], errors="coerce")
    start = pd.to_datetime(bs["Baseline Start"], errors="coerce")
    end = pd.to_datetime(bs["Baseline End"], errors="coerce")
    keep = is_leaf & (cost > 0) & start.notna() & end.notna()
    if keep.sum() < 3:
        return None

    proj_start = start[keep].min()
    if "ID" in bs.columns:
        ids = [_id_text(v) for v in bs["ID"]]
    else:
        # No ID column: fall back to the row position as identifier.
        ids = [str(k) for k in range(len(bs))]
    if "Predecessors" in bs.columns:
        preds_col = bs["Predecessors"]
    else:
        preds_col = pd.Series([None] * len(bs))

    acts = []
    for i in np.flatnonzero(keep.to_numpy()):
        s = (start.iloc[i] - proj_start).days
        e = (end.iloc[i] - proj_start).days
        acts.append(Activity(aid=ids[i], cost=float(cost.iloc[i]),
                             start=float(s), end=float(max(e, s + 1)),
                             preds=_parse_preds(preds_col.iloc[i])))

    leaf_ids = {a.aid for a in acts}
    for a in acts:  # keep only dependencies on other leaves
        a.preds = [(p, lag) for (p, lag) in a.preds if p in leaf_ids and p != a.aid]

    horizon = max(a.end for a in acts)
    return Schedule(name=os.path.splitext(os.path.basename(path))[0],
                    acts=acts, bac=float(sum(a.cost for a in acts)), horizon_days=horizon)
def _topo_order(acts) -> list:
"""Kahn topological order by FS predecessors; cycles fall back to start-date order."""
idx = {a.aid: i for i, a in enumerate(acts)}
indeg = [0] * len(acts)
children = [[] for _ in acts]
for i, a in enumerate(acts):
for p, _ in a.preds:
indeg[i] += 1
children[idx[p]].append(i)
queue = [i for i in range(len(acts)) if indeg[i] == 0]
order = []
while queue:
i = queue.pop()
order.append(i)
for c in children[i]:
indeg[c] -= 1
if indeg[c] == 0:
queue.append(c)
if len(order) < len(acts): # cycle: append the rest by baseline start
order += sorted(set(range(len(acts))) - set(order), key=lambda i: acts[i].start)
return order
def _phase(s: float, e: float, edges: np.ndarray) -> np.ndarray:
"""Fraction of [s, e] falling in each [edges[i], edges[i+1]) bin (sums to ~1)."""
if e <= s:
e = s + 1e-6
lo = np.maximum(edges[:-1], s)
hi = np.minimum(edges[1:], e)
return np.clip(hi - lo, 0.0, None) / (e - s)
def _lumpify(ev_inc, ac_inc, rng, p_lag=0.4, defer=0.7):
"""Progress-certification lag: defer part of a month's earned value (and its cost) into
the next month, producing low months followed by catch-up spikes. Preserves totals and
monotonicity, and lifts increment CV from the over-smooth linear value to ~real (0.9)."""
ev = ev_inc.astype(float).copy()
ac = ac_inc.astype(float).copy()
ce = ca = 0.0
for t in range(len(ev)):
ev[t] += ce
ac[t] += ca
ce = ca = 0.0
if t < len(ev) - 1 and rng.random() < p_lag:
ce, ca = ev[t] * defer, ac[t] * defer
ev[t] -= ce
ac[t] -= ca
ev[-1] += ce
ac[-1] += ca
return ev, ac
def simulate(sched: Schedule, rng: np.random.Generator) -> Project:
    """One Monte-Carlo execution of a baseline schedule -> grounded PV/EV/AC Project.

    Steps:
      1. Sample a project-level CPI target and duration stretch, plus per-activity
         cost factors and durations; occasionally inflate a contiguous run of
         activities to model a disruption.
      2. Forward pass in topological order: an activity starts at the later of its
         baseline start and its predecessors' sampled finish plus lag.
      3. Spread each activity's baseline cost over its baseline window (PV) and over
         its simulated window (EV); AC is the EV share times the cost factor.
      4. Apply certification-lag lumpiness to the EV/AC increments and accumulate.

    Args:
        sched: Baseline schedule to execute.
        rng: Random generator; all randomness is drawn from it.

    Returns:
        A Project with cumulative pv/ev/ac per monthly period. The sampled
        `cpi_target` and `dur_mean` are recorded in `meta`.
    """
    acts = sched.acts
    idx = {a.aid: i for i, a in enumerate(acts)}
    n = len(acts)

    # Sample performance, calibrated to DSLIB (final CPI ~0.77-1.09; schedule stretch).
    cpi_target = float(np.clip(rng.normal(0.95, 0.12), 0.6, 1.25))
    dur_mean = float(np.clip(rng.normal(1.08, 0.18), 0.7, 2.2))
    cost_factor = (1.0 / cpi_target) * np.exp(rng.normal(0.0, 0.10, n))  # AC = cost x factor
    base_dur = np.array([max(a.end - a.start, 1.0) for a in acts])
    act_dur = base_dur * dur_mean * np.exp(rng.normal(0.0, 0.25, n))  # per-activity stretch

    # Occasional disruption: a contiguous burst of activities stalls (durations inflate).
    if n >= 8 and rng.random() < 0.4:
        j = int(rng.integers(0, n - 4))
        burst = int(rng.integers(2, 5))
        act_dur[j:j + burst] *= rng.uniform(1.5, 3.0)

    # CPM forward pass on sampled durations: delays cascade through FS precedence.
    astart = np.full(n, np.nan)
    aend = np.full(n, np.nan)
    for i in _topo_order(acts):
        a = acts[i]
        # With a precedence cycle the fallback order can visit an activity before
        # one of its predecessors; that predecessor's finish is still NaN. Skip it
        # explicitly: max() over a list containing NaN depends on element order and
        # could otherwise discard predecessors that ARE already scheduled.
        ready = [aend[idx[p]] + lag for p, lag in a.preds if not np.isnan(aend[idx[p]])]
        pc = max(ready, default=a.start)
        astart[i] = max(a.start, pc)  # no-pred activities anchor at baseline start
        aend[i] = astart[i] + act_dur[i]

    # Bin edges cover whichever is longer: the simulated run or the baseline plan.
    end_day = max(float(np.nanmax(aend)), sched.horizon_days)
    edges = np.arange(0.0, end_day + PERIOD_DAYS, PERIOD_DAYS)
    pv = np.zeros(len(edges) - 1)
    ev = np.zeros(len(edges) - 1)
    ac = np.zeros(len(edges) - 1)
    for i, a in enumerate(acts):
        pv += a.cost * _phase(a.start, a.end, edges)
        share = a.cost * _phase(astart[i], aend[i], edges)
        ev += share
        ac += share * cost_factor[i]

    ev, ac = _lumpify(ev, ac, rng)  # certification-lag lumpiness -> realistic CV
    pv_cum = np.cumsum(pv)
    np.minimum(pv_cum, sched.bac, out=pv_cum)  # cap cumulative PV at BAC
    nper = len(pv)
    return Project(
        name=f"sim::{sched.name}",
        period=np.arange(1, nper + 1),
        pv=pv_cum,
        ev=np.cumsum(ev),
        ac=np.cumsum(ac),
        bac=sched.bac,
        planned_finish=int(np.ceil(sched.horizon_days / PERIOD_DAYS)),
        meta={"source": "grounded_sim", "base": sched.name,
              "cpi_target": cpi_target, "dur_mean": dur_mean, "n_acts": n},
    )
def load_schedules(excel_dir: str = "data/DSLIB/Excel", min_acts: int = 5) -> list[Schedule]:
    """Parse every ``*.xlsx`` workbook in `excel_dir` into a Schedule.

    Workbooks are visited in sorted filename order. Those that fail to parse, or
    that yield fewer than `min_acts` leaf activities, are skipped.

    Args:
        excel_dir: Directory containing the workbooks.
        min_acts: Minimum number of leaf activities a schedule must have.

    Returns:
        The usable schedules; empty when the directory has no matching files.
    """
    out = []
    # Silence warnings raised while reading workbooks, but only for the duration
    # of the load - the caller's warning filters are restored afterwards.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        for f in sorted(glob.glob(os.path.join(excel_dir, "*.xlsx"))):
            s = parse_schedule(f)
            if s is not None and len(s.acts) >= min_acts:
                out.append(s)
    return out
def generate_grounded_corpus(excel_dir: str = "data/DSLIB/Excel", runs_per_project: int = 20,
                             seed: int = 0, min_acts: int = 5) -> list[Project]:
    """Build a simulated corpus: `runs_per_project` Monte-Carlo runs per loaded schedule.

    A single generator seeded with `seed` is shared across all runs, so the corpus
    is reproducible for a given set of workbooks. Runs are grouped by schedule, in
    the order the schedules are loaded.
    """
    rng = np.random.default_rng(seed)
    schedules = load_schedules(excel_dir, min_acts)
    return [simulate(sched, rng) for sched in schedules for _run in range(runs_per_project)]