""" Performance tests — latency and throughput ========================================== Measures: 2.1 Lightweight endpoint latency (GET / and GET /health, N sequential samples) 2.2 POST /generate/pdf input-validation latency (schema rejection path — fast) 2.3 POST /generate/async input-validation latency (schema rejection path — fast) 2.4 GET /jobs/user/{id} latency (Supabase read — lightweight) 2.5 Sequential throughput across lightweight GETs 2.6 Concurrent lightweight GET requests (ThreadPoolExecutor) NOTE: The generation endpoints (/generate/pdf, /generate/async) are NOT exercised with real requests because they require a valid Supabase request_id and call expensive downstream services (Claude API, PDF rendering). Instead, we measure the *input-validation* (422 / 404) paths which are still real network round-trips to the deployed API and reflect cold-path overhead. All timings are collected into _perf_results and saved to artifacts/perf_metrics.json at the end of the session so compile_results.py can embed them in the report. """ import json import time import pathlib import statistics import pytest import requests from concurrent.futures import ThreadPoolExecutor, as_completed from tests.conftest import ( BASE_URL, TIMEOUT, SEED_IMAGE_URL, NONEXISTENT_REQUEST_ID, NONEXISTENT_USER_ID, MINIMAL_GENERATE_PAYLOAD, ) ARTIFACTS = pathlib.Path(__file__).parent.parent / "artifacts" ARTIFACTS.mkdir(exist_ok=True) # --------------------------------------------------------------------------- # Thresholds (loose — HuggingFace Spaces can have cold-start jitter) # --------------------------------------------------------------------------- MAX_HEALTH_MEAN_S = 10.0 # mean response time for /health MAX_HEALTH_P95_S = 20.0 # 95th-percentile for /health MAX_SCHEMA_REJECT_S = 5.0 # 422 responses should always be fast MAX_USER_JOBS_S = 10.0 # Supabase read should be fast when warm N_SAMPLES = 5 # sequential sample count for latency stats N_CONCURRENT = 4 # worker count for concurrent test # Shared results dict — populated by the test methods _perf_results: dict = {} # --------------------------------------------------------------------------- # Session-end fixture — persist metrics to JSON for compile_results.py # --------------------------------------------------------------------------- @pytest.fixture(scope="session", autouse=True) def _persist_perf_metrics(): """Yield during the test session; write metrics JSON afterwards.""" yield try: (ARTIFACTS / "perf_metrics.json").write_text( json.dumps(_perf_results, indent=2) ) except Exception as e: print(f"Warning: could not save perf metrics: {e}") # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _timeit(fn) -> float: t0 = time.perf_counter() fn() return time.perf_counter() - t0 def _stats(samples: list) -> dict: return { "n": len(samples), "min_s": round(min(samples), 4), "mean_s": round(statistics.mean(samples), 4), "median_s": round(statistics.median(samples), 4), "max_s": round(max(samples), 4), "p95_s": round(sorted(samples)[int(len(samples) * 0.95)], 4) if len(samples) >= 2 else round(max(samples), 4), } # --------------------------------------------------------------------------- # 2.1 Lightweight endpoint latency # --------------------------------------------------------------------------- class TestLightweightEndpointLatency: """Sequential GET / and GET /health — N samples each.""" def test_root_latency_under_threshold(self, 

# ---------------------------------------------------------------------------
# 2.1 Lightweight endpoint latency
# ---------------------------------------------------------------------------
class TestLightweightEndpointLatency:
    """Sequential GET /, GET /health, and GET /jobs/user/{id} — N samples each."""

    def test_root_latency_under_threshold(self, http):
        samples = []
        for _ in range(N_SAMPLES):
            samples.append(_timeit(lambda: http.get(f"{BASE_URL}/", timeout=TIMEOUT)))
        st = _stats(samples)
        _perf_results["root_latency"] = st
        print(f"\n GET / — {st}")
        assert st["mean_s"] < MAX_HEALTH_MEAN_S, (
            f"GET / mean latency {st['mean_s']:.3f}s exceeds {MAX_HEALTH_MEAN_S}s"
        )

    def test_health_latency_under_threshold(self, http):
        samples = []
        for _ in range(N_SAMPLES):
            samples.append(_timeit(lambda: http.get(f"{BASE_URL}/health", timeout=TIMEOUT)))
        st = _stats(samples)
        _perf_results["health_latency"] = st
        print(f"\n GET /health — {st}")
        assert st["mean_s"] < MAX_HEALTH_MEAN_S, (
            f"GET /health mean latency {st['mean_s']:.3f}s exceeds {MAX_HEALTH_MEAN_S}s"
        )

    def test_user_jobs_latency_under_threshold(self, http):
        url = f"{BASE_URL}/jobs/user/{NONEXISTENT_USER_ID}"
        samples = []
        for _ in range(N_SAMPLES):
            samples.append(_timeit(lambda: http.get(url, timeout=TIMEOUT)))
        st = _stats(samples)
        _perf_results["user_jobs_latency"] = st
        print(f"\n GET /jobs/user/{{id}} — {st}")
        assert st["mean_s"] < MAX_USER_JOBS_S, (
            f"GET /jobs/user mean latency {st['mean_s']:.3f}s exceeds {MAX_USER_JOBS_S}s"
        )


# ---------------------------------------------------------------------------
# 2.2 Input-validation (422) path latency for /generate/pdf
# ---------------------------------------------------------------------------
class TestGeneratePdfValidationLatency:
    """422 responses are pure FastAPI work (no DB / LLM calls)."""

    def test_schema_rejection_is_fast(self, http):
        bad_payload = {}  # missing required request_id → immediate 422
        samples = []
        for _ in range(N_SAMPLES):
            samples.append(_timeit(
                lambda: http.post(f"{BASE_URL}/generate/pdf", json=bad_payload, timeout=TIMEOUT)
            ))
        st = _stats(samples)
        _perf_results["pdf_validation_latency"] = st
        print(f"\n POST /generate/pdf (422 path) — {st}")
        assert st["mean_s"] < MAX_SCHEMA_REJECT_S, (
            f"422 path mean {st['mean_s']:.3f}s exceeds {MAX_SCHEMA_REJECT_S}s"
        )


# ---------------------------------------------------------------------------
# 2.3 Input-validation (422) path latency for /generate/async
# ---------------------------------------------------------------------------
class TestGenerateAsyncValidationLatency:
    def test_schema_rejection_is_fast(self, http):
        bad_payload = {}
        samples = []
        for _ in range(N_SAMPLES):
            samples.append(_timeit(
                lambda: http.post(f"{BASE_URL}/generate/async", json=bad_payload, timeout=TIMEOUT)
            ))
        st = _stats(samples)
        _perf_results["async_validation_latency"] = st
        print(f"\n POST /generate/async (422 path) — {st}")
        assert st["mean_s"] < MAX_SCHEMA_REJECT_S


# ---------------------------------------------------------------------------
# 2.4 Sequential throughput — GET /health
# ---------------------------------------------------------------------------
class TestSequentialThroughput:
    """How many /health requests can the API serve sequentially per second?"""

    def test_health_sequential_throughput(self, http):
        n = N_SAMPLES
        t_start = time.perf_counter()
        statuses = []
        for _ in range(n):
            r = http.get(f"{BASE_URL}/health", timeout=TIMEOUT)
            statuses.append(r.status_code)
        wall = time.perf_counter() - t_start

        ok = statuses.count(200)
        req_per_min = round(ok / wall * 60, 2)
        result = {
            "requests": n,
            "ok": ok,
            "failures": n - ok,
            "wall_s": round(wall, 3),
            "mean_per_req_s": round(wall / n, 3),
            "req_per_min": req_per_min,
        }
        _perf_results["sequential_throughput"] = result
        print(f"\n Sequential throughput — {result}")
        assert ok == n, f"Expected all {n} requests to succeed, got {ok}"
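
# Worked example of the throughput arithmetic above (numbers invented):
# 5 successful requests finishing in a 2.5 s wall time give
#   req_per_min    = 5 / 2.5 * 60 = 120.0
#   mean_per_req_s = 2.5 / 5      = 0.5
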

# ---------------------------------------------------------------------------
# 2.5 Concurrent GET /health
# ---------------------------------------------------------------------------
class TestConcurrentRequests:
    """Fire N concurrent GET /health requests and measure wall-clock time."""

    def _run_concurrent(self, n_workers: int, http_session: requests.Session):
        url = f"{BASE_URL}/health"
        results = []
        wall_start = time.perf_counter()

        def _fetch():
            # Each worker issues its own requests.get rather than sharing the
            # passed-in session, since requests.Session is not guaranteed to be
            # thread-safe.
            t0 = time.perf_counter()
            r = requests.get(url, timeout=TIMEOUT)
            elapsed = time.perf_counter() - t0
            return r.status_code, elapsed

        with ThreadPoolExecutor(max_workers=n_workers) as pool:
            futures = [pool.submit(_fetch) for _ in range(n_workers)]
            for f in as_completed(futures):
                results.append(f.result())
        wall = time.perf_counter() - wall_start

        statuses = [r[0] for r in results]
        per_req = [r[1] for r in results]
        ok_count = statuses.count(200)
        return {
            "concurrency": n_workers,
            "ok": ok_count,
            "fail": n_workers - ok_count,
            "wall_s": round(wall, 3),
            "min_req_s": round(min(per_req), 3),
            "mean_req_s": round(statistics.mean(per_req), 3),
            "max_req_s": round(max(per_req), 3),
        }

    def test_concurrent_2_health_requests(self, http):
        result = self._run_concurrent(2, http)
        _perf_results["concurrent_2"] = result
        print(f"\n Concurrent (2) — {result}")
        assert result["ok"] == 2, f"Expected 2/2 successes, got {result}"

    def test_concurrent_4_health_requests(self, http):
        result = self._run_concurrent(4, http)
        _perf_results["concurrent_4"] = result
        print(f"\n Concurrent (4) — {result}")
        assert result["ok"] == 4, f"Expected 4/4 successes, got {result}"

    def test_concurrent_wall_less_than_serial(self, http):
        """
        Wall-clock for N concurrent requests should be less than N × mean
        single-request time (i.e., some parallelism is achieved).
        """
        single_latency = _timeit(lambda: http.get(f"{BASE_URL}/health", timeout=TIMEOUT))
        result = self._run_concurrent(N_CONCURRENT, http)
        serial_estimate = single_latency * N_CONCURRENT
        # Accept up to 95 % of the serial estimate, or a 30 s absolute cap,
        # as evidence of at least some parallelism.
        assert result["wall_s"] < serial_estimate * 0.95 or result["wall_s"] < 30, (
            f"Concurrent wall={result['wall_s']:.2f}s not better than "
            f"serial estimate={serial_estimate:.2f}s"
        )
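
# Illustrative shape of artifacts/perf_metrics.json written by the
# _persist_perf_metrics fixture. The keys come from the tests above; every
# value shown is an invented placeholder, and entries are truncated:
#
# {
#   "root_latency":             {"n": 5, "min_s": 0.38, "mean_s": 0.43, ...},
#   "health_latency":           {"n": 5, "min_s": 0.35, "mean_s": 0.41, ...},
#   "user_jobs_latency":        {"n": 5, "min_s": 0.52, "mean_s": 0.61, ...},
#   "pdf_validation_latency":   {...},
#   "async_validation_latency": {...},
#   "sequential_throughput":    {"requests": 5, "ok": 5, "req_per_min": 120.0, ...},
#   "concurrent_2":             {"concurrency": 2, "ok": 2, "wall_s": 0.9, ...},
#   "concurrent_4":             {"concurrency": 4, "ok": 4, "wall_s": 1.1, ...}
# }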