SupportRAG / src /utils /metrics.py
Sakshamyadav15
Initial commit
ffa03b9
"""
Metrics tracking and logging utility.
Tracks query performance, escalations, and system metrics.
"""
import json
import time
from typing import Optional, Dict, Any
from datetime import datetime
from pathlib import Path
from src.config import settings
from src.models.schemas import EscalationReason
class MetricsTracker:
"""Tracks and logs application metrics."""
def __init__(self):
"""Initialize metrics tracker."""
self.start_time = time.time()
self.metrics = {
'total_queries': 0,
'escalated_queries': 0,
'total_latency_ms': 0.0,
'total_confidence': 0.0,
'queries': []
}
self.metrics_path = settings.get_metrics_file_path()
self.metrics_path.parent.mkdir(parents=True, exist_ok=True)
self._load_metrics()
def log_query(
self,
query_id: str,
question: str,
answer: str,
confidence_score: float,
escalated: bool,
latency_ms: float,
citations_count: int,
escalation_reason: Optional[EscalationReason] = None,
user_id: Optional[str] = None
):
"""
Log a query and its results.
Args:
query_id: Unique query identifier
question: User's question
answer: Generated answer
confidence_score: Confidence score
escalated: Whether query was escalated
latency_ms: Response latency in milliseconds
citations_count: Number of citations
escalation_reason: Reason for escalation if applicable
user_id: Optional user identifier
"""
entry = {
'query_id': query_id,
'question': question,
'answer': answer,
'confidence_score': confidence_score,
'escalated': escalated,
'escalation_reason': escalation_reason.value if escalation_reason else None,
'latency_ms': latency_ms,
'citations_count': citations_count,
'user_id': user_id,
'timestamp': datetime.utcnow().isoformat()
}
# Update aggregates
self.metrics['total_queries'] += 1
if escalated:
self.metrics['escalated_queries'] += 1
self.metrics['total_latency_ms'] += latency_ms
self.metrics['total_confidence'] += confidence_score
# Append query
self.metrics['queries'].append(entry)
# Save periodically (every 10 queries)
if self.metrics['total_queries'] % 10 == 0:
self._save_metrics()
def get_metrics(self) -> Dict[str, Any]:
"""
Get current metrics summary.
Returns:
Dictionary of current metrics
"""
total = self.metrics['total_queries']
if total == 0:
return {
'total_queries': 0,
'escalation_rate': 0.0,
'average_latency_ms': 0.0,
'average_confidence': 0.0,
'uptime_seconds': time.time() - self.start_time
}
return {
'total_queries': total,
'escalation_rate': (self.metrics['escalated_queries'] / total) * 100,
'average_latency_ms': self.metrics['total_latency_ms'] / total,
'average_confidence': self.metrics['total_confidence'] / total,
'uptime_seconds': time.time() - self.start_time
}
def _save_metrics(self):
"""Save metrics to disk."""
try:
with open(self.metrics_path, 'w') as f:
json.dump(self.metrics, f, indent=2)
except Exception as e:
# Don't fail if we can't save metrics
print(f"Warning: Could not save metrics: {e}")
def _load_metrics(self):
"""Load metrics from disk if available."""
try:
if self.metrics_path.exists():
with open(self.metrics_path, 'r') as f:
loaded = json.load(f)
# Only load aggregates, not all historical queries
self.metrics['total_queries'] = loaded.get('total_queries', 0)
self.metrics['escalated_queries'] = loaded.get('escalated_queries', 0)
self.metrics['total_latency_ms'] = loaded.get('total_latency_ms', 0.0)
self.metrics['total_confidence'] = loaded.get('total_confidence', 0.0)
except Exception:
# Start fresh if we can't load
pass
def reset(self):
"""Reset all metrics."""
self.metrics = {
'total_queries': 0,
'escalated_queries': 0,
'total_latency_ms': 0.0,
'total_confidence': 0.0,
'queries': []
}
self._save_metrics()