Spaces:
Sleeping
Sleeping
| """Roulette next-spin prediction API — runs every persisted model and returns | |
| per-model predictions plus a majority-vote consensus for each target. | |
| Endpoints | |
| --------- | |
| GET / Service info + models loaded | |
| POST /predict Predict next N spins from JSON {"numbers": [...], "steps": N} | |
| POST /predict/file Predict next N spins from an uploaded CSV, Excel, or HTML-table file (column Winner / Winning number / number, otherwise the last column) | |
| Each step's response contains, for every target (number, color, parity, dozen, | |
| column): the consensus (mode across models) and each model's individual | |
| prediction. | |
| """ | |
| from __future__ import annotations | |
| import io | |
| import logging | |
| from collections import Counter | |
| from pathlib import Path | |
| from typing import Any | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| from fastapi import FastAPI, File, HTTPException, UploadFile | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field, field_validator | |
| from ml.features import ( | |
| WINDOW, | |
| _features_from_window, | |
| derive_color, | |
| derive_column, | |
| derive_dozen, | |
| derive_parity, | |
| ) | |
| from ml.features_v2 import WINDOW_V2, _features_v2 | |
# Configure logging once at import time; every record is timestamped and
# tagged with its level.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
LOG = logging.getLogger("app")

# Persisted model artefacts are looked up in the ``models`` directory that
# sits next to this file.
APP_ROOT = Path(__file__).resolve().parent
MODELS_DIR = APP_ROOT.joinpath("models")
# ---------------------------------------------------------------------------
# Label tables
# ---------------------------------------------------------------------------
# For each categorical target, position i in the tuple is the label shown for
# class index i.
COLOR_LABELS = ("red", "black", "green")
PARITY_LABELS = ("odd", "even", "none")
DOZEN_LABELS = ("first", "second", "third", "none")
COLUMN_LABELS = ("first", "second", "third", "none")

TARGET_LABELS: dict[str, tuple[str, ...]] = dict(
    color=COLOR_LABELS,
    parity=PARITY_LABELS,
    dozen=DOZEN_LABELS,
    column=COLUMN_LABELS,
)

# "number" has no label table: its class index is reported as-is.
TARGETS = ("number", *TARGET_LABELS)
| def _looks_like_html_document(content: bytes) -> bool: | |
| sample = content[:512].lstrip().lower() | |
| return sample.startswith((b"<!doctype html", b"<html", b"<?xml")) or b"<html" in sample | |
def _largest_html_table(content: bytes) -> pd.DataFrame:
    """Return the non-empty HTML table with the most rows found in *content*.

    Falls back to the first table when every table is empty.

    Raises:
        HTTPException: 400 when the HTML cannot be parsed or holds no tables.
    """
    try:
        tables = pd.read_html(io.BytesIO(content))
    except Exception as exc_html:
        raise HTTPException(
            status_code=400,
            detail=(
                "uploaded file appears to be HTML rather than CSV/Excel, "
                f"and no HTML table could be parsed: {exc_html}"
            ),
        ) from exc_html
    non_empty_tables = [table for table in tables if not table.empty]
    if non_empty_tables:
        return max(non_empty_tables, key=lambda table: table.shape[0])
    if tables:
        return tables[0]
    raise HTTPException(status_code=400, detail="uploaded HTML file does not contain any tables")


def _read_uploaded_dataframe(content: bytes) -> pd.DataFrame:
    """Parse an uploaded file into a DataFrame.

    Parsers are tried in this order: HTML table (only when the payload looks
    like HTML), CSV, then Excel via ``openpyxl`` and ``xlrd``.

    Args:
        content: Raw bytes of the uploaded file.

    Returns:
        The first DataFrame any parser manages to produce.

    Raises:
        HTTPException: 400 when the payload is empty/whitespace-only or no
            parser can read it.
    """
    if not content.strip():
        raise HTTPException(status_code=400, detail="uploaded file is empty")

    # HTML exports can be "successfully" mis-read by read_csv as a single
    # column of markup, so when the payload looks like HTML try that parser
    # first. A failure is remembered and only raised if CSV/Excel fail too.
    html_failure: HTTPException | None = None
    if _looks_like_html_document(content):
        try:
            return _largest_html_table(content)
        except HTTPException as exc:
            html_failure = exc

    try:
        return pd.read_csv(io.BytesIO(content))
    except Exception as exc_csv:
        # Best-effort: fall through to the Excel readers, but leave a trace
        # instead of silently discarding the reason.
        LOG.debug("CSV parse failed, trying Excel engines: %s", exc_csv)

    excel_read_error = None
    for engine in ("openpyxl", "xlrd"):
        try:
            return pd.read_excel(io.BytesIO(content), engine=engine)
        except Exception as exc_excel:
            excel_read_error = exc_excel

    if html_failure is not None:
        raise html_failure
    raise HTTPException(
        status_code=400,
        detail=f"could not read file as CSV, Excel, or HTML table: {excel_read_error}"
    ) from excel_read_error
