| """ |
| PolyBloom ML Ensemble Service v8 — OMEGA COUNCIL |
| Runs 5 ML models and returns a unified direction score. |
| |
| Models: |
| TimesFM (Google Research) — https://github.com/google-research/timesfm |
| Chronos (Amazon Science) — https://github.com/amazon-science/chronos-forecasting |
| TabPFN-TS (PriorLabs) — https://github.com/PriorLabs/tabpfn-time-series |
| DAG (Granger Causality) — https://github.com/decisionintelligence/DAG |
| AROpt (Autoregressive) — https://github.com/LizhengMathAi/AROpt |
| |
| Each model receives the last N candles of BTC price/volume data. |
| Returns per-model direction + confidence + composite score. |
| """ |
|
|
| import os |
| import time |
| import math |
| from typing import List, Optional, Tuple |
|
|
| import numpy as np |
| from fastapi import FastAPI |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel, Field |
|
|
| |
| try: |
| import timesfm |
| TIMESFM_AVAILABLE = True |
| except Exception: |
| TIMESFM_AVAILABLE = False |
|
|
| try: |
| from chronos import ChronosPipeline |
| import torch |
| CHRONOS_AVAILABLE = True |
| except Exception: |
| CHRONOS_AVAILABLE = False |
|
|
# Model identifiers, overridable through environment variables.
# TIMESFM_MODEL is passed as the Hugging Face repo id in get_timesfm();
# CHRONOS_MODEL is passed to ChronosPipeline.from_pretrained() in get_chronos().
TIMESFM_MODEL = os.environ.get("TIMESFM_MODEL", "google/timesfm-2.0-500m-pytorch")
CHRONOS_MODEL = os.environ.get("CHRONOS_MODEL", "amazon/chronos-t5-small")


app = FastAPI(title="PolyBloom ML Ensemble v8", version="8.0.0")
# NOTE(review): CORS is fully open (any origin, method and header) — confirm
# this is intended outside local development.
app.add_middleware(
    CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]
)


# Lazily created model singletons; populated on first use by get_timesfm()
# and get_chronos() and reused for later calls.
_timesfm_model = None
_chronos_pipeline = None
|
|
|
|
def get_timesfm():
    """Return the shared TimesFM model, building it on first use.

    Returns None when the ``timesfm`` package could not be imported. The
    model is configured for CPU with a 512-step context and a 12-step
    horizon, and its checkpoint is taken from ``TIMESFM_MODEL``.
    """
    global _timesfm_model
    # Nothing to do if the model is already cached or cannot be built at all.
    if _timesfm_model is not None or not TIMESFM_AVAILABLE:
        return _timesfm_model

    hparams = timesfm.TimesFmHparams(
        backend="cpu",
        per_core_batch_size=8,
        horizon_len=12,
        num_layers=50,
        use_positional_embedding=False,
        context_len=512,
    )
    checkpoint = timesfm.TimesFmCheckpoint(huggingface_repo_id=TIMESFM_MODEL)
    _timesfm_model = timesfm.TimesFm(hparams=hparams, checkpoint=checkpoint)
    return _timesfm_model
|
|
|
|
def get_chronos():
    """Return the shared Chronos pipeline, loading it on first use.

    Returns None when ``chronos``/``torch`` could not be imported. The
    pipeline is loaded from ``CHRONOS_MODEL`` on CPU in bfloat16.
    """
    global _chronos_pipeline
    # Reuse the cached pipeline, or give up early when Chronos is unavailable.
    if _chronos_pipeline is not None or not CHRONOS_AVAILABLE:
        return _chronos_pipeline

    load_options = {"device_map": "cpu", "torch_dtype": torch.bfloat16}
    _chronos_pipeline = ChronosPipeline.from_pretrained(CHRONOS_MODEL, **load_options)
    return _chronos_pipeline
