| """LightGBM multi-horizon CGM forecaster, packaged for the HF Hub. |
| |
| One repo holds four feature ablations (``cgm``, ``insulin``, ``carbs``, |
| ``all``); each ablation has 12 boosters (one per 5-minute horizon) stored as |
| LightGBM ``Booster.save_model`` text files under |
| ``boosters/<ablation>/horizon_<NN>.txt``. The active ablation is selected at |
| load time via ``ablation=`` on ``AutoConfig`` / ``AutoModel`` ``from_pretrained``. |
| |
| Usage:: |
| |
| from transformers import AutoConfig, AutoModel |
| cfg = AutoConfig.from_pretrained( |
| "anonymous-4FAD/LightGBM", trust_remote_code=True, ablation="cgm") |
| model = AutoModel.from_pretrained( |
| "anonymous-4FAD/LightGBM", trust_remote_code=True, config=cfg) |
| preds = model.predict(timestamps_ns, cgm, insulin, carbs) # (B, 12) |
| """ |
|
|
| from __future__ import annotations |
|
|
| import math |
| import os |
| from typing import Optional |
|
|
| import numpy as np |
| import torch |
| from huggingface_hub import snapshot_download |
| from transformers import PretrainedConfig, PreTrainedModel |
|
|
|
|
# Keyword arguments that ``LightGBMMultiHorizonModel.from_pretrained`` pops out
# of ``**kwargs`` and forwards verbatim to ``huggingface_hub.snapshot_download``
# (and to ``LightGBMMultiHorizonConfig.from_pretrained``); everything else is
# left in ``kwargs`` and ignored by the hub download step.
_HUB_DOWNLOAD_KWARGS = (
    "cache_dir",
    "force_download",
    "local_files_only",
    "proxies",
    "revision",
    "token",
)
|
|
|
|
class LightGBMMultiHorizonConfig(PretrainedConfig):
    """Config for the multi-horizon LightGBM forecaster.

    Records which feature ablation is active plus per-ablation feature
    metadata; the boosters themselves are loaded by the model class.
    """

    model_type = "lightgbm_multihorizon"

    def __init__(
        self,
        ablation: str = "all",
        ablations: Optional[list] = None,
        history_length: int = 24,
        horizon_length: int = 12,
        feature_names_by_ablation: Optional[dict] = None,
        n_features_by_ablation: Optional[dict] = None,
        target_names: Optional[list] = None,
        **kwargs,
    ):
        # Fall back to the four known ablations, then reject anything outside
        # the allowed set before touching any state.
        if ablations is None:
            ablations = ["cgm", "insulin", "carbs", "all"]
        if ablation not in ablations:
            raise ValueError(f"ablation must be one of {ablations}, got {ablation!r}")

        self.history_length = int(history_length)
        self.horizon_length = int(horizon_length)
        self.ablation = ablation
        self.ablations = list(ablations)
        self.target_names = list(target_names or [])
        self.feature_names_by_ablation = feature_names_by_ablation or {}
        self.n_features_by_ablation = n_features_by_ablation or {}
        super().__init__(**kwargs)

    @property
    def n_features(self) -> int:
        """Feature count for the active ablation (explicit count wins)."""
        counts = self.n_features_by_ablation
        if counts:
            return int(counts[self.ablation])
        # No explicit counts recorded — derive from the feature-name table.
        return len(self.feature_names_by_ablation[self.ablation])

    @property
    def feature_names(self) -> list:
        """Feature column names for the active ablation, as a fresh list."""
        return list(self.feature_names_by_ablation[self.ablation])
