Spaces:
Sleeping
Sleeping
| """ | |
| In-process load benchmark: 50 requests against the real FastAPI app via | |
| httpx.ASGITransport (no real HTTP server, no real external services). | |
| PURPOSE | |
| Measures framework overhead — FastAPI middleware, LangGraph graph.invoke(), | |
| Pydantic schema validation, response serialization — with zero I/O latency. | |
| This is NOT a throughput projection for production (which is dominated by | |
| Pinecone + Groq latency). See docs/LOAD_TEST.md for the full interpretation. | |
| WHAT RUNS FOR REAL | |
| FastAPI routing, auth dependency (require_api_key), slowapi rate-limit | |
| middleware (disabled via RATE_LIMIT_ENABLED=false), the LangGraph pipeline | |
| (all 7 nodes), prompt builders, ChatResponse schema. | |
| WHAT IS MOCKED | |
| - pinecone_search → one realistic chunk hit (0.92 cosine score) | |
| - get_llm (graph + streaming) → MagicMock with instant .invoke() | |
| - is_tavily_configured → False | |
| - init_pinecone → no-op (startup event, not triggered by ASGITransport | |
| anyway, but patched for belt-and-suspenders) | |
| - cache_enabled (router) → False (avoids serving identical cached response) | |
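| RATE LIMITING | |
| RATE_LIMIT_ENABLED=false prevents slowapi from registering its middleware | |
| when the app is imported. The @limiter.limit("30/minute") route decorator | |
| still counts requests, so main() also sets limiter.enabled = False. With | |
| 50 requests (at most 10 in flight) from a single IP the 30/minute limiter | |
| would otherwise fire. | |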
| TRANSPORT | |
| httpx.ASGITransport(app=_app) routes httpx requests directly through the | |
| ASGI interface. The ASGITransport does NOT trigger lifespan events, so | |
| the @app.on_event("startup") hook (init_pinecone) never fires regardless | |
| of the patch — but we patch it for safety in case this changes. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import os | |
| import statistics | |
| import sys | |
| import time | |
| from types import SimpleNamespace | |
| from typing import Any, Dict, List | |
| from unittest.mock import MagicMock, patch | |
# ---------------------------------------------------------------------------
# Environment bootstrap. Everything here must run BEFORE any ``app`` import so:
#   - get_settings() reads RATE_LIMIT_ENABLED=false → rate-limit middleware
#     is not registered
#   - LRU-cached settings picks up the test values
# ---------------------------------------------------------------------------
_BENCH_API_KEY = "bench-test-key"

# Placeholder credentials: only applied when the variable is not already set.
os.environ.setdefault("PINECONE_API_KEY", "bench-dummy-key")
os.environ.setdefault("PINECONE_INDEX_NAME", "bench-dummy-index")
os.environ.setdefault("PINECONE_HOST", "https://bench-dummy.pinecone.io")
os.environ.setdefault("GROQ_API_KEY", "bench-dummy-groq")

# These values are forced, overriding whatever the caller exported.
os.environ.update(
    {
        "RATE_LIMIT_ENABLED": "false",
        "CACHE_ENABLED": "false",
        "API_KEY": _BENCH_API_KEY,
    }
)

import httpx  # noqa: E402  (after env setup)

# Clear LRU caches populated by any earlier imports in this process, so the
# values above are the ones actually observed.
from app.core.config import get_settings as _gs  # noqa: E402

_gs.cache_clear()

from app.core.auth import _get_configured_api_key as _gak  # noqa: E402

_gak.cache_clear()

from app.services.llm.groq_llm import get_llm as _gllm  # noqa: E402

_gllm.cache_clear()

import app.services.chat.graph as _graph_mod  # noqa: E402

# NOTE(review): presumably resets a module-level cached graph so it is rebuilt
# under the patches applied in main() — confirm against app.services.chat.graph.
_graph_mod._graph = None
# ---------------------------------------------------------------------------
# Mock shapes
# ---------------------------------------------------------------------------

# The single search hit handed back by the patched ``pinecone_search``
# (see the ``return_value=[_FAKE_CHUNK]`` patch in main()).
_FAKE_CHUNK: Dict[str, Any] = {
    "_score": 0.92,
    "fields": {
        "chunk_text": "RAG combines retrieval with generation to answer questions.",
        "title": "Retrieval-Augmented Generation",
        "source": "wiki",
        "url": "https://en.wikipedia.org/wiki/Retrieval-augmented_generation",
    },
}
def _make_llm_response(answer: str = "RAG combines retrieval and generation.") -> MagicMock:
    """Build a canned LLM response object.

    Args:
        answer: Text exposed through the response's ``content`` attribute.

    Returns:
        A ``MagicMock`` carrying ``content``, a fixed ``usage_metadata`` token
        count dict, and an empty ``response_metadata`` dict.
    """
    # Keyword arguments to MagicMock are set as attributes on the new mock.
    return MagicMock(
        content=answer,
        usage_metadata={"input_tokens": 120, "output_tokens": 25, "total_tokens": 145},
        response_metadata={},
    )
# Shared fake LLM: every ``.invoke(...)`` call returns the same canned response
# immediately. Dotted keyword names configure attributes of child mocks.
_mock_llm = MagicMock(**{"invoke.return_value": _make_llm_response()})
# ---------------------------------------------------------------------------
# Benchmark runner
# ---------------------------------------------------------------------------

# Size of the semaphore that caps how many requests are in flight at once.
_CONCURRENCY: int = 10
# Number of POST requests issued per benchmark run.
_TOTAL_REQUESTS: int = 50
# Value sent in the "namespace" field of every request payload.
_NAMESPACE: str = "bench"
async def _one_request(
    client: httpx.AsyncClient,
    url: str,
    payload: Dict[str, Any],
    headers: Dict[str, str],
    sem: asyncio.Semaphore,
) -> tuple[float, bool]:
    """POST one payload and time it, honouring the concurrency semaphore.

    The clock starts only after the semaphore has been acquired, so time spent
    waiting for a free slot is not counted as request latency.

    Args:
        client: Async client used to send the request.
        url: Target URL.
        payload: Body sent as JSON.
        headers: Headers sent with the request.
        sem: Semaphore bounding the number of in-flight requests.

    Returns:
        ``(elapsed_ms, is_error)``. ``is_error`` is True when the response
        status is 400 or above, or when sending raised an exception (the
        exception is printed to stderr and not propagated).
    """
    async with sem:
        started = time.perf_counter()
        try:
            response = await client.post(url, json=payload, headers=headers)
            duration_ms = 1000.0 * (time.perf_counter() - started)
            failed = response.status_code >= 400
        except Exception as exc:
            # Stop the clock before printing so stderr I/O is not measured.
            duration_ms = 1000.0 * (time.perf_counter() - started)
            print(f" [error] {exc}", file=sys.stderr)
            failed = True
        return duration_ms, failed
async def _run(app) -> Dict[str, Any]:
    """Send the full batch of benchmark requests to *app* and collect timings.

    Requests go through ``httpx.ASGITransport`` to ``/chat``; at most
    ``_CONCURRENCY`` are in flight at once and ``_TOTAL_REQUESTS`` are sent in
    total.

    Args:
        app: The ASGI application under test.

    Returns:
        A dict with ``latencies_ms`` (per-request latencies in completion
        order), ``errors`` (count of failed requests), ``total`` (requests
        issued) and ``wall_ms`` (elapsed wall-clock time for the batch).
    """
    request_body: Dict[str, Any] = {
        "query": "Briefly explain retrieval-augmented generation.",
        "namespace": _NAMESPACE,
        "top_k": 5,
        "use_web_fallback": False,
    }
    request_headers = {
        "Content-Type": "application/json",
        "X-API-Key": _BENCH_API_KEY,
    }
    gate = asyncio.Semaphore(_CONCURRENCY)
    durations_ms: List[float] = []
    failures = 0

    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=app),
        timeout=30.0,
    ) as client:
        pending = [
            _one_request(client, "http://testserver/chat", request_body, request_headers, gate)
            for _ in range(_TOTAL_REQUESTS)
        ]
        batch_started = time.perf_counter()
        # Consume results as they finish, not in submission order.
        for finished in asyncio.as_completed(pending):
            duration, failed = await finished
            durations_ms.append(duration)
            failures += failed  # bool counts as 0 or 1
        batch_wall_ms = 1000.0 * (time.perf_counter() - batch_started)

    return {
        "latencies_ms": durations_ms,
        "errors": failures,
        "total": _TOTAL_REQUESTS,
        "wall_ms": batch_wall_ms,
    }
def _print_report(result: Dict[str, Any]) -> None:
    """Print a human-readable summary of one benchmark run to stdout.

    The request count used for the "Requests", error-percentage and throughput
    lines is read from ``result["total"]``, so the report always describes the
    run that produced the data. If that key is missing, the module-level
    ``_TOTAL_REQUESTS`` is used instead.

    Args:
        result: Mapping with ``latencies_ms`` (list of floats), ``errors``
            (int), ``wall_ms`` (float) and optionally ``total`` (int).

    Returns:
        None. Empty latency lists, a zero request count and a zero wall time
        all produce ``0`` figures without raising.
    """
    lats = sorted(result["latencies_ms"])
    n = len(lats)
    errors = result["errors"]
    wall_ms = result["wall_ms"]
    total = result.get("total", _TOTAL_REQUESTS)

    avg = sum(lats) / n if n else 0.0
    p50 = statistics.median(lats) if n else 0.0
    # Nearest-rank p95 on the sorted sample; the index is >= 0 whenever n >= 1.
    p95 = lats[round(0.95 * (n - 1))] if n else 0.0
    throughput = (total / (wall_ms / 1000.0)) if wall_ms > 0 else 0.0
    # Guard the percentage the same way the latency statistics are guarded.
    error_pct = (errors / total * 100) if total else 0.0

    print("=== /chat in-process bench (mocked externals) ===")
    print(f"Requests: {total}")
    print(f"Concurrency: {_CONCURRENCY}")
    print(f"Errors: {errors} ({error_pct:.1f}%)")
    print(f"Wall time: {wall_ms:.0f} ms")
    print(f"Throughput: {throughput:.1f} req/s")
    print(f"Avg latency: {avg:.2f} ms")
    print(f"p50 latency: {p50:.2f} ms")
    print(f"p95 latency: {p95:.2f} ms")
def main() -> None:
    """Run the in-process benchmark with external services patched out.

    Patches the Pinecone initialiser and search, both ``get_llm`` lookups, the
    Tavily check and the router's cache check. It then imports the app inside
    that patched context, disables the rate limiter, runs the batch and prints
    the report.
    """
    # The patches are entered in this exact order; each one imports its target
    # module on entry, so reordering could change what gets patched.
    with (
        patch("app.main.init_pinecone"),
        patch("app.services.chat.graph.pinecone_search", return_value=[_FAKE_CHUNK]),
        patch("app.services.chat.graph.get_llm", return_value=_mock_llm),
        patch("app.services.chat.streaming.get_llm", return_value=_mock_llm),
        patch("app.services.chat.graph.is_tavily_configured", return_value=False),
        patch("app.routers.chat.cache_enabled", return_value=False),
    ):
        from app.main import app as bench_app

        # The @limiter.limit("30/minute") decorator is baked into the route at
        # import time; RATE_LIMIT_ENABLED=false prevents SlowAPIMiddleware from
        # being added, but the limiter object still counts requests. Disabling
        # it here ensures all 50 bench requests reach the handler.
        from app.core.rate_limit import limiter as route_limiter

        route_limiter.enabled = False

        outcome = asyncio.run(_run(bench_app))

    _print_report(outcome)


if __name__ == "__main__":
    main()