"""API Key 管理器 - 多用户密钥管理""" import orjson import time import secrets import asyncio from typing import List, Dict, Optional from pathlib import Path from app.core.logger import logger from app.core.config import setting class ApiKeyManager: """API Key 管理服务""" _instance = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self): if hasattr(self, '_initialized'): return self.file_path = Path(__file__).parents[2] / "data" / "api_keys.json" self._keys: List[Dict] = [] self._lock = asyncio.Lock() self._loaded = False self._initialized = True logger.debug(f"[ApiKey] 初始化完成: {self.file_path}") async def init(self): """初始化加载数据""" if not self._loaded: await self._load_data() async def _load_data(self): """加载 API Keys""" if self._loaded: return if not self.file_path.exists(): self._keys = [] self._loaded = True return try: async with self._lock: if self.file_path.exists(): content = await asyncio.to_thread(self.file_path.read_bytes) if content: self._keys = orjson.loads(content) self._loaded = True logger.debug(f"[ApiKey] 加载了 {len(self._keys)} 个 API Key") except Exception as e: logger.error(f"[ApiKey] 加载失败: {e}") self._keys = [] self._loaded = True # 即使加载失败也认为已尝试加载,防止后续保存清空数据(或者抛出异常) async def _save_data(self): """保存 API Keys""" if not self._loaded: logger.warning("[ApiKey] 尝试在数据未加载时保存,已取消操作以防覆盖数据") return try: # 确保目录存在 self.file_path.parent.mkdir(parents=True, exist_ok=True) async with self._lock: content = orjson.dumps(self._keys, option=orjson.OPT_INDENT_2) await asyncio.to_thread(self.file_path.write_bytes, content) except Exception as e: logger.error(f"[ApiKey] 保存失败: {e}") def generate_key(self) -> str: """生成一个新的 sk- 开头的 key""" return f"sk-{secrets.token_urlsafe(24)}" async def add_key(self, name: str) -> Dict: """添加 API Key""" new_key = { "key": self.generate_key(), "name": name, "created_at": int(time.time()), "is_active": True } self._keys.append(new_key) await self._save_data() logger.info(f"[ApiKey] 添加新Key: {name}") return new_key async def batch_add_keys(self, name_prefix: str, count: int) -> List[Dict]: """批量添加 API Key""" new_keys = [] for i in range(1, count + 1): name = f"{name_prefix}-{i}" if count > 1 else name_prefix new_keys.append({ "key": self.generate_key(), "name": name, "created_at": int(time.time()), "is_active": True }) self._keys.extend(new_keys) await self._save_data() logger.info(f"[ApiKey] 批量添加 {count} 个 Key, 前缀: {name_prefix}") return new_keys async def delete_key(self, key: str) -> bool: """删除 API Key""" initial_len = len(self._keys) self._keys = [k for k in self._keys if k["key"] != key] if len(self._keys) != initial_len: await self._save_data() logger.info(f"[ApiKey] 删除Key: {key[:10]}...") return True return False async def batch_delete_keys(self, keys: List[str]) -> int: """批量删除 API Key""" initial_len = len(self._keys) self._keys = [k for k in self._keys if k["key"] not in keys] deleted_count = initial_len - len(self._keys) if deleted_count > 0: await self._save_data() logger.info(f"[ApiKey] 批量删除 {deleted_count} 个 Key") return deleted_count async def update_key_status(self, key: str, is_active: bool) -> bool: """更新 Key 状态""" for k in self._keys: if k["key"] == key: k["is_active"] = is_active await self._save_data() return True return False async def batch_update_keys_status(self, keys: List[str], is_active: bool) -> int: """批量更新 Key 状态""" updated_count = 0 for k in self._keys: if k["key"] in keys: if k["is_active"] != is_active: k["is_active"] = is_active updated_count += 1 if updated_count > 0: await self._save_data() logger.info(f"[ApiKey] 批量更新 {updated_count} 个 Key 状态为: {is_active}") return updated_count async def update_key_name(self, key: str, name: str) -> bool: """更新 Key 备注""" for k in self._keys: if k["key"] == key: k["name"] = name await self._save_data() return True return False def validate_key(self, key: str) -> Optional[Dict]: """验证 Key,返回 Key 信息""" # 1. 检查全局配置的 Key (作为默认 admin key) global_key = setting.grok_config.get("api_key") if global_key and key == global_key: return { "key": global_key, "name": "默认管理员", "is_active": True, "is_admin": True } # 2. 检查多 Key 列表 for k in self._keys: if k["key"] == key: if k["is_active"]: return {**k, "is_admin": False} # 普通 Key 也可以视为非管理员? 暂不区分权限,只做身份识别 return None return None def get_all_keys(self) -> List[Dict]: """获取所有 Keys""" return self._keys # 全局实例 api_key_manager = ApiKeyManager()