# LLM Inference Dashboard — initial commit aefabf0 (author: jkottu)
"""SQLite database operations for metrics storage."""
import sqlite3
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Optional, Dict, Any
from contextlib import contextmanager
from .models import MetricRecord, AlertRecord, RequestTrace, LoadTestResult
logger = logging.getLogger(__name__)
class MetricsDB:
    """SQLite database for storing metrics, alerts, and traces.

    Timestamps are persisted as ISO-8601 strings produced by
    ``datetime.isoformat()`` (naive local time, ``'T'`` separator).
    Time-range filters compare those strings lexicographically, which is
    only correct when both sides of the comparison use the same format —
    so every cutoff is generated in Python with the same ``isoformat()``
    convention rather than with SQLite's UTC ``datetime('now')``.
    """

    def __init__(self, db_path: str = "data/metrics.db"):
        """
        Initialize database connection.

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = db_path
        self._ensure_directory()
        self._init_schema()

    def _ensure_directory(self) -> None:
        """Ensure the database directory exists."""
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

    @contextmanager
    def _get_connection(self):
        """Yield a connection that commits on success and rolls back on error.

        A fresh connection is opened per call (cheap for SQLite) and always
        closed, so no connection can leak on an exception path.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row  # rows addressable by column name
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def _init_schema(self) -> None:
        """Create tables and indexes if they do not already exist (idempotent)."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            # Metrics table: one row per sample; labels is an optional
            # JSON-encoded dict (see insert_metric).
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS metrics (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    metric_name TEXT NOT NULL,
                    value REAL NOT NULL,
                    labels TEXT
                )
            """)
            # Indexes covering the two metric query patterns below.
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_metrics_timestamp
                ON metrics(timestamp)
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_metrics_name_time
                ON metrics(metric_name, timestamp)
            """)
            # Alerts table: resolved_at stays NULL while an alert is active.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS alerts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    rule_name TEXT,
                    severity TEXT,
                    metric_name TEXT,
                    value REAL,
                    threshold REAL,
                    message TEXT,
                    resolved_at DATETIME
                )
            """)
            # Request traces table: per-request latency breakdown; request_id
            # is UNIQUE so re-inserting a trace upserts it (see insert_trace).
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS request_traces (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    request_id TEXT UNIQUE,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    prompt_tokens INTEGER,
                    output_tokens INTEGER,
                    queue_time_ms REAL,
                    prefill_time_ms REAL,
                    decode_time_ms REAL,
                    total_time_ms REAL,
                    tokens_per_second REAL,
                    is_slow BOOLEAN
                )
            """)
            # Load test results table: one row per completed load test run.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS load_tests (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    test_id TEXT UNIQUE,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                    target_endpoint TEXT,
                    concurrent_users INTEGER,
                    requests_per_second REAL,
                    duration_seconds INTEGER,
                    total_requests INTEGER,
                    successful_requests INTEGER,
                    failed_requests INTEGER,
                    avg_latency_ms REAL,
                    p50_latency_ms REAL,
                    p95_latency_ms REAL,
                    p99_latency_ms REAL,
                    throughput_rps REAL,
                    saturation_point REAL
                )
            """)

    # Metrics operations
    def insert_metric(self, record: MetricRecord) -> int:
        """Insert a metric record.

        Args:
            record: Metric sample; labels (if any) are stored as JSON.

        Returns:
            The rowid of the inserted metric.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO metrics (timestamp, metric_name, value, labels)
                VALUES (?, ?, ?, ?)
                """,
                (
                    record.timestamp.isoformat(),
                    record.metric_name,
                    record.value,
                    json.dumps(record.labels) if record.labels else None,
                ),
            )
            return cursor.lastrowid

    def insert_metrics_batch(self, records: List[MetricRecord]) -> None:
        """Insert multiple metric records efficiently in one transaction."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.executemany(
                """
                INSERT INTO metrics (timestamp, metric_name, value, labels)
                VALUES (?, ?, ?, ?)
                """,
                [
                    (
                        r.timestamp.isoformat(),
                        r.metric_name,
                        r.value,
                        json.dumps(r.labels) if r.labels else None,
                    )
                    for r in records
                ],
            )

    def query_metrics(
        self,
        metric_name: str,
        start: datetime,
        end: datetime,
        labels: Optional[Dict[str, str]] = None,
    ) -> List[MetricRecord]:
        """Query metrics by name and time range.

        Args:
            metric_name: Metric to fetch.
            start: Inclusive range start (compared via isoformat strings).
            end: Inclusive range end.
            labels: If given, only records whose labels contain all of
                these key/value pairs are returned (filtered in Python,
                since labels are stored as opaque JSON).

        Returns:
            Matching records ordered by timestamp ascending.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT id, timestamp, metric_name, value, labels
                FROM metrics
                WHERE metric_name = ? AND timestamp BETWEEN ? AND ?
                ORDER BY timestamp
                """,
                (metric_name, start.isoformat(), end.isoformat()),
            )
            records = []
            for row in cursor.fetchall():
                record = MetricRecord.from_row(tuple(row))
                if labels:
                    # Filter by labels if specified
                    if all(record.labels.get(k) == v for k, v in labels.items()):
                        records.append(record)
                else:
                    records.append(record)
            return records

    def query_aggregated(
        self,
        metric_name: str,
        start: datetime,
        end: datetime,
        aggregation: str = "avg",
        bucket_minutes: int = 1,
    ) -> List[Dict[str, Any]]:
        """Query metrics with time bucketing and aggregation.

        Args:
            metric_name: Metric to aggregate.
            start: Inclusive range start.
            end: Inclusive range end.
            aggregation: One of "avg", "max", "min", "sum", "count";
                unknown values fall back to "avg".
            bucket_minutes: Bucket width in minutes; must be >= 1.

        Returns:
            One ``{"time": bucket_start, "value": aggregate}`` dict per
            non-empty bucket, ordered by bucket.

        Raises:
            ValueError: If bucket_minutes is less than 1.
        """
        # bucket_minutes is interpolated into the SQL below; coerce to a
        # validated int so a bad value can neither divide by zero in the
        # bucket expression nor inject SQL.
        bucket_minutes = int(bucket_minutes)
        if bucket_minutes < 1:
            raise ValueError("bucket_minutes must be >= 1")
        # aggregation goes through this whitelist and is never
        # interpolated raw, so it is injection-safe.
        agg_func = {
            "avg": "AVG",
            "max": "MAX",
            "min": "MIN",
            "sum": "SUM",
            "count": "COUNT",
        }.get(aggregation, "AVG")
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"""
                SELECT
                    datetime(
                        strftime('%Y-%m-%d %H:', timestamp) ||
                        printf('%02d', (CAST(strftime('%M', timestamp) AS INTEGER) / {bucket_minutes}) * {bucket_minutes}) ||
                        ':00'
                    ) as bucket,
                    {agg_func}(value) as value
                FROM metrics
                WHERE metric_name = ? AND timestamp BETWEEN ? AND ?
                GROUP BY bucket
                ORDER BY bucket
                """,
                (metric_name, start.isoformat(), end.isoformat()),
            )
            return [
                {"time": row["bucket"], "value": row["value"]}
                for row in cursor.fetchall()
            ]

    # Alert operations
    def insert_alert(self, alert: AlertRecord) -> int:
        """Insert an alert record and return its rowid."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO alerts
                (timestamp, rule_name, severity, metric_name, value, threshold, message, resolved_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    alert.timestamp.isoformat(),
                    alert.rule_name,
                    alert.severity,
                    alert.metric_name,
                    alert.value,
                    alert.threshold,
                    alert.message,
                    alert.resolved_at.isoformat() if alert.resolved_at else None,
                ),
            )
            return cursor.lastrowid

    def get_active_alerts(self) -> List[AlertRecord]:
        """Get all unresolved alerts (resolved_at IS NULL), newest first."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT id, timestamp, rule_name, severity, metric_name, value, threshold, message, resolved_at
                FROM alerts
                WHERE resolved_at IS NULL
                ORDER BY timestamp DESC
                """
            )
            return [AlertRecord.from_row(tuple(row)) for row in cursor.fetchall()]

    def get_recent_alerts(self, limit: int = 100) -> List[AlertRecord]:
        """Get the most recent alerts (resolved or not), newest first."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT id, timestamp, rule_name, severity, metric_name, value, threshold, message, resolved_at
                FROM alerts
                ORDER BY timestamp DESC
                LIMIT ?
                """,
                (limit,),
            )
            return [AlertRecord.from_row(tuple(row)) for row in cursor.fetchall()]

    def resolve_alert(self, alert_id: int) -> None:
        """Mark an alert as resolved, stamping the current local time."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                UPDATE alerts SET resolved_at = ? WHERE id = ?
                """,
                (datetime.now().isoformat(), alert_id),
            )

    # Request trace operations
    def insert_trace(self, trace: RequestTrace) -> int:
        """Insert a request trace; re-inserting the same request_id replaces it.

        Returns:
            The rowid of the inserted (or replacing) row.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT OR REPLACE INTO request_traces
                (request_id, timestamp, prompt_tokens, output_tokens,
                 queue_time_ms, prefill_time_ms, decode_time_ms, total_time_ms,
                 tokens_per_second, is_slow)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    trace.request_id,
                    trace.timestamp.isoformat(),
                    trace.prompt_tokens,
                    trace.output_tokens,
                    trace.queue_time_ms,
                    trace.prefill_time_ms,
                    trace.decode_time_ms,
                    trace.total_time_ms,
                    trace.tokens_per_second,
                    trace.is_slow,
                ),
            )
            return cursor.lastrowid

    def get_recent_traces(
        self, limit: int = 100, slow_only: bool = False
    ) -> List[RequestTrace]:
        """Get recent request traces, newest first.

        Args:
            limit: Maximum number of traces to return.
            slow_only: If True, only traces flagged is_slow are returned.
        """
        with self._get_connection() as conn:
            cursor = conn.cursor()
            query = """
                SELECT id, request_id, timestamp, prompt_tokens, output_tokens,
                       queue_time_ms, prefill_time_ms, decode_time_ms, total_time_ms,
                       tokens_per_second, is_slow
                FROM request_traces
            """
            if slow_only:
                query += " WHERE is_slow = 1"
            query += " ORDER BY timestamp DESC LIMIT ?"
            cursor.execute(query, (limit,))
            return [RequestTrace.from_row(tuple(row)) for row in cursor.fetchall()]

    def get_trace_stats(self) -> Dict[str, Any]:
        """Get aggregate statistics for traces recorded in the last hour.

        Returns:
            Dict with total_requests, avg_latency_ms, avg_queue_ms,
            avg_prefill_ms, avg_decode_ms, and slow_request_count; zeros
            when no traces fall inside the window.
        """
        # FIX: this previously compared against SQLite's
        # datetime('now', '-1 hour'), which is UTC and uses a space
        # separator, while rows are stored as naive-local isoformat()
        # strings with a 'T' separator — the lexicographic comparison was
        # therefore wrong (same-day rows older than 1h were included).
        # Compute the cutoff in Python with the same format the inserts use.
        cutoff = (datetime.now() - timedelta(hours=1)).isoformat()
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT
                    COUNT(*) as total,
                    AVG(total_time_ms) as avg_latency,
                    AVG(queue_time_ms) as avg_queue,
                    AVG(prefill_time_ms) as avg_prefill,
                    AVG(decode_time_ms) as avg_decode,
                    SUM(CASE WHEN is_slow THEN 1 ELSE 0 END) as slow_count
                FROM request_traces
                WHERE timestamp > ?
                """,
                (cutoff,),
            )
            row = cursor.fetchone()
            # AVG/SUM are NULL on an empty window; coalesce to 0.
            return {
                "total_requests": row["total"] or 0,
                "avg_latency_ms": row["avg_latency"] or 0,
                "avg_queue_ms": row["avg_queue"] or 0,
                "avg_prefill_ms": row["avg_prefill"] or 0,
                "avg_decode_ms": row["avg_decode"] or 0,
                "slow_request_count": row["slow_count"] or 0,
            }

    # Load test operations
    def insert_load_test(self, result: LoadTestResult) -> int:
        """Insert a load test result and return its rowid."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO load_tests
                (test_id, timestamp, target_endpoint, concurrent_users,
                 requests_per_second, duration_seconds, total_requests,
                 successful_requests, failed_requests, avg_latency_ms,
                 p50_latency_ms, p95_latency_ms, p99_latency_ms,
                 throughput_rps, saturation_point)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    result.test_id,
                    result.timestamp.isoformat(),
                    result.target_endpoint,
                    result.concurrent_users,
                    result.requests_per_second,
                    result.duration_seconds,
                    result.total_requests,
                    result.successful_requests,
                    result.failed_requests,
                    result.avg_latency_ms,
                    result.p50_latency_ms,
                    result.p95_latency_ms,
                    result.p99_latency_ms,
                    result.throughput_rps,
                    result.saturation_point,
                ),
            )
            return cursor.lastrowid

    def get_recent_load_tests(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get recent load test results as plain dicts, newest first."""
        with self._get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT * FROM load_tests
                ORDER BY timestamp DESC
                LIMIT ?
                """,
                (limit,),
            )
            return [dict(row) for row in cursor.fetchall()]

    # Cleanup operations
    def cleanup_old_data(self, days: int = 7) -> int:
        """Remove metrics, alerts, and traces older than `days` days.

        The cutoff is formatted with the same local-time isoformat()
        convention the insert paths use, so the string comparison is sound.
        Load test results are intentionally kept.

        Returns:
            Total number of rows deleted across all three tables.
        """
        cutoff = (datetime.now() - timedelta(days=days)).isoformat()
        with self._get_connection() as conn:
            cursor = conn.cursor()
            total_deleted = 0
            # Table names come from this fixed list, never from input.
            for table in ["metrics", "alerts", "request_traces"]:
                cursor.execute(
                    f"DELETE FROM {table} WHERE timestamp < ?",
                    (cutoff,),
                )
                total_deleted += cursor.rowcount
            return total_deleted