"""Track per-query performance metrics and compare naive vs. optimized runs."""

import csv
import json
import statistics
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from config import METRICS_FILE


class MetricsTracker:
    def __init__(self):
        self.metrics_file = METRICS_FILE
        self.queries: List[Dict[str, Any]] = []
        self._ensure_metrics_file()

    def _ensure_metrics_file(self):
        """Create the metrics CSV with headers if it doesn't exist."""
        if not self.metrics_file.exists():
            with open(self.metrics_file, 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow([
                    'timestamp', 'model', 'question_length', 'latency_ms',
                    'memory_mb', 'chunks_used', 'embedding_time',
                    'retrieval_time', 'generation_time'
                ])

    def record_query(self, model: str, latency_ms: float, memory_mb: float,
                     chunks_used: int, question_length: int,
                     embedding_time: float = 0, retrieval_time: float = 0,
                     generation_time: float = 0):
        """Record a query with all timing metrics, in memory and on disk."""
        metric = {
            'timestamp': datetime.now().isoformat(),
            'model': model,
            'question_length': question_length,
            'latency_ms': round(latency_ms, 2),
            'memory_mb': round(memory_mb, 2),
            'chunks_used': chunks_used,
            'embedding_time': round(embedding_time, 2),
            'retrieval_time': round(retrieval_time, 2),
            'generation_time': round(generation_time, 2)
        }
        self.queries.append(metric)

        # Append to the CSV so metrics survive restarts.
        with open(self.metrics_file, 'a', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([
                metric['timestamp'], metric['model'],
                metric['question_length'], metric['latency_ms'],
                metric['memory_mb'], metric['chunks_used'],
                metric['embedding_time'], metric['retrieval_time'],
                metric['generation_time']
            ])

    def get_summary(self) -> Dict[str, Any]:
        """Get a comprehensive metrics summary for the current session."""
        if not self.queries:
            return {"message": "No metrics recorded yet"}

        naive_metrics = [q for q in self.queries if q['model'] == 'naive']
        optimized_metrics = [q for q in self.queries if q['model'] == 'optimized']

        def calculate_stats(metrics_list: List[Dict]) -> Dict:
            if not metrics_list:
                return {}
            latencies = [m['latency_ms'] for m in metrics_list]
            memories = [m['memory_mb'] for m in metrics_list]
            return {
                'count': len(metrics_list),
                'avg_latency': round(statistics.mean(latencies), 2),
                'median_latency': round(statistics.median(latencies), 2),
                'min_latency': round(min(latencies), 2),
                'max_latency': round(max(latencies), 2),
                'avg_memory': round(statistics.mean(memories), 2),
                'avg_chunks': round(
                    statistics.mean([m['chunks_used'] for m in metrics_list]), 2)
            }

        summary = {
            'total_queries': len(self.queries),
            'naive': calculate_stats(naive_metrics),
            'optimized': calculate_stats(optimized_metrics),
            'improvement': {}
        }

        # Calculate improvement only if both models have data; guard both
        # averages to avoid division by zero in speedup_factor.
        if naive_metrics and optimized_metrics:
            naive_avg = summary['naive']['avg_latency']
            optimized_avg = summary['optimized']['avg_latency']
            if naive_avg > 0 and optimized_avg > 0:
                improvement = ((naive_avg - optimized_avg) / naive_avg) * 100
                summary['improvement'] = {
                    'latency_reduction_percent': round(improvement, 2),
                    'speedup_factor': round(naive_avg / optimized_avg, 2)
                }

        return summary

    def reset(self):
        """Reset in-memory metrics (the CSV on disk is left untouched)."""
        self.queries = []

    def export_json(self, output_path: Optional[Path] = None):
        """Export in-memory metrics and their summary to a JSON file."""
        if output_path is None:
            output_path = self.metrics_file.with_suffix('.json')
        with open(output_path, 'w') as f:
            json.dump({
                'queries': self.queries,
                'summary': self.get_summary(),
                'exported_at': datetime.now().isoformat()
            }, f, indent=2)
        return output_path
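

# Usage sketch (assumption: config.METRICS_FILE points to a writable CSV
# path; the timing values below are illustrative, not real measurements).
if __name__ == "__main__":
    tracker = MetricsTracker()
    tracker.record_query(model='naive', latency_ms=1850.0, memory_mb=512.0,
                         chunks_used=5, question_length=48,
                         embedding_time=90.0, retrieval_time=160.0,
                         generation_time=1600.0)
    tracker.record_query(model='optimized', latency_ms=920.0, memory_mb=430.0,
                         chunks_used=3, question_length=48,
                         embedding_time=40.0, retrieval_time=80.0,
                         generation_time=800.0)
    # With both models recorded, the summary includes per-model stats plus
    # latency_reduction_percent and speedup_factor.
    print(json.dumps(tracker.get_summary(), indent=2))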