"""SQLite database operations for metrics storage."""

import sqlite3
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional, Dict, Any, Iterator
from contextlib import contextmanager

from .models import MetricRecord, AlertRecord, RequestTrace, LoadTestResult

logger = logging.getLogger(__name__)


class MetricsDB:
    """SQLite database for storing metrics, alerts, traces and load tests.

    Every public method opens its own short-lived connection through
    ``_get_connection``; the transaction is committed when the method
    finishes and rolled back if it raises.

    All timestamps written by this class are ISO-8601 strings produced by
    ``datetime.isoformat()`` (``T`` separator). Time filters must therefore
    be expressed in that same format, because SQLite compares them as text.
    """

    # Whitelist mapping the public ``aggregation`` argument to the SQL
    # aggregate function name that is spliced into the query text.
    _AGGREGATIONS = {
        "avg": "AVG",
        "max": "MAX",
        "min": "MIN",
        "sum": "SUM",
        "count": "COUNT",
    }

    def __init__(self, db_path: str = "data/metrics.db"):
        """
        Initialize database connection.

        Creates the parent directory of ``db_path`` if needed and makes sure
        all tables and indexes exist.

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = db_path
        self._ensure_directory()
        self._init_schema()

    def _ensure_directory(self) -> None:
        """Ensure the database directory exists."""
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

    @contextmanager
    def _get_connection(self) -> Iterator[sqlite3.Connection]:
        """Yield a new connection; commit on success, roll back on error.

        Rows are returned as ``sqlite3.Row`` so columns can be read by name.
        The connection is always closed when the ``with`` block exits.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def _init_schema(self) -> None:
        """Create all tables and indexes if they do not exist yet."""
        with self._get_connection() as conn:
            cursor = conn.cursor()

            # Metrics table
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    metric_name TEXT NOT NULL,
                    value REAL NOT NULL,
                    labels TEXT
                )
                """
            )

            # Indexes for metrics
            cursor.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_metrics_timestamp
                ON metrics(timestamp)
                """
            )
            cursor.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_metrics_name_time
                ON metrics(metric_name, timestamp)
                """
            )

            # Alerts table
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS alerts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    rule_name TEXT,
                    severity TEXT,
                    metric_name TEXT,
                    value REAL,
                    threshold REAL,
                    message TEXT,
                    resolved_at DATETIME
                )
                """
            )

            # Request traces table
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS request_traces (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    request_id TEXT UNIQUE,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    prompt_tokens INTEGER,
                    output_tokens INTEGER,
                    queue_time_ms REAL,
                    prefill_time_ms REAL,
                    decode_time_ms REAL,
                    total_time_ms REAL,
                    tokens_per_second REAL,
                    is_slow BOOLEAN
                )
                """
            )

            # Load test results table
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS load_tests (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    test_id TEXT UNIQUE,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    target_endpoint TEXT,
                    concurrent_users INTEGER,
                    requests_per_second REAL,
                    duration_seconds INTEGER,
                    total_requests INTEGER,
                    successful_requests INTEGER,
                    failed_requests INTEGER,
                    avg_latency_ms REAL,
                    p50_latency_ms REAL,
                    p95_latency_ms REAL,
                    p99_latency_ms REAL,
                    throughput_rps REAL,
                    saturation_point REAL
                )
                """
            )

    # Metrics operations

    @staticmethod
    def _metric_params(record: MetricRecord) -> tuple:
        """Return the bound-parameter tuple for one row of ``metrics``.

        Labels are stored as a JSON string, or NULL when the record has no
        (or empty) labels.
        """
        return (
            record.timestamp.isoformat(),
            record.metric_name,
            record.value,
            json.dumps(record.labels) if record.labels else None,
        )

    def insert_metric(self, record: MetricRecord) -> int:
        """Insert a metric record.

        Args:
            record: Metric to store.

        Returns:
            Row id of the inserted metric.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO metrics (timestamp, metric_name, value, labels)
                VALUES (?, ?, ?, ?)
                """,
                self._metric_params(record),
            )
            return cursor.lastrowid

    def insert_metrics_batch(self, records: List[MetricRecord]) -> None:
        """Insert multiple metric records in a single transaction.

        Args:
            records: Metrics to store; an empty list is a no-op.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.executemany(
                """
                INSERT INTO metrics (timestamp, metric_name, value, labels)
                VALUES (?, ?, ?, ?)
                """,
                [self._metric_params(r) for r in records],
            )

    def query_metrics(
        self,
        metric_name: str,
        start: datetime,
        end: datetime,
        labels: Optional[Dict[str, str]] = None,
    ) -> List[MetricRecord]:
        """Query metrics by name and time range.

        Args:
            metric_name: Exact metric name to match.
            start: Inclusive lower bound of the time range.
            end: Inclusive upper bound of the time range.
            labels: Optional label filter; a record matches only when every
                given key maps to the same value in the record's labels.

        Returns:
            Matching records ordered by timestamp (ascending).
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT id, timestamp, metric_name, value, labels
                FROM metrics
                WHERE metric_name = ? AND timestamp BETWEEN ? AND ?
                ORDER BY timestamp
                """,
                (metric_name, start.isoformat(), end.isoformat()),
            )
            records = [
                MetricRecord.from_row(tuple(row)) for row in cursor.fetchall()
            ]

        if not labels:
            return records

        # Label filtering happens in Python because labels are stored as an
        # opaque JSON string. A record stored without labels (NULL column)
        # may come back with ``labels`` unset, so treat that as "no labels"
        # rather than failing on ``None.get``.
        # NOTE(review): assumes MetricRecord.labels is a dict or None; confirm
        # against MetricRecord.from_row.
        return [
            record
            for record in records
            if all(
                (record.labels or {}).get(key) == wanted
                for key, wanted in labels.items()
            )
        ]

    def query_aggregated(
        self,
        metric_name: str,
        start: datetime,
        end: datetime,
        aggregation: str = "avg",
        bucket_minutes: int = 1,
    ) -> List[Dict[str, Any]]:
        """Query metrics with time bucketing and aggregation.

        Rows are grouped into buckets of ``bucket_minutes`` aligned within
        each clock hour (a value of 60 or more therefore yields one bucket
        per hour).

        Args:
            metric_name: Exact metric name to match.
            start: Inclusive lower bound of the time range.
            end: Inclusive upper bound of the time range.
            aggregation: One of ``avg``, ``max``, ``min``, ``sum``, ``count``.
                Any other value falls back to ``avg`` (a warning is logged).
            bucket_minutes: Bucket width in minutes; must be >= 1.

        Returns:
            A list of ``{"time": <bucket start>, "value": <aggregate>}``
            dicts ordered by bucket.

        Raises:
            ValueError: If ``bucket_minutes`` is not a positive integer.
        """
        try:
            bucket = int(bucket_minutes)
        except (TypeError, ValueError) as err:
            raise ValueError(
                f"bucket_minutes must be a positive integer, got {bucket_minutes!r}"
            ) from err
        if bucket < 1:
            # Zero would divide by zero inside SQLite (yielding NULL buckets)
            # and negative widths produce meaningless bucket labels.
            raise ValueError(
                f"bucket_minutes must be a positive integer, got {bucket_minutes!r}"
            )

        agg_func = self._AGGREGATIONS.get(aggregation)
        if agg_func is None:
            logger.warning(
                "Unknown aggregation %r; falling back to 'avg'", aggregation
            )
            agg_func = "AVG"

        with self._get_connection() as conn:
            cursor = conn.cursor()
            # Only the whitelisted aggregate name is spliced into the SQL
            # text; the bucket width is bound as a parameter.
            cursor.execute(
                f"""
                SELECT
                    datetime(
                        strftime('%Y-%m-%d %H:', timestamp) ||
                        printf('%02d', (CAST(strftime('%M', timestamp) AS INTEGER) / ?) * ?) ||
                        ':00'
                    ) as bucket,
                    {agg_func}(value) as value
                FROM metrics
                WHERE metric_name = ? AND timestamp BETWEEN ? AND ?
                GROUP BY bucket
                ORDER BY bucket
                """,
                (bucket, bucket, metric_name, start.isoformat(), end.isoformat()),
            )
            return [
                {"time": row["bucket"], "value": row["value"]}
                for row in cursor.fetchall()
            ]

    # Alert operations

    def insert_alert(self, alert: AlertRecord) -> int:
        """Insert an alert record.

        Args:
            alert: Alert to store; ``resolved_at`` may be unset.

        Returns:
            Row id of the inserted alert.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO alerts
                (timestamp, rule_name, severity, metric_name, value,
                 threshold, message, resolved_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    alert.timestamp.isoformat(),
                    alert.rule_name,
                    alert.severity,
                    alert.metric_name,
                    alert.value,
                    alert.threshold,
                    alert.message,
                    alert.resolved_at.isoformat() if alert.resolved_at else None,
                ),
            )
            return cursor.lastrowid

    def get_active_alerts(self) -> List[AlertRecord]:
        """Get all unresolved alerts, newest first."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT id, timestamp, rule_name, severity, metric_name,
                       value, threshold, message, resolved_at
                FROM alerts
                WHERE resolved_at IS NULL
                ORDER BY timestamp DESC
                """
            )
            return [AlertRecord.from_row(tuple(row)) for row in cursor.fetchall()]

    def get_recent_alerts(self, limit: int = 100) -> List[AlertRecord]:
        """Get the most recent alerts (resolved or not), newest first.

        Args:
            limit: Maximum number of alerts to return.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT id, timestamp, rule_name, severity, metric_name,
                       value, threshold, message, resolved_at
                FROM alerts
                ORDER BY timestamp DESC
                LIMIT ?
                """,
                (limit,),
            )
            return [AlertRecord.from_row(tuple(row)) for row in cursor.fetchall()]

    def resolve_alert(self, alert_id: int) -> None:
        """Mark an alert as resolved by stamping ``resolved_at`` with now.

        Args:
            alert_id: Row id of the alert; an unknown id updates nothing.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                UPDATE alerts SET resolved_at = ? WHERE id = ?
                """,
                (datetime.now().isoformat(), alert_id),
            )

    # Request trace operations

    def insert_trace(self, trace: RequestTrace) -> int:
        """Insert a request trace.

        An existing trace with the same ``request_id`` is replaced
        (``INSERT OR REPLACE`` on the UNIQUE column).

        Args:
            trace: Trace to store.

        Returns:
            Row id of the stored trace.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT OR REPLACE INTO request_traces
                (request_id, timestamp, prompt_tokens, output_tokens,
                 queue_time_ms, prefill_time_ms, decode_time_ms,
                 total_time_ms, tokens_per_second, is_slow)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    trace.request_id,
                    trace.timestamp.isoformat(),
                    trace.prompt_tokens,
                    trace.output_tokens,
                    trace.queue_time_ms,
                    trace.prefill_time_ms,
                    trace.decode_time_ms,
                    trace.total_time_ms,
                    trace.tokens_per_second,
                    trace.is_slow,
                ),
            )
            return cursor.lastrowid

    def get_recent_traces(
        self, limit: int = 100, slow_only: bool = False
    ) -> List[RequestTrace]:
        """Get recent request traces, newest first.

        Args:
            limit: Maximum number of traces to return.
            slow_only: When true, return only traces flagged ``is_slow``.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            query = """
                SELECT id, request_id, timestamp, prompt_tokens, output_tokens,
                       queue_time_ms, prefill_time_ms, decode_time_ms,
                       total_time_ms, tokens_per_second, is_slow
                FROM request_traces
            """
            if slow_only:
                query += " WHERE is_slow = 1"
            query += " ORDER BY timestamp DESC LIMIT ?"
            cursor.execute(query, (limit,))
            return [RequestTrace.from_row(tuple(row)) for row in cursor.fetchall()]

    def get_trace_stats(self) -> Dict[str, Any]:
        """Get aggregate statistics for traces recorded in the last hour.

        Returns:
            Dict with ``total_requests``, ``avg_latency_ms``,
            ``avg_queue_ms``, ``avg_prefill_ms``, ``avg_decode_ms`` and
            ``slow_request_count``; every value is 0 when no trace matches.
        """
        # Build the cutoff in the same ISO-8601 'T' format that insert_trace
        # writes. SQLite's datetime('now', ...) yields a space-separated UTC
        # string, and since 'T' sorts after ' ' a text comparison against it
        # matches every row from the cutoff's calendar date onwards, not just
        # the last hour.
        # NOTE(review): assumes trace timestamps are naive local times, as
        # used by resolve_alert/cleanup_old_data via datetime.now(); confirm.
        cutoff = (datetime.now() - timedelta(hours=1)).isoformat()
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT
                    COUNT(*) as total,
                    AVG(total_time_ms) as avg_latency,
                    AVG(queue_time_ms) as avg_queue,
                    AVG(prefill_time_ms) as avg_prefill,
                    AVG(decode_time_ms) as avg_decode,
                    SUM(CASE WHEN is_slow THEN 1 ELSE 0 END) as slow_count
                FROM request_traces
                WHERE timestamp > ?
                """,
                (cutoff,),
            )
            row = cursor.fetchone()
            # AVG/SUM return NULL on an empty set; normalise those to 0.
            return {
                "total_requests": row["total"] or 0,
                "avg_latency_ms": row["avg_latency"] or 0,
                "avg_queue_ms": row["avg_queue"] or 0,
                "avg_prefill_ms": row["avg_prefill"] or 0,
                "avg_decode_ms": row["avg_decode"] or 0,
                "slow_request_count": row["slow_count"] or 0,
            }

    # Load test operations

    def insert_load_test(self, result: LoadTestResult) -> int:
        """Insert a load test result.

        Args:
            result: Load test summary to store.

        Returns:
            Row id of the inserted result.

        Raises:
            sqlite3.IntegrityError: If a result with the same ``test_id``
                already exists (the column is UNIQUE).
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO load_tests
                (test_id, timestamp, target_endpoint, concurrent_users,
                 requests_per_second, duration_seconds, total_requests,
                 successful_requests, failed_requests, avg_latency_ms,
                 p50_latency_ms, p95_latency_ms, p99_latency_ms,
                 throughput_rps, saturation_point)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    result.test_id,
                    result.timestamp.isoformat(),
                    result.target_endpoint,
                    result.concurrent_users,
                    result.requests_per_second,
                    result.duration_seconds,
                    result.total_requests,
                    result.successful_requests,
                    result.failed_requests,
                    result.avg_latency_ms,
                    result.p50_latency_ms,
                    result.p95_latency_ms,
                    result.p99_latency_ms,
                    result.throughput_rps,
                    result.saturation_point,
                ),
            )
            return cursor.lastrowid

    def get_recent_load_tests(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get recent load test results, newest first.

        Args:
            limit: Maximum number of results to return.

        Returns:
            One dict per row, keyed by the ``load_tests`` column names.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT * FROM load_tests
                ORDER BY timestamp DESC
                LIMIT ?
                """,
                (limit,),
            )
            return [dict(row) for row in cursor.fetchall()]

    # Cleanup operations

    def cleanup_old_data(self, days: int = 7) -> int:
        """Remove metrics, alerts and request traces older than ``days``.

        Load test results are not pruned by this method.

        Args:
            days: Retention period in days, measured back from now.

        Returns:
            Total number of rows deleted across the three tables.
        """
        cutoff = (datetime.now() - timedelta(days=days)).isoformat()
        with self._get_connection() as conn:
            cursor = conn.cursor()
            total_deleted = 0
            # Table names come from this fixed list, never from callers, so
            # formatting them into the statement is safe.
            for table in ["metrics", "alerts", "request_traces"]:
                cursor.execute(
                    f"DELETE FROM {table} WHERE timestamp < ?",
                    (cutoff,),
                )
                total_deleted += cursor.rowcount
            return total_deleted