"""Structured failure logging for the rotator library.

Writes detailed JSON failure records to a rotating ``failures.log`` file and
concise, propagating summaries to the main ``rotator_library`` logger.
"""
import json
import logging
from datetime import datetime, timezone
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Optional, Union

from .error_handler import mask_credential
from .utils.paths import get_logs_dir
class JsonFormatter(logging.Formatter):
    """Formatter that serializes the log record's message as a JSON string.

    The failure logger passes an already-built dict as the log message, so
    formatting is a single ``json.dumps`` call with no extra record fields.
    """

    def format(self, record):
        # `default=str` stringifies any non-JSON-serializable value inside the
        # dict (exceptions, paths, datetimes) instead of raising TypeError,
        # which would otherwise abort the logging call itself.
        return json.dumps(record.msg, default=str)
# Module-level state for lazy initialization
_failure_logger: Optional[logging.Logger] = None  # built on first get_failure_logger() call
_configured_logs_dir: Optional[Path] = None  # explicit override set by configure_failure_logger()
def configure_failure_logger(logs_dir: Optional[Union[Path, str]] = None) -> None:
    """
    Point the failure logger at a specific logs directory.

    Call before first use to override the default location; otherwise the
    logger falls back to get_logs_dir() when it is first needed.

    Args:
        logs_dir: Directory for log files. None (or any falsy value) clears
            the override so the default location is used again.
    """
    global _configured_logs_dir, _failure_logger
    _configured_logs_dir = None
    if logs_dir:
        _configured_logs_dir = Path(logs_dir)
    # Drop the cached logger; the next get_failure_logger() call rebuilds it
    # against the new directory.
    _failure_logger = None
def _setup_failure_logger(logs_dir: Path) -> logging.Logger:
    """
    Set up a dedicated JSON logger that writes detailed failure records to
    ``logs_dir / "failures.log"`` with rotation (5 MB per file, 2 backups).

    Args:
        logs_dir: Path to the logs directory; created if missing.

    Returns:
        Configured logger instance. If the log file cannot be created, the
        logger carries only a NullHandler and records are dropped silently.
    """
    logger = logging.getLogger("failure_logger")
    logger.setLevel(logging.INFO)
    # Detailed records must not propagate to parent loggers; a concise
    # summary is emitted separately via the main library logger.
    logger.propagate = False
    # Clear existing handlers to prevent duplicates on re-setup
    logger.handlers.clear()
    try:
        logs_dir.mkdir(parents=True, exist_ok=True)
        handler = RotatingFileHandler(
            logs_dir / "failures.log",
            maxBytes=5 * 1024 * 1024,  # 5 MB
            backupCount=2,
            encoding="utf-8",  # keep JSON logs locale-independent
        )
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    except OSError as e:
        # OSError subsumes PermissionError and the legacy IOError alias
        # (PEP 3151), so a single except clause covers all I/O failures.
        logging.warning(f"Cannot create failure log file handler: {e}")
        # Add NullHandler to prevent "no handlers" warning
        logger.addHandler(logging.NullHandler())
    return logger
def get_failure_logger() -> logging.Logger:
    """
    Return the failure logger, building it lazily on first access.

    Returns:
        The configured failure logger.
    """
    global _failure_logger
    if _failure_logger is not None:
        return _failure_logger
    # Prefer an explicitly configured directory; fall back to the default.
    target_dir = _configured_logs_dir or get_logs_dir()
    _failure_logger = _setup_failure_logger(target_dir)
    return _failure_logger
