Spaces:
Build error
Build error
| """ | |
| observability.py — Tracing with Langfuse (optional) and local JSONL logging. | |
| If Langfuse keys are not set, the system degrades gracefully to local | |
| file logging only. No errors, no crashes — just silent fallback. | |
| Langfuse free tier: 50,000 events/month at cloud.langfuse.com. | |
| To enable: add LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY to HF Space secrets. | |
| """ | |
| import json | |
| import logging | |
| from datetime import datetime | |
| from pathlib import Path | |
| from src.utils import get_env | |
# Module-wide logger; the name is a fixed string rather than derived from __name__.
logger = logging.getLogger("enterprise-rag.observability")

# Lazy-initialization state for the optional Langfuse client (see _get_langfuse):
#   _langfuse_client  - the client instance once constructed, otherwise None
#   _langfuse_enabled - True only after the client was constructed successfully
#   _langfuse_checked - True once initialization has been attempted
_langfuse_client = None
_langfuse_enabled = _langfuse_checked = False
def _get_langfuse():
    """
    Return the shared Langfuse client, creating it on the first call.

    Initialization is attempted exactly once per process: the "checked" flag
    is set before the attempt, so a missing configuration or a failed
    construction is not retried on later calls.

    Returns:
        The Langfuse client when both LANGFUSE_PUBLIC_KEY and
        LANGFUSE_SECRET_KEY are set and the client was constructed; otherwise
        None (the reason is logged, never raised).
    """
    global _langfuse_client, _langfuse_enabled, _langfuse_checked

    # Fast path: a previous call already decided the outcome.
    if _langfuse_checked:
        if _langfuse_enabled:
            return _langfuse_client
        return None

    _langfuse_checked = True

    pub = get_env("LANGFUSE_PUBLIC_KEY", "")
    sec = get_env("LANGFUSE_SECRET_KEY", "")
    host = get_env("LANGFUSE_HOST", "https://cloud.langfuse.com")

    # Both keys are required; without them only local logging is used.
    if not (pub and sec):
        logger.info("Langfuse not configured — using local logging only")
        return None

    try:
        # Imported here so the langfuse package is only needed when configured.
        from langfuse import Langfuse

        _langfuse_client = Langfuse(public_key=pub, secret_key=sec, host=host)
        _langfuse_enabled = True
        logger.info(f"Langfuse connected: {host}")
    except Exception as e:
        # Covers a missing package as well as constructor failures.
        logger.warning(f"Langfuse init failed: {e}")
        return None
    return _langfuse_client
def trace_rag_query(
    query: str,
    answer: str,
    retrieved_chunks: list,
    retrieval_scores: list,
    eval_scores: dict,
    retrieval_latency_ms: float,
    generation_latency_ms: float,
    prompt_tokens: int,
    response_tokens: int,
    model_used: str,
    fallback_used: bool,
    session_id: str = "default",
):
    """
    Record one RAG query trace locally and, if configured, in Langfuse.

    Builds a flat summary record, appends it to the local JSONL log via
    _write_local_log, and forwards it through _send_to_langfuse when
    _get_langfuse returns a client.

    Args:
        query: The user query; stored verbatim.
        answer: The generated answer. Only the first 200 characters go into
            the record ("answer_preview"); the full text is passed on to
            Langfuse.
        retrieved_chunks: Retrieved chunks; only their count is recorded.
            None is treated as empty.
        retrieval_scores: Retrieval scores; the first element is recorded as
            "top_score" (0 when empty or None). Presumably ordered best-first
            by the caller; this function does not sort.
        eval_scores: Mapping read for the keys "faithfulness",
            "answer_relevance", "context_precision" and "overall"; missing
            keys default to 0. None is treated as empty.
        retrieval_latency_ms: Retrieval latency in milliseconds.
        generation_latency_ms: Generation latency in milliseconds.
        prompt_tokens: Prompt token count.
        response_tokens: Response token count.
        model_used: Identifier of the model that produced the answer.
        fallback_used: Whether a fallback was used.
        session_id: Session identifier stored with the record.

    Returns:
        The trace record as a dict.
    """
    # Function-scope import: the file-level import only brings in `datetime`.
    from datetime import timezone

    # This function is not wrapped in try/except, so a None here would
    # propagate into the query path; normalize instead of crashing.
    chunks = retrieved_chunks if retrieved_chunks is not None else []
    scores = retrieval_scores if retrieval_scores is not None else []
    evals = eval_scores if eval_scores is not None else {}

    # len() instead of truthiness so array-like score containers (whose bool()
    # may be ambiguous) are accepted as well as plain lists.
    top_score = scores[0] if len(scores) > 0 else 0

    trace_data = {
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and
        # produced a naive value with no offset.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "session_id": session_id,
        "query": query,
        "answer_preview": answer[:200] if answer else "",
        "chunks_retrieved": len(chunks),
        "top_score": top_score,
        "retrieval_latency_ms": retrieval_latency_ms,
        "generation_latency_ms": generation_latency_ms,
        "total_latency_ms": retrieval_latency_ms + generation_latency_ms,
        "prompt_tokens": prompt_tokens,
        "response_tokens": response_tokens,
        "total_tokens": prompt_tokens + response_tokens,
        "model_used": model_used,
        "fallback_used": fallback_used,
        "eval_faithfulness": evals.get("faithfulness", 0),
        "eval_relevance": evals.get("answer_relevance", 0),
        "eval_precision": evals.get("context_precision", 0),
        "eval_overall": evals.get("overall", 0),
    }

    # The local log is always written; Langfuse is sent only when available.
    _write_local_log(trace_data)
    lf = _get_langfuse()
    if lf:
        _send_to_langfuse(lf, trace_data, query, answer, chunks)
    return trace_data
