# ashaibani's picture
# Slipstream WebGPU (in-browser agent)
# c658ad5 verified
"""Earned Value Management metrics and forecasting formulas.
All functions take cumulative arrays (PV/EV/AC) aligned on a 1-based period index,
matching `src.synthetic.Project` and the tidy schema in `data/README.md`.
References: PMI EVM, NASA EVM handbook, Lipke earned-schedule method.
"""
from __future__ import annotations
import numpy as np
def to_increments(cum) -> np.ndarray:
    """Convert a cumulative series into per-period increments.

    The first increment is measured against a prepended starting total of
    0.0, so the result has the same length as the input.
    """
    totals = np.asarray(cum, dtype=float)
    increments = np.diff(totals, prepend=0.0)
    return increments
def from_increments(inc, start: float = 0.0) -> np.ndarray:
    """Accumulate per-period increments into a cumulative series.

    `start` is added to every running total.
    """
    steps = np.asarray(inc, dtype=float)
    running_total = np.cumsum(steps)
    return start + running_total
def indices(pv, ev, ac) -> dict:
    """Compute the per-period variance and performance-index series.

    Returns a dict with:
        "sv":  EV - PV
        "cv":  EV - AC
        "spi": EV / PV where PV > 0, NaN elsewhere
        "cpi": EV / AC where AC > 0, NaN elsewhere
    """
    planned = np.asarray(pv, dtype=float)
    earned = np.asarray(ev, dtype=float)
    actual = np.asarray(ac, dtype=float)

    def ratio_or_nan(denominator):
        # Divide everywhere with warnings silenced, then mask out the
        # entries whose denominator is not strictly positive.
        with np.errstate(divide="ignore", invalid="ignore"):
            return np.where(denominator > 0, earned / denominator, np.nan)

    schedule_index = ratio_or_nan(planned)
    cost_index = ratio_or_nan(actual)
    return {
        "sv": earned - planned,
        "cv": earned - actual,
        "spi": schedule_index,
        "cpi": cost_index,
    }
def latest(pv, ev, ac, bac) -> dict:
    """Summarise project status using the last element of each series.

    Returns a dict with the latest "pv", "ev" and "ac", the supplied "bac",
    and the ratios "spi" (EV/PV), "cpi" (EV/AC) and "pct_complete" (EV/BAC).
    A ratio is NaN when its denominator is not strictly positive.
    """
    planned_now = np.asarray(pv, dtype=float)[-1]
    earned_now = np.asarray(ev, dtype=float)[-1]
    spent_now = np.asarray(ac, dtype=float)[-1]

    def earned_over(denominator):
        # Ratio of the latest EV to `denominator`, or NaN if undefined.
        if denominator > 0:
            return earned_now / denominator
        return np.nan

    return {
        "pv": planned_now,
        "ev": earned_now,
        "ac": spent_now,
        "bac": bac,
        "spi": earned_over(planned_now),
        "cpi": earned_over(spent_now),
        "pct_complete": earned_over(bac),
    }
# --------------------------------------------------------------------------- #
# Cost forecasting (Estimate at Completion)
# --------------------------------------------------------------------------- #
# Display formula for each supported EAC method, keyed by method name.
EAC_LABELS = {
    "bac_cpi": "EAC = BAC / CPI",
    "ac_plus_remaining": "EAC = AC + (BAC - EV)",
    "cpi_spi": "EAC = AC + (BAC - EV) / (CPI x SPI)",
}
# Supported method names, in the insertion order of the label table.
EAC_METHODS = tuple(EAC_LABELS)
def forecast_eac(pv, ev, ac, bac, method: str = "cpi_spi") -> float:
    """Forecast the Estimate at Completion (EAC) from the latest status.

    Supported `method` values:
        "bac_cpi":           BAC / CPI
        "ac_plus_remaining": AC + (BAC - EV)
        "cpi_spi":           AC + (BAC - EV) / (CPI x SPI)

    Returns NaN when the index (or index product) a method divides by is
    zero, negative or NaN. Raises ValueError for any other `method`.
    """
    status = latest(pv, ev, ac, bac)
    earned_now, spent_now = status["ev"], status["ac"]
    cost_index, schedule_index = status["cpi"], status["spi"]

    if method == "ac_plus_remaining":
        return spent_now + (bac - earned_now)

    if method == "bac_cpi":
        if cost_index and cost_index > 0:
            return bac / cost_index
        return np.nan

    if method == "cpi_spi":
        combined_index = cost_index * schedule_index
        if combined_index and combined_index > 0:
            return spent_now + (bac - earned_now) / combined_index
        return np.nan

    raise ValueError(f"unknown EAC method: {method!r}")
