File size: 9,120 Bytes
1778581
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
"""
api/app.py — FastAPI backend for the TS Anomaly Detection Benchmark
====================================================================

Designed to run on Hugging Face Spaces (Docker, port 7860).

Endpoints:
  POST /api/run             — start a benchmark job
  GET  /api/status/{job_id} — poll for progress + results
  GET  /                    — health check
"""

import os
import sys
import uuid
import math
import base64
import threading
import io
from typing import Optional, List

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

# ── Add benchmark package to Python path ──
# Allows importing data.synthetic, evaluation.scorer, models.*, etc.
# Resolves to <directory containing this file>/../ts-anomaly-benchmark and is
# inserted at the FRONT of sys.path, so those packages take precedence over
# any installed modules with the same names. The benchmark modules themselves
# are imported lazily inside _execute_job, after this has run.
_BENCH_PATH = os.path.join(os.path.dirname(__file__), "..", "ts-anomaly-benchmark")
sys.path.insert(0, os.path.abspath(_BENCH_PATH))

# ── App ──────────────────────────────────────────────────────────────
app = FastAPI(title="TS Anomaly Benchmark API", version="1.0.0")

# CORS is fully open (any origin, method and header). Presumably this lets a
# separately hosted frontend call the API — NOTE(review): restrict
# allow_origins if this is ever deployed beyond a public demo.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# ── Job store ─────────────────────────────────────────────────────────
# Simple in-memory store. Fine for a demo — one server instance.
# Maps job_id -> {"status", "logs", "results", "error"}. Nothing in this file
# ever removes entries, so it grows with every /api/run call and is lost when
# the process restarts.
_jobs: dict = {}
_run_lock = threading.Lock()  # one benchmark at a time (models are CPU-heavy)


# ── Request / Response models ─────────────────────────────────────────

class RunRequest(BaseModel):
    # Request body for POST /api/run.
    # Model names to enable; _build_config only recognises "moment",
    # "isolation_forest", "lof" and "moving_window" — other names are ignored.
    models: List[str]                          # e.g. ["moment", "isolation_forest"]
    # Synthetic generators to run; an empty list disables synthetic data.
    synthetic_types: List[str] = [
        "sine_with_spikes",
        "random_walk_with_shift",
        "seasonal_with_noise",
        "ecg_like",
    ]
    # Forwarded to the synthetic generator as "num_points" (presumably the
    # length of each generated series). NOTE(review): not range-checked —
    # very large values could make a job exhaust CPU/memory.
    num_points: int = 512
    custom_csv: Optional[str] = None           # base64-encoded CSV content
    # Value column in the uploaded CSV; if absent, the loader falls back to a
    # numeric column it picks itself.
    custom_value_col: str = "value"
    # Optional label column in the uploaded CSV (cast to int); ignored when the
    # CSV has no such column.
    custom_label_col: Optional[str] = None


# ── Endpoints ─────────────────────────────────────────────────────────

@app.get("/")
def root():
    """Health check: confirm the API process is up and serving requests."""
    status_payload = {"status": "ok", "message": "TS Anomaly Benchmark API"}
    return status_payload


@app.post("/api/run")
def start_run(req: RunRequest):
    """Register a new benchmark job and launch it on a daemon thread.

    The job record starts with status "running" (even while it waits for an
    earlier job to release the run lock). Only the job id is returned; clients
    poll ``GET /api/status/{job_id}`` for progress and results.
    """
    new_job_id = str(uuid.uuid4())
    _jobs[new_job_id] = dict(status="running", logs=[], results=None, error=None)

    worker = threading.Thread(target=_execute_job, args=(new_job_id, req), daemon=True)
    worker.start()

    return {"job_id": new_job_id}


