"""
api/app.py β€” FastAPI backend for the TS Anomaly Detection Benchmark
====================================================================
Designed to run on Hugging Face Spaces (Docker, port 7860).
Endpoints:
POST /api/run β€” start a benchmark job
GET /api/status/{job_id} β€” poll for progress + results
GET / β€” health check
"""
import os
import sys
import uuid
import math
import base64
import threading
import io
from typing import Optional, List
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# ── Add benchmark package to Python path ──
# Allows importing data.synthetic, evaluation.scorer, models.*, etc.
_BENCH_PATH = os.path.join(os.path.dirname(__file__), "..", "ts-anomaly-benchmark")
sys.path.insert(0, os.path.abspath(_BENCH_PATH))
# ── App ──────────────────────────────────────────────────────────────
app = FastAPI(title="TS Anomaly Benchmark API", version="1.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# ── Job store ─────────────────────────────────────────────────────────
# Simple in-memory store. Fine for a demo - one server instance.
_jobs: dict = {}
_run_lock = threading.Lock() # one benchmark at a time (models are CPU-heavy)
# ── Request / Response models ─────────────────────────────────────────
class RunRequest(BaseModel):
models: List[str] # e.g. ["moment", "isolation_forest"]
synthetic_types: List[str] = [
"sine_with_spikes",
"random_walk_with_shift",
"seasonal_with_noise",
"ecg_like",
]
num_points: int = 512
custom_csv: Optional[str] = None # base64-encoded CSV content
custom_value_col: str = "value"
custom_label_col: Optional[str] = None
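# Example request body for POST /api/run, for illustration only. The field names
# mirror RunRequest above; the models and synthetic types shown are just one
# valid combination.
#
#   {
#       "models": ["isolation_forest", "moving_window"],
#       "synthetic_types": ["sine_with_spikes", "ecg_like"],
#       "num_points": 512
#   }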
# ── Endpoints ─────────────────────────────────────────────────────────
@app.get("/")
def root():
return {"status": "ok", "message": "TS Anomaly Benchmark API"}
@app.post("/api/run")
def start_run(req: RunRequest):
"""Start a benchmark job. Returns a job_id for polling."""
job_id = str(uuid.uuid4())
_jobs[job_id] = {
"status": "running",
"logs": [],
"results": None,
"error": None,
}
thread = threading.Thread(
target=_execute_job,
args=(job_id, req),
daemon=True,
)
thread.start()
return {"job_id": job_id}
@app.get("/api/status/{job_id}")
def get_status(job_id: str):
"""Poll job progress. When status=='done', results are included."""
job = _jobs.get(job_id)
if job is None:
raise HTTPException(status_code=404, detail="Job not found")
return {
"status": job["status"],
"logs": job["logs"],
"results": job["results"],
"error": job["error"],
}
# ── Job execution ─────────────────────────────────────────────────────
def _execute_job(job_id: str, req: RunRequest):
"""Run the full benchmark in a background thread."""
job = _jobs[job_id]
logs = job["logs"]
    # Serialise concurrent jobs - models are memory-heavy
with _run_lock:
original_stdout = sys.stdout
sys.stdout = _LogCapture(logs, original_stdout)
try:
config = _build_config(req)
            # Import benchmark modules (torch may be slow on first import)
from data.synthetic import generate_all as generate_synthetic
from evaluation.scorer import build_models, run_benchmark
# ── Datasets ──
all_datasets = {}
if req.synthetic_types:
all_datasets.update(generate_synthetic(config["datasets"]["synthetic"]))
if req.custom_csv:
all_datasets.update(_load_custom_csv(req))
if not all_datasets:
raise ValueError("No datasets loaded.")
# ── Models ──
models = build_models(config["models"])
if not models:
raise ValueError("No models enabled. Select at least one.")
# ── Run ──
results_df = run_benchmark(models, all_datasets, config["evaluation"])
job["results"] = _serialise(results_df, all_datasets)
job["status"] = "done"
except Exception as exc:
job["status"] = "error"
job["error"] = str(exc)
logs.append(f"ERROR: {exc}")
finally:
sys.stdout = original_stdout
def _build_config(req: RunRequest) -> dict:
    """Translate the API request into the nested config dict the benchmark expects."""
enabled = set(req.models)
return {
"models": {
"moment": {
"enabled": "moment" in enabled,
"pretrained": "AutonLab/MOMENT-1-large",
"task": "reconstruction",
},
"isolation_forest": {
"enabled": "isolation_forest" in enabled,
"n_estimators": 100,
"contamination": "auto",
"window_features": True,
"feature_window": 20,
},
"lof": {
"enabled": "lof" in enabled,
"n_neighbors": 20,
"contamination": "auto",
"window_features": True,
"feature_window": 20,
},
"moving_window": {
"enabled": "moving_window" in enabled,
"window_size": 15,
"sigma_multiplier": 2.0,
},
},
"datasets": {
"synthetic": {
"enabled": bool(req.synthetic_types),
"num_points": req.num_points,
"types": req.synthetic_types,
"anomaly_rate": 0.03,
"random_seed": 42,
}
},
"evaluation": {"threshold_method": "best_f1"},
}
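# For illustration: with RunRequest(models=["moving_window"]), the config built above
# enables only the moving_window entry (every other "enabled" flag is False) and
# keeps the synthetic defaults: 512 points, a 3% anomaly rate, and seed 42.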
def _load_custom_csv(req: RunRequest) -> dict:
    """Decode the base64 CSV upload into a {"series", "labels"} dataset entry."""
    import pandas as pd
    import numpy as np
csv_bytes = base64.b64decode(req.custom_csv)
df = pd.read_csv(io.BytesIO(csv_bytes))
if req.custom_value_col not in df.columns:
numeric = df.select_dtypes(include=[np.number]).columns
if len(numeric) == 0:
raise ValueError("No numeric columns found in uploaded CSV.")
value_col = numeric[0]
else:
value_col = req.custom_value_col
series = df[value_col].to_numpy(dtype=float)
labels = None
if req.custom_label_col and req.custom_label_col in df.columns:
labels = df[req.custom_label_col].to_numpy(dtype=int)
    valid = ~np.isnan(series)
series = series[valid]
if labels is not None:
labels = labels[valid]
return {"custom_upload": {"series": series, "labels": labels}}
def _serialise(results_df, all_datasets: dict) -> dict:
    """Convert the benchmark results DataFrame and datasets into a JSON-safe dict."""
metrics = []
for _, row in results_df.iterrows():
metrics.append({
"model": row.get("model"),
"dataset": row.get("dataset"),
"auc_roc": _f(row.get("auc_roc")),
"auc_pr": _f(row.get("auc_pr")),
"f1": _f(row.get("f1")),
"precision": _f(row.get("precision")),
"recall": _f(row.get("recall")),
"time_seconds": _f(row.get("time_seconds")),
"scores": _arr(row.get("_scores")),
})
series_out = {}
for name, data in all_datasets.items():
series_out[name] = {
"series": [round(float(v), 4) for v in data["series"]],
"labels": data["labels"].tolist() if data.get("labels") is not None else None,
}
return {"metrics": metrics, "series": series_out}
def _f(v):
    """Round a scalar to 4 decimals; return None for NaN or non-numeric values."""
try:
f = float(v)
return None if math.isnan(f) else round(f, 4)
except Exception:
return None
def _arr(v):
    """Round each element of an array-like to 4 decimals; None if not array-like."""
try:
if v is None or not hasattr(v, "__len__"):
return None
return [round(float(x), 4) for x in v]
except Exception:
return None
# ── Stdout capture ────────────────────────────────────────────────────
class _LogCapture:
    """File-like wrapper that mirrors stdout while appending each line to the job log."""
def __init__(self, log_list: list, original):
self._log = log_list
self._orig = original
def write(self, text: str):
if text and text.strip():
self._log.append(text.strip())
self._orig.write(text)
def flush(self):
self._orig.flush()