# api/tests/performance/test_latency_throughput.py
"""
Performance tests - latency and throughput
==========================================
Measures:
2.1 Lightweight endpoint latency: GET /, GET /health, and GET /jobs/user/{id}
    (Supabase read) - N sequential samples each
2.2 POST /generate/pdf input-validation latency (schema rejection path - fast)
2.3 POST /generate/async input-validation latency (schema rejection path - fast)
2.4 Sequential throughput for GET /health
2.5 Concurrent GET /health requests (ThreadPoolExecutor)
NOTE: The generation endpoints (/generate/pdf, /generate/async) are NOT exercised
with real requests because they require a valid Supabase request_id and call
expensive downstream services (Claude API, PDF rendering). Instead, we measure
the *input-validation* (422 / 404) paths which are still real network round-trips
to the deployed API and reflect cold-path overhead.
All timings are collected into _perf_results and saved to artifacts/perf_metrics.json
at the end of the session so compile_results.py can embed them in the report.
"""
import json
import time
import pathlib
import statistics
import pytest
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from tests.conftest import BASE_URL, TIMEOUT, NONEXISTENT_USER_ID
ARTIFACTS = pathlib.Path(__file__).parent.parent / "artifacts"
ARTIFACTS.mkdir(exist_ok=True)
# ---------------------------------------------------------------------------
# Thresholds (loose - HuggingFace Spaces can have cold-start jitter)
# ---------------------------------------------------------------------------
MAX_HEALTH_MEAN_S = 10.0 # mean response time for /health
MAX_HEALTH_P95_S = 20.0 # 95th-percentile for /health
MAX_SCHEMA_REJECT_S = 5.0 # 422 responses should always be fast
MAX_USER_JOBS_S = 10.0 # Supabase read should be fast when warm
N_SAMPLES = 5 # sequential sample count for latency stats
N_CONCURRENT = 4 # worker count for concurrent test
# Shared results dict - populated by the test methods
_perf_results: dict = {}
# ---------------------------------------------------------------------------
# Session-end fixture - persist metrics to JSON for compile_results.py
# ---------------------------------------------------------------------------
@pytest.fixture(scope="session", autouse=True)
def _persist_perf_metrics():
"""Yield during the test session; write metrics JSON afterwards."""
yield
try:
(ARTIFACTS / "perf_metrics.json").write_text(
json.dumps(_perf_results, indent=2)
)
except Exception as e:
print(f"Warning: could not save perf metrics: {e}")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _timeit(fn) -> float:
    """Return the wall-clock duration (seconds) of a single call to fn()."""
    t0 = time.perf_counter()
    fn()
    return time.perf_counter() - t0
def _stats(samples: list) -> dict:
    """Summarise latency samples (in seconds) into basic descriptive statistics."""
    return {
        "n": len(samples),
        "min_s": round(min(samples), 4),
        "mean_s": round(statistics.mean(samples), 4),
        "median_s": round(statistics.median(samples), 4),
        "max_s": round(max(samples), 4),
        # Nearest-rank p95 estimate; with a single sample, fall back to the max.
        "p95_s": round(sorted(samples)[int(len(samples) * 0.95)], 4)
        if len(samples) >= 2 else round(max(samples), 4),
    }
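# Example (values computed with the helper above):
#   _stats([0.12, 0.15, 0.11]) ->
#   {"n": 3, "min_s": 0.11, "mean_s": 0.1267, "median_s": 0.12,
#    "max_s": 0.15, "p95_s": 0.15}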
# ---------------------------------------------------------------------------
# 2.1 Lightweight endpoint latency
# ---------------------------------------------------------------------------
class TestLightweightEndpointLatency:
"""Sequential GET / and GET /health β€” N samples each."""
def test_root_latency_under_threshold(self, http):
samples = []
for _ in range(N_SAMPLES):
samples.append(_timeit(lambda: http.get(f"{BASE_URL}/", timeout=TIMEOUT)))
st = _stats(samples)
_perf_results["root_latency"] = st
print(f"\n GET / β€” {st}")
assert st["mean_s"] < MAX_HEALTH_MEAN_S, (
f"GET / mean latency {st['mean_s']:.3f}s exceeds {MAX_HEALTH_MEAN_S}s"
)
def test_health_latency_under_threshold(self, http):
samples = []
for _ in range(N_SAMPLES):
samples.append(_timeit(lambda: http.get(f"{BASE_URL}/health", timeout=TIMEOUT)))
st = _stats(samples)
_perf_results["health_latency"] = st
print(f"\n GET /health β€” {st}")
assert st["mean_s"] < MAX_HEALTH_MEAN_S, (
f"GET /health mean latency {st['mean_s']:.3f}s exceeds {MAX_HEALTH_MEAN_S}s"
)
def test_user_jobs_latency_under_threshold(self, http):
url = f"{BASE_URL}/jobs/user/{NONEXISTENT_USER_ID}"
samples = []
for _ in range(N_SAMPLES):
samples.append(_timeit(lambda: http.get(url, timeout=TIMEOUT)))
st = _stats(samples)
_perf_results["user_jobs_latency"] = st
print(f"\n GET /jobs/user/{{id}} β€” {st}")
assert st["mean_s"] < MAX_USER_JOBS_S, (
f"GET /jobs/user mean latency {st['mean_s']:.3f}s exceeds {MAX_USER_JOBS_S}s"
)
# ---------------------------------------------------------------------------
# 2.2 Input-validation (422) path latency for /generate/pdf
# ---------------------------------------------------------------------------
class TestGeneratePdfValidationLatency:
"""422 responses are pure FastAPI work (no DB / LLM calls)."""
def test_schema_rejection_is_fast(self, http):
        bad_payload = {}  # missing required request_id -> immediate 422
samples = []
for _ in range(N_SAMPLES):
samples.append(_timeit(
lambda: http.post(f"{BASE_URL}/generate/pdf", json=bad_payload, timeout=TIMEOUT)
))
st = _stats(samples)
_perf_results["pdf_validation_latency"] = st
print(f"\n POST /generate/pdf (422 path) β€” {st}")
assert st["mean_s"] < MAX_SCHEMA_REJECT_S, (
f"422 path mean {st['mean_s']:.3f}s exceeds {MAX_SCHEMA_REJECT_S}s"
)
# ---------------------------------------------------------------------------
# 2.3 Input-validation (422) path latency for /generate/async
# ---------------------------------------------------------------------------
class TestGenerateAsyncValidationLatency:
def test_schema_rejection_is_fast(self, http):
bad_payload = {}
samples = []
for _ in range(N_SAMPLES):
samples.append(_timeit(
lambda: http.post(f"{BASE_URL}/generate/async", json=bad_payload, timeout=TIMEOUT)
))
st = _stats(samples)
_perf_results["async_validation_latency"] = st
print(f"\n POST /generate/async (422 path) β€” {st}")
assert st["mean_s"] < MAX_SCHEMA_REJECT_S
# ---------------------------------------------------------------------------
# 2.4 Sequential throughput - GET /health
# ---------------------------------------------------------------------------
class TestSequentialThroughput:
"""How many /health requests can the API serve sequentially per second?"""
def test_health_sequential_throughput(self, http):
n = N_SAMPLES
t_start = time.perf_counter()
statuses = []
for _ in range(n):
r = http.get(f"{BASE_URL}/health", timeout=TIMEOUT)
statuses.append(r.status_code)
wall = time.perf_counter() - t_start
ok = statuses.count(200)
req_per_min = round(ok / wall * 60, 2)
result = {
"requests": n, "ok": ok, "failures": n - ok,
"wall_s": round(wall, 3),
"mean_per_req_s": round(wall / n, 3),
"req_per_min": req_per_min,
}
_perf_results["sequential_throughput"] = result
print(f"\n Sequential throughput β€” {result}")
assert ok == n, f"Expected all {n} requests to succeed, got {ok}"
# ---------------------------------------------------------------------------
# 2.5 Concurrent GET /health
# ---------------------------------------------------------------------------
class TestConcurrentRequests:
"""Fire N concurrent GET /health requests and measure wall-clock time."""
    def _run_concurrent(self, n_workers: int, _http_session: requests.Session):
        # The shared session fixture is accepted but not used here: each worker
        # calls requests.get() directly, so every request opens its own connection
        # rather than going through one session's connection pool.
url = f"{BASE_URL}/health"
results = []
wall_start = time.perf_counter()
def _fetch():
t0 = time.perf_counter()
r = requests.get(url, timeout=TIMEOUT)
elapsed = time.perf_counter() - t0
return r.status_code, elapsed
with ThreadPoolExecutor(max_workers=n_workers) as pool:
futures = [pool.submit(_fetch) for _ in range(n_workers)]
for f in as_completed(futures):
results.append(f.result())
wall = time.perf_counter() - wall_start
statuses = [r[0] for r in results]
per_req = [r[1] for r in results]
ok_count = statuses.count(200)
return {
"concurrency": n_workers,
"ok": ok_count, "fail": n_workers - ok_count,
"wall_s": round(wall, 3),
"min_req_s": round(min(per_req), 3),
"mean_req_s": round(statistics.mean(per_req), 3),
"max_req_s": round(max(per_req), 3),
}
def test_concurrent_2_health_requests(self, http):
result = self._run_concurrent(2, http)
_perf_results["concurrent_2"] = result
print(f"\n Concurrent (2) β€” {result}")
assert result["ok"] == 2, f"Expected 2/2 successes, got {result}"
def test_concurrent_4_health_requests(self, http):
result = self._run_concurrent(4, http)
_perf_results["concurrent_4"] = result
print(f"\n Concurrent (4) β€” {result}")
assert result["ok"] == 4, f"Expected 4/4 successes, got {result}"
def test_concurrent_wall_less_than_serial(self, http):
"""
Wall-clock for N concurrent requests should be less than
N Γ— mean single-request time (i.e., some parallelism is achieved).
"""
single_latency = _timeit(lambda: http.get(f"{BASE_URL}/health", timeout=TIMEOUT))
result = self._run_concurrent(N_CONCURRENT, http)
serial_estimate = single_latency * N_CONCURRENT
        # Require the concurrent wall time to beat 95 % of the serial estimate,
        # with a 30 s absolute fallback to absorb cold-start / network jitter.
        assert result["wall_s"] < serial_estimate * 0.95 or result["wall_s"] < 30, (
f"Concurrent wall={result['wall_s']:.2f}s not better than "
f"serial estimate={serial_estimate:.2f}s"
)
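# Typical local invocation (assuming the api/ directory is the working directory so
# that "tests.conftest" is importable; adjust the path to your layout):
#   pytest tests/performance/test_latency_throughput.py -v -s
# The -s flag disables output capture so the per-test metric printouts are visible.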