|
|
|
|
class OmegaRequest(BaseModel):
    """Request body for POST /omega: recent candle data plus optional market signals."""

    # Close prices; at least 16 values are required. The forecasting code
    # treats the final element (closes[-1]) as the most recent price.
    closes: List[float] = Field(..., min_length=16)
    # Accepted but not read anywhere in the visible code.
    highs: Optional[List[float]] = None
    lows: Optional[List[float]] = None
    volumes: Optional[List[float]] = None
    # Number of steps ahead to forecast (1-12, default 3).
    horizon: int = Field(3, ge=1, le=12)
    # Timeframe label. Selects the composite-score threshold for "5M", "15M"
    # and "1D"; any other value uses the same threshold as "5M". Echoed back
    # in the response.
    timeframe: str = Field("5M")
    # Optional exogenous signals forwarded to dag_granger_numeric().
    funding: Optional[float] = None
    ob_imbal: Optional[float] = None
    cvd: Optional[float] = None
    # Accepted but not read anywhere in the visible code.
    rsi: Optional[float] = None
|
|
|
|
class ModelResult(BaseModel):
    """Outcome of a single ensemble member for one request."""

    # Model label, e.g. "TimesFM"; also the key used to look up its ensemble weight.
    name: str
    # "UP", "DOWN" or "NEUTRAL".
    direction: str
    # Confidence on a 0-100 scale: 50 for NEUTRAL, capped at 95 by
    # slope_to_direction(); fallback paths scale it down further.
    confidence: float
    # Forecast move relative to the last close, in percent.
    slope_pct: float
    # False when the model was replaced by a fallback or failed outright.
    available: bool
    # Free-form detail: model/fallback identifier and, on failure, the error text.
    note: str
|
|
|
|
class OmegaResponse(BaseModel):
    """Response body for POST /omega: the ensemble verdict plus per-model detail."""

    # "UP", "DOWN" or "NEUTRAL", derived from composite_score and the
    # timeframe-specific threshold.
    composite_direction: str
    # Confidence for the composite call, rounded to one decimal place.
    composite_confidence: float
    # Weighted average of signed per-model confidences (positive = UP,
    # negative = DOWN), rounded to four decimal places.
    composite_score: float
    # One entry per ensemble member, in evaluation order.
    models: List[ModelResult]
    # Time spent handling the request, in milliseconds.
    elapsed_ms: int
    # Timeframe label echoed from the request.
    timeframe: str
|
|
|
|
def slope_to_direction(slope_pct: float, threshold: float = 0.03) -> Tuple[str, float]:
    """Map a forecast slope (in percent) to a direction label and confidence.

    Args:
        slope_pct: Forecast move relative to the last price, in percent.
        threshold: Dead-band half-width in percent; must be positive. Slopes
            whose magnitude is below it are reported as NEUTRAL.

    Returns:
        ``(direction, confidence)`` where direction is "UP", "DOWN" or
        "NEUTRAL". Confidence is 50 for NEUTRAL; otherwise it starts at 58 at
        the threshold, grows by 8 points per threshold-multiple of slope, and
        is capped at 95.
    """
    # A NaN slope fails both comparisons below and would otherwise be reported
    # as a 95%-confidence "DOWN"; an infinite slope would likewise get 95%.
    # Neither carries real information, so treat them as no signal.
    if not math.isfinite(slope_pct) or abs(slope_pct) < threshold:
        return "NEUTRAL", 50.0
    conf = min(95.0, 50.0 + abs(slope_pct) / threshold * 8.0)
    return ("UP" if slope_pct > 0 else "DOWN"), conf
