Spaces:
Sleeping
Sleeping
| """A/B comparison of vLLM deployments.""" | |
| import asyncio | |
| import logging | |
| from dataclasses import dataclass, field | |
| from typing import Optional, Dict, List, Any | |
| from datetime import datetime | |
| import aiohttp | |
| from scipy import stats | |
| from utils.prometheus_parser import ( | |
| parse_prometheus_metrics, | |
| get_metric_value, | |
| get_histogram_quantile, | |
| ) | |
| logger = logging.getLogger(__name__) | |
@dataclass
class DeploymentConfig:
    """Configuration for a vLLM deployment.

    Attributes:
        name: Human-readable label used in reports/recommendations.
        endpoint: Base URL of the deployment (e.g., http://localhost:8000).
        model_name: Optional model identifier for display purposes.
        quantization: Optional quantization scheme for display purposes.
    """

    name: str
    endpoint: str  # Base URL (e.g., http://localhost:8000)
    model_name: str = ""
    quantization: str = ""

    @property
    def metrics_url(self) -> str:
        """Prometheus scrape URL derived from the base endpoint.

        Exposed as a property because callers access it as an attribute
        (``config.metrics_url``), not as a method call.
        """
        return f"{self.endpoint}/metrics"
@dataclass
class DeploymentMetrics:
    """Point-in-time metrics scraped from one deployment's /metrics endpoint.

    All numeric fields default to zero; ``connected`` remains False when the
    scrape failed, so callers can distinguish "no data" from "measured zero".
    The ``@dataclass`` decorator is required for the ``field(default_factory=...)``
    defaults below and for keyword construction (``DeploymentMetrics(endpoint=...)``).
    """

    endpoint: str
    # When this snapshot was taken (fresh per instance via default_factory).
    timestamp: datetime = field(default_factory=datetime.now)
    connected: bool = False  # True only after a successful 200 scrape

    # Throughput
    tokens_per_second: float = 0.0
    throughput_samples: List[float] = field(default_factory=list)

    # Latency (all in milliseconds)
    ttft_ms: float = 0.0  # time to first token (median)
    tpot_ms: float = 0.0  # time per output token (median)
    e2e_latency_ms: float = 0.0  # end-to-end request latency (median)
    latency_samples: List[float] = field(default_factory=list)

    # Resources
    gpu_memory_gb: float = 0.0
    kv_cache_percent: float = 0.0  # GPU KV-cache usage, 0-100
    batch_size: int = 0  # currently running requests

    # Model info (extracted from Prometheus labels when present)
    model_name: str = ""
@dataclass
class ComparisonResult:
    """Result of comparing two deployments.

    Percentage differences are computed relative to deployment A, so a
    positive ``throughput_diff_pct`` means B is faster. The ``@dataclass``
    decorator is required for keyword construction and the timestamp
    default_factory.
    """

    deployment_a: DeploymentMetrics
    deployment_b: DeploymentMetrics
    timestamp: datetime = field(default_factory=datetime.now)

    # Differences (B relative to A; percentages except memory)
    throughput_diff_pct: float = 0.0
    ttft_diff_pct: float = 0.0
    latency_diff_pct: float = 0.0
    memory_diff_gb: float = 0.0

    # Statistical significance (two-sample t-test, alpha = 0.05)
    throughput_significant: bool = False
    latency_significant: bool = False
    p_value_throughput: float = 1.0  # 1.0 = no evidence of a difference
    p_value_latency: float = 1.0

    # Human-readable recommendation summarizing the comparison
    recommendation: str = ""
