Spaces:
Running
Running
| from __future__ import annotations | |
| import math | |
| import os | |
| from statistics import mean | |
| from typing import Any | |
| from schemas import HealthResponse, PredictRequest, PredictResponse, PredictionItem | |
class MoiraiService:
    """HF Space service wrapper for Moirai.

    Configuration is read from ``MOIRAI_*`` environment variables when the
    service is constructed. Two backends are supported:

    * ``hf_cpu`` -- loads a pretrained ``Moirai2Module`` via ``uni2ts`` and
      forecasts through a GluonTS predictor.
    * ``baseline_cpu`` -- a dependency-free momentum/trend heuristic.

    If the ``hf_cpu`` backend fails to load, the service either switches to
    the baseline (``MOIRAI_ALLOW_BASELINE_FALLBACK=true``) or stays up in a
    not-ready state with the failure recorded in ``load_error``.
    """

    # z-score of the 90th percentile of a standard normal distribution; used to
    # turn a 10%-90% quantile band into an approximate standard deviation.
    _Z_90 = 1.2815515655446004

    def __init__(self) -> None:
        """Read configuration from the environment and initialize the backend.

        Raises:
            ValueError: If ``MOIRAI_BACKEND`` names an unsupported backend, or
                a numeric setting cannot be parsed.
        """
        self.model_id = "moirai"
        self.model_name = os.getenv(
            "MOIRAI_MODEL_NAME",
            "Salesforce/moirai-2.0-R-small",
        )
        # A blank/whitespace-only value falls back to the default backend.
        self.backend = os.getenv("MOIRAI_BACKEND", "hf_cpu").strip() or "hf_cpu"
        self.device = "cpu"
        self.max_context_length = int(os.getenv("MOIRAI_MAX_CONTEXT_LENGTH", "512"))
        self.max_horizon_step = int(os.getenv("MOIRAI_MAX_HORIZON_STEP", "288"))
        self.confidence_floor = float(os.getenv("MOIRAI_CONFIDENCE_FLOOR", "0.18"))
        self.confidence_ceiling = float(os.getenv("MOIRAI_CONFIDENCE_CEILING", "0.82"))
        self.min_required_points = int(os.getenv("MOIRAI_MIN_REQUIRED_POINTS", "32"))
        # NOTE(review): num_samples is parsed but not used anywhere in this class.
        self.num_samples = int(os.getenv("MOIRAI_NUM_SAMPLES", "20"))
        self.allow_baseline_fallback = (
            os.getenv("MOIRAI_ALLOW_BASELINE_FALLBACK", "false").lower() == "true"
        )
        self.ready = False
        self.load_error = ""
        # Heavy dependencies are imported lazily by _load_hf_model(); every
        # handle starts as None so a failed load leaves a well-defined state.
        self._np = None
        self._pd = None
        self._ListDataset = None
        self._Moirai2Forecast = None
        self._module = None
        # NOTE(review): _predictor is never assigned again in this class.
        self._predictor = None
        self._initialize_backend()

    def health(self) -> HealthResponse:
        """Return a health snapshot; status is "ok" when ready, else "degraded"."""
        return HealthResponse(
            status="ok" if self.ready else "degraded",
            model=self.model_name,
            model_id=self.model_id,
            backend=self.backend,
            device=self.device,
            ready=self.ready,
            max_context_length=self.max_context_length,
            max_horizon_step=self.max_horizon_step,
        )

    def predict(self, payload: PredictRequest) -> PredictResponse:
        """Validate the request and forecast a price for each requested horizon.

        Only the trailing ``payload.context_length`` close prices are used.

        Raises:
            ValueError: If the request violates the configured limits.
            RuntimeError: If the ``hf_cpu`` backend is selected but not ready,
                or the model output is unusable.
        """
        self._validate_request(payload)
        closes = payload.close_prices[-payload.context_length:]
        if self.backend == "hf_cpu":
            if not self.ready:
                raise RuntimeError(self.load_error or "moirai backend not ready")
            predictions = self._predict_with_hf(closes, payload.horizons)
        else:
            predictions = self._predict_with_baseline(closes, payload.horizons)
        return PredictResponse(model_id=self.model_id, predictions=predictions)

    def _initialize_backend(self) -> None:
        """Prepare the configured backend and set ``ready`` / ``load_error``.

        Raises:
            ValueError: If the backend is neither ``baseline_cpu`` nor ``hf_cpu``.
        """
        if self.backend == "baseline_cpu":
            self.ready = True
            return
        if self.backend != "hf_cpu":
            raise ValueError(f"unsupported MOIRAI_BACKEND={self.backend}")
        try:
            self._load_hf_model()
        except Exception as exc:
            # Deliberately broad: import errors, download failures and model
            # errors must all leave the service running (degraded or on the
            # baseline) instead of crashing at startup. The cause is kept in
            # load_error and exposed through describe_runtime().
            self.load_error = f"moirai hf load failed: {exc}"
            if self.allow_baseline_fallback:
                self.backend = "baseline_cpu"
                self.ready = True
            else:
                self.ready = False
        else:
            self.ready = True

    def _load_hf_model(self) -> None:
        """Import the forecasting stack lazily and load the pretrained module."""
        import numpy as np
        import pandas as pd
        from gluonts.dataset.common import ListDataset
        from uni2ts.model.moirai2 import Moirai2Forecast, Moirai2Module

        self._np = np
        self._pd = pd
        self._ListDataset = ListDataset
        self._Moirai2Forecast = Moirai2Forecast
        self._module = Moirai2Module.from_pretrained(self.model_name)
        self._module.eval()

    def _predict_with_hf(
        self, close_prices: list[float], horizons: list[int]
    ) -> list[PredictionItem]:
        """Forecast with the loaded Moirai module and pick the requested steps.

        Args:
            close_prices: Context window of close prices, oldest first.
            horizons: 1-based forecast steps to report.

        Returns:
            One ``PredictionItem`` per entry of ``horizons``, in the same order
            (empty when ``horizons`` is empty).

        Raises:
            RuntimeError: If the backend is not loaded, the forecast is shorter
                than the farthest requested step, or a requested step is not a
                finite number.
        """
        np, pd = self._np, self._pd
        list_dataset = self._ListDataset
        forecast_cls = self._Moirai2Forecast
        required = (np, pd, self._module, list_dataset, forecast_cls)
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if any(dep is None for dep in required):
            raise RuntimeError(self.load_error or "moirai backend not ready")
        if not horizons:
            # Mirror the baseline backend, which returns an empty list here.
            return []

        farthest_step = max(horizons)
        model = forecast_cls(
            module=self._module,
            # Always forecast the full configured horizon; requested steps are
            # then selected from the dense output.
            prediction_length=self.max_horizon_step,
            context_length=min(self.max_context_length, len(close_prices)),
            target_dim=1,
            feat_dynamic_real_dim=0,
            past_feat_dynamic_real_dim=0,
        )
        predictor = model.create_predictor(batch_size=1)
        dataset = list_dataset(
            [
                {
                    # Synthetic start/frequency: this method receives only
                    # prices, no timestamps.
                    "start": pd.Period("2024-01-01 00:00", freq="min"),
                    "target": np.asarray(close_prices, dtype=np.float32),
                }
            ],
            freq="min",
        )
        forecast = next(predictor.predict(dataset))
        dense_mean, dense_conf = self._extract_forecast(forecast)
        if len(dense_mean) < farthest_step:
            raise RuntimeError(
                f"Moirai output horizon {len(dense_mean)} is shorter than requested {farthest_step}"
            )

        predictions: list[PredictionItem] = []
        for step in horizons:
            price = float(dense_mean[step - 1])
            if not math.isfinite(price):
                # max(floor, nan) would silently report the floor price.
                raise RuntimeError(
                    f"Moirai produced a non-finite forecast at step {step}"
                )
            predictions.append(
                PredictionItem(
                    step=step,
                    pred_price=round(max(0.00000001, price), 8),
                    pred_confidence=round(dense_conf[step - 1], 4),
                )
            )
        return predictions

    def _extract_forecast(self, forecast: Any) -> tuple[list[float], list[float]]:
        """Convert a forecast object into per-step point forecasts and confidences.

        The point forecast is ``forecast.mean`` when set, else the 0.5
        quantile, else the per-step median of ``forecast.samples``.

        Confidence is ``1 / (1 + spread / |point|)`` clamped to the configured
        floor/ceiling. ``spread`` is the per-step standard deviation of the
        samples when available; otherwise it is estimated from the 10%-90%
        quantile band, so quantile-only forecasts no longer always report the
        ceiling. Non-finite values get the floor.

        NOTE(review): the quantile fallback assumes ``forecast.quantile(level)``
        accepts 0.1 and 0.9 -- confirm against the GluonTS forecast type in use.

        Raises:
            RuntimeError: If the backend is not loaded or the forecast exposes
                none of ``mean`` / ``quantile`` / ``samples``.
        """
        np = self._np
        if np is None:
            raise RuntimeError(self.load_error or "moirai backend not ready")

        raw_samples = getattr(forecast, "samples", None)
        samples = None if raw_samples is None else np.asarray(raw_samples, dtype=float)

        if getattr(forecast, "mean", None) is not None:
            point = np.asarray(forecast.mean, dtype=float).reshape(-1)
        elif hasattr(forecast, "quantile"):
            point = np.asarray(forecast.quantile(0.5), dtype=float).reshape(-1)
        elif samples is not None:
            point = np.median(samples, axis=0).reshape(-1)
        else:
            raise RuntimeError("unsupported Moirai forecast object")

        if samples is not None:
            spread = samples.std(axis=0).reshape(-1)
        else:
            spread = self._quantile_spread(forecast, point.size)

        confidence: list[float] = []
        for pred, std in zip(point.tolist(), spread.tolist()):
            dispersion = abs(std) / max(abs(pred), 1e-6)
            raw = 1.0 / (1.0 + dispersion)
            if math.isnan(raw):
                # NaN slips through min()/max() and would otherwise come out as
                # the ceiling, i.e. maximum confidence for an unusable value.
                confidence.append(self.confidence_floor)
                continue
            confidence.append(
                max(self.confidence_floor, min(self.confidence_ceiling, raw))
            )
        return point.tolist(), confidence

    def _quantile_spread(self, forecast: Any, size: int) -> Any:
        """Approximate a per-step std from the forecast's 10%-90% quantile band.

        Returns an array of zeros (the previous behavior) when the forecast has
        no callable ``quantile``, the call fails, or the shapes do not match
        ``size``.
        """
        np = self._np
        quantile = getattr(forecast, "quantile", None)
        if not callable(quantile):
            return np.zeros(size)
        try:
            lower = np.asarray(quantile(0.1), dtype=float).reshape(-1)
            upper = np.asarray(quantile(0.9), dtype=float).reshape(-1)
        except (KeyError, ValueError, NotImplementedError):
            # Best effort: a forecast that cannot serve these levels keeps the
            # zero-dispersion behavior rather than failing the request.
            return np.zeros(size)
        if lower.size != size or upper.size != size:
            return np.zeros(size)
        return np.abs(upper - lower) / (2.0 * self._Z_90)

    def _validate_request(self, payload: PredictRequest) -> None:
        """Reject requests that exceed the configured limits or are malformed.

        Raises:
            ValueError: If ``context_length`` exceeds the configured maximum or
                the number of supplied prices, too few prices are supplied, a
                horizon exceeds the configured maximum, ``context_length`` is
                below 1, or a horizon is below 1.
        """
        if payload.context_length > self.max_context_length:
            raise ValueError(
                f"context_length {payload.context_length} exceeds "
                f"MOIRAI_MAX_CONTEXT_LENGTH={self.max_context_length}"
            )
        if payload.context_length > len(payload.close_prices):
            raise ValueError("context_length must not exceed len(close_prices)")
        if len(payload.close_prices) < self.min_required_points:
            raise ValueError(
                f"at least {self.min_required_points} close prices are required "
                "for Moirai stability"
            )
        if any(step > self.max_horizon_step for step in payload.horizons):
            raise ValueError(
                f"horizons contain values above MOIRAI_MAX_HORIZON_STEP={self.max_horizon_step}"
            )
        # The checks below come last so that requests rejected before this
        # change still fail with the same message.
        if payload.context_length < 1:
            # close_prices[-0:] is the whole list and a negative length slices
            # from the front, so either would bypass the context limit.
            raise ValueError("context_length must be at least 1")
        if any(step < 1 for step in payload.horizons):
            # Steps are 1-based; step 0 would index the last forecast element.
            raise ValueError("horizons must be positive step counts")

    def _predict_with_baseline(
        self, close_prices: list[float], horizons: list[int]
    ) -> list[PredictionItem]:
        """Heuristic forecast that blends momentum, trend bias and slope.

        The blended return is scaled by ``min(1, sqrt(step) / 4)``, so it grows
        with the horizon and saturates from step 16 onward.

        Args:
            close_prices: Context window of close prices, oldest first.
            horizons: Forecast steps to report.

        Returns:
            One ``PredictionItem`` per entry of ``horizons``, in the same order.
        """
        last_price = close_prices[-1]
        short_window = close_prices[-12:]
        long_window = close_prices[-48:]
        short_mean = mean(short_window)
        long_mean = mean(long_window)
        slope = self._slope(short_window)
        trend_bias = 0.0 if long_mean == 0 else (short_mean - long_mean) / long_mean
        momentum = 0.0 if short_mean == 0 else (last_price - short_mean) / short_mean
        # Horizon-independent blend; only the damping factor varies per step.
        base_return = momentum * 0.40 + trend_bias * 0.35 + slope * 0.25

        predictions: list[PredictionItem] = []
        for step in horizons:
            damping = min(1.0, math.sqrt(float(step)) / 4.0)
            expected_return = base_return * damping
            pred_price = max(0.00000001, last_price * (1.0 + expected_return))
            confidence = self._baseline_confidence(
                close_prices, step, abs(expected_return)
            )
            predictions.append(
                PredictionItem(
                    step=step,
                    pred_price=round(pred_price, 8),
                    pred_confidence=round(confidence, 4),
                )
            )
        return predictions

    def _baseline_confidence(
        self, close_prices: list[float], step: int, expected_move_abs: float
    ) -> float:
        """Score the baseline forecast, clamped to the configured floor/ceiling.

        The score adds a constant 0.22, a signal-to-noise term (expected move
        over realized volatility, capped at 2), a smoothness term and a term
        that decays with the logarithm of the horizon. Realized volatility is
        the mean absolute relative change over the last 48 usable changes.
        Fewer than three prices yields the floor.
        """
        if len(close_prices) < 3:
            return self.confidence_floor
        # Skip pairs whose base price is non-positive: the relative change is
        # undefined there.
        changes = [
            abs((current - previous) / previous)
            for previous, current in zip(close_prices[:-1], close_prices[1:])
            if previous > 0
        ]
        realized_vol = mean(changes[-48:]) if changes else 0.0
        smoothness = max(0.0, 1.0 - min(realized_vol * 20.0, 1.0))
        horizon_decay = 1.0 / (1.0 + math.log(step + 1.0))
        raw = 0.22 + min(expected_move_abs / (realized_vol + 1e-9), 2.0) * 0.20
        raw += smoothness * 0.18 + horizon_decay * 0.22
        return max(self.confidence_floor, min(self.confidence_ceiling, raw))

    @staticmethod
    def _slope(values: list[float]) -> float:
        """Return the relative change from the first to the last value.

        Returns 0.0 for fewer than two values or a zero first value. Declared
        static because it takes no ``self`` yet is called as
        ``self._slope(...)``; without the decorator that call raises TypeError.
        """
        if len(values) < 2 or values[0] == 0:
            return 0.0
        return (values[-1] - values[0]) / values[0]

    def describe_runtime(self) -> dict[str, Any]:
        """Return the effective runtime configuration, including any load error."""
        return {
            "model_id": self.model_id,
            "model_name": self.model_name,
            "backend": self.backend,
            "device": self.device,
            "ready": self.ready,
            "load_error": self.load_error,
            "max_context_length": self.max_context_length,
            "max_horizon_step": self.max_horizon_step,
            "min_required_points": self.min_required_points,
        }