|
|
|
|
def tabpfn_numeric(closes: np.ndarray, horizon: int) -> Tuple[float, str]:
    """Heuristic momentum score used in place of a real TabPFN-TS model.

    Blends 1-, 3- and 8-step returns with an 8-vs-16 moving-average spread,
    subtracts a small volatility penalty, and scales by the horizon.

    Args:
        closes: Close prices; the last element is the most recent.
        horizon: Number of steps ahead; the score is multiplied by it.

    Returns:
        ``(slope_pct, note)``. ``slope_pct`` is the projected move in percent;
        ``note`` is "insufficient data" for fewer than 8 prices, otherwise
        "tabpfn-numeric-fallback".
    """
    n = len(closes)
    if n < 8:
        return 0.0, "insufficient data"

    prices = np.asarray(closes, dtype=np.float64)
    last = float(prices[-1])

    def lookback_return(periods: int) -> float:
        # Relative change over `periods` steps. A missing or zero base price
        # contributes nothing instead of producing inf/NaN.
        if n <= periods:
            return 0.0
        base = float(prices[-(periods + 1)])
        return (last - base) / base if base != 0 else 0.0

    ret1 = lookback_return(1)
    ret3 = lookback_return(3)
    ret8 = lookback_return(8)

    # Plain (unweighted) moving averages over the last 8 and 16 prices.
    mean8 = float(np.mean(prices[-8:]))
    mean16 = float(np.mean(prices[-16:])) if n >= 16 else mean8
    trend_signal = (mean8 - mean16) / mean16 if mean16 != 0 else 0.0

    # Relative volatility of the last 8 prices, used as a small penalty.
    vol = float(np.std(prices[-8:])) / last if last != 0 else 0.0

    score = (ret1 * 0.40) + (ret3 * 0.30) + (ret8 * 0.20) + (trend_signal * 0.10)
    score -= vol * 0.05
    return score * horizon * 100, "tabpfn-numeric-fallback"
|
|
|
|
def dag_granger_numeric(closes: np.ndarray, funding: Optional[float],
                        cvd: Optional[float], ob_imbal: Optional[float]) -> Tuple[float, str]:
    """Heuristic stand-in for a Granger-causality model.

    Combines decayed 1-3 step price returns with whichever exogenous signals
    are supplied, then squashes the result into a bounded slope.

    Args:
        closes: Close prices; the last element is the most recent.
        funding: Optional funding rate; positive values push the score down.
        cvd: Optional cumulative volume delta; positive values push it up.
        ob_imbal: Optional order-book imbalance; positive values push it up.

    Returns:
        ``(slope_pct, note)``. ``slope_pct`` lies within +/-15 (percent);
        ``note`` is "insufficient data" for fewer than 8 prices, otherwise
        "dag-granger-numeric-fallback".
    """
    n = len(closes)
    if n < 8:
        return 0.0, "insufficient data"

    last = float(closes[-1])
    price_signal = 0.0
    for lag in (1, 2, 3):
        base = float(closes[-(lag + 1)])
        if base == 0.0:
            # A zero base would give an infinite return that tanh() below
            # saturates into a spurious +/-15% slope; skip that lag instead.
            continue
        price_signal += (last - base) / base * 0.5 ** lag

    def usable(value: Optional[float]) -> bool:
        # Missing and NaN inputs are both treated as "not supplied".
        return value is not None and not math.isnan(value)

    exog_terms: List[float] = []
    if usable(funding):
        exog_terms.append(-math.tanh(funding * 10000) * 0.3)
    if usable(cvd):
        exog_terms.append(math.tanh(cvd / 30_000_000) * 0.4)
    if usable(ob_imbal):
        exog_terms.append(math.tanh(ob_imbal * 2) * 0.3)

    # NOTE(review): the already-weighted terms are divided by their count, not
    # by the sum of the weights used, so exogenous input is damped further as
    # more signals are supplied. Kept as-is to preserve existing outputs.
    exog_signal = sum(exog_terms) / len(exog_terms) if exog_terms else 0.0

    combined = price_signal * 0.6 + exog_signal * 0.4
    slope_pct = math.tanh(combined) * 0.15 * 100
    return slope_pct, "dag-granger-numeric-fallback"
