Spaces:
Sleeping
Sleeping
| """ | |
| Performance Metrics Module | |
| Provides metrics collection, timing histograms, and performance monitoring. | |
| """ | |
| from __future__ import annotations | |
| import time | |
| from collections import defaultdict | |
| from dataclasses import dataclass, field | |
| from threading import Lock | |
| from typing import Any, Optional | |
| # ============================================================================= | |
| # METRIC TYPES | |
| # ============================================================================= | |
@dataclass
class Counter:
    """Simple integer counter metric.

    Declared as a dataclass so it can be built with keyword arguments
    (``Counter(name=..., labels=...)``) and so the ``field()`` defaults
    below are honoured.
    """

    # Metric name supplied by the caller.
    name: str
    # Current count; modified only while ``_lock`` is held.
    value: int = 0
    # Label key/value pairs attached to this counter.
    labels: dict[str, str] = field(default_factory=dict)
    # Per-instance lock serialising updates to ``value``; hidden from repr.
    _lock: Lock = field(default_factory=Lock, repr=False)

    def inc(self, amount: int = 1) -> None:
        """Add ``amount`` (default 1) to the counter."""
        with self._lock:
            self.value += amount

    def reset(self) -> None:
        """Set the counter back to zero."""
        with self._lock:
            self.value = 0
@dataclass
class Gauge:
    """Gauge metric whose value can be set, raised, or lowered.

    Declared as a dataclass so it can be built with keyword arguments
    (``Gauge(name=..., labels=...)``) and so the ``field()`` defaults
    below are honoured.
    """

    # Metric name supplied by the caller.
    name: str
    # Current reading; modified only while ``_lock`` is held.
    value: float = 0.0
    # Label key/value pairs attached to this gauge.
    labels: dict[str, str] = field(default_factory=dict)
    # Per-instance lock serialising updates to ``value``; hidden from repr.
    _lock: Lock = field(default_factory=Lock, repr=False)

    def set(self, value: float) -> None:
        """Replace the current reading with ``value``."""
        with self._lock:
            self.value = value

    def inc(self, amount: float = 1.0) -> None:
        """Raise the reading by ``amount`` (default 1.0)."""
        with self._lock:
            self.value += amount

    def dec(self, amount: float = 1.0) -> None:
        """Lower the reading by ``amount`` (default 1.0)."""
        with self._lock:
            self.value -= amount
@dataclass
class Histogram:
    """Histogram for measuring distributions.

    Every observed value is kept in ``_values`` (used for count, sum,
    average and percentiles) and is also tallied into cumulative buckets:
    an observation increments every bucket whose upper bound is >= the
    value, plus the ``float("inf")`` bucket.

    ``count``, ``sum``, ``avg``, ``p50``, ``p95`` and ``p99`` are read-only
    properties, so they are accessed without parentheses.
    """

    # Metric name supplied by the caller.
    name: str
    # Upper bounds of the cumulative buckets.
    buckets: tuple[float, ...] = (0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0)
    # Label key/value pairs attached to this histogram.
    labels: dict[str, str] = field(default_factory=dict)
    # Raw observations in arrival order.
    _values: list[float] = field(default_factory=list, repr=False)
    # Bucket upper bound -> cumulative count; rebuilt in __post_init__.
    _bucket_counts: dict[float, int] = field(default_factory=dict, repr=False)
    # Per-instance lock guarding ``_values`` and ``_bucket_counts``.
    _lock: Lock = field(default_factory=Lock, repr=False)

    def __post_init__(self) -> None:
        """Initialize bucket counts to zero, including the +inf bucket."""
        self._bucket_counts = {b: 0 for b in self.buckets}
        self._bucket_counts[float("inf")] = 0

    def observe(self, value: float) -> None:
        """Record a value and update every bucket it falls into."""
        with self._lock:
            self._values.append(value)
            for bucket in self.buckets:
                if value <= bucket:
                    self._bucket_counts[bucket] += 1
            # The +inf bucket counts every observation.
            self._bucket_counts[float("inf")] += 1

    @property
    def count(self) -> int:
        """Total number of observations."""
        with self._lock:
            return len(self._values)

    @property
    def sum(self) -> float:
        """Sum of all observations."""
        # ``sum`` here resolves to the builtin: class attributes are not
        # visible as bare names inside method bodies.
        with self._lock:
            return sum(self._values)

    @property
    def avg(self) -> float:
        """Average value, or 0.0 when nothing has been observed."""
        # Take one snapshot so the numerator and denominator always
        # describe the same set of observations.
        with self._lock:
            snapshot = list(self._values)
        if not snapshot:
            return 0.0
        return sum(snapshot) / len(snapshot)

    @property
    def p50(self) -> float:
        """50th percentile."""
        return self._percentile(0.5)

    @property
    def p95(self) -> float:
        """95th percentile."""
        return self._percentile(0.95)

    @property
    def p99(self) -> float:
        """99th percentile."""
        return self._percentile(0.99)

    def _percentile(self, p: float) -> float:
        """Return the observation at fraction ``p`` of the sorted values.

        The index is ``int(len(values) * p)``, clamped to the last element.
        Returns 0.0 when there are no observations.
        """
        with self._lock:
            sorted_values = sorted(self._values)
        if not sorted_values:
            return 0.0
        idx = int(len(sorted_values) * p)
        return sorted_values[min(idx, len(sorted_values) - 1)]

    def reset(self) -> None:
        """Discard all observations and zero every bucket."""
        with self._lock:
            self._values.clear()
            self._bucket_counts = {b: 0 for b in self.buckets}
            self._bucket_counts[float("inf")] = 0
