Spaces:
Paused
Paused
import json
import logging
from datetime import datetime, timezone
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Optional, Union

from .error_handler import mask_credential
from .utils.paths import get_logs_dir
class JsonFormatter(logging.Formatter):
    """Custom JSON formatter for structured logs.

    Expects each record's ``msg`` to already be a dict (as passed by
    ``log_failure``) and serializes it to a single-line JSON string.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Serialize the record's message dict to JSON.

        ``default=str`` stringifies any non-JSON-serializable value (e.g.
        header objects, datetimes) so one bad value cannot raise TypeError
        and drop the entire failure record.
        """
        return json.dumps(record.msg, default=str)
# Module-level state for lazy initialization.
# Cached logger instance; when None, get_failure_logger() rebuilds it on next use.
_failure_logger: Optional[logging.Logger] = None
# Directory override set via configure_failure_logger(); None means "use get_logs_dir()".
_configured_logs_dir: Optional[Path] = None
def configure_failure_logger(logs_dir: Optional[Union[Path, str]] = None) -> None:
    """
    Point the failure logger at a specific logs directory.

    Invoke before the first use of get_failure_logger() to override the
    default location. Passing None (or any falsy value) restores the
    default behavior of resolving via get_logs_dir().

    Args:
        logs_dir: Target logs directory, or None for the default location.
    """
    global _configured_logs_dir, _failure_logger
    # Drop the cached logger first so the next access reconfigures lazily.
    _failure_logger = None
    # Normalize to a Path; falsy input means "fall back to the default".
    _configured_logs_dir = Path(logs_dir) if logs_dir else None
def _setup_failure_logger(logs_dir: Path) -> logging.Logger:
    """
    Sets up a dedicated JSON logger for writing detailed failure logs to a file.

    Writes one JSON object per record to <logs_dir>/failures.log through a
    size-rotated handler. If the directory or file cannot be created, the
    logger is left with only a NullHandler so logging calls become silent
    no-ops instead of raising.

    Args:
        logs_dir: Path to the logs directory.

    Returns:
        Configured logger instance (shared singleton named "failure_logger").
    """
    logger = logging.getLogger("failure_logger")
    logger.setLevel(logging.INFO)
    # Keep detailed failure records out of the application's root handlers.
    logger.propagate = False
    # Clear existing handlers to prevent duplicates on re-setup
    logger.handlers.clear()
    try:
        logs_dir.mkdir(parents=True, exist_ok=True)
        handler = RotatingFileHandler(
            logs_dir / "failures.log",
            maxBytes=5 * 1024 * 1024,  # 5 MB
            backupCount=2,
        )
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    except OSError as e:
        # PermissionError is a subclass of OSError and IOError is an alias,
        # so a single OSError clause covers the original three-way tuple.
        logging.warning(f"Cannot create failure log file handler: {e}")
        # Add NullHandler to prevent "no handlers" warning
        logger.addHandler(logging.NullHandler())
    return logger
def get_failure_logger() -> logging.Logger:
    """
    Return the failure logger, building it lazily on first access.

    Uses the directory set via configure_failure_logger() when present,
    otherwise falls back to get_logs_dir().

    Returns:
        The configured failure logger.
    """
    global _failure_logger
    if _failure_logger is None:
        # Prefer the explicitly configured directory over the default.
        target_dir = _configured_logs_dir or get_logs_dir()
        _failure_logger = _setup_failure_logger(target_dir)
    return _failure_logger
