Spaces:
Paused
Paused
File size: 6,552 Bytes
1a9e2c2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 | """API Key 管理器 - 多用户密钥管理"""
import orjson
import time
import secrets
import asyncio
from typing import List, Dict, Optional
from pathlib import Path
from app.core.logger import logger
from app.core.config import setting
class ApiKeyManager:
"""API Key 管理服务"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if hasattr(self, '_initialized'):
return
self.file_path = Path(__file__).parents[2] / "data" / "api_keys.json"
self._keys: List[Dict] = []
self._lock = asyncio.Lock()
self._loaded = False
self._initialized = True
logger.debug(f"[ApiKey] 初始化完成: {self.file_path}")
async def init(self):
"""初始化加载数据"""
if not self._loaded:
await self._load_data()
async def _load_data(self):
"""加载 API Keys"""
if self._loaded:
return
if not self.file_path.exists():
self._keys = []
self._loaded = True
return
try:
async with self._lock:
if self.file_path.exists():
content = await asyncio.to_thread(self.file_path.read_bytes)
if content:
self._keys = orjson.loads(content)
self._loaded = True
logger.debug(f"[ApiKey] 加载了 {len(self._keys)} 个 API Key")
except Exception as e:
logger.error(f"[ApiKey] 加载失败: {e}")
self._keys = []
self._loaded = True # 即使加载失败也认为已尝试加载,防止后续保存清空数据(或者抛出异常)
async def _save_data(self):
"""保存 API Keys"""
if not self._loaded:
logger.warning("[ApiKey] 尝试在数据未加载时保存,已取消操作以防覆盖数据")
return
try:
# 确保目录存在
self.file_path.parent.mkdir(parents=True, exist_ok=True)
async with self._lock:
content = orjson.dumps(self._keys, option=orjson.OPT_INDENT_2)
await asyncio.to_thread(self.file_path.write_bytes, content)
except Exception as e:
logger.error(f"[ApiKey] 保存失败: {e}")
def generate_key(self) -> str:
"""生成一个新的 sk- 开头的 key"""
return f"sk-{secrets.token_urlsafe(24)}"
async def add_key(self, name: str) -> Dict:
"""添加 API Key"""
new_key = {
"key": self.generate_key(),
"name": name,
"created_at": int(time.time()),
"is_active": True
}
self._keys.append(new_key)
await self._save_data()
logger.info(f"[ApiKey] 添加新Key: {name}")
return new_key
async def batch_add_keys(self, name_prefix: str, count: int) -> List[Dict]:
"""批量添加 API Key"""
new_keys = []
for i in range(1, count + 1):
name = f"{name_prefix}-{i}" if count > 1 else name_prefix
new_keys.append({
"key": self.generate_key(),
"name": name,
"created_at": int(time.time()),
"is_active": True
})
self._keys.extend(new_keys)
await self._save_data()
logger.info(f"[ApiKey] 批量添加 {count} 个 Key, 前缀: {name_prefix}")
return new_keys
async def delete_key(self, key: str) -> bool:
"""删除 API Key"""
initial_len = len(self._keys)
self._keys = [k for k in self._keys if k["key"] != key]
if len(self._keys) != initial_len:
await self._save_data()
logger.info(f"[ApiKey] 删除Key: {key[:10]}...")
return True
return False
async def batch_delete_keys(self, keys: List[str]) -> int:
"""批量删除 API Key"""
initial_len = len(self._keys)
self._keys = [k for k in self._keys if k["key"] not in keys]
deleted_count = initial_len - len(self._keys)
if deleted_count > 0:
await self._save_data()
logger.info(f"[ApiKey] 批量删除 {deleted_count} 个 Key")
return deleted_count
async def update_key_status(self, key: str, is_active: bool) -> bool:
"""更新 Key 状态"""
for k in self._keys:
if k["key"] == key:
k["is_active"] = is_active
await self._save_data()
return True
return False
async def batch_update_keys_status(self, keys: List[str], is_active: bool) -> int:
"""批量更新 Key 状态"""
updated_count = 0
for k in self._keys:
if k["key"] in keys:
if k["is_active"] != is_active:
k["is_active"] = is_active
updated_count += 1
if updated_count > 0:
await self._save_data()
logger.info(f"[ApiKey] 批量更新 {updated_count} 个 Key 状态为: {is_active}")
return updated_count
async def update_key_name(self, key: str, name: str) -> bool:
"""更新 Key 备注"""
for k in self._keys:
if k["key"] == key:
k["name"] = name
await self._save_data()
return True
return False
def validate_key(self, key: str) -> Optional[Dict]:
"""验证 Key,返回 Key 信息"""
# 1. 检查全局配置的 Key (作为默认 admin key)
global_key = setting.grok_config.get("api_key")
if global_key and key == global_key:
return {
"key": global_key,
"name": "默认管理员",
"is_active": True,
"is_admin": True
}
# 2. 检查多 Key 列表
for k in self._keys:
if k["key"] == key:
if k["is_active"]:
return {**k, "is_admin": False} # 普通 Key 也可以视为非管理员? 暂不区分权限,只做身份识别
return None
return None
def get_all_keys(self) -> List[Dict]:
"""获取所有 Keys"""
return self._keys
# 全局实例
api_key_manager = ApiKeyManager()
|