|
|
|
|
def aropt_numeric(closes: np.ndarray, horizon: int) -> Tuple[float, str]:
    """Least-squares autoregressive forecast, rolled forward ``horizon`` steps.

    Fits an AR model (order up to 6, no intercept) on the raw close prices
    and feeds each prediction back in as input for the next step.

    Args:
        closes: Close prices; the last element is the most recent.
        horizon: Number of steps to roll the model forward. Values below 1
            produce no steps, so the slope is 0.

    Returns:
        ``(slope_pct, note)``. ``slope_pct`` is the move from the last close
        to the final forecast, in percent. ``note`` is "aropt-ls-fallback" on
        success, or one of "insufficient data", "zero last close",
        "lstsq failed", "non-finite forecast" (all with a slope of 0.0).
    """
    n = len(closes)
    order = min(6, n - 1)
    # Need at least two lags and at least two training rows.
    if order < 2 or n - order < 2:
        return 0.0, "insufficient data"

    last = float(closes[-1])
    if last == 0.0:
        # The percentage move is undefined relative to a zero price.
        return 0.0, "zero last close"

    # Row t of X holds closes[t : t + order] (oldest first); target is closes[t + order].
    y = closes[order:]
    X = np.column_stack([closes[i:n - order + i] for i in range(order)])
    try:
        coeffs, _, _, _ = np.linalg.lstsq(X, y, rcond=None)
    except np.linalg.LinAlgError:
        return 0.0, "lstsq failed"

    window = [float(v) for v in closes[-order:]]
    projected = last
    for _ in range(horizon):
        projected = float(np.dot(coeffs, window[-order:]))
        window.append(projected)

    slope_pct = (projected - last) / last * 100
    if not math.isfinite(slope_pct):
        return 0.0, "non-finite forecast"
    return slope_pct, "aropt-ls-fallback"
|
|
|
|
@app.get("/")
def health():
    """Liveness probe that also reports which optional model packages imported."""
    status = dict(ok=True)
    status["timesfm"] = TIMESFM_AVAILABLE
    status["chronos"] = CHRONOS_AVAILABLE
    status["version"] = "8.0.0"
    return status
|
|
|
|
# Weight of each ensemble member in the composite score.
_OMEGA_WEIGHTS = {"TimesFM": 0.30, "Chronos": 0.25, "TabPFN-TS": 0.20, "DAG": 0.15, "AROpt": 0.10}
# Minimum |composite score| required for a directional call, per timeframe.
_OMEGA_THRESHOLDS = {"5M": 0.08, "15M": 0.06, "1D": 0.04}
_OMEGA_DEFAULT_THRESHOLD = 0.08


def _neutral_result(name: str, note: str) -> ModelResult:
    """Build the no-signal result reported when a model cannot produce a slope."""
    return ModelResult(name=name, direction="NEUTRAL", confidence=50.0,
                       slope_pct=0.0, available=False, note=note)


def _scored_result(name: str, slope_pct: float, threshold: float, note: str, *,
                   available: bool = True, conf_scale: float = 1.0) -> ModelResult:
    """Turn a slope into a ModelResult; raise ValueError if the slope is not finite."""
    slope_pct = float(slope_pct)
    if not math.isfinite(slope_pct):
        # NaN/inf must never reach slope_to_direction() or the JSON response.
        raise ValueError(f"non-finite slope from {name}")
    direction, conf = slope_to_direction(slope_pct, threshold=threshold)
    return ModelResult(name=name, direction=direction, confidence=conf * conf_scale,
                       slope_pct=slope_pct, available=available, note=note)


def _timesfm_result(closes: np.ndarray, horizon: int) -> ModelResult:
    """Run TimesFM, falling back to a 16-point linear regression on any failure."""
    last = float(closes[-1])
    try:
        model = get_timesfm()
        if model is None:
            raise RuntimeError("model not loaded")
        point_forecast, _ = model.forecast(inputs=[closes.astype(np.float32)], freq=[0])
        steps = min(horizon, point_forecast.shape[1])
        slope_pct = (float(point_forecast[0, steps - 1]) - last) / last * 100
        # Report the checkpoint actually configured, not a hard-coded id.
        return _scored_result("TimesFM", slope_pct, 0.02, TIMESFM_MODEL)
    except Exception as exc:
        primary_error = exc

    try:
        x = np.arange(len(closes))
        slope, _ = np.polyfit(x[-16:], closes[-16:], 1)
        slope_pct = (slope * horizon) / last * 100
        return _scored_result("TimesFM", slope_pct, 0.02,
                              f"linreg-fallback ({primary_error})",
                              available=False, conf_scale=0.8)
    except Exception as exc:
        # The fallback must not take the whole request down.
        return _neutral_result("TimesFM", f"linreg-fallback failed ({exc})")


