""" utils.py — Shared utilities for the Enterprise RAG System. """ import os import time import logging from typing import Optional logging.basicConfig( level=logging.INFO, format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger("enterprise-rag") def get_env(key: str, default: Optional[str] = None) -> Optional[str]: """Fetch env variable with optional default. Returns None if not set and no default.""" return os.environ.get(key, default) def get_required_env(key: str) -> str: """Fetch a required env variable. Raises clearly if missing.""" value = os.environ.get(key) if not value: raise EnvironmentError( f"Required environment variable '{key}' is missing. " f"Add it to your HF Space secrets or .env file." ) return value def count_tokens_estimate(text: str) -> int: """ Estimate token count without any external tokenizer. Rule of thumb: 1 token ~ 4 characters in English. Accurate to within 10-15% — good enough for dashboard display. """ if not text: return 0 return max(1, len(text) // 4) class Timer: """Context manager for latency measurement in milliseconds.""" def __enter__(self): self.start = time.perf_counter() self.elapsed_ms = 0 return self def __exit__(self, *args): self.elapsed_ms = (time.perf_counter() - self.start) * 1000 @property def elapsed_s(self) -> float: return self.elapsed_ms / 1000 def truncate_text(text: str, max_chars: int = 400) -> str: """Truncate text for UI display without breaking mid-word.""" if len(text) <= max_chars: return text return text[:max_chars].rsplit(" ", 1)[0] + "..." def format_retrieved_chunks(chunks: list, scores: list) -> str: """ Format retrieved chunks for display in the Gradio right panel. Shows chunk preview and similarity score so users can verify the AI answer is grounded in the actual document. """ if not chunks: return "No relevant chunks retrieved." output_parts = [] for i, (chunk, score) in enumerate(zip(chunks, scores), 1): preview = truncate_text(chunk, 350) filled = int(score * 10) bar = "█" * filled + "░" * (10 - filled) output_parts.append( f"**Chunk {i}** — Similarity: `{score:.4f}` {bar}\n\n" f"{preview}\n\n" f"{'─' * 40}" ) return "\n\n".join(output_parts) def safe_divide(a: float, b: float, default: float = 0.0) -> float: """Avoid ZeroDivisionError in metric calculations.""" return a / b if b != 0 else default