"""Earned Value Management metrics and forecasting formulas. All functions take cumulative arrays (PV/EV/AC) aligned on a 1-based period index, matching `src.synthetic.Project` and the tidy schema in `data/README.md`. References: PMI EVM, NASA EVM handbook, Lipke earned-schedule method. """ from __future__ import annotations import numpy as np def to_increments(cum) -> np.ndarray: """Cumulative -> per-period increments.""" return np.diff(np.asarray(cum, float), prepend=0.0) def from_increments(inc, start: float = 0.0) -> np.ndarray: """Per-period increments -> cumulative.""" return start + np.cumsum(np.asarray(inc, float)) def indices(pv, ev, ac) -> dict: """Time series of SV, CV, SPI, CPI.""" pv = np.asarray(pv, float) ev = np.asarray(ev, float) ac = np.asarray(ac, float) with np.errstate(divide="ignore", invalid="ignore"): spi = np.where(pv > 0, ev / pv, np.nan) cpi = np.where(ac > 0, ev / ac, np.nan) return {"sv": ev - pv, "cv": ev - ac, "spi": spi, "cpi": cpi} def latest(pv, ev, ac, bac) -> dict: """Status at the most recent period.""" pv = np.asarray(pv, float) ev = np.asarray(ev, float) ac = np.asarray(ac, float) pv_n, ev_n, ac_n = pv[-1], ev[-1], ac[-1] return { "pv": pv_n, "ev": ev_n, "ac": ac_n, "bac": bac, "spi": ev_n / pv_n if pv_n > 0 else np.nan, "cpi": ev_n / ac_n if ac_n > 0 else np.nan, "pct_complete": ev_n / bac if bac > 0 else np.nan, } # --------------------------------------------------------------------------- # # Cost forecasting (Estimate at Completion) # --------------------------------------------------------------------------- # EAC_METHODS = ("bac_cpi", "ac_plus_remaining", "cpi_spi") EAC_LABELS = { "bac_cpi": "EAC = BAC / CPI", "ac_plus_remaining": "EAC = AC + (BAC - EV)", "cpi_spi": "EAC = AC + (BAC - EV) / (CPI x SPI)", } def forecast_eac(pv, ev, ac, bac, method: str = "cpi_spi") -> float: s = latest(pv, ev, ac, bac) cpi, spi, ev_n, ac_n = s["cpi"], s["spi"], s["ev"], s["ac"] if method == "bac_cpi": return bac / cpi if cpi and cpi > 0 else np.nan if method == "ac_plus_remaining": return ac_n + (bac - ev_n) if method == "cpi_spi": d = cpi * spi return ac_n + (bac - ev_n) / d if d and d > 0 else np.nan raise ValueError(f"unknown EAC method: {method!r}") def all_eacs(pv, ev, ac, bac) -> dict: return {m: forecast_eac(pv, ev, ac, bac, m) for m in EAC_METHODS} def etc(pv, ev, ac, bac, method: str = "cpi_spi") -> float: return forecast_eac(pv, ev, ac, bac, method) - np.asarray(ac, float)[-1] def vac(pv, ev, ac, bac, method: str = "cpi_spi") -> float: return bac - forecast_eac(pv, ev, ac, bac, method) def tcpi(ev, ac, bac, target: float | None = None) -> float: """To-Complete Performance Index (efficiency required to hit `target`/BAC).""" ev_n = np.asarray(ev, float)[-1] ac_n = np.asarray(ac, float)[-1] target = bac if target is None else target denom = target - ac_n return (bac - ev_n) / denom if denom != 0 else np.nan # --------------------------------------------------------------------------- # # Schedule forecasting (earned-schedule method) # --------------------------------------------------------------------------- # def earned_schedule(pv, ev_now: float) -> float: """Earned schedule (in period units): the time at which planned PV first equals the current EV, by linear interpolation between period boundaries.""" pv = np.asarray(pv, float) if pv[0] > 0 and ev_now <= pv[0]: return ev_now / pv[0] if ev_now >= pv[-1]: return float(len(pv)) i = int(np.searchsorted(pv, ev_now)) # pv[i-1] < ev_now <= pv[i] lo, hi = pv[i - 1], pv[i] frac = (ev_now - lo) / (hi - lo) if hi > lo else 0.0 return i + frac # pv[i-1] sits at time i def forecast_finish(pv, ev, planned_duration: int) -> float: """Independent estimate of finish period via earned schedule: PD / SPI(t).""" ev = np.asarray(ev, float) at = len(ev) # actual time elapsed es = earned_schedule(pv, ev[-1]) spi_t = es / at if at > 0 else np.nan return planned_duration / spi_t if spi_t and spi_t > 0 else np.nan