grok2api / app /services /api_keys.py
JXJBing's picture
Upload 45 files
1a9e2c2 verified
"""API Key 管理器 - 多用户密钥管理"""
import orjson
import time
import secrets
import asyncio
from typing import List, Dict, Optional
from pathlib import Path
from app.core.logger import logger
from app.core.config import setting
class ApiKeyManager:
"""API Key 管理服务"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
if hasattr(self, '_initialized'):
return
self.file_path = Path(__file__).parents[2] / "data" / "api_keys.json"
self._keys: List[Dict] = []
self._lock = asyncio.Lock()
self._loaded = False
self._initialized = True
logger.debug(f"[ApiKey] 初始化完成: {self.file_path}")
async def init(self):
"""初始化加载数据"""
if not self._loaded:
await self._load_data()
async def _load_data(self):
"""加载 API Keys"""
if self._loaded:
return
if not self.file_path.exists():
self._keys = []
self._loaded = True
return
try:
async with self._lock:
if self.file_path.exists():
content = await asyncio.to_thread(self.file_path.read_bytes)
if content:
self._keys = orjson.loads(content)
self._loaded = True
logger.debug(f"[ApiKey] 加载了 {len(self._keys)} 个 API Key")
except Exception as e:
logger.error(f"[ApiKey] 加载失败: {e}")
self._keys = []
self._loaded = True # 即使加载失败也认为已尝试加载,防止后续保存清空数据(或者抛出异常)
async def _save_data(self):
"""保存 API Keys"""
if not self._loaded:
logger.warning("[ApiKey] 尝试在数据未加载时保存,已取消操作以防覆盖数据")
return
try:
# 确保目录存在
self.file_path.parent.mkdir(parents=True, exist_ok=True)
async with self._lock:
content = orjson.dumps(self._keys, option=orjson.OPT_INDENT_2)
await asyncio.to_thread(self.file_path.write_bytes, content)
except Exception as e:
logger.error(f"[ApiKey] 保存失败: {e}")
def generate_key(self) -> str:
"""生成一个新的 sk- 开头的 key"""
return f"sk-{secrets.token_urlsafe(24)}"
async def add_key(self, name: str) -> Dict:
"""添加 API Key"""
new_key = {
"key": self.generate_key(),
"name": name,
"created_at": int(time.time()),
"is_active": True
}
self._keys.append(new_key)
await self._save_data()
logger.info(f"[ApiKey] 添加新Key: {name}")
return new_key
async def batch_add_keys(self, name_prefix: str, count: int) -> List[Dict]:
"""批量添加 API Key"""
new_keys = []
for i in range(1, count + 1):
name = f"{name_prefix}-{i}" if count > 1 else name_prefix
new_keys.append({
"key": self.generate_key(),
"name": name,
"created_at": int(time.time()),
"is_active": True
})
self._keys.extend(new_keys)
await self._save_data()
logger.info(f"[ApiKey] 批量添加 {count} 个 Key, 前缀: {name_prefix}")
return new_keys
async def delete_key(self, key: str) -> bool:
"""删除 API Key"""
initial_len = len(self._keys)
self._keys = [k for k in self._keys if k["key"] != key]
if len(self._keys) != initial_len:
await self._save_data()
logger.info(f"[ApiKey] 删除Key: {key[:10]}...")
return True
return False
async def batch_delete_keys(self, keys: List[str]) -> int:
"""批量删除 API Key"""
initial_len = len(self._keys)
self._keys = [k for k in self._keys if k["key"] not in keys]
deleted_count = initial_len - len(self._keys)
if deleted_count > 0:
await self._save_data()
logger.info(f"[ApiKey] 批量删除 {deleted_count} 个 Key")
return deleted_count
async def update_key_status(self, key: str, is_active: bool) -> bool:
"""更新 Key 状态"""
for k in self._keys:
if k["key"] == key:
k["is_active"] = is_active
await self._save_data()
return True
return False
async def batch_update_keys_status(self, keys: List[str], is_active: bool) -> int:
"""批量更新 Key 状态"""
updated_count = 0
for k in self._keys:
if k["key"] in keys:
if k["is_active"] != is_active:
k["is_active"] = is_active
updated_count += 1
if updated_count > 0:
await self._save_data()
logger.info(f"[ApiKey] 批量更新 {updated_count} 个 Key 状态为: {is_active}")
return updated_count
async def update_key_name(self, key: str, name: str) -> bool:
"""更新 Key 备注"""
for k in self._keys:
if k["key"] == key:
k["name"] = name
await self._save_data()
return True
return False
def validate_key(self, key: str) -> Optional[Dict]:
"""验证 Key,返回 Key 信息"""
# 1. 检查全局配置的 Key (作为默认 admin key)
global_key = setting.grok_config.get("api_key")
if global_key and key == global_key:
return {
"key": global_key,
"name": "默认管理员",
"is_active": True,
"is_admin": True
}
# 2. 检查多 Key 列表
for k in self._keys:
if k["key"] == key:
if k["is_active"]:
return {**k, "is_admin": False} # 普通 Key 也可以视为非管理员? 暂不区分权限,只做身份识别
return None
return None
def get_all_keys(self) -> List[Dict]:
"""获取所有 Keys"""
return self._keys
# 全局实例
api_key_manager = ApiKeyManager()