# Source: ComfyUI-Ranking-API / router_admin_plugins.py (14.4 kB)
# Commit ee88a99 (verified) by ZHIWEI666: "支持第三方扣费" (support third-party billing deduction)
# router_admin_plugins.py
# ==========================================
# 🌐 管理员插件管理路由模块
# ==========================================
# 作用:提供第三方插件的注册、查询、更新、删除、Key轮换等管理API
# 关联文件:
# - models_sql.py (RegisteredPlugin 模型)
# - database_sql.py (SQL数据库连接 get_db)
# - 安全认证.py (require_auth / is_admin)
# - router_open_api.py (reload_plugin_registry 热更新)
# 权限:所有端点均需管理员权限(is_admin() == True)
# ==========================================
import re
import json
import secrets
import logging
import datetime
from typing import Optional, List
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from pydantic import BaseModel, Field, validator
from database_sql import get_db
from models_sql import RegisteredPlugin
from 安全认证 import require_auth, is_admin
# Module-level logger used by every helper and route below.
logger = logging.getLogger("ComfyUI-Ranking.AdminPlugins")
# Router on which the admin plugin endpoints in this module are registered.
router = APIRouter()
# ==========================================
# 🔧 Utility functions
# ==========================================
# Validity pattern for plugin_id: 3-50 characters, lowercase letters, digits and underscores only.
# NOTE: in Python's `re`, `$` also matches just before a trailing newline, so
# `.match()` accepts "abc\n"; use `.fullmatch()` when a strict check is required.
_PLUGIN_ID_PATTERN = re.compile(r"^[a-z0-9_]{3,50}$")
# Default permission list, used when a plugin has no explicit permissions.
_DEFAULT_PERMISSIONS = ["verify", "balance", "deduct", "refund"]
def generate_plugin_key() -> str:
    """Return a new plugin key: the ``rk_plug_`` prefix plus 32 random hex characters."""
    # token_hex(16) draws 16 random bytes and renders them as 32 hex digits.
    random_suffix = secrets.token_hex(16)
    return "rk_plug_" + random_suffix
def mask_key(key: str) -> str:
    """Mask a plugin key for display.

    Keys longer than 12 characters keep their first 12 characters
    (``rk_plug_`` plus 4 more) followed by ``***``. Empty, missing, or
    shorter keys are replaced entirely by ``***``.
    """
    visible_chars = 12
    if key and len(key) > visible_chars:
        return f"{key[:visible_chars]}***"
    return "***"
def _require_admin(account: str):
    """Raise HTTP 403 unless ``is_admin(account)`` is truthy; return None otherwise."""
    if is_admin(account):
        return
    raise HTTPException(status_code=403, detail="需要管理员权限")
def _notify_registry_reload():
    """Ask the open_api module to reload its plugin registry (hot update).

    This is best-effort: any failure, including a failed import of
    ``router_open_api``, is logged as a warning and never propagated, so a
    reload problem cannot fail the admin request that triggered it.
    """
    try:
        # Imported lazily, at call time, rather than at module import.
        from router_open_api import reload_plugin_registry

        reload_plugin_registry()
        logger.info("插件注册表热更新成功")
    except Exception as e:
        # ImportError is a subclass of Exception, so listing both was redundant.
        logger.warning(f"插件注册表热更新失败: {e}")
def _serialize_plugin(p: RegisteredPlugin, include_key: bool = False, raw_key: Optional[str] = None) -> dict:
    """Convert a RegisteredPlugin ORM object into a plain dict.

    Args:
        p: RegisteredPlugin ORM object to serialize.
        include_key: Whether to expose the full plugin_key (meant for the
            create/rotate responses only).
        raw_key: Plaintext key to expose; only used when ``include_key`` is true.

    Returns:
        A dict of the plugin's fields. It carries ``plugin_key`` when both
        ``include_key`` and ``raw_key`` are truthy, and ``plugin_key_masked``
        (see ``mask_key``) in every other case.
    """

    def _iso(ts):
        # Timestamps may be unset; emit None rather than failing on .isoformat().
        return ts.isoformat() if ts else None

    # Always hand out a *copy* of the defaults so callers can never mutate the
    # shared module-level list through the returned dict.
    try:
        permissions = json.loads(p.permissions) if p.permissions else list(_DEFAULT_PERMISSIONS)
    except (TypeError, ValueError):
        # Stored value is not valid JSON text (JSONDecodeError is a ValueError).
        permissions = list(_DEFAULT_PERMISSIONS)

    data = {
        "plugin_id": p.plugin_id,
        "name": p.name,
        "description": p.description,
        "enabled": bool(p.enabled),
        "rate_limit": p.rate_limit,
        "max_deduct_per_tx": p.max_deduct_per_tx,
        "max_deduct_per_day": p.max_deduct_per_day,
        "permissions": permissions,
        "created_at": _iso(p.created_at),
        "created_by": p.created_by,
        "updated_at": _iso(p.updated_at),
        "last_rotated_at": _iso(p.last_rotated_at),
    }
    if include_key and raw_key:
        data["plugin_key"] = raw_key
    else:
        data["plugin_key_masked"] = mask_key(p.plugin_key)
    return data
