File size: 4,378 Bytes
c658ad5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Earned Value Management metrics and forecasting formulas.

All functions take cumulative arrays (PV/EV/AC) aligned on a 1-based period index,
matching `src.synthetic.Project` and the tidy schema in `data/README.md`.

References: PMI EVM, NASA EVM handbook, Lipke earned-schedule method.
"""
from __future__ import annotations

import numpy as np


def to_increments(cum) -> np.ndarray:
    """Cumulative -> per-period increments."""
    return np.diff(np.asarray(cum, float), prepend=0.0)


def from_increments(inc, start: float = 0.0) -> np.ndarray:
    """Per-period increments -> cumulative."""
    return start + np.cumsum(np.asarray(inc, float))


def indices(pv, ev, ac) -> dict:
    """Time series of SV, CV, SPI, CPI."""
    pv = np.asarray(pv, float)
    ev = np.asarray(ev, float)
    ac = np.asarray(ac, float)
    with np.errstate(divide="ignore", invalid="ignore"):
        spi = np.where(pv > 0, ev / pv, np.nan)
        cpi = np.where(ac > 0, ev / ac, np.nan)
    return {"sv": ev - pv, "cv": ev - ac, "spi": spi, "cpi": cpi}


def latest(pv, ev, ac, bac) -> dict:
    """Status at the most recent period."""
    pv = np.asarray(pv, float)
    ev = np.asarray(ev, float)
    ac = np.asarray(ac, float)
    pv_n, ev_n, ac_n = pv[-1], ev[-1], ac[-1]
    return {
        "pv": pv_n,
        "ev": ev_n,
        "ac": ac_n,
        "bac": bac,
        "spi": ev_n / pv_n if pv_n > 0 else np.nan,
        "cpi": ev_n / ac_n if ac_n > 0 else np.nan,
        "pct_complete": ev_n / bac if bac > 0 else np.nan,
    }


# --------------------------------------------------------------------------- #
# Cost forecasting (Estimate at Completion)
# --------------------------------------------------------------------------- #
EAC_METHODS = ("bac_cpi", "ac_plus_remaining", "cpi_spi")
EAC_LABELS = {
    "bac_cpi": "EAC = BAC / CPI",
    "ac_plus_remaining": "EAC = AC + (BAC - EV)",
    "cpi_spi": "EAC = AC + (BAC - EV) / (CPI x SPI)",
}


def forecast_eac(pv, ev, ac, bac, method: str = "cpi_spi") -> float:
    s = latest(pv, ev, ac, bac)
    cpi, spi, ev_n, ac_n = s["cpi"], s["spi"], s["ev"], s["ac"]
    if method == "bac_cpi":
        return bac / cpi if cpi and cpi > 0 else np.nan
    if method == "ac_plus_remaining":
        return ac_n + (bac - ev_n)
    if method == "cpi_spi":
        d = cpi * spi
        return ac_n + (bac - ev_n) / d if d and d > 0 else np.nan
    raise ValueError(f"unknown EAC method: {method!r}")


def all_eacs(pv, ev, ac, bac) -> dict:
    return {m: forecast_eac(pv, ev, ac, bac, m) for m in EAC_METHODS}


def etc(pv, ev, ac, bac, method: str = "cpi_spi") -> float:
    return forecast_eac(pv, ev, ac, bac, method) - np.asarray(ac, float)[-1]


def vac(pv, ev, ac, bac, method: str = "cpi_spi") -> float:
    return bac - forecast_eac(pv, ev, ac, bac, method)


def tcpi(ev, ac, bac, target: float | None = None) -> float:
    """To-Complete Performance Index (efficiency required to hit `target`/BAC)."""
    ev_n = np.asarray(ev, float)[-1]
    ac_n = np.asarray(ac, float)[-1]
    target = bac if target is None else target
    denom = target - ac_n
    return (bac - ev_n) / denom if denom != 0 else np.nan


# --------------------------------------------------------------------------- #
# Schedule forecasting (earned-schedule method)
# --------------------------------------------------------------------------- #
def earned_schedule(pv, ev_now: float) -> float:
    """Earned schedule (in period units): the time at which planned PV first
    equals the current EV, by linear interpolation between period boundaries."""
    pv = np.asarray(pv, float)
    if pv[0] > 0 and ev_now <= pv[0]:
        return ev_now / pv[0]
    if ev_now >= pv[-1]:
        return float(len(pv))
    i = int(np.searchsorted(pv, ev_now))            # pv[i-1] < ev_now <= pv[i]
    lo, hi = pv[i - 1], pv[i]
    frac = (ev_now - lo) / (hi - lo) if hi > lo else 0.0
    return i + frac                                  # pv[i-1] sits at time i


def forecast_finish(pv, ev, planned_duration: int) -> float:
    """Independent estimate of finish period via earned schedule: PD / SPI(t)."""
    ev = np.asarray(ev, float)
    at = len(ev)                                     # actual time elapsed
    es = earned_schedule(pv, ev[-1])
    spi_t = es / at if at > 0 else np.nan
    return planned_duration / spi_t if spi_t and spi_t > 0 else np.nan