# ---------------------------------------------------------------------------
# Model registry — filled at startup
# ---------------------------------------------------------------------------
# Each entry: (model_name, predictor_callable(window_v1, window_v2) -> int label)
ModelEntry = tuple[str, Any]

# One independent, initially empty list of entries per prediction target.
REGISTRY: dict[str, list[ModelEntry]] = dict((target, []) for target in TARGETS)
| # --- wrappers: each returns the class index (int) --- | |
def _wrap_sklearn_v1(bundle: dict) -> Any:
    """Build a predictor around a persisted estimator that uses v1 features.

    *bundle* must hold the estimator under ``"model"`` and may hold a fitted
    scaler under ``"scaler"``. The returned callable accepts ``(w_v1, w_v2)``,
    featurises ``w_v1`` only, applies the scaler when present, and returns the
    predicted class index as an ``int``.
    """
    estimator = bundle["model"]
    scaler = bundle.get("scaler")

    def predict(w_v1: np.ndarray, w_v2: np.ndarray) -> int:
        # The estimator expects a 2-D batch, so present the features as one row.
        row = _features_from_window(w_v1).reshape(1, -1)
        row = row if scaler is None else scaler.transform(row)
        return int(estimator.predict(row)[0])

    return predict
def _wrap_sklearn_v2(bundle: dict) -> Any:
    """Build a predictor around a persisted estimator that uses v2 features.

    *bundle* must hold the estimator under ``"model"`` and may hold a fitted
    scaler under ``"scaler"``. The returned callable accepts ``(w_v1, w_v2)``,
    featurises ``w_v2`` only, applies the scaler when present, and returns the
    predicted class index as an ``int``.
    """
    estimator = bundle["model"]
    scaler = bundle.get("scaler")

    def predict(w_v1: np.ndarray, w_v2: np.ndarray) -> int:
        # The estimator expects a 2-D batch, so present the features as one row.
        row = _features_v2(w_v2).reshape(1, -1)
        row = row if scaler is None else scaler.transform(row)
        return int(estimator.predict(row)[0])

    return predict
| def _wrap_markov(order: int, model: Any) -> Any: | |
| def predict(w_v1: np.ndarray, w_v2: np.ndarray) -> int: | |
| ctx = tuple(int(x) for x in w_v1[-order:]) | |
| return int(model.predict(ctx)) | |
| return predict | |
| def _wrap_torch(model: Any) -> Any: | |
| import torch | |
| model.eval() | |
| def predict(w_v1: np.ndarray, w_v2: np.ndarray) -> int: | |
| x = torch.from_numpy(w_v1.astype(np.int64)).unsqueeze(0) | |
| with torch.no_grad(): | |
| logits = model(x) | |
| return int(logits.argmax(dim=-1).item()) | |
| return predict | |
def _load_torch_model(path: Path, name: str) -> Any:
    """Instantiate the torch classifier called *name* and load its weights.

    *name* must be ``"lstm"``, ``"gru"`` or ``"transformer"``; any other value
    raises ``KeyError``. The checkpoint at *path* is loaded onto the CPU with
    ``weights_only=True`` and its ``"state_dict"`` entry is applied to a
    default-constructed model.
    """
    import torch

    # Lazy import to avoid torch at app import time if not available.
    from ml_torch import GRUClassifier, LSTMClassifier, TransformerClassifier

    architectures = {
        "lstm": LSTMClassifier,
        "gru": GRUClassifier,
        "transformer": TransformerClassifier,
    }
    factory = architectures[name]
    checkpoint = torch.load(path, map_location="cpu", weights_only=True)
    network = factory()
    network.load_state_dict(checkpoint["state_dict"])
    return network
# ---------------------------------------------------------------------------
# Registry construction
# ---------------------------------------------------------------------------
# Model kinds persisted as ``<kind>__<target>.joblib`` (v1 features).
V1_MODELS = ["logreg", "hist_gradient_boosting", "mlp", "xgboost", "lightgbm"]

