File size: 16,310 Bytes
371efe0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
from __future__ import annotations

import asyncio
import json
import uuid
from typing import AsyncIterator

from fastapi import FastAPI, File, HTTPException, Query, Request, Response, UploadFile, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from sse_starlette.sse import EventSourceResponse

from api_service import (
    apply_manual_result_override,
    delete_model_results,
    delete_dataset,
    export_results,
    export_results_table,
    get_datasets,
    get_dataset_template,
    get_health,
    get_ollama_auth_status,
    get_models,
    get_questions,
    get_results,
    get_run_status,
    run_snapshot,
    start_run,
    stop_run,
    upload_dataset,
)
from slo_monitor import get_slo_monitor


app = FastAPI(title="openLLMbenchmark API", version="v1")

# Cross-origin policy: every origin, method and header is accepted, and
# credentials are allowed.
# NOTE(review): with allow_credentials=True and a wildcard origin list,
# Starlette's CORSMiddleware echoes the caller's Origin instead of "*", so any
# web page can make credentialed calls to this API. Reasonable for a
# local-only tool; confirm before exposing the service beyond localhost.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)

# Name of the request header through which clients pass their Ollama API key.
OLLAMA_API_KEY_HEADER = "X-Ollama-API-Key"


def _is_local_request(request: Request) -> bool:
    """Return True when the request's peer address is this machine.

    Used to gate the ``/ops`` endpoints. The client host reported by the ASGI
    server is accepted when it is the literal name ``localhost``, the
    ``testclient`` host used by Starlette's test client, or any IPv4/IPv6
    loopback address. IPv4-mapped IPv6 forms such as ``::ffff:127.0.0.1``
    also count; they appear when the server listens on a dual-stack socket.

    NOTE(review): behind a reverse proxy the peer is the proxy itself; if the
    proxy runs on the same host, every request would be treated as local.
    Confirm the deployment topology.
    """
    import ipaddress

    client = request.client
    raw_host = client.host if client is not None else ""
    host = (raw_host or "").strip().lower()
    if host in {"localhost", "testclient"}:
        return True
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        # Empty host, a DNS name, or anything else that is not an IP literal.
        return False
    # Unwrap ::ffff:a.b.c.d so the IPv4 loopback check applies; older Python
    # versions do not report mapped loopback addresses as loopback.
    mapped = getattr(address, "ipv4_mapped", None)
    if mapped is not None:
        address = mapped
    return address.is_loopback


def _raise_if_runs_circuit_open() -> None:
    """Reject run-related requests while the SSE SLO circuit breaker is tripped.

    Raises:
        HTTPException: 503 when the SLO monitor snapshot reports a breach.
    """
    if not get_slo_monitor().snapshot().breached:
        return
    message = (
        "Runs are temporarily unavailable due to SSE SLO breach. "
        "Investigate and recover before retrying."
    )
    raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=message)


def _ollama_api_key_from_request(request: Request) -> str:
    """Return the whitespace-stripped Ollama API key header, or "" when absent.

    The key is read from the ``OLLAMA_API_KEY_HEADER`` request header; a
    missing or empty header yields an empty string.
    """
    raw_key = request.headers.get(OLLAMA_API_KEY_HEADER, "")
    if not raw_key:
        return ""
    return str(raw_key).strip()


def _record_terminal_run_outcome(
    *,
    run_id: int,
    session_id: str,
    completed: bool,
    interrupted: bool,
    error: str,
) -> None:
    """Report a finished run's success or failure to the SLO monitor.

    Runs that have not completed, or that were interrupted, are ignored.
    A completed, uninterrupted run counts as successful when ``error`` is
    blank after stripping whitespace, and as failed otherwise. The outcome is
    registered under the key ``"<session_id>:<run_id>"``.
    """
    if completed and not interrupted:
        has_error = bool(str(error).strip())
        get_slo_monitor().register_run_outcome(f"{session_id}:{run_id}", success=not has_error)


@app.get("/health")
def health() -> dict[str, str]:
    """Liveness probe: return the service-layer health payload unchanged."""
    payload = get_health()
    return payload


@app.get("/models")
def models(request: Request) -> dict[str, list[str]]:
    """List available model names under the ``"models"`` key.

    The caller's Ollama API key header (if any) is forwarded to the lookup.
    A ``RuntimeError`` raised by the lookup is surfaced as 503 with the error
    text as the detail.
    """
    api_key = _ollama_api_key_from_request(request)
    try:
        available = get_models(ollama_api_key=api_key)
    except RuntimeError as exc:
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc)) from exc
    return {"models": available}