| # ============================================================================= | |
| # METRICS REGISTRY | |
| # ============================================================================= | |
class MetricsRegistry:
    """Registry for all application metrics.

    Counters, gauges and histograms are stored in separate dictionaries,
    each keyed by the string produced by ``_make_key`` (metric name plus
    sorted labels). A single registry lock guards those dictionaries.
    """

    def __init__(self) -> None:
        """Create an empty registry."""
        self._counters: dict[str, Counter] = {}
        self._gauges: dict[str, Gauge] = {}
        self._histograms: dict[str, Histogram] = {}
        self._lock = Lock()

    def counter(self, name: str, **labels) -> Counter:
        """Return the counter for ``name`` and ``labels``, creating it if absent."""
        key = self._make_key(name, labels)
        with self._lock:
            if key not in self._counters:
                self._counters[key] = Counter(name=name, labels=labels)
            return self._counters[key]

    def gauge(self, name: str, **labels) -> Gauge:
        """Return the gauge for ``name`` and ``labels``, creating it if absent."""
        key = self._make_key(name, labels)
        with self._lock:
            if key not in self._gauges:
                self._gauges[key] = Gauge(name=name, labels=labels)
            return self._gauges[key]

    def histogram(self, name: str, buckets: Optional[tuple] = None, **labels) -> Histogram:
        """Return the histogram for ``name`` and ``labels``, creating it if absent.

        ``buckets`` is only used when the histogram is first created; if one
        already exists under the same key it is returned unchanged. A falsy
        ``buckets`` (``None`` or an empty tuple) leaves the Histogram class
        default in place.
        """
        key = self._make_key(name, labels)
        with self._lock:
            if key not in self._histograms:
                kwargs = {"name": name, "labels": labels}
                if buckets:
                    kwargs["buckets"] = buckets
                self._histograms[key] = Histogram(**kwargs)
            return self._histograms[key]

    def _make_key(self, name: str, labels: dict) -> str:
        """Build the registry key ``name{k1=v1,k2=v2}`` with labels sorted by key."""
        label_str = ",".join(f"{k}={v}" for k, v in sorted(labels.items()))
        return f"{name}{{{label_str}}}"

    def get_all(self) -> dict[str, Any]:
        """Return a snapshot of every metric.

        The result has three sections, ``"counters"``, ``"gauges"`` and
        ``"histograms"``, each keyed by registry key. Counters and gauges map
        to their current value; histograms map to a dict of summary values
        read from the histogram's ``count``, ``sum``, ``avg``, ``p50``,
        ``p95`` and ``p99`` attributes.
        """
        result = {"counters": {}, "gauges": {}, "histograms": {}}
        with self._lock:
            for key, counter in self._counters.items():
                result["counters"][key] = counter.value
            for key, gauge in self._gauges.items():
                result["gauges"][key] = gauge.value
            for key, hist in self._histograms.items():
                result["histograms"][key] = {
                    "count": hist.count,
                    "sum": hist.sum,
                    "avg": hist.avg,
                    "p50": hist.p50,
                    "p95": hist.p95,
                    "p99": hist.p99,
                }
        return result

    def reset_all(self) -> None:
        """Reset every registered metric; the metrics stay registered."""
        with self._lock:
            for counter in self._counters.values():
                counter.reset()
            for gauge in self._gauges.values():
                # Use a float literal so a reset gauge still reports a
                # float (0.0) rather than the int 0.
                gauge.set(0.0)
            for histogram in self._histograms.values():
                histogram.reset()