# Model kinds persisted as ``<kind>__<target>.v2.joblib`` (v2 features).
V2_MODELS = ["catboost", "knn", "gaussian_nb", "ridge", "sgd"]
def _register(target: str, name: str, predictor: Any) -> None:
    """Append ``(name, predictor)`` to the registry list for *target*."""
    entry = (name, predictor)
    REGISTRY[target].append(entry)
def _try_register(
    path: Path,
    target: str,
    name: str,
    build: Any,
    tag: str,
    fail_tag: str | None = None,
) -> None:
    """Load one artefact and register its predictor; log and skip on failure.

    Args:
        path: Artefact location; a missing file is skipped silently.
        target: Registry target the predictor votes on.
        name: Model name recorded in the registry.
        build: Callable taking *path* and returning the predictor.
        tag: Label used in the success log line.
        fail_tag: Label used in the failure log line (defaults to *tag*).
    """
    if not path.exists():
        return
    try:
        _register(target, name, build(path))
    except Exception as exc:
        # Deliberately best-effort: one broken artefact must not stop the
        # remaining models from loading.
        LOG.warning("%s failed to load: %s", fail_tag or tag, exc)
    else:
        LOG.info("loaded %s for %s", tag, target)


def load_all_models() -> None:
    """Populate ``REGISTRY`` with every model artefact found in ``MODELS_DIR``.

    Missing files are skipped and load failures are logged as warnings, so the
    function never raises because of an individual artefact. The registry is
    emptied first, which makes repeated calls idempotent instead of
    registering every model twice.
    """
    for entries in REGISTRY.values():
        entries.clear()

    # NOTE(security): joblib.load unpickles arbitrary Python objects, so
    # MODELS_DIR must only ever contain trusted artefacts.
    def load_v1(path: Path) -> Any:
        return _wrap_sklearn_v1(joblib.load(path))

    def load_v2(path: Path) -> Any:
        return _wrap_sklearn_v2(joblib.load(path))

    # V1 sklearn (window=10)
    for kind in V1_MODELS:
        for target in TARGETS:
            _try_register(
                MODELS_DIR / f"{kind}__{target}.joblib",
                target,
                kind,
                load_v1,
                f"v1/{kind}",
                f"v1/{kind}/{target}",
            )

    # V2 sklearn (window=20)
    for kind in V2_MODELS:
        for target in TARGETS:
            _try_register(
                MODELS_DIR / f"{kind}__{target}.v2.joblib",
                target,
                kind,
                load_v2,
                f"v2/{kind}",
                f"v2/{kind}/{target}",
            )

    # SVC (v2 features, only saved for color/column)
    for target in ("color", "column"):
        _try_register(
            MODELS_DIR / f"svc__{target}.v2.joblib",
            target,
            "svc",
            load_v2,
            "svc",
            f"svc/{target}",
        )

    # Markov chains — registered for the number target only.
    for order in (1, 2, 3):
        name = f"markov_order{order}"
        _try_register(
            MODELS_DIR / f"{name}.joblib",
            "number",
            name,
            lambda path, order=order: _wrap_markov(order, joblib.load(path)),
            name,
        )

    # Torch deep models — predict number only
    for name in ("lstm", "gru", "transformer"):
        _try_register(
            MODELS_DIR / f"{name}__number.pt",
            "number",
            name,
            lambda path, name=name: _wrap_torch(_load_torch_model(path, name)),
            f"torch/{name}",
        )
| # --------------------------------------------------------------------------- | |
| # Aggregation | |
| # --------------------------------------------------------------------------- | |
| def _consensus_index(preds: list[int], n_classes: int) -> int: | |
| if not preds: | |
| return 0 | |
| counter = Counter(preds) | |
| max_count = max(counter.values()) | |
| # Tie-break: lowest index among those with max votes. | |
| for k in range(n_classes): | |
| if counter.get(k, 0) == max_count: | |
| return k | |
| return preds[0] | |
def _predict_target_all(target: str, w_v1: np.ndarray, w_v2: np.ndarray) -> tuple[dict[str, Any], int]:
    """Run every registered model for *target* on the given windows.

    Returns:
        ``(per_model, consensus_index)``. ``per_model`` maps each model name
        to its prediction: the raw ``int`` for ``"number"``, otherwise the
        label string (or ``str(idx)`` when the index has no label).
        ``consensus_index`` is the majority vote over the raw indices.

    A model whose predictor raises is logged and left out of both results.
    """
    is_number = target == "number"
    votes: list[int] = []
    by_model: dict[str, Any] = {}
    entries = REGISTRY[target]
    labels = TARGET_LABELS.get(target)

    for model_name, predictor in entries:
        try:
            idx = predictor(w_v1, w_v2)
        except Exception as exc:
            LOG.warning("%s/%s predict failed: %s", model_name, target, exc)
            continue
        idx = int(idx)
        votes.append(idx)
        if is_number:
            by_model[model_name] = idx
        elif labels and 0 <= idx < len(labels):
            by_model[model_name] = labels[idx]
        else:
            by_model[model_name] = str(idx)

    # 37 classes for the number target; otherwise one class per label.
    if is_number:
        n_classes = 37
    else:
        n_classes = len(labels or (0,))
    return by_model, _consensus_index(votes, n_classes)
