File size: 4,324 Bytes
c658ad5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""Forecasting baselines.

All forecasters share one contract:

    forecast(obs_inc, horizon, *, bac=None, planned_periods=None) -> dict
        obs_inc : 1D array of observed PER-PERIOD increments (not cumulative)
        returns : {"q10": arr, "q50": arr, "q90": arr}  incremental quantile forecasts

The caller re-integrates the increments onto the last observed cumulative value to
obtain cumulative bands (see app.py / evaluate.py). We forecast increments, not the
cumulative S-curve, because a monotone S-curve is trivially curve-fit and gives a
foundation model no edge - PLAN.md section 5.
"""
from __future__ import annotations

import numpy as np

_Z10 = 1.2816  # standard normal quantile for the 10th/90th percentile


def _bands(median, sigma) -> dict:
    """Symmetric P10/P90 around a point forecast, widening with horizon."""
    median = np.asarray(median, float)
    h = len(median)
    s = np.maximum(float(sigma), 1e-9) * np.sqrt(1.0 + np.arange(h))
    return {
        "q10": np.clip(median - _Z10 * s, 0.0, None),
        "q50": np.clip(median, 0.0, None),
        "q90": median + _Z10 * s,
    }


def last_value(obs_inc, horizon, **kw) -> dict:
    """Persistence baseline: repeat the last observed increment.

    The median is the final observation (floored at zero) held flat for
    ``horizon`` steps. The spread is the standard deviation of the last six
    observations (or fewer, if the series is shorter); with at most one
    observation it falls back to 20% of the last value plus a tiny epsilon.
    Extra keyword arguments are accepted and ignored.
    """
    series = np.asarray(obs_inc, float)

    if len(series):
        anchor = series[-1]
    else:
        anchor = 0.0

    # A negative start index larger than the array is clamped by slicing,
    # so this is "up to the last six points".
    window = series[-6:]
    if len(window) > 1:
        spread = np.std(window)
    else:
        spread = abs(anchor) * 0.2 + 1e-9

    flat = np.full(horizon, max(anchor, 0.0))
    return _bands(flat, spread)


def linear(obs_inc, horizon, **kw) -> dict:
    """Straight-line trend baseline on the per-period increments.

    Fits a degree-1 polynomial to the observations against their index,
    extrapolates it over the next ``horizon`` indices (floored at zero), and
    uses the standard deviation of the in-sample residuals as the spread.
    With fewer than two observations it defers to ``last_value``. Extra
    keyword arguments are accepted and ignored.
    """
    series = np.asarray(obs_inc, float)
    count = len(series)
    if count < 2:
        return last_value(series, horizon)

    t_obs = np.arange(count)
    slope, intercept = np.polyfit(t_obs, series, 1)

    t_future = np.arange(count, count + horizon)
    trend = np.clip(slope * t_future + intercept, 0.0, None)

    residuals = series - (slope * t_obs + intercept)
    return _bands(trend, np.std(residuals))


def logistic(obs_inc, horizon, *, bac=None, planned_periods=None, **kw) -> dict:
    """Logistic S-curve baseline fitted on the cumulative series.

    The increments are cumulated, a three-parameter logistic
    ``ceiling / (1 + exp(-rate * (t - midpoint)))`` is fitted to the
    cumulative values at t = 1..n, and the fitted curve is extrapolated for
    ``horizon`` further periods. The forecast increments are the successive
    differences of that extrapolation, starting from the last observed
    cumulative value and floored at zero.

    This is the strongest classical competitor on project S-curves - if
    TimesFM cannot beat this on incremental error, reposition (PLAN.md, Risks).

    Args:
        obs_inc: 1D array-like of observed per-period increments.
        horizon: number of periods to forecast.
        bac: optional hint for the curve ceiling; when falsy, the last
            cumulative value is used in its place.
        planned_periods: accepted for the shared contract; not used here.
        **kw: ignored.

    Returns:
        ``{"q10", "q50", "q90"}`` incremental bands. Falls back to ``linear``
        when there are fewer than four observations or the fit raises.
    """
    from scipy.optimize import curve_fit

    increments = np.asarray(obs_inc, float)
    cum = np.cumsum(increments)
    n = len(cum)
    if n < 4:
        return linear(increments, horizon)

    def s_curve(t, ceiling, rate, midpoint):
        return ceiling / (1.0 + np.exp(-rate * (t - midpoint)))

    t_obs = np.arange(1, n + 1, dtype=float)

    try:
        latest = cum[-1]
        ceiling_cap = max(latest * 5.0 + 1.0, (bac or latest) * 2.0)
        start = [max(latest * 1.2, (bac or latest)), 0.3, n * 0.5]
        lower = [latest * 0.9, 1e-3, 0.0]
        upper = [ceiling_cap, 5.0, n * 5.0]
        params, _cov = curve_fit(
            s_curve,
            t_obs,
            cum,
            p0=start,
            bounds=(lower, upper),
            maxfev=10000,
        )
        t_future = np.arange(n + 1, n + horizon + 1, dtype=float)
        cum_future = s_curve(t_future, *params)
        fit_sigma = np.std(cum - s_curve(t_obs, *params))
    except Exception:
        # Deliberate best-effort: any failure in the fit degrades to the
        # linear baseline rather than aborting the forecast.
        return linear(increments, horizon)

    # Prepend the last observed cumulative value so the first difference is
    # the increment from "now" to the first forecast step.
    path = np.concatenate([[cum[-1]], cum_future])
    steps = np.clip(np.diff(path), 0.0, None)
    return _bands(steps, fit_sigma / max(np.sqrt(horizon), 1.0))


# --------------------------------------------------------------------------- #
# Foundation-model baseline (zero-shot). Skeleton - verify the import/API.
# --------------------------------------------------------------------------- #
_chronos = None  # most recently loaded pipeline (kept for backward compatibility)
_chronos_by_model: dict = {}  # model id -> loaded pipeline


def _load_chronos(model: str):
    """Return the Chronos pipeline for ``model``, loading it on first use.

    Pipelines are cached per model id, so asking for a different checkpoint
    loads that checkpoint instead of silently returning whichever one was
    loaded first. ``_chronos`` is updated to the pipeline returned last.

    Args:
        model: identifier passed to ``BaseChronosPipeline.from_pretrained``.

    Returns:
        The cached or newly loaded pipeline object.
    """
    global _chronos
    pipe = _chronos_by_model.get(model)
    if pipe is None:
        # TODO verify: `from chronos import BaseChronosPipeline` (chronos-forecasting)
        # Imported lazily so the module stays importable without chronos.
        from chronos import BaseChronosPipeline

        pipe = BaseChronosPipeline.from_pretrained(model)
        _chronos_by_model[model] = pipe
    _chronos = pipe
    return pipe


def chronos(obs_inc, horizon, *, model="amazon/chronos-bolt-small", **kw) -> dict:
    """Zero-shot Chronos-Bolt baseline. Returns P10/P50/P90 increments.

    Args:
        obs_inc: 1D array-like of observed per-period increments, used as the
            model context.
        horizon: number of periods to forecast.
        model: checkpoint identifier handed to ``_load_chronos``.
        **kw: ignored.

    Returns:
        ``{"q10", "q50", "q90"}`` incremental quantile forecasts. ``q10`` and
        ``q50`` are floored at zero, matching the classical baselines in this
        module, and ``q90`` is floored at ``q50`` so that clipping can never
        leave the bands out of order.
    """
    import torch

    pipe = _load_chronos(model)
    ctx = torch.tensor(np.asarray(obs_inc, float), dtype=torch.float32)
    # TODO verify signature against chronos-forecasting docs.
    q, _mean = pipe.predict_quantiles(
        context=ctx, prediction_length=horizon, quantile_levels=[0.1, 0.5, 0.9]
    )
    q = q[0].cpu().numpy()  # (horizon, 3) - presumably; confirm with the TODO above
    q10 = np.clip(q[:, 0], 0, None)
    # Flooring only q10 would put q10 above q50 whenever the model emits a
    # negative median, so the median gets the same floor.
    q50 = np.clip(q[:, 1], 0, None)
    q90 = np.maximum(q[:, 2], q50)
    return {"q10": q10, "q50": q50, "q90": q90}


# Name -> forecaster lookup; each key is the function's own name.
REGISTRY = {
    forecaster.__name__: forecaster
    for forecaster in (last_value, linear, logistic, chronos)
}