# ==========================================
# 📥 请求模型
# ==========================================
class PluginCreateRequest(BaseModel):
    # Request body for registering a new plugin.
    # plugin_id and name are required; the numeric limits fall back to the
    # defaults declared on each field, and permissions fall back to a copy of
    # _DEFAULT_PERMISSIONS when omitted or null.
    plugin_id: str = Field(..., min_length=3, max_length=50, description="插件唯一标识,仅允许小写字母、数字、下划线")
    name: str = Field(..., min_length=1, max_length=100, description="插件显示名")
    description: Optional[str] = Field(None, max_length=500)
    rate_limit: Optional[int] = Field(100, ge=1, le=10000)
    max_deduct_per_tx: Optional[int] = Field(100, ge=1, le=10000)
    max_deduct_per_day: Optional[int] = Field(1000, ge=1, le=100000)
    permissions: Optional[List[str]] = Field(default=None)

    @validator("plugin_id")
    def _validate_plugin_id(cls, v):
        # fullmatch() instead of match(): the pattern ends in `$`, which also
        # matches just before a trailing newline, so match() would accept
        # values such as "abc\n".
        if not _PLUGIN_ID_PATTERN.fullmatch(v):
            raise ValueError("plugin_id 仅允许 [a-z0-9_],长度 3-50")
        return v

    @validator("permissions", always=True)
    def _validate_permissions(cls, v):
        # always=True makes this run even when the field is omitted, which is
        # how the default permission set gets filled in.
        if v is None:
            # Return a copy so request models never alias the shared
            # module-level default list.
            return list(_DEFAULT_PERMISSIONS)
        if not isinstance(v, list) or any(not isinstance(x, str) for x in v):
            raise ValueError("permissions 必须为字符串数组")
        return v
class PluginUpdateRequest(BaseModel):
    # Request body for a partial plugin update: every field is optional and
    # defaults to None.
    name: Optional[str] = Field(None, min_length=1, max_length=100)
    description: Optional[str] = Field(None, max_length=500)
    enabled: Optional[bool] = None
    rate_limit: Optional[int] = Field(None, ge=1, le=10000)
    max_deduct_per_tx: Optional[int] = Field(None, ge=1, le=10000)
    max_deduct_per_day: Optional[int] = Field(None, ge=1, le=100000)
    permissions: Optional[List[str]] = None

    @validator("permissions")
    def _validate_permissions(cls, v):
        # None passes through untouched; anything else must be a list of str.
        if v is None:
            return v
        if isinstance(v, list) and all(isinstance(item, str) for item in v):
            return v
        raise ValueError("permissions 必须为字符串数组")
# ==========================================
# 🌐 路由实现
# ==========================================
@router.get("/api/admin/plugins")
async def list_plugins(
    account: str = Depends(require_auth),
    db: Session = Depends(get_db),
):
    """List every registered plugin, newest first, with plugin_key masked.

    Requires admin rights (403 otherwise). Any unexpected error is logged and
    reported as a 500 JSON body.
    """
    _require_admin(account)
    try:
        newest_first = RegisteredPlugin.created_at.desc()
        rows = db.query(RegisteredPlugin).order_by(newest_first).all()
        serialized = [_serialize_plugin(row) for row in rows]
        body = {
            "success": True,
            "total": len(rows),
            "plugins": serialized,
        }
        return JSONResponse(content=body)
    except HTTPException:
        # Let HTTP errors pass through instead of converting them to a 500.
        raise
    except Exception as exc:
        logger.exception(f"列出插件失败: {exc}")
        failure = {"success": False, "error": "服务器内部错误"}
        return JSONResponse(status_code=500, content=failure)
@router.get("/api/admin/plugins/{plugin_id}")
async def get_plugin(
    plugin_id: str,
    account: str = Depends(require_auth),
    db: Session = Depends(get_db),
):
    """Return one plugin's details with plugin_key masked; 404 if it does not exist."""
    _require_admin(account)
    lookup = db.query(RegisteredPlugin).filter(RegisteredPlugin.plugin_id == plugin_id)
    record = lookup.first()
    if record:
        body = {"success": True, "plugin": _serialize_plugin(record)}
        return JSONResponse(content=body)
    return JSONResponse(status_code=404, content={"success": False, "error": "插件不存在"})