def _json_fallback(value):
    """Convert a value the json encoder cannot handle into one it can."""
    # Scalar wrappers (e.g. NumPy scalars) expose .item() returning the
    # native Python value; prefer that so numbers stay numbers in the log.
    unwrap = getattr(value, "item", None)
    if callable(unwrap):
        try:
            native = unwrap()
        except (TypeError, ValueError):
            native = value
        if native is None or isinstance(native, (str, int, float, bool)):
            return native
    # Last resort: keep the record and store a readable representation.
    return str(value)


def _write_local_log(trace_data: dict):
    """
    Append one trace record as a JSON line to logs/rag_traces.jsonl.

    Best-effort: any failure is logged as a warning and never raised.
    Values that are not natively JSON-serializable are converted by
    _json_fallback, so a single such value no longer causes the whole record
    to be dropped.

    Args:
        trace_data: The record to serialize; written as a single line.
    """
    try:
        log_dir = Path("logs")
        log_dir.mkdir(exist_ok=True)
        # Serialize before opening the file so a failure writes nothing.
        line = json.dumps(trace_data, default=_json_fallback)
        with open(log_dir / "rag_traces.jsonl", "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except Exception as e:
        logger.warning(f"Local log write failed: {e}")
def _send_to_langfuse(lf, trace_data: dict, query: str, answer: str, chunks: list):
    """
    Forward one trace record to Langfuse as a trace with two spans.

    Creates a "rag-query" trace, attaches a "retrieval" span and a
    "generation" span built from the record, then flushes the client.
    Best-effort: any failure is logged as a warning and never raised.

    Args:
        lf: Client object providing trace(...) and flush().
        trace_data: Record produced by trace_rag_query; read by key.
        query: Full query text.
        answer: Full answer text.
        chunks: Retrieved chunks; only their count is sent.
    """
    try:
        # Trace-level metadata is a projection of four fields of the record.
        summary_keys = (
            "total_latency_ms",
            "total_tokens",
            "eval_overall",
            "fallback_used",
        )
        summary = {key: trace_data[key] for key in summary_keys}

        root = lf.trace(
            name="rag-query",
            input={"query": query},
            output={"answer": answer},
            metadata=summary,
        )

        retrieval_span = {
            "name": "retrieval",
            "input": {"query": query},
            "output": {"top_score": trace_data["top_score"]},
            "metadata": {"latency_ms": trace_data["retrieval_latency_ms"]},
        }
        root.span(**retrieval_span)

        generation_span = {
            "name": "generation",
            "input": {"chunks_count": len(chunks)},
            "output": {"tokens": trace_data["response_tokens"]},
            "metadata": {
                "latency_ms": trace_data["generation_latency_ms"],
                "model": trace_data["model_used"],
            },
        }
        root.span(**generation_span)

        # Flushed on every call rather than left to the client's own schedule.
        lf.flush()
    except Exception as e:
        logger.warning(f"Langfuse trace send failed: {e}")
def get_observability_status() -> str:
    """
    Describe the current observability setup as text for the UI panel.

    Returns:
        A multi-line string. When _get_langfuse yields a client it reports the
        Langfuse connection, the host and the local log path; otherwise it
        reports local logging only, with a hint on how to enable Langfuse.
    """
    if not _get_langfuse():
        local_only = (
            "📁 Local logging active\n"
            "File: logs/rag_traces.jsonl\n\n"
            "To enable Langfuse: add LANGFUSE_PUBLIC_KEY\n"
            "and LANGFUSE_SECRET_KEY in Space secrets."
        )
        return local_only

    # NOTE(review): this display default has no URL scheme, while
    # _get_langfuse defaults to "https://cloud.langfuse.com" — confirm the
    # difference is intentional.
    host = get_env("LANGFUSE_HOST", "cloud.langfuse.com")
    connected = (
        "✅ Langfuse connected\n"
        f"Host: {host}\n"
        "📁 Local log: logs/rag_traces.jsonl"
    )
    return connected