| """ |
| Performance tests β latency and throughput |
| ========================================== |
| Measures: |
| 2.1 Lightweight endpoint latency (GET / and GET /health, N sequential samples) |
| 2.2 POST /generate/pdf input-validation latency (schema rejection path β fast) |
| 2.3 POST /generate/async input-validation latency (schema rejection path β fast) |
| 2.4 GET /jobs/user/{id} latency (Supabase read β lightweight) |
| 2.5 Sequential throughput across lightweight GETs |
| 2.6 Concurrent lightweight GET requests (ThreadPoolExecutor) |
| |
| NOTE: The generation endpoints (/generate/pdf, /generate/async) are NOT exercised |
| with real requests because they require a valid Supabase request_id and call |
| expensive downstream services (Claude API, PDF rendering). Instead, we measure |
| the *input-validation* (422 / 404) paths which are still real network round-trips |
| to the deployed API and reflect cold-path overhead. |
| |
| All timings are collected into _perf_results and saved to artifacts/perf_metrics.json |
| at the end of the session so compile_results.py can embed them in the report. |
| """ |
| import json |
| import time |
| import pathlib |
| import statistics |
| import pytest |
| import requests |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from tests.conftest import ( |
| BASE_URL, TIMEOUT, SEED_IMAGE_URL, |
| NONEXISTENT_REQUEST_ID, NONEXISTENT_USER_ID, |
| MINIMAL_GENERATE_PAYLOAD, |
| ) |
|
|
| ARTIFACTS = pathlib.Path(__file__).parent.parent / "artifacts" |
| ARTIFACTS.mkdir(exist_ok=True) |
|
|
| |
| |
| |
| MAX_HEALTH_MEAN_S = 10.0 |
| MAX_HEALTH_P95_S = 20.0 |
| MAX_SCHEMA_REJECT_S = 5.0 |
| MAX_USER_JOBS_S = 10.0 |
|
|
| N_SAMPLES = 5 |
| N_CONCURRENT = 4 |
|
|
| |
| _perf_results: dict = {} |
|
|
|
|
| |
| |
| |
|
|
| @pytest.fixture(scope="session", autouse=True) |
| def _persist_perf_metrics(): |
| """Yield during the test session; write metrics JSON afterwards.""" |
| yield |
| try: |
| (ARTIFACTS / "perf_metrics.json").write_text( |
| json.dumps(_perf_results, indent=2) |
| ) |
| except Exception as e: |
| print(f"Warning: could not save perf metrics: {e}") |
|
|
|
|
| |
| |
| |
|
|
| def _timeit(fn) -> float: |
| t0 = time.perf_counter() |
| fn() |
| return time.perf_counter() - t0 |
|
|
|
|
| def _stats(samples: list) -> dict: |
| return { |
| "n": len(samples), |
| "min_s": round(min(samples), 4), |
| "mean_s": round(statistics.mean(samples), 4), |
| "median_s": round(statistics.median(samples), 4), |
| "max_s": round(max(samples), 4), |
| "p95_s": round(sorted(samples)[int(len(samples) * 0.95)], 4) |
| if len(samples) >= 2 else round(max(samples), 4), |
| } |
|
|
|
|
| |
| |
| |
|
|
| class TestLightweightEndpointLatency: |
| """Sequential GET / and GET /health β N samples each.""" |
|
|
| def test_root_latency_under_threshold(self, http): |
| samples = [] |
| for _ in range(N_SAMPLES): |
| samples.append(_timeit(lambda: http.get(f"{BASE_URL}/", timeout=TIMEOUT))) |
| st = _stats(samples) |
| _perf_results["root_latency"] = st |
| print(f"\n GET / β {st}") |
| assert st["mean_s"] < MAX_HEALTH_MEAN_S, ( |
| f"GET / mean latency {st['mean_s']:.3f}s exceeds {MAX_HEALTH_MEAN_S}s" |
| ) |
|
|
| def test_health_latency_under_threshold(self, http): |
| samples = [] |
| for _ in range(N_SAMPLES): |
| samples.append(_timeit(lambda: http.get(f"{BASE_URL}/health", timeout=TIMEOUT))) |
| st = _stats(samples) |
| _perf_results["health_latency"] = st |
| print(f"\n GET /health β {st}") |
| assert st["mean_s"] < MAX_HEALTH_MEAN_S, ( |
| f"GET /health mean latency {st['mean_s']:.3f}s exceeds {MAX_HEALTH_MEAN_S}s" |
| ) |
|
|
| def test_user_jobs_latency_under_threshold(self, http): |
| url = f"{BASE_URL}/jobs/user/{NONEXISTENT_USER_ID}" |
| samples = [] |
| for _ in range(N_SAMPLES): |
| samples.append(_timeit(lambda: http.get(url, timeout=TIMEOUT))) |
| st = _stats(samples) |
| _perf_results["user_jobs_latency"] = st |
| print(f"\n GET /jobs/user/{{id}} β {st}") |
| assert st["mean_s"] < MAX_USER_JOBS_S, ( |
| f"GET /jobs/user mean latency {st['mean_s']:.3f}s exceeds {MAX_USER_JOBS_S}s" |
| ) |
|
|
|
|
| |
| |
| |
|
|
| class TestGeneratePdfValidationLatency: |
| """422 responses are pure FastAPI work (no DB / LLM calls).""" |
|
|
| def test_schema_rejection_is_fast(self, http): |
| bad_payload = {} |
| samples = [] |
| for _ in range(N_SAMPLES): |
| samples.append(_timeit( |
| lambda: http.post(f"{BASE_URL}/generate/pdf", json=bad_payload, timeout=TIMEOUT) |
| )) |
| st = _stats(samples) |
| _perf_results["pdf_validation_latency"] = st |
| print(f"\n POST /generate/pdf (422 path) β {st}") |
| assert st["mean_s"] < MAX_SCHEMA_REJECT_S, ( |
| f"422 path mean {st['mean_s']:.3f}s exceeds {MAX_SCHEMA_REJECT_S}s" |
| ) |
|
|
|
|
| |
| |
| |
|
|
| class TestGenerateAsyncValidationLatency: |
|
|
| def test_schema_rejection_is_fast(self, http): |
| bad_payload = {} |
| samples = [] |
| for _ in range(N_SAMPLES): |
| samples.append(_timeit( |
| lambda: http.post(f"{BASE_URL}/generate/async", json=bad_payload, timeout=TIMEOUT) |
| )) |
| st = _stats(samples) |
| _perf_results["async_validation_latency"] = st |
| print(f"\n POST /generate/async (422 path) β {st}") |
| assert st["mean_s"] < MAX_SCHEMA_REJECT_S |
|
|
|
|
| |
| |
| |
|
|
| class TestSequentialThroughput: |
| """How many /health requests can the API serve sequentially per second?""" |
|
|
| def test_health_sequential_throughput(self, http): |
| n = N_SAMPLES |
| t_start = time.perf_counter() |
| statuses = [] |
| for _ in range(n): |
| r = http.get(f"{BASE_URL}/health", timeout=TIMEOUT) |
| statuses.append(r.status_code) |
| wall = time.perf_counter() - t_start |
|
|
| ok = statuses.count(200) |
| req_per_min = round(ok / wall * 60, 2) |
| result = { |
| "requests": n, "ok": ok, "failures": n - ok, |
| "wall_s": round(wall, 3), |
| "mean_per_req_s": round(wall / n, 3), |
| "req_per_min": req_per_min, |
| } |
| _perf_results["sequential_throughput"] = result |
| print(f"\n Sequential throughput β {result}") |
|
|
| assert ok == n, f"Expected all {n} requests to succeed, got {ok}" |
|
|
|
|
| |
| |
| |
|
|
| class TestConcurrentRequests: |
| """Fire N concurrent GET /health requests and measure wall-clock time.""" |
|
|
| def _run_concurrent(self, n_workers: int, http_session: requests.Session): |
| url = f"{BASE_URL}/health" |
| results = [] |
| wall_start = time.perf_counter() |
|
|
| def _fetch(): |
| t0 = time.perf_counter() |
| r = requests.get(url, timeout=TIMEOUT) |
| elapsed = time.perf_counter() - t0 |
| return r.status_code, elapsed |
|
|
| with ThreadPoolExecutor(max_workers=n_workers) as pool: |
| futures = [pool.submit(_fetch) for _ in range(n_workers)] |
| for f in as_completed(futures): |
| results.append(f.result()) |
|
|
| wall = time.perf_counter() - wall_start |
| statuses = [r[0] for r in results] |
| per_req = [r[1] for r in results] |
| ok_count = statuses.count(200) |
| return { |
| "concurrency": n_workers, |
| "ok": ok_count, "fail": n_workers - ok_count, |
| "wall_s": round(wall, 3), |
| "min_req_s": round(min(per_req), 3), |
| "mean_req_s": round(statistics.mean(per_req), 3), |
| "max_req_s": round(max(per_req), 3), |
| } |
|
|
| def test_concurrent_2_health_requests(self, http): |
| result = self._run_concurrent(2, http) |
| _perf_results["concurrent_2"] = result |
| print(f"\n Concurrent (2) β {result}") |
| assert result["ok"] == 2, f"Expected 2/2 successes, got {result}" |
|
|
| def test_concurrent_4_health_requests(self, http): |
| result = self._run_concurrent(4, http) |
| _perf_results["concurrent_4"] = result |
| print(f"\n Concurrent (4) β {result}") |
| assert result["ok"] == 4, f"Expected 4/4 successes, got {result}" |
|
|
| def test_concurrent_wall_less_than_serial(self, http): |
| """ |
| Wall-clock for N concurrent requests should be less than |
| N Γ mean single-request time (i.e., some parallelism is achieved). |
| """ |
| single_latency = _timeit(lambda: http.get(f"{BASE_URL}/health", timeout=TIMEOUT)) |
| result = self._run_concurrent(N_CONCURRENT, http) |
| serial_estimate = single_latency * N_CONCURRENT |
| |
| assert result["wall_s"] < serial_estimate * 0.95 or result["wall_s"] < 30, ( |
| f"Concurrent wall={result['wall_s']:.2f}s not better than " |
| f"serial estimate={serial_estimate:.2f}s" |
| ) |
|
|