Mirrowel committed on
Commit
4caa52c
·
1 Parent(s): 15c96ab

feat(logging): centralize and enhance application logging

Browse files

Overhaul the application's logging infrastructure for better control, clarity, and operational visibility.

- Centralize logging configuration in `main.py` with file and console handlers.
- Add `src/proxy_app/provider_urls.py` for dynamic provider endpoint resolution, enhancing request context.
- Introduce `log_request_to_console` to provide real-time, concise summaries of incoming API requests.
- Migrate `RotatingClient` and other modules from `print()` statements to Python's `logging` module.
- Ensure `RotatingClient` logs propagate correctly and adjust internal log levels for clearer diagnostics.
- Suppress verbose output from external libraries (uvicorn, httpx, litellm) for focused logs.
- Improve error logging in `request_logger.py` to use `logging.error`.

src/proxy_app/main.py CHANGED
@@ -16,6 +16,7 @@ from pydantic import BaseModel
16
  import argparse
17
  import litellm
18
 
 
19
  # --- Pydantic Models ---
20
  class EmbeddingRequest(BaseModel):
21
  model: str
@@ -36,12 +37,34 @@ args, _ = parser.parse_known_args()
36
  sys.path.append(str(Path(__file__).resolve().parent.parent))
37
 
38
  from rotator_library import RotatingClient, PROVIDER_PLUGINS
39
- from proxy_app.request_logger import log_request_response
40
  from proxy_app.batch_manager import EmbeddingBatcher
41
 
42
- # Configure logging
43
- logging.basicConfig(level=logging.INFO)
44
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
  # Load environment variables from .env file
46
  load_dotenv()
47
 
@@ -70,16 +93,17 @@ if not api_keys:
70
  @asynccontextmanager
71
  async def lifespan(app: FastAPI):
72
  """Manage the RotatingClient's lifecycle with the app's lifespan."""
 
73
  client = RotatingClient(api_keys=api_keys, configure_logging=True)
74
  app.state.rotating_client = client
75
 
76
  if USE_EMBEDDING_BATCHER:
77
  batcher = EmbeddingBatcher(client=client)
78
  app.state.embedding_batcher = batcher
79
- print("RotatingClient and EmbeddingBatcher initialized.")
80
  else:
81
  app.state.embedding_batcher = None
82
- print("RotatingClient initialized (EmbeddingBatcher disabled).")
83
 
84
  yield
85
 
@@ -88,9 +112,9 @@ async def lifespan(app: FastAPI):
88
  await client.close()
89
 
90
  if app.state.embedding_batcher:
91
- print("RotatingClient and EmbeddingBatcher closed.")
92
  else:
93
- print("RotatingClient closed.")
94
 
95
  # --- FastAPI App Setup ---
96
  app = FastAPI(lifespan=lifespan)