def all_eacs(pv, ev, ac, bac) -> dict:
    """Return the EAC forecast of every method in `EAC_METHODS`, keyed by method name."""
    forecasts = {}
    for method_name in EAC_METHODS:
        forecasts[method_name] = forecast_eac(pv, ev, ac, bac, method_name)
    return forecasts
def etc(pv, ev, ac, bac, method: str = "cpi_spi") -> float:
    """Estimate to Complete: the EAC forecast for `method` minus the latest AC."""
    eac = forecast_eac(pv, ev, ac, bac, method)
    spent_now = np.asarray(ac, dtype=float)[-1]
    return eac - spent_now
def vac(pv, ev, ac, bac, method: str = "cpi_spi") -> float:
    """Variance at Completion: BAC minus the EAC forecast for `method`."""
    eac = forecast_eac(pv, ev, ac, bac, method)
    return bac - eac
def tcpi(ev, ac, bac, target: float | None = None) -> float:
    """To-Complete Performance Index: (BAC - EV) / (target - AC).

    Uses the last element of `ev` and `ac`. `target` defaults to `bac`.
    Returns NaN when `target - AC` is exactly zero.
    """
    earned_now = np.asarray(ev, dtype=float)[-1]
    spent_now = np.asarray(ac, dtype=float)[-1]
    if target is None:
        target = bac

    funds_left = target - spent_now
    if funds_left == 0:
        return np.nan
    return (bac - earned_now) / funds_left
# --------------------------------------------------------------------------- #
# Schedule forecasting (earned-schedule method)
# --------------------------------------------------------------------------- #
def earned_schedule(pv, ev_now: float) -> float:
    """Return the earned schedule (ES) for `ev_now`, in period units.

    ES is the time at which the cumulative plan first equals the current
    earned value. `pv[k]` is treated as the planned value at time `k + 1`,
    with linear interpolation between period boundaries (and from 0 at
    time 0 up to `pv[0]` within the first period).

    Args:
        pv: Cumulative planned value per period. It is searched with
            `np.searchsorted`, so it is expected to be non-decreasing.
        ev_now: Current cumulative earned value.

    Returns:
        The earned schedule as a float. NaN if `ev_now` is NaN. If `ev_now`
        is at or beyond the final planned value, the first time at which the
        plan reaches that final value (so a flat tail in `pv` does not
        inflate ES).

    Raises:
        IndexError: if `pv` is empty.
    """
    pv = np.asarray(pv, float)
    # NaN compares False everywhere and would make searchsorted return
    # len(pv), indexing past the end; propagate it instead.
    if np.isnan(ev_now):
        return float("nan")

    first, final = pv[0], pv[-1]
    # Inside the first period: interpolate between (0, 0) and (1, pv[0]).
    if first > 0 and ev_now <= first:
        return float(ev_now / first)

    # Plan fully earned: report the first boundary where PV hits its final
    # value, consistent with how plateaus in the middle of the plan resolve.
    if ev_now >= final:
        return float(np.searchsorted(pv, final, side="left") + 1)

    idx = int(np.searchsorted(pv, ev_now))  # pv[idx-1] < ev_now <= pv[idx]
    if idx == 0:
        # Only reachable when pv[0] <= 0 and ev_now <= pv[0]: nothing earned
        # against the plan yet. Avoids wrap-around indexing via pv[-1].
        return 0.0

    lo, hi = pv[idx - 1], pv[idx]
    frac = (ev_now - lo) / (hi - lo) if hi > lo else 0.0
    return float(idx + frac)  # pv[idx-1] sits at time idx
def forecast_finish(pv, ev, planned_duration: int) -> float:
    """Forecast the finish period via earned schedule: PD / SPI(t).

    SPI(t) is ES / AT, where ES is `earned_schedule(pv, ev[-1])` and AT (the
    actual time elapsed) is the number of entries in `ev`.

    Args:
        pv: Cumulative planned value per period.
        ev: Cumulative earned value per period, up to the current period.
        planned_duration: Planned duration (PD), in periods.

    Returns:
        `planned_duration / SPI(t)`, or NaN when `ev` is empty or SPI(t) is
        zero, negative or NaN.
    """
    ev = np.asarray(ev, float)
    at = len(ev)  # actual time elapsed
    # Check for "no periods yet" before touching ev[-1]; otherwise the
    # empty case raises IndexError and the NaN fallback is never reached.
    if at == 0:
        return np.nan
    es = earned_schedule(pv, ev[-1])
    spi_t = es / at
    return planned_duration / spi_t if spi_t > 0 else np.nan