"""LightGBM multi-horizon CGM forecaster, packaged for the HF Hub. One repo holds four feature ablations (``cgm``, ``insulin``, ``carbs``, ``all``); each ablation has 12 boosters (one per 5-minute horizon) stored as LightGBM ``Booster.save_model`` text files under ``boosters//horizon_.txt``. The active ablation is selected at load time via ``ablation=`` on ``AutoConfig`` / ``AutoModel`` ``from_pretrained``. Usage:: from transformers import AutoConfig, AutoModel cfg = AutoConfig.from_pretrained( "anonymous-4FAD/LightGBM", trust_remote_code=True, ablation="cgm") model = AutoModel.from_pretrained( "anonymous-4FAD/LightGBM", trust_remote_code=True, config=cfg) preds = model.predict(timestamps_ns, cgm, insulin, carbs) # (B, 12) """ from __future__ import annotations import math import os from typing import Optional import numpy as np import torch from huggingface_hub import snapshot_download from transformers import PretrainedConfig, PreTrainedModel _HUB_DOWNLOAD_KWARGS = ( "cache_dir", "force_download", "local_files_only", "proxies", "revision", "token", ) class LightGBMMultiHorizonConfig(PretrainedConfig): """Config for the multi-horizon LightGBM forecaster.""" model_type = "lightgbm_multihorizon" def __init__( self, ablation: str = "all", ablations: Optional[list] = None, history_length: int = 24, horizon_length: int = 12, feature_names_by_ablation: Optional[dict] = None, n_features_by_ablation: Optional[dict] = None, target_names: Optional[list] = None, **kwargs, ): if ablations is None: ablations = ["cgm", "insulin", "carbs", "all"] if ablation not in ablations: raise ValueError( f"ablation must be one of {ablations}, got {ablation!r}" ) self.ablation = ablation self.ablations = list(ablations) self.history_length = int(history_length) self.horizon_length = int(horizon_length) self.feature_names_by_ablation = feature_names_by_ablation or {} self.n_features_by_ablation = n_features_by_ablation or {} self.target_names = list(target_names or []) super().__init__(**kwargs) @property def n_features(self) -> int: if self.n_features_by_ablation: return int(self.n_features_by_ablation[self.ablation]) return len(self.feature_names_by_ablation[self.ablation]) @property def feature_names(self) -> list: return list(self.feature_names_by_ablation[self.ablation]) class LightGBMMultiHorizonModel(PreTrainedModel): """Wraps 12 LightGBM boosters (one per horizon) behind a transformers API. Holds no torch parameters; the boosters live in ``self._boosters`` after ``from_pretrained`` and run on CPU. """ config_class = LightGBMMultiHorizonConfig main_input_name = "features" _tied_weights_keys: dict = None _no_split_modules: list = [] def __init__(self, config: LightGBMMultiHorizonConfig): super().__init__(config) # Sentinel buffer so ``model.to(device)`` and ``state_dict()`` don't choke. self.register_buffer("_dummy", torch.zeros(1)) self._boosters: list = [] def _init_weights(self, module): # No torch params to initialize. pass @classmethod def from_pretrained( cls, pretrained_model_name_or_path, *model_args, config=None, ablation: Optional[str] = None, **kwargs, ): kwargs.pop("trust_remote_code", None) kwargs.pop("_from_auto", None) kwargs.pop("_commit_hash", None) kwargs.pop("subfolder", None) hub_kwargs = {k: kwargs.pop(k) for k in _HUB_DOWNLOAD_KWARGS if k in kwargs} if config is None: config_kwargs = dict(hub_kwargs) if ablation is not None: config_kwargs["ablation"] = ablation config = LightGBMMultiHorizonConfig.from_pretrained( pretrained_model_name_or_path, **config_kwargs ) elif ablation is not None: config.ablation = ablation model = cls(config) if os.path.isdir(str(pretrained_model_name_or_path)): local_dir = str(pretrained_model_name_or_path) else: local_dir = snapshot_download( repo_id=str(pretrained_model_name_or_path), allow_patterns=[ "config.json", f"boosters/{config.ablation}/horizon_*.txt", ], **hub_kwargs, ) booster_dir = os.path.join(local_dir, "boosters", config.ablation) if not os.path.isdir(booster_dir): raise FileNotFoundError( f"Missing boosters directory for ablation {config.ablation!r}: {booster_dir}" ) # Imported lazily so the package is only required for inference. import lightgbm as lgb boosters = [] for h in range(config.horizon_length): path = os.path.join(booster_dir, f"horizon_{h:02d}.txt") if not os.path.isfile(path): raise FileNotFoundError(f"Missing booster: {path}") boosters.append(lgb.Booster(model_file=path)) model._boosters = boosters model.eval() return model def forward(self, features) -> torch.Tensor: if isinstance(features, torch.Tensor): x = features.detach().cpu().numpy().astype(np.float32, copy=False) else: x = np.asarray(features, dtype=np.float32) if not self._boosters: raise RuntimeError( "LightGBM boosters are not loaded. Construct the model via " "from_pretrained()." ) cols = [b.predict(x) for b in self._boosters] out = np.stack(cols, axis=-1).astype(np.float32, copy=False) return torch.as_tensor(out) def predict(self, timestamps, cgm, insulin, carbs) -> np.ndarray: """Run inference for a benchmark.py-style batch. See the corresponding ``predict`` on the Ridge model for the input contract; output is ``(B, horizon_length)``. """ features = _build_tabular_features( timestamps=np.asarray(timestamps), cgm=np.asarray(cgm, dtype=np.float64), insulin=np.asarray(insulin, dtype=np.float64), carbs=np.asarray(carbs, dtype=np.float64), feature_names=self.config.feature_names, history_length=self.config.history_length, ) out = self.forward(features) return out.detach().cpu().numpy() def _build_tabular_features( *, timestamps: np.ndarray, cgm: np.ndarray, insulin: np.ndarray, carbs: np.ndarray, feature_names: list, history_length: int, ) -> np.ndarray: """Assemble a (B, F) feature matrix matching ``feature_names`` order. See ``hub/ridge/model.py`` for the lag convention (``CGM_t0`` = oldest, ``CGM_t`` = newest within the window). """ if cgm.shape[-1] < history_length: raise ValueError( f"Need at least {history_length} CGM samples, got {cgm.shape[-1]}" ) cgm_h = cgm[..., -history_length:] insulin_h = insulin[..., -history_length:] carbs_h = carbs[..., -history_length:] last_ts = np.asarray(timestamps)[..., -1].astype(np.int64) hours = (last_ts // 3_600_000_000_000) % 24 hour_sin = np.sin(2.0 * math.pi * hours / 24.0) hour_cos = np.cos(2.0 * math.pi * hours / 24.0) columns = [] for name in feature_names: if name.startswith("CGM_t"): i = int(name.split("_t", 1)[1]) columns.append(cgm_h[..., i]) elif name.startswith("Insulin_t"): i = int(name.split("_t", 1)[1]) columns.append(insulin_h[..., i]) elif name.startswith("Carbs_t"): i = int(name.split("_t", 1)[1]) columns.append(carbs_h[..., i]) elif name == "hour_sin": columns.append(hour_sin) elif name == "hour_cos": columns.append(hour_cos) else: raise ValueError(f"Unknown feature column: {name!r}") return np.stack(columns, axis=-1).astype(np.float32)