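"""Metrics tracking for query benchmarking.

Records per-query latency, memory usage, and stage timings (embedding,
retrieval, generation), appends each record to a CSV file, and computes
summary statistics comparing the 'naive' and 'optimized' models.
"""
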
import csv
import json
import statistics
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from config import METRICS_FILE

class MetricsTracker:
    def __init__(self):
        self.metrics_file = METRICS_FILE
        self.queries = []
        self._ensure_metrics_file()
    
    def _ensure_metrics_file(self):
        """Create metrics file with headers if it doesn't exist."""
        if not self.metrics_file.exists():
            with open(self.metrics_file, 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow([
                    'timestamp', 'model', 'question_length',
                    'latency_ms', 'memory_mb', 'chunks_used',
                    'embedding_time', 'retrieval_time', 'generation_time'
                ])
    
    def record_query(self, model: str, latency_ms: float, memory_mb: float,
                     chunks_used: int, question_length: int,
                     embedding_time: float = 0, retrieval_time: float = 0,
                     generation_time: float = 0):
        """Record a query with all timing metrics."""
        metric = {
            'timestamp': datetime.now().isoformat(),
            'model': model,
            'question_length': question_length,
            'latency_ms': round(latency_ms, 2),
            'memory_mb': round(memory_mb, 2),
            'chunks_used': chunks_used,
            'embedding_time': round(embedding_time, 2),
            'retrieval_time': round(retrieval_time, 2),
            'generation_time': round(generation_time, 2)
        }
        
        self.queries.append(metric)
        
        # Append to CSV
        with open(self.metrics_file, 'a', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([
                metric['timestamp'], metric['model'], metric['question_length'],
                metric['latency_ms'], metric['memory_mb'], metric['chunks_used'],
                metric['embedding_time'], metric['retrieval_time'], metric['generation_time']
            ])
    
    def get_summary(self) -> Dict[str, Any]:
        """Get comprehensive metrics summary."""
        if not self.queries:
            return {"message": "No metrics recorded yet"}
        
        naive_metrics = [q for q in self.queries if q['model'] == 'naive']
        optimized_metrics = [q for q in self.queries if q['model'] == 'optimized']
        
        def calculate_stats(metrics_list: List[Dict]) -> Dict:
            if not metrics_list:
                return {}
            
            latencies = [m['latency_ms'] for m in metrics_list]
            memories = [m['memory_mb'] for m in metrics_list]
            
            return {
                'count': len(metrics_list),
                'avg_latency': round(statistics.mean(latencies), 2),
                'median_latency': round(statistics.median(latencies), 2),
                'min_latency': round(min(latencies), 2),
                'max_latency': round(max(latencies), 2),
                'avg_memory': round(statistics.mean(memories), 2),
                'avg_chunks': round(statistics.mean([m['chunks_used'] for m in metrics_list]), 2)
            }
        
        summary = {
            'total_queries': len(self.queries),
            'naive': calculate_stats(naive_metrics),
            'optimized': calculate_stats(optimized_metrics),
            'improvement': {}
        }
        
        # Calculate improvement if we have both
        if naive_metrics and optimized_metrics:
            naive_avg = summary['naive']['avg_latency']
            optimized_avg = summary['optimized']['avg_latency']
            
            # Guard both averages so speedup_factor cannot divide by zero
            if naive_avg > 0 and optimized_avg > 0:
                improvement = ((naive_avg - optimized_avg) / naive_avg) * 100
                summary['improvement'] = {
                    'latency_reduction_percent': round(improvement, 2),
                    'speedup_factor': round(naive_avg / optimized_avg, 2)
                }
        
        return summary
    
    def reset(self):
        """Reset in-memory metrics."""
        self.queries = []
    
    def export_json(self, output_path: Optional[Path] = None) -> Path:
        """Export metrics to JSON file."""
        if output_path is None:
            output_path = self.metrics_file.with_suffix('.json')
        
        with open(output_path, 'w') as f:
            json.dump({
                'queries': self.queries,
                'summary': self.get_summary(),
                'exported_at': datetime.now().isoformat()
            }, f, indent=2)
        
        return output_path
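
if __name__ == "__main__":
    # Minimal usage sketch with illustrative (not measured) values.
    # Assumes config.METRICS_FILE is a pathlib.Path pointing to a writable
    # CSV location, as the .exists()/.with_suffix() calls above require.
    tracker = MetricsTracker()
    tracker.record_query(model='naive', latency_ms=1520.0, memory_mb=810.0,
                         chunks_used=5, question_length=64,
                         embedding_time=42.0, retrieval_time=19.0,
                         generation_time=1450.0)
    tracker.record_query(model='optimized', latency_ms=640.0, memory_mb=455.0,
                         chunks_used=3, question_length=64,
                         embedding_time=30.0, retrieval_time=9.0,
                         generation_time=590.0)
    print(json.dumps(tracker.get_summary(), indent=2))
    print(f"Metrics exported to {tracker.export_json()}")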