Spaces:
Running
Running
File size: 5,391 Bytes
"""Unified loader for public-government project data.
Wraps the per-source parsers (in `scripts/`) and the downloaded samples (in
`data/raw/`) into one lightweight `Series` type so gov data can feed the same
forecasting / fine-tuning pipeline as DSLIB and synthetic.
Two families, kept deliberately separate (different forecasting semantics):
PRIMARY breadth (monotone cumulative cost, like DSLIB EV - mix into training):
* IATI / World Bank - monthly cumulative disbursement curves (long: median ~82 pts)
SECONDARY (annual cost-ESTIMATE drift, signed revisions - a different task / eval axis):
* UK GMPP - whole-life-cost estimate per annual snapshot
* US DoD SAR - current programme cost estimate per annual report
* US IT Dashboard - mostly a single cross-sectional snapshot; NOT a series (skip)
See data/README.md for provenance, licences and download recipes. All sources are
public (OGL / US public domain / CC-BY-4.0 + IATI open licensing).
"""
from __future__ import annotations
import json
import os
import sys
from dataclasses import dataclass, field
import numpy as np
# Absolute path of the `scripts` directory located two directory levels above
# this file; load_gmpp() puts it on sys.path so the per-source parsers import.
_BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
_SCRIPTS = os.path.join(_BASE_DIR, "scripts")
@dataclass
class Series:
    """A single cost series from one public-government data source.

    The same lightweight container is used for every source so that the
    downstream pipeline can treat them uniformly; `kind` tells consumers how
    `values` should be interpreted.
    """

    # Identifier of the project the observations belong to.
    series_id: str
    # Originating source: 'iati' | 'gmpp' | 'sar'
    source: str
    # 'cum_cost' (monotone) | 'cost_estimate' (level/drift)
    kind: str
    # Cumulative cost, or estimate level, one entry per observation.
    values: np.ndarray
    # Dates / years aligned element-for-element with `values`.
    times: list
    # Free-form per-series metadata; a fresh dict per instance by default.
    meta: dict = field(default_factory=dict)

    @property
    def increments(self) -> np.ndarray:
        """Return the step-to-step changes of `values`.

        For any kind other than 'cum_cost' this is the plain first difference,
        which is one element shorter than `values`. For 'cum_cost' a 0.0 is
        prepended before differencing, so the result has the same length as
        `values` and its first element equals the first cumulative value.
        """
        if self.kind != "cum_cost":
            return np.diff(self.values)
        return np.diff(self.values, prepend=0.0)
# --------------------------------------------------------------------------- #
# PRIMARY: IATI / World Bank monthly cumulative disbursement curves
# --------------------------------------------------------------------------- #
def load_iati(path: str = "data/raw/worldbank/iati_projects.jsonl", min_points: int = 8) -> list[Series]:
    """Load real monthly cumulative-spend curves from an IATI / World Bank JSONL file.

    The best gov source for TimesFM breadth (long, monthly, monotone) -
    shape-compatible with DSLIB EV increments.

    Each non-blank line of the file is parsed as one JSON project record. A
    record is kept only if, after dropping observations whose
    ``cumulative_disbursement`` is missing/None, it has at least `min_points`
    observations and its last (latest-dated) cumulative value is positive.

    Args:
        path: Location of the JSONL sample. A missing file is not an error.
        min_points: Minimum number of usable observations per project.

    Returns:
        A list of ``Series`` with ``source='iati'`` and ``kind='cum_cost'``,
        observations sorted by their ``date`` field. Empty if `path` does not
        exist.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a kept record lacks ``project_id`` or an observation
            lacks ``date``.
    """
    out = []
    if not os.path.exists(path):
        return out
    # The context manager guarantees the handle is closed even when a record
    # fails to parse; UTF-8 is pinned so decoding does not depend on the locale.
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            if not line.strip():
                continue  # tolerate blank / trailing empty lines in the JSONL
            r = json.loads(line)
            # `or []` also covers an explicit `"observations": null`.
            obs = [o for o in (r.get("observations") or [])
                   if o.get("cumulative_disbursement") is not None]
            if len(obs) < min_points:
                continue
            obs.sort(key=lambda o: o["date"])
            cum = np.asarray([o["cumulative_disbursement"] for o in obs], float)
            if cum[-1] <= 0:
                continue  # nothing was ever disbursed: no usable curve
            out.append(Series(
                series_id=r["project_id"], source="iati", kind="cum_cost",
                values=cum, times=[o["date"] for o in obs],
                meta={"name": r.get("name"), "sector": r.get("sector"),
                      "commitment_usd": r.get("commitment_usd"),
                      "schedule_slip_days": r.get("schedule_slip_days_disbursement")},
            ))
    return out
