import logging
import os
import random
import re
from datetime import datetime, timedelta
from typing import Optional

from apscheduler.schedulers.background import BackgroundScheduler

from app.utils.logging import format_log_message

logger = logging.getLogger("my_logger")


class APIKeyManager:
    """Hand out Gemini API keys in random order, one untried key at a time.

    Keys are extracted from the ``GEMINI_API_KEYS`` environment variable with
    a regular expression, shuffled into a stack, and popped on demand.  The
    set ``tried_keys_for_request`` records which keys were already handed out
    since the last call to :meth:`reset_tried_keys_for_request`, so the same
    key is never returned twice within one request attempt.
    """

    def __init__(self):
        """Load keys from the environment and build the initial key stack."""
        # Every substring of the variable that looks like a key is picked up,
        # so the separator used between keys does not matter.
        self.api_keys = re.findall(
            r"AIzaSy[a-zA-Z0-9_-]{33}", os.environ.get('GEMINI_API_KEYS', ""))
        self.key_stack = []  # Stack of keys still to be handed out.
        self._reset_key_stack()  # Fill the stack with a shuffled copy.
        # self.api_key_blacklist = set()
        # self.api_key_blacklist_duration = 60
        # The scheduler is started right away; in this file only the
        # (currently disabled) blacklist feature below refers to it.
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()
        # Keys already handed out during the current request attempt.
        self.tried_keys_for_request = set()

    def _reset_key_stack(self):
        """Rebuild ``key_stack`` as a freshly shuffled copy of ``api_keys``."""
        # Shuffle a copy so the order of ``api_keys`` itself is untouched.
        shuffled_keys = list(self.api_keys)
        random.shuffle(shuffled_keys)
        self.key_stack = shuffled_keys

    def _pop_untried_key(self) -> Optional[str]:
        """Pop keys off the stack until one not yet tried is found.

        The returned key is recorded in ``tried_keys_for_request``.  Keys that
        were already tried are discarded from the stack along the way.

        Returns:
            The next untried key, or ``None`` if the stack ran empty first.
        """
        while self.key_stack:
            key = self.key_stack.pop()
            # if key not in self.api_key_blacklist and key not in self.tried_keys_for_request:
            if key not in self.tried_keys_for_request:
                self.tried_keys_for_request.add(key)
                return key
        return None

    def get_available_key(self) -> Optional[str]:
        """Return the next key not yet tried for the current request.

        The current stack is consumed first.  If it runs out, the stack is
        regenerated once from ``api_keys`` and searched again.

        Returns:
            An API key, or ``None`` when no keys are configured (an error is
            logged in that case) or every configured key has already been
            tried for the current request.
        """
        key = self._pop_untried_key()
        if key is not None:
            return key

        if not self.api_keys:
            log_msg = format_log_message('ERROR', "没有配置任何 API 密钥!")
            logger.error(log_msg)
            return None

        # The stack is exhausted: reshuffle and make exactly one more pass.
        self._reset_key_stack()
        return self._pop_untried_key()

    def show_all_keys(self):
        """Log how many keys are configured and a masked form of each one.

        Only the first 8 and last 3 characters of every key are logged.
        """
        log_msg = format_log_message('INFO', f"当前可用API key个数: {len(self.api_keys)} ")
        logger.info(log_msg)
        for i, api_key in enumerate(self.api_keys):
            log_msg = format_log_message('INFO', f"API Key{i}: {api_key[:8]}...{api_key[-3:]}")
            logger.info(log_msg)

    # Temporary blacklisting is currently disabled; kept for reference.
    # def blacklist_key(self, key):
    #     log_msg = format_log_message('WARNING', f"{key[:8]} → 暂时禁用 {self.api_key_blacklist_duration} 秒")
    #     logger.warning(log_msg)
    #     self.api_key_blacklist.add(key)
    #     self.scheduler.add_job(lambda: self.api_key_blacklist.discard(key), 'date',
    #                            run_date=datetime.now() + timedelta(seconds=self.api_key_blacklist_duration))

    def reset_tried_keys_for_request(self):
        """Forget which keys were tried; call at the start of a new request attempt."""
        self.tried_keys_for_request = set()


async def test_api_key(api_key: str) -> bool:
    """Check whether an API key is accepted by the Gemini models endpoint.

    Sends a GET request to the ``v1beta/models`` listing with the key as a
    query parameter.

    Args:
        api_key: The key to test.

    Returns:
        ``True`` if the request completes with a non-error status, ``False``
        on any failure (HTTP error status, network error, or ``httpx`` not
        being importable).
    """
    try:
        # Imported lazily inside the ``try`` so that a missing ``httpx``
        # also results in ``False`` rather than an import-time failure.
        import httpx
        url = "https://generativelanguage.googleapis.com/v1beta/models?key={}".format(api_key)
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            response.raise_for_status()
            return True
    except Exception:
        # Deliberate best-effort: any failure means the key is unusable.
        return False