ZHIWEI666's picture
Upload app.py
599e756 verified
# ⚙️ 后端逻辑/核心服务端.py (Hugging Face Spaces app.py)
from fastapi import FastAPI, File, UploadFile, Form, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response, JSONResponse, FileResponse
from sqlalchemy.orm import Session
from pydantic import BaseModel
from huggingface_hub import hf_hub_download, HfApi
import hashlib
import uuid
import urllib.parse
import urllib.request
import urllib.error
import os
import json
import mimetypes
import logging
import time
import 数据库连接 as db
import asyncio
from 安全认证 import require_auth
# ==========================================
# 📝 P2优化:日志配置
# ==========================================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s | %(levelname)s | %(name)s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger("ComfyUI-Ranking")
from 云端_定时版本检测引擎 import daily_version_check_task, reset_daily_views_task
# ==========================================
# 👥 用户模块 (拆分为3个子模块)
# ==========================================
# router_users_auth.py - 🔐 登录/注册/密码重置/验证码
# router_users_profile.py - 👤 获取/更新用户资料
# router_users_social.py - 🤝 关注/隐私设置
from router_users_auth import router as users_auth_router
from router_users_profile import router as users_profile_router
from router_users_social import router as users_social_router
# ==========================================
# 其他业务模块
# ==========================================
from router_items import router as items_router # 📦 内容管理(工具/应用/推荐)
from router_comments import router as comments_router # 💬 评论系统
from router_messages import router as messages_router # ✉️ 私信系统
from router_wallet import router as wallet_router # 💰 钱包/提现
from router_proxy import router as proxy_router # 🔗 代理下载
from router_posts import router as posts_router # 💬 讨论区
from router_tasks import router as tasks_router # 📝 任务榜
from database_sql import init_sql_db, get_db
from models_sql import Ownership
# 🚀 P2优化:速率限制 (防止暴力攻击)
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
import traceback
limiter = Limiter(key_func=get_remote_address)
app = FastAPI(
title="ComfyUI Ranking API",
description="ComfyUI 社区排名系统 API",
version="1.0.0",
docs_url="/api/docs",
redoc_url="/api/redoc"
)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# ==========================================
# 🛡️ 稳定性优化:全局异常处理器
# ==========================================
# 作用:捕获所有未处理异常,防止单个请求崩溃整个服务
# 🚀 P3优化:统一错误响应格式
def create_error_response(status_code: int, detail: str, error_type: str = None, error_id: str = None, extra: dict = None):
"""🚀 P3优化:统一错误响应构造器"""
response = {
"success": False,
"status": "error",
"code": status_code,
"detail": detail,
}
if error_type:
response["type"] = error_type
if error_id:
response["error_id"] = error_id
if extra:
response.update(extra)
return response
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request, exc):
"""🚀 P3优化:HTTP 异常处理,返回统一格式"""
logger.warning(f"HTTP {exc.status_code} | {request.url.path} | {exc.detail}")
# 根据状态码确定错误类型
error_type_map = {
400: "bad_request",
401: "unauthorized",
403: "forbidden",
404: "not_found",
409: "conflict",
429: "rate_limited",
}
error_type = error_type_map.get(exc.status_code, "http_error")
return JSONResponse(
status_code=exc.status_code,
content=create_error_response(exc.status_code, str(exc.detail), error_type)
)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
"""🚀 P3优化:请求参数校验失败处理"""
logger.warning(f"Validation Error | {request.url.path} | {exc.errors()}")
# 提取简洁的错误信息
errors = []
for err in exc.errors():
field = ".".join(str(loc) for loc in err["loc"][1:]) # 跳过 'body'
errors.append({"field": field, "message": err["msg"], "type": err["type"]})
return JSONResponse(
status_code=422,
content=create_error_response(
422,
"请求参数格式错误",
"validation_error",
extra={"errors": errors}
)
)
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
"""🚀 P3优化:全局异常处理,捕获所有未预期异常"""
error_id = f"ERR_{int(time.time())}_{id(exc) % 10000}"
logger.error(f"Unhandled Exception [{error_id}] | {request.url.path} | {type(exc).__name__}: {exc}")
logger.error(traceback.format_exc())
return JSONResponse(
status_code=500,
content=create_error_response(
500,
"服务器内部错误,请稍后重试",
"internal_error",
error_id=error_id
)
)
# ==========================================
# ❤️ 健康检查接口 (增强版)
# ==========================================
@app.get("/")
def health_check():
return {"status": "ok", "message": "ComfyUI Ranking API is running perfectly!"}
@app.get("/health")
def detailed_health_check():
"""详细健康检查,含依赖状态"""
health_status = {
"status": "ok",
"components": {}
}
# 检查 SQL 数据库
try:
from database_sql import check_db_connection
if check_db_connection():
health_status["components"]["sql_database"] = "ok"
else:
health_status["components"]["sql_database"] = "error: connection failed"
health_status["status"] = "degraded"
except Exception as e:
health_status["components"]["sql_database"] = f"error: {str(e)}"
health_status["status"] = "degraded"
# 检查 JSON 文件访问
try:
import 数据库连接 as json_db
json_db.load_data("users.json", default_data={})
health_status["components"]["json_storage"] = "ok"
except Exception as e:
health_status["components"]["json_storage"] = f"error: {str(e)}"
health_status["status"] = "degraded"
return health_status
# ==========================================
# 🚀 应用启动事件
# ==========================================
# 作用:初始化数据库 + 启动定时版本检测后台任务 + 预热检查
@app.on_event("startup")
async def on_startup():
logger.info("🚀 ComfyUI-Ranking API 启动中...")
# ========== 预热检查:SQL 数据库 ==========
try:
init_sql_db()
logger.info("✅ SQL 数据库初始化完成")
except Exception as e:
logger.error(f"❌ SQL 数据库初始化失败: {e}")
# 不抛出异常,允许降级运行
# ========== 预热检查:JSON 数据库 ==========
try:
db.load_data("users.json", default_data={})
db.load_data("items.json", default_data=[])
logger.info("✅ JSON 数据库访问正常")
except Exception as e:
logger.warning(f"⚠️ JSON 数据库访问异常: {e}")
# ========== 启动后台任务 ==========
asyncio.create_task(daily_version_check_task())
logger.info("✅ 定时版本检测任务已挂载")
asyncio.create_task(reset_daily_views_task())
logger.info("✅ daily_views 每日重置任务已挂载")
# ========== 启动 HF 批量同步定时器 ==========
db.start_batch_sync()
logger.info("✅ HF 批量同步定时器已启动")
logger.info("🎉 ComfyUI-Ranking API 启动完成!")
@app.on_event("shutdown")
async def on_shutdown():
"""优雅关闭,清理资源"""
logger.info("🛑 ComfyUI-Ranking API 正在关闭...")
# ========== 关闭前同步所有脏文件 ==========
db.flush_sync()
logger.info("✅ HF 批量同步已完成")
# 这里可以添加其他清理逻辑(如关闭连接池等)
logger.info("✅ 关闭完成")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=False,
allow_methods=["*"],
allow_headers=["*"],
)
# ==========================================
# 路由挂载
# ==========================================
# 用户模块 (3个子模块)
app.include_router(users_auth_router) # 🔐 登录/注册/密码重置
app.include_router(users_profile_router) # 👤 用户资料
app.include_router(users_social_router) # 🤝 关注/隐私
# 其他业务模块
app.include_router(items_router) # 📦 内容管理
app.include_router(comments_router) # 💬 评论系统
app.include_router(messages_router) # ✉️ 私信系统
app.include_router(wallet_router) # 💰 钱包/提现
app.include_router(proxy_router) # 🔗 代理下载
app.include_router(posts_router) # 💬 讨论区
app.include_router(tasks_router) # 📝 任务榜
# ==========================================
# 🟢 私有图床代理中心 (Image Proxy)
# 解决 Private 仓库下,本地客户端报 401 Unauthorized 的终极方案
# ==========================================
@app.get("/api/image_proxy")
def proxy_hf_image(url: str = None, path: str = None):
"""云端图片代理:使用云端的 HF_TOKEN 提取私有图床图片并返回给没有任何权限的本地端"""
# 兼容处理:如果本地发来的是已经被污染的老版本 HF 直链,我们自动将其转换为相对路径
if url and url.startswith("https://huggingface.co/datasets/"):
try:
path = url.split("resolve/main/")[-1]
path = urllib.parse.unquote(path)
except Exception:
raise HTTPException(status_code=400, detail="无效的 HF 原链接格式")
if not path:
raise HTTPException(status_code=400, detail="缺少路径参数")
# 🛡️ 绝对的安全红线:限制只能代理下载图片目录,严禁黑客通过此接口下载 users.json 或账本数据!
allowed_dirs = ["uploads/", "avatars/", "covers/"]
if not any(path.startswith(d) for d in allowed_dirs):
raise HTTPException(status_code=403, detail="非法访问:该接口仅允许代理图片资源")
hf_token = os.environ.get("HF_TOKEN")
dataset_repo_id = db.DATASET_REPO_ID
try:
# hf_hub_download 会自动利用云端容器的缓存,只有第一次会去真实请求 Dataset
cached_file_path = hf_hub_download(
repo_id=dataset_repo_id,
repo_type="dataset",
filename=path,
token=hf_token
)
# 智能识别文件类型 (image/jpeg, image/png 等)
content_type, _ = mimetypes.guess_type(cached_file_path)
return FileResponse(cached_file_path, media_type=content_type or "application/octet-stream")
except Exception as e:
return JSONResponse(content={"error": f"代理获取图片失败: {str(e)}"}, status_code=404)
# ==========================================
# 上传接口 (将返回的 URL 替换为 Proxy 代理链接)
# 🔒 P0安全优化:集成图片内容审核
# ==========================================
from image_moderation import moderate_image_sync, ModerationResult
@app.post("/api/upload")
def upload_file(file: UploadFile = File(...), file_type: str = Form(...), current_user: str = Depends(require_auth)):
content = file.file.read()
# 🟢 动态文件大小风控
max_size = 10 * 1024 * 1024
if file_type == "avatar":
max_size = 2 * 1024 * 1024
elif file_type == "cover":
max_size = 5 * 1024 * 1024
elif file_type == "post_video":
max_size = 100 * 1024 * 1024
if len(content) > max_size:
raise HTTPException(status_code=400, detail=f"文件过大,{file_type} 类型请限制在 {max_size // (1024*1024)}MB 以内")
ext = file.filename.split(".")[-1].lower()
allowed_exts = ["jpg", "jpeg", "png", "gif", "webp", "json", "mp4"]
if file_type == "post_video":
allowed_exts = ["mp4", "webm", "mov"]
if ext not in allowed_exts:
raise HTTPException(status_code=400, detail=f"不支持的文件格式,{file_type} 类型仅支持 {', '.join(allowed_exts)}")
# 🔒 P0安全优化:图片内容审核
if ext in ["jpg", "jpeg", "png", "gif", "webp"]:
moderation_result = moderate_image_sync(content, ext)
if not moderation_result.passed:
raise HTTPException(
status_code=400,
detail=f"图片内容审核未通过:{moderation_result.label or '内容不合规'}"
)
if file_type == "post_video":
safe_filename = f"post_video_{uuid.uuid4().hex[:8]}.{ext}"
else:
file_hash = hashlib.md5(content).hexdigest()[:10]
safe_filename = f"{file_type}_{file_hash}.{ext}"
local_tmp_path = f"/tmp/{safe_filename}"
with open(local_tmp_path, "wb") as f:
f.write(content)
hf_token = os.environ.get("HF_TOKEN")
dataset_repo_id = "ZHIWEI666/ComfyUI-Ranking"
try:
api = HfApi()
api.upload_file(
path_or_fileobj=local_tmp_path,
path_in_repo=f"uploads/{file_type}/{safe_filename}",
repo_id=dataset_repo_id,
repo_type="dataset",
token=hf_token,
commit_message=f"Upload media: {safe_filename}"
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"图床同步失败: {str(e)}")
finally:
if os.path.exists(local_tmp_path):
os.remove(local_tmp_path)
# 🚀 核心修复:不再返回暴露隐私且报 401 的 HF 直链,而是返回我们刚写好的 Proxy 代理链接
if file_type == "post_video":
# 视频文件返回相对路径,由前端 proxyImages 统一处理
# 前端会将其转换为 /community_hub/video 本地视频代理(支持 Range 请求和流式传输)
permanent_url = f"uploads/{file_type}/{safe_filename}"
else:
permanent_url = f"https://zhiwei666-comfyui-ranking-api.hf.space/api/image_proxy?path=uploads/{file_type}/{safe_filename}"
return {"status": "success", "url": permanent_url}
class ValidateResourceRequest(BaseModel):
url: str
item_id: str
account: str
@app.post("/api/validate_resource")
def validate_resource(req_data: ValidateResourceRequest, sql_db: Session = Depends(get_db)):
target_url = req_data.url
# 允许的链接前缀:HuggingFace数据集、GitHub仓库、内部代理URL
ALLOWED_URL_PREFIXES = (
"https://huggingface.co/datasets/",
"https://github.com/",
"https://zhiwei666-comfyui-ranking-api.hf.space/api/image_proxy",
)
if not any(target_url.startswith(prefix) for prefix in ALLOWED_URL_PREFIXES):
return JSONResponse(content={"error": "无效的下载链接"}, status_code=400)
items_db = db.load_data("items.json", default_data=[])
item = next((i for i in items_db if i["id"] == req_data.item_id), None)
if not item: return JSONResponse(content={"error": "资源不存在或已被删除"}, status_code=404)
# 内部代理URL直接返回成功(文件已存在于服务器本地)
if target_url.startswith("https://zhiwei666-comfyui-ranking-api.hf.space/api/image_proxy"):
return {"status": "success", "message": "资源有效"}
hf_token = os.environ.get("HF_TOKEN")
if not hf_token: return JSONResponse(content={"error": "云端环境变量未配置 HF_TOKEN"}, status_code=401)
try:
if target_url.startswith("https://github.com/"):
creator_token = item.get("github_token")
fallback_token = os.environ.get("GITHUB_PAT")
active_token = creator_token if creator_token else fallback_token
headers = {"User-Agent": "ComfyUI-Ranking-SaaS"}
if active_token:
headers["Authorization"] = f"Bearer {active_token}"
repo_parts = target_url.rstrip("/").split("/")
if len(repo_parts) < 2: return JSONResponse(content={"error": "无效的仓库地址格式"}, status_code=400)
owner, repo = repo_parts[-2], repo_parts[-1]
api_url = f"https://api.github.com/repos/{owner}/{repo}"
req = urllib.request.Request(api_url, headers=headers)
with urllib.request.urlopen(req) as response:
if response.status != 200:
return JSONResponse(content={"error": "资源仓库不可访问,可能已被作者删除或设为私有"}, status_code=404)
return {"status": "success", "message": "资源有效"}
elif target_url.startswith("https://huggingface.co/datasets/"):
repo_path_encoded = target_url.split("resolve/main/")[-1]
repo_path = urllib.parse.unquote(repo_path_encoded)
cached_file_path = hf_hub_download(
repo_id=db.DATASET_REPO_ID,
repo_type="dataset",
filename=repo_path,
token=hf_token
)
if not os.path.exists(cached_file_path):
return JSONResponse(content={"error": "云端文件不存在,可能已被作者删除"}, status_code=404)
return {"status": "success", "message": "资源有效"}
except urllib.error.HTTPError as e:
return JSONResponse(content={"error": f"资源探测失败,源站返回: {e.code}。请联系作者处理。"}, status_code=400)
except Exception as e:
return JSONResponse(content={"error": f"资源探测异常: {str(e)}"}, status_code=500)
return {"status": "success"}
# ==========================================
# 🔒 管理员:系统配置 API
# ==========================================
from 安全认证 import require_auth
# 🔒 管理员账号列表(从环境变量读取)
ADMIN_ACCOUNTS = set(
acc.strip()
for acc in os.environ.get("ADMIN_ACCOUNTS", "").split(",")
if acc.strip()
)
def _is_admin(account: str) -> bool:
"""检查账号是否为管理员"""
return account in ADMIN_ACCOUNTS
# 配置存储文件
SYSTEM_CONFIG_FILE = "/tmp/system_config.json"
def _load_system_config() -> dict:
""" 加载系统配置 """
try:
if os.path.exists(SYSTEM_CONFIG_FILE):
with open(SYSTEM_CONFIG_FILE, "r") as f:
return json.load(f)
except Exception as e:
logger.debug(f"加载系统配置失败: {e}")
return {}
def _save_system_config(config: dict):
""" 保存系统配置 """
try:
with open(SYSTEM_CONFIG_FILE, "w") as f:
json.dump(config, f)
except Exception as e:
logger.warning(f"保存系统配置失败: {e}")
@app.get("/api/admin/config/{config_key}")
async def get_system_config(config_key: str, current_user: str = Depends(require_auth)):
""" 🔒 获取系统配置(仅管理员) """
# 🔒 权限验证
if not _is_admin(current_user):
raise HTTPException(status_code=403, detail="无权访问系统配置,仅管理员可操作")
config = _load_system_config()
if config_key == "image_moderation":
# 图像审核配置:包含开关状态和额度信息
try:
from image_moderation import _load_quota, ALIYUN_FREE_QUOTA, TENCENT_FREE_QUOTA
quota = _load_quota()
return {
"status": "success",
"data": {
"enabled": config.get("image_moderation_enabled", False),
"quota": {
"aliyun": quota.get("aliyun", 0),
"tencent": quota.get("tencent", 0),
"aliyun_limit": ALIYUN_FREE_QUOTA,
"tencent_limit": TENCENT_FREE_QUOTA
}
}
}
except Exception as e:
logger.error(f"获取审核配置失败: {e}")
return {
"status": "success",
"data": {
"enabled": config.get("image_moderation_enabled", False),
"quota": {}
}
}
if config_key == "project_version":
# 🏷️ 项目版本配置
version_config = config.get("project_version", DEFAULT_VERSION_CONFIG)
return {
"status": "success",
"data": version_config
}
return {"status": "success", "data": config.get(config_key)}
# 🏷️ 版本配置默认值
DEFAULT_VERSION_CONFIG = {
"stage": "alpha",
"major": 1,
"minor": 0,
"patch": 0
}
@app.get("/api/public/project-version")
async def get_public_project_version():
"""获取项目版本信息(公开接口,无需认证)"""
config = _load_system_config()
version_data = config.get("project_version", DEFAULT_VERSION_CONFIG)
return {"status": "success", "data": version_data}
@app.put("/api/admin/config/{config_key}")
async def set_system_config(config_key: str, value: dict, current_user: str = Depends(require_auth)):
""" 🔒 设置系统配置(仅管理员) """
# 🔒 权限验证
if not _is_admin(current_user):
raise HTTPException(status_code=403, detail="无权修改系统配置,仅管理员可操作")
config = _load_system_config()
if config_key == "image_moderation":
# 更新图像审核开关
enabled = value.get("enabled", False)
config["image_moderation_enabled"] = enabled
_save_system_config(config)
logger.info(f"🔒 管理员设置图像审核: {'enabled' if enabled else 'disabled'}")
return {"status": "success", "message": f"图像审核已{'enabled' if enabled else 'disabled'}"}
if config_key == "project_version":
# 🏷️ 更新项目版本配置
version_config = {
"stage": value.get("stage", "alpha"),
"major": int(value.get("major", 1)),
"minor": int(value.get("minor", 0)),
"patch": int(value.get("patch", 0))
}
config["project_version"] = version_config
_save_system_config(config)
version_str = f"V{version_config['major']}.{version_config['minor']}.{version_config['patch']}"
stage_labels = {"alpha": "内测", "beta": "公测", "rc": "候选版", "stable": "正式版"}
stage_label = stage_labels.get(version_config["stage"], version_config["stage"])
logger.info(f"🏷️ 管理员更新项目版本: {version_str} {stage_label}")
return {"status": "success", "message": f"版本已更新为 {version_str} {stage_label}"}
# 其他配置项
config[config_key] = value
_save_system_config(config)
return {"status": "success", "message": "配置已更新"}