"""
api/app.py — FastAPI backend for the TS Anomaly Detection Benchmark
====================================================================
Designed to run on Hugging Face Spaces (Docker, port 7860).

Endpoints:
  POST /api/run              — start a benchmark job
  GET  /api/status/{job_id}  — poll for progress + results
  GET  /                     — health check
"""

import os
import sys
import uuid
import math
import base64
import threading
import io
from typing import Optional, List

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

# ── Add benchmark package to Python path ──
# Allows importing data.synthetic, evaluation.scorer, models.*, etc.
_BENCH_PATH = os.path.join(os.path.dirname(__file__), "..", "ts-anomaly-benchmark")
sys.path.insert(0, os.path.abspath(_BENCH_PATH))

# ── App ──────────────────────────────────────────────────────────────
app = FastAPI(title="TS Anomaly Benchmark API", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# ── Job store ─────────────────────────────────────────────────────────
# Simple in-memory store. Fine for a demo — one server instance.
# NOTE: entries are never evicted, so memory grows with every submitted job
# until the process restarts.
_jobs: dict = {}
_run_lock = threading.Lock()   # one benchmark at a time (models are CPU-heavy)


# ── Request / Response models ─────────────────────────────────────────
class RunRequest(BaseModel):
    """Request body for ``POST /api/run``."""

    # Model keys to enable, e.g. ["moment", "isolation_forest"]
    models: List[str]
    # Synthetic dataset types to generate; an empty list skips synthetic data
    synthetic_types: List[str] = [
        "sine_with_spikes",
        "random_walk_with_shift",
        "seasonal_with_noise",
        "ecg_like",
    ]
    # Forwarded to the synthetic generator config as "num_points"
    num_points: int = 512
    # base64-encoded CSV content
    custom_csv: Optional[str] = None
    # Column holding the series values (first numeric column is the fallback)
    custom_value_col: str = "value"
    # Optional column holding ground-truth labels
    custom_label_col: Optional[str] = None


# ── Endpoints ─────────────────────────────────────────────────────────
@app.get("/")
def root():
    """Health check."""
    return {"status": "ok", "message": "TS Anomaly Benchmark API"}


@app.post("/api/run")
def start_run(req: RunRequest):
    """Start a benchmark job. Returns a job_id for polling."""
    job_id = str(uuid.uuid4())
    _jobs[job_id] = {
        "status": "running",
        "logs": [],
        "results": None,
        "error": None,
    }

    # Daemon thread: the job must not keep the process alive on shutdown.
    worker = threading.Thread(
        target=_execute_job,
        args=(job_id, req),
        daemon=True,
    )
    worker.start()

    return {"job_id": job_id}


@app.get("/api/status/{job_id}")
def get_status(job_id: str):
    """Poll job progress. When status=='done', results are included.

    Raises:
        HTTPException: 404 when ``job_id`` is unknown.
    """
    job = _jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")

    return {
        "status": job["status"],
        # Snapshot: the worker thread keeps appending to this list while the
        # response is being encoded.
        "logs": list(job["logs"]),
        "results": job["results"],
        "error": job["error"],
    }


# ── Job execution ─────────────────────────────────────────────────────
def _execute_job(job_id: str, req: RunRequest):
    """Run the full benchmark in a background thread.

    Progress is reported by mutating the ``_jobs[job_id]`` record in place:
    printed output is appended to ``logs``; on success ``results`` is filled
    and ``status`` becomes ``"done"``; on any exception ``status`` becomes
    ``"error"`` and ``error`` holds the exception message.
    """
    job = _jobs[job_id]
    logs = job["logs"]

    # Serialise concurrent jobs — models are memory-heavy
    with _run_lock:
        # Route print() output into the job log for the duration of the run.
        # sys.stdout is process-wide, so output printed by other threads in
        # this window is captured as well.
        original_stdout = sys.stdout
        sys.stdout = _LogCapture(logs, original_stdout)
        try:
            config = _build_config(req)

            # Import benchmark modules (torch may be slow to first import)
            from data.synthetic import generate_all as generate_synthetic
            from evaluation.scorer import build_models, run_benchmark

            # ── Datasets ──
            all_datasets = {}
            if req.synthetic_types:
                all_datasets.update(
                    generate_synthetic(config["datasets"]["synthetic"])
                )
            if req.custom_csv:
                all_datasets.update(_load_custom_csv(req))
            if not all_datasets:
                raise ValueError("No datasets loaded.")

            # ── Models ──
            models = build_models(config["models"])
            if not models:
                raise ValueError("No models enabled. Select at least one.")

            # ── Run ──
            results_df = run_benchmark(models, all_datasets, config["evaluation"])
            job["results"] = _serialise(results_df, all_datasets)
            job["status"] = "done"

        except Exception as exc:
            # Top-level boundary of the worker thread: surface the failure to
            # the polling client instead of letting the thread die silently.
            job["status"] = "error"
            job["error"] = str(exc)
            logs.append(f"ERROR: {exc}")
        finally:
            sys.stdout = original_stdout


def _build_config(req: RunRequest) -> dict:
    """Translate a request into the nested config dict the benchmark consumes.

    A model is enabled only if its key appears in ``req.models``; all other
    hyper-parameters are fixed here.
    """
    enabled = set(req.models)
    return {
        "models": {
            "moment": {
                "enabled": "moment" in enabled,
                "pretrained": "AutonLab/MOMENT-1-large",
                "task": "reconstruction",
            },
            "isolation_forest": {
                "enabled": "isolation_forest" in enabled,
                "n_estimators": 100,
                "contamination": "auto",
                "window_features": True,
                "feature_window": 20,
            },
            "lof": {
                "enabled": "lof" in enabled,
                "n_neighbors": 20,
                "contamination": "auto",
                "window_features": True,
                "feature_window": 20,
            },
            "moving_window": {
                "enabled": "moving_window" in enabled,
                "window_size": 15,
                "sigma_multiplier": 2.0,
            },
        },
        "datasets": {
            "synthetic": {
                "enabled": bool(req.synthetic_types),
                "num_points": req.num_points,
                "types": req.synthetic_types,
                "anomaly_rate": 0.03,
                "random_seed": 42,
            }
        },
        "evaluation": {"threshold_method": "best_f1"},
    }


def _load_custom_csv(req: RunRequest) -> dict:
    """Decode the uploaded base64 CSV into a single ``custom_upload`` dataset.

    The value column is ``req.custom_value_col`` when present, otherwise the
    first numeric column. Rows whose value is NaN are dropped from both the
    series and (if given) the labels.

    Returns:
        ``{"custom_upload": {"series": ndarray, "labels": ndarray | None}}``

    Raises:
        ValueError: the requested value column is missing and the CSV has no
            numeric column to fall back on.
    """
    import pandas as pd
    import numpy as np

    csv_bytes = base64.b64decode(req.custom_csv)
    df = pd.read_csv(io.BytesIO(csv_bytes))

    if req.custom_value_col in df.columns:
        value_col = req.custom_value_col
    else:
        numeric = df.select_dtypes(include=[np.number]).columns
        if len(numeric) == 0:
            raise ValueError("No numeric columns found in uploaded CSV.")
        value_col = numeric[0]

    series = df[value_col].to_numpy(dtype=float)
    labels = None
    if req.custom_label_col and req.custom_label_col in df.columns:
        labels = df[req.custom_label_col].to_numpy(dtype=int)

    valid = ~np.isnan(series)
    series = series[valid]
    if labels is not None:
        labels = labels[valid]

    return {"custom_upload": {"series": series, "labels": labels}}


def _serialise(results_df, all_datasets: dict) -> dict:
    """Convert benchmark output into a JSON-safe payload for the client.

    Returns:
        ``{"metrics": [...], "series": {...}}`` where every float is rounded
        to 4 decimals and NaN/inf are replaced by ``None`` (the JSON response
        encoder rejects non-finite floats).
    """
    metrics = []
    for _, row in results_df.iterrows():
        metrics.append({
            "model": row.get("model"),
            "dataset": row.get("dataset"),
            "auc_roc": _f(row.get("auc_roc")),
            "auc_pr": _f(row.get("auc_pr")),
            "f1": _f(row.get("f1")),
            "precision": _f(row.get("precision")),
            "recall": _f(row.get("recall")),
            "time_seconds": _f(row.get("time_seconds")),
            "scores": _arr(row.get("_scores")),
        })

    series_out = {}
    for name, data in all_datasets.items():
        labels = data.get("labels")
        series_out[name] = {
            "series": [_finite_or_none(v) for v in data["series"]],
            "labels": labels.tolist() if labels is not None else None,
        }

    return {"metrics": metrics, "series": series_out}


def _finite_or_none(v):
    """Return ``float(v)`` rounded to 4 decimals, or ``None`` for NaN/inf.

    Propagates whatever ``float(v)`` raises for non-numeric input.
    """
    f = float(v)
    return round(f, 4) if math.isfinite(f) else None


def _f(v):
    """Best-effort scalar conversion: rounded float, or ``None`` if unusable."""
    try:
        return _finite_or_none(v)
    except Exception:
        return None


def _arr(v):
    """Best-effort conversion of a sized iterable to a list of rounded floats.

    Returns ``None`` when ``v`` is ``None``, has no length, or holds an element
    that cannot be converted to float. Non-finite elements become ``None`` so
    the result stays JSON-encodable.
    """
    try:
        if v is None or not hasattr(v, "__len__"):
            return None
        return [_finite_or_none(x) for x in v]
    except Exception:
        return None


# ── Stdout capture ────────────────────────────────────────────────────
class _LogCapture:
    """File-like stdout stand-in that tees writes into a job's log list.

    Non-blank writes are stripped and appended to the log; every write is
    also forwarded unchanged to the original stream.
    """

    def __init__(self, log_list: list, original):
        self._log = log_list
        self._orig = original

    def write(self, text: str):
        if text and text.strip():
            self._log.append(text.strip())
        self._orig.write(text)
        # Text streams report the number of characters written.
        return len(text)

    def flush(self):
        self._orig.flush()

    def __getattr__(self, name):
        # Reached only for attributes not defined above. Delegate the rest of
        # the stream interface (isatty, encoding, fileno, ...) so code that
        # probes sys.stdout keeps working while capture is active.
        if name.startswith("_"):
            # Avoid infinite recursion if _orig/_log are not set yet.
            raise AttributeError(name)
        return getattr(self._orig, name)