| """ | |
| api/app.py β FastAPI backend for the TS Anomaly Detection Benchmark | |
| ==================================================================== | |
| Designed to run on Hugging Face Spaces (Docker, port 7860). | |
| Endpoints: | |
| POST /api/run β start a benchmark job | |
| GET /api/status/{job_id} β poll for progress + results | |
| GET / β health check | |
| """ | |
import os
import sys
import uuid
import math
import base64
import threading
import io
from typing import Optional, List

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# ── Add benchmark package to Python path ──
# Allows importing data.synthetic, evaluation.scorer, models.*, etc.
_BENCH_PATH = os.path.join(os.path.dirname(__file__), "..", "ts-anomaly-benchmark")
sys.path.insert(0, os.path.abspath(_BENCH_PATH))
# ── App ───────────────────────────────────────────────────────────────
app = FastAPI(title="TS Anomaly Benchmark API", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# ── Job store ─────────────────────────────────────────────────────────
# Simple in-memory store. Fine for a demo on a single server instance.
_jobs: dict = {}
_run_lock = threading.Lock()  # one benchmark at a time (models are CPU-heavy)
# ── Request / Response models ─────────────────────────────────────────
class RunRequest(BaseModel):
    models: List[str]  # e.g. ["moment", "isolation_forest"]
    synthetic_types: List[str] = [
        "sine_with_spikes",
        "random_walk_with_shift",
        "seasonal_with_noise",
        "ecg_like",
    ]
    num_points: int = 512
    custom_csv: Optional[str] = None  # base64-encoded CSV content
    custom_value_col: str = "value"
    custom_label_col: Optional[str] = None
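# Example request body (illustrative values; model and dataset names must
# match those handled in _build_config and data.synthetic):
#
#   {
#       "models": ["moment", "isolation_forest"],
#       "synthetic_types": ["sine_with_spikes", "ecg_like"],
#       "num_points": 512
#   }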
# ── Endpoints ─────────────────────────────────────────────────────────
@app.get("/")
def root():
    return {"status": "ok", "message": "TS Anomaly Benchmark API"}
@app.post("/api/run")
def start_run(req: RunRequest):
    """Start a benchmark job. Returns a job_id for polling."""
    job_id = str(uuid.uuid4())
    _jobs[job_id] = {
        "status": "running",
        "logs": [],
        "results": None,
        "error": None,
    }
    thread = threading.Thread(
        target=_execute_job,
        args=(job_id, req),
        daemon=True,
    )
    thread.start()
    return {"job_id": job_id}
@app.get("/api/status/{job_id}")
def get_status(job_id: str):
    """Poll job progress. When status == 'done', results are included."""
    job = _jobs.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return {
        "status": job["status"],
        "logs": job["logs"],
        "results": job["results"],
        "error": job["error"],
    }
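# A minimal client-side polling loop (a sketch; assumes the `requests`
# package and a server at BASE_URL):
#
#   import time, requests
#   BASE_URL = "http://localhost:7860"
#   job_id = requests.post(f"{BASE_URL}/api/run",
#                          json={"models": ["isolation_forest"]}).json()["job_id"]
#   while True:
#       state = requests.get(f"{BASE_URL}/api/status/{job_id}").json()
#       if state["status"] in ("done", "error"):
#           break
#       time.sleep(2)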
# ── Job execution ─────────────────────────────────────────────────────
def _execute_job(job_id: str, req: RunRequest):
    """Run the full benchmark in a background thread."""
    job = _jobs[job_id]
    logs = job["logs"]
    # Serialise concurrent jobs: the models are memory-heavy.
    with _run_lock:
        original_stdout = sys.stdout
        sys.stdout = _LogCapture(logs, original_stdout)
        try:
            config = _build_config(req)
            # Import benchmark modules here (torch can be slow on first import).
            from data.synthetic import generate_all as generate_synthetic
            from evaluation.scorer import build_models, run_benchmark

            # ── Datasets ──
            all_datasets = {}
            if req.synthetic_types:
                all_datasets.update(generate_synthetic(config["datasets"]["synthetic"]))
            if req.custom_csv:
                all_datasets.update(_load_custom_csv(req))
            if not all_datasets:
                raise ValueError("No datasets loaded.")

            # ── Models ──
            models = build_models(config["models"])
            if not models:
                raise ValueError("No models enabled. Select at least one.")

            # ── Run ──
            results_df = run_benchmark(models, all_datasets, config["evaluation"])
            job["results"] = _serialise(results_df, all_datasets)
            job["status"] = "done"
        except Exception as exc:
            job["status"] = "error"
            job["error"] = str(exc)
            logs.append(f"ERROR: {exc}")
        finally:
            sys.stdout = original_stdout
def _build_config(req: RunRequest) -> dict:
    enabled = set(req.models)
    return {
        "models": {
            "moment": {
                "enabled": "moment" in enabled,
                "pretrained": "AutonLab/MOMENT-1-large",
                "task": "reconstruction",
            },
            "isolation_forest": {
                "enabled": "isolation_forest" in enabled,
                "n_estimators": 100,
                "contamination": "auto",
                "window_features": True,
                "feature_window": 20,
            },
            "lof": {
                "enabled": "lof" in enabled,
                "n_neighbors": 20,
                "contamination": "auto",
                "window_features": True,
                "feature_window": 20,
            },
            "moving_window": {
                "enabled": "moving_window" in enabled,
                "window_size": 15,
                "sigma_multiplier": 2.0,
            },
        },
        "datasets": {
            "synthetic": {
                "enabled": bool(req.synthetic_types),
                "num_points": req.num_points,
                "types": req.synthetic_types,
                "anomaly_rate": 0.03,
                "random_seed": 42,
            }
        },
        "evaluation": {"threshold_method": "best_f1"},
    }
def _load_custom_csv(req: RunRequest) -> dict:
    import pandas as pd
    import numpy as np

    csv_bytes = base64.b64decode(req.custom_csv)
    df = pd.read_csv(io.BytesIO(csv_bytes))
    if req.custom_value_col not in df.columns:
        # Fall back to the first numeric column if the requested one is absent.
        numeric = df.select_dtypes(include=[np.number]).columns
        if len(numeric) == 0:
            raise ValueError("No numeric columns found in uploaded CSV.")
        value_col = numeric[0]
    else:
        value_col = req.custom_value_col
    series = df[value_col].to_numpy(dtype=float)
    labels = None
    if req.custom_label_col and req.custom_label_col in df.columns:
        labels = df[req.custom_label_col].to_numpy(dtype=int)
    # Drop NaN points, keeping any labels aligned with the series.
    valid = ~np.isnan(series)
    series = series[valid]
    if labels is not None:
        labels = labels[valid]
    return {"custom_upload": {"series": series, "labels": labels}}
def _serialise(results_df, all_datasets: dict) -> dict:
    metrics = []
    for _, row in results_df.iterrows():
        metrics.append({
            "model": row.get("model"),
            "dataset": row.get("dataset"),
            "auc_roc": _f(row.get("auc_roc")),
            "auc_pr": _f(row.get("auc_pr")),
            "f1": _f(row.get("f1")),
            "precision": _f(row.get("precision")),
            "recall": _f(row.get("recall")),
            "time_seconds": _f(row.get("time_seconds")),
            "scores": _arr(row.get("_scores")),
        })
    series_out = {}
    for name, data in all_datasets.items():
        series_out[name] = {
            "series": [round(float(v), 4) for v in data["series"]],
            "labels": data["labels"].tolist() if data.get("labels") is not None else None,
        }
    return {"metrics": metrics, "series": series_out}
def _f(v):
    """Round to 4 dp; map NaN and non-numeric values to None (JSON-safe)."""
    try:
        f = float(v)
        return None if math.isnan(f) else round(f, 4)
    except Exception:
        return None


def _arr(v):
    """Serialise an array-like of scores; None if absent or not sized."""
    try:
        if v is None or not hasattr(v, "__len__"):
            return None
        return [round(float(x), 4) for x in v]
    except Exception:
        return None
# ── Stdout capture ────────────────────────────────────────────────────
class _LogCapture:
    """Tee stdout into a job's log list while still writing to the console."""

    def __init__(self, log_list: list, original):
        self._log = log_list
        self._orig = original

    def write(self, text: str):
        if text and text.strip():
            self._log.append(text.strip())
        self._orig.write(text)

    def flush(self):
        self._orig.flush()
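# Local run (a sketch; assumes uvicorn is installed, mirroring the Docker
# setup mentioned in the module docstring):
#
#   uvicorn api.app:app --host 0.0.0.0 --port 7860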