@app.get("/ollama/auth-status")
def ollama_auth_status() -> dict[str, bool]:
    """Expose the Ollama authentication status flags from the service layer."""
    auth_flags = get_ollama_auth_status()
    return auth_flags


@app.get("/datasets")
def datasets() -> dict[str, list[dict[str, object]]]:
    """Return every known dataset under the ``"datasets"`` key."""
    available = get_datasets()
    return {"datasets": available}


@app.get("/datasets/template")
def datasets_template() -> Response:
    """Serve the dataset template as a downloadable ``benchmark_template.json``."""
    return Response(
        content=get_dataset_template(),
        media_type="application/json",
        headers={"Content-Disposition": 'attachment; filename="benchmark_template.json"'},
    )


@app.post("/datasets/upload", status_code=status.HTTP_201_CREATED)
async def datasets_upload(file: UploadFile = File(...)) -> dict[str, object]:
    """Store an uploaded dataset file and return it under the ``"dataset"`` key.

    Every failure is reported as 422:
    * an empty upload gives "Uploaded file is empty.";
    * a ``ValueError``/``RuntimeError`` from the service gives "Invalid dataset payload.";
    * any other exception gives "Dataset upload failed.".

    The original exception is chained, but its message is not sent to the client.
    """
    content = await file.read()
    if not content:
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Uploaded file is empty.")
    upload_name = file.filename or "dataset.json"
    try:
        stored = upload_dataset(filename=upload_name, content=content)
    except (ValueError, RuntimeError) as exc:
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid dataset payload.") from exc
    except Exception as exc:  # noqa: BLE001
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Dataset upload failed.") from exc
    return {"dataset": stored}


@app.delete("/datasets/{dataset_key}")
def datasets_delete(dataset_key: str) -> dict[str, object]:
    """Delete a dataset and return the service's deletion summary.

    Raises:
        HTTPException: 404 for an unknown dataset; 400 when the key names
            the default dataset, which cannot be deleted.
    """
    outcome, summary = delete_dataset(dataset_key)
    failures = {
        "not_found": (status.HTTP_404_NOT_FOUND, "Unknown dataset"),
        "default_forbidden": (status.HTTP_400_BAD_REQUEST, "Default dataset cannot be deleted."),
    }
    failure = failures.get(outcome)
    if failure is not None:
        code, message = failure
        raise HTTPException(status_code=code, detail=message)
    return {"status": "deleted", "summary": summary}


@app.get("/questions")
def questions(dataset_key: str = Query(..., min_length=1)) -> dict[str, object]:
    """Return the question payload for ``dataset_key``; 404 if the dataset is unknown."""
    payload = get_questions(dataset_key)
    if payload is not None:
        return payload
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Unknown dataset")


@app.get("/results")
def results(dataset_key: str = Query(..., min_length=1)) -> dict[str, object]:
    """Return the stored results payload for ``dataset_key``; 404 if the dataset is unknown."""
    payload = get_results(dataset_key)
    if payload is not None:
        return payload
    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Unknown dataset")


@app.get("/results/export")
def results_export(
    dataset_key: str = Query(..., min_length=1),
    export_format: str = Query(..., alias="format", pattern="^(json|xlsx)$"),
) -> Response:
    """Download all stored results for a dataset as a JSON or XLSX attachment.

    The ``format`` query parameter is restricted to ``json``/``xlsx`` by the
    pattern. The service returns ``(content, media_type, filename)``, and the
    filename is sent in the Content-Disposition header.

    Raises:
        HTTPException: 404 when the dataset is unknown.
    """
    exported = export_results(dataset_key, export_format)
    if exported is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Unknown dataset")
    body, media, download_name = exported
    return Response(
        content=body,
        media_type=media,
        headers={"Content-Disposition": f'attachment; filename="{download_name}"'},
    )


