"""Unified loader for public-government project data.

Wraps the per-source parsers (in `scripts/`) and the downloaded samples (in
`data/raw/`) into one lightweight `Series` type so gov data can feed the same
forecasting / fine-tuning pipeline as DSLIB and synthetic.

Two families, kept deliberately separate (different forecasting semantics):

  PRIMARY breadth (monotone cumulative cost, like DSLIB EV - mix into training):
    * IATI / World Bank - monthly cumulative disbursement curves
      (long: median ~82 pts)

  SECONDARY (annual cost-ESTIMATE drift, signed revisions - a different task /
  eval axis):
    * UK GMPP         - whole-life-cost estimate per annual snapshot
    * US DoD SAR      - current programme cost estimate per annual report
    * US IT Dashboard - mostly a single cross-sectional snapshot; NOT a series
                        (skip)

See data/README.md for provenance, licences and download recipes. All sources
are public (OGL / US public domain / CC-BY-4.0 + IATI open licensing).
"""
from __future__ import annotations

import json
import os
import sys
from dataclasses import dataclass, field

import numpy as np

# Sibling `scripts/` directory (one level above this file's directory); it is
# put on sys.path lazily by the loaders that need a per-source parser.
_SCRIPTS = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "scripts")


@dataclass
class Series:
    """One project time series, in a source-independent shape."""

    series_id: str
    source: str         # 'iati' | 'gmpp' | 'sar'
    kind: str           # 'cum_cost' (monotone) | 'cost_estimate' (level/drift)
    values: np.ndarray  # cumulative cost, or estimate level, per observation
    times: list         # dates / years aligned with values
    meta: dict = field(default_factory=dict)

    @property
    def increments(self) -> np.ndarray:
        """Return the first differences of `values`.

        For ``kind == "cum_cost"`` a leading 0.0 is prepended before
        differencing, so the result has the same length as `values` and its
        first element equals ``values[0]``. For any other kind the plain
        difference is returned, which is one element shorter than `values`.
        """
        if self.kind == "cum_cost":
            return np.diff(self.values, prepend=0.0)
        return np.diff(self.values)


# --------------------------------------------------------------------------- #
# PRIMARY: IATI / World Bank monthly cumulative disbursement curves
# --------------------------------------------------------------------------- #
def load_iati(path: str = "data/raw/worldbank/iati_projects.jsonl", min_points: int = 8) -> list[Series]:
    """Real monthly cumulative-spend curves.

    The best gov source for TimesFM breadth (long, monthly, monotone) -
    shape-compatible with DSLIB EV increments.

    Each non-blank line of `path` is parsed as one JSON object (one project).
    The keys read are ``project_id`` (required), ``observations`` (a list of
    dicts carrying ``date`` and ``cumulative_disbursement``) and the optional
    ``name``, ``sector``, ``commitment_usd`` and
    ``schedule_slip_days_disbursement``, which are copied into `Series.meta`.

    A project is skipped when, after dropping observations whose
    ``cumulative_disbursement`` is missing or null, it has fewer than
    `min_points` observations (or none at all), or when its last cumulative
    value (in ``date`` order) is <= 0.

    Note: monotonicity of the curve is not checked here.

    Args:
        path: JSON Lines file to read. Relative paths resolve against the
            current working directory.
        min_points: Minimum number of usable observations per project.

    Returns:
        One `Series` (source ``"iati"``, kind ``"cum_cost"``) per retained
        project, in file order. An empty list if `path` does not exist.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a retained project lacks ``project_id`` or an observation
            lacks ``date``.
    """
    out: list[Series] = []
    if not os.path.exists(path):
        return out
    # Context manager closes the handle deterministically; JSON Lines is UTF-8,
    # so do not depend on the platform's default encoding.
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            if not line.strip():
                # Tolerate blank / trailing empty lines instead of crashing.
                continue
            r = json.loads(line)
            obs = [o for o in r.get("observations", []) if o.get("cumulative_disbursement") is not None]
            # `not obs` guards the cum[-1] lookup below when min_points <= 0.
            if not obs or len(obs) < min_points:
                continue
            obs.sort(key=lambda o: o["date"])
            cum = np.asarray([o["cumulative_disbursement"] for o in obs], float)
            if cum[-1] <= 0:
                # Nothing was ever disbursed: no usable curve.
                continue
            out.append(Series(
                series_id=r["project_id"],
                source="iati",
                kind="cum_cost",
                values=cum,
                times=[o["date"] for o in obs],
                meta={
                    "name": r.get("name"),
                    "sector": r.get("sector"),
                    "commitment_usd": r.get("commitment_usd"),
                    "schedule_slip_days": r.get("schedule_slip_days_disbursement"),
                },
            ))
    return out