def _chronos_result(closes: np.ndarray, horizon: int) -> ModelResult:
    """Run Chronos, falling back to double exponential smoothing on any failure."""
    last = float(closes[-1])
    try:
        pipeline = get_chronos()
        if pipeline is None:
            raise RuntimeError("not loaded")
        # A non-None pipeline implies the module-level `torch` import succeeded.
        context = torch.tensor(closes[-64:]).unsqueeze(0)
        samples = pipeline.predict(context, prediction_length=horizon)
        median = np.quantile(samples[0].numpy(), 0.5, axis=0)
        slope_pct = (float(median[-1]) - last) / last * 100
        # Report the checkpoint actually configured, not a hard-coded id.
        return _scored_result("Chronos", slope_pct, 0.02, CHRONOS_MODEL)
    except Exception as exc:
        primary_error = exc

    try:
        alpha, beta = 0.3, 0.2
        level, trend = float(closes[0]), float(closes[1] - closes[0])
        for price in closes[1:]:
            prev_level = level
            level = alpha * float(price) + (1 - alpha) * (level + trend)
            trend = beta * (level - prev_level) + (1 - beta) * trend
        projected = level + trend * horizon
        slope_pct = (projected - last) / last * 100
        return _scored_result("Chronos", slope_pct, 0.025,
                              f"holt-winters-fallback ({primary_error})",
                              available=False, conf_scale=0.75)
    except Exception as exc:
        # The fallback must not take the whole request down.
        return _neutral_result("Chronos", f"holt-winters-fallback failed ({exc})")


def _numeric_result(name: str, threshold: float, model_fn, *args) -> ModelResult:
    """Run one of the pure-numeric models; any failure yields a neutral result."""
    try:
        slope_pct, note = model_fn(*args)
        return _scored_result(name, slope_pct, threshold, note)
    except Exception as exc:
        return _neutral_result(name, str(exc))


@app.post("/omega", response_model=OmegaResponse)
def omega_forecast(req: OmegaRequest):
    """Run every ensemble member on the request and combine them into one call.

    Each model yields a direction and confidence. The composite score is the
    weight-normalised sum of signed confidences (UP positive, DOWN negative,
    NEUTRAL zero). It becomes a directional call only when its magnitude
    reaches the threshold for the request's timeframe.
    """
    started = time.perf_counter()
    closes = np.array(req.closes, dtype=np.float64)
    horizon = req.horizon

    results: List[ModelResult] = [
        _timesfm_result(closes, horizon),
        _chronos_result(closes, horizon),
        _numeric_result("TabPFN-TS", 0.02, tabpfn_numeric, closes, horizon),
        _numeric_result("DAG", 0.015, dag_granger_numeric,
                        closes, req.funding, req.cvd, req.ob_imbal),
        _numeric_result("AROpt", 0.02, aropt_numeric, closes, horizon),
    ]

    composite = 0.0
    total_weight = 0.0
    for r in results:
        w = _OMEGA_WEIGHTS.get(r.name, 0.0)
        sign = 1.0 if r.direction == "UP" else (-1.0 if r.direction == "DOWN" else 0.0)
        composite += sign * (r.confidence / 100.0) * w
        total_weight += w
    composite_score = composite / total_weight if total_weight > 0 else 0.0

    threshold = _OMEGA_THRESHOLDS.get(req.timeframe, _OMEGA_DEFAULT_THRESHOLD)
    if abs(composite_score) < threshold:
        comp_dir = "NEUTRAL"
        comp_conf = 50.0 + abs(composite_score) / threshold * 10.0
    else:
        comp_dir = "UP" if composite_score > 0 else "DOWN"
        comp_conf = min(95.0, 50.0 + abs(composite_score) / 0.3 * 45.0)

    return OmegaResponse(
        composite_direction=comp_dir,
        composite_confidence=round(comp_conf, 1),
        composite_score=round(composite_score, 4),
        models=results,
        elapsed_ms=int((time.perf_counter() - started) * 1000),
        timeframe=req.timeframe,
    )
|
|