# Initial commit: LLM Inference Dashboard (commit aefabf0, author jkottu)
"""A/B comparison of vLLM deployments."""
import asyncio
import logging
import math
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional

import aiohttp
from scipy import stats

from utils.prometheus_parser import (
    parse_prometheus_metrics,
    get_metric_value,
    get_histogram_quantile,
)
logger = logging.getLogger(__name__)
@dataclass
class DeploymentConfig:
    """Configuration for a vLLM deployment."""

    # Human-readable label used in tables and recommendations.
    name: str
    # Base URL of the deployment (e.g. http://localhost:8000).
    endpoint: str
    # Optional served-model identifier.
    model_name: str = ""
    # Optional quantization scheme label.
    quantization: str = ""

    @property
    def metrics_url(self) -> str:
        """Prometheus scrape URL derived from the base endpoint."""
        return self.endpoint + "/metrics"
@dataclass
class DeploymentMetrics:
    """Snapshot of metrics scraped from one deployment's /metrics endpoint.

    All values default to zero/empty so an unreachable endpoint still
    yields a valid (but disconnected) object.
    """
    endpoint: str  # Base URL this snapshot was collected from
    timestamp: datetime = field(default_factory=datetime.now)  # collection time (local, naive)
    connected: bool = False  # True only after a successful 200 scrape
    # Throughput
    tokens_per_second: float = 0.0  # estimated generation throughput
    throughput_samples: List[float] = field(default_factory=list)  # per-sample tok/s values
    # Latency (median/p50 values, converted from seconds to milliseconds)
    ttft_ms: float = 0.0  # time to first token
    tpot_ms: float = 0.0  # time per output token
    e2e_latency_ms: float = 0.0  # end-to-end request latency
    latency_samples: List[float] = field(default_factory=list)  # per-sample e2e latencies, ms
    # Resources
    gpu_memory_gb: float = 0.0  # NOTE(review): never set by collect_metrics — always 0
    kv_cache_percent: float = 0.0  # GPU KV-cache usage, 0-100
    batch_size: int = 0  # number of currently running requests
    # Model info
    model_name: str = ""  # taken from a model_name metric label, if present
@dataclass
class ComparisonResult:
    """Result of comparing two deployments (B measured relative to A)."""
    deployment_a: DeploymentMetrics
    deployment_b: DeploymentMetrics
    timestamp: datetime = field(default_factory=datetime.now)  # when the comparison ran
    # Differences: positive percentage means B's value is larger than A's
    throughput_diff_pct: float = 0.0
    ttft_diff_pct: float = 0.0
    latency_diff_pct: float = 0.0
    memory_diff_gb: float = 0.0  # b.gpu_memory_gb - a.gpu_memory_gb
    # Statistical significance (two-sample t-test, alpha = 0.05)
    throughput_significant: bool = False
    latency_significant: bool = False
    p_value_throughput: float = 1.0  # 1.0 means no test was performed
    p_value_latency: float = 1.0  # 1.0 means no test was performed
    # Recommendation
    recommendation: str = ""  # human-readable summary sentence(s)