# Get the main library logger for concise, propagated messages
# (detailed records go to the non-propagating "failure_logger" instead)
main_lib_logger = logging.getLogger("rotator_library")
def _extract_response_body(error: Exception) -> Optional[str]:
    """
    Extract the full response body from various error types.

    Handles:
    - StreamedAPIError: wraps original exception (or a parsed dict) in .data
    - httpx.HTTPStatusError: response.text or response.content
    - litellm exceptions: .body / .message attributes

    Returns:
        The response body as a string, or None if no body could be recovered.
    """
    # Handle StreamedAPIError which wraps the original exception in .data
    # This is used by our streaming wrapper when catching provider errors
    if hasattr(error, "data") and error.data is not None:
        inner = error.data
        # If data is a dict (parsed JSON error), return it as JSON
        if isinstance(inner, dict):
            try:
                return json.dumps(inner, indent=2)
            except Exception:
                # Unserializable payloads fall back to their repr
                return str(inner)
        # If data is an exception, recurse to extract from it
        if isinstance(inner, Exception):
            result = _extract_response_body(inner)
            if result:
                return result
            # NOTE: falls through to the checks below if the inner
            # exception yielded nothing
    # Try to get response body from httpx errors
    if hasattr(error, "response") and error.response is not None:
        response = error.response
        # Try .text first (decoded)
        if hasattr(response, "text") and response.text:
            return response.text
        # Try .content (bytes)
        if hasattr(response, "content") and response.content:
            try:
                return response.content.decode("utf-8", errors="replace")
            except Exception:
                # e.g. content is not bytes-like; repr is better than nothing
                return str(response.content)
    # Check for litellm's body attribute
    if hasattr(error, "body") and error.body:
        return str(error.body)
    # Check for message attribute that might contain response
    if hasattr(error, "message") and error.message:
        return str(error.message)
    return None
def _build_error_chain(error: Exception) -> list:
    """Walk an exception's __cause__/__context__ links into a list of dicts.

    Each entry holds the exception type name and a size-capped message.
    Guards against circular references and caps the chain at 6 entries.
    """
    chain = []
    seen_ids = set()  # id()s of visited exceptions, to detect cycles
    current = error
    while current is not None and len(chain) < 6:
        if id(current) in seen_ids:
            break
        seen_ids.add(id(current))
        chain.append(
            {
                "type": type(current).__name__,
                "message": str(current)[:2000],  # limit per-error message size
            }
        )
        # Explicit chaining (__cause__) takes priority over implicit (__context__)
        current = getattr(current, "__cause__", None) or getattr(
            current, "__context__", None
        )
    return chain


def log_failure(
    api_key: str,
    model: str,
    attempt: int,
    error: Exception,
    request_headers: dict,
    raw_response_text: Optional[str] = None,
):
    """
    Logs a detailed failure message to a file and a concise summary to the main logger.

    Args:
        api_key: The API key or credential path that was used
        model: The model that was requested
        attempt: The attempt number (1-based)
        error: The exception that occurred
        request_headers: Headers from the original request
        raw_response_text: Optional pre-extracted response body (e.g., from streaming)
    """
    # 1. Log the full, detailed error to the dedicated failures.log file
    # Prioritize the explicitly passed raw response text, as it may contain
    # reassembled data from a stream that is not available on the exception object.
    raw_response = raw_response_text or _extract_response_body(error)
    # Get full error message (not truncated)
    full_error_message = str(error)
    # Also capture any nested/wrapped exception info
    error_chain = _build_error_chain(error)
    detailed_log_data = {
        # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated and
        # returns a naive datetime with no offset in its isoformat output)
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "api_key_ending": mask_credential(api_key),
        "model": model,
        "attempt_number": attempt,
        "error_type": type(error).__name__,
        "error_message": full_error_message[:5000],  # Limit total size
        "raw_response": raw_response[:10000]
        if raw_response
        else None,  # Limit response size
        "request_headers": request_headers,
        # Only interesting when there is actually a wrapped exception
        "error_chain": error_chain if len(error_chain) > 1 else None,
    }
    # 2. Log a concise summary to the main library logger, which will propagate
    summary_message = (
        f"API call failed for model {model} with key {mask_credential(api_key)}. "
        f"Error: {type(error).__name__}. See failures.log for details."
    )
    # Log to failure logger with resilience - if it fails, just continue
    try:
        get_failure_logger().error(detailed_log_data)
    except OSError as e:
        # Log file write failed - log to console instead
        logging.warning(f"Failed to write to failures.log: {e}")
    # Console log always succeeds
    main_lib_logger.error(summary_message)
|