Spaces:
Running
Running
File size: 4,628 Bytes
c68dc2b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 | import json
import os
import threading
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
# Directory that holds every performance log; it is created at import time so
# the writers can open their files without checking for it first.
LOG_DIR = Path("logs")
LOG_DIR.mkdir(exist_ok=True, parents=True)

# Append-only JSON-lines files (one JSON object per line).
RETRIEVAL_LOG = LOG_DIR.joinpath("retrieval_details.jsonl")
LATENCY_LOG = LOG_DIR.joinpath("latency.jsonl")

# Single JSON document, overwritten each time a summary is saved.
METRICS_SUMMARY = LOG_DIR.joinpath("metrics_summary.json")
class PerformanceLogger:
    """Process-wide singleton that records retrieval and latency metrics.

    Each ``log_*`` call appends one JSON line to ``RETRIEVAL_LOG`` or
    ``LATENCY_LOG`` and updates in-memory running totals. The totals are
    written to ``METRICS_SUMMARY`` by :meth:`save_summary`, which
    :meth:`log_latency` triggers once at least an hour has passed since the
    previous summary.
    """

    _instance = None
    _lock = threading.Lock()  # guards creation of the singleton only

    # Minimum time between automatic summaries written by log_latency().
    _SUMMARY_INTERVAL_SECONDS = 3600

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance while this one was waiting for it.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
                    cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        """Initialise the running totals once; later constructions are no-ops."""
        if self._initialized:
            return
        self._initialized = True
        self._metrics = defaultdict(list)
        self._last_summary_time = time.time()
        self._retrieval_count = 0
        # Counted separately from retrievals so the average latency is
        # computed over the calls that actually reported a latency.
        self._latency_count = 0
        self._total_latency = 0.0
        self._total_tokens = 0
        self._total_gen_time = 0.0

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string plus ``Z``."""
        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # keeps the "<naive iso>Z" shape of the existing log lines instead of
        # emitting "+00:00Z".
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        return now.isoformat() + "Z"

    @staticmethod
    def _append_jsonl(path, entry: dict) -> None:
        """Append *entry* to *path* as a single JSON line."""
        with open(path, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")

    @staticmethod
    def _summarize_chunk(result: dict) -> dict:
        """Reduce one retrieval result to the fields that are logged."""
        # ``or`` fallbacks also cover keys that are present but None/empty,
        # which previously crashed (None[:200], round(None), None.get).
        metadata = result.get("metadata") or {}
        text = result.get("text") or result.get("content") or ""
        score = result.get("score")
        return {
            "chunk_id": result.get("chunk_id", result.get("id", "unknown")),
            "score": round(score, 4) if score is not None else 0.0,
            "source_file": result.get(
                "source_file", metadata.get("source_file", "unknown")
            ),
            "text_preview": text[:200],
        }

    def log_retrieval(
        self,
        query: str,
        results: list,
        search_time_ms: float,
        filters: Optional[dict] = None,
        intent: Optional[str] = None,
    ):
        """Append one retrieval record to ``RETRIEVAL_LOG``.

        Args:
            query: The search query; only the first 500 characters are stored.
            results: Retrieved chunks as dicts. The keys read are
                ``chunk_id``/``id``, ``score``, ``source_file`` (top level or
                under ``metadata``) and ``text``/``content``.
            search_time_ms: Search duration in milliseconds.
            filters: Filters applied to the search; stored as ``{}`` if None.
            intent: Query intent label, stored as given.
        """
        chunks = [self._summarize_chunk(r) for r in results]
        entry = {
            "timestamp": self._utc_timestamp(),
            "query": query[:500],
            "intent": intent,
            "chunks_retrieved": len(chunks),
            "chunks": chunks,
            "search_time_ms": round(search_time_ms, 2),
            "filters": filters or {},
        }
        self._append_jsonl(RETRIEVAL_LOG, entry)
        self._retrieval_count += 1

    def log_latency(
        self,
        retrieval_time_ms: float,
        generation_time_ms: float,
        total_time_ms: float,
        tokens_generated: int,
        question: str,
    ):
        """Append one latency record to ``LATENCY_LOG`` and update the totals.

        Args:
            retrieval_time_ms: Time spent on retrieval, in milliseconds.
            generation_time_ms: Time spent on generation, in milliseconds.
            total_time_ms: End-to-end time, in milliseconds.
            tokens_generated: Number of tokens produced.
            question: The question; only the first 100 characters are stored.
        """
        # Guard against a zero/negative generation time to avoid dividing by 0.
        if generation_time_ms > 0:
            tokens_per_sec = tokens_generated / (generation_time_ms / 1000)
        else:
            tokens_per_sec = 0
        entry = {
            "timestamp": self._utc_timestamp(),
            "retrieval_time_ms": round(retrieval_time_ms, 2),
            "generation_time_ms": round(generation_time_ms, 2),
            "total_time_ms": round(total_time_ms, 2),
            "tokens_generated": tokens_generated,
            "tokens_per_second": round(tokens_per_sec, 2),
            "question_preview": question[:100],
        }
        self._append_jsonl(LATENCY_LOG, entry)
        self._latency_count += 1
        self._total_latency += total_time_ms
        self._total_tokens += tokens_generated
        self._total_gen_time += generation_time_ms
        self._maybe_save_summary()

    def _maybe_save_summary(self):
        """Save a summary if the summary interval has elapsed."""
        elapsed = time.time() - self._last_summary_time
        if elapsed >= self._SUMMARY_INTERVAL_SECONDS:
            self.save_summary()

    def save_summary(self):
        """Write the aggregated metrics to ``METRICS_SUMMARY`` and reset them.

        The file is overwritten on every call. The running totals and the
        summary window restart afterwards, so a manual call is not followed
        shortly by an automatic one that overwrites it with near-empty totals.
        """
        if self._latency_count:
            avg_latency = self._total_latency / self._latency_count
        else:
            avg_latency = 0.0
        # _total_gen_time is in milliseconds; convert to seconds so the value
        # matches the per-call "tokens_per_second" written by log_latency().
        if self._total_gen_time > 0:
            avg_tps = self._total_tokens / (self._total_gen_time / 1000)
        else:
            avg_tps = 0
        summary = {
            "timestamp": self._utc_timestamp(),
            # Nominal window length; a manual call may cover a different span.
            "period_hours": 1,
            "total_queries": self._retrieval_count,
            "avg_latency_ms": round(avg_latency, 2),
            "avg_tokens_per_second": round(avg_tps, 2),
            "total_tokens_generated": self._total_tokens,
        }
        with open(METRICS_SUMMARY, "w", encoding="utf-8") as f:
            json.dump(summary, f, ensure_ascii=False, indent=2)
        self._retrieval_count = 0
        self._latency_count = 0
        self._total_latency = 0.0
        self._total_tokens = 0
        self._total_gen_time = 0.0
        self._last_summary_time = time.time()
# Shared logger instance used by the module-level helper functions below.
perf_logger = PerformanceLogger()
def log_retrieval(*args, **kwargs):
    """Forward all arguments to ``perf_logger.log_retrieval``.

    The result of the underlying call is discarded; this function returns
    ``None``.
    """
    record = perf_logger.log_retrieval
    record(*args, **kwargs)
def log_latency(*args, **kwargs):
    """Forward all arguments to ``perf_logger.log_latency``.

    The result of the underlying call is discarded; this function returns
    ``None``.
    """
    record = perf_logger.log_latency
    record(*args, **kwargs)
def save_metrics_summary():
    """Ask the shared ``perf_logger`` to save its summary now."""
    save = perf_logger.save_summary
    save()