# --------------------------------------------------------------------------- #
# SECONDARY: UK GMPP annual whole-life-cost estimate drift
# --------------------------------------------------------------------------- #
def load_gmpp(folder: str = "data/raw/gmpp", min_points: int = 3) -> list[Series]:
    """Annual whole-life-cost estimate per project across GMPP snapshots (overrun-drift signal).

    Short, annual, signed - a secondary eval axis, not mixed with cum_cost.

    Imports `gmpp_parser` from the `scripts/` directory (adding that directory
    to `sys.path` on first use) and iterates over
    ``gmpp_parser.build_longitudinal(folder)``. Each record is read for
    ``project_id``, ``observations`` (dicts with ``year`` and
    ``cost_estimate``) and the optional ``name`` and ``dept``.

    A project is skipped when its observations with a non-null
    ``cost_estimate`` cover fewer than `min_points` distinct years.

    NOTE(review): the threshold counts distinct years, but every usable
    observation is kept, so duplicate years (if the parser can emit them)
    would appear in `values` - confirm against `gmpp_parser`.

    Args:
        folder: Directory passed through to `gmpp_parser.build_longitudinal`.
        min_points: Minimum number of distinct years with an estimate.

    Returns:
        One `Series` (source ``"gmpp"``, kind ``"cost_estimate"``) per retained
        project, with observations sorted by year.

    Raises:
        ImportError: If `gmpp_parser` cannot be imported.
    """
    if _SCRIPTS not in sys.path:
        sys.path.insert(0, _SCRIPTS)
    import gmpp_parser  # type: ignore

    out: list[Series] = []
    for r in gmpp_parser.build_longitudinal(folder):
        obs = [o for o in r["observations"] if o.get("cost_estimate") is not None]
        if len({o["year"] for o in obs}) < min_points:
            continue
        obs.sort(key=lambda o: o["year"])
        out.append(Series(
            series_id=r["project_id"],
            source="gmpp",
            kind="cost_estimate",
            values=np.asarray([o["cost_estimate"] for o in obs], float),
            times=[o["year"] for o in obs],
            meta={"name": r.get("name"), "dept": r.get("dept")},
        ))
    return out


# --------------------------------------------------------------------------- #
# Helpers for the fine-tuning windower
# --------------------------------------------------------------------------- #
def increment_arrays(series: list[Series]) -> list[np.ndarray]:
    """Per-series increment sequences (for the same windower used on DSLIB / synthetic).

    Returns one float32 array per input series, taken from `Series.increments`.
    """
    return [s.increments.astype("float32") for s in series]


def inventory() -> dict:
    """Quick counts of usable series per source (for the data README / report).

    Calls each loader with its default arguments. For every source the result
    holds either ``{"series", "len_p50", "len_max"}`` (series count, median and
    maximum length; both lengths are 0 when nothing was loaded) or
    ``{"error": "<ExceptionType>: <message>"}`` if the loader raised.
    """
    inv = {}
    for name, fn in (("iati", load_iati), ("gmpp", load_gmpp)):
        # Best-effort report: a missing sample or parser dependency for one
        # source must not hide the counts for the others.
        try:
            s = fn()
            lens = [len(x.values) for x in s]
            inv[name] = {
                "series": len(s),
                "len_p50": int(np.median(lens)) if lens else 0,
                "len_max": int(max(lens)) if lens else 0,
            }
        except Exception as exc:  # missing sample / parser dep
            inv[name] = {"error": f"{type(exc).__name__}: {exc}"}
    return inv


if __name__ == "__main__":
    import pprint

    pprint.pp(inventory())