import logging import json from logging.handlers import RotatingFileHandler from pathlib import Path from datetime import datetime from typing import Optional, Union from .error_handler import mask_credential from .utils.paths import get_logs_dir class JsonFormatter(logging.Formatter): """Custom JSON formatter for structured logs.""" def format(self, record): # The message is already a dict, so we just format it as a JSON string return json.dumps(record.msg) # Module-level state for lazy initialization _failure_logger: Optional[logging.Logger] = None _configured_logs_dir: Optional[Path] = None def configure_failure_logger(logs_dir: Optional[Union[Path, str]] = None) -> None: """ Configure the failure logger to use a specific logs directory. Call this before first use if you want to override the default location. If not called, the logger will use get_logs_dir() on first use. Args: logs_dir: Path to the logs directory. If None, uses get_logs_dir(). """ global _configured_logs_dir, _failure_logger _configured_logs_dir = Path(logs_dir) if logs_dir else None # Reset logger so it gets reconfigured on next use _failure_logger = None def _setup_failure_logger(logs_dir: Path) -> logging.Logger: """ Sets up a dedicated JSON logger for writing detailed failure logs to a file. Args: logs_dir: Path to the logs directory. Returns: Configured logger instance. """ logger = logging.getLogger("failure_logger") logger.setLevel(logging.INFO) logger.propagate = False # Clear existing handlers to prevent duplicates on re-setup logger.handlers.clear() try: logs_dir.mkdir(parents=True, exist_ok=True) handler = RotatingFileHandler( logs_dir / "failures.log", maxBytes=5 * 1024 * 1024, # 5 MB backupCount=2, ) handler.setFormatter(JsonFormatter()) logger.addHandler(handler) except (OSError, PermissionError, IOError) as e: logging.warning(f"Cannot create failure log file handler: {e}") # Add NullHandler to prevent "no handlers" warning logger.addHandler(logging.NullHandler()) return logger def get_failure_logger() -> logging.Logger: """ Get the failure logger, initializing it lazily if needed. Returns: The configured failure logger. """ global _failure_logger, _configured_logs_dir if _failure_logger is None: logs_dir = _configured_logs_dir if _configured_logs_dir else get_logs_dir() _failure_logger = _setup_failure_logger(logs_dir) return _failure_logger # Get the main library logger for concise, propagated messages main_lib_logger = logging.getLogger("rotator_library") def _extract_response_body(error: Exception) -> str: """ Extract the full response body from various error types. Handles: - StreamedAPIError: wraps original exception in .data attribute - httpx.HTTPStatusError: response.text or response.content - litellm exceptions: various response attributes - Other exceptions: str(error) """ # Handle StreamedAPIError which wraps the original exception in .data # This is used by our streaming wrapper when catching provider errors if hasattr(error, "data") and error.data is not None: inner = error.data # If data is a dict (parsed JSON error), return it as JSON if isinstance(inner, dict): try: return json.dumps(inner, indent=2) except Exception: return str(inner) # If data is an exception, recurse to extract from it if isinstance(inner, Exception): result = _extract_response_body(inner) if result: return result # Try to get response body from httpx errors if hasattr(error, "response") and error.response is not None: response = error.response # Try .text first (decoded) if hasattr(response, "text") and response.text: return response.text # Try .content (bytes) if hasattr(response, "content") and response.content: try: return response.content.decode("utf-8", errors="replace") except Exception: return str(response.content) # Check for litellm's body attribute if hasattr(error, "body") and error.body: return str(error.body) # Check for message attribute that might contain response if hasattr(error, "message") and error.message: return str(error.message) return None def log_failure( api_key: str, model: str, attempt: int, error: Exception, request_headers: dict, raw_response_text: str = None, ): """ Logs a detailed failure message to a file and a concise summary to the main logger. Args: api_key: The API key or credential path that was used model: The model that was requested attempt: The attempt number (1-based) error: The exception that occurred request_headers: Headers from the original request raw_response_text: Optional pre-extracted response body (e.g., from streaming) """ # 1. Log the full, detailed error to the dedicated failures.log file # Prioritize the explicitly passed raw response text, as it may contain # reassembled data from a stream that is not available on the exception object. raw_response = raw_response_text if not raw_response: raw_response = _extract_response_body(error) # Get full error message (not truncated) full_error_message = str(error) # Also capture any nested/wrapped exception info error_chain = [] visited = set() # Track visited exceptions to detect circular references current_error = error while current_error: # Check for circular references error_id = id(current_error) if error_id in visited: break visited.add(error_id) error_chain.append( { "type": type(current_error).__name__, "message": str(current_error)[:2000], # Limit per-error message size } ) current_error = getattr(current_error, "__cause__", None) or getattr( current_error, "__context__", None ) if len(error_chain) > 5: # Prevent excessive chain length break detailed_log_data = { "timestamp": datetime.utcnow().isoformat(), "api_key_ending": mask_credential(api_key), "model": model, "attempt_number": attempt, "error_type": type(error).__name__, "error_message": full_error_message[:5000], # Limit total size "raw_response": raw_response[:10000] if raw_response else None, # Limit response size "request_headers": request_headers, "error_chain": error_chain if len(error_chain) > 1 else None, } # 2. Log a concise summary to the main library logger, which will propagate summary_message = ( f"API call failed for model {model} with key {mask_credential(api_key)}. " f"Error: {type(error).__name__}. See failures.log for details." ) # Log to failure logger with resilience - if it fails, just continue try: get_failure_logger().error(detailed_log_data) except (OSError, IOError) as e: # Log file write failed - log to console instead logging.warning(f"Failed to write to failures.log: {e}") # Console log always succeeds main_lib_logger.error(summary_message)