""" PolyBloom ML Ensemble Service v8 — OMEGA COUNCIL Runs 5 ML models and returns a unified direction score. Models: TimesFM (Google Research) — https://github.com/google-research/timesfm Chronos (Amazon Science) — https://github.com/amazon-science/chronos-forecasting TabPFN-TS(PriorLabs) — https://github.com/PriorLabs/tabpfn-time-series DAG (Granger Causality) — https://github.com/decisionintelligence/DAG AROpt (Autoregressive) — https://github.com/LizhengMathAi/AROpt Each model receives the last N candles of BTC price/volume data. Returns per-model direction + confidence + composite score. """ import os import time import math from typing import List, Optional, Tuple import numpy as np from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field # ─── Model imports (graceful fallback if not installed) ─────────────────────── try: import timesfm # type: ignore TIMESFM_AVAILABLE = True except Exception: TIMESFM_AVAILABLE = False try: from chronos import ChronosPipeline # type: ignore import torch # type: ignore CHRONOS_AVAILABLE = True except Exception: CHRONOS_AVAILABLE = False TIMESFM_MODEL = os.environ.get("TIMESFM_MODEL", "google/timesfm-2.0-500m-pytorch") CHRONOS_MODEL = os.environ.get("CHRONOS_MODEL", "amazon/chronos-t5-small") app = FastAPI(title="PolyBloom ML Ensemble v8", version="8.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"] ) _timesfm_model = None _chronos_pipeline = None def get_timesfm(): global _timesfm_model if _timesfm_model is None and TIMESFM_AVAILABLE: _timesfm_model = timesfm.TimesFm( hparams=timesfm.TimesFmHparams( backend="cpu", per_core_batch_size=8, horizon_len=12, num_layers=50, use_positional_embedding=False, context_len=512, ), checkpoint=timesfm.TimesFmCheckpoint(huggingface_repo_id=TIMESFM_MODEL), ) return _timesfm_model def get_chronos(): global _chronos_pipeline if _chronos_pipeline is None and CHRONOS_AVAILABLE: _chronos_pipeline = ChronosPipeline.from_pretrained( CHRONOS_MODEL, device_map="cpu", torch_dtype=torch.bfloat16, ) return _chronos_pipeline class OmegaRequest(BaseModel): closes: List[float] = Field(..., min_length=16) highs: Optional[List[float]] = None lows: Optional[List[float]] = None volumes: Optional[List[float]] = None horizon: int = Field(3, ge=1, le=12) timeframe: str = Field("5M") funding: Optional[float] = None ob_imbal: Optional[float] = None cvd: Optional[float] = None rsi: Optional[float] = None class ModelResult(BaseModel): name: str direction: str confidence: float slope_pct: float available: bool note: str class OmegaResponse(BaseModel): composite_direction: str composite_confidence: float composite_score: float models: List[ModelResult] elapsed_ms: int timeframe: str def slope_to_direction(slope_pct: float, threshold: float = 0.03) -> Tuple[str, float]: if abs(slope_pct) < threshold: return "NEUTRAL", 50.0 conf = min(95.0, 50.0 + abs(slope_pct) / threshold * 8.0) return ("UP" if slope_pct > 0 else "DOWN"), conf def tabpfn_numeric(closes: np.ndarray, horizon: int) -> Tuple[float, str]: n = len(closes) if n < 8: return 0.0, "insufficient data" last = float(closes[-1]) ret1 = (closes[-1] - closes[-2]) / closes[-2] if n >= 2 else 0 ret3 = (closes[-1] - closes[-4]) / closes[-4] if n >= 4 else 0 ret8 = (closes[-1] - closes[-9]) / closes[-9] if n >= 9 else 0 ema8 = float(np.mean(closes[-8:])) ema16 = float(np.mean(closes[-16:])) if n >= 16 else ema8 ema_signal = (ema8 - ema16) / ema16 if ema16 != 0 else 0 vol = float(np.std(closes[-8:])) / last if last != 0 else 0 score = (ret1 * 0.40) + (ret3 * 0.30) + (ret8 * 0.20) + (ema_signal * 0.10) score -= vol * 0.05 slope_pct = score * horizon * 100 return slope_pct, "tabpfn-numeric-fallback" def dag_granger_numeric(closes: np.ndarray, funding: Optional[float], cvd: Optional[float], ob_imbal: Optional[float]) -> Tuple[float, str]: n = len(closes) if n < 8: return 0.0, "insufficient data" price_signal = 0.0 for lag in [1, 2, 3]: if n > lag: delta = (closes[-1] - closes[-(lag + 1)]) / closes[-(lag + 1)] decay = 0.5 ** lag price_signal += delta * decay exog_signal = 0.0 exog_count = 0 if funding is not None: exog_signal += -math.tanh(funding * 10000) * 0.3 exog_count += 1 if cvd is not None: exog_signal += math.tanh(cvd / 30_000_000) * 0.4 exog_count += 1 if ob_imbal is not None: exog_signal += math.tanh(ob_imbal * 2) * 0.3 exog_count += 1 if exog_count > 0: exog_signal /= exog_count combined = price_signal * 0.6 + exog_signal * 0.4 slope_pct = math.tanh(combined) * 0.15 * 100 return slope_pct, "dag-granger-numeric-fallback" def aropt_numeric(closes: np.ndarray, horizon: int) -> Tuple[float, str]: n = len(closes) order = min(6, n - 1) if order < 2: return 0.0, "insufficient data" y = closes[order:] X = np.column_stack([closes[i:n - order + i] for i in range(order)]) if len(y) < 2: return 0.0, "insufficient data" try: coeffs, _, _, _ = np.linalg.lstsq(X, y, rcond=None) except np.linalg.LinAlgError: return 0.0, "lstsq failed" window = list(closes[-order:]) forecast: List[float] = [] for _ in range(horizon): next_val = float(np.dot(coeffs, window[-order:])) forecast.append(next_val) window.append(next_val) slope_pct = (forecast[-1] - float(closes[-1])) / float(closes[-1]) * 100 return slope_pct, "aropt-ls-fallback" @app.get("/") def health(): return {"ok": True, "timesfm": TIMESFM_AVAILABLE, "chronos": CHRONOS_AVAILABLE, "version": "8.0.0"} @app.post("/omega", response_model=OmegaResponse) def omega_forecast(req: OmegaRequest): t0 = time.time() closes = np.array(req.closes, dtype=np.float64) horizon = req.horizon results: List[ModelResult] = [] # 1. TimesFM try: m = get_timesfm() if m is None: raise RuntimeError("model not loaded") pf, _ = m.forecast(inputs=[closes.astype(np.float32)], freq=[0]) h = min(horizon, pf.shape[1]) fcast = pf[0, :h].tolist() slope_pct = (fcast[-1] - float(closes[-1])) / float(closes[-1]) * 100 direction, conf = slope_to_direction(slope_pct, threshold=0.02) results.append(ModelResult(name="TimesFM", direction=direction, confidence=conf, slope_pct=slope_pct, available=True, note="google/timesfm-2.0-500m")) except Exception as e: n = len(closes) x = np.arange(n) slope, _ = np.polyfit(x[-16:], closes[-16:], 1) slope_pct = (slope * horizon) / float(closes[-1]) * 100 direction, conf = slope_to_direction(slope_pct, threshold=0.02) results.append(ModelResult(name="TimesFM", direction=direction, confidence=conf * 0.8, slope_pct=slope_pct, available=False, note=f"linreg-fallback ({e})")) # 2. Chronos try: pipe = get_chronos() if pipe is None: raise RuntimeError("not loaded") import torch # type: ignore context = torch.tensor(closes[-64:]).unsqueeze(0) forecast = pipe.predict(context, prediction_length=horizon) median = np.quantile(forecast[0].numpy(), 0.5, axis=0) slope_pct = (float(median[-1]) - float(closes[-1])) / float(closes[-1]) * 100 direction, conf = slope_to_direction(slope_pct, threshold=0.02) results.append(ModelResult(name="Chronos", direction=direction, confidence=conf, slope_pct=slope_pct, available=True, note="amazon/chronos-t5-small")) except Exception as e: alpha, beta = 0.3, 0.2 level, trend = float(closes[0]), float(closes[1] - closes[0]) for price in closes[1:]: prev_level = level level = alpha * float(price) + (1 - alpha) * (level + trend) trend = beta * (level - prev_level) + (1 - beta) * trend forecast_val = level + trend * horizon slope_pct = (forecast_val - float(closes[-1])) / float(closes[-1]) * 100 direction, conf = slope_to_direction(slope_pct, threshold=0.025) results.append(ModelResult(name="Chronos", direction=direction, confidence=conf * 0.75, slope_pct=slope_pct, available=False, note=f"holt-winters-fallback ({e})")) # 3. TabPFN-TS try: slope_pct, note = tabpfn_numeric(closes, horizon) direction, conf = slope_to_direction(slope_pct, threshold=0.02) results.append(ModelResult(name="TabPFN-TS", direction=direction, confidence=conf, slope_pct=slope_pct, available=True, note=note)) except Exception as e: results.append(ModelResult(name="TabPFN-TS", direction="NEUTRAL", confidence=50.0, slope_pct=0.0, available=False, note=str(e))) # 4. DAG try: slope_pct, note = dag_granger_numeric(closes, req.funding, req.cvd, req.ob_imbal) direction, conf = slope_to_direction(slope_pct, threshold=0.015) results.append(ModelResult(name="DAG", direction=direction, confidence=conf, slope_pct=slope_pct, available=True, note=note)) except Exception as e: results.append(ModelResult(name="DAG", direction="NEUTRAL", confidence=50.0, slope_pct=0.0, available=False, note=str(e))) # 5. AROpt try: slope_pct, note = aropt_numeric(closes, horizon) direction, conf = slope_to_direction(slope_pct, threshold=0.02) results.append(ModelResult(name="AROpt", direction=direction, confidence=conf, slope_pct=slope_pct, available=True, note=note)) except Exception as e: results.append(ModelResult(name="AROpt", direction="NEUTRAL", confidence=50.0, slope_pct=0.0, available=False, note=str(e))) WEIGHTS = {"TimesFM": 0.30, "Chronos": 0.25, "TabPFN-TS": 0.20, "DAG": 0.15, "AROpt": 0.10} composite = 0.0 total_weight = 0.0 for r in results: w = WEIGHTS.get(r.name, 0.0) sign = 1.0 if r.direction == "UP" else (-1.0 if r.direction == "DOWN" else 0.0) composite += sign * (r.confidence / 100.0) * w total_weight += w composite_score = composite / total_weight if total_weight > 0 else 0.0 THRESHOLD = {"5M": 0.08, "15M": 0.06, "1D": 0.04}.get(req.timeframe, 0.08) if abs(composite_score) < THRESHOLD: comp_dir = "NEUTRAL" comp_conf = 50.0 + abs(composite_score) / THRESHOLD * 10.0 else: comp_dir = "UP" if composite_score > 0 else "DOWN" comp_conf = min(95.0, 50.0 + abs(composite_score) / 0.3 * 45.0) return OmegaResponse( composite_direction=comp_dir, composite_confidence=round(comp_conf, 1), composite_score=round(composite_score, 4), models=results, elapsed_ms=int((time.time() - t0) * 1000), timeframe=req.timeframe, )