@router.post("/api/admin/plugins")
async def create_plugin(
    payload: PluginCreateRequest,
    account: str = Depends(require_auth),
    db: Session = Depends(get_db),
):
    """Create a new plugin and return its freshly generated plugin_key.

    The plaintext key is only returned by this response. Requires admin
    rights (403 otherwise).

    Responses:
        201: plugin created; body includes the plaintext ``plugin_key``.
        409: ``plugin_id`` already exists, whether detected by the pre-check
            or by the database when a concurrent request inserted it first.
        500: any other failure while saving.
    """
    # Local import keeps this handler self-contained; sqlalchemy is already a
    # dependency of this module.
    from sqlalchemy.exc import IntegrityError

    _require_admin(account)

    def _find_existing():
        return db.query(RegisteredPlugin).filter(RegisteredPlugin.plugin_id == payload.plugin_id).first()

    def _conflict_response():
        return JSONResponse(
            status_code=409,
            content={"success": False, "error": f"plugin_id '{payload.plugin_id}' 已存在"},
        )

    # Fast path: reject an already-registered plugin_id before doing any work.
    if _find_existing():
        return _conflict_response()

    plugin_key = generate_plugin_key()
    now = datetime.datetime.utcnow()
    plugin = RegisteredPlugin(
        plugin_id=payload.plugin_id,
        plugin_key=plugin_key,
        name=payload.name,
        description=payload.description,
        enabled=True,
        rate_limit=payload.rate_limit,
        max_deduct_per_tx=payload.max_deduct_per_tx,
        max_deduct_per_day=payload.max_deduct_per_day,
        permissions=json.dumps(payload.permissions, ensure_ascii=False),
        created_at=now,
        created_by=account,
        updated_at=now,
    )
    try:
        db.add(plugin)
        db.commit()
        db.refresh(plugin)
    except IntegrityError as e:
        db.rollback()
        # The pre-check and the insert are not atomic: a concurrent request may
        # have registered the same plugin_id in between. That only surfaces
        # here if the table enforces uniqueness on plugin_id, which is not
        # visible from this module. Re-check so a genuine duplicate becomes a
        # 409, while other constraint violations remain a 500.
        try:
            duplicate = _find_existing() is not None
        except Exception:
            # If even the re-check fails, fall back to the generic error path.
            duplicate = False
        if duplicate:
            logger.warning(f"创建插件冲突: plugin_id={payload.plugin_id} 已存在")
            return _conflict_response()
        logger.exception(f"创建插件失败: {e}")
        return JSONResponse(status_code=500, content={"success": False, "error": "创建插件失败"})
    except Exception as e:
        db.rollback()
        logger.exception(f"创建插件失败: {e}")
        return JSONResponse(status_code=500, content={"success": False, "error": "创建插件失败"})

    logger.info(f"管理员 {account} 创建插件 plugin_id={payload.plugin_id}")
    _notify_registry_reload()
    return JSONResponse(status_code=201, content={
        "success": True,
        "plugin_id": plugin.plugin_id,
        "plugin_key": plugin_key,
        "name": plugin.name,
        "message": "插件创建成功,请妥善保存 plugin_key(仅显示一次)",
        "plugin": _serialize_plugin(plugin, include_key=True, raw_key=plugin_key),
    })
@router.put("/api/admin/plugins/{plugin_id}")
async def update_plugin(
    plugin_id: str,
    payload: PluginUpdateRequest,
    account: str = Depends(require_auth),
    db: Session = Depends(get_db),
):
    """Update a plugin's configuration (plugin_id and plugin_key cannot be changed).

    Only fields that are present in the request with a non-null value are
    applied; an explicit null means "leave unchanged". Requires admin rights
    (403 otherwise).

    Responses:
        200: update applied; body includes the plugin with a masked key.
        400: the request carried no field with a non-null value.
        404: no plugin with this ``plugin_id``.
        500: failure while saving.
    """
    _require_admin(account)
    plugin = db.query(RegisteredPlugin).filter(RegisteredPlugin.plugin_id == plugin_id).first()
    if not plugin:
        return JSONResponse(status_code=404, content={"success": False, "error": "插件不存在"})

    # Drop explicit nulls up front. Otherwise a payload such as
    # {"name": null} would pass the emptiness check, change nothing, and still
    # bump updated_at, reload the registry and report success.
    update_fields = {
        key: value
        for key, value in payload.dict(exclude_unset=True).items()
        if value is not None
    }
    if not update_fields:
        return JSONResponse(status_code=400, content={"success": False, "error": "未提供任何可更新字段"})

    # Captured before touching the row so the audit log lists every applied
    # field, including "permissions".
    changed_fields = list(update_fields)
    try:
        for key, value in update_fields.items():
            if key == "permissions":
                # Permissions are stored as JSON text.
                value = json.dumps(value, ensure_ascii=False)
            setattr(plugin, key, value)
        plugin.updated_at = datetime.datetime.utcnow()
        db.commit()
        db.refresh(plugin)
    except Exception as e:
        db.rollback()
        logger.exception(f"更新插件失败: {e}")
        return JSONResponse(status_code=500, content={"success": False, "error": "更新插件失败"})

    logger.info(f"管理员 {account} 更新插件 plugin_id={plugin_id}, fields={changed_fields}")
    _notify_registry_reload()
    return JSONResponse(content={
        "success": True,
        "message": "插件更新成功",
        "plugin": _serialize_plugin(plugin),
    })
