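"""Query metrics tracking: records per-query latency, memory and timing
breakdowns to a CSV file and produces summary statistics comparing the
'naive' and 'optimized' models."""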
import csv
import json
import statistics
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from config import METRICS_FILE

class MetricsTracker:
    def __init__(self):
        self.metrics_file = METRICS_FILE
        self.queries = []
        self._ensure_metrics_file()

    def _ensure_metrics_file(self):
        """Create metrics file with headers if it doesn't exist."""
        if not self.metrics_file.exists():
            with open(self.metrics_file, 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow([
                    'timestamp', 'model', 'question_length',
                    'latency_ms', 'memory_mb', 'chunks_used',
                    'embedding_time', 'retrieval_time', 'generation_time'
                ])
    def record_query(self, model: str, latency_ms: float, memory_mb: float,
                     chunks_used: int, question_length: int,
                     embedding_time: float = 0, retrieval_time: float = 0,
                     generation_time: float = 0):
        """Record a query with all timing metrics."""
        metric = {
            'timestamp': datetime.now().isoformat(),
            'model': model,
            'question_length': question_length,
            'latency_ms': round(latency_ms, 2),
            'memory_mb': round(memory_mb, 2),
            'chunks_used': chunks_used,
            'embedding_time': round(embedding_time, 2),
            'retrieval_time': round(retrieval_time, 2),
            'generation_time': round(generation_time, 2)
        }
        self.queries.append(metric)

        # Append to CSV
        with open(self.metrics_file, 'a', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([
                metric['timestamp'], metric['model'], metric['question_length'],
                metric['latency_ms'], metric['memory_mb'], metric['chunks_used'],
                metric['embedding_time'], metric['retrieval_time'], metric['generation_time']
            ])
    def get_summary(self) -> Dict[str, Any]:
        """Get comprehensive metrics summary."""
        if not self.queries:
            return {"message": "No metrics recorded yet"}

        naive_metrics = [q for q in self.queries if q['model'] == 'naive']
        optimized_metrics = [q for q in self.queries if q['model'] == 'optimized']

        def calculate_stats(metrics_list: List[Dict]) -> Dict:
            if not metrics_list:
                return {}
            latencies = [m['latency_ms'] for m in metrics_list]
            memories = [m['memory_mb'] for m in metrics_list]
            return {
                'count': len(metrics_list),
                'avg_latency': round(statistics.mean(latencies), 2),
                'median_latency': round(statistics.median(latencies), 2),
                'min_latency': round(min(latencies), 2),
                'max_latency': round(max(latencies), 2),
                'avg_memory': round(statistics.mean(memories), 2),
                'avg_chunks': round(statistics.mean([m['chunks_used'] for m in metrics_list]), 2)
            }

        summary = {
            'total_queries': len(self.queries),
            'naive': calculate_stats(naive_metrics),
            'optimized': calculate_stats(optimized_metrics),
            'improvement': {}
        }
        # Calculate improvement if we have results from both models;
        # guard both averages to avoid division by zero.
        if naive_metrics and optimized_metrics:
            naive_avg = summary['naive']['avg_latency']
            optimized_avg = summary['optimized']['avg_latency']
            if naive_avg > 0 and optimized_avg > 0:
                improvement = ((naive_avg - optimized_avg) / naive_avg) * 100
                summary['improvement'] = {
                    'latency_reduction_percent': round(improvement, 2),
                    'speedup_factor': round(naive_avg / optimized_avg, 2)
                }

        return summary
    def reset(self):
        """Reset in-memory metrics (the CSV file is left untouched)."""
        self.queries = []

    def export_json(self, output_path: Optional[Path] = None):
        """Export metrics to JSON file."""
        if output_path is None:
            output_path = self.metrics_file.with_suffix('.json')
        with open(output_path, 'w') as f:
            json.dump({
                'queries': self.queries,
                'summary': self.get_summary(),
                'exported_at': datetime.now().isoformat()
            }, f, indent=2)
        return output_path
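

# Usage sketch (illustrative only): the metric values below are made-up
# placeholders, and METRICS_FILE in config.py must point at a writable path.
# It records one query per model, prints the comparison summary, and exports
# everything to JSON.
if __name__ == "__main__":
    tracker = MetricsTracker()
    tracker.record_query(model="naive", latency_ms=1250.0, memory_mb=512.0,
                         chunks_used=8, question_length=42)
    tracker.record_query(model="optimized", latency_ms=480.0, memory_mb=380.0,
                         chunks_used=4, question_length=42,
                         embedding_time=35.0, retrieval_time=20.0,
                         generation_time=400.0)
    print(json.dumps(tracker.get_summary(), indent=2))
    print(f"Exported to {tracker.export_json()}")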