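"""Query metrics tracking for comparing 'naive' and 'optimized' model runs.

Records per-query latency, memory, and stage timings to a CSV file and
provides aggregate summaries and JSON export.
"""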
import csv
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
import statistics
from collections import defaultdict
from config import METRICS_FILE


class MetricsTracker:
    def __init__(self):
        self.metrics_file = METRICS_FILE
        self.queries = []
        self._ensure_metrics_file()

    def _ensure_metrics_file(self):
        """Create metrics file with headers if it doesn't exist."""
        if not self.metrics_file.exists():
            with open(self.metrics_file, 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow([
                    'timestamp', 'model', 'question_length',
                    'latency_ms', 'memory_mb', 'chunks_used',
                    'embedding_time', 'retrieval_time', 'generation_time'
                ])
    def record_query(self, model: str, latency_ms: float, memory_mb: float,
                     chunks_used: int, question_length: int,
                     embedding_time: float = 0, retrieval_time: float = 0,
                     generation_time: float = 0):
        """Record a query with all timing metrics."""
        metric = {
            'timestamp': datetime.now().isoformat(),
            'model': model,
            'question_length': question_length,
            'latency_ms': round(latency_ms, 2),
            'memory_mb': round(memory_mb, 2),
            'chunks_used': chunks_used,
            'embedding_time': round(embedding_time, 2),
            'retrieval_time': round(retrieval_time, 2),
            'generation_time': round(generation_time, 2)
        }
        self.queries.append(metric)

        # Append the row to the CSV file on disk
        with open(self.metrics_file, 'a', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([
                metric['timestamp'], metric['model'], metric['question_length'],
                metric['latency_ms'], metric['memory_mb'], metric['chunks_used'],
                metric['embedding_time'], metric['retrieval_time'], metric['generation_time']
            ])
    def get_summary(self) -> Dict[str, Any]:
        """Get comprehensive metrics summary."""
        if not self.queries:
            return {"message": "No metrics recorded yet"}

        naive_metrics = [q for q in self.queries if q['model'] == 'naive']
        optimized_metrics = [q for q in self.queries if q['model'] == 'optimized']

        def calculate_stats(metrics_list: List[Dict]) -> Dict:
            if not metrics_list:
                return {}
            latencies = [m['latency_ms'] for m in metrics_list]
            memories = [m['memory_mb'] for m in metrics_list]
            return {
                'count': len(metrics_list),
                'avg_latency': round(statistics.mean(latencies), 2),
                'median_latency': round(statistics.median(latencies), 2),
                'min_latency': round(min(latencies), 2),
                'max_latency': round(max(latencies), 2),
                'avg_memory': round(statistics.mean(memories), 2),
                'avg_chunks': round(statistics.mean([m['chunks_used'] for m in metrics_list]), 2)
            }

        summary = {
            'total_queries': len(self.queries),
            'naive': calculate_stats(naive_metrics),
            'optimized': calculate_stats(optimized_metrics),
            'improvement': {}
        }

        # Calculate improvement only if both models have recorded queries
        if naive_metrics and optimized_metrics:
            naive_avg = summary['naive']['avg_latency']
            optimized_avg = summary['optimized']['avg_latency']
            # Guard against division by zero on either average
            if naive_avg > 0 and optimized_avg > 0:
                improvement = ((naive_avg - optimized_avg) / naive_avg) * 100
                summary['improvement'] = {
                    'latency_reduction_percent': round(improvement, 2),
                    'speedup_factor': round(naive_avg / optimized_avg, 2)
                }

        return summary
    def reset(self):
        """Reset in-memory metrics."""
        self.queries = []

    def export_json(self, output_path: Optional[Path] = None) -> Path:
        """Export metrics to a JSON file and return its path."""
        if output_path is None:
            output_path = self.metrics_file.with_suffix('.json')
        with open(output_path, 'w') as f:
            json.dump({
                'queries': self.queries,
                'summary': self.get_summary(),
                'exported_at': datetime.now().isoformat()
            }, f, indent=2)
        return output_path
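

# Minimal usage sketch: assumes config.METRICS_FILE points to a writable CSV
# path (e.g. Path("metrics.csv")); the argument values below are illustrative.
if __name__ == "__main__":
    tracker = MetricsTracker()
    tracker.record_query(model="naive", latency_ms=1250.0, memory_mb=512.3,
                         chunks_used=5, question_length=42,
                         embedding_time=80.0, retrieval_time=120.0,
                         generation_time=1050.0)
    tracker.record_query(model="optimized", latency_ms=640.0, memory_mb=430.1,
                         chunks_used=3, question_length=42,
                         embedding_time=35.0, retrieval_time=60.0,
                         generation_time=545.0)
    print(json.dumps(tracker.get_summary(), indent=2))
    print(f"Exported metrics to {tracker.export_json()}")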