| """ | |
| Observability utilities for LLM monitoring and evaluation. | |
| Provides centralized logging, tracing, and metrics tracking using LangSmith and W&B. | |
| """ | |
| import os | |
| import time | |
| import functools | |
| from typing import Dict, Any, Optional, Callable | |
| from datetime import datetime | |
| import logging | |
| # Import configuration | |
| from src.utils.config import ( | |
| LANGCHAIN_TRACING_V2, | |
| LANGCHAIN_API_KEY, | |
| LANGCHAIN_PROJECT, | |
| WANDB_API_KEY, | |
| WANDB_PROJECT, | |
| WANDB_ENTITY, | |
| WANDB_MODE | |
| ) | |
| logger = logging.getLogger(__name__) | |
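
# Expected configuration (a sketch: the variable names are the real imports
# from src.utils.config above, but the values shown are illustrative):
#
#     LANGCHAIN_TRACING_V2=true
#     LANGCHAIN_API_KEY=ls__...
#     LANGCHAIN_PROJECT=ai-learning-path-generator
#     WANDB_API_KEY=...
#     WANDB_PROJECT=ai-learning-path-generator
#     WANDB_MODE=online   # or "disabled" to turn W&B tracking off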


class ObservabilityManager:
    """
    Centralized manager for observability tools (LangSmith + W&B).

    Handles initialization, tracing, and metrics logging.
    """

    def __init__(self):
        """Initialize observability tools."""
        self.langsmith_enabled = False
        self.wandb_enabled = False
        self.wandb_run = None

        # Initialize LangSmith
        self._init_langsmith()

        # Initialize Weights & Biases
        self._init_wandb()

    def _init_langsmith(self):
        """Initialize LangSmith tracing."""
        if LANGCHAIN_TRACING_V2 and LANGCHAIN_API_KEY:
            try:
                # Set environment variables for automatic LangChain tracing
                os.environ["LANGCHAIN_TRACING_V2"] = "true"
                os.environ["LANGCHAIN_API_KEY"] = LANGCHAIN_API_KEY
                os.environ["LANGCHAIN_PROJECT"] = LANGCHAIN_PROJECT
                self.langsmith_enabled = True
                logger.info(f"✅ LangSmith tracing enabled for project: {LANGCHAIN_PROJECT}")
            except Exception as e:
                logger.warning(f"⚠️ Failed to initialize LangSmith: {e}")
                self.langsmith_enabled = False
        else:
            logger.info(
                "ℹ️ LangSmith tracing disabled "
                "(set LANGCHAIN_TRACING_V2=true and LANGCHAIN_API_KEY to enable)"
            )

    def _init_wandb(self):
        """Initialize Weights & Biases."""
        if WANDB_API_KEY and WANDB_MODE != "disabled":
            try:
                import wandb

                # Initialize the W&B run
                self.wandb_run = wandb.init(
                    project=WANDB_PROJECT,
                    entity=WANDB_ENTITY,
                    mode=WANDB_MODE,
                    config={
                        "framework": "langchain",
                        "application": "ai-learning-path-generator",
                    },
                    # Don't reinitialize if already running
                    reinit=False,
                    resume="allow",
                )
                self.wandb_enabled = True
                logger.info(f"✅ W&B tracking enabled for project: {WANDB_PROJECT}")
            except ImportError:
                logger.warning("⚠️ wandb package not installed. Run: pip install wandb")
                self.wandb_enabled = False
            except Exception as e:
                logger.warning(f"⚠️ Failed to initialize W&B: {e}")
                self.wandb_enabled = False
        else:
            logger.info("ℹ️ W&B tracking disabled (set WANDB_API_KEY to enable)")

    def log_llm_call(
        self,
        prompt: str,
        response: str,
        model: str,
        metadata: Optional[Dict[str, Any]] = None,
        latency_ms: Optional[float] = None,
        token_count: Optional[int] = None,
        cost: Optional[float] = None,
    ):
        """
        Log an LLM call to W&B as a single-row table plus scalar metrics.

        Args:
            prompt: The input prompt sent to the LLM
            response: The LLM's response
            model: Model name (e.g., 'gpt-4o-mini')
            metadata: Additional metadata (user_id, topic, etc.)
            latency_ms: Response time in milliseconds
            token_count: Total tokens used
            cost: Estimated cost in USD
        """
        if not self.wandb_enabled:
            return

        try:
            import wandb

            # Log the call as a single-row W&B Table
            prompts_table = wandb.Table(
                columns=[
                    "timestamp", "model", "prompt", "response",
                    "latency_ms", "token_count", "cost", "metadata",
                ]
            )
            prompts_table.add_data(
                datetime.utcnow().isoformat(),
                model,
                prompt[:500],  # Truncate for display
                response[:500],
                latency_ms,
                token_count,
                cost,
                str(metadata or {}),
            )
            wandb.log({"llm_calls": prompts_table})

            # Also log metrics separately for easier aggregation
            if latency_ms:
                wandb.log({"llm_latency_ms": latency_ms})
            if token_count:
                wandb.log({"llm_tokens": token_count})
            if cost:
                wandb.log({"llm_cost_usd": cost})
        except Exception as e:
            logger.warning(f"Failed to log LLM call to W&B: {e}")

    def log_metric(self, name: str, value: float, metadata: Optional[Dict] = None):
        """
        Log a custom metric to W&B.

        Args:
            name: Metric name (e.g., 'path_generation_success')
            value: Metric value
            metadata: Additional context
        """
        if not self.wandb_enabled:
            return

        try:
            import wandb

            log_data = {name: value}
            if metadata:
                # Flatten metadata into the log as "<name>_<key>" entries
                for key, val in metadata.items():
                    log_data[f"{name}_{key}"] = val
            wandb.log(log_data)
        except Exception as e:
            logger.warning(f"Failed to log metric to W&B: {e}")

    def log_event(self, event_name: str, properties: Optional[Dict] = None):
        """
        Log a custom event to W&B.

        Args:
            event_name: Name of the event (e.g., 'path_generated', 'validation_failed')
            properties: Event properties
        """
        if not self.wandb_enabled:
            return

        try:
            import wandb

            wandb.log({
                "event": event_name,
                "timestamp": datetime.utcnow().isoformat(),
                "properties": properties or {},
            })
        except Exception as e:
            logger.warning(f"Failed to log event to W&B: {e}")

    def finish(self):
        """Clean up and finish the W&B run."""
        if self.wandb_enabled and self.wandb_run:
            try:
                import wandb

                wandb.finish()
                logger.info("✅ W&B run finished")
            except Exception as e:
                logger.warning(f"Failed to finish W&B run: {e}")


# Global observability manager instance
_observability_manager = None


def get_observability_manager() -> ObservabilityManager:
    """Get or create the global observability manager instance."""
    global _observability_manager
    if _observability_manager is None:
        _observability_manager = ObservabilityManager()
    return _observability_manager
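
# A minimal usage sketch (the metric/event names below are illustrative, not
# identifiers defined elsewhere in this repo):
#
#     obs = get_observability_manager()
#     obs.log_metric("path_generation_success", 1.0, metadata={"topic": "python"})
#     obs.log_event("path_generated", {"steps": 7})
#     obs.finish()  # flush the W&B run on shutdown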


def traceable(
    name: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    log_to_wandb: bool = True,
):
    """
    Decorator to time function execution and log latency/success metrics to W&B.

    LangSmith traces LangChain calls automatically via the environment
    variables set in _init_langsmith; this decorator adds W&B metrics on top.

    Usage:
        @traceable(name="generate_learning_path", metadata={"version": "v2"})
        def generate_path(topic: str) -> dict:
            ...

    Args:
        name: Custom name for the trace (defaults to the function name)
        metadata: Additional metadata to attach to the trace
        log_to_wandb: Whether to also log execution metrics to W&B
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)  # preserve the wrapped function's name/docstring
        def wrapper(*args, **kwargs):
            obs_manager = get_observability_manager()
            trace_name = name or func.__name__

            # Start timing
            start_time = time.time()
            success = False
            error = None
            result = None

            try:
                # Execute the function
                result = func(*args, **kwargs)
                success = True
                return result
            except Exception as e:
                error = str(e)
                raise
            finally:
                # Calculate latency
                latency_ms = (time.time() - start_time) * 1000

                # Log to W&B if enabled
                if log_to_wandb and obs_manager.wandb_enabled:
                    obs_manager.log_metric(
                        f"{trace_name}_latency_ms",
                        latency_ms,
                        metadata={
                            "success": success,
                            "error": error,
                            **(metadata or {}),
                        },
                    )
                    obs_manager.log_metric(
                        f"{trace_name}_success",
                        1.0 if success else 0.0,
                    )
        return wrapper
    return decorator
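
# In practice the decorator emits two W&B metrics per call,
# "<trace_name>_latency_ms" and "<trace_name>_success". Sketch (build_path is
# a hypothetical caller, not defined in this module):
#
#     @traceable(name="generate_learning_path", metadata={"version": "v2"})
#     def build_path(topic: str) -> dict:
#         return {"topic": topic, "steps": []}
#
#     build_path("python")  # logs generate_learning_path_latency_ms / _success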


def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """
    Estimate the cost of an LLM call based on token usage.

    Args:
        model: Model name
        input_tokens: Number of input tokens
        output_tokens: Number of output tokens

    Returns:
        Estimated cost in USD
    """
    # Pricing per 1M tokens in USD (as of 2024)
    pricing = {
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "gpt-4o": {"input": 5.00, "output": 15.00},
        "gpt-3.5-turbo": {"input": 0.50, "output": 1.50},
        "gpt-4": {"input": 30.00, "output": 60.00},
    }

    # Default to gpt-4o-mini pricing if the model is not found
    model_pricing = pricing.get(model, pricing["gpt-4o-mini"])

    input_cost = (input_tokens / 1_000_000) * model_pricing["input"]
    output_cost = (output_tokens / 1_000_000) * model_pricing["output"]
    return input_cost + output_cost
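

if __name__ == "__main__":
    # Smoke-test sketch (an assumption: run as a module from the project root
    # so the src.utils.config import above resolves). Worked example for
    # gpt-4o-mini: 1,200 input + 800 output tokens cost
    # 1200/1e6 * $0.15 + 800/1e6 * $0.60 = $0.00018 + $0.00048 = $0.00066.
    logging.basicConfig(level=logging.INFO)
    cost = estimate_cost("gpt-4o-mini", input_tokens=1_200, output_tokens=800)
    print(f"Estimated cost: ${cost:.6f}")  # -> $0.000660

    obs = get_observability_manager()
    obs.log_llm_call(
        prompt="Explain gradient descent in one paragraph.",
        response="Gradient descent iteratively updates parameters...",
        model="gpt-4o-mini",
        latency_ms=950.0,
        token_count=2_000,
        cost=cost,
    )
    obs.finish()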