@app.get("/results/table_export")
def results_table_export(
    dataset_key: str = Query(..., min_length=1),
    table: str = Query(..., min_length=1),
    export_format: str = Query(..., alias="format", pattern="^(json|xlsx)$"),
) -> Response:
    """Download a single results table for a dataset as a JSON or XLSX attachment.

    Raises:
        HTTPException: 404 for an unknown dataset; 422 for an unknown table or
            unsupported format; 503 for any other non-``ok`` outcome or when
            nothing was exported.
    """
    state, exported = export_results_table(dataset_key=dataset_key, table_key=table, export_format=export_format)
    known_failures = {
        "dataset_not_found": (status.HTTP_404_NOT_FOUND, "Unknown dataset"),
        "table_not_supported": (status.HTTP_422_UNPROCESSABLE_ENTITY, "Unknown results table"),
        "format_not_supported": (status.HTTP_422_UNPROCESSABLE_ENTITY, "Unsupported export format"),
    }
    failure = known_failures.get(state)
    if failure is not None:
        code, message = failure
        raise HTTPException(status_code=code, detail=message)
    if exported is None or state != "ok":
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Results table export failed")
    body, media, download_name = exported
    return Response(
        content=body,
        media_type=media,
        headers={"Content-Disposition": f'attachment; filename="{download_name}"'},
    )


@app.delete("/results/model")
def results_model_delete(
    dataset_key: str = Query(..., min_length=1),
    model: str = Query(..., min_length=1),
) -> dict[str, object]:
    """Delete every stored result of ``model`` within ``dataset_key``.

    Raises:
        HTTPException: 404 for an unknown dataset or when the model has no
            results there; 422 for an invalid model name; 503 for any other
            non-``deleted`` outcome or a missing summary.
    """
    outcome, summary = delete_model_results(dataset_key=dataset_key, model=model)
    rejections = {
        "dataset_not_found": (status.HTTP_404_NOT_FOUND, "Unknown dataset"),
        "model_not_found": (status.HTTP_404_NOT_FOUND, "Model results not found for dataset"),
        "invalid_model": (status.HTTP_422_UNPROCESSABLE_ENTITY, "Invalid model"),
    }
    rejection = rejections.get(outcome)
    if rejection is not None:
        code, message = rejection
        raise HTTPException(status_code=code, detail=message)
    if summary is None or outcome != "deleted":
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Model history delete failed")
    return {"status": "deleted", "summary": summary}


@app.post("/runs", status_code=status.HTTP_201_CREATED)
async def runs(request: Request) -> dict[str, object]:
    """Start a benchmark run for one question across the requested models.

    The body must be a JSON object with:
    * ``dataset_key`` and ``question_id`` (non-blank);
    * ``models`` (a list);
    * optionally ``session_id``, replaced by a fresh hex UUID when missing, null or blank;
    * optionally ``system_prompt``.

    The caller's Ollama API key header is forwarded to ``start_run``.

    Returns:
        ``{"run_id", "status": "started", "session_id"}`` with 201. If the
        session already has an active run, returns a 409 JSON response that
        includes that ``run_id`` when known.

    Raises:
        HTTPException:
            * 503 while the SLO circuit is open or when the run could not be started;
            * 422 for malformed JSON, a non-object body, missing fields,
              invalid models, or a missing Ollama API key;
            * 404 for an unknown dataset or question.
    """
    _raise_if_runs_circuit_open()

    try:
        body = await request.json()
    except ValueError as exc:
        # Malformed JSON (JSONDecodeError / UnicodeDecodeError are ValueError
        # subclasses) is a client error, not a server crash.
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid run payload") from exc
    if not isinstance(body, dict):
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid run payload")

    def _text_field(name: str) -> str:
        # Treat JSON null like a missing field instead of the string "None";
        # in particular a null session_id must not collapse into one shared
        # "None" session.
        value = body.get(name)
        return "" if value is None else str(value).strip()

    session_id = _text_field("session_id") or uuid.uuid4().hex
    dataset_key = _text_field("dataset_key")
    question_id = _text_field("question_id")
    models = body.get("models", [])
    system_prompt = str(body.get("system_prompt", "") or "")
    if not dataset_key or not question_id or not isinstance(models, list):
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid run payload")

    run_id, status_text = start_run(
        session_id=session_id,
        dataset_key=dataset_key,
        question_id=question_id,
        models=models,
        system_prompt=system_prompt,
        ollama_api_key=_ollama_api_key_from_request(request),
    )
    if status_text == "dataset_not_found" or status_text == "question_not_found":
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=status_text)
    if status_text == "invalid_models":
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=status_text)
    if status_text == "missing_api_key":
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Enter Ollama API Key to be able to use Ollama Cloud models.",
        )
    if status_text == "conflict":
        payload: dict[str, object] = {"detail": "A run is already active for this session."}
        if run_id is not None:
            payload["run_id"] = run_id
        return JSONResponse(status_code=status.HTTP_409_CONFLICT, content=payload)
    if status_text != "started" or run_id is None:
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=status_text)
    return {"run_id": run_id, "status": "started", "session_id": session_id}