@app.get("/api/status/{job_id}")
def get_status(job_id: str):
    """Report a job's status, captured logs, results and error message.

    ``results`` is populated once status is "done"; ``error`` once it is
    "error". Raises HTTPException 404 when *job_id* is unknown.
    """
    try:
        job = _jobs[job_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Job not found") from None

    public_fields = ("status", "logs", "results", "error")
    return {field: job[field] for field in public_fields}


# ── Job execution ─────────────────────────────────────────────────────

def _execute_job(job_id: str, req: RunRequest):
    """Run the full benchmark for one job; used as a background-thread target.

    Waits on ``_run_lock`` so only one benchmark runs at a time. While holding
    the lock, ``sys.stdout`` is replaced by a ``_LogCapture`` so everything the
    benchmark prints is appended to the job's ``logs``.

    On success stores the serialised results and sets status "done". On any
    exception stores a non-empty error message, appends it to the logs, prints
    the traceback to stderr (for server-side debugging) and sets status
    "error". The original ``sys.stdout`` is always restored.
    """
    import traceback

    job = _jobs[job_id]
    logs = job["logs"]

    # Serialise concurrent jobs — models are memory-heavy. While waiting here
    # the job's status remains "running", as set by start_run.
    with _run_lock:
        # NOTE: sys.stdout is process-global, so prints from other threads
        # during this window are captured into this job's logs too.
        original_stdout = sys.stdout
        sys.stdout = _LogCapture(logs, original_stdout)
        try:
            config = _build_config(req)

            # Import benchmark modules lazily (torch may be slow to first
            # import); they resolve via the sys.path entry added at import time.
            from data.synthetic import generate_all as generate_synthetic
            from evaluation.scorer import build_models, run_benchmark

            # ── Datasets ──
            all_datasets = {}
            if req.synthetic_types:
                all_datasets.update(generate_synthetic(config["datasets"]["synthetic"]))
            if req.custom_csv:
                all_datasets.update(_load_custom_csv(req))

            if not all_datasets:
                raise ValueError("No datasets loaded.")

            # ── Models ──
            models = build_models(config["models"])
            if not models:
                raise ValueError("No models enabled. Select at least one.")

            # ── Run ──
            results_df = run_benchmark(models, all_datasets, config["evaluation"])

            # Publish results before flipping status so a poller that sees
            # "done" always finds results populated.
            job["results"] = _serialise(results_df, all_datasets)
            job["status"] = "done"

        except Exception as exc:
            # str() is empty for exceptions raised without a message (e.g.
            # ValueError()); fall back to the class name so the client always
            # receives something meaningful.
            message = str(exc) or type(exc).__name__
            # Set the message before the status so a poller never sees
            # status "error" with error still None.
            job["error"] = message
            job["status"] = "error"
            logs.append(f"ERROR: {message}")
            # Goes to sys.stderr, which is not captured, i.e. the server log.
            traceback.print_exc()
        finally:
            sys.stdout = original_stdout


def _build_config(req: RunRequest) -> dict:
    """Translate a RunRequest into the nested config dict the benchmark expects.

    Each of the four known models gets an ``enabled`` flag (True iff its name
    appears in ``req.models``) followed by fixed hyper-parameters; any other
    names in ``req.models`` are ignored. Synthetic data is enabled iff
    ``req.synthetic_types`` is non-empty.
    """
    wanted = set(req.models)

    # Fixed hyper-parameters per model; the "enabled" flag is prepended below.
    model_params = {
        "moment": {
            "pretrained": "AutonLab/MOMENT-1-large",
            "task": "reconstruction",
        },
        "isolation_forest": {
            "n_estimators": 100,
            "contamination": "auto",
            "window_features": True,
            "feature_window": 20,
        },
        "lof": {
            "n_neighbors": 20,
            "contamination": "auto",
            "window_features": True,
            "feature_window": 20,
        },
        "moving_window": {
            "window_size": 15,
            "sigma_multiplier": 2.0,
        },
    }
    models_cfg = {
        name: {"enabled": name in wanted, **params}
        for name, params in model_params.items()
    }

    synthetic_cfg = dict(
        enabled=bool(req.synthetic_types),
        num_points=req.num_points,
        types=req.synthetic_types,
        anomaly_rate=0.03,
        random_seed=42,
    )

    return {
        "models": models_cfg,
        "datasets": {"synthetic": synthetic_cfg},
        "evaluation": {"threshold_method": "best_f1"},
    }


def _load_custom_csv(req: RunRequest) -> dict:
    import pandas as pd
    import numpy as np

    csv_bytes = base64.b64decode(req.custom_csv)
    df = pd.read_csv(io.BytesIO(csv_bytes))

    if req.custom_value_col not in df.columns:
        numeric = df.select_dtypes(include=[np.number]).columns
        if len(numeric) == 0:
            raise ValueError("No numeric columns found in uploaded CSV.")
        value_col = numeric[0]
    else:
        value_col = req.custom_value_col

    series = df[value_col].to_numpy(dtype=float)
    labels = None
    if req.custom_label_col and req.custom_label_col in df.columns:
        labels = df[req.custom_label_col].to_numpy(dtype=int)

    valid = ~__import__("numpy").isnan(series)
    series = series[valid]
    if labels is not None:
        labels = labels[valid]

    return {"custom_upload": {"series": series, "labels": labels}}


def _serialise(results_df, all_datasets: dict) -> dict:
    metrics = []
    for _, row in results_df.iterrows():
        metrics.append({
            "model":        row.get("model"),
            "dataset":      row.get("dataset"),
            "auc_roc":      _f(row.get("auc_roc")),
            "auc_pr":       _f(row.get("auc_pr")),
            "f1":           _f(row.get("f1")),
            "precision":    _f(row.get("precision")),
            "recall":       _f(row.get("recall")),
            "time_seconds": _f(row.get("time_seconds")),
            "scores":       _arr(row.get("_scores")),
        })

    series_out = {}
    for name, data in all_datasets.items():
        series_out[name] = {
            "series": [round(float(v), 4) for v in data["series"]],
            "labels": data["labels"].tolist() if data.get("labels") is not None else None,
        }

    return {"metrics": metrics, "series": series_out}


def _f(v):
    try:
        f = float(v)
        return None if math.isnan(f) else round(f, 4)
    except Exception:
        return None


def _arr(v):
    try:
        if v is None or not hasattr(v, "__len__"):
            return None
        return [round(float(x), 4) for x in v]
    except Exception:
        return None


# ── Stdout capture ────────────────────────────────────────────────────

class _LogCapture:
    def __init__(self, log_list: list, original):
        self._log = log_list
        self._orig = original

    def write(self, text: str):
        if text and text.strip():
            self._log.append(text.strip())
        self._orig.write(text)

    def flush(self):
        self._orig.flush()