# --------------------------------------------------------------------------- #
# SECONDARY: UK GMPP annual whole-life-cost estimate drift
# --------------------------------------------------------------------------- #
def load_gmpp(folder: str = "data/raw/gmpp", min_points: int = 3) -> list[Series]:
    """Build per-project annual whole-life-cost estimate series from GMPP snapshots.

    This is the overrun-drift signal: short, annual, signed - a secondary eval
    axis that is not mixed with the cum_cost series.

    The records come from ``gmpp_parser.build_longitudinal(folder)``; the parser
    module is imported lazily after making sure the scripts directory is on
    ``sys.path``. A project is kept only when its observations with a non-None
    ``cost_estimate`` span at least `min_points` distinct years.

    Returns:
        A list of ``Series`` with ``source='gmpp'`` and
        ``kind='cost_estimate'``, observations ordered by ``year``.
    """
    if _SCRIPTS not in sys.path:
        sys.path.insert(0, _SCRIPTS)
    import gmpp_parser  # type: ignore

    collected = []
    for record in gmpp_parser.build_longitudinal(folder):
        priced = [ob for ob in record["observations"] if ob.get("cost_estimate") is not None]
        # Count distinct years, not rows, when applying the length threshold.
        distinct_years = {ob["year"] for ob in priced}
        if len(distinct_years) >= min_points:
            ordered = sorted(priced, key=lambda ob: ob["year"])
            collected.append(Series(
                series_id=record["project_id"],
                source="gmpp",
                kind="cost_estimate",
                values=np.asarray([ob["cost_estimate"] for ob in ordered], float),
                times=[ob["year"] for ob in ordered],
                meta={"name": record.get("name"), "dept": record.get("dept")},
            ))
    return collected
# --------------------------------------------------------------------------- #
# Helpers for the fine-tuning windower
# --------------------------------------------------------------------------- #
def increment_arrays(series: list[Series]) -> list[np.ndarray]:
    """Return one float32 increment array per series, in input order.

    Intended for the same windower that is used on DSLIB / synthetic data.
    """
    raw_increments = (entry.increments for entry in series)
    return [inc.astype("float32") for inc in raw_increments]
def inventory() -> dict:
    """Summarise how many usable series each source yields (for the data README / report).

    Every loader is called with its default arguments. On success the entry
    holds the series count plus the median and maximum series length (both 0
    when nothing was loaded). Any exception raised while loading or summarising
    a source is recorded under an ``"error"`` key instead of propagating, so a
    missing sample or parser dependency does not abort the whole report.
    """
    loaders = (("iati", load_iati), ("gmpp", load_gmpp))
    report = {}
    for source_name, loader in loaders:
        try:
            loaded = loader()
            lengths = [len(item.values) for item in loaded]
            if lengths:
                median_len, longest = int(np.median(lengths)), int(max(lengths))
            else:
                median_len, longest = 0, 0
            report[source_name] = {
                "series": len(loaded),
                "len_p50": median_len,
                "len_max": longest,
            }
        except Exception as exc:  # missing sample / parser dep
            report[source_name] = {"error": f"{type(exc).__name__}: {exc}"}
    return report
if __name__ == "__main__":
    # Script entry point: pretty-print the per-source inventory.
    from pprint import pp

    pp(inventory())
|