# ---------------------------------------------------------------------------
# API
# ---------------------------------------------------------------------------
app = FastAPI(
    title="Roulette Next-Spin Predictor (all models)",
    version="2.0.0",
    description=(
        "Predicts the next N spins of a European single-zero roulette wheel. "
        "Every persisted model votes on every target (number, color, parity, "
        "dozen, column). Response returns each model's prediction plus a "
        "majority-vote consensus."
    ),
)

# CORS is fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Registered as a startup hook: without this decorator nothing ever calls
# load_all_models(), the registry stays empty and every request gets a 503.
@app.on_event("startup")
def _startup() -> None:
    """Load all persisted models and log how many were loaded per target."""
    load_all_models()
    summary = {t: len(REGISTRY[t]) for t in TARGETS}
    LOG.info("Models loaded per target: %s", summary)
class PredictRequest(BaseModel):
    # JSON body of POST /predict.
    # numbers: observed winning numbers, oldest first; validated by _check.
    numbers: list[int] = Field(..., description="Past winning numbers (0–36), most recent last.")
    # steps: how many future spins to forecast (1..50, default 10).
    steps: int = Field(10, ge=1, le=50)

    # The validator must be registered with pydantic; as a plain method it
    # would never run and empty or out-of-range input would reach the models.
    @field_validator("numbers")
    @classmethod
    def _check(cls, v: list[int]) -> list[int]:
        """Reject an empty history or any value outside the 0–36 range."""
        if not v:
            raise ValueError("numbers cannot be empty")
        if any(n < 0 or n > 36 for n in v):
            raise ValueError("all numbers must be in [0, 36]")
        return v
class TargetBlock(BaseModel):
    # Prediction for one target at one forecast step.
    # consensus: the majority-vote value across models.
    consensus: Any
    # by_model: each model's own prediction, keyed by model name.
    by_model: dict[str, Any]
class StepPrediction(BaseModel):
    # All target predictions for a single forecast step.
    # step: 1-based index of the forecast step.
    step: int
    # One TargetBlock per prediction target.
    number: TargetBlock
    color: TargetBlock
    parity: TargetBlock
    dozen: TargetBlock
    column: TargetBlock
class PredictResponse(BaseModel):
    # Response body of the prediction endpoints: one entry per forecast step.
    predictions: list[StepPrediction]
def _prepare_windows(sequence: list[int]) -> tuple[np.ndarray, np.ndarray]:
    """Turn a spin history into the two fixed-length model input windows.

    The history is converted to an int64 array and, when shorter than
    ``WINDOW_V2``, left-padded with zeros up to that length. Returns
    independent copies of the last ``WINDOW`` and the last ``WINDOW_V2``
    values, in that order.
    """
    history = np.asarray(sequence, dtype=np.int64)
    shortfall = WINDOW_V2 - len(history)
    if shortfall > 0:
        # Pad on the left only, so the most recent spins stay at the end.
        history = np.pad(history, (shortfall, 0))
    short_window = history[-WINDOW:].copy()
    long_window = history[-WINDOW_V2:].copy()
    return short_window, long_window
