Mirrowel committed on
Commit
aea7b14
·
1 Parent(s): bf565ba

feat: Implement robust API key rotation, retry, and logging

Browse files

This commit introduces significant enhancements to the API key management system, focusing on improved reliability, intelligent key rotation, and comprehensive failure logging.

Key changes include:

- **Enhanced Key Rotation Logic:**
- `src/rotator_library/client.py`: Refactored `acompletion` to use a more robust key selection mechanism, including a 5-second wait when all keys are on cooldown, and removed the `excluded_keys` parameter for `get_next_smart_key`.
- `src/rotator_library/usage_manager.py`: Implemented daily usage resets and global statistics accumulation. The `get_next_smart_key` now intelligently filters out keys on cooldown and selects the least-used active key.

- **Comprehensive Failure Logging:**
- **NEW `src/rotator_library/failure_logger.py`:** Introduced a dedicated module for structured logging of failed API calls, capturing details like API key ending, model, attempt number, error type, message, raw response, and request data.
- `src/rotator_library/client.py`: Integrated `log_failure` to centralize error reporting for all API call exceptions.

- **Improved Retry Mechanism:**
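- `src/rotator_library/client.py`: Simplified the retry loop so that a retriable server error is retried with the same key (up to `max_retries` attempts), unrecoverable errors are re-raised immediately, and all other failures (authentication, rate limit, or a server error on the final attempt) put the key on cooldown and trigger rotation to the next key.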

- **Refined Usage Tracking:**
- `src/rotator_library/usage_manager.py`: Updated `record_success` to correctly apply usage to daily and global statistics. `record_rotation_error` now sets a cooldown period for failed keys, optionally parsing `retry_delay` from error messages.

- **Minor Improvements:**
- `src/proxy_app/main.py`: Streamlined Gemini API key loading to correctly handle both `GEMINI_API_KEY` and `GEMINI_API_KEY_n` formats. Removed redundant comments.

src/proxy_app/main.py CHANGED
@@ -7,6 +7,9 @@ import logging
7
  from pathlib import Path
8
  import sys
9
 
 
 
 
10
  from src.rotator_library.client import RotatingClient
11
 
12
  # Configure logging
@@ -24,22 +27,20 @@ if not PROXY_API_KEY:
24
  gemini_keys = []
25
  i = 1
26
  while True:
 
27
  key = os.getenv(f"GEMINI_API_KEY_{i}")
 
 
 
 
28
  if key:
29
  gemini_keys.append(key)
30
  i += 1
31
  else:
32
- # Also check for the key without a number for the first one
33
- if i == 1:
34
- key = os.getenv("GEMINI_API_KEY")
35
- if key:
36
- gemini_keys.append(key)
37
- i += 1
38
- continue
39
  break
40
 
41
  if not gemini_keys:
42
- raise ValueError("No GEMINI_API_KEY environment variables found.")
43
 
44
  # Initialize the rotating client
45
  rotating_client = RotatingClient(api_keys=gemini_keys)
@@ -67,11 +68,8 @@ async def chat_completions(request: Request, _=Depends(verify_api_key)):
67
  response = await rotating_client.acompletion(**data)
68
 
69
  if is_streaming:
70
- # For streaming responses, we return a StreamingResponse.
71
- # The client's wrapper ensures usage is logged upon completion.
72
  return StreamingResponse(response, media_type="text/event-stream")
73
  else:
74
- # For non-streaming, the response is a regular JSON object.
75
  return response
76
 
77
  except Exception as e:
 
7
  from pathlib import Path
8
  import sys
9
 
10
+ # This is necessary for the app to find the rotator_library module
11
+ sys.path.append(str(Path(__file__).resolve().parent.parent.parent))
12
+
13
  from src.rotator_library.client import RotatingClient
14
 
15
  # Configure logging
 
27
  gemini_keys = []
28
  i = 1
29
  while True:
30
+ # Start with GEMINI_API_KEY_1, then GEMINI_API_KEY_2, etc.
31
  key = os.getenv(f"GEMINI_API_KEY_{i}")
32
+ if not key and i == 1:
33
+ # Fallback for a single key named just GEMINI_API_KEY
34
+ key = os.getenv("GEMINI_API_KEY")
35
+
36
  if key:
37
  gemini_keys.append(key)
38
  i += 1
39
  else:
 
 
 
 
 
 
 
40
  break
41
 
42
  if not gemini_keys:
43
+ raise ValueError("No GEMINI_API_KEY or GEMINI_API_KEY_n environment variables found.")
44
 
45
  # Initialize the rotating client
46
  rotating_client = RotatingClient(api_keys=gemini_keys)
 
68
  response = await rotating_client.acompletion(**data)
69
 
70
  if is_streaming:
 
 
71
  return StreamingResponse(response, media_type="text/event-stream")
72
  else:
 
73
  return response
74
 
75
  except Exception as e:
src/rotator_library/client.py CHANGED
@@ -5,6 +5,7 @@ import logging
5
  from typing import List, Dict, Any, AsyncGenerator
6
 
7
  from src.rotator_library.usage_manager import UsageManager
 
8
  from src.rotator_library.error_handler import (
9
  is_authentication_error,
10
  is_rate_limit_error,
@@ -54,22 +55,22 @@ class RotatingClient:
54
  Performs a completion call with smart key rotation and retry logic.
55
  Handles both streaming and non-streaming requests.
56
  """
57
- failed_keys_for_this_request = []
58
  model = kwargs.get("model")
59
  is_streaming = kwargs.get("stream", False)
60
 
61
  if not model:
62
  raise ValueError("'model' is a required parameter.")
63
 
64
- while len(failed_keys_for_this_request) < len(self.api_keys):
65
  current_key = self.usage_manager.get_next_smart_key(
66
  available_keys=self.api_keys,
67
- model=model,
68
- excluded_keys=failed_keys_for_this_request
69
  )
70
 
71
  if not current_key:
72
- raise Exception("All available API keys have failed for this request.")
 
 
73
 
74
  for attempt in range(self.max_retries):
75
  try:
@@ -85,27 +86,25 @@ class RotatingClient:
85
  return response
86
 
87
  except Exception as e:
88
- error_message = str(e)
89
- print(f"Key ...{current_key[-4:]} failed with error: {error_message}")
90
-
91
- if is_authentication_error(e) or is_rate_limit_error(e):
92
- self.usage_manager.record_rotation_error(current_key, model, error_message)
93
- failed_keys_for_this_request.append(current_key)
94
- break
95
-
96
- elif is_server_error(e):
97
- if attempt == self.max_retries - 1:
98
- self.usage_manager.record_rotation_error(current_key, model, f"Failed after max retries with error: {error_message}")
99
- failed_keys_for_this_request.append(current_key)
100
- break
101
- else:
102
- await asyncio.sleep(1 * (attempt + 1))
103
- continue
104
-
105
- elif is_unrecoverable_error(e):
106
- raise e
107
-
108
- else:
109
  raise e
110
 
111
- raise Exception("All API keys failed after multiple retries.")
 
 
 
 
5
  from typing import List, Dict, Any, AsyncGenerator
6
 
7
  from src.rotator_library.usage_manager import UsageManager
8
+ from src.rotator_library.failure_logger import log_failure
9
  from src.rotator_library.error_handler import (
10
  is_authentication_error,
11
  is_rate_limit_error,
 
55
  Performs a completion call with smart key rotation and retry logic.
56
  Handles both streaming and non-streaming requests.
57
  """
 
58
  model = kwargs.get("model")
59
  is_streaming = kwargs.get("stream", False)
60
 
61
  if not model:
62
  raise ValueError("'model' is a required parameter.")
63
 
64
+ while True: # Loop until a key succeeds or we decide to give up
65
  current_key = self.usage_manager.get_next_smart_key(
66
  available_keys=self.api_keys,
67
+ model=model
 
68
  )
69
 
70
  if not current_key:
71
+ print("All keys are currently on cooldown. Waiting...")
72
+ await asyncio.sleep(5) # Wait 5 seconds before checking for an available key again
73
+ continue
74
 
75
  for attempt in range(self.max_retries):
76
  try:
 
86
  return response
87
 
88
  except Exception as e:
89
+ log_failure(
90
+ api_key=current_key,
91
+ model=model,
92
+ attempt=attempt + 1,
93
+ error=e,
94
+ request_data=kwargs
95
+ )
96
+
97
+ # For any retriable server error, we just continue the attempt loop
98
+ if is_server_error(e) and attempt < self.max_retries - 1:
99
+ print(f"Key ...{current_key[-4:]} failed with server error. Retrying...")
100
+ await asyncio.sleep(1 * (attempt + 1))
101
+ continue
102
+
103
+ # For unrecoverable errors, fail fast
104
+ if is_unrecoverable_error(e):
 
 
 
 
 
105
  raise e
106
 
107
+ # For all other errors (Auth, RateLimit, or final Server error), record it and break to get a new key
108
+ print(f"Key ...{current_key[-4:]} failed permanently. Rotating...")
109
+ self.usage_manager.record_rotation_error(current_key, model, e)
110
+ break
src/rotator_library/failure_logger.py ADDED
@@ -0,0 +1,62 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import json
3
+ from logging.handlers import RotatingFileHandler
4
+ import os
5
+
6
def setup_failure_logger():
    """Set up (once) the dedicated JSON logger for failed API calls.

    The logger is named ``'failure_logger'``, records at ``ERROR`` level and
    above, does not propagate to the root logger, and writes one JSON object
    per line to ``logs/failures.log`` (relative to the current working
    directory) through a size-rotated file handler.

    The function is idempotent: if the logger already has a handler attached,
    it is returned as-is and no new file handle is opened.

    Returns:
        logging.Logger: the configured ``'failure_logger'`` logger.
    """
    logger = logging.getLogger('failure_logger')
    logger.setLevel(logging.ERROR)

    # Prevent logs from propagating to the root logger
    logger.propagate = False

    # Already configured: bail out BEFORE building a handler. Constructing a
    # RotatingFileHandler opens the log file immediately, so creating one and
    # then discarding it would leak an open file handle on every repeat call.
    if logger.handlers:
        return logger

    log_dir = "logs"
    # exist_ok avoids the check-then-create race when two processes start together
    os.makedirs(log_dir, exist_ok=True)

    class JsonFormatter(logging.Formatter):
        """Render each record as a single-line JSON object."""

        def format(self, record):
            log_record = {
                "timestamp": self.formatTime(record, self.datefmt),
                "level": record.levelname,
                "message": record.getMessage(),
            }
            return json.dumps(log_record)

    # Use a rotating file handler to keep log files from growing too large
    handler = RotatingFileHandler(
        os.path.join(log_dir, 'failures.log'),
        maxBytes=5 * 1024 * 1024,  # 5 MB per file
        backupCount=2,
    )
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)

    return logger
42
+
43
+ failure_logger = setup_failure_logger()
44
+
45
def log_failure(api_key: str, model: str, attempt: int, error: Exception, request_data: dict):
    """Log a structured record describing one failed API call.

    The record is serialized to JSON before it is handed to the module-level
    ``failure_logger``, so the ``message`` field of each log line can be parsed
    back with ``json.loads`` instead of containing a Python ``repr`` of a dict.

    Args:
        api_key: The key used for the failed call. Only its last four
            characters are logged; a missing/empty key is logged as ``None``.
        model: Name of the model the request targeted.
        attempt: 1-based attempt number for this key.
        error: The exception raised by the API call.
        request_data: The request arguments that were sent. Values that are
            not JSON-serializable are logged via ``str()``.
    """
    # Try to get the raw response from the exception if it exists.
    # Nested getattr also copes with `error.response` being None.
    raw_response = getattr(getattr(error, 'response', None), 'text', None)

    log_data = {
        # Guard against a None/empty key: this runs inside the caller's
        # exception handler, and raising here would mask the original API error.
        "api_key_ending": api_key[-4:] if api_key else None,
        "model": model,
        "attempt_number": attempt,
        "error_type": type(error).__name__,
        "error_message": str(error),
        "raw_response": raw_response,
        "request_data": request_data,
    }
    # default=str keeps logging best-effort when request_data holds objects
    # the json module cannot encode.
    failure_logger.error(json.dumps(log_data, default=str))
src/rotator_library/usage_manager.py CHANGED
@@ -1,20 +1,21 @@
1
  import json
2
  import os
3
  import time
4
- from typing import Dict, List, Optional
 
5
  from filelock import FileLock
6
 
7
  class UsageManager:
8
  """
9
- Manages detailed usage and failure data for API keys, stored in a JSON file.
10
  """
11
  def __init__(self, file_path: str = "key_usage.json"):
12
  self.file_path = file_path
13
  self.lock = FileLock(f"{self.file_path}.lock")
14
  self.usage_data = self._load_usage()
 
15
 
16
  def _load_usage(self) -> Dict:
17
- """Loads usage data from the JSON file."""
18
  with self.lock:
19
  if not os.path.exists(self.file_path):
20
  return {}
@@ -25,63 +26,99 @@ class UsageManager:
25
  return {}
26
 
27
  def _save_usage(self):
28
- """Saves the current usage data to the JSON file."""
29
  with self.lock:
30
  with open(self.file_path, 'w') as f:
31
  json.dump(self.usage_data, f, indent=2)
32
 
33
- def get_next_smart_key(self, available_keys: List[str], model: str, excluded_keys: List[str]) -> Optional[str]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
34
  """
35
- Finds the best key to use based on the lowest usage count for the given model.
36
  """
37
  best_key = None
38
  min_usage = float('inf')
 
 
 
 
 
 
 
39
 
40
- eligible_keys = [k for k in available_keys if k not in excluded_keys]
41
-
42
- if not eligible_keys:
43
  return None
44
 
45
- # Initialize all available keys in usage data if they aren't present
46
- for key in eligible_keys:
47
- self.usage_data.setdefault(key, {"models": {}, "last_rotation_error": None})
48
-
49
- # Find the key with the minimum success_count for the given model
50
- for key in eligible_keys:
51
- model_usage = self.usage_data[key].get("models", {}).get(model, {})
52
- usage_count = model_usage.get("success_count", 0)
53
 
54
  if usage_count < min_usage:
55
  min_usage = usage_count
56
  best_key = key
57
 
58
- # If all have the same usage count, it will pick the first one in the list
59
- return best_key if best_key else eligible_keys[0]
60
-
61
 
62
  def record_success(self, key: str, model: str, usage: Dict):
63
- """Records a successful API call and its token usage."""
64
- key_data = self.usage_data.setdefault(key, {"models": {}, "last_rotation_error": None})
65
- model_data = key_data["models"].setdefault(model, {
66
- "success_count": 0,
67
- "prompt_tokens": 0,
68
- "completion_tokens": 0
69
- })
70
 
71
- model_data["success_count"] += 1
72
- model_data["prompt_tokens"] += usage.get("prompt_tokens", 0)
73
- model_data["completion_tokens"] += usage.get("completion_tokens", 0)
 
 
 
 
 
 
 
74
 
75
  key_data["last_used_ts"] = time.time()
76
  self._save_usage()
77
 
78
- def record_rotation_error(self, key: str, model: str, error: str):
79
- """Records the error that caused a key to be rotated."""
80
- key_data = self.usage_data.setdefault(key, {"models": {}, "last_rotation_error": None})
81
-
 
 
 
 
 
 
 
 
 
 
 
 
 
82
  key_data["last_rotation_error"] = {
83
  "timestamp": time.time(),
84
  "model": model,
85
- "error": error
86
  }
87
  self._save_usage()
 
1
  import json
2
  import os
3
  import time
4
+ from datetime import date, datetime
5
+ from typing import Dict, List, Optional, Any
6
  from filelock import FileLock
7
 
8
  class UsageManager:
9
  """
10
+ Manages daily and global usage statistics and cooldowns for API keys.
11
  """
12
  def __init__(self, file_path: str = "key_usage.json"):
13
  self.file_path = file_path
14
  self.lock = FileLock(f"{self.file_path}.lock")
15
  self.usage_data = self._load_usage()
16
+ self._reset_daily_stats_if_needed()
17
 
18
  def _load_usage(self) -> Dict:
 
19
  with self.lock:
20
  if not os.path.exists(self.file_path):
21
  return {}
 
26
  return {}
27
 
28
  def _save_usage(self):
 
29
  with self.lock:
30
  with open(self.file_path, 'w') as f:
31
  json.dump(self.usage_data, f, indent=2)
32
 
33
+ def _reset_daily_stats_if_needed(self):
34
+ """Checks if daily stats need to be reset for any key."""
35
+ today_str = date.today().isoformat()
36
+ needs_saving = False
37
+ for key, data in self.usage_data.items():
38
+ daily_data = data.get("daily", {})
39
+ last_date_str = daily_data.get("date")
40
+ if last_date_str != today_str:
41
+ needs_saving = True
42
+ # Add yesterday's daily stats to global stats
43
+ global_data = data.setdefault("global", {"models": {}})
44
+ for model, stats in daily_data.get("models", {}).items():
45
+ global_model_stats = global_data["models"].setdefault(model, {"success_count": 0, "prompt_tokens": 0, "completion_tokens": 0})
46
+ global_model_stats["success_count"] += stats.get("success_count", 0)
47
+ global_model_stats["prompt_tokens"] += stats.get("prompt_tokens", 0)
48
+ global_model_stats["completion_tokens"] += stats.get("completion_tokens", 0)
49
+
50
+ # Reset daily stats
51
+ data["daily"] = {"date": today_str, "models": {}}
52
+
53
+ if needs_saving:
54
+ self._save_usage()
55
+
56
+ def get_next_smart_key(self, available_keys: List[str], model: str) -> Optional[str]:
57
  """
58
+ Gets the least-used, available key based on daily stats.
59
  """
60
  best_key = None
61
  min_usage = float('inf')
62
+
63
+ # Filter for keys that are not on cooldown
64
+ active_keys = []
65
+ for key in available_keys:
66
+ cooldown_until = self.usage_data.get(key, {}).get("cooldown_until")
67
+ if not cooldown_until or time.time() > cooldown_until:
68
+ active_keys.append(key)
69
 
70
+ if not active_keys:
 
 
71
  return None
72
 
73
+ # Find the key with the minimum daily success_count for the given model
74
+ for key in active_keys:
75
+ key_data = self.usage_data.setdefault(key, {"daily": {"date": date.today().isoformat(), "models": {}}, "global": {"models": {}}, "cooldown_until": None})
76
+ daily_model_usage = key_data.get("daily", {}).get("models", {}).get(model, {})
77
+ usage_count = daily_model_usage.get("success_count", 0)
 
 
 
78
 
79
  if usage_count < min_usage:
80
  min_usage = usage_count
81
  best_key = key
82
 
83
+ return best_key if best_key else active_keys[0]
 
 
84
 
85
  def record_success(self, key: str, model: str, usage: Dict):
86
+ key_data = self.usage_data.setdefault(key, {"daily": {"date": date.today().isoformat(), "models": {}}, "global": {"models": {}}, "cooldown_until": None})
 
 
 
 
 
 
87
 
88
+ # Ensure daily stats are for today
89
+ if key_data["daily"].get("date") != date.today().isoformat():
90
+ self._reset_daily_stats_if_needed() # Should be rare, but as a safeguard
91
+ key_data = self.usage_data[key]
92
+
93
+ daily_model_data = key_data["daily"]["models"].setdefault(model, {"success_count": 0, "prompt_tokens": 0, "completion_tokens": 0})
94
+
95
+ daily_model_data["success_count"] += 1
96
+ daily_model_data["prompt_tokens"] += usage.get("prompt_tokens", 0)
97
+ daily_model_data["completion_tokens"] += usage.get("completion_tokens", 0)
98
 
99
  key_data["last_used_ts"] = time.time()
100
  self._save_usage()
101
 
102
+ def record_rotation_error(self, key: str, model: str, error: Exception):
103
+ key_data = self.usage_data.setdefault(key, {"daily": {"date": date.today().isoformat(), "models": {}}, "global": {"models": {}}, "cooldown_until": None})
104
+
105
+ # Default cooldown of 24 hours
106
+ cooldown_seconds = 86400
107
+
108
+ # Try to parse retry_delay from the error message (very provider-specific)
109
+ error_str = str(error).lower()
110
+ if "retry_delay" in error_str:
111
+ try:
112
+ # A simple way to parse, might need to be more robust
113
+ delay_str = error_str.split("retry_delay")[1].split("seconds:")[1].strip().split("}")[0]
114
+ cooldown_seconds = int(delay_str)
115
+ except (IndexError, ValueError):
116
+ pass # Stick to default
117
+
118
+ key_data["cooldown_until"] = time.time() + cooldown_seconds
119
  key_data["last_rotation_error"] = {
120
  "timestamp": time.time(),
121
  "model": model,
122
+ "error": str(error)
123
  }
124
  self._save_usage()