# File size: 3,411 Bytes
# 60016b2
import random
import re
import os
import logging
from datetime import datetime, timedelta
from apscheduler.schedulers.background import BackgroundScheduler
from app.utils.logging import format_log_message
# Module-level logger; it is looked up under the fixed name "my_logger"
# rather than this module's __name__.
logger = logging.getLogger("my_logger")
class APIKeyManager:
    """Hand out API keys in random order, at most once each per request.

    Keys are parsed from the ``GEMINI_API_KEYS`` environment variable and
    kept in a shuffled stack. ``get_available_key`` pops keys off that stack
    and skips any key already recorded in ``tried_keys_for_request``; call
    ``reset_tried_keys_for_request`` to clear that record.

    Attributes:
        api_keys: All keys found in the environment variable.
        key_stack: Shuffled keys still waiting to be handed out; the top of
            the stack is the end of the list.
        scheduler: A started ``BackgroundScheduler``.
        tried_keys_for_request: Keys already handed out since the last reset.
    """

    def __init__(self):
        """Load the keys from the environment and build the first stack."""
        # Every substring matching the key pattern is collected, so the
        # variable may separate the keys with any delimiter.
        self.api_keys = re.findall(
            r"AIzaSy[a-zA-Z0-9_-]{33}", os.environ.get('GEMINI_API_KEYS', ""))
        self.key_stack = []  # initialise the key stack
        self._reset_key_stack()  # build a randomised stack right away
        # self.api_key_blacklist = set()
        # self.api_key_blacklist_duration = 60
        # NOTE: within this class only the disabled blacklist_key code below
        # refers to the scheduler; it is still created and started here.
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()
        # Keys already tried during the current request attempt.
        self.tried_keys_for_request = set()

    def _reset_key_stack(self):
        """Replace the key stack with a freshly shuffled copy of all keys."""
        # Shuffle a copy so the order of self.api_keys itself is untouched.
        shuffled_keys = self.api_keys[:]
        random.shuffle(shuffled_keys)
        self.key_stack = shuffled_keys

    def _pop_untried_key(self):
        """Pop keys until one that has not been tried yet is found.

        Keys that were already tried are dropped from the stack. A key that
        is returned is also recorded in ``tried_keys_for_request``.

        Returns:
            The next untried key, or None once the stack is exhausted.
        """
        while self.key_stack:
            key = self.key_stack.pop()
            # if key not in self.api_key_blacklist and key not in self.tried_keys_for_request:
            if key not in self.tried_keys_for_request:
                self.tried_keys_for_request.add(key)
                return key
        return None

    def get_available_key(self):
        """Return the next key that has not been tried for this request.

        The current stack is drained first. If it runs out, the stack is
        rebuilt from all configured keys and drained once more.

        Returns:
            An API key string, or None when no keys are configured (an error
            is logged in that case) or when every configured key has already
            been tried since the last reset.
        """
        key = self._pop_untried_key()
        if key is not None:
            return key

        if not self.api_keys:
            log_msg = format_log_message('ERROR', "没有配置任何 API 密钥!")
            logger.error(log_msg)
            return None

        # The stack ran dry: rebuild it and make exactly one more pass.
        self._reset_key_stack()
        return self._pop_untried_key()

    def show_all_keys(self):
        """Log the number of configured keys and a masked form of each one."""
        log_msg = format_log_message('INFO', f"当前可用API key个数: {len(self.api_keys)} ")
        logger.info(log_msg)
        for i, api_key in enumerate(self.api_keys):
            # Only the first 8 and last 3 characters of each key are logged.
            log_msg = format_log_message('INFO', f"API Key{i}: {api_key[:8]}...{api_key[-3:]}")
            logger.info(log_msg)

    # NOTE: temporary key blacklisting is currently disabled; the original
    # implementation is kept below for reference.
    # def blacklist_key(self, key):
    #     log_msg = format_log_message('WARNING', f"{key[:8]} → 暂时禁用 {self.api_key_blacklist_duration} 秒")
    #     logger.warning(log_msg)
    #     self.api_key_blacklist.add(key)
    #     self.scheduler.add_job(lambda: self.api_key_blacklist.discard(key), 'date',
    #                            run_date=datetime.now() + timedelta(seconds=self.api_key_blacklist_duration))

    def reset_tried_keys_for_request(self):
        """Forget which keys were tried; call this when a new request starts."""
        self.tried_keys_for_request = set()
async def test_api_key(api_key: str) -> bool:
    """
    Check whether an API key is accepted by the models endpoint.

    Sends one GET request to the ``v1beta/models`` URL of
    ``generativelanguage.googleapis.com`` with the key supplied as the
    ``key`` query parameter.

    Args:
        api_key: The key to probe.

    Returns:
        True when the request completes and ``raise_for_status()`` does not
        raise; False on any failure, including an HTTP error status, a
        transport error, or httpx not being importable.
    """
    try:
        # Imported lazily inside the try so that a missing httpx makes the
        # probe report False instead of raising.
        import httpx
        url = "https://generativelanguage.googleapis.com/v1beta/models"
        async with httpx.AsyncClient() as client:
            # Passing the key via params lets httpx percent-encode it rather
            # than splicing the raw value into the URL string.
            response = await client.get(url, params={"key": api_key})
            response.raise_for_status()
            return True
    except Exception:
        # Deliberate best effort: any failure means the key is reported as
        # unusable.
        return False