enterprise-rag-system / src /observability.py
Faraz618's picture
Update src/observability.py
fe14354 verified
Raw
History Blame Contribute Delete
5.41 kB
"""
observability.py — Tracing with Langfuse (optional) and local JSONL logging.
If Langfuse keys are not set, the system degrades gracefully to local
file logging only. No errors, no crashes — just silent fallback.
Langfuse free tier: 50,000 events/month at cloud.langfuse.com.
To enable: add LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY to HF Space secrets.
"""
import json
import logging
from datetime import datetime, timezone
from pathlib import Path

from src.utils import get_env
# Module logger, a child of the "enterprise-rag" logger hierarchy.
logger = logging.getLogger("enterprise-rag.observability")

# Lazily-populated Langfuse state, filled in by the first _get_langfuse() call:
#   _langfuse_client  - the Langfuse client instance, or None
#   _langfuse_enabled - True once the client was constructed successfully
#   _langfuse_checked - True once initialization has been attempted (success or not)
_langfuse_client = None
_langfuse_enabled = _langfuse_checked = False
def _get_langfuse():
    """
    Return the shared Langfuse client, creating it on the first call.

    Initialization is attempted only once per process: every later call
    returns the cached outcome without re-reading the environment.
    Returns None, without raising, when either key is missing or the
    client cannot be constructed.
    """
    global _langfuse_client, _langfuse_enabled, _langfuse_checked

    # Fast path: initialization was already attempted.
    if _langfuse_checked:
        return None if not _langfuse_enabled else _langfuse_client

    # Mark as attempted up front so a failure below is never retried.
    _langfuse_checked = True

    pub = get_env("LANGFUSE_PUBLIC_KEY", "")
    sec = get_env("LANGFUSE_SECRET_KEY", "")
    endpoint = get_env("LANGFUSE_HOST", "https://cloud.langfuse.com")

    if not (pub and sec):
        logger.info("Langfuse not configured — using local logging only")
        return None

    try:
        # Imported here so the langfuse package is only required when keys exist.
        from langfuse import Langfuse

        _langfuse_client = Langfuse(public_key=pub, secret_key=sec, host=endpoint)
        _langfuse_enabled = True
        logger.info(f"Langfuse connected: {endpoint}")
    except Exception as exc:
        logger.warning(f"Langfuse init failed: {exc}")
        return None
    return _langfuse_client
def trace_rag_query(
    query: str,
    answer: str,
    retrieved_chunks: list,
    retrieval_scores: list,
    eval_scores: dict,
    retrieval_latency_ms: float,
    generation_latency_ms: float,
    prompt_tokens: int,
    response_tokens: int,
    model_used: str,
    fallback_used: bool,
    session_id: str = "default",
) -> dict:
    """
    Record a full RAG query trace.

    Always appends the record to the local JSONL log, and additionally sends
    it to Langfuse when a client is configured.

    Args:
        query: The user question.
        answer: Generated answer. Only its first 200 characters are kept in
            the record; the full text is passed on to Langfuse.
        retrieved_chunks: Retrieved context chunks. Only their count is
            recorded. None is treated as empty.
        retrieval_scores: Scores of the retrieved chunks. Element 0 is
            recorded as ``top_score``; this assumes best-first ordering
            (NOTE(review): confirm with the retriever). None or empty gives 0.
            A numpy array is also accepted.
        eval_scores: Metrics read from the keys "faithfulness",
            "answer_relevance", "context_precision" and "overall". Missing
            keys, or a None dict, default to 0.
        retrieval_latency_ms: Retrieval time in milliseconds.
        generation_latency_ms: Generation time in milliseconds.
        prompt_tokens: Prompt token count.
        response_tokens: Response token count.
        model_used: Identifier of the model that produced the answer.
        fallback_used: Whether a fallback path was taken.
        session_id: Caller-supplied session identifier.

    Returns:
        The trace record dict that was logged.
    """
    # Tolerate None for the collection arguments: a missing value must not
    # turn best-effort tracing into a crash for the caller.
    chunks = retrieved_chunks if retrieved_chunks is not None else []
    scores = retrieval_scores if retrieval_scores is not None else []
    evals = eval_scores or {}

    trace_data = {
        # Naive UTC ISO string, the same format as the former
        # datetime.utcnow().isoformat() (no "+00:00" suffix), so existing log
        # consumers see no change. utcnow() is deprecated since Python 3.12.
        "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
        "session_id": session_id,
        "query": query,
        "answer_preview": answer[:200] if answer else "",
        "chunks_retrieved": len(chunks),
        # len() instead of truthiness: `if array` raises for multi-element numpy arrays.
        "top_score": scores[0] if len(scores) > 0 else 0,
        "retrieval_latency_ms": retrieval_latency_ms,
        "generation_latency_ms": generation_latency_ms,
        "total_latency_ms": retrieval_latency_ms + generation_latency_ms,
        "prompt_tokens": prompt_tokens,
        "response_tokens": response_tokens,
        "total_tokens": prompt_tokens + response_tokens,
        "model_used": model_used,
        "fallback_used": fallback_used,
        "eval_faithfulness": evals.get("faithfulness", 0),
        "eval_relevance": evals.get("answer_relevance", 0),
        "eval_precision": evals.get("context_precision", 0),
        "eval_overall": evals.get("overall", 0),
    }

    _write_local_log(trace_data)

    lf = _get_langfuse()
    if lf:
        _send_to_langfuse(lf, trace_data, query, answer, chunks)

    return trace_data