@app.get("/runs/{run_id}/events")
async def run_events(
    request: Request,
    run_id: int,
    session_id: str = Query(..., min_length=1),
) -> EventSourceResponse:
    """Stream a run's progress as Server-Sent Events.

    Polls ``run_snapshot`` every 0.2 s and emits:

    * ``run_error`` (reason ``run_not_found``) if the session's current run
      is not ``run_id``, then stops;
    * ``run_started`` once, on the first matching poll;
    * ``chunk`` for every entry on every poll, carrying its current
      ``response`` text;
    * ``entry_completed``, or ``run_interrupted`` for an interrupted entry,
      once per model when its entry completes;
    * ``run_completed`` once the snapshot is completed and not running,
      after recording the run outcome with the SLO monitor.

    The stream is registered with the SLO monitor under a unique key and
    reports exactly one terminal state for it: closed, disconnect or error.

    Raises:
        HTTPException: 503 while the SLO circuit breaker is open.
    """
    _raise_if_runs_circuit_open()
    monitor = get_slo_monitor()
    stream_key = f"{session_id}:{run_id}:{uuid.uuid4().hex}"
    monitor.register_stream_open(stream_key)

    async def _event_generator() -> AsyncIterator[dict[str, str]]:
        emitted_started = False
        emitted_completed_for: set[str] = set()
        # Set once a terminal state (closed / disconnect / error) has been
        # reported for this stream, so no path reports it twice.
        finalized = False
        # Last observed "run still in progress" state. It is assumed True until
        # the first snapshot, so an early teardown counts as a disconnect.
        run_in_progress = True
        try:
            while True:
                snapshot = run_snapshot(session_id=session_id)
                # `or 0` tolerates a missing/None run_id instead of raising TypeError.
                if int(snapshot.get("run_id") or 0) != run_id:
                    monitor.register_stream_error(stream_key)
                    finalized = True
                    yield {"event": "run_error", "data": json.dumps({"reason": "run_not_found"})}
                    break

                running = bool(snapshot.get("running"))
                completed = bool(snapshot.get("completed"))
                run_in_progress = running and not completed
                if await request.is_disconnected():
                    if run_in_progress:
                        monitor.register_stream_disconnect(stream_key)
                    else:
                        monitor.register_stream_closed(stream_key)
                    finalized = True
                    break

                if not emitted_started:
                    emitted_started = True
                    yield {"event": "run_started", "data": json.dumps({"run_id": run_id})}

                entries = snapshot.get("entries") or []
                for entry in entries:
                    model = str(entry.get("model", ""))
                    source = str(entry.get("source", ""))
                    host = str(entry.get("host", ""))
                    response = str(entry.get("response", ""))
                    monitor.register_chunk(stream_key)
                    yield {
                        "event": "chunk",
                        "data": json.dumps(
                            {
                                "run_id": run_id,
                                "model": model,
                                "source": source,
                                "host": host,
                                "response": response,
                            }
                        ),
                    }
                    if bool(entry.get("completed")) and model not in emitted_completed_for:
                        emitted_completed_for.add(model)
                        event_name = "run_interrupted" if bool(entry.get("interrupted")) else "entry_completed"
                        payload = {
                            "run_id": run_id,
                            "model": model,
                            "source": source,
                            "host": host,
                            "interrupted": bool(entry.get("interrupted")),
                            "error": str(entry.get("error", "")),
                            "elapsed_ms": float(entry.get("elapsed_ms", 0.0) or 0.0),
                        }
                        yield {"event": event_name, "data": json.dumps(payload)}

                interrupted = any(bool(item.get("interrupted")) for item in entries)
                error = next((str(item.get("error", "")) for item in entries if str(item.get("error", "")).strip()), "")
                if completed and not running:
                    _record_terminal_run_outcome(
                        run_id=run_id,
                        session_id=session_id,
                        completed=completed,
                        interrupted=interrupted,
                        error=error,
                    )
                    yield {"event": "run_completed", "data": json.dumps({"run_id": run_id})}
                    monitor.register_stream_closed(stream_key)
                    finalized = True
                    break

                await asyncio.sleep(0.2)
        except Exception:
            if not finalized:
                finalized = True
                monitor.register_stream_error(stream_key)
            raise
        finally:
            # Task cancellation (asyncio.CancelledError) and generator close
            # (GeneratorExit) derive from BaseException, so `except Exception`
            # above never sees them. They happen, e.g., when the SSE response
            # is torn down after a client disconnect. Report the stream as
            # ended so the SLO monitor does not keep it open forever.
            if not finalized:
                if run_in_progress:
                    monitor.register_stream_disconnect(stream_key)
                else:
                    monitor.register_stream_closed(stream_key)

    return EventSourceResponse(_event_generator())