# Get the main library logger for concise, propagated messages
# (unlike "failure_logger", this one keeps default propagation so summaries
# reach the application's root handlers).
main_lib_logger = logging.getLogger("rotator_library")
| def _extract_response_body(error: Exception) -> str: | |
| """ | |
| Extract the full response body from various error types. | |
| Handles: | |
| - StreamedAPIError: wraps original exception in .data attribute | |
| - httpx.HTTPStatusError: response.text or response.content | |
| - litellm exceptions: various response attributes | |
| - Other exceptions: str(error) | |
| """ | |
| # Handle StreamedAPIError which wraps the original exception in .data | |
| # This is used by our streaming wrapper when catching provider errors | |
| if hasattr(error, "data") and error.data is not None: | |
| inner = error.data | |
| # If data is a dict (parsed JSON error), return it as JSON | |
| if isinstance(inner, dict): | |
| try: | |
| return json.dumps(inner, indent=2) | |
| except Exception: | |
| return str(inner) | |
| # If data is an exception, recurse to extract from it | |
| if isinstance(inner, Exception): | |
| result = _extract_response_body(inner) | |
| if result: | |
| return result | |
| # Try to get response body from httpx errors | |
| if hasattr(error, "response") and error.response is not None: | |
| response = error.response | |
| # Try .text first (decoded) | |
| if hasattr(response, "text") and response.text: | |
| return response.text | |
| # Try .content (bytes) | |
| if hasattr(response, "content") and response.content: | |
| try: | |
| return response.content.decode("utf-8", errors="replace") | |
| except Exception: | |
| return str(response.content) | |
| # Check for litellm's body attribute | |
| if hasattr(error, "body") and error.body: | |
| return str(error.body) | |
| # Check for message attribute that might contain response | |
| if hasattr(error, "message") and error.message: | |
| return str(error.message) | |
| return None | |
def log_failure(
    api_key: str,
    model: str,
    attempt: int,
    error: Exception,
    request_headers: dict,
    raw_response_text: Optional[str] = None,
):
    """
    Logs a detailed failure message to a file and a concise summary to the main logger.

    Args:
        api_key: The API key or credential path that was used
        model: The model that was requested
        attempt: The attempt number (1-based)
        error: The exception that occurred
        request_headers: Headers from the original request
        raw_response_text: Optional pre-extracted response body (e.g., from streaming)
    """
    # 1. Log the full, detailed error to the dedicated failures.log file.
    # Prioritize the explicitly passed raw response text, as it may contain
    # reassembled data from a stream that is not available on the exception object.
    raw_response = raw_response_text or _extract_response_body(error)

    # Get full error message (not truncated)
    full_error_message = str(error)

    # Capture nested/wrapped exception info by walking the
    # __cause__/__context__ chain, guarding against circular references
    # and excessive length (at most 6 entries, matching the original cap).
    error_chain = []
    visited = set()  # ids of exceptions already recorded (cycle detection)
    current_error = error
    while current_error is not None:
        error_id = id(current_error)
        if error_id in visited:
            break  # circular reference
        visited.add(error_id)
        error_chain.append(
            {
                "type": type(current_error).__name__,
                "message": str(current_error)[:2000],  # Limit per-error message size
            }
        )
        if len(error_chain) > 5:  # Prevent excessive chain length
            break
        # Explicit chaining (raise ... from ...) wins over implicit context.
        current_error = current_error.__cause__ or current_error.__context__

    detailed_log_data = {
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and naive.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "api_key_ending": mask_credential(api_key),
        "model": model,
        "attempt_number": attempt,
        "error_type": type(error).__name__,
        "error_message": full_error_message[:5000],  # Limit total size
        "raw_response": raw_response[:10000]
        if raw_response
        else None,  # Limit response size
        "request_headers": request_headers,
        "error_chain": error_chain if len(error_chain) > 1 else None,
    }

    # 2. Log a concise summary to the main library logger, which will propagate
    summary_message = (
        f"API call failed for model {model} with key {mask_credential(api_key)}. "
        f"Error: {type(error).__name__}. See failures.log for details."
    )

    # Log to failure logger with resilience - if it fails, just continue
    try:
        get_failure_logger().error(detailed_log_data)
    except OSError as e:  # IOError is an alias of OSError
        # Log file write failed - log to console instead
        logging.warning(f"Failed to write to failures.log: {e}")

    # Console log always succeeds
    main_lib_logger.error(summary_message)