class ABComparator:
    """Compares metrics between two vLLM deployments.

    Scrapes each deployment's Prometheus ``/metrics`` endpoint, computes
    relative differences (B measured against A as the baseline), runs
    two-sample t-tests over collected samples, and renders a
    human-readable recommendation plus a tabular summary.
    """

    # Significance threshold for the t-tests.
    _ALPHA = 0.05

    def __init__(
        self,
        deployment_a: DeploymentConfig,
        deployment_b: DeploymentConfig,
        sample_count: int = 30,
    ):
        """
        Initialize comparator.

        Args:
            deployment_a: First deployment configuration (baseline "A")
            deployment_b: Second deployment configuration ("B")
            sample_count: Default number of samples for statistical tests
        """
        self.deployment_a = deployment_a
        self.deployment_b = deployment_b
        self.sample_count = sample_count
        self._samples_a: List[DeploymentMetrics] = []
        self._samples_b: List[DeploymentMetrics] = []

    async def collect_metrics(self, config: DeploymentConfig) -> DeploymentMetrics:
        """
        Collect current metrics from a deployment.

        Args:
            config: Deployment configuration

        Returns:
            DeploymentMetrics with current values. ``connected`` remains
            False when the endpoint is unreachable, times out (5s), or
            answers with a non-200 status.
        """
        metrics = DeploymentMetrics(endpoint=config.endpoint)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    config.metrics_url,
                    timeout=aiohttp.ClientTimeout(total=5),
                ) as response:
                    if response.status != 200:
                        return metrics
                    text = await response.text()
        except Exception as e:
            logger.error(f"Error collecting metrics from {config.endpoint}: {e}")
            return metrics

        try:
            raw = parse_prometheus_metrics(text)
            metrics.connected = True
            metrics.tokens_per_second = self._calculate_tps(raw)
            # vLLM exports latencies in seconds; store medians in ms.
            metrics.ttft_ms = self._median_ms(raw, "vllm:time_to_first_token_seconds")
            metrics.tpot_ms = self._median_ms(raw, "vllm:time_per_output_token_seconds")
            metrics.e2e_latency_ms = self._median_ms(
                raw, "vllm:e2e_request_latency_seconds"
            )
            metrics.kv_cache_percent = (
                get_metric_value(raw, "vllm:gpu_cache_usage_perc") or 0
            ) * 100
            metrics.batch_size = int(
                get_metric_value(raw, "vllm:num_requests_running") or 0
            )
            # NOTE(review): gpu_memory_gb is never populated here, so the
            # downstream memory comparison is always 0 — a metric source
            # (e.g. DCGM/NVML) still needs to be wired in.
            metrics.model_name = self._extract_model_name(raw)
        except Exception as e:
            logger.error(f"Error collecting metrics from {config.endpoint}: {e}")
        return metrics

    @staticmethod
    def _median_ms(raw: Dict, metric: str) -> float:
        """Median (p50) of a histogram metric, converted seconds -> ms."""
        return (get_histogram_quantile(raw, metric, 0.5) or 0) * 1000

    @staticmethod
    def _extract_model_name(raw: Dict) -> str:
        """Return the first model_name label found in the parsed metrics.

        Fixes the original nested-loop scan whose ``break`` only exited
        the inner loop, letting later metric families overwrite the name.
        """
        for samples in raw.values():
            for sample in samples:
                if "model_name" in sample.labels:
                    return sample.labels["model_name"]
        return ""

    def _calculate_tps(self, raw: Dict) -> float:
        """Estimate tokens per second from the scraped metrics.

        Simplified instantaneous estimate: concurrent running requests
        divided by the median time-per-output-token. A precise rate would
        track the generation-tokens counter delta over time.
        """
        generation_total = get_metric_value(raw, "vllm:generation_tokens_total") or 0
        if generation_total <= 0:
            # No generation has happened yet.
            return 0
        running = get_metric_value(raw, "vllm:num_requests_running") or 1
        tpot = (
            get_histogram_quantile(raw, "vllm:time_per_output_token_seconds", 0.5)
            or 0.05  # fallback: assume 50 ms per token
        )
        return running / tpot if tpot > 0 else 0

    async def collect_samples(self, count: Optional[int] = None) -> None:
        """
        Collect multiple paired samples (one per second) for the t-tests.

        Args:
            count: Number of samples; defaults to ``self.sample_count``.
        """
        count = self.sample_count if count is None else count
        self._samples_a.clear()
        self._samples_b.clear()
        for i in range(count):
            metrics_a, metrics_b = await asyncio.gather(
                self.collect_metrics(self.deployment_a),
                self.collect_metrics(self.deployment_b),
            )
            self._record_sample(self._samples_a, metrics_a)
            self._record_sample(self._samples_b, metrics_b)
            # Space samples out so they are not all from the same instant.
            if i < count - 1:
                await asyncio.sleep(1)

    @staticmethod
    def _record_sample(
        bucket: List[DeploymentMetrics], metrics: DeploymentMetrics
    ) -> None:
        """Append a connected snapshot, tagging its single-value sample lists."""
        if metrics.connected:
            metrics.throughput_samples = [metrics.tokens_per_second]
            metrics.latency_samples = [metrics.e2e_latency_ms]
            bucket.append(metrics)

    async def compare(self) -> ComparisonResult:
        """
        Perform a point-in-time comparison between the two deployments.

        Returns:
            ComparisonResult with percentage differences (B relative to
            A), significance flags when samples are available, and a
            recommendation string.
        """
        metrics_a, metrics_b = await asyncio.gather(
            self.collect_metrics(self.deployment_a),
            self.collect_metrics(self.deployment_b),
        )
        result = ComparisonResult(deployment_a=metrics_a, deployment_b=metrics_b)
        result.throughput_diff_pct = self._pct_diff(
            metrics_a.tokens_per_second, metrics_b.tokens_per_second
        )
        result.ttft_diff_pct = self._pct_diff(metrics_a.ttft_ms, metrics_b.ttft_ms)
        result.latency_diff_pct = self._pct_diff(
            metrics_a.e2e_latency_ms, metrics_b.e2e_latency_ms
        )
        result.memory_diff_gb = metrics_b.gpu_memory_gb - metrics_a.gpu_memory_gb
        # Statistical significance only when collect_samples() has run.
        if self._samples_a and self._samples_b:
            result = self._add_significance(result)
        result.recommendation = self._generate_recommendation(result)
        return result

    @staticmethod
    def _pct_diff(baseline: float, candidate: float) -> float:
        """(candidate - baseline) / baseline as a percent; 0 when baseline <= 0."""
        if baseline <= 0:
            return 0.0
        return (candidate - baseline) / baseline * 100

    def _add_significance(self, result: ComparisonResult) -> ComparisonResult:
        """Attach two-sample t-test p-values for throughput and latency."""
        tps_a = [s.tokens_per_second for s in self._samples_a]
        tps_b = [s.tokens_per_second for s in self._samples_b]
        lat_a = [s.e2e_latency_ms for s in self._samples_a]
        lat_b = [s.e2e_latency_ms for s in self._samples_b]

        p = self._ttest_p_value(tps_a, tps_b)
        if p is not None:
            result.p_value_throughput = p
            result.throughput_significant = p < self._ALPHA

        p = self._ttest_p_value(lat_a, lat_b)
        if p is not None:
            result.p_value_latency = p
            result.latency_significant = p < self._ALPHA
        return result

    @staticmethod
    def _ttest_p_value(xs: List[float], ys: List[float]) -> Optional[float]:
        """Independent two-sample t-test p-value, or None when unavailable.

        Returns None for undersized samples (< 2 each), scipy failures,
        or a NaN p-value (e.g. both samples have zero variance).
        """
        if len(xs) < 2 or len(ys) < 2:
            return None
        try:
            _, p = stats.ttest_ind(xs, ys)
        except Exception:
            logger.debug("t-test failed", exc_info=True)
            return None
        p = float(p)  # scipy returns a numpy scalar; store a plain float
        return None if math.isnan(p) else p

    def _generate_recommendation(self, result: ComparisonResult) -> str:
        """Generate a human-readable recommendation.

        Differences below 5% (or 1 GB for memory) are treated as noise
        and omitted.
        """
        parts = []
        a_name = self.deployment_a.name
        b_name = self.deployment_b.name
        # Throughput: positive diff means B produced more tokens/second.
        if abs(result.throughput_diff_pct) > 5:
            faster = b_name if result.throughput_diff_pct > 0 else a_name
            diff = abs(result.throughput_diff_pct)
            sig = " (statistically significant)" if result.throughput_significant else ""
            parts.append(f"{faster} has {diff:.1f}% higher throughput{sig}")
        # Latency: positive diff means B is slower, so A has the lower latency.
        if abs(result.latency_diff_pct) > 5:
            faster = a_name if result.latency_diff_pct > 0 else b_name
            diff = abs(result.latency_diff_pct)
            sig = " (statistically significant)" if result.latency_significant else ""
            parts.append(f"{faster} has {diff:.1f}% lower latency{sig}")
        # Memory: positive diff means B uses more, so A uses less.
        if abs(result.memory_diff_gb) > 1:
            lower = a_name if result.memory_diff_gb > 0 else b_name
            diff = abs(result.memory_diff_gb)
            parts.append(f"{lower} uses {diff:.1f}GB less GPU memory")
        if not parts:
            return "Both deployments show similar performance"
        return ". ".join(parts) + "."

    def get_comparison_table(self, result: ComparisonResult) -> List[Dict[str, Any]]:
        """
        Generate comparison table data.

        Args:
            result: Comparison result

        Returns:
            List of rows; each row maps "Metric", the two deployment
            names, and "Diff" to display strings.
        """
        return [
            {
                "Metric": "Throughput (tok/s)",
                self.deployment_a.name: f"{result.deployment_a.tokens_per_second:.1f}",
                self.deployment_b.name: f"{result.deployment_b.tokens_per_second:.1f}",
                "Diff": f"{result.throughput_diff_pct:+.1f}%",
            },
            {
                "Metric": "TTFT (ms)",
                self.deployment_a.name: f"{result.deployment_a.ttft_ms:.1f}",
                self.deployment_b.name: f"{result.deployment_b.ttft_ms:.1f}",
                "Diff": f"{result.ttft_diff_pct:+.1f}%",
            },
            {
                "Metric": "E2E Latency (ms)",
                self.deployment_a.name: f"{result.deployment_a.e2e_latency_ms:.1f}",
                self.deployment_b.name: f"{result.deployment_b.e2e_latency_ms:.1f}",
                "Diff": f"{result.latency_diff_pct:+.1f}%",
            },
            {
                "Metric": "KV Cache %",
                self.deployment_a.name: f"{result.deployment_a.kv_cache_percent:.1f}",
                self.deployment_b.name: f"{result.deployment_b.kv_cache_percent:.1f}",
                "Diff": "-",
            },
            {
                "Metric": "Batch Size",
                self.deployment_a.name: str(result.deployment_a.batch_size),
                self.deployment_b.name: str(result.deployment_b.batch_size),
                "Diff": "-",
            },
        ]