# Global registry instance, created lazily by get_registry().
_registry: Optional[MetricsRegistry] = None
# Guards first-time creation so concurrent callers cannot each build
# (and then record into) a different registry.
_registry_lock = Lock()


def get_registry() -> MetricsRegistry:
    """Return the process-wide metrics registry, creating it on first use."""
    global _registry
    if _registry is None:
        with _registry_lock:
            # Re-check under the lock: another thread may have created the
            # registry between the first test and acquiring the lock.
            if _registry is None:
                _registry = MetricsRegistry()
    return _registry
| # ============================================================================= | |
| # CONVENIENCE FUNCTIONS | |
| # ============================================================================= | |
def counter(name: str, **labels) -> Counter:
    """Fetch the named counter from the global registry, creating it if needed."""
    registry = get_registry()
    return registry.counter(name, **labels)
def gauge(name: str, **labels) -> Gauge:
    """Fetch the named gauge from the global registry, creating it if needed."""
    registry = get_registry()
    return registry.gauge(name, **labels)
def histogram(name: str, buckets: Optional[tuple] = None, **labels) -> Histogram:
    """Get or create a histogram in the global registry.

    ``buckets`` is forwarded to ``MetricsRegistry.histogram``. It is now an
    explicit parameter; previously a ``buckets=`` keyword only reached the
    registry by passing through ``**labels``. Existing calls behave the same.
    """
    return get_registry().histogram(name, buckets=buckets, **labels)
| # ============================================================================= | |
| # TIMER CONTEXT MANAGER | |
| # ============================================================================= | |
class TimerMetric:
    """Context manager that times its ``with`` body.

    The histogram is looked up in the global registry when the timer is
    constructed. Elapsed time is measured with ``time.perf_counter`` and
    passed to the histogram's ``observe`` method on exit.
    """

    def __init__(self, histogram_name: str, **labels) -> None:
        registry = get_registry()
        self.histogram = registry.histogram(histogram_name, **labels)
        # Overwritten with the real start time in __enter__.
        self.start_time = 0.0

    def __enter__(self) -> "TimerMetric":
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, *args) -> None:
        # The duration is recorded whether or not the body raised; the
        # method returns None, so exceptions are not suppressed.
        elapsed = time.perf_counter() - self.start_time
        self.histogram.observe(elapsed)
| # ============================================================================= | |
| # PRE-DEFINED METRICS | |
| # ============================================================================= | |
# Metric names for HTTP request tracking.
REQUESTS_TOTAL = "requests_total"
REQUEST_DURATION = "request_duration_seconds"
REQUEST_ERRORS = "request_errors_total"

# Metric names for scraper tracking.
SCRAPE_DURATION = "scrape_duration_seconds"
SCRAPE_SUCCESS = "scrape_success_total"
SCRAPE_FAILURE = "scrape_failure_total"
TIER_USAGE = "tier_usage_total"

# Metric names for cache tracking.
CACHE_HITS = "cache_hits_total"
CACHE_MISSES = "cache_misses_total"

# Metric name for rate-limit tracking.
RATE_LIMIT_HITS = "rate_limit_hits_total"
def record_request(path: str, method: str, status: int, duration: float) -> None:
    """Record metrics for one HTTP request.

    Increments the request counter (labelled by path, method and status),
    observes ``duration`` in the per-path duration histogram, and, for
    status codes of 400 and above, increments the error counter.
    """
    status_label = str(status)
    counter(REQUESTS_TOTAL, path=path, method=method, status=status_label).inc()
    histogram(REQUEST_DURATION, path=path).observe(duration)
    if status < 400:
        return
    counter(REQUEST_ERRORS, path=path, status=status_label).inc()
def record_scrape(url: str, tier: str, success: bool, duration: float) -> None:
    """Record metrics for one scrape attempt, labelled by ``tier``.

    Observes ``duration`` in the scrape-duration histogram, increments the
    tier-usage counter, then increments either the success or the failure
    counter. ``url`` is accepted but not used by this function.
    """
    histogram(SCRAPE_DURATION, tier=tier).observe(duration)
    counter(TIER_USAGE, tier=tier).inc()
    outcome_metric = SCRAPE_SUCCESS if success else SCRAPE_FAILURE
    counter(outcome_metric, tier=tier).inc()
def record_cache(hit: bool) -> None:
    """Increment the cache-hit counter when ``hit`` is truthy, else the miss counter."""
    metric_name = CACHE_HITS if hit else CACHE_MISSES
    counter(metric_name).inc()