@app.get("/runs/{run_id}/status")
def run_status(run_id: int, session_id: str = Query(..., min_length=1)) -> dict[str, object]:
    """Return a run's status payload and report its outcome if it is terminal.

    Raises:
        HTTPException: 404 when the run is not found for this session.
    """
    payload = get_run_status(run_id=run_id, session_id=session_id)
    if payload is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Run not found")
    completed = bool(payload.get("completed"))
    interrupted = bool(payload.get("interrupted"))
    error_text = str(payload.get("error", ""))
    _record_terminal_run_outcome(
        run_id=run_id,
        session_id=session_id,
        completed=completed,
        interrupted=interrupted,
        error=error_text,
    )
    return payload


@app.post("/runs/{run_id}/stop", status_code=status.HTTP_202_ACCEPTED)
def run_stop(run_id: int, session_id: str = Query(..., min_length=1)) -> dict[str, str]:
    """Ask the session's active run to stop; always answers 202.

    ``run_id`` is part of the route but unused, because ``stop_run`` is keyed
    by session only.

    NOTE(review): a stale stop request for an older run_id would therefore
    stop whichever run is currently active in the session. Confirm that this
    is intended.
    """
    _ = run_id  # accepted only to match the URL shape
    stop_run(session_id=session_id)
    return {"status": "stop_requested"}


@app.get("/ops/slo")
def ops_slo(request: Request) -> dict[str, object]:
    """Return the current SLO monitor snapshot.

    Only allowed for requests that `_is_local_request` accepts; any other
    caller gets 403.
    """
    if _is_local_request(request):
        return get_slo_monitor().snapshot().as_dict()
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Local/internal endpoint only.")


@app.post("/ops/slo/reset")
def ops_slo_reset(request: Request) -> dict[str, object]:
    """Reset the SLO monitor and return its fresh snapshot.

    Only allowed for requests that `_is_local_request` accepts; any other
    caller gets 403.
    """
    if not _is_local_request(request):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Local/internal endpoint only.")
    slo_monitor = get_slo_monitor()
    slo_monitor.reset()
    fresh_snapshot = slo_monitor.snapshot().as_dict()
    return {"status": "reset", "slo": fresh_snapshot}


@app.patch("/results/manual")
async def results_manual(request: Request):
    """Manually override the recorded outcome of one model's answer.

    The body must be a JSON object with non-blank ``dataset_key``,
    ``question_id``, ``model`` and ``status``, plus an optional free-text
    ``reason`` (kept unstripped; null or missing becomes "").

    Returns:
        ``{"status": "updated", "result": <updated record>}``.

    Raises:
        HTTPException:
            * 422 for malformed JSON, a non-object body, missing fields, or an invalid status;
            * 404 for an unknown dataset or a missing result record;
            * 503 when the override could not be applied.
    """
    try:
        body = await request.json()
    except ValueError as exc:
        # Malformed JSON is a client error, not a server crash.
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid manual override payload") from exc
    if not isinstance(body, dict):
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid manual override payload")

    def _field(name: str, *, strip: bool = True) -> str:
        # JSON null is treated like a missing field; otherwise a null reason
        # would be stored as the literal text "None".
        value = body.get(name)
        if value is None:
            return ""
        text = str(value)
        return text.strip() if strip else text

    dataset_key = _field("dataset_key")
    question_id = _field("question_id")
    model = _field("model")
    override_status = _field("status")
    reason = _field("reason", strip=False)
    if not dataset_key or not question_id or not model or not override_status:
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid manual override payload")
    state, payload = apply_manual_result_override(
        dataset_key=dataset_key,
        question_id=question_id,
        model=model,
        status=override_status,
        reason=reason,
    )
    if state == "dataset_not_found":
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Unknown dataset")
    if state == "result_not_found":
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Result record not found")
    if state == "invalid_status":
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="Invalid status; expected success, fail, or manual_review.",
        )
    if state != "updated" or payload is None:
        raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Manual override failed")
    return {"status": "updated", "result": payload}