@@ -261,6 +285,12 @@ async def chat_completions(
261
  """
262
  try:
263
  request_data = await request.json()
 
 
 
 
 
 
264
  is_streaming = request_data.get("stream", False)
265
 
266
  if is_streaming:
@@ -323,6 +353,13 @@ async def embeddings(
323
  - False: Passes requests directly to the provider.
324
  """
325
  try:
 
 
 
 
 
 
 
326
  if USE_EMBEDDING_BATCHER and batcher:
327
  # --- Server-Side Batching Logic ---
328
  request_data = body.model_dump(exclude_none=True)
 
16
  import argparse
17
  import litellm
18
 
19
+
20
  # --- Pydantic Models ---
21
  class EmbeddingRequest(BaseModel):
22
  model: str
 
37
  sys.path.append(str(Path(__file__).resolve().parent.parent))
38
 
39
  from rotator_library import RotatingClient, PROVIDER_PLUGINS
40
+ from proxy_app.request_logger import log_request_response, log_request_to_console
41
  from proxy_app.batch_manager import EmbeddingBatcher
42
 
43
+ # --- Logging Configuration ---
44
+ LOG_DIR = Path(__file__).resolve().parent.parent / "logs"
45
+ LOG_DIR.mkdir(exist_ok=True)
46
+
47
+ # Configure a file handler for detailed logs (INFO level, set below)
48
+ # written to logs/proxy.log for post-hoc diagnostics
49
+ file_handler = logging.FileHandler(LOG_DIR / "proxy.log", encoding="utf-8")
50
+ file_handler.setLevel(logging.INFO)
51
+ file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
52
+
53
+ # Configure a console handler for concise, high-level info
54
+ console_handler = logging.StreamHandler(sys.stdout)
55
+ console_handler.setLevel(logging.INFO)
56
+ console_handler.setFormatter(logging.Formatter('%(message)s'))
57
+
58
+ # Get the root logger and add the handlers
59
+ root_logger = logging.getLogger()
60
+ root_logger.setLevel(logging.INFO) # Set root to INFO
61
+ root_logger.addHandler(file_handler)
62
+ root_logger.addHandler(console_handler)
63
+
64
+ # Silence other noisy loggers by setting their level higher than root
65
+ logging.getLogger("uvicorn").setLevel(logging.WARNING)
66
+ logging.getLogger("httpx").setLevel(logging.WARNING)
67
+ logging.getLogger("litellm").setLevel(logging.WARNING)
68
  # Load environment variables from .env file
69
  load_dotenv()
70
 
 
93
  @asynccontextmanager
94
  async def lifespan(app: FastAPI):
95
  """Manage the RotatingClient's lifecycle with the app's lifespan."""
96
+ # The client now uses the root logger configuration
97
  client = RotatingClient(api_keys=api_keys, configure_logging=True)
98
  app.state.rotating_client = client
99
 
100
  if USE_EMBEDDING_BATCHER:
101
  batcher = EmbeddingBatcher(client=client)
102
  app.state.embedding_batcher = batcher
103
+ logging.info("RotatingClient and EmbeddingBatcher initialized.")
104
  else:
105
  app.state.embedding_batcher = None
106
+ logging.info("RotatingClient initialized (EmbeddingBatcher disabled).")
107
 
108
  yield
109
 
 
112
  await client.close()
113
 
114
  if app.state.embedding_batcher:
115
+ logging.info("RotatingClient and EmbeddingBatcher closed.")
116
  else:
117
+ logging.info("RotatingClient closed.")
118
 
119
  # --- FastAPI App Setup ---
120
  app = FastAPI(lifespan=lifespan)
 
285
  """
286
  try:
287
  request_data = await request.json()
288
+ log_request_to_console(
289
+ url=str(request.url),
290
+ headers=dict(request.headers),
291
+ client_info=(request.client.host, request.client.port),
292
+ request_data=request_data
293
+ )
294
  is_streaming = request_data.get("stream", False)
295
 
296
  if is_streaming:
 
353
  - False: Passes requests directly to the provider.
354
  """
355
  try:
356
+ request_data = body.model_dump(exclude_none=True)
357
+ log_request_to_console(
358
+ url=str(request.url),
359
+ headers=dict(request.headers),
360
+ client_info=(request.client.host, request.client.port),
361
+ request_data=request_data
362
+ )
363
  if USE_EMBEDDING_BATCHER and batcher:
364
  # --- Server-Side Batching Logic ---
365
  request_data = body.model_dump(exclude_none=True)
src/proxy_app/provider_urls.py ADDED
@@ -0,0 +1,65 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
from typing import Optional

# Base URLs for every provider the proxy knows how to reach.
PROVIDER_URL_MAP = {
    "perplexity": "https://api.perplexity.ai",
    "anyscale": "https://api.endpoints.anyscale.com/v1",
    "deepinfra": "https://api.deepinfra.com/v1/openai",
    "mistral": "https://api.mistral.ai/v1",
    "groq": "https://api.groq.com/openai/v1",
    "nvidia_nim": "https://integrate.api.nvidia.com/v1",
    "cerebras": "https://api.cerebras.ai/v1",
    "sambanova": "https://api.sambanova.ai/v1",
    "ai21_chat": "https://api.ai21.com/studio/v1",
    "codestral": "https://codestral.mistral.ai/v1",
    "text-completion-codestral": "https://codestral.mistral.ai/v1",
    "empower": "https://app.empower.dev/api/v1",
    "deepseek": "https://api.deepseek.com/v1",
    "friendliai": "https://api.friendli.ai/serverless/v1",
    "galadriel": "https://api.galadriel.com/v1",
    "meta_llama": "https://api.llama.com/compat/v1",
    "featherless_ai": "https://api.featherless.ai/v1",
    "nscale": "https://api.nscale.com/v1",
    "openai": "https://api.openai.com/v1",
    "gemini": "https://generativelanguage.googleapis.com/v1beta",
    "anthropic": "https://api.anthropic.com/v1",
    "cohere": "https://api.cohere.ai/v1",
    "bedrock": "https://bedrock-runtime.us-east-1.amazonaws.com",
    "openrouter": "https://openrouter.ai/api/v1",
}


def get_provider_endpoint(provider: str, model_name: str, incoming_path: str) -> Optional[str]:
    """Build the full upstream endpoint URL for a request.

    Args:
        provider: Provider key, looked up in PROVIDER_URL_MAP.
        model_name: Model identifier (used by providers whose URLs embed it).
        incoming_path: The path (or URL) the proxy received; the portion after
            '/v1/' is treated as the action, e.g. 'chat/completions'.

    Returns:
        The resolved URL, or None when the provider is unknown.
    """
    root = PROVIDER_URL_MAP.get(provider)
    if not root:
        return None

    # Recover the action from the incoming path: everything after '/v1/'.
    if '/v1/' in incoming_path:
        action = incoming_path.split('/v1/', 1)[-1]
    else:
        action = incoming_path

    # Providers whose endpoints do not follow the OpenAI path layout.
    if provider == "gemini":
        gemini_paths = {
            "chat/completions": f"models/{model_name}:generateContent",
            "embeddings": f"models/{model_name}:embedContent",
        }
        if action in gemini_paths:
            return f"{root}/{gemini_paths[action]}"
    elif provider == "anthropic":
        if action == "chat/completions":
            return f"{root}/messages"
    elif provider == "cohere":
        cohere_paths = {"chat/completions": "chat", "embeddings": "embed"}
        if action in cohere_paths:
            return f"{root}/{cohere_paths[action]}"

    # OpenAI-compatible providers already carry /v1 (or /v1/openai) in the
    # base URL, so the action is appended directly.
    if root.endswith(("/v1", "/v1/openai")):
        return f"{root}/{action}"

    # Fallback inserts a /v1 segment. NOTE(review): this assumption does not
    # hold for every remaining provider (e.g. perplexity serves
    # /chat/completions without /v1, and bedrock uses a different scheme) —
    # verify before relying on this URL for anything beyond log context.
    return f"{root}/v1/{action}"
src/proxy_app/request_logger.py CHANGED
@@ -3,7 +3,10 @@ import os
3
  from datetime import datetime
4
  from pathlib import Path
5
  import uuid
6
- from typing import Literal
 
 
 
7
 
8
  LOGS_DIR = Path(__file__).resolve().parent.parent.parent / "logs"
9
  COMPLETIONS_LOGS_DIR = LOGS_DIR / "completions"
@@ -14,6 +17,27 @@ LOGS_DIR.mkdir(exist_ok=True)
14
  COMPLETIONS_LOGS_DIR.mkdir(exist_ok=True)
15
  EMBEDDINGS_LOGS_DIR.mkdir(exist_ok=True)
16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
17
  def log_request_response(
18
  request_data: dict,
19
  response_data: dict,
@@ -48,4 +72,5 @@ def log_request_response(
48
 
49
  except Exception as e:
50
  # In case of logging failure, we don't want to crash the main application
51
- print(f"Error logging request/response: {e}")
 
 
3
  from datetime import datetime
4
  from pathlib import Path
5
  import uuid
6
+ from typing import Literal, Dict
7
+ import logging
8
+
9
+ from .provider_urls import get_provider_endpoint
10
 
11
  LOGS_DIR = Path(__file__).resolve().parent.parent.parent / "logs"
12
  COMPLETIONS_LOGS_DIR = LOGS_DIR / "completions"
 
17
  COMPLETIONS_LOGS_DIR.mkdir(exist_ok=True)
18
  EMBEDDINGS_LOGS_DIR.mkdir(exist_ok=True)
19
 
20
+ def log_request_to_console(url: str, headers: dict, client_info: tuple, request_data: dict):
21
+ """
22
+ Logs a concise, single-line summary of an incoming request to the console.
23
+ """
24
+ time_str = datetime.now().strftime("%H:%M")
25
+ model_full = request_data.get("model", "N/A")
26
+
27
+ provider = "N/A"
28
+ model_name = model_full
29
+ endpoint_url = "N/A"
30
+
31
+ if '/' in model_full:
32
+ parts = model_full.split('/', 1)
33
+ provider = parts[0]
34
+ model_name = parts[1]
35
+ # Use the helper function to get the full endpoint URL
36
+ endpoint_url = get_provider_endpoint(provider, model_name, url) or "N/A"
37
+
38
+ log_message = f"{time_str} - {client_info[0]}:{client_info[1]} - provider: {provider}, model: {model_name} - {endpoint_url}"
39
+ logging.info(log_message)
40
+
41
  def log_request_response(
42
  request_data: dict,
43
  response_data: dict,
 
72
 
73
  except Exception as e:
74
  # In case of logging failure, we don't want to crash the main application
75
+ # Use the root logger to log the error to the file.
76
+ logging.error(f"Error logging request/response to file: {e}")
src/rotator_library/client.py CHANGED
@@ -10,11 +10,11 @@ import logging
10
  from typing import List, Dict, Any, AsyncGenerator, Optional, Union
11
 
12
  lib_logger = logging.getLogger('rotator_library')
 
 
 
13
  lib_logger.propagate = False
14
 
15
- if not lib_logger.handlers:
16
- lib_logger.addHandler(logging.NullHandler())
17
-
18
  from .usage_manager import UsageManager
19
  from .failure_logger import log_failure
20
  from .error_handler import classify_error, AllProviders
@@ -34,13 +34,17 @@ class RotatingClient:
34
  with support for both streaming and non-streaming responses.
35
  """
36
  def __init__(self, api_keys: Dict[str, List[str]], max_retries: int = 2, usage_file_path: str = "key_usage.json", configure_logging: bool = True):
37
- os.environ["LITELLM_LOG"] = "ERROR"
38
- litellm.set_verbose = False
39
- litellm.drop_params = True
40
  if configure_logging:
 
 
41
  lib_logger.propagate = True
42
- if any(isinstance(h, logging.NullHandler) for h in lib_logger.handlers):
43
- lib_logger.handlers = [h for h in lib_logger.handlers if not isinstance(h, logging.NullHandler)]
 
 
 
 
 
44
  if not api_keys:
45
  raise ValueError("API keys dictionary cannot be empty.")
46
  self.api_keys = api_keys
@@ -103,7 +107,7 @@ class RotatingClient:
103
  try:
104
  while True:
105
  if request and await request.is_disconnected():
106
- lib_logger.warning(f"Client disconnected. Aborting stream for key ...{key[-4:]}.")
107
  # Do not yield [DONE] because the client is gone.
108
  # The 'finally' block will handle key release.
109
  break
@@ -111,7 +115,7 @@ class RotatingClient:
111
  try:
112
  chunk = await stream_iterator.__anext__()
113
  if json_buffer:
114
- lib_logger.warning(f"Discarding incomplete JSON buffer: {json_buffer}")
115
  json_buffer = ""
116
 
117
  yield f"data: {json.dumps(chunk.dict())}\n\n"
@@ -123,7 +127,7 @@ class RotatingClient:
123
  except StopAsyncIteration:
124
  stream_completed = True
125
  if json_buffer:
126
- lib_logger.warning(f"Stream ended with incomplete data in buffer: {json_buffer}")
127
  break
128
 
129
  except Exception as e:
@@ -132,7 +136,7 @@ class RotatingClient:
132
  json_buffer += raw_chunk
133
  parsed_data = json.loads(json_buffer)
134
 
135
- lib_logger.info(f"Successfully reassembled JSON from buffer: {json_buffer}")
136
 
137
  if "error" in parsed_data:
138
  lib_logger.warning(f"Reassembled object is an API error. Passing it to the client and raising internally.")
@@ -144,7 +148,7 @@ class RotatingClient:
144
 
145
  json_buffer = ""
146
  except json.JSONDecodeError:
147
- lib_logger.info(f"Buffer still incomplete. Waiting for more chunks: {json_buffer}")
148
  continue
149
  except StreamedAPIError:
150
  # Re-raise to be caught by the outer handler
@@ -240,17 +244,15 @@ class RotatingClient:
240
  log_failure(api_key=current_key, model=model, attempt=attempt + 1, error=e, request_data=kwargs)
241
  classified_error = classify_error(e)
242
  error_message = str(e).split('\n')[0]
243
- print(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Rotating key.")
244
 
245
  if classified_error.status_code == 429:
246
  cooldown_duration = classified_error.retry_after or 60
247
  await self.cooldown_manager.start_cooldown(provider, cooldown_duration)
248
- print(f"IP-based rate limit detected for {provider}. Starting a {cooldown_duration}-second global cooldown.")
249
- lib_logger.error(f"IP-based rate limit detected for {provider}. Starting a {cooldown_duration}-second global cooldown.")
250
 
251
  await self.usage_manager.record_failure(current_key, model, classified_error)
252
- print(f"Key ...{current_key[-4:]} encountered a rate limit. Trying next key.")
253
- lib_logger.warning(f"Key ...{current_key[-4:]} encountered a rate limit. Trying next key.")
254
  break # Move to the next key
255
 
256
  except (APIConnectionError, litellm.InternalServerError, litellm.ServiceUnavailableError) as e:
@@ -261,14 +263,12 @@ class RotatingClient:
261
 
262
  if attempt >= self.max_retries - 1:
263
  error_message = str(e).split('\n')[0]
264
- print(f"Key ...{current_key[-4:]} failed after {self.max_retries} retries with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Rotating key.")
265
- lib_logger.warning(f"Key ...{current_key[-4:]} failed after {self.max_retries} retries for a server-side error. Trying next key.")
266
  break # Move to the next key
267
 
268
  wait_time = classified_error.retry_after or (1 * (2 ** attempt)) + random.uniform(0, 1)
269
  error_message = str(e).split('\n')[0]
270
- print(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Retrying in {wait_time:.2f} seconds.")
271
- lib_logger.info(f"Server-side error with key ...{current_key[-4:]}. Retrying in {wait_time:.2f} seconds.")
272
  await asyncio.sleep(wait_time)
273
  continue # Retry with the same key
274
 
@@ -282,12 +282,11 @@ class RotatingClient:
282
 
283
  classified_error = classify_error(e)
284
  error_message = str(e).split('\n')[0]
285
- print(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Rotating key.")
286
  if classified_error.status_code == 429:
287
  cooldown_duration = classified_error.retry_after or 60
288
  await self.cooldown_manager.start_cooldown(provider, cooldown_duration)
289
- print(f"IP-based rate limit detected for {provider} from generic exception. Starting a {cooldown_duration}-second global cooldown.")
290
- lib_logger.error(f"IP-based rate limit detected for {provider} from generic exception. Starting a {cooldown_duration}-second global cooldown.")
291
 
292
  if classified_error.error_type in ['invalid_request', 'context_window_exceeded', 'authentication']:
293
  # For these errors, we should not retry with other keys.
@@ -366,17 +365,15 @@ class RotatingClient:
366
  log_failure(api_key=current_key, model=model, attempt=attempt + 1, error=e, request_data=kwargs)
367
  classified_error = classify_error(e)
368
  error_message = str(e).split('\n')[0]
369
- print(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Rotating key.")
370
 
371
  if classified_error.error_type == 'rate_limit' and classified_error.status_code == 429:
372
  cooldown_duration = classified_error.retry_after or 60
373
  await self.cooldown_manager.start_cooldown(provider, cooldown_duration)
374
- print(f"IP-based rate limit detected for {provider}. Starting a {cooldown_duration}-second global cooldown.")
375
- lib_logger.error(f"IP-based rate limit detected for {provider}. Starting a {cooldown_duration}-second global cooldown.")
376
 
377
  await self.usage_manager.record_failure(current_key, model, classified_error)
378
- print(f"Key ...{current_key[-4:]} failed during stream initiation. Trying next key.")
379
- lib_logger.warning(f"Key ...{current_key[-4:]} failed during stream initiation. Trying next key.")
380
  break # Break inner loop to try next key
381
 
382
  except (APIConnectionError, litellm.InternalServerError, litellm.ServiceUnavailableError) as e:
@@ -387,14 +384,12 @@ class RotatingClient:
387
 
388
  if attempt >= self.max_retries - 1:
389
  error_message = str(e).split('\n')[0]
390
- print(f"Key ...{current_key[-4:]} failed after {self.max_retries} retries with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Rotating key.")
391
- lib_logger.warning(f"Key ...{current_key[-4:]} failed after {self.max_retries} retries for a server-side error. Trying next key.")
392
  break # Move to the next key
393
 
394
  wait_time = classified_error.retry_after or (1 * (2 ** attempt)) + random.uniform(0, 1)
395
  error_message = str(e).split('\n')[0]
396
- print(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Retrying in {wait_time:.2f} seconds.")
397
- lib_logger.info(f"Server-side error with key ...{current_key[-4:]}. Retrying in {wait_time:.2f} seconds.")
398
  await asyncio.sleep(wait_time)
399
  continue # Retry with the same key
400
 
@@ -403,13 +398,12 @@ class RotatingClient:
403
  log_failure(api_key=current_key, model=model, attempt=attempt + 1, error=e, request_data=kwargs)
404
  classified_error = classify_error(e)
405
  error_message = str(e).split('\n')[0]
406
- print(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Rotating key.")
407
 
408
  if classified_error.status_code == 429:
409
  cooldown_duration = classified_error.retry_after or 60
410
  await self.cooldown_manager.start_cooldown(provider, cooldown_duration)
411
- print(f"IP-based rate limit detected for {provider} from generic stream exception. Starting a {cooldown_duration}-second global cooldown.")
412
- lib_logger.error(f"IP-based rate limit detected for {provider} from generic stream exception. Starting a {cooldown_duration}-second global cooldown.")
413
 
414
  if classified_error.error_type in ['invalid_request', 'context_window_exceeded', 'authentication']:
415
  raise last_exception # Do not retry for these errors
@@ -463,9 +457,9 @@ class RotatingClient:
463
 
464
  async def get_available_models(self, provider: str) -> List[str]:
465
  """Returns a list of available models for a specific provider, with caching."""
466
- lib_logger.info(f"Getting available models for provider: {provider}")
467
  if provider in self._model_list_cache:
468
- lib_logger.info(f"Returning cached models for provider: {provider}")
469
  return self._model_list_cache[provider]
470
 
471
  keys_for_provider = self.api_keys.get(provider)
@@ -481,9 +475,9 @@ class RotatingClient:
481
  if provider_instance:
482
  for api_key in shuffled_keys:
483
  try:
484
- lib_logger.info(f"Attempting to get models for {provider} with key ...{api_key[-4:]}")
485
  models = await provider_instance.get_models(api_key, self.http_client)
486
- lib_logger.info(f"Got {len(models)} models for provider: {provider}")
487
  self._model_list_cache[provider] = models
488
  return models
489
  except Exception as e:
 
10
  from typing import List, Dict, Any, AsyncGenerator, Optional, Union
11
 
12
  lib_logger = logging.getLogger('rotator_library')
13
+ # Propagation is off by default so merely importing the library emits
14
+ # nothing; RotatingClient.__init__ re-enables it (configure_logging=True)
15
+ # so the host app's root logger handles these records centrally.
16
  lib_logger.propagate = False
17
 
 
 
 
18
  from .usage_manager import UsageManager
19
  from .failure_logger import log_failure
20
  from .error_handler import classify_error, AllProviders
 
34
  with support for both streaming and non-streaming responses.
35
  """
36
  def __init__(self, api_keys: Dict[str, List[str]], max_retries: int = 2, usage_file_path: str = "key_usage.json", configure_logging: bool = True):
 
 
 
37
  if configure_logging:
38
+ # When True, this allows logs from this library to be handled
39
+ # by the parent application's logging configuration.
40
  lib_logger.propagate = True
41
+ # Remove any default handlers to prevent duplicate logging
42
+ if lib_logger.hasHandlers():
43
+ lib_logger.handlers.clear()
44
+ lib_logger.addHandler(logging.NullHandler())
45
+ else:
46
+ lib_logger.propagate = False
47
+
48
  if not api_keys:
49
  raise ValueError("API keys dictionary cannot be empty.")
50
  self.api_keys = api_keys
 
107
  try:
108
  while True:
109
  if request and await request.is_disconnected():
110
+ lib_logger.info(f"Client disconnected. Aborting stream for key ...{key[-4:]}.")
111
  # Do not yield [DONE] because the client is gone.
112
  # The 'finally' block will handle key release.
113
  break
 
115
  try:
116
  chunk = await stream_iterator.__anext__()
117
  if json_buffer:
118
+ lib_logger.debug(f"Discarding incomplete JSON buffer: {json_buffer}")
119
  json_buffer = ""
120
 
121
  yield f"data: {json.dumps(chunk.dict())}\n\n"
 
127
  except StopAsyncIteration:
128
  stream_completed = True
129
  if json_buffer:
130
+ lib_logger.debug(f"Stream ended with incomplete data in buffer: {json_buffer}")
131
  break
132
 
133
  except Exception as e:
 
136
  json_buffer += raw_chunk
137
  parsed_data = json.loads(json_buffer)
138
 
139
+ lib_logger.debug(f"Successfully reassembled JSON from buffer: {json_buffer}")
140
 
141
  if "error" in parsed_data:
142
  lib_logger.warning(f"Reassembled object is an API error. Passing it to the client and raising internally.")
 
148
 
149
  json_buffer = ""
150
  except json.JSONDecodeError:
151
+ lib_logger.debug(f"Buffer still incomplete. Waiting for more chunks: {json_buffer}")
152
  continue
153
  except StreamedAPIError:
154
  # Re-raise to be caught by the outer handler
 
244
  log_failure(api_key=current_key, model=model, attempt=attempt + 1, error=e, request_data=kwargs)
245
  classified_error = classify_error(e)
246
  error_message = str(e).split('\n')[0]
247
+ lib_logger.warning(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Rotating key.")
248
 
249
  if classified_error.status_code == 429:
250
  cooldown_duration = classified_error.retry_after or 60
251
  await self.cooldown_manager.start_cooldown(provider, cooldown_duration)
252
+ lib_logger.warning(f"IP-based rate limit detected for {provider}. Starting a {cooldown_duration}-second global cooldown.")
 
253
 
254
  await self.usage_manager.record_failure(current_key, model, classified_error)
255
+ lib_logger.info(f"Key ...{current_key[-4:]} encountered a rate limit. Trying next key.")
 
256
  break # Move to the next key
257
 
258
  except (APIConnectionError, litellm.InternalServerError, litellm.ServiceUnavailableError) as e:
 
263
 
264
  if attempt >= self.max_retries - 1:
265
  error_message = str(e).split('\n')[0]
266
+ lib_logger.warning(f"Key ...{current_key[-4:]} failed after {self.max_retries} retries with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Rotating key.")
 
267
  break # Move to the next key
268
 
269
  wait_time = classified_error.retry_after or (1 * (2 ** attempt)) + random.uniform(0, 1)
270
  error_message = str(e).split('\n')[0]
271
+ lib_logger.warning(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Retrying in {wait_time:.2f} seconds.")
 
272
  await asyncio.sleep(wait_time)
273
  continue # Retry with the same key
274
 
 
282
 
283
  classified_error = classify_error(e)
284
  error_message = str(e).split('\n')[0]
285
+ lib_logger.warning(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Rotating key.")
286
  if classified_error.status_code == 429:
287
  cooldown_duration = classified_error.retry_after or 60
288
  await self.cooldown_manager.start_cooldown(provider, cooldown_duration)
289
+ lib_logger.warning(f"IP-based rate limit detected for {provider} from generic exception. Starting a {cooldown_duration}-second global cooldown.")
 
290
 
291
  if classified_error.error_type in ['invalid_request', 'context_window_exceeded', 'authentication']:
292
  # For these errors, we should not retry with other keys.
 
365
  log_failure(api_key=current_key, model=model, attempt=attempt + 1, error=e, request_data=kwargs)
366
  classified_error = classify_error(e)
367
  error_message = str(e).split('\n')[0]
368
+ lib_logger.warning(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Rotating key.")
369
 
370
  if classified_error.error_type == 'rate_limit' and classified_error.status_code == 429:
371
  cooldown_duration = classified_error.retry_after or 60
372
  await self.cooldown_manager.start_cooldown(provider, cooldown_duration)
373
+ lib_logger.warning(f"IP-based rate limit detected for {provider}. Starting a {cooldown_duration}-second global cooldown.")
 
374
 
375
  await self.usage_manager.record_failure(current_key, model, classified_error)
376
+ lib_logger.info(f"Key ...{current_key[-4:]} failed during stream initiation. Trying next key.")
 
377
  break # Break inner loop to try next key
378
 
379
  except (APIConnectionError, litellm.InternalServerError, litellm.ServiceUnavailableError) as e:
 
384
 
385
  if attempt >= self.max_retries - 1:
386
  error_message = str(e).split('\n')[0]
387
+ lib_logger.warning(f"Key ...{current_key[-4:]} failed after {self.max_retries} retries with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Rotating key.")
 
388
  break # Move to the next key
389
 
390
  wait_time = classified_error.retry_after or (1 * (2 ** attempt)) + random.uniform(0, 1)
391
  error_message = str(e).split('\n')[0]
392
+ lib_logger.warning(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Retrying in {wait_time:.2f} seconds.")
 
393
  await asyncio.sleep(wait_time)
394
  continue # Retry with the same key
395
 
 
398
  log_failure(api_key=current_key, model=model, attempt=attempt + 1, error=e, request_data=kwargs)
399
  classified_error = classify_error(e)
400
  error_message = str(e).split('\n')[0]
401
+ lib_logger.warning(f"Key ...{current_key[-4:]} failed with {classified_error.error_type} (Status: {classified_error.status_code}). Error: {error_message}. Rotating key.")
402
 
403
  if classified_error.status_code == 429:
404
  cooldown_duration = classified_error.retry_after or 60
405
  await self.cooldown_manager.start_cooldown(provider, cooldown_duration)
406
+ lib_logger.warning(f"IP-based rate limit detected for {provider} from generic stream exception. Starting a {cooldown_duration}-second global cooldown.")
 
407
 
408
  if classified_error.error_type in ['invalid_request', 'context_window_exceeded', 'authentication']:
409
  raise last_exception # Do not retry for these errors
 
457
 
458
  async def get_available_models(self, provider: str) -> List[str]:
459
  """Returns a list of available models for a specific provider, with caching."""
460
+ lib_logger.debug(f"Getting available models for provider: {provider}")
461
  if provider in self._model_list_cache:
462
+ lib_logger.debug(f"Returning cached models for provider: {provider}")
463
  return self._model_list_cache[provider]
464
 
465
  keys_for_provider = self.api_keys.get(provider)
 
475
  if provider_instance:
476
  for api_key in shuffled_keys:
477
  try:
478
+ lib_logger.debug(f"Attempting to get models for {provider} with key ...{api_key[-4:]}")
479
  models = await provider_instance.get_models(api_key, self.http_client)
480
+ lib_logger.debug(f"Got {len(models)} models for provider: {provider}")
481
  self._model_list_cache[provider] = models
482
  return models
483
  except Exception as e: