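"""Metrics tracking for the RAG latency-optimization demo.

Appends one row per query to a CSV file (METRICS_FILE from config) and keeps an
in-memory copy used to summarise naive vs. optimized runs.
"""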
import csv
import json
import statistics
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from config import METRICS_FILE


class MetricsTracker:
    def __init__(self):
        self.metrics_file = METRICS_FILE
        self.queries = []
        self._ensure_metrics_file()

    def _ensure_metrics_file(self):
        """Create metrics file with headers if it doesn't exist."""
        if not self.metrics_file.exists():
            with open(self.metrics_file, 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow([
                    'timestamp', 'model', 'question_length',
                    'latency_ms', 'memory_mb', 'chunks_used',
                    'embedding_time', 'retrieval_time', 'generation_time'
                ])

    def record_query(self, model: str, latency_ms: float, memory_mb: float,
                     chunks_used: int, question_length: int,
                     embedding_time: float = 0, retrieval_time: float = 0,
                     generation_time: float = 0):
        """Record a query with all timing metrics."""
        metric = {
            'timestamp': datetime.now().isoformat(),
            'model': model,
            'question_length': question_length,
            'latency_ms': round(latency_ms, 2),
            'memory_mb': round(memory_mb, 2),
            'chunks_used': chunks_used,
            'embedding_time': round(embedding_time, 2),
            'retrieval_time': round(retrieval_time, 2),
            'generation_time': round(generation_time, 2)
        }
        self.queries.append(metric)

        # Append to CSV
        with open(self.metrics_file, 'a', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([
                metric['timestamp'], metric['model'], metric['question_length'],
                metric['latency_ms'], metric['memory_mb'], metric['chunks_used'],
                metric['embedding_time'], metric['retrieval_time'], metric['generation_time']
            ])

    def get_summary(self) -> Dict[str, Any]:
        """Get comprehensive metrics summary."""
        if not self.queries:
            return {"message": "No metrics recorded yet"}

        naive_metrics = [q for q in self.queries if q['model'] == 'naive']
        optimized_metrics = [q for q in self.queries if q['model'] == 'optimized']

        def calculate_stats(metrics_list: List[Dict]) -> Dict:
            if not metrics_list:
                return {}
            latencies = [m['latency_ms'] for m in metrics_list]
            memories = [m['memory_mb'] for m in metrics_list]
            return {
                'count': len(metrics_list),
                'avg_latency': round(statistics.mean(latencies), 2),
                'median_latency': round(statistics.median(latencies), 2),
                'min_latency': round(min(latencies), 2),
                'max_latency': round(max(latencies), 2),
                'avg_memory': round(statistics.mean(memories), 2),
                'avg_chunks': round(statistics.mean([m['chunks_used'] for m in metrics_list]), 2)
            }

        summary = {
            'total_queries': len(self.queries),
            'naive': calculate_stats(naive_metrics),
            'optimized': calculate_stats(optimized_metrics),
            'improvement': {}
        }

        # Calculate improvement if we have both
        if naive_metrics and optimized_metrics:
            naive_avg = summary['naive']['avg_latency']
            optimized_avg = summary['optimized']['avg_latency']
            # Guard both averages to avoid division by zero in speedup_factor
            if naive_avg > 0 and optimized_avg > 0:
                improvement = ((naive_avg - optimized_avg) / naive_avg) * 100
                summary['improvement'] = {
                    'latency_reduction_percent': round(improvement, 2),
                    'speedup_factor': round(naive_avg / optimized_avg, 2)
                }

        return summary

    def reset(self):
        """Reset in-memory metrics."""
        self.queries = []

    def export_json(self, output_path: Optional[Path] = None):
        """Export metrics to JSON file."""
        if output_path is None:
            output_path = self.metrics_file.with_suffix('.json')
        with open(output_path, 'w') as f:
            json.dump({
                'queries': self.queries,
                'summary': self.get_summary(),
                'exported_at': datetime.now().isoformat()
            }, f, indent=2)
        return output_path
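

if __name__ == "__main__":
    # Illustrative usage sketch only, with dummy timing numbers; it assumes
    # config.METRICS_FILE points at a writable CSV Path. The 'naive' and
    # 'optimized' labels match the filters used in get_summary().
    tracker = MetricsTracker()
    tracker.record_query(model='naive', latency_ms=1820.4, memory_mb=512.3,
                         chunks_used=8, question_length=64,
                         embedding_time=120.0, retrieval_time=300.2,
                         generation_time=1400.2)
    tracker.record_query(model='optimized', latency_ms=640.7, memory_mb=480.1,
                         chunks_used=4, question_length=64,
                         embedding_time=35.5, retrieval_time=90.1,
                         generation_time=515.1)
    print(json.dumps(tracker.get_summary(), indent=2))
    print(f"Exported metrics to {tracker.export_json()}")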