""" observability.py — Tracing with Langfuse (optional) and local JSONL logging. If Langfuse keys are not set, the system degrades gracefully to local file logging only. No errors, no crashes — just silent fallback. Langfuse free tier: 50,000 events/month at cloud.langfuse.com. To enable: add LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY to HF Space secrets. """ import json import logging from datetime import datetime from pathlib import Path from src.utils import get_env logger = logging.getLogger("enterprise-rag.observability") _langfuse_client = None _langfuse_enabled = False _langfuse_checked = False def _get_langfuse(): """ Lazily initialize Langfuse on first trace call. Returns None silently if not configured. """ global _langfuse_client, _langfuse_enabled, _langfuse_checked if _langfuse_checked: return _langfuse_client if _langfuse_enabled else None _langfuse_checked = True public_key = get_env("LANGFUSE_PUBLIC_KEY", "") secret_key = get_env("LANGFUSE_SECRET_KEY", "") host = get_env("LANGFUSE_HOST", "https://cloud.langfuse.com") if not public_key or not secret_key: logger.info("Langfuse not configured — using local logging only") return None try: from langfuse import Langfuse _langfuse_client = Langfuse( public_key=public_key, secret_key=secret_key, host=host, ) _langfuse_enabled = True logger.info(f"Langfuse connected: {host}") return _langfuse_client except Exception as e: logger.warning(f"Langfuse init failed: {e}") return None def trace_rag_query( query: str, answer: str, retrieved_chunks: list, retrieval_scores: list, eval_scores: dict, retrieval_latency_ms: float, generation_latency_ms: float, prompt_tokens: int, response_tokens: int, model_used: str, fallback_used: bool, session_id: str = "default", ): """ Record a full RAG query trace. Writes to local JSONL file always. Sends to Langfuse if configured. """ trace_data = { "timestamp": datetime.utcnow().isoformat(), "session_id": session_id, "query": query, "answer_preview": answer[:200] if answer else "", "chunks_retrieved": len(retrieved_chunks), "top_score": retrieval_scores[0] if retrieval_scores else 0, "retrieval_latency_ms": retrieval_latency_ms, "generation_latency_ms": generation_latency_ms, "total_latency_ms": retrieval_latency_ms + generation_latency_ms, "prompt_tokens": prompt_tokens, "response_tokens": response_tokens, "total_tokens": prompt_tokens + response_tokens, "model_used": model_used, "fallback_used": fallback_used, "eval_faithfulness": eval_scores.get("faithfulness", 0), "eval_relevance": eval_scores.get("answer_relevance", 0), "eval_precision": eval_scores.get("context_precision", 0), "eval_overall": eval_scores.get("overall", 0), } _write_local_log(trace_data) lf = _get_langfuse() if lf: _send_to_langfuse(lf, trace_data, query, answer, retrieved_chunks) return trace_data def _write_local_log(trace_data: dict): """Append trace record to local JSONL file.""" try: log_dir = Path("logs") log_dir.mkdir(exist_ok=True) with open(log_dir / "rag_traces.jsonl", "a") as f: f.write(json.dumps(trace_data) + "\n") except Exception as e: logger.warning(f"Local log write failed: {e}") def _send_to_langfuse(lf, trace_data: dict, query: str, answer: str, chunks: list): """Send structured trace to Langfuse cloud.""" try: trace = lf.trace( name="rag-query", input={"query": query}, output={"answer": answer}, metadata={ "total_latency_ms": trace_data["total_latency_ms"], "total_tokens": trace_data["total_tokens"], "eval_overall": trace_data["eval_overall"], "fallback_used": trace_data["fallback_used"], }, ) trace.span( name="retrieval", input={"query": query}, output={"top_score": trace_data["top_score"]}, metadata={"latency_ms": trace_data["retrieval_latency_ms"]}, ) trace.span( name="generation", input={"chunks_count": len(chunks)}, output={"tokens": trace_data["response_tokens"]}, metadata={ "latency_ms": trace_data["generation_latency_ms"], "model": trace_data["model_used"], }, ) lf.flush() except Exception as e: logger.warning(f"Langfuse trace send failed: {e}") def get_observability_status() -> str: """Return human-readable observability status for the UI panel.""" lf = _get_langfuse() if lf: host = get_env("LANGFUSE_HOST", "cloud.langfuse.com") return ( f"✅ Langfuse connected\n" f"Host: {host}\n" f"📁 Local log: logs/rag_traces.jsonl" ) return ( "📁 Local logging active\n" "File: logs/rag_traces.jsonl\n\n" "To enable Langfuse: add LANGFUSE_PUBLIC_KEY\n" "and LANGFUSE_SECRET_KEY in Space secrets." )