# LightGBM / model.py
# (Hub page metadata, kept as comments so the module parses:
#  anonymous-4FAD's picture / Upload 4 files / 53e13eb verified)
"""LightGBM multi-horizon CGM forecaster, packaged for the HF Hub.
One repo holds four feature ablations (``cgm``, ``insulin``, ``carbs``,
``all``); each ablation has 12 boosters (one per 5-minute horizon) stored as
LightGBM ``Booster.save_model`` text files under
``boosters/<ablation>/horizon_<NN>.txt``. The active ablation is selected at
load time via ``ablation=`` on ``AutoConfig`` / ``AutoModel`` ``from_pretrained``.
Usage::
from transformers import AutoConfig, AutoModel
cfg = AutoConfig.from_pretrained(
"anonymous-4FAD/LightGBM", trust_remote_code=True, ablation="cgm")
model = AutoModel.from_pretrained(
"anonymous-4FAD/LightGBM", trust_remote_code=True, config=cfg)
preds = model.predict(timestamps_ns, cgm, insulin, carbs) # (B, 12)
"""
from __future__ import annotations
import math
import os
from typing import Optional
import numpy as np
import torch
from huggingface_hub import snapshot_download
from transformers import PretrainedConfig, PreTrainedModel
# Keyword arguments that ``from_pretrained`` forwards to the Hub download
# machinery (config loading / ``snapshot_download``) rather than to the model.
_HUB_DOWNLOAD_KWARGS = (
    "cache_dir",
    "force_download",
    "local_files_only",
    "proxies",
    "revision",
    "token",
)
class LightGBMMultiHorizonConfig(PretrainedConfig):
    """Configuration for the multi-horizon LightGBM forecaster.

    Tracks the active feature ablation plus per-ablation feature metadata.
    The metadata dicts/lists default to empty containers when not supplied,
    so a bare config can still be constructed and serialized.
    """

    model_type = "lightgbm_multihorizon"

    def __init__(
        self,
        ablation: str = "all",
        ablations: Optional[list] = None,
        history_length: int = 24,
        horizon_length: int = 12,
        feature_names_by_ablation: Optional[dict] = None,
        n_features_by_ablation: Optional[dict] = None,
        target_names: Optional[list] = None,
        **kwargs,
    ):
        # Default to the four ablations shipped in the repo.
        ablations = ["cgm", "insulin", "carbs", "all"] if ablations is None else ablations
        if ablation not in ablations:
            raise ValueError(
                f"ablation must be one of {ablations}, got {ablation!r}"
            )
        self.ablation = ablation
        self.ablations = list(ablations)
        self.history_length = int(history_length)
        self.horizon_length = int(horizon_length)
        # ``or {}`` / ``or []`` normalizes both None and empty inputs.
        self.feature_names_by_ablation = feature_names_by_ablation or {}
        self.n_features_by_ablation = n_features_by_ablation or {}
        self.target_names = list(target_names or [])
        super().__init__(**kwargs)

    @property
    def n_features(self) -> int:
        """Feature count for the active ablation (explicit count wins)."""
        counts = self.n_features_by_ablation
        if counts:
            return int(counts[self.ablation])
        # Fall back to the length of the feature-name list.
        return len(self.feature_names_by_ablation[self.ablation])

    @property
    def feature_names(self) -> list:
        """Ordered feature column names for the active ablation (a copy)."""
        return list(self.feature_names_by_ablation[self.ablation])
