# router_users.py from fastapi import APIRouter, HTTPException, BackgroundTasks import time import re import random import os import json import urllib.request import urllib.parse import base64 import 数据库连接 as db from notifications import add_notification from models import UserRegister, UserLogin, UserUpdate, PasswordReset, FollowToggle, PrivacySettings, SendCodeRequest router = APIRouter() # ========================================== # 验证码内存缓存与发送引擎 # ========================================== VERIFY_CODES = {} def send_email_code(to_email: str, code: str, action: str): """利用自动化 Webhook 作为 HTTPS 到 SMTP 的代理桥梁""" webhook_url = os.environ.get("MAKE_WEBHOOK_URL") if not webhook_url: print("警告: 未配置 MAKE_WEBHOOK_URL,跳过邮件发送") return action_str = "注册账号" if action == "register" else "修改/找回密码" subject = f"ComfyUI 社区 - {action_str}验证码" html_content = f"""

ComfyUI 社区精选

您好,

您正在请求{action_str},您的验证码是:

{code}

该验证码在 10 分钟内有效。如非本人操作,请忽略此邮件。

""" data = { "to": to_email, "subject": subject, "html": html_content } req = urllib.request.Request( webhook_url, data=json.dumps(data).encode('utf-8'), headers={'Content-Type': 'application/json'} ) try: with urllib.request.urlopen(req, timeout=10) as response: print(f"✅ 成功触发 Webhook 发送验证码 {code} 至 {to_email}") except Exception as e: print(f"❌ Webhook 触发失败: {e}") def send_sms_code(phone: str, code: str, action: str): """支持 阿里云 与 Twilio 的双引擎短信发送路由""" action_str = "注册账号" if action == "register" else "修改/找回密码" # ========================================== # 引擎 A:Twilio (无需装 SDK,走纯 HTTP 接口,适合海外或免审核测试) # ========================================== twilio_sid = os.environ.get("TWILIO_SID") if twilio_sid: token = os.environ.get("TWILIO_TOKEN") from_phone = os.environ.get("TWILIO_FROM") body = f"【ComfyUI社区】您正在请求{action_str},验证码是:{code},10分钟内有效。" url = f"https://api.twilio.com/2010-04-01/Accounts/{twilio_sid}/Messages.json" auth = base64.b64encode(f"{twilio_sid}:{token}".encode('utf-8')).decode('utf-8') data = urllib.parse.urlencode({'To': phone, 'From': from_phone, 'Body': body}).encode('utf-8') req = urllib.request.Request(url, data=data) req.add_header("Authorization", f"Basic {auth}") try: with urllib.request.urlopen(req, timeout=10) as response: print(f"✅ Twilio 短信已成功下发至 {phone}") except Exception as e: print(f"❌ Twilio 发送失败: {e}") return # ========================================== # 引擎 B:阿里云 (国内首选,到达率最高) # ========================================== aliyun_ak = os.environ.get("ALIYUN_AK") if aliyun_ak: try: from alibabacloud_dysmsapi20170525.client import Client as DysmsapiClient from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_dysmsapi20170525 import models as dysmsapi_models except ImportError: print("❌ 缺少阿里云 SDK,请在 requirements.txt 中添加 alibabacloud_dysmsapi20170525") return sk = os.environ.get("ALIYUN_SK") sign_name = os.environ.get("ALIYUN_SIGN_NAME") # 短信签名,例如 "阿里云" tpl_code = os.environ.get("ALIYUN_TPL_REGISTER") if action == "register" else os.environ.get("ALIYUN_TPL_RESET") config = open_api_models.Config(access_key_id=aliyun_ak, access_key_secret=sk) config.endpoint = 'dysmsapi.aliyuncs.com' client = DysmsapiClient(config) send_req = dysmsapi_models.SendSmsRequest( phone_numbers=phone, sign_name=sign_name, template_code=tpl_code, template_param=json.dumps({"code": code}) ) try: response = client.send_sms(send_req) if response.body.code == "OK": print(f"✅ 阿里云短信已成功下发至 {phone}") else: print(f"❌ 阿里云下发失败: {response.body.message}") except Exception as e: print(f"❌ 阿里云请求异常: {e}") return # 如果都没有配置,则降级为控制台打印模拟 print(f"⚠️ 未配置短信秘钥,模拟下发 -> 手机号: {phone}, 验证码: {code}") # ========================================== # 接口路由 # ========================================== @router.post("/api/users/send-code") async def send_verify_code(req: SendCodeRequest, bg_tasks: BackgroundTasks): if req.action_type == "reset": if not req.account: raise HTTPException(status_code=400, detail="找回密码需先填写当前账号") users_db = db.load_data("users.json", default_data={}) user = users_db.get(req.account) if not user: raise HTTPException(status_code=404, detail="该账号不存在") if req.contact_type == "email" and user.get("email") != req.contact: raise HTTPException(status_code=400, detail="填写的邮箱与该账号绑定的邮箱不一致") if req.contact_type == "phone" and user.get("phone") != req.contact: raise HTTPException(status_code=400, detail="填写的手机号与该账号绑定的手机号不一致") code = str(random.randint(100000, 999999)) cache_key = f"{req.contact}_{req.action_type}" VERIFY_CODES[cache_key] = { "code": code, "expires_at": int(time.time()) + 600 } if req.contact_type == "email": bg_tasks.add_task(send_email_code, req.contact, code, req.action_type) elif req.contact_type == "phone": bg_tasks.add_task(send_sms_code, req.contact, code, req.action_type) else: raise HTTPException(status_code=400, detail="不支持的验证方式") return {"status": "success", "message": "验证码发送请求已提交"} @router.post("/api/users/register") async def register_user(user: UserRegister): users_db = db.load_data("users.json", default_data={}) cache_key = f"{user.email}_register" if user.email else f"{user.phone}_register" cached = VERIFY_CODES.get(cache_key) if not cached or cached["code"] != user.code or int(time.time()) > cached["expires_at"]: raise HTTPException(status_code=400, detail="验证码不正确或已过期") if len(user.account) <= 5: raise HTTPException(status_code=400, detail="账号必须大于5个字符") if not re.match(r'^[a-zA-Z0-9_]{6,20}$', user.account): raise HTTPException(status_code=400, detail="账号仅支持大小写英文字母、数字及下划线") if len(user.password) < 6: raise HTTPException(status_code=400, detail="密码必须大于等于6个字符") if not re.match(r'^[a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,}$', user.password): raise HTTPException(status_code=400, detail="密码包含不支持的特殊字符") if user.intro and len(user.intro) > 100: raise HTTPException(status_code=400, detail="个人介绍不能超过100个字符") if user.account in users_db: raise HTTPException(status_code=400, detail="该账号已被注册") for existing_user in users_db.values(): if user.email and existing_user.get("email") == user.email: raise HTTPException(status_code=400, detail="该邮箱已被绑定") if user.phone and existing_user.get("phone") == user.phone: raise HTTPException(status_code=400, detail="该手机号已被绑定") VERIFY_CODES.pop(cache_key, None) new_user = user.dict() new_user.pop("code", None) new_user.update({"created_at": int(time.time()), "followers": [], "following": [], "privacy": {"follows": False, "likes": False, "favorites": False, "downloads": False}}) users_db[user.account] = new_user db.save_data("users.json", users_db) return {"status": "success", "message": "注册成功", "data": {k: v for k, v in new_user.items() if k != "password"}} @router.post("/api/users/login") async def login_user(user: UserLogin): users_db = db.load_data("users.json", default_data={}) if user.account not in users_db: raise HTTPException(status_code=404, detail="账号不存在") user_data = users_db[user.account] if user_data.get("password") != user.password: raise HTTPException(status_code=401, detail="密码错误") return {"status": "success", "token": f"mock_token_{user.account}", "account": user.account, "name": user_data["name"], "avatar": user_data.get("avatarDataUrl", "https://via.placeholder.com/150")} # ========================================== # 补充:发送验证码的 API 接口 # ========================================== @router.post("/api/users/send_code") async def send_code_api(req: SendCodeRequest): # 1. 生成 6 位随机数字验证码 code = str(random.randint(100000, 999999)) # 2. 生成缓存 Key (格式: 邮箱地址_动作) # 例如: test@qq.com_register 或 test@qq.com_reset key = f"{req.contact}_{req.action_type}" # 3. 存入全局内存字典 (设置有效期为 10 分钟 = 600 秒) VERIFY_CODES[key] = { "code": code, "expires": time.time() + 600 } # 4. 触发发送逻辑 if req.contact_type == "email": try: # 调用你文件里已经写好的通过 Make.com Webhook 发邮件的函数 send_email_code(req.contact, code, req.action_type) return {"status": "success", "message": "验证码已成功发送至邮箱"} except Exception as e: raise HTTPException(status_code=500, detail=f"邮件发送失败: {str(e)}") elif req.contact_type == "phone": # 预留的短信通道 return {"status": "success", "message": "验证码已成功发送至手机"} else: raise HTTPException(status_code=400, detail="不支持的验证方式") @router.get("/api/users/{account}") async def get_user_profile(account: str): users_db = db.load_data("users.json", default_data={}) if account not in users_db: raise HTTPException(status_code=404, detail="用户不存在") user_data = users_db[account] items_db = db.load_data("items.json", default_data=[]) user_items = [item for item in items_db if item.get("author") == account] user_data["receivedLikes"] = sum(item.get("likes", 0) for item in user_items) user_data["receivedFavorites"] = sum(item.get("favorites", 0) for item in user_items) user_data["receivedUses"] = sum(item.get("uses", 0) for item in user_items) return {"status": "success", "data": {k: v for k, v in user_data.items() if k != "password"}} @router.put("/api/users/{account}") async def update_user_profile(account: str, update_data: UserUpdate): users_db = db.load_data("users.json", default_data={}) if account not in users_db: raise HTTPException(status_code=404, detail="用户不存在") if update_data.intro and len(update_data.intro) > 100: raise HTTPException(status_code=400, detail="个人介绍不能超过100个字符") user = users_db[account] for k, v in update_data.dict(exclude_unset=True).items(): if v is not None: user[k] = v db.save_data("users.json", users_db) return {"status": "success", "data": {k: v for k, v in user.items() if k != "password"}} @router.post("/api/users/{account}/reset-password") async def reset_password(account: str, pwd_data: PasswordReset): users_db = db.load_data("users.json", default_data={}) if account not in users_db: raise HTTPException(status_code=404, detail="用户不存在") user = users_db[account] if pwd_data.verify_type == "email" and user.get("email") != pwd_data.verify_contact: raise HTTPException(status_code=400, detail="填写的邮箱与该账号绑定的邮箱不匹配") if pwd_data.verify_type == "phone" and user.get("phone") != pwd_data.verify_contact: raise HTTPException(status_code=400, detail="填写的手机号与该账号绑定的手机号不匹配") cache_key = f"{pwd_data.verify_contact}_reset" cached = VERIFY_CODES.get(cache_key) if not cached or cached["code"] != pwd_data.code or int(time.time()) > cached["expires_at"]: raise HTTPException(status_code=400, detail="验证码不正确或已过期") if len(pwd_data.new_password) < 6: raise HTTPException(status_code=400, detail="新密码必须大于等于6个字符") if not re.match(r'^[a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,}$', pwd_data.new_password): raise HTTPException(status_code=400, detail="新密码包含不支持的特殊字符") VERIFY_CODES.pop(cache_key, None) user["password"] = pwd_data.new_password db.save_data("users.json", users_db) return {"status": "success"} @router.post("/api/users/follow") async def toggle_follow(follow: FollowToggle): users_db = db.load_data("users.json", default_data={}) if follow.target_account not in users_db or follow.user_id not in users_db: raise HTTPException(status_code=404, detail="用户不存在") target_followers = users_db[follow.target_account].setdefault("followers", []) current_following = users_db[follow.user_id].setdefault("following", []) if follow.is_active: if follow.user_id not in target_followers: target_followers.append(follow.user_id) add_notification(follow.target_account, {"type": "follow", "from_user": follow.user_id}) if follow.target_account not in current_following: current_following.append(follow.target_account) else: if follow.user_id in target_followers: target_followers.remove(follow.user_id) if follow.target_account in current_following: current_following.remove(follow.target_account) db.save_data("users.json", users_db) return {"status": "success"} @router.put("/api/users/{account}/privacy") async def update_privacy(account: str, privacy: PrivacySettings): users_db = db.load_data("users.json", default_data={}) if account not in users_db: raise HTTPException(status_code=404, detail="用户不存在") users_db[account]["privacy"] = privacy.dict() db.save_data("users.json", users_db) return {"status": "success"}