@router.delete("/api/admin/plugins/{plugin_id}")
async def delete_plugin(
    plugin_id: str,
    account: str = Depends(require_auth),
    db: Session = Depends(get_db),
):
    """Delete a plugin; 404 if it does not exist, 500 if the delete fails."""
    _require_admin(account)
    target = (
        db.query(RegisteredPlugin)
        .filter(RegisteredPlugin.plugin_id == plugin_id)
        .first()
    )
    if not target:
        missing = {"success": False, "error": "插件不存在"}
        return JSONResponse(status_code=404, content=missing)

    try:
        db.delete(target)
        db.commit()
    except Exception as exc:
        # Undo the pending delete so the session stays usable.
        db.rollback()
        logger.exception(f"删除插件失败: {exc}")
        failure = {"success": False, "error": "删除插件失败"}
        return JSONResponse(status_code=500, content=failure)

    logger.info(f"管理员 {account} 删除插件 plugin_id={plugin_id}")
    _notify_registry_reload()
    result = {"success": True, "message": "插件已删除", "plugin_id": plugin_id}
    return JSONResponse(content=result)
@router.post("/api/admin/plugins/{plugin_id}/rotate-key")
async def rotate_plugin_key(
    plugin_id: str,
    account: str = Depends(require_auth),
    db: Session = Depends(get_db),
):
    """Replace a plugin's plugin_key and return the new key in plaintext.

    The plaintext key is only returned by this response. Responds 404 when the
    plugin does not exist and 500 when saving fails.
    """
    _require_admin(account)
    target = (
        db.query(RegisteredPlugin)
        .filter(RegisteredPlugin.plugin_id == plugin_id)
        .first()
    )
    if not target:
        return JSONResponse(status_code=404, content={"success": False, "error": "插件不存在"})

    rotated_at = datetime.datetime.utcnow()
    fresh_key = generate_plugin_key()
    try:
        target.plugin_key = fresh_key
        # The rotation time doubles as the row's last-modified time.
        target.last_rotated_at = rotated_at
        target.updated_at = rotated_at
        db.commit()
        db.refresh(target)
    except Exception as exc:
        db.rollback()
        logger.exception(f"轮换 plugin_key 失败: {exc}")
        failure = {"success": False, "error": "轮换 plugin_key 失败"}
        return JSONResponse(status_code=500, content=failure)

    logger.info(f"管理员 {account} 轮换插件 plugin_id={plugin_id} 的 plugin_key")
    _notify_registry_reload()
    body = {
        "success": True,
        "plugin_id": target.plugin_id,
        "plugin_key": fresh_key,
        "message": "plugin_key 已轮换,请妥善保存(仅显示一次)",
        "last_rotated_at": rotated_at.isoformat(),
    }
    return JSONResponse(content=body)
@router.post("/api/admin/plugins/{plugin_id}/toggle")
async def toggle_plugin(
    plugin_id: str,
    account: str = Depends(require_auth),
    db: Session = Depends(get_db),
):
    """Flip a plugin between enabled and disabled.

    Responds 404 when the plugin does not exist and 500 when saving fails.
    """
    _require_admin(account)
    target = (
        db.query(RegisteredPlugin)
        .filter(RegisteredPlugin.plugin_id == plugin_id)
        .first()
    )
    if not target:
        return JSONResponse(status_code=404, content={"success": False, "error": "插件不存在"})

    try:
        target.enabled = not target.enabled
        target.updated_at = datetime.datetime.utcnow()
        db.commit()
        db.refresh(target)
    except Exception as exc:
        db.rollback()
        logger.exception(f"切换插件状态失败: {exc}")
        failure = {"success": False, "error": "切换插件状态失败"}
        return JSONResponse(status_code=500, content=failure)

    logger.info(f"管理员 {account} 切换插件 plugin_id={plugin_id} 状态为 enabled={target.enabled}")
    _notify_registry_reload()
    is_enabled = bool(target.enabled)
    if is_enabled:
        status_message = "插件已启用"
    else:
        status_message = "插件已禁用"
    body = {
        "success": True,
        "plugin_id": target.plugin_id,
        "enabled": is_enabled,
        "message": status_message,
    }
    return JSONResponse(content=body)