File size: 4,628 Bytes
c68dc2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import json
import os
import threading
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional


# All log files live in ./logs (a relative path, resolved against the current
# working directory).
LOG_DIR = Path("logs")
# Created at import time so the files below can be opened for writing directly.
LOG_DIR.mkdir(exist_ok=True, parents=True)

RETRIEVAL_LOG = LOG_DIR.joinpath("retrieval_details.jsonl")
LATENCY_LOG = LOG_DIR.joinpath("latency.jsonl")
METRICS_SUMMARY = LOG_DIR.joinpath("metrics_summary.json")


class PerformanceLogger:
    """Process-wide singleton that records retrieval and generation metrics.

    Each ``log_retrieval`` / ``log_latency`` call appends one JSON line to
    ``RETRIEVAL_LOG`` / ``LATENCY_LOG``. Running totals are kept in memory and
    written to ``METRICS_SUMMARY`` by ``save_summary``, which then resets them.
    ``log_latency`` calls ``save_summary`` automatically once at least
    ``summary_interval_s`` seconds have passed since the previous summary.

    Counter updates and file writes go through one reentrant instance lock, so
    concurrent callers in this process cannot interleave JSON lines or lose
    increments.
    """

    _instance = None
    _lock = threading.Lock()  # guards singleton creation and first-time __init__
    # Minimum seconds between automatic summaries triggered from log_latency().
    summary_interval_s = 3600

    def __new__(cls):
        # Double-checked locking: the unlocked test keeps the common path cheap;
        # the locked re-test stops two threads from creating separate instances.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # Python runs __init__ on every PerformanceLogger() call, even when
        # __new__ returned the existing instance, so only the first call may set
        # state. The check-and-set happens under the class lock so two threads
        # constructing at once cannot both (re)initialise the counters.
        with self._lock:
            if self._initialized:
                return
            # NOTE(review): not read anywhere in this class; kept in case
            # external code touches it.
            self._metrics = defaultdict(list)
            # Reentrant because log_latency -> _maybe_save_summary -> save_summary
            # re-acquires it.
            self._state_lock = threading.RLock()
            self._last_summary_time = time.time()
            self._reset_counters()
            self._initialized = True

    def _reset_counters(self):
        """Zero the totals for the current summary window."""
        self._retrieval_count = 0   # log_retrieval calls in this window
        self._latency_count = 0     # log_latency calls in this window
        self._total_latency = 0.0   # sum of total_time_ms
        self._total_tokens = 0      # sum of tokens_generated
        self._total_gen_time = 0.0  # sum of generation_time_ms (milliseconds)

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as naive ISO-8601 text with a trailing ``Z``."""
        # Same format the deprecated datetime.utcnow().isoformat() + "Z" produced.
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    @staticmethod
    def _chunk_info(r: dict) -> dict:
        """Condense one retrieval result dict into the fields written to the log.

        Missing or ``None`` values fall back to defaults instead of raising.
        """
        metadata = r.get("metadata") or {}
        score = r.get("score")
        # Prefer "text", fall back to "content"; None/empty counts as missing.
        text = r.get("text") or r.get("content") or ""
        return {
            "chunk_id": r.get("chunk_id", r.get("id", "unknown")),
            # float() turns e.g. numpy scalars into plain floats that json can encode.
            "score": round(float(score), 4) if score is not None else 0.0,
            "source_file": r.get("source_file", metadata.get("source_file", "unknown")),
            "text_preview": str(text)[:200],
        }

    def log_retrieval(
        self,
        query: str,
        results: list,
        search_time_ms: float,
        filters: Optional[dict] = None,
        intent: Optional[str] = None
    ):
        """Append one retrieval record to ``RETRIEVAL_LOG``.

        Args:
            query: Query text; truncated to 500 characters in the log.
            results: Result dicts, each condensed by ``_chunk_info``.
                ``None`` is treated as no results.
            search_time_ms: Search duration in milliseconds.
            filters: Logged as-is (``{}`` when falsy).
            intent: Optional label, logged as-is.
        """
        chunks = [self._chunk_info(r) for r in (results or ())]
        entry = {
            "timestamp": self._utc_timestamp(),
            "query": query[:500],
            "intent": intent,
            "chunks_retrieved": len(chunks),
            "chunks": chunks,
            "search_time_ms": round(search_time_ms, 2),
            "filters": filters or {},
        }
        line = json.dumps(entry, ensure_ascii=False) + "\n"

        with self._state_lock:
            with open(RETRIEVAL_LOG, "a", encoding="utf-8") as f:
                f.write(line)
            self._retrieval_count += 1

    def log_latency(
        self,
        retrieval_time_ms: float,
        generation_time_ms: float,
        total_time_ms: float,
        tokens_generated: int,
        question: str
    ):
        """Append one timing record to ``LATENCY_LOG`` and update running totals.

        All durations are in milliseconds. ``tokens_per_second`` is derived from
        ``tokens_generated`` and ``generation_time_ms`` (0 when the latter is
        not positive). May write ``METRICS_SUMMARY`` if the summary interval
        has elapsed.
        """
        tokens_per_sec = (
            tokens_generated / (generation_time_ms / 1000) if generation_time_ms > 0 else 0
        )
        entry = {
            "timestamp": self._utc_timestamp(),
            "retrieval_time_ms": round(retrieval_time_ms, 2),
            "generation_time_ms": round(generation_time_ms, 2),
            "total_time_ms": round(total_time_ms, 2),
            "tokens_generated": tokens_generated,
            "tokens_per_second": round(tokens_per_sec, 2),
            "question_preview": question[:100],
        }
        line = json.dumps(entry, ensure_ascii=False) + "\n"

        with self._state_lock:
            with open(LATENCY_LOG, "a", encoding="utf-8") as f:
                f.write(line)
            self._latency_count += 1
            self._total_latency += total_time_ms
            self._total_tokens += tokens_generated
            self._total_gen_time += generation_time_ms
            self._maybe_save_summary()

    def _maybe_save_summary(self):
        """Call ``save_summary`` if ``summary_interval_s`` has elapsed since the last one."""
        with self._state_lock:
            if time.time() - self._last_summary_time >= self.summary_interval_s:
                self.save_summary()

    def _build_summary(self, now: float) -> dict:
        """Return the aggregate dict for the window ending at epoch time *now*.

        Does not reset anything; callers hold ``_state_lock``.
        """
        # Average over the calls that actually contributed latency samples.
        avg_latency = (
            self._total_latency / self._latency_count if self._latency_count else 0.0
        )
        # _total_gen_time is in milliseconds; convert to seconds for a per-second rate.
        avg_tps = (
            self._total_tokens / (self._total_gen_time / 1000.0)
            if self._total_gen_time > 0
            else 0
        )
        return {
            "timestamp": self._utc_timestamp(),
            # Actual window length: manual saves and call-driven auto-saves rarely
            # land exactly one hour apart.
            "period_hours": round((now - self._last_summary_time) / 3600, 4),
            "total_queries": self._retrieval_count,
            "avg_latency_ms": round(avg_latency, 2),
            "avg_tokens_per_second": round(avg_tps, 2),
            "total_tokens_generated": self._total_tokens,
        }

    def save_summary(self):
        """Overwrite ``METRICS_SUMMARY`` with the current window's aggregates, then reset.

        Also restarts the summary window, so a manual call postpones the next
        automatic summary.

        Returns:
            The summary dict that was written.
        """
        with self._state_lock:
            now = time.time()
            summary = self._build_summary(now)
            with open(METRICS_SUMMARY, "w", encoding="utf-8") as f:
                json.dump(summary, f, ensure_ascii=False, indent=2)
            # Reset only after a successful write so a failed write loses nothing.
            self._reset_counters()
            self._last_summary_time = now
        return summary


# Shared module-wide instance. PerformanceLogger is a singleton, so any later
# PerformanceLogger() call returns this same object.
perf_logger: PerformanceLogger = PerformanceLogger()


def log_retrieval(*positional, **named):
    """Record one retrieval call on the shared ``perf_logger`` instance.

    Thin pass-through: every argument is forwarded unchanged to
    ``PerformanceLogger.log_retrieval``.
    """
    target = perf_logger.log_retrieval
    target(*positional, **named)


def log_latency(*positional, **named):
    """Record one timing sample on the shared ``perf_logger`` instance.

    Thin pass-through: every argument is forwarded unchanged to
    ``PerformanceLogger.log_latency``.
    """
    target = perf_logger.log_latency
    target(*positional, **named)


def save_metrics_summary():
    """Write the shared ``perf_logger``'s aggregated metrics now.

    Delegates to ``PerformanceLogger.save_summary``, which also resets the
    running totals.
    """
    flush = perf_logger.save_summary
    flush()