Spaces:
Running
Running
| """Earned Value Management metrics and forecasting formulas. | |
| All functions take cumulative arrays (PV/EV/AC) aligned on a 1-based period index, | |
| matching `src.synthetic.Project` and the tidy schema in `data/README.md`. | |
| References: PMI EVM, NASA EVM handbook, Lipke earned-schedule method. | |
| """ | |
| from __future__ import annotations | |
| import numpy as np | |
| def to_increments(cum) -> np.ndarray: | |
| """Cumulative -> per-period increments.""" | |
| return np.diff(np.asarray(cum, float), prepend=0.0) | |
| def from_increments(inc, start: float = 0.0) -> np.ndarray: | |
| """Per-period increments -> cumulative.""" | |
| return start + np.cumsum(np.asarray(inc, float)) | |
| def indices(pv, ev, ac) -> dict: | |
| """Time series of SV, CV, SPI, CPI.""" | |
| pv = np.asarray(pv, float) | |
| ev = np.asarray(ev, float) | |
| ac = np.asarray(ac, float) | |
| with np.errstate(divide="ignore", invalid="ignore"): | |
| spi = np.where(pv > 0, ev / pv, np.nan) | |
| cpi = np.where(ac > 0, ev / ac, np.nan) | |
| return {"sv": ev - pv, "cv": ev - ac, "spi": spi, "cpi": cpi} | |
| def latest(pv, ev, ac, bac) -> dict: | |
| """Status at the most recent period.""" | |
| pv = np.asarray(pv, float) | |
| ev = np.asarray(ev, float) | |
| ac = np.asarray(ac, float) | |
| pv_n, ev_n, ac_n = pv[-1], ev[-1], ac[-1] | |
| return { | |
| "pv": pv_n, | |
| "ev": ev_n, | |
| "ac": ac_n, | |
| "bac": bac, | |
| "spi": ev_n / pv_n if pv_n > 0 else np.nan, | |
| "cpi": ev_n / ac_n if ac_n > 0 else np.nan, | |
| "pct_complete": ev_n / bac if bac > 0 else np.nan, | |
| } | |
| # --------------------------------------------------------------------------- # | |
| # Cost forecasting (Estimate at Completion) | |
| # --------------------------------------------------------------------------- # | |
| EAC_METHODS = ("bac_cpi", "ac_plus_remaining", "cpi_spi") | |
| EAC_LABELS = { | |
| "bac_cpi": "EAC = BAC / CPI", | |
| "ac_plus_remaining": "EAC = AC + (BAC - EV)", | |
| "cpi_spi": "EAC = AC + (BAC - EV) / (CPI x SPI)", | |
| } | |
| def forecast_eac(pv, ev, ac, bac, method: str = "cpi_spi") -> float: | |
| s = latest(pv, ev, ac, bac) | |
| cpi, spi, ev_n, ac_n = s["cpi"], s["spi"], s["ev"], s["ac"] | |
| if method == "bac_cpi": | |
| return bac / cpi if cpi and cpi > 0 else np.nan | |
| if method == "ac_plus_remaining": | |
| return ac_n + (bac - ev_n) | |
| if method == "cpi_spi": | |
| d = cpi * spi | |
| return ac_n + (bac - ev_n) / d if d and d > 0 else np.nan | |
| raise ValueError(f"unknown EAC method: {method!r}") | |
| def all_eacs(pv, ev, ac, bac) -> dict: | |
| return {m: forecast_eac(pv, ev, ac, bac, m) for m in EAC_METHODS} | |
| def etc(pv, ev, ac, bac, method: str = "cpi_spi") -> float: | |
| return forecast_eac(pv, ev, ac, bac, method) - np.asarray(ac, float)[-1] | |
| def vac(pv, ev, ac, bac, method: str = "cpi_spi") -> float: | |
| return bac - forecast_eac(pv, ev, ac, bac, method) | |
| def tcpi(ev, ac, bac, target: float | None = None) -> float: | |
| """To-Complete Performance Index (efficiency required to hit `target`/BAC).""" | |
| ev_n = np.asarray(ev, float)[-1] | |
| ac_n = np.asarray(ac, float)[-1] | |
| target = bac if target is None else target | |
| denom = target - ac_n | |
| return (bac - ev_n) / denom if denom != 0 else np.nan | |
| # --------------------------------------------------------------------------- # | |
| # Schedule forecasting (earned-schedule method) | |
| # --------------------------------------------------------------------------- # | |
| def earned_schedule(pv, ev_now: float) -> float: | |
| """Earned schedule (in period units): the time at which planned PV first | |
| equals the current EV, by linear interpolation between period boundaries.""" | |
| pv = np.asarray(pv, float) | |
| if pv[0] > 0 and ev_now <= pv[0]: | |
| return ev_now / pv[0] | |
| if ev_now >= pv[-1]: | |
| return float(len(pv)) | |
| i = int(np.searchsorted(pv, ev_now)) # pv[i-1] < ev_now <= pv[i] | |
| lo, hi = pv[i - 1], pv[i] | |
| frac = (ev_now - lo) / (hi - lo) if hi > lo else 0.0 | |
| return i + frac # pv[i-1] sits at time i | |
| def forecast_finish(pv, ev, planned_duration: int) -> float: | |
| """Independent estimate of finish period via earned schedule: PD / SPI(t).""" | |
| ev = np.asarray(ev, float) | |
| at = len(ev) # actual time elapsed | |
| es = earned_schedule(pv, ev[-1]) | |
| spi_t = es / at if at > 0 else np.nan | |
| return planned_duration / spi_t if spi_t and spi_t > 0 else np.nan | |