rag-agent-workbench-api / scripts /bench_mocked.py
BrejBala's picture
feat: deploy Tiers 2 & 3 — CRAG, faithfulness, streaming, Prometheus, eval-driven retrieval
6686f13
Raw
History Blame Contribute Delete
7.79 kB
"""
In-process load benchmark: 50 requests against the real FastAPI app via
httpx.ASGITransport (no real HTTP server, no real external services).
PURPOSE
Measures framework overhead — FastAPI middleware, LangGraph graph.invoke(),
Pydantic schema validation, response serialization — with zero I/O latency.
This is NOT a throughput projection for production (which is dominated by
Pinecone + Groq latency). See docs/LOAD_TEST.md for the full interpretation.
WHAT RUNS FOR REAL
FastAPI routing, auth dependency (require_api_key), slowapi rate-limit
middleware (disabled via RATE_LIMIT_ENABLED=false), the LangGraph pipeline
(all 7 nodes), prompt builders, ChatResponse schema.
WHAT IS MOCKED
- pinecone_search → one realistic chunk hit (0.92 cosine score)
- get_llm (graph + streaming) → MagicMock with instant .invoke()
- is_tavily_configured → False
- init_pinecone → no-op (startup event, not triggered by ASGITransport
anyway, but patched for belt-and-suspenders)
- cache_enabled (router) → False (avoids serving identical cached response)
RATE LIMITING
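RATE_LIMIT_ENABLED=false prevents slowapi from registering its middleware
when the app is imported. The @limiter.limit("30/minute") route decorator
still counts requests, so main() additionally sets limiter.enabled = False.
Without both, 50 requests from a single IP (at most 10 in flight at a time)
would trip the 30/minute limit.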
TRANSPORT
httpx.ASGITransport(app=_app) routes httpx requests directly through the
ASGI interface. The ASGITransport does NOT trigger lifespan events, so
the @app.on_event("startup") hook (init_pinecone) never fires regardless
of the patch — but we patch it for safety in case this changes.
"""
from __future__ import annotations
import asyncio
import os
import statistics
import sys
import time
from types import SimpleNamespace
from typing import Any, Dict, List
from unittest.mock import MagicMock, patch
# ---------------------------------------------------------------------------
# Environment must be set BEFORE any app import so:
# - get_settings() reads RATE_LIMIT_ENABLED=false → rate-limit middleware
# is not registered
# - LRU-cached settings picks up the test values
#
# Statement order in this section is load-bearing: do not move the imports
# above the os.environ writes.
# ---------------------------------------------------------------------------
# setdefault(): keep whatever is already exported in the shell, otherwise
# fall back to placeholder values. The services behind these keys are
# patched in main(), so the placeholders are presumably only needed to get
# the app's settings to load — confirm against app.core.config.
os.environ.setdefault("PINECONE_API_KEY", "bench-dummy-key")
os.environ.setdefault("PINECONE_INDEX_NAME", "bench-dummy-index")
os.environ.setdefault("PINECONE_HOST", "https://bench-dummy.pinecone.io")
os.environ.setdefault("GROQ_API_KEY", "bench-dummy-groq")
# Unconditional assignment: these three must hold the bench values even if
# the caller's environment defines them differently.
os.environ["RATE_LIMIT_ENABLED"] = "false"
os.environ["CACHE_ENABLED"] = "false"
# The same key is sent as the X-API-Key header by _run().
_BENCH_API_KEY = "bench-test-key"
os.environ["API_KEY"] = _BENCH_API_KEY
import httpx # noqa: E402 (after env setup)
# Clear LRU caches populated by any earlier imports in this process, so the
# cached callables re-read the environment written above.
from app.core.config import get_settings as _gs # noqa: E402
_gs.cache_clear()
from app.core.auth import _get_configured_api_key as _gak # noqa: E402
_gak.cache_clear()
from app.services.llm.groq_llm import get_llm as _gllm # noqa: E402
_gllm.cache_clear()
import app.services.chat.graph as _graph_mod # noqa: E402
# Reset the module-level graph object. NOTE(review): presumably a lazily
# built singleton that is rebuilt on first use — confirm in
# app.services.chat.graph.
_graph_mod._graph = None
# ---------------------------------------------------------------------------
# Mock shapes
# ---------------------------------------------------------------------------
# The single canned retrieval hit that the patched pinecone_search returns
# (wrapped in a one-element list) for every request during the benchmark.
# NOTE(review): the "_score"/"fields" layout presumably mirrors what the
# graph expects from a real Pinecone search hit — confirm against
# app.services.chat.graph.
_FAKE_CHUNK = {
    # Similarity score reported for the hit.
    "_score": 0.92,
    "fields": {
        "chunk_text": "RAG combines retrieval with generation to answer questions.",
        "title": "Retrieval-Augmented Generation",
        "source": "wiki",
        "url": "https://en.wikipedia.org/wiki/Retrieval-augmented_generation",
    },
}
def _make_llm_response(answer: str = "RAG combines retrieval and generation.") -> MagicMock:
    """Build a stand-in for the message object returned by the LLM.

    Args:
        answer: Text exposed as the mock's ``content`` attribute.

    Returns:
        A fresh ``MagicMock`` carrying ``content``, a fixed ``usage_metadata``
        token-count dict, and an empty ``response_metadata`` dict.
    """
    token_usage = {"input_tokens": 120, "output_tokens": 25, "total_tokens": 145}
    # Keyword arguments to MagicMock() are set as attributes on the new mock.
    return MagicMock(
        content=answer,
        usage_metadata=token_usage,
        response_metadata={},
    )
# Shared fake LLM handed out by the patched get_llm: every .invoke() call
# returns the same canned response object without doing any work.
_mock_llm = MagicMock(**{"invoke.return_value": _make_llm_response()})
# ---------------------------------------------------------------------------
# Benchmark runner
# ---------------------------------------------------------------------------
# Maximum number of requests in flight at once (size of the semaphore
# created in _run).
_CONCURRENCY = 10
# Number of /chat requests issued per benchmark run.
_TOTAL_REQUESTS = 50
# Value sent as the "namespace" field of every request payload.
_NAMESPACE = "bench"
async def _one_request(
    client: httpx.AsyncClient,
    url: str,
    payload: Dict[str, Any],
    headers: Dict[str, str],
    sem: asyncio.Semaphore,
) -> tuple[float, bool]:
    """POST one request and time it.

    Args:
        client: Client used to send the request.
        url: Target URL.
        payload: JSON body of the request.
        headers: HTTP headers sent with the request.
        sem: Semaphore bounding how many requests run at the same time.

    Returns:
        ``(elapsed_ms, is_error)``. ``is_error`` is true when the response
        status is 400 or above, or when sending raised an exception (the
        exception is reported on stderr, not propagated).
    """

    def _ms_since(start: float) -> float:
        return (time.perf_counter() - start) * 1000.0

    async with sem:
        # The clock starts only after a slot is acquired, so time spent
        # queueing on the semaphore is not counted as request latency.
        started = time.perf_counter()
        try:
            response = await client.post(url, json=payload, headers=headers)
            took_ms = _ms_since(started)
            outcome = (took_ms, response.status_code >= 400)
        except Exception as exc:
            # Benchmark boundary: a failed request is recorded, never fatal.
            took_ms = _ms_since(started)
            print(f" [error] {exc}", file=sys.stderr)
            outcome = (took_ms, True)
    return outcome
async def _run(app) -> Dict[str, Any]:
    """Fire the benchmark requests at ``app`` in-process and collect timings.

    ``_TOTAL_REQUESTS`` POSTs are sent to ``/chat`` through
    ``httpx.ASGITransport``, with at most ``_CONCURRENCY`` in flight at once.

    Args:
        app: The ASGI application under test.

    Returns:
        A dict with ``latencies_ms`` (per-request latencies in completion
        order), ``errors`` (failed request count), ``total`` (requests
        issued) and ``wall_ms`` (wall-clock time for the whole batch).
    """
    request_body: Dict[str, Any] = {
        "query": "Briefly explain retrieval-augmented generation.",
        "namespace": _NAMESPACE,
        "top_k": 5,
        "use_web_fallback": False,
    }
    request_headers = {
        "Content-Type": "application/json",
        "X-API-Key": _BENCH_API_KEY,
    }
    gate = asyncio.Semaphore(_CONCURRENCY)
    timings: List[float] = []
    failures = 0

    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=app),
        timeout=30.0,
    ) as client:
        pending = [
            _one_request(client, "http://testserver/chat", request_body, request_headers, gate)
            for _ in range(_TOTAL_REQUESTS)
        ]
        began = time.perf_counter()
        # Consume results as each request finishes rather than in
        # submission order.
        for finished in asyncio.as_completed(pending):
            took_ms, failed = await finished
            timings.append(took_ms)
            failures += int(failed)
        batch_ms = (time.perf_counter() - began) * 1000.0

    return {
        "latencies_ms": timings,
        "errors": failures,
        "total": _TOTAL_REQUESTS,
        "wall_ms": batch_ms,
    }
def _print_report(result: Dict[str, Any]) -> None:
    """Print a human-readable summary of one benchmark run to stdout.

    Args:
        result: Mapping with the keys ``latencies_ms`` (per-request latencies
            in milliseconds), ``errors`` (number of failed requests),
            ``wall_ms`` (wall-clock duration of the batch in milliseconds)
            and, optionally, ``total`` (number of requests issued; falls
            back to ``_TOTAL_REQUESTS`` when absent).
    """
    lats = sorted(result["latencies_ms"])
    n = len(lats)
    errors = result["errors"]
    wall_ms = result["wall_ms"]
    # Use the request count recorded with the run so the error rate and
    # throughput always describe the data actually being reported.
    total = result.get("total", _TOTAL_REQUESTS)

    avg = sum(lats) / n if n else 0.0
    p50 = statistics.median(lats) if lats else 0.0
    # p95 by rounding 0.95 * (n - 1) to the nearest index of the sorted sample.
    p95 = lats[int(round(0.95 * (n - 1)))] if lats else 0.0
    # Guard both divisions: an empty run must still produce a report.
    error_pct = errors / total * 100 if total else 0.0
    throughput = (total / (wall_ms / 1000.0)) if wall_ms > 0 else 0.0

    print("=== /chat in-process bench (mocked externals) ===")
    print(f"Requests: {total}")
    print(f"Concurrency: {_CONCURRENCY}")
    print(f"Errors: {errors} ({error_pct:.1f}%)")
    print(f"Wall time: {wall_ms:.0f} ms")
    print(f"Throughput: {throughput:.1f} req/s")
    print(f"Avg latency: {avg:.2f} ms")
    print(f"p50 latency: {p50:.2f} ms")
    print(f"p95 latency: {p95:.2f} ms")
def main() -> None:
    """Run the mocked in-process benchmark and print its report.

    While the patches below are active, the app's external dependencies
    (Pinecone init and search, the LLM factory used by the graph and the
    streaming module, the Tavily check, and the router's cache switch) are
    replaced with canned stand-ins, so the run exercises only in-process
    code.
    """
    with (
        patch("app.main.init_pinecone"),
        patch("app.services.chat.graph.pinecone_search", return_value=[_FAKE_CHUNK]),
        patch("app.services.chat.graph.get_llm", return_value=_mock_llm),
        patch("app.services.chat.streaming.get_llm", return_value=_mock_llm),
        patch("app.services.chat.graph.is_tavily_configured", return_value=False),
        patch("app.routers.chat.cache_enabled", return_value=False),
    ):
        # Imported here, not at module top, so the app is loaded after the
        # environment overrides and with the patches in place.
        from app.main import app as bench_app

        # RATE_LIMIT_ENABLED=false only keeps SlowAPIMiddleware from being
        # added; the @limiter.limit("30/minute") decorator is attached to the
        # route at import time and the limiter object still counts requests.
        # Switching the limiter off lets all 50 bench requests reach the
        # handler.
        from app.core.rate_limit import limiter as route_limiter

        route_limiter.enabled = False

        outcome = asyncio.run(_run(bench_app))

    _print_report(outcome)


if __name__ == "__main__":
    main()