def _forecast(sequence: list[int], steps: int) -> list[StepPrediction]:
    """Predict *steps* future spins from *sequence*, one spin at a time.

    For each step every target is predicted from the current windows. The
    consensus number is then appended to both windows (dropping their oldest
    value) so the next step is predicted from the updated history.
    """

    def display_value(target: str, index: int) -> Any:
        # Numbers are reported as-is; other targets map to their label, or to
        # the stringified index when it has no label.
        if target == "number":
            return index
        labels = TARGET_LABELS[target]
        if 0 <= index < len(labels):
            return labels[index]
        return str(index)

    w_v1, w_v2 = _prepare_windows(sequence)
    results: list[StepPrediction] = []
    for step in range(1, steps + 1):
        consensus_number: int | None = None
        blocks: dict[str, TargetBlock] = {}
        for target in TARGETS:
            per_model, cons_idx = _predict_target_all(target, w_v1, w_v2)
            if target == "number":
                consensus_number = cons_idx
            blocks[target] = TargetBlock(
                consensus=display_value(target, cons_idx),
                by_model=per_model,
            )
        results.append(StepPrediction(step=step, **blocks))

        # Roll window forward using the consensus number.
        if consensus_number is None:
            next_num = int(w_v1[-1])
        else:
            next_num = consensus_number
        w_v1 = np.append(w_v1[1:], next_num)
        w_v2 = np.append(w_v2[1:], next_num)
    return results
# Route registration restored: the module docstring documents this handler as
# ``GET /``, but without the decorator the path was never served.
@app.get("/")
def root() -> dict[str, Any]:
    """Return service metadata and the models currently loaded per target."""
    return {
        "service": "Roulette Next-Spin Predictor (all models)",
        "version": "2.0.0",
        "wheel": "European single-zero (0–36)",
        "endpoints": {
            "POST /predict": "Predict from JSON {numbers: [...], steps: N}",
            "POST /predict/file": "Predict from uploaded CSV (column Winner/number)",
            "GET /docs": "Swagger UI",
        },
        "models_per_target": {t: len(REGISTRY[t]) for t in TARGETS},
        "model_names_per_target": {t: [n for n, _ in REGISTRY[t]] for t in TARGETS},
    }
# Route registration restored: the module docstring documents this handler as
# ``POST /predict``, but without the decorator the path was never served.
@app.post("/predict", response_model=PredictResponse)
def predict(req: PredictRequest) -> PredictResponse:
    """Forecast the next ``req.steps`` spins from the JSON history.

    Raises:
        HTTPException: 503 when no model is loaded for any target.
    """
    if not any(REGISTRY[t] for t in TARGETS):
        raise HTTPException(status_code=503, detail="no models loaded")
    preds = _forecast(req.numbers, req.steps)
    return PredictResponse(predictions=preds)
# Route registration restored: the module docstring documents this handler as
# ``POST /predict/file``, but without the decorator the path was never served.
@app.post("/predict/file", response_model=PredictResponse)
async def predict_file(file: UploadFile = File(...), steps: int = 10) -> PredictResponse:
    """Forecast the next *steps* spins from an uploaded spreadsheet-like file.

    The history is read from the first column named ``winner``,
    ``winning number`` or ``number`` (case-insensitive); when none matches,
    the last column is used.

    Raises:
        HTTPException: 503 when no model is loaded; 400 when *steps* is
            outside 1..50, the file cannot be parsed, it holds no data, or
            the chosen column is not made of integers in 0..36.
    """
    if not any(REGISTRY[t] for t in TARGETS):
        raise HTTPException(status_code=503, detail="no models loaded")
    if not 1 <= steps <= 50:
        raise HTTPException(status_code=400, detail="steps must be between 1 and 50")
    try:
        content = await file.read()
        df = _read_uploaded_dataframe(content)
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"could not read file: {exc}") from exc

    # Mirror the JSON endpoint, which rejects an empty history. This also
    # guards the ``df.columns[-1]`` fallback below, which would raise on a
    # frame without columns.
    if df.empty:
        raise HTTPException(status_code=400, detail="uploaded file contains no data rows")

    wanted = {"winner", "winning number", "number"}
    # Column labels are not guaranteed to be strings (e.g. integer headers),
    # so normalise through str() before comparing.
    col = next(
        (c for c in df.columns if str(c).strip().lower() in wanted),
        df.columns[-1],
    )
    try:
        numbers = [int(x) for x in df[col].tolist()]
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"column {col!r} is not integer-coercible: {exc}") from exc
    if any(n < 0 or n > 36 for n in numbers):
        raise HTTPException(status_code=400, detail="values must be in [0, 36]")
    preds = _forecast(numbers, steps)
    return PredictResponse(predictions=preds)
if __name__ == "__main__":
    # Direct-execution entry point: serve the app with uvicorn on all
    # interfaces, port 7860, without auto-reload.
    from uvicorn import run as serve

    serve("app:app", host="0.0.0.0", port=7860, reload=False)