Spaces:
Sleeping
Sleeping
| """SQLite database operations for metrics storage.""" | |
| import sqlite3 | |
| import json | |
| import logging | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import List, Optional, Dict, Any | |
| from contextlib import contextmanager | |
| from .models import MetricRecord, AlertRecord, RequestTrace, LoadTestResult | |
| logger = logging.getLogger(__name__) | |
class MetricsDB:
    """SQLite database for storing metrics, alerts, traces, and load tests.

    Each public method opens its own short-lived connection via
    ``_get_connection``, which commits on success and rolls back on error.
    """

    def __init__(self, db_path: str = "data/metrics.db"):
        """
        Initialize the database, creating the directory and schema if needed.

        Args:
            db_path: Path to SQLite database file.
        """
        self.db_path = db_path
        self._ensure_directory()
        self._init_schema()

    def _ensure_directory(self) -> None:
        """Ensure the database's parent directory exists."""
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

    @contextmanager
    def _get_connection(self):
        """Yield a connection; commit on success, roll back on error, always close.

        BUG FIX: the original generator lacked the @contextmanager decorator,
        so every ``with self._get_connection()`` raised a TypeError (a bare
        generator object does not implement the context-manager protocol).
        The ``contextmanager`` import already existed but was unused.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row  # rows addressable by column name
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def _init_schema(self) -> None:
        """Create tables and indexes if they do not already exist."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            # Metrics table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    metric_name TEXT NOT NULL,
                    value REAL NOT NULL,
                    labels TEXT
                )
            """)
            # Indexes for metrics (time-range scans and name+time lookups)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_metrics_timestamp
                ON metrics(timestamp)
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_metrics_name_time
                ON metrics(metric_name, timestamp)
            """)
            # Alerts table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS alerts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    rule_name TEXT,
                    severity TEXT,
                    metric_name TEXT,
                    value REAL,
                    threshold REAL,
                    message TEXT,
                    resolved_at DATETIME
                )
            """)
            # Request traces table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS request_traces (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    request_id TEXT UNIQUE,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    prompt_tokens INTEGER,
                    output_tokens INTEGER,
                    queue_time_ms REAL,
                    prefill_time_ms REAL,
                    decode_time_ms REAL,
                    total_time_ms REAL,
                    tokens_per_second REAL,
                    is_slow BOOLEAN
                )
            """)
            # Load test results table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS load_tests (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    test_id TEXT UNIQUE,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    target_endpoint TEXT,
                    concurrent_users INTEGER,
                    requests_per_second REAL,
                    duration_seconds INTEGER,
                    total_requests INTEGER,
                    successful_requests INTEGER,
                    failed_requests INTEGER,
                    avg_latency_ms REAL,
                    p50_latency_ms REAL,
                    p95_latency_ms REAL,
                    p99_latency_ms REAL,
                    throughput_rps REAL,
                    saturation_point REAL
                )
            """)

    # Metrics operations

    def insert_metric(self, record: MetricRecord) -> int:
        """Insert a single metric record; return its rowid."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO metrics (timestamp, metric_name, value, labels)
                VALUES (?, ?, ?, ?)
                """,
                (
                    record.timestamp.isoformat(),
                    record.metric_name,
                    record.value,
                    # Labels are stored as a JSON blob; NULL when absent.
                    json.dumps(record.labels) if record.labels else None,
                ),
            )
            return cursor.lastrowid

    def insert_metrics_batch(self, records: List[MetricRecord]) -> None:
        """Insert multiple metric records in one executemany call."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.executemany(
                """
                INSERT INTO metrics (timestamp, metric_name, value, labels)
                VALUES (?, ?, ?, ?)
                """,
                [
                    (
                        r.timestamp.isoformat(),
                        r.metric_name,
                        r.value,
                        json.dumps(r.labels) if r.labels else None,
                    )
                    for r in records
                ],
            )

    def query_metrics(
        self,
        metric_name: str,
        start: datetime,
        end: datetime,
        labels: Optional[Dict[str, str]] = None,
    ) -> List[MetricRecord]:
        """Query metrics by name and time range, optionally filtered by labels.

        Label filtering happens in Python (labels are stored as JSON text),
        so every row in the time range is fetched before filtering.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT id, timestamp, metric_name, value, labels
                FROM metrics
                WHERE metric_name = ? AND timestamp BETWEEN ? AND ?
                ORDER BY timestamp
                """,
                (metric_name, start.isoformat(), end.isoformat()),
            )
            records = []
            for row in cursor.fetchall():
                record = MetricRecord.from_row(tuple(row))
                if labels:
                    # Keep only records whose labels match every requested pair.
                    if all(record.labels.get(k) == v for k, v in labels.items()):
                        records.append(record)
                else:
                    records.append(record)
            return records

    def query_aggregated(
        self,
        metric_name: str,
        start: datetime,
        end: datetime,
        aggregation: str = "avg",
        bucket_minutes: int = 1,
    ) -> List[Dict[str, Any]]:
        """Query metrics with time bucketing and aggregation.

        Args:
            metric_name: Metric to aggregate.
            start: Inclusive range start.
            end: Inclusive range end.
            aggregation: One of avg/max/min/sum/count (defaults to avg).
            bucket_minutes: Bucket width in minutes; must be >= 1.

        Raises:
            ValueError: If bucket_minutes is not a positive integer.
        """
        # agg_func is restricted to this whitelist; unknown values fall back
        # to AVG, so the f-string below cannot inject arbitrary SQL.
        agg_func = {
            "avg": "AVG",
            "max": "MAX",
            "min": "MIN",
            "sum": "SUM",
            "count": "COUNT",
        }.get(aggregation, "AVG")
        # bucket_minutes is also interpolated into the SQL: force it to a
        # validated positive int (guards against injection and a zero-width
        # bucket, which would divide by zero inside strftime math).
        bucket_minutes = int(bucket_minutes)
        if bucket_minutes < 1:
            raise ValueError("bucket_minutes must be >= 1")
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"""
                SELECT
                    datetime(
                        strftime('%Y-%m-%d %H:', timestamp) ||
                        printf('%02d', (CAST(strftime('%M', timestamp) AS INTEGER) / {bucket_minutes}) * {bucket_minutes}) ||
                        ':00'
                    ) as bucket,
                    {agg_func}(value) as value
                FROM metrics
                WHERE metric_name = ? AND timestamp BETWEEN ? AND ?
                GROUP BY bucket
                ORDER BY bucket
                """,
                (metric_name, start.isoformat(), end.isoformat()),
            )
            return [
                {"time": row["bucket"], "value": row["value"]}
                for row in cursor.fetchall()
            ]

    # Alert operations

    def insert_alert(self, alert: AlertRecord) -> int:
        """Insert an alert record; return its rowid."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO alerts
                (timestamp, rule_name, severity, metric_name, value, threshold, message, resolved_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    alert.timestamp.isoformat(),
                    alert.rule_name,
                    alert.severity,
                    alert.metric_name,
                    alert.value,
                    alert.threshold,
                    alert.message,
                    alert.resolved_at.isoformat() if alert.resolved_at else None,
                ),
            )
            return cursor.lastrowid

    def get_active_alerts(self) -> List[AlertRecord]:
        """Get all unresolved alerts (resolved_at IS NULL), newest first."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT id, timestamp, rule_name, severity, metric_name, value, threshold, message, resolved_at
                FROM alerts
                WHERE resolved_at IS NULL
                ORDER BY timestamp DESC
                """
            )
            return [AlertRecord.from_row(tuple(row)) for row in cursor.fetchall()]

    def get_recent_alerts(self, limit: int = 100) -> List[AlertRecord]:
        """Get the most recent alerts, resolved or not, newest first."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT id, timestamp, rule_name, severity, metric_name, value, threshold, message, resolved_at
                FROM alerts
                ORDER BY timestamp DESC
                LIMIT ?
                """,
                (limit,),
            )
            return [AlertRecord.from_row(tuple(row)) for row in cursor.fetchall()]

    def resolve_alert(self, alert_id: int) -> None:
        """Mark an alert as resolved by stamping resolved_at with the current time."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                UPDATE alerts SET resolved_at = ? WHERE id = ?
                """,
                (datetime.now().isoformat(), alert_id),
            )

    # Request trace operations

    def insert_trace(self, trace: RequestTrace) -> int:
        """Insert a request trace; return its rowid.

        Uses INSERT OR REPLACE so a duplicate request_id overwrites the
        previous trace rather than raising a UNIQUE-constraint error.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT OR REPLACE INTO request_traces
                (request_id, timestamp, prompt_tokens, output_tokens,
                 queue_time_ms, prefill_time_ms, decode_time_ms, total_time_ms,
                 tokens_per_second, is_slow)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    trace.request_id,
                    trace.timestamp.isoformat(),
                    trace.prompt_tokens,
                    trace.output_tokens,
                    trace.queue_time_ms,
                    trace.prefill_time_ms,
                    trace.decode_time_ms,
                    trace.total_time_ms,
                    trace.tokens_per_second,
                    trace.is_slow,
                ),
            )
            return cursor.lastrowid

    def get_recent_traces(
        self, limit: int = 100, slow_only: bool = False
    ) -> List[RequestTrace]:
        """Get recent request traces, newest first; optionally slow ones only."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            query = """
                SELECT id, request_id, timestamp, prompt_tokens, output_tokens,
                       queue_time_ms, prefill_time_ms, decode_time_ms, total_time_ms,
                       tokens_per_second, is_slow
                FROM request_traces
            """
            if slow_only:
                query += " WHERE is_slow = 1"
            query += " ORDER BY timestamp DESC LIMIT ?"
            cursor.execute(query, (limit,))
            return [RequestTrace.from_row(tuple(row)) for row in cursor.fetchall()]

    def get_trace_stats(self) -> Dict[str, Any]:
        """Get aggregate trace statistics over the last hour.

        Returns a dict with total_requests, avg_latency_ms, avg_queue_ms,
        avg_prefill_ms, avg_decode_ms, and slow_request_count; all values
        default to 0 when there are no traces in the window.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT
                    COUNT(*) as total,
                    AVG(total_time_ms) as avg_latency,
                    AVG(queue_time_ms) as avg_queue,
                    AVG(prefill_time_ms) as avg_prefill,
                    AVG(decode_time_ms) as avg_decode,
                    SUM(CASE WHEN is_slow THEN 1 ELSE 0 END) as slow_count
                FROM request_traces
                WHERE timestamp > datetime('now', '-1 hour')
                """
            )
            row = cursor.fetchone()
            # AVG/SUM return NULL over an empty set; coalesce to 0 here.
            return {
                "total_requests": row["total"] or 0,
                "avg_latency_ms": row["avg_latency"] or 0,
                "avg_queue_ms": row["avg_queue"] or 0,
                "avg_prefill_ms": row["avg_prefill"] or 0,
                "avg_decode_ms": row["avg_decode"] or 0,
                "slow_request_count": row["slow_count"] or 0,
            }

    # Load test operations

    def insert_load_test(self, result: LoadTestResult) -> int:
        """Insert a load test result; return its rowid."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO load_tests
                (test_id, timestamp, target_endpoint, concurrent_users,
                 requests_per_second, duration_seconds, total_requests,
                 successful_requests, failed_requests, avg_latency_ms,
                 p50_latency_ms, p95_latency_ms, p99_latency_ms,
                 throughput_rps, saturation_point)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    result.test_id,
                    result.timestamp.isoformat(),
                    result.target_endpoint,
                    result.concurrent_users,
                    result.requests_per_second,
                    result.duration_seconds,
                    result.total_requests,
                    result.successful_requests,
                    result.failed_requests,
                    result.avg_latency_ms,
                    result.p50_latency_ms,
                    result.p95_latency_ms,
                    result.p99_latency_ms,
                    result.throughput_rps,
                    result.saturation_point,
                ),
            )
            return cursor.lastrowid

    def get_recent_load_tests(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get recent load test results as plain dicts, newest first."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT * FROM load_tests
                ORDER BY timestamp DESC
                LIMIT ?
                """,
                (limit,),
            )
            return [dict(row) for row in cursor.fetchall()]

    # Cleanup operations

    def cleanup_old_data(self, days: int = 7) -> int:
        """Remove metrics, alerts, and traces older than the given number of days.

        Returns the total number of rows deleted. NOTE(review): load_tests is
        deliberately not pruned here — presumably kept as long-term history;
        confirm that is intentional.
        """
        cutoff = (datetime.now() - timedelta(days=days)).isoformat()
        with self._get_connection() as conn:
            cursor = conn.cursor()
            total_deleted = 0
            for table in ["metrics", "alerts", "request_traces"]:
                cursor.execute(
                    f"DELETE FROM {table} WHERE timestamp < ?",
                    (cutoff,),
                )
                total_deleted += cursor.rowcount
            return total_deleted