class ABComparator:
    """Compares metrics between two vLLM deployments.

    Scrapes both deployments' Prometheus endpoints (optionally repeatedly,
    to build sample sets), computes relative differences, runs two-sample
    t-tests for significance, and renders a recommendation plus a table.
    """

    def __init__(
        self,
        deployment_a: DeploymentConfig,
        deployment_b: DeploymentConfig,
        sample_count: int = 30,
    ):
        """
        Initialize comparator.

        Args:
            deployment_a: First deployment configuration
            deployment_b: Second deployment configuration
            sample_count: Number of samples to collect for statistical tests
        """
        self.deployment_a = deployment_a
        self.deployment_b = deployment_b
        self.sample_count = sample_count
        self._samples_a: List[DeploymentMetrics] = []
        self._samples_b: List[DeploymentMetrics] = []

    async def collect_metrics(self, config: DeploymentConfig) -> DeploymentMetrics:
        """
        Collect current metrics from a deployment.

        Args:
            config: Deployment configuration

        Returns:
            DeploymentMetrics with current values. ``connected`` stays False
            when the endpoint is unreachable, times out, or returns non-200.
        """
        metrics = DeploymentMetrics(endpoint=config.endpoint)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    config.metrics_url,
                    timeout=aiohttp.ClientTimeout(total=5),
                ) as response:
                    if response.status != 200:
                        return metrics
                    text = await response.text()
                    raw = parse_prometheus_metrics(text)
                    metrics.connected = True

                    # Histogram medians are reported in seconds; convert to ms.
                    metrics.tokens_per_second = self._calculate_tps(raw)
                    metrics.ttft_ms = (
                        get_histogram_quantile(
                            raw, "vllm:time_to_first_token_seconds", 0.5
                        )
                        or 0
                    ) * 1000
                    metrics.tpot_ms = (
                        get_histogram_quantile(
                            raw, "vllm:time_per_output_token_seconds", 0.5
                        )
                        or 0
                    ) * 1000
                    metrics.e2e_latency_ms = (
                        get_histogram_quantile(
                            raw, "vllm:e2e_request_latency_seconds", 0.5
                        )
                        or 0
                    ) * 1000
                    # Gauge is a 0-1 fraction; store as 0-100 percent.
                    metrics.kv_cache_percent = (
                        get_metric_value(raw, "vllm:gpu_cache_usage_perc") or 0
                    ) * 100
                    metrics.batch_size = int(
                        get_metric_value(raw, "vllm:num_requests_running") or 0
                    )
                    # Model name from labels. The original inline scan only
                    # broke out of the inner loop, so later metric families
                    # kept overwriting the value; take the first match instead.
                    metrics.model_name = self._extract_model_name(raw)
        except Exception as e:
            # Best-effort scrape: log and return a disconnected snapshot.
            logger.error(f"Error collecting metrics from {config.endpoint}: {e}")
        return metrics

    @staticmethod
    def _extract_model_name(raw: Dict) -> str:
        """Return the first ``model_name`` label found in parsed metrics, or ""."""
        for samples in raw.values():
            for sample in samples:
                if "model_name" in sample.labels:
                    return sample.labels["model_name"]
        return ""

    def _calculate_tps(self, raw: Dict) -> float:
        """Calculate tokens per second from counter metrics.

        This is a simplified estimate (running requests / median TPOT);
        a precise value would require tracking counter deltas over time.
        """
        generation_total = get_metric_value(raw, "vllm:generation_tokens_total") or 0
        if generation_total > 0:
            # Estimate based on running requests
            running = get_metric_value(raw, "vllm:num_requests_running") or 1
            tpot = (
                get_histogram_quantile(
                    raw, "vllm:time_per_output_token_seconds", 0.5
                )
                or 0.05  # fallback: assume 50ms/token when histogram is empty
            )
            if tpot > 0:
                return running / tpot
        return 0

    async def collect_samples(self, count: Optional[int] = None) -> None:
        """
        Collect multiple samples for statistical comparison.

        Samples from both deployments are gathered concurrently; only
        connected snapshots are retained, so the two sample lists may end
        up with different lengths.

        Args:
            count: Number of samples to collect (defaults to ``sample_count``)
        """
        if count is None:
            count = self.sample_count
        self._samples_a.clear()
        self._samples_b.clear()
        for i in range(count):
            metrics_a, metrics_b = await asyncio.gather(
                self.collect_metrics(self.deployment_a),
                self.collect_metrics(self.deployment_b),
            )
            if metrics_a.connected:
                metrics_a.throughput_samples = [metrics_a.tokens_per_second]
                metrics_a.latency_samples = [metrics_a.e2e_latency_ms]
                self._samples_a.append(metrics_a)
            if metrics_b.connected:
                metrics_b.throughput_samples = [metrics_b.tokens_per_second]
                metrics_b.latency_samples = [metrics_b.e2e_latency_ms]
                self._samples_b.append(metrics_b)
            # Wait between samples (skip the sleep after the final one)
            if i < count - 1:
                await asyncio.sleep(1)

    async def compare(self) -> ComparisonResult:
        """
        Perform comparison between deployments.

        Returns:
            ComparisonResult with relative differences (B vs. A), optional
            significance tests (when samples were collected beforehand via
            ``collect_samples``), and a recommendation string.
        """
        # Collect current metrics concurrently from both deployments
        metrics_a, metrics_b = await asyncio.gather(
            self.collect_metrics(self.deployment_a),
            self.collect_metrics(self.deployment_b),
        )
        result = ComparisonResult(
            deployment_a=metrics_a,
            deployment_b=metrics_b,
        )
        # Percentage differences relative to A; guard against divide-by-zero
        # when A reports no data for a metric.
        if metrics_a.tokens_per_second > 0:
            result.throughput_diff_pct = (
                (metrics_b.tokens_per_second - metrics_a.tokens_per_second)
                / metrics_a.tokens_per_second
            ) * 100
        if metrics_a.ttft_ms > 0:
            result.ttft_diff_pct = (
                (metrics_b.ttft_ms - metrics_a.ttft_ms) / metrics_a.ttft_ms
            ) * 100
        if metrics_a.e2e_latency_ms > 0:
            result.latency_diff_pct = (
                (metrics_b.e2e_latency_ms - metrics_a.e2e_latency_ms)
                / metrics_a.e2e_latency_ms
            ) * 100
        result.memory_diff_gb = metrics_b.gpu_memory_gb - metrics_a.gpu_memory_gb

        # Statistical significance (only if collect_samples() ran first)
        if self._samples_a and self._samples_b:
            result = self._add_significance(result)

        result.recommendation = self._generate_recommendation(result)
        return result

    def _add_significance(self, result: ComparisonResult) -> ComparisonResult:
        """Add two-sample t-test significance flags (alpha=0.05) to result."""
        tps_a = [s.tokens_per_second for s in self._samples_a]
        tps_b = [s.tokens_per_second for s in self._samples_b]
        lat_a = [s.e2e_latency_ms for s in self._samples_a]
        lat_b = [s.e2e_latency_ms for s in self._samples_b]
        # ttest_ind needs at least 2 observations per group.
        if len(tps_a) >= 2 and len(tps_b) >= 2:
            try:
                _, p_tps = stats.ttest_ind(tps_a, tps_b)
                # Coerce numpy scalar to a plain float for clean serialization.
                result.p_value_throughput = float(p_tps)
                result.throughput_significant = bool(p_tps < 0.05)
            except Exception:
                # Degenerate samples (e.g. zero variance): leave defaults.
                pass
        if len(lat_a) >= 2 and len(lat_b) >= 2:
            try:
                _, p_lat = stats.ttest_ind(lat_a, lat_b)
                result.p_value_latency = float(p_lat)
                result.latency_significant = bool(p_lat < 0.05)
            except Exception:
                pass
        return result

    def _generate_recommendation(self, result: ComparisonResult) -> str:
        """Generate a human-readable recommendation.

        Only mentions metrics whose difference clears a noise threshold
        (5% for throughput/latency, 1 GB for memory).
        """
        parts = []
        a_name = self.deployment_a.name
        b_name = self.deployment_b.name
        # Throughput: positive diff means B produced more tokens/s.
        if abs(result.throughput_diff_pct) > 5:
            faster = b_name if result.throughput_diff_pct > 0 else a_name
            diff = abs(result.throughput_diff_pct)
            sig = " (statistically significant)" if result.throughput_significant else ""
            parts.append(f"{faster} has {diff:.1f}% higher throughput{sig}")
        # Latency: positive diff means B is slower, i.e. A has lower latency.
        if abs(result.latency_diff_pct) > 5:
            faster = a_name if result.latency_diff_pct > 0 else b_name
            diff = abs(result.latency_diff_pct)
            sig = " (statistically significant)" if result.latency_significant else ""
            parts.append(f"{faster} has {diff:.1f}% lower latency{sig}")
        # Memory: positive diff (B - A) means A uses less.
        if abs(result.memory_diff_gb) > 1:
            lower = a_name if result.memory_diff_gb > 0 else b_name
            diff = abs(result.memory_diff_gb)
            parts.append(f"{lower} uses {diff:.1f}GB less GPU memory")
        if not parts:
            return "Both deployments show similar performance"
        return ". ".join(parts) + "."

    def get_comparison_table(self, result: ComparisonResult) -> List[Dict[str, Any]]:
        """
        Generate comparison table data.

        Args:
            result: Comparison result

        Returns:
            List of rows (dicts keyed by "Metric", the two deployment names,
            and "Diff") suitable for rendering as a table.
        """
        return [
            {
                "Metric": "Throughput (tok/s)",
                self.deployment_a.name: f"{result.deployment_a.tokens_per_second:.1f}",
                self.deployment_b.name: f"{result.deployment_b.tokens_per_second:.1f}",
                "Diff": f"{result.throughput_diff_pct:+.1f}%",
            },
            {
                "Metric": "TTFT (ms)",
                self.deployment_a.name: f"{result.deployment_a.ttft_ms:.1f}",
                self.deployment_b.name: f"{result.deployment_b.ttft_ms:.1f}",
                "Diff": f"{result.ttft_diff_pct:+.1f}%",
            },
            {
                "Metric": "E2E Latency (ms)",
                self.deployment_a.name: f"{result.deployment_a.e2e_latency_ms:.1f}",
                self.deployment_b.name: f"{result.deployment_b.e2e_latency_ms:.1f}",
                "Diff": f"{result.latency_diff_pct:+.1f}%",
            },
            {
                "Metric": "KV Cache %",
                self.deployment_a.name: f"{result.deployment_a.kv_cache_percent:.1f}",
                self.deployment_b.name: f"{result.deployment_b.kv_cache_percent:.1f}",
                "Diff": "-",
            },
            {
                "Metric": "Batch Size",
                self.deployment_a.name: str(result.deployment_a.batch_size),
                self.deployment_b.name: str(result.deployment_b.batch_size),
                "Diff": "-",
            },
        ]