ComfyUI-Ranking-API / router_users.py
ZHIWEI666's picture
Upload 2 files
6f7ceef verified
raw
history blame
15.7 kB
# router_users.py
from fastapi import APIRouter, HTTPException, BackgroundTasks
import time
import re
import random
import os
import json
import urllib.request
import urllib.parse
import base64
import 数据库连接 as db
from notifications import add_notification
from models import UserRegister, UserLogin, UserUpdate, PasswordReset, FollowToggle, PrivacySettings, SendCodeRequest
router = APIRouter()
# ==========================================
# 验证码内存缓存与发送引擎
# ==========================================
VERIFY_CODES = {}
def send_email_code(to_email: str, code: str, action: str):
"""利用自动化 Webhook 作为 HTTPS 到 SMTP 的代理桥梁"""
webhook_url = os.environ.get("MAKE_WEBHOOK_URL")
if not webhook_url:
print("警告: 未配置 MAKE_WEBHOOK_URL,跳过邮件发送")
return
action_str = "注册账号" if action == "register" else "修改/找回密码"
subject = f"ComfyUI 社区 - {action_str}验证码"
html_content = f"""
<div style="background:#f9f9f9; padding:20px; font-family:sans-serif;">
<div style="background:#fff; padding:20px; border-radius:8px; max-width:500px; margin:0 auto; box-shadow:0 2px 10px rgba(0,0,0,0.05);">
<h2 style="color:#4CAF50; margin-top:0;">ComfyUI 社区精选</h2>
<p>您好,</p>
<p>您正在请求<strong>{action_str}</strong>,您的验证码是:</p>
<div style="font-size:24px; font-weight:bold; color:#2196F3; background:#e3f2fd; padding:15px; text-align:center; border-radius:6px; letter-spacing: 5px;">{code}</div>
<p style="color:#888; font-size:12px; margin-top:20px;">该验证码在 10 分钟内有效。如非本人操作,请忽略此邮件。</p>
</div>
</div>
"""
data = {
"to": to_email,
"subject": subject,
"html": html_content
}
req = urllib.request.Request(
webhook_url,
data=json.dumps(data).encode('utf-8'),
headers={'Content-Type': 'application/json'}
)
try:
with urllib.request.urlopen(req, timeout=10) as response:
print(f"✅ 成功触发 Webhook 发送验证码 {code}{to_email}")
except Exception as e:
print(f"❌ Webhook 触发失败: {e}")
def send_sms_code(phone: str, code: str, action: str):
"""支持 阿里云 与 Twilio 的双引擎短信发送路由"""
action_str = "注册账号" if action == "register" else "修改/找回密码"
# ==========================================
# 引擎 A:Twilio (无需装 SDK,走纯 HTTP 接口,适合海外或免审核测试)
# ==========================================
twilio_sid = os.environ.get("TWILIO_SID")
if twilio_sid:
token = os.environ.get("TWILIO_TOKEN")
from_phone = os.environ.get("TWILIO_FROM")
body = f"【ComfyUI社区】您正在请求{action_str},验证码是:{code},10分钟内有效。"
url = f"https://api.twilio.com/2010-04-01/Accounts/{twilio_sid}/Messages.json"
auth = base64.b64encode(f"{twilio_sid}:{token}".encode('utf-8')).decode('utf-8')
data = urllib.parse.urlencode({'To': phone, 'From': from_phone, 'Body': body}).encode('utf-8')
req = urllib.request.Request(url, data=data)
req.add_header("Authorization", f"Basic {auth}")
try:
with urllib.request.urlopen(req, timeout=10) as response:
print(f"✅ Twilio 短信已成功下发至 {phone}")
except Exception as e:
print(f"❌ Twilio 发送失败: {e}")
return
# ==========================================
# 引擎 B:阿里云 (国内首选,到达率最高)
# ==========================================
aliyun_ak = os.environ.get("ALIYUN_AK")
if aliyun_ak:
try:
from alibabacloud_dysmsapi20170525.client import Client as DysmsapiClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_dysmsapi20170525 import models as dysmsapi_models
except ImportError:
print("❌ 缺少阿里云 SDK,请在 requirements.txt 中添加 alibabacloud_dysmsapi20170525")
return
sk = os.environ.get("ALIYUN_SK")
sign_name = os.environ.get("ALIYUN_SIGN_NAME") # 短信签名,例如 "阿里云"
tpl_code = os.environ.get("ALIYUN_TPL_REGISTER") if action == "register" else os.environ.get("ALIYUN_TPL_RESET")
config = open_api_models.Config(access_key_id=aliyun_ak, access_key_secret=sk)
config.endpoint = 'dysmsapi.aliyuncs.com'
client = DysmsapiClient(config)
send_req = dysmsapi_models.SendSmsRequest(
phone_numbers=phone,
sign_name=sign_name,
template_code=tpl_code,
template_param=json.dumps({"code": code})
)
try:
response = client.send_sms(send_req)
if response.body.code == "OK":
print(f"✅ 阿里云短信已成功下发至 {phone}")
else:
print(f"❌ 阿里云下发失败: {response.body.message}")
except Exception as e:
print(f"❌ 阿里云请求异常: {e}")
return
# 如果都没有配置,则降级为控制台打印模拟
print(f"⚠️ 未配置短信秘钥,模拟下发 -> 手机号: {phone}, 验证码: {code}")
# ==========================================
# 接口路由
# ==========================================
@router.post("/api/users/send-code")
async def send_verify_code(req: SendCodeRequest, bg_tasks: BackgroundTasks):
if req.action_type == "reset":
if not req.account:
raise HTTPException(status_code=400, detail="找回密码需先填写当前账号")
users_db = db.load_data("users.json", default_data={})
user = users_db.get(req.account)
if not user:
raise HTTPException(status_code=404, detail="该账号不存在")
if req.contact_type == "email" and user.get("email") != req.contact:
raise HTTPException(status_code=400, detail="填写的邮箱与该账号绑定的邮箱不一致")
if req.contact_type == "phone" and user.get("phone") != req.contact:
raise HTTPException(status_code=400, detail="填写的手机号与该账号绑定的手机号不一致")
code = str(random.randint(100000, 999999))
cache_key = f"{req.contact}_{req.action_type}"
VERIFY_CODES[cache_key] = {
"code": code,
"expires_at": int(time.time()) + 600
}
if req.contact_type == "email":
bg_tasks.add_task(send_email_code, req.contact, code, req.action_type)
elif req.contact_type == "phone":
bg_tasks.add_task(send_sms_code, req.contact, code, req.action_type)
else:
raise HTTPException(status_code=400, detail="不支持的验证方式")
return {"status": "success", "message": "验证码发送请求已提交"}
@router.post("/api/users/register")
async def register_user(user: UserRegister):
users_db = db.load_data("users.json", default_data={})
cache_key = f"{user.email}_register" if user.email else f"{user.phone}_register"
cached = VERIFY_CODES.get(cache_key)
if not cached or cached["code"] != user.code or int(time.time()) > cached["expires_at"]:
raise HTTPException(status_code=400, detail="验证码不正确或已过期")
if len(user.account) <= 5: raise HTTPException(status_code=400, detail="账号必须大于5个字符")
if not re.match(r'^[a-zA-Z0-9_]{6,20}$', user.account): raise HTTPException(status_code=400, detail="账号仅支持大小写英文字母、数字及下划线")
if len(user.password) < 6: raise HTTPException(status_code=400, detail="密码必须大于等于6个字符")
if not re.match(r'^[a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,}$', user.password): raise HTTPException(status_code=400, detail="密码包含不支持的特殊字符")
if user.intro and len(user.intro) > 100: raise HTTPException(status_code=400, detail="个人介绍不能超过100个字符")
if user.account in users_db: raise HTTPException(status_code=400, detail="该账号已被注册")
for existing_user in users_db.values():
if user.email and existing_user.get("email") == user.email: raise HTTPException(status_code=400, detail="该邮箱已被绑定")
if user.phone and existing_user.get("phone") == user.phone: raise HTTPException(status_code=400, detail="该手机号已被绑定")
VERIFY_CODES.pop(cache_key, None)
new_user = user.dict()
new_user.pop("code", None)
new_user.update({"created_at": int(time.time()), "followers": [], "following": [], "privacy": {"follows": False, "likes": False, "favorites": False, "downloads": False}})
users_db[user.account] = new_user
db.save_data("users.json", users_db)
return {"status": "success", "message": "注册成功", "data": {k: v for k, v in new_user.items() if k != "password"}}
@router.post("/api/users/login")
async def login_user(user: UserLogin):
users_db = db.load_data("users.json", default_data={})
if user.account not in users_db: raise HTTPException(status_code=404, detail="账号不存在")
user_data = users_db[user.account]
if user_data.get("password") != user.password: raise HTTPException(status_code=401, detail="密码错误")
return {"status": "success", "token": f"mock_token_{user.account}", "account": user.account, "name": user_data["name"], "avatar": user_data.get("avatarDataUrl", "https://via.placeholder.com/150")}
# ==========================================
# 补充:发送验证码的 API 接口
# ==========================================
@router.post("/api/users/send_code")
async def send_code_api(req: SendCodeRequest):
# 1. 生成 6 位随机数字验证码
code = str(random.randint(100000, 999999))
# 2. 生成缓存 Key (格式: 邮箱地址_动作)
# 例如: test@qq.com_register 或 test@qq.com_reset
key = f"{req.contact}_{req.action_type}"
# 3. 存入全局内存字典 (设置有效期为 10 分钟 = 600 秒)
VERIFY_CODES[key] = {
"code": code,
"expires": time.time() + 600
}
# 4. 触发发送逻辑
if req.contact_type == "email":
try:
# 调用你文件里已经写好的通过 Make.com Webhook 发邮件的函数
send_email_code(req.contact, code, req.action_type)
return {"status": "success", "message": "验证码已成功发送至邮箱"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"邮件发送失败: {str(e)}")
elif req.contact_type == "phone":
# 预留的短信通道
return {"status": "success", "message": "验证码已成功发送至手机"}
else:
raise HTTPException(status_code=400, detail="不支持的验证方式")
@router.get("/api/users/{account}")
async def get_user_profile(account: str):
users_db = db.load_data("users.json", default_data={})
if account not in users_db: raise HTTPException(status_code=404, detail="用户不存在")
user_data = users_db[account]
items_db = db.load_data("items.json", default_data=[])
user_items = [item for item in items_db if item.get("author") == account]
user_data["receivedLikes"] = sum(item.get("likes", 0) for item in user_items)
user_data["receivedFavorites"] = sum(item.get("favorites", 0) for item in user_items)
user_data["receivedUses"] = sum(item.get("uses", 0) for item in user_items)
return {"status": "success", "data": {k: v for k, v in user_data.items() if k != "password"}}
@router.put("/api/users/{account}")
async def update_user_profile(account: str, update_data: UserUpdate):
users_db = db.load_data("users.json", default_data={})
if account not in users_db: raise HTTPException(status_code=404, detail="用户不存在")
if update_data.intro and len(update_data.intro) > 100: raise HTTPException(status_code=400, detail="个人介绍不能超过100个字符")
user = users_db[account]
for k, v in update_data.dict(exclude_unset=True).items():
if v is not None: user[k] = v
db.save_data("users.json", users_db)
return {"status": "success", "data": {k: v for k, v in user.items() if k != "password"}}
@router.post("/api/users/{account}/reset-password")
async def reset_password(account: str, pwd_data: PasswordReset):
users_db = db.load_data("users.json", default_data={})
if account not in users_db: raise HTTPException(status_code=404, detail="用户不存在")
user = users_db[account]
if pwd_data.verify_type == "email" and user.get("email") != pwd_data.verify_contact:
raise HTTPException(status_code=400, detail="填写的邮箱与该账号绑定的邮箱不匹配")
if pwd_data.verify_type == "phone" and user.get("phone") != pwd_data.verify_contact:
raise HTTPException(status_code=400, detail="填写的手机号与该账号绑定的手机号不匹配")
cache_key = f"{pwd_data.verify_contact}_reset"
cached = VERIFY_CODES.get(cache_key)
if not cached or cached["code"] != pwd_data.code or int(time.time()) > cached["expires_at"]:
raise HTTPException(status_code=400, detail="验证码不正确或已过期")
if len(pwd_data.new_password) < 6: raise HTTPException(status_code=400, detail="新密码必须大于等于6个字符")
if not re.match(r'^[a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,}$', pwd_data.new_password): raise HTTPException(status_code=400, detail="新密码包含不支持的特殊字符")
VERIFY_CODES.pop(cache_key, None)
user["password"] = pwd_data.new_password
db.save_data("users.json", users_db)
return {"status": "success"}
@router.post("/api/users/follow")
async def toggle_follow(follow: FollowToggle):
users_db = db.load_data("users.json", default_data={})
if follow.target_account not in users_db or follow.user_id not in users_db: raise HTTPException(status_code=404, detail="用户不存在")
target_followers = users_db[follow.target_account].setdefault("followers", [])
current_following = users_db[follow.user_id].setdefault("following", [])
if follow.is_active:
if follow.user_id not in target_followers:
target_followers.append(follow.user_id)
add_notification(follow.target_account, {"type": "follow", "from_user": follow.user_id})
if follow.target_account not in current_following: current_following.append(follow.target_account)
else:
if follow.user_id in target_followers: target_followers.remove(follow.user_id)
if follow.target_account in current_following: current_following.remove(follow.target_account)
db.save_data("users.json", users_db)
return {"status": "success"}
@router.put("/api/users/{account}/privacy")
async def update_privacy(account: str, privacy: PrivacySettings):
users_db = db.load_data("users.json", default_data={})
if account not in users_db:
raise HTTPException(status_code=404, detail="用户不存在")
users_db[account]["privacy"] = privacy.dict()
db.save_data("users.json", users_db)
return {"status": "success"}