# Chatbot-RAG-v4 / evaluation / performance_logger.py
# NoeMartinezSanchez
# Logs completados
# c68dc2b
import json
import os
import threading
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
# Directory that holds every performance log. It is created at import time so
# the writers in this module can open files in it without checking first.
LOG_DIR = Path("logs")
LOG_DIR.mkdir(exist_ok=True, parents=True)

# Append-only log of retrieval details, one JSON object per line.
RETRIEVAL_LOG = LOG_DIR.joinpath("retrieval_details.jsonl")

# Append-only log of latency measurements, one JSON object per line.
LATENCY_LOG = LOG_DIR.joinpath("latency.jsonl")

# Aggregate metrics file; it is overwritten each time a summary is saved.
METRICS_SUMMARY = LOG_DIR.joinpath("metrics_summary.json")
class PerformanceLogger:
    """Singleton that records retrieval and latency metrics as JSON files.

    ``log_retrieval`` appends one JSON line per call to ``RETRIEVAL_LOG`` and
    ``log_latency`` appends one per call to ``LATENCY_LOG``. Running totals
    are kept in memory and written to ``METRICS_SUMMARY`` by ``save_summary``,
    which ``log_latency`` triggers automatically once ``_SUMMARY_INTERVAL_S``
    seconds have passed since the previous summary.

    Counter updates and file writes are serialized with an instance lock so
    the shared instance can be called from several threads.
    """

    _instance = None
    _lock = threading.Lock()  # guards singleton creation and first-time init
    _SUMMARY_INTERVAL_S = 3600  # seconds between automatic summary writes

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance while this one was waiting.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        """Set up the in-memory counters the first time the instance is built."""
        # __init__ runs on every PerformanceLogger() call; without this guard
        # each call would wipe the running totals.
        with self._lock:
            if self._initialized:
                return
            # Re-entrant because log_latency -> _maybe_save_summary ->
            # save_summary all take the lock on the same thread.
            self._state_lock = threading.RLock()
            # Not used by the methods of this class; kept in case other code
            # reads it.
            self._metrics = defaultdict(list)
            self._last_summary_time = time.time()
            self._retrieval_count = 0
            self._latency_count = 0
            self._total_latency = 0.0
            self._total_tokens = 0
            self._total_gen_time = 0.0
            self._initialized = True

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as ISO-8601 text with a ``Z`` suffix."""
        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # before formatting keeps the "...Z" layout instead of "+00:00Z".
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    @staticmethod
    def _append_jsonl(path: Path, entry: dict) -> None:
        """Append *entry* to *path* as a single JSON line."""
        with open(path, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")

    @staticmethod
    def _chunk_summary(result: dict) -> dict:
        """Reduce one retrieval result to the fields that get logged.

        Missing or ``None`` values fall back to defaults instead of raising,
        so a sparse result cannot break logging.
        """
        metadata = result.get("metadata") or {}
        # Prefer "text", but fall back to "content" when "text" is absent,
        # empty or None.
        text = result.get("text") or result.get("content") or ""
        score = result.get("score")
        return {
            "chunk_id": result.get("chunk_id", result.get("id", "unknown")),
            # float() turns float-like scores into a plain Python float,
            # which json can always serialize.
            "score": round(float(score), 4) if score is not None else 0.0,
            "source_file": result.get(
                "source_file", metadata.get("source_file", "unknown")
            ),
            "text_preview": text[:200],
        }

    def log_retrieval(
        self,
        query: str,
        results: list,
        search_time_ms: float,
        filters: Optional[dict] = None,
        intent: Optional[str] = None,
    ) -> None:
        """Append one retrieval record to ``RETRIEVAL_LOG``.

        Args:
            query: User query; only the first 500 characters are stored.
            results: Retrieved chunks as dicts. The keys read are
                ``chunk_id``/``id``, ``score``, ``source_file`` (or
                ``metadata["source_file"]``) and ``text``/``content``.
            search_time_ms: Search duration in milliseconds.
            filters: Filters applied to the search, if any.
            intent: Detected intent label, if any.
        """
        chunks = [self._chunk_summary(r) for r in results or []]
        entry = {
            "timestamp": self._utc_timestamp(),
            "query": (query or "")[:500],
            "intent": intent,
            "chunks_retrieved": len(chunks),
            "chunks": chunks,
            "search_time_ms": round(search_time_ms, 2),
            "filters": filters or {},
        }
        with self._state_lock:
            self._append_jsonl(RETRIEVAL_LOG, entry)
            self._retrieval_count += 1

    def log_latency(
        self,
        retrieval_time_ms: float,
        generation_time_ms: float,
        total_time_ms: float,
        tokens_generated: int,
        question: str,
    ) -> None:
        """Append one latency record to ``LATENCY_LOG`` and update the totals.

        Also writes the metrics summary when the summary interval has elapsed.

        Args:
            retrieval_time_ms: Time spent on retrieval, in milliseconds.
            generation_time_ms: Time spent on generation, in milliseconds.
            total_time_ms: End-to-end time, in milliseconds.
            tokens_generated: Number of tokens produced.
            question: User question; only the first 100 characters are stored.
        """
        if generation_time_ms > 0:
            tokens_per_sec = tokens_generated / (generation_time_ms / 1000)
        else:
            tokens_per_sec = 0
        entry = {
            "timestamp": self._utc_timestamp(),
            "retrieval_time_ms": round(retrieval_time_ms, 2),
            "generation_time_ms": round(generation_time_ms, 2),
            "total_time_ms": round(total_time_ms, 2),
            "tokens_generated": tokens_generated,
            "tokens_per_second": round(tokens_per_sec, 2),
            "question_preview": (question or "")[:100],
        }
        with self._state_lock:
            self._append_jsonl(LATENCY_LOG, entry)
            self._latency_count += 1
            self._total_latency += total_time_ms
            self._total_tokens += tokens_generated
            self._total_gen_time += generation_time_ms
            self._maybe_save_summary()

    def _maybe_save_summary(self) -> None:
        """Write the summary if the interval since the last one has elapsed."""
        with self._state_lock:
            elapsed = time.time() - self._last_summary_time
            if elapsed >= self._SUMMARY_INTERVAL_S:
                self.save_summary()

    def _build_summary(self) -> dict:
        """Compute the summary dict from the current running totals."""
        # Average over the number of latency samples actually summed, not the
        # number of retrieval calls, which can differ.
        avg_latency = self._total_latency / max(self._latency_count, 1)
        # Generation time is accumulated in milliseconds; convert to seconds
        # so the rate really is tokens per second.
        gen_seconds = self._total_gen_time / 1000
        avg_tps = self._total_tokens / gen_seconds if gen_seconds > 0 else 0
        return {
            "timestamp": self._utc_timestamp(),
            # Nominal window; a manual save_summary() call may cover less.
            "period_hours": 1,
            "total_queries": self._retrieval_count,
            "avg_latency_ms": round(avg_latency, 2),
            "avg_tokens_per_second": round(avg_tps, 2),
            "total_tokens_generated": self._total_tokens,
        }

    def save_summary(self) -> None:
        """Overwrite ``METRICS_SUMMARY`` with the totals and start a new window.

        All counters and the summary timer are reset after the file is
        written.
        """
        with self._state_lock:
            summary = self._build_summary()
            with open(METRICS_SUMMARY, "w", encoding="utf-8") as f:
                json.dump(summary, f, ensure_ascii=False, indent=2)
            self._retrieval_count = 0
            self._latency_count = 0
            self._total_latency = 0.0
            self._total_tokens = 0
            self._total_gen_time = 0.0
            # Restart the timer here so a manual save also begins a fresh
            # window rather than being overwritten shortly afterwards.
            self._last_summary_time = time.time()
# Shared logger instance used by the module-level helper functions below.
perf_logger = PerformanceLogger()
def log_retrieval(*args, **kwargs):
    """Forward all arguments to ``perf_logger.log_retrieval``.

    Convenience wrapper so callers do not need to reference the logger
    instance. Nothing is returned.
    """
    record = perf_logger.log_retrieval
    record(*args, **kwargs)
def log_latency(*args, **kwargs):
    """Forward all arguments to ``perf_logger.log_latency``.

    Convenience wrapper so callers do not need to reference the logger
    instance. Nothing is returned.
    """
    record = perf_logger.log_latency
    record(*args, **kwargs)
def save_metrics_summary():
    """Write the metrics summary now by calling ``perf_logger.save_summary``."""
    flush = perf_logger.save_summary
    flush()