class LightGBMMultiHorizonModel(PreTrainedModel):
    """Wraps 12 LightGBM boosters (one per horizon) behind a transformers API.

    Holds no torch parameters; the boosters live in ``self._boosters`` after
    ``from_pretrained`` and run on CPU.
    """

    config_class = LightGBMMultiHorizonConfig
    main_input_name = "features"
    _tied_weights_keys: dict = None
    _no_split_modules: list = []

    def __init__(self, config: LightGBMMultiHorizonConfig):
        super().__init__(config)
        # Sentinel buffer so ``model.to(device)`` and ``state_dict()`` don't choke.
        self.register_buffer("_dummy", torch.zeros(1))
        # Populated by ``from_pretrained``; one booster per horizon, in order.
        self._boosters: list = []

    def _init_weights(self, module):
        # No torch params to initialize.
        pass

    @classmethod
    def from_pretrained(
        cls,
        pretrained_model_name_or_path,
        *model_args,
        config=None,
        ablation: Optional[str] = None,
        **kwargs,
    ):
        """Load the config and the boosters for the selected ablation.

        Args:
            pretrained_model_name_or_path: Hub repo id or a local directory
                laid out like the repo (``boosters/<ablation>/horizon_NN.txt``).
            config: Optional pre-built config; loaded from the repo when None.
            ablation: Overrides the config's active ablation when given.
            **kwargs: Hub download options (see ``_HUB_DOWNLOAD_KWARGS``);
                transformers-internal kwargs are silently dropped.

        Raises:
            ValueError: If ``ablation`` is not one of ``config.ablations``.
            FileNotFoundError: If the booster directory or any horizon file
                is missing.
        """
        # Drop transformers-internal kwargs that have no meaning here.
        for internal in ("trust_remote_code", "_from_auto", "_commit_hash", "subfolder"):
            kwargs.pop(internal, None)
        hub_kwargs = {k: kwargs.pop(k) for k in _HUB_DOWNLOAD_KWARGS if k in kwargs}
        if config is None:
            config_kwargs = dict(hub_kwargs)
            if ablation is not None:
                config_kwargs["ablation"] = ablation
            config = LightGBMMultiHorizonConfig.from_pretrained(
                pretrained_model_name_or_path, **config_kwargs
            )
        elif ablation is not None:
            # Mirror the validation done by the config constructor; assigning
            # the attribute directly would otherwise bypass it and let an
            # invalid ablation flow into the download patterns below.
            if ablation not in config.ablations:
                raise ValueError(
                    f"ablation must be one of {config.ablations}, got {ablation!r}"
                )
            config.ablation = ablation
        model = cls(config)
        if os.path.isdir(str(pretrained_model_name_or_path)):
            local_dir = str(pretrained_model_name_or_path)
        else:
            # Only fetch the config plus the boosters of the active ablation.
            local_dir = snapshot_download(
                repo_id=str(pretrained_model_name_or_path),
                allow_patterns=[
                    "config.json",
                    f"boosters/{config.ablation}/horizon_*.txt",
                ],
                **hub_kwargs,
            )
        booster_dir = os.path.join(local_dir, "boosters", config.ablation)
        if not os.path.isdir(booster_dir):
            raise FileNotFoundError(
                f"Missing boosters directory for ablation {config.ablation!r}: {booster_dir}"
            )
        # Imported lazily so the package is only required for inference.
        import lightgbm as lgb

        boosters = []
        for h in range(config.horizon_length):
            path = os.path.join(booster_dir, f"horizon_{h:02d}.txt")
            if not os.path.isfile(path):
                raise FileNotFoundError(f"Missing booster: {path}")
            boosters.append(lgb.Booster(model_file=path))
        model._boosters = boosters
        model.eval()
        return model

    def forward(self, features) -> torch.Tensor:
        """Predict every horizon for a ``(B, F)`` feature matrix.

        Accepts a torch tensor or anything ``np.asarray`` handles; returns a
        CPU float32 tensor of shape ``(B, horizon_length)``.

        Raises:
            RuntimeError: If the boosters were never loaded (i.e. the model
                was built directly instead of via ``from_pretrained``).
        """
        # Fail fast before doing any input conversion work.
        if not self._boosters:
            raise RuntimeError(
                "LightGBM boosters are not loaded. Construct the model via "
                "from_pretrained()."
            )
        if isinstance(features, torch.Tensor):
            x = features.detach().cpu().numpy().astype(np.float32, copy=False)
        else:
            x = np.asarray(features, dtype=np.float32)
        # One booster per horizon; their per-row predictions become columns.
        cols = [b.predict(x) for b in self._boosters]
        out = np.stack(cols, axis=-1).astype(np.float32, copy=False)
        return torch.as_tensor(out)

    def predict(self, timestamps, cgm, insulin, carbs) -> np.ndarray:
        """Run inference for a benchmark.py-style batch.

        See the corresponding ``predict`` on the Ridge model for the input
        contract; output is ``(B, horizon_length)``.
        """
        features = _build_tabular_features(
            timestamps=np.asarray(timestamps),
            cgm=np.asarray(cgm, dtype=np.float64),
            insulin=np.asarray(insulin, dtype=np.float64),
            carbs=np.asarray(carbs, dtype=np.float64),
            feature_names=self.config.feature_names,
            history_length=self.config.history_length,
        )
        out = self.forward(features)
        return out.detach().cpu().numpy()
def _build_tabular_features(
*,
timestamps: np.ndarray,
cgm: np.ndarray,
insulin: np.ndarray,
carbs: np.ndarray,
feature_names: list,
history_length: int,
) -> np.ndarray:
"""Assemble a (B, F) feature matrix matching ``feature_names`` order.
See ``hub/ridge/model.py`` for the lag convention (``CGM_t0`` = oldest,
``CGM_t<history_length-1>`` = newest within the window).
"""
if cgm.shape[-1] < history_length:
raise ValueError(
f"Need at least {history_length} CGM samples, got {cgm.shape[-1]}"
)
cgm_h = cgm[..., -history_length:]
insulin_h = insulin[..., -history_length:]
carbs_h = carbs[..., -history_length:]
last_ts = np.asarray(timestamps)[..., -1].astype(np.int64)
hours = (last_ts // 3_600_000_000_000) % 24
hour_sin = np.sin(2.0 * math.pi * hours / 24.0)
hour_cos = np.cos(2.0 * math.pi * hours / 24.0)
columns = []
for name in feature_names:
if name.startswith("CGM_t"):
i = int(name.split("_t", 1)[1])
columns.append(cgm_h[..., i])
elif name.startswith("Insulin_t"):
i = int(name.split("_t", 1)[1])
columns.append(insulin_h[..., i])
elif name.startswith("Carbs_t"):
i = int(name.split("_t", 1)[1])
columns.append(carbs_h[..., i])
elif name == "hour_sin":
columns.append(hour_sin)
elif name == "hour_cos":
columns.append(hour_cos)
else:
raise ValueError(f"Unknown feature column: {name!r}")
return np.stack(columns, axis=-1).astype(np.float32)