def _json_default(obj):
    """
    Serve as the ``json.dumps`` fallback for array-like values such as numpy/torch scalars.

    Scores may arrive as e.g. ``numpy.float32``, which the json module cannot
    serialize. That is an assumption about upstream callers, not shown here.
    Such objects expose ``tolist()``, which returns plain Python values. Any
    other type still raises ``TypeError``, exactly as json would by default.
    """
    to_plain = getattr(obj, "tolist", None)
    if callable(to_plain):
        return to_plain()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


def _write_local_log(trace_data: dict) -> None:
    """
    Append one trace record as a single JSON line to logs/rag_traces.jsonl.

    The ``logs`` directory is resolved relative to the current working
    directory and is created if missing. Any failure is logged as a warning
    and never raised, so local logging cannot break a query.
    """
    try:
        log_dir = Path("logs")
        log_dir.mkdir(exist_ok=True)
        # Serialize first: an unserializable record must not leave a
        # half-written line behind.
        line = json.dumps(trace_data, default=_json_default)
        with open(log_dir / "rag_traces.jsonl", "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except Exception as e:
        logger.warning(f"Local log write failed: {e}")
def _send_to_langfuse(lf, trace_data: dict, query: str, answer: str, chunks: list):
    """
    Send one RAG query to Langfuse as a trace with two child spans.

    Uses the client's ``trace()`` / ``span()`` API. NOTE(review): that API
    belongs to the v2 Python SDK; confirm the pinned langfuse version still
    exposes it.

    The function creates a "rag-query" trace tagged with the caller's
    session id. It adds "retrieval" and "generation" spans carrying latency
    and model metadata, then flushes the client. Any error is logged as a
    warning and swallowed, so tracing never breaks a query.

    Args:
        lf: An initialized Langfuse client.
        trace_data: Record built by ``trace_rag_query``.
        query: Full user query.
        answer: Full generated answer (the local log keeps only a preview).
        chunks: Retrieved chunks. Only their count is sent.
    """
    try:
        trace = lf.trace(
            name="rag-query",
            # Lets Langfuse group traces of one conversation. The id was
            # already recorded locally but was previously not forwarded here.
            session_id=trace_data.get("session_id"),
            input={"query": query},
            output={"answer": answer},
            metadata={
                "total_latency_ms": trace_data["total_latency_ms"],
                "total_tokens": trace_data["total_tokens"],
                "eval_overall": trace_data["eval_overall"],
                "fallback_used": trace_data["fallback_used"],
            },
        )
        trace.span(
            name="retrieval",
            input={"query": query},
            output={"top_score": trace_data["top_score"]},
            metadata={"latency_ms": trace_data["retrieval_latency_ms"]},
        )
        trace.span(
            name="generation",
            input={"chunks_count": len(chunks)},
            output={"tokens": trace_data["response_tokens"]},
            metadata={
                "latency_ms": trace_data["generation_latency_ms"],
                "model": trace_data["model_used"],
            },
        )
        # NOTE(review): flush() presumably blocks until queued events are
        # delivered, which adds network latency to every query. Consider
        # flushing only at shutdown if that latency matters.
        lf.flush()
    except Exception as e:
        logger.warning(f"Langfuse trace send failed: {e}")
def get_observability_status() -> str:
    """
    Return a human-readable observability status for the UI panel.

    When a Langfuse client is available, the text reports it as connected
    and shows the configured host. Otherwise it describes the local-only
    fallback and how to enable Langfuse. Both variants point at the local
    JSONL log.
    """
    lf = _get_langfuse()
    if lf:
        # Use the same default as _get_langfuse() so that, with LANGFUSE_HOST
        # unset, the panel shows the host the client actually connected to.
        host = get_env("LANGFUSE_HOST", "https://cloud.langfuse.com")
        return (
            "✅ Langfuse connected\n"
            f"Host: {host}\n"
            "📁 Local log: logs/rag_traces.jsonl"
        )
    return (
        "📁 Local logging active\n"
        "File: logs/rag_traces.jsonl\n\n"
        "To enable Langfuse: add LANGFUSE_PUBLIC_KEY\n"
        "and LANGFUSE_SECRET_KEY in Space secrets."
    )