|
|
|
|
class LightGBMMultiHorizonModel(PreTrainedModel):
    """Wraps 12 LightGBM boosters (one per horizon) behind a transformers API.

    Holds no torch parameters; the boosters live in ``self._boosters`` after
    ``from_pretrained`` and run on CPU.
    """

    config_class = LightGBMMultiHorizonConfig
    main_input_name = "features"
    # No torch weights exist to tie or shard; these placeholders keep the
    # transformers machinery happy.
    _tied_weights_keys: Optional[dict] = None
    _no_split_modules: list = []

    def __init__(self, config: LightGBMMultiHorizonConfig):
        super().__init__(config)
        # A single buffer gives the module a device/dtype anchor so
        # ``.to()`` / ``.eval()`` work on an otherwise parameter-free model.
        self.register_buffer("_dummy", torch.zeros(1))
        self._boosters: list = []

    def _init_weights(self, module):
        # Nothing to initialize: all predictive state lives in the LightGBM
        # boosters loaded by from_pretrained().
        pass

    @classmethod
    def from_pretrained(
        cls,
        pretrained_model_name_or_path,
        *model_args,
        config=None,
        ablation: Optional[str] = None,
        **kwargs,
    ):
        """Load the config and the per-horizon LightGBM boosters.

        Args:
            pretrained_model_name_or_path: Local directory or HF Hub repo id.
            config: Optional pre-built config; built from the repo otherwise.
            ablation: Optional override of ``config.ablation``, selecting
                which booster set (``boosters/<ablation>/``) is loaded.
            **kwargs: Hub download options (see ``_HUB_DOWNLOAD_KWARGS``);
                unrecognized keys are ignored.

        Raises:
            ValueError: If ``ablation`` is not in ``config.ablations``.
            FileNotFoundError: If the booster directory or any per-horizon
                booster file is missing.
        """
        # Drop transformers-internal kwargs that have no meaning here.
        kwargs.pop("trust_remote_code", None)
        kwargs.pop("_from_auto", None)
        kwargs.pop("_commit_hash", None)
        kwargs.pop("subfolder", None)

        hub_kwargs = {k: kwargs.pop(k) for k in _HUB_DOWNLOAD_KWARGS if k in kwargs}

        if config is None:
            config_kwargs = dict(hub_kwargs)
            if ablation is not None:
                config_kwargs["ablation"] = ablation
            config = LightGBMMultiHorizonConfig.from_pretrained(
                pretrained_model_name_or_path, **config_kwargs
            )
        elif ablation is not None:
            # Validate before assigning: setting the attribute directly would
            # bypass the membership check in the config constructor, letting a
            # typoed ablation surface later as a confusing missing-file error.
            allowed = getattr(config, "ablations", None)
            if allowed and ablation not in allowed:
                raise ValueError(
                    f"ablation must be one of {allowed}, got {ablation!r}"
                )
            config.ablation = ablation

        model = cls(config)

        # Resolve the booster directory: local path as-is, otherwise download
        # only the config plus the active ablation's boosters from the Hub.
        if os.path.isdir(str(pretrained_model_name_or_path)):
            local_dir = str(pretrained_model_name_or_path)
        else:
            local_dir = snapshot_download(
                repo_id=str(pretrained_model_name_or_path),
                allow_patterns=[
                    "config.json",
                    f"boosters/{config.ablation}/horizon_*.txt",
                ],
                **hub_kwargs,
            )

        booster_dir = os.path.join(local_dir, "boosters", config.ablation)
        if not os.path.isdir(booster_dir):
            raise FileNotFoundError(
                f"Missing boosters directory for ablation {config.ablation!r}: {booster_dir}"
            )

        # Deferred import so the package can be imported without lightgbm
        # installed until a model is actually loaded.
        import lightgbm as lgb

        boosters = []
        for h in range(config.horizon_length):
            path = os.path.join(booster_dir, f"horizon_{h:02d}.txt")
            if not os.path.isfile(path):
                raise FileNotFoundError(f"Missing booster: {path}")
            boosters.append(lgb.Booster(model_file=path))
        model._boosters = boosters
        model.eval()
        return model

    def forward(self, features) -> torch.Tensor:
        """Predict all horizons for a feature matrix.

        Args:
            features: (B, F) tensor or array-like of tabular features in the
                order given by ``config.feature_names``.

        Returns:
            CPU float32 tensor of shape (B, horizon_length).
        """
        # Fail fast before doing any conversion work.
        if not self._boosters:
            raise RuntimeError(
                "LightGBM boosters are not loaded. Construct the model via "
                "from_pretrained()."
            )
        if isinstance(features, torch.Tensor):
            x = features.detach().cpu().numpy().astype(np.float32, copy=False)
        else:
            x = np.asarray(features, dtype=np.float32)
        # One booster per horizon; stack their (B,) outputs into (B, H).
        cols = [b.predict(x) for b in self._boosters]
        out = np.stack(cols, axis=-1).astype(np.float32, copy=False)
        return torch.as_tensor(out)

    def predict(self, timestamps, cgm, insulin, carbs) -> np.ndarray:
        """Run inference for a benchmark.py-style batch.

        See the corresponding ``predict`` on the Ridge model for the input
        contract; output is ``(B, horizon_length)``.
        """
        features = _build_tabular_features(
            timestamps=np.asarray(timestamps),
            cgm=np.asarray(cgm, dtype=np.float64),
            insulin=np.asarray(insulin, dtype=np.float64),
            carbs=np.asarray(carbs, dtype=np.float64),
            feature_names=self.config.feature_names,
            history_length=self.config.history_length,
        )
        out = self.forward(features)
        return out.detach().cpu().numpy()
|
|
|
|
| def _build_tabular_features( |
| *, |
| timestamps: np.ndarray, |
| cgm: np.ndarray, |
| insulin: np.ndarray, |
| carbs: np.ndarray, |
| feature_names: list, |
| history_length: int, |
| ) -> np.ndarray: |
| """Assemble a (B, F) feature matrix matching ``feature_names`` order. |
| |
| See ``hub/ridge/model.py`` for the lag convention (``CGM_t0`` = oldest, |
| ``CGM_t<history_length-1>`` = newest within the window). |
| """ |
| if cgm.shape[-1] < history_length: |
| raise ValueError( |
| f"Need at least {history_length} CGM samples, got {cgm.shape[-1]}" |
| ) |
| cgm_h = cgm[..., -history_length:] |
| insulin_h = insulin[..., -history_length:] |
| carbs_h = carbs[..., -history_length:] |
|
|
| last_ts = np.asarray(timestamps)[..., -1].astype(np.int64) |
| hours = (last_ts // 3_600_000_000_000) % 24 |
| hour_sin = np.sin(2.0 * math.pi * hours / 24.0) |
| hour_cos = np.cos(2.0 * math.pi * hours / 24.0) |
|
|
| columns = [] |
| for name in feature_names: |
| if name.startswith("CGM_t"): |
| i = int(name.split("_t", 1)[1]) |
| columns.append(cgm_h[..., i]) |
| elif name.startswith("Insulin_t"): |
| i = int(name.split("_t", 1)[1]) |
| columns.append(insulin_h[..., i]) |
| elif name.startswith("Carbs_t"): |
| i = int(name.split("_t", 1)[1]) |
| columns.append(carbs_h[..., i]) |
| elif name == "hour_sin": |
| columns.append(hour_sin) |
| elif name == "hour_cos": |
| columns.append(hour_cos) |
| else: |
| raise ValueError(f"Unknown feature column: {name!r}") |
| return np.stack(columns, axis=-1).astype(np.float32) |
|
|