File size: 5,391 Bytes
c658ad5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""Unified loader for public-government project data.

Wraps the per-source parsers (in `scripts/`) and the downloaded samples (in
`data/raw/`) into one lightweight `Series` type so gov data can feed the same
forecasting / fine-tuning pipeline as DSLIB and synthetic.

Two families, kept deliberately separate (different forecasting semantics):

  PRIMARY breadth (monotone cumulative cost, like DSLIB EV - mix into training):
    * IATI / World Bank   - monthly cumulative disbursement curves (long: median ~82 pts)

  SECONDARY (annual cost-ESTIMATE drift, signed revisions - a different task / eval axis):
    * UK GMPP             - whole-life-cost estimate per annual snapshot
    * US DoD SAR          - current programme cost estimate per annual report
    * US IT Dashboard     - mostly a single cross-sectional snapshot; NOT a series (skip)

See data/README.md for provenance, licences and download recipes. All sources are
public (OGL / US public domain / CC-BY-4.0 + IATI open licensing).
"""
from __future__ import annotations

import json
import os
import sys
from dataclasses import dataclass, field

import numpy as np

# Absolute path of the `scripts` directory located beside the directory that
# contains this file (two levels up from the file itself, then "scripts").
_SCRIPTS = os.path.normpath(
    os.path.join(os.path.abspath(__file__), os.pardir, os.pardir, "scripts")
)


@dataclass
class Series:
    """One observed series from a public-government source.

    `values` and `times` are meant to be aligned element by element; this
    class does not validate that itself.
    """

    series_id: str
    source: str                       # 'iati' | 'gmpp' | 'sar'
    kind: str                         # 'cum_cost' (monotone) | 'cost_estimate' (level/drift)
    values: np.ndarray                # cumulative cost, or estimate level, per observation
    times: list                       # dates / years aligned with values
    meta: dict = field(default_factory=dict)

    def __eq__(self, other: object) -> bool:
        """Field-wise equality that compares `values` element-wise.

        The dataclass-generated `__eq__` compares fields as a tuple, which
        raises ValueError on multi-element numpy arrays (ambiguous truth
        value). Defining it here keeps `==` / `!=` usable; instances remain
        unhashable, exactly as with the generated method.
        """
        if other.__class__ is not self.__class__:
            return NotImplemented
        return (
            self.series_id == other.series_id
            and self.source == other.source
            and self.kind == other.kind
            and bool(np.array_equal(self.values, other.values))
            and list(self.times) == list(other.times)
            and self.meta == other.meta
        )

    @property
    def increments(self) -> np.ndarray:
        """Per-observation changes of `values`.

        For `kind == "cum_cost"` the first difference is taken against an
        implicit leading 0.0, so the result has as many elements as `values`.
        For any other kind it is the plain first difference, which is one
        element shorter than `values`.
        """
        if self.kind == "cum_cost":
            return np.diff(self.values, prepend=0.0)
        return np.diff(self.values)


# --------------------------------------------------------------------------- #
# PRIMARY: IATI / World Bank monthly cumulative disbursement curves
# --------------------------------------------------------------------------- #
def load_iati(path: str = "data/raw/worldbank/iati_projects.jsonl", min_points: int = 8) -> list[Series]:
    """Load cumulative-disbursement curves from an IATI / World Bank JSONL file.

    Real monthly cumulative-spend curves. The best gov source for TimesFM breadth
    (long, monthly, monotone) - shape-compatible with DSLIB EV increments.
    Monotonicity is not checked here; values are taken as stored.

    Each non-blank line of `path` is parsed as one JSON project record. A
    record is kept when it has at least `min_points` observations with a
    non-null "cumulative_disbursement" and, after sorting those observations
    by "date", the last cumulative value is positive.

    Args:
        path: JSONL file with one project record per line (read as UTF-8).
        min_points: minimum number of usable observations per project.

    Returns:
        One `Series` (source "iati", kind "cum_cost") per kept project, in
        file order. An empty list when `path` does not exist.

    Raises:
        json.JSONDecodeError: a non-blank line is not valid JSON.
        KeyError: a kept record lacks "project_id", or one of its usable
            observations lacks "date".
    """
    out = []
    if not os.path.exists(path):
        return out
    # Context manager so the handle is closed even if a line fails to parse.
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            if not line.strip():
                # Tolerate blank / whitespace-only lines (e.g. trailing newlines).
                continue
            r = json.loads(line)
            obs = [o for o in r.get("observations", []) if o.get("cumulative_disbursement") is not None]
            if len(obs) < min_points:
                continue
            obs.sort(key=lambda o: o["date"])
            cum = np.asarray([o["cumulative_disbursement"] for o in obs], float)
            if cum[-1] <= 0:
                # Nothing was ever disbursed: no usable curve.
                continue
            out.append(Series(
                series_id=r["project_id"], source="iati", kind="cum_cost",
                values=cum, times=[o["date"] for o in obs],
                meta={"name": r.get("name"), "sector": r.get("sector"),
                      "commitment_usd": r.get("commitment_usd"),
                      "schedule_slip_days": r.get("schedule_slip_days_disbursement")},
            ))
    return out


# --------------------------------------------------------------------------- #
# SECONDARY: UK GMPP annual whole-life-cost estimate drift
# --------------------------------------------------------------------------- #
def load_gmpp(folder: str = "data/raw/gmpp", min_points: int = 3) -> list[Series]:
    """Build one cost-estimate series per GMPP project found under `folder`.

    Annual whole-life-cost estimate per project across GMPP snapshots
    (overrun-drift signal). Short, annual, signed - a secondary eval axis,
    not mixed with cum_cost.

    Records come from `gmpp_parser.build_longitudinal(folder)`. Observations
    with a null "cost_estimate" are dropped; a project is kept only when the
    remaining observations cover at least `min_points` distinct years. Kept
    observations are ordered by "year".

    Returns:
        A list of `Series` with source "gmpp" and kind "cost_estimate".
    """
    # The parser is imported from the scripts directory on demand, so that
    # directory has to be on sys.path first.
    if _SCRIPTS not in sys.path:
        sys.path.insert(0, _SCRIPTS)
    import gmpp_parser  # type: ignore

    collected = []
    for record in gmpp_parser.build_longitudinal(folder):
        costed = [
            entry for entry in record["observations"]
            if entry.get("cost_estimate") is not None
        ]
        distinct_years = {entry["year"] for entry in costed}
        if len(distinct_years) >= min_points:
            costed.sort(key=lambda entry: entry["year"])
            estimates = np.asarray([entry["cost_estimate"] for entry in costed], float)
            years = [entry["year"] for entry in costed]
            details = {"name": record.get("name"), "dept": record.get("dept")}
            collected.append(Series(
                series_id=record["project_id"],
                source="gmpp",
                kind="cost_estimate",
                values=estimates,
                times=years,
                meta=details,
            ))
    return collected


# --------------------------------------------------------------------------- #
# Helpers for the fine-tuning windower
# --------------------------------------------------------------------------- #
def increment_arrays(series: list[Series]) -> list[np.ndarray]:
    """Return each series' `increments` cast to float32, in input order.

    Intended for the same windower used on DSLIB / synthetic data.
    """
    arrays = []
    for item in series:
        arrays.append(item.increments.astype("float32"))
    return arrays


def inventory() -> dict:
    """Summarise usable series per source (for the data README / report).

    Runs each loader with its default arguments. On success the entry holds
    the series count plus the median and maximum series length (both 0 when
    nothing was loaded). If a loader raises, the entry is instead
    {"error": "<ExceptionType>: <message>"}.
    """
    loaders = {"iati": load_iati, "gmpp": load_gmpp}
    report = {}
    for source, loader in loaders.items():
        try:
            loaded = loader()
            lengths = [len(item.values) for item in loaded]
            if lengths:
                median_len = int(np.median(lengths))
                longest = int(max(lengths))
            else:
                median_len = 0
                longest = 0
            report[source] = {"series": len(loaded),
                              "len_p50": median_len,
                              "len_max": longest}
        except Exception as exc:  # missing sample / parser dep
            # Best-effort by design: record the failure and move on to the next source.
            report[source] = {"error": f"{type(exc).__name__}: {exc}"}
    return report


if __name__ == "__main__":
    # Run as a script: pretty-print the per-source inventory.
    from pprint import pp

    pp(inventory())