File size: 8,916 Bytes
c658ad5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""Schedule-grounded Monte-Carlo EVM simulator.

Generates grounded PV/EV/AC trajectories from the REAL DSLIB baseline schedules:

  * PV  - time-phase each real leaf activity's real cost across its real baseline dates
          (the planned S-curve EMERGES from the actual schedule, not a logistic formula).
  * EV/AC - Monte-Carlo execution: sample per-activity duration and cost performance,
          cascade delays through the REAL finish-to-start precedence network (CPM forward
          pass on sampled durations), earn baseline cost over each activity's actual window
          (EV) and apply a cost factor (AC).

Outcome stats (final CPI, slippage) are calibrated to DSLIB; the SHAPE comes from real
structure - so it is far less "predictable" than parametric S-curves. Output: a
`synthetic.Project`, so it flows through the same baselines / forecaster / eval harness.
"""
from __future__ import annotations

import glob
import os
import re
import warnings
from dataclasses import dataclass

import numpy as np
import pandas as pd

from .synthetic import Project

PERIOD_DAYS = 30.44  # monthly bins (DSLIB cadence)
_LAG_UNIT = {"d": 1.0, "w": 7.0, "m": 30.44}


@dataclass
class Activity:
    """One leaf activity of a baseline schedule.

    `start` and `end` are offsets in days from the project start; `preds`
    lists the finish-to-start predecessors this activity waits for.
    """
    aid: str                # activity ID as text; predecessor tuples refer to activities by this value
    cost: float             # baseline cost of this activity
    start: float            # baseline start, days from project start
    end: float              # baseline end, days from project start
    preds: list             # [(pred_id, lag_days), ...] (finish-to-start)


@dataclass
class Schedule:
    """A parsed baseline schedule: its leaf activities plus project-level totals."""
    name: str               # label for the schedule; simulated projects are named "sim::<name>"
    acts: list              # leaf Activities (costs sum to bac)
    bac: float              # budget at completion
    horizon_days: float     # baseline project duration


def _parse_preds(s) -> list:
    """'14FS;13FS', '16FS-6w', '10FS+2d' -> [(id, lag_days), ...]. FS assumed."""
    out = []
    if not isinstance(s, str):
        return out
    for tok in s.split(";"):
        m = re.match(r"\s*(\d+)\s*(FS|SS|FF|SF)?\s*([+-]\d+(?:\.\d+)?)?\s*(d|w|m)?", tok, re.I)
        if not m:
            continue
        pid = m.group(1)
        lag = float(m.group(3) or 0.0) * _LAG_UNIT[(m.group(4) or "d").lower()]
        out.append((pid, lag))
    return out


def _id_text(value) -> str:
    """Render an ID cell as text, dropping the '.0' of integral floats."""
    if isinstance(value, (float, np.floating)) and float(value).is_integer():
        return str(int(value))
    return str(value)


def parse_schedule(path: str) -> Schedule | None:
    """Parse a DSLIB workbook's 'Baseline Schedule' into leaf activities.

    Args:
        path: Path to the workbook.

    Returns:
        A Schedule whose activities are the WBS leaves with a positive cost
        and valid baseline dates, or None when the sheet cannot be read, a
        required column is missing, or fewer than 3 such leaves exist.
        Activity times are whole-day offsets from the earliest kept start,
        every activity lasts at least one day, and only predecessors that are
        themselves kept leaves are retained.
    """
    try:
        bs = pd.read_excel(path, sheet_name="Baseline Schedule", header=1)
    except Exception:  # deliberate best-effort: an unreadable workbook is skipped
        return None
    if not {"WBS", "Total Cost", "Baseline Start", "Baseline End"}.issubset(bs.columns):
        return None

    wbs = [str(w) if pd.notna(w) else "" for w in bs["WBS"]]
    # A code is a summary row when some other code extends it with ".<more>";
    # collecting every dotted proper prefix once avoids an all-pairs scan.
    parents = {w[:k] for w in wbs for k, ch in enumerate(w) if ch == "."}
    # dtype=bool keeps the mask usable with `&` even when the sheet has no rows.
    is_leaf = np.array([w != "" and w not in parents for w in wbs], dtype=bool)
    cost = pd.to_numeric(bs["Total Cost"], errors="coerce")
    start = pd.to_datetime(bs["Baseline Start"], errors="coerce")
    end = pd.to_datetime(bs["Baseline End"], errors="coerce")
    keep = is_leaf & (cost > 0) & start.notna() & end.notna()
    if keep.sum() < 3:
        return None

    proj_start = start[keep].min()
    if "ID" in bs.columns:
        # An ID column containing blanks is read as float, so a plain str()
        # would give "14.0" and never match the "14" parsed from predecessors.
        ids = [_id_text(v) for v in bs["ID"]]
    else:
        ids = [str(k) for k in range(len(bs))]
    preds_col = bs["Predecessors"] if "Predecessors" in bs.columns else pd.Series([None] * len(bs))

    acts = []
    for i in np.where(keep.to_numpy())[0]:
        s = (start.iloc[i] - proj_start).days
        e = (end.iloc[i] - proj_start).days
        acts.append(Activity(aid=ids[i], cost=float(cost.iloc[i]),
                             start=float(s), end=float(max(e, s + 1)),
                             preds=_parse_preds(preds_col.iloc[i])))
    leaf_ids = {a.aid for a in acts}
    for a in acts:  # keep only dependencies on other leaves
        a.preds = [(p, lag) for (p, lag) in a.preds if p in leaf_ids and p != a.aid]

    horizon = max(a.end for a in acts)
    return Schedule(name=os.path.splitext(os.path.basename(path))[0],
                    acts=acts, bac=float(sum(a.cost for a in acts)), horizon_days=horizon)


def _topo_order(acts) -> list:
    """Kahn topological order by FS predecessors; cycles fall back to start-date order."""
    idx = {a.aid: i for i, a in enumerate(acts)}
    indeg = [0] * len(acts)
    children = [[] for _ in acts]
    for i, a in enumerate(acts):
        for p, _ in a.preds:
            indeg[i] += 1
            children[idx[p]].append(i)
    queue = [i for i in range(len(acts)) if indeg[i] == 0]
    order = []
    while queue:
        i = queue.pop()
        order.append(i)
        for c in children[i]:
            indeg[c] -= 1
            if indeg[c] == 0:
                queue.append(c)
    if len(order) < len(acts):  # cycle: append the rest by baseline start
        order += sorted(set(range(len(acts))) - set(order), key=lambda i: acts[i].start)
    return order


def _phase(s: float, e: float, edges: np.ndarray) -> np.ndarray:
    """Fraction of [s, e] falling in each [edges[i], edges[i+1]) bin (sums to ~1)."""
    if e <= s:
        e = s + 1e-6
    lo = np.maximum(edges[:-1], s)
    hi = np.minimum(edges[1:], e)
    return np.clip(hi - lo, 0.0, None) / (e - s)


def _lumpify(ev_inc, ac_inc, rng, p_lag=0.4, defer=0.7):
    """Progress-certification lag: defer part of a month's earned value (and its cost) into
    the next month, producing low months followed by catch-up spikes. Preserves totals and
    monotonicity, and lifts increment CV from the over-smooth linear value to ~real (0.9)."""
    ev = ev_inc.astype(float).copy()
    ac = ac_inc.astype(float).copy()
    ce = ca = 0.0
    for t in range(len(ev)):
        ev[t] += ce
        ac[t] += ca
        ce = ca = 0.0
        if t < len(ev) - 1 and rng.random() < p_lag:
            ce, ca = ev[t] * defer, ac[t] * defer
            ev[t] -= ce
            ac[t] -= ca
    ev[-1] += ce
    ac[-1] += ca
    return ev, ac


def simulate(sched: Schedule, rng: np.random.Generator) -> Project:
    """One Monte-Carlo execution of a baseline schedule -> grounded PV/EV/AC Project.

    Args:
        sched: Baseline schedule whose activities reference each other only
            through IDs present in `sched.acts`.
        rng: Source of all randomness; a seeded generator makes the run
            reproducible.

    Returns:
        A Project named ``"sim::<schedule name>"`` with cumulative PV, EV and AC
        per monthly period (`PERIOD_DAYS` wide). PV spreads each activity's
        cost over its baseline window; EV spreads it over the simulated window
        and AC scales that by the activity's sampled cost factor. The sampled
        `cpi_target` and `dur_mean` are recorded in `meta`.
    """
    acts = sched.acts
    idx = {a.aid: i for i, a in enumerate(acts)}
    n = len(acts)

    # Sample performance, calibrated to DSLIB (final CPI ~0.77-1.09; schedule stretch).
    cpi_target = float(np.clip(rng.normal(0.95, 0.12), 0.6, 1.25))
    dur_mean = float(np.clip(rng.normal(1.08, 0.18), 0.7, 2.2))
    cost_factor = (1.0 / cpi_target) * np.exp(rng.normal(0.0, 0.10, n))     # AC = cost x factor
    base_dur = np.array([max(a.end - a.start, 1.0) for a in acts])
    act_dur = base_dur * dur_mean * np.exp(rng.normal(0.0, 0.25, n))        # per-activity stretch

    # Occasional disruption: a contiguous burst of activities stalls (durations inflate).
    if n >= 8 and rng.random() < 0.4:
        j = int(rng.integers(0, n - 4))
        act_dur[j:j + int(rng.integers(2, 5))] *= rng.uniform(1.5, 3.0)

    # CPM forward pass on sampled durations: delays cascade through FS precedence.
    astart = np.full(n, np.nan)
    aend = np.full(n, np.nan)
    for i in _topo_order(acts):
        a = acts[i]
        # With a dependency cycle the fallback order can visit an activity before
        # one of its predecessors, whose end is then still NaN. Skip those:
        # max() over a list containing NaN depends on element order and could
        # discard the constraints of predecessors that ARE already scheduled.
        ready = [aend[idx[p]] + lag for p, lag in a.preds if not np.isnan(aend[idx[p]])]
        pc = max(ready, default=a.start)
        astart[i] = max(a.start, pc)            # never earlier than the baseline start
        aend[i] = astart[i] + act_dur[i]

    # Monthly bins covering both the baseline and the simulated finish.
    end_day = max(float(np.nanmax(aend)), sched.horizon_days)
    edges = np.arange(0.0, end_day + PERIOD_DAYS, PERIOD_DAYS)
    pv = np.zeros(len(edges) - 1)
    ev = np.zeros(len(edges) - 1)
    ac = np.zeros(len(edges) - 1)
    for i, a in enumerate(acts):
        pv += a.cost * _phase(a.start, a.end, edges)
        share = a.cost * _phase(astart[i], aend[i], edges)
        ev += share
        ac += share * cost_factor[i]

    ev, ac = _lumpify(ev, ac, rng)          # certification-lag lumpiness -> realistic CV
    pv_cum = np.cumsum(pv)
    np.minimum(pv_cum, sched.bac, out=pv_cum)   # cap cumulative PV at BAC
    nper = len(pv)
    return Project(
        name=f"sim::{sched.name}",
        period=np.arange(1, nper + 1),
        pv=pv_cum,
        ev=np.cumsum(ev),
        ac=np.cumsum(ac),
        bac=sched.bac,
        planned_finish=int(np.ceil(sched.horizon_days / PERIOD_DAYS)),
        meta={"source": "grounded_sim", "base": sched.name,
              "cpi_target": cpi_target, "dur_mean": dur_mean, "n_acts": n},
    )


def load_schedules(excel_dir: str = "data/DSLIB/Excel", min_acts: int = 5) -> list[Schedule]:
    """Parse every ``*.xlsx`` workbook in `excel_dir` into a Schedule.

    Args:
        excel_dir: Directory searched (non-recursively) for workbooks.
        min_acts: Minimum number of leaf activities a schedule needs to be kept.

    Returns:
        Schedules in sorted file-name order; workbooks that fail to parse or
        have too few activities are skipped. A missing or empty directory
        yields an empty list.
    """
    out = []
    # Silence warnings only while parsing; the caller's warning filters are
    # restored on exit instead of being overridden for the rest of the process.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        for f in sorted(glob.glob(os.path.join(excel_dir, "*.xlsx"))):
            s = parse_schedule(f)
            if s is not None and len(s.acts) >= min_acts:
                out.append(s)
    return out


def generate_grounded_corpus(excel_dir: str = "data/DSLIB/Excel", runs_per_project: int = 20,
                             seed: int = 0, min_acts: int = 5) -> list[Project]:
    """Build a simulated corpus: `runs_per_project` Monte-Carlo runs for every
    schedule loaded from `excel_dir`, all drawn from one generator seeded with `seed`."""
    generator = np.random.default_rng(seed)
    schedules = load_schedules(excel_dir, min_acts)
    # Schedule-major order, so the shared generator is consumed run by run per schedule.
    return [simulate(schedule, generator)
            for schedule in schedules
            for _run in range(runs_per_project)]