import json import os import time import threading from datetime import datetime, timedelta from pathlib import Path from typing import Optional from collections import defaultdict LOG_DIR = Path("logs") LOG_DIR.mkdir(parents=True, exist_ok=True) RETRIEVAL_LOG = LOG_DIR / "retrieval_details.jsonl" LATENCY_LOG = LOG_DIR / "latency.jsonl" METRICS_SUMMARY = LOG_DIR / "metrics_summary.json" class PerformanceLogger: _instance = None _lock = threading.Lock() def __new__(cls): if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialized = False return cls._instance def __init__(self): if self._initialized: return self._initialized = True self._metrics = defaultdict(list) self._last_summary_time = time.time() self._retrieval_count = 0 self._total_latency = 0.0 self._total_tokens = 0 self._total_gen_time = 0.0 def log_retrieval( self, query: str, results: list, search_time_ms: float, filters: Optional[dict] = None, intent: Optional[str] = None ): chunks = [] for r in results: chunk_info = { "chunk_id": r.get("chunk_id", r.get("id", "unknown")), "score": round(r.get("score", 0.0), 4), "source_file": r.get("source_file", r.get("metadata", {}).get("source_file", "unknown")), "text_preview": (r.get("text", r.get("content", ""))[:200] if r.get("text") or r.get("content") else "") } chunks.append(chunk_info) entry = { "timestamp": datetime.utcnow().isoformat() + "Z", "query": query[:500], "intent": intent, "chunks_retrieved": len(chunks), "chunks": chunks, "search_time_ms": round(search_time_ms, 2), "filters": filters or {} } with open(RETRIEVAL_LOG, "a", encoding="utf-8") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") self._retrieval_count += 1 def log_latency( self, retrieval_time_ms: float, generation_time_ms: float, total_time_ms: float, tokens_generated: int, question: str ): tokens_per_sec = tokens_generated / (generation_time_ms / 1000) if generation_time_ms > 0 else 0 entry = { "timestamp": datetime.utcnow().isoformat() + "Z", "retrieval_time_ms": round(retrieval_time_ms, 2), "generation_time_ms": round(generation_time_ms, 2), "total_time_ms": round(total_time_ms, 2), "tokens_generated": tokens_generated, "tokens_per_second": round(tokens_per_sec, 2), "question_preview": question[:100] } with open(LATENCY_LOG, "a", encoding="utf-8") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") self._total_latency += total_time_ms self._total_tokens += tokens_generated self._total_gen_time += generation_time_ms self._maybe_save_summary() def _maybe_save_summary(self): now = time.time() if now - self._last_summary_time >= 3600: self.save_summary() self._last_summary_time = now def save_summary(self): avg_latency = self._total_latency / max(self._retrieval_count, 1) avg_tps = self._total_tokens / max(self._total_gen_time, 1) if self._total_gen_time > 0 else 0 summary = { "timestamp": datetime.utcnow().isoformat() + "Z", "period_hours": 1, "total_queries": self._retrieval_count, "avg_latency_ms": round(avg_latency, 2), "avg_tokens_per_second": round(avg_tps, 2), "total_tokens_generated": self._total_tokens } with open(METRICS_SUMMARY, "w", encoding="utf-8") as f: json.dump(summary, f, ensure_ascii=False, indent=2) self._retrieval_count = 0 self._total_latency = 0.0 self._total_tokens = 0 self._total_gen_time = 0.0 perf_logger = PerformanceLogger() def log_retrieval(*args, **kwargs): perf_logger.log_retrieval(*args, **kwargs) def log_latency(*args, **kwargs): perf_logger.log_latency(*args, **kwargs) def save_metrics_summary(): perf_logger.save_summary()