Spaces:
Running
Running
| # router_users.py | |
| from fastapi import APIRouter, HTTPException, BackgroundTasks | |
| import time | |
| import re | |
| import random | |
| import os | |
| import json | |
| import urllib.request | |
| import urllib.parse | |
| import base64 | |
| import 数据库连接 as db | |
| from notifications import add_notification | |
| from models import UserRegister, UserLogin, UserUpdate, PasswordReset, FollowToggle, PrivacySettings, SendCodeRequest | |
| router = APIRouter() | |
| # ========================================== | |
| # 验证码内存缓存与发送引擎 | |
| # ========================================== | |
# Process-local store of pending verification codes.
# Keys are "<contact>_<action_type>" strings; each value is a dict holding the
# issued code together with its expiry timestamp. Nothing here is persisted,
# so the contents last only as long as this process.
VERIFY_CODES = dict()
def send_email_code(to_email: str, code: str, action: str):
    """Send a verification-code email by POSTing to an automation webhook.

    The webhook URL is read from the ``MAKE_WEBHOOK_URL`` environment
    variable; the webhook serves as an HTTPS-to-SMTP bridge, so this function
    only submits the recipient, subject and HTML body as a JSON document.

    Delivery is best-effort: when the variable is unset the send is skipped
    with a warning, and a failure while calling the webhook is printed rather
    than raised.

    Args:
        to_email: Recipient address.
        code: Verification code embedded in the message body.
        action: ``"register"`` selects the sign-up wording; any other value
            selects the change/recover-password wording.
    """
    webhook_url = os.environ.get("MAKE_WEBHOOK_URL")
    if not webhook_url:
        print("警告: 未配置 MAKE_WEBHOOK_URL,跳过邮件发送")
        return

    action_str = "注册账号" if action == "register" else "修改/找回密码"
    subject = f"ComfyUI 社区 - {action_str}验证码"
    html_content = f"""
    <div style="background:#f9f9f9; padding:20px; font-family:sans-serif;">
    <div style="background:#fff; padding:20px; border-radius:8px; max-width:500px; margin:0 auto; box-shadow:0 2px 10px rgba(0,0,0,0.05);">
    <h2 style="color:#4CAF50; margin-top:0;">ComfyUI 社区精选</h2>
    <p>您好,</p>
    <p>您正在请求<strong>{action_str}</strong>,您的验证码是:</p>
    <div style="font-size:24px; font-weight:bold; color:#2196F3; background:#e3f2fd; padding:15px; text-align:center; border-radius:6px; letter-spacing: 5px;">{code}</div>
    <p style="color:#888; font-size:12px; margin-top:20px;">该验证码在 10 分钟内有效。如非本人操作,请忽略此邮件。</p>
    </div>
    </div>
    """
    data = {
        "to": to_email,
        "subject": subject,
        "html": html_content,
    }
    # Supplying a body makes urllib issue a POST.
    req = urllib.request.Request(
        webhook_url,
        data=json.dumps(data).encode('utf-8'),
        headers={'Content-Type': 'application/json'},
    )
    try:
        with urllib.request.urlopen(req, timeout=10):
            # The one-time code is a credential: keep it out of the logs and
            # record only the recipient.
            print(f"✅ 成功触发 Webhook 发送验证码至 {to_email}")
    except Exception as e:
        # Deliberately broad: a delivery failure must not crash the caller.
        print(f"❌ Webhook 触发失败: {e}")
def _send_sms_via_twilio(account_sid: str, phone: str, body: str) -> None:
    """POST one SMS to Twilio's Messages endpoint over plain HTTP (no SDK)."""
    token = os.environ.get("TWILIO_TOKEN")
    from_phone = os.environ.get("TWILIO_FROM")
    if not token or not from_phone:
        # Without this guard the request would be sent with the literal
        # credentials "sid:None" and/or an empty sender.
        print("❌ Twilio 配置不完整,请同时设置 TWILIO_TOKEN 与 TWILIO_FROM")
        return

    url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
    auth = base64.b64encode(f"{account_sid}:{token}".encode('utf-8')).decode('utf-8')
    data = urllib.parse.urlencode({'To': phone, 'From': from_phone, 'Body': body}).encode('utf-8')
    req = urllib.request.Request(url, data=data)
    req.add_header("Authorization", f"Basic {auth}")
    try:
        with urllib.request.urlopen(req, timeout=10):
            print(f"✅ Twilio 短信已成功下发至 {phone}")
    except Exception as e:
        # Best-effort delivery: report the failure instead of raising.
        print(f"❌ Twilio 发送失败: {e}")


def _send_sms_via_aliyun(access_key: str, phone: str, code: str, action: str) -> None:
    """Send the code through the Alibaba Cloud SMS SDK, if it is installed."""
    try:
        from alibabacloud_dysmsapi20170525.client import Client as DysmsapiClient
        from alibabacloud_tea_openapi import models as open_api_models
        from alibabacloud_dysmsapi20170525 import models as dysmsapi_models
    except ImportError:
        print("❌ 缺少阿里云 SDK,请在 requirements.txt 中添加 alibabacloud_dysmsapi20170525")
        return

    sk = os.environ.get("ALIYUN_SK")
    sign_name = os.environ.get("ALIYUN_SIGN_NAME")  # SMS signature, e.g. "阿里云"
    # Registration and password reset use separately configured templates.
    tpl_code = os.environ.get("ALIYUN_TPL_REGISTER") if action == "register" else os.environ.get("ALIYUN_TPL_RESET")
    try:
        # Client and request construction sit inside the guarded region so a
        # bad configuration is reported the same way as a failed request.
        config = open_api_models.Config(access_key_id=access_key, access_key_secret=sk)
        config.endpoint = 'dysmsapi.aliyuncs.com'
        client = DysmsapiClient(config)
        send_req = dysmsapi_models.SendSmsRequest(
            phone_numbers=phone,
            sign_name=sign_name,
            template_code=tpl_code,
            template_param=json.dumps({"code": code}),
        )
        response = client.send_sms(send_req)
        if response.body.code == "OK":
            print(f"✅ 阿里云短信已成功下发至 {phone}")
        else:
            print(f"❌ 阿里云下发失败: {response.body.message}")
    except Exception as e:
        print(f"❌ 阿里云请求异常: {e}")


def send_sms_code(phone: str, code: str, action: str):
    """Deliver a verification code by SMS through the first configured engine.

    Engine selection is driven by environment variables and is exclusive:

    1. Twilio, when ``TWILIO_SID`` is set (also needs ``TWILIO_TOKEN`` and
       ``TWILIO_FROM``).
    2. Alibaba Cloud, when ``ALIYUN_AK`` is set.
    3. Otherwise the code is printed to stdout as a simulated send.

    A selected engine that fails is not followed by a fallback to the next
    one. Failures are printed, never raised.

    Args:
        phone: Destination phone number.
        code: Verification code to deliver.
        action: ``"register"`` selects the sign-up wording/template; any other
            value selects the password change/recovery one.
    """
    action_str = "注册账号" if action == "register" else "修改/找回密码"

    twilio_sid = os.environ.get("TWILIO_SID")
    if twilio_sid:
        body = f"【ComfyUI社区】您正在请求{action_str},验证码是:{code},10分钟内有效。"
        _send_sms_via_twilio(twilio_sid, phone, body)
        return

    aliyun_ak = os.environ.get("ALIYUN_AK")
    if aliyun_ak:
        _send_sms_via_aliyun(aliyun_ak, phone, code, action)
        return

    # Nothing configured: fall back to printing the code on the console.
    print(f"⚠️ 未配置短信秘钥,模拟下发 -> 手机号: {phone}, 验证码: {code}")
| # ========================================== | |
| # 接口路由 | |
| # ========================================== | |
async def send_verify_code(req: SendCodeRequest, bg_tasks: BackgroundTasks):
    """Issue a 6-digit verification code and schedule its delivery.

    For ``action_type == "reset"`` the request must name an existing account,
    and the supplied contact must equal the email/phone stored on that
    account. The code is cached in ``VERIFY_CODES`` under
    ``"<contact>_<action_type>"`` for 600 seconds and handed to the matching
    sender via ``bg_tasks``.

    Args:
        req: Request carrying ``action_type``, ``account``, ``contact_type``
            and ``contact``.
        bg_tasks: Task collection the delivery call is added to.

    Returns:
        A dict acknowledging that the send request was submitted.

    Raises:
        HTTPException: 400 when the reset account is missing, the contact
            does not match the account, or ``contact_type`` is unsupported;
            404 when the reset account does not exist.
    """
    # Function-scope import: the code is a credential, so it is drawn from
    # the OS CSPRNG instead of the predictable `random` generator.
    import secrets

    if req.action_type == "reset":
        if not req.account:
            raise HTTPException(status_code=400, detail="找回密码需先填写当前账号")
        users_db = db.load_data("users.json", default_data={})
        user = users_db.get(req.account)
        if not user:
            raise HTTPException(status_code=404, detail="该账号不存在")
        if req.contact_type == "email" and user.get("email") != req.contact:
            raise HTTPException(status_code=400, detail="填写的邮箱与该账号绑定的邮箱不一致")
        if req.contact_type == "phone" and user.get("phone") != req.contact:
            raise HTTPException(status_code=400, detail="填写的手机号与该账号绑定的手机号不一致")

    # Resolve the delivery channel BEFORE caching anything. An unsupported
    # contact_type skips both ownership checks above, so it must be rejected
    # here without leaving a valid code in the cache.
    senders = {"email": send_email_code, "phone": send_sms_code}
    sender = senders.get(req.contact_type)
    if sender is None:
        raise HTTPException(status_code=400, detail="不支持的验证方式")

    # Uniform over 100000..999999, the same range as before.
    code = str(secrets.randbelow(900000) + 100000)
    cache_key = f"{req.contact}_{req.action_type}"
    VERIFY_CODES[cache_key] = {
        "code": code,
        "expires_at": int(time.time()) + 600,
    }
    bg_tasks.add_task(sender, req.contact, code, req.action_type)
    return {"status": "success", "message": "验证码发送请求已提交"}
async def register_user(user: UserRegister):
    """Create a new account after verifying the code sent to its contact.

    The verification code is looked up under ``"<email>_register"`` when an
    email is supplied, otherwise under ``"<phone>_register"``. It is consumed
    only once every validation has passed.

    Args:
        user: Registration payload (account, password, contact, code, ...).

    Returns:
        A dict with the stored user record, minus the ``password`` field.

    Raises:
        HTTPException: 400 for a wrong/expired code, an invalid account,
            password or intro, or an account/email/phone already in use.
    """
    users_db = db.load_data("users.json", default_data={})

    cache_key = f"{user.email}_register" if user.email else f"{user.phone}_register"
    cached = VERIFY_CODES.get(cache_key)
    if cached and int(time.time()) > cached["expires_at"]:
        # Evict expired entries so the in-memory cache does not grow forever.
        VERIFY_CODES.pop(cache_key, None)
        cached = None
    if not cached or cached["code"] != user.code:
        raise HTTPException(status_code=400, detail="验证码不正确或已过期")

    # fullmatch instead of match(r'^...$'): `$` also matches just before a
    # trailing newline, which would let "name\n" through the whitelist.
    if len(user.account) <= 5:
        raise HTTPException(status_code=400, detail="账号必须大于5个字符")
    if not re.fullmatch(r'[a-zA-Z0-9_]{6,20}', user.account):
        raise HTTPException(status_code=400, detail="账号仅支持大小写英文字母、数字及下划线")
    if len(user.password) < 6:
        raise HTTPException(status_code=400, detail="密码必须大于等于6个字符")
    if not re.fullmatch(r'[a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,}', user.password):
        raise HTTPException(status_code=400, detail="密码包含不支持的特殊字符")
    if user.intro and len(user.intro) > 100:
        raise HTTPException(status_code=400, detail="个人介绍不能超过100个字符")

    if user.account in users_db:
        raise HTTPException(status_code=400, detail="该账号已被注册")
    for existing_user in users_db.values():
        if user.email and existing_user.get("email") == user.email:
            raise HTTPException(status_code=400, detail="该邮箱已被绑定")
        if user.phone and existing_user.get("phone") == user.phone:
            raise HTTPException(status_code=400, detail="该手机号已被绑定")

    # All checks passed: the code is single-use.
    VERIFY_CODES.pop(cache_key, None)

    # NOTE(review): the record keeps the "password" field exactly as received
    # (plaintext). Hashing it would also require changing the login check.
    new_user = user.dict()
    new_user.pop("code", None)
    new_user.update({
        "created_at": int(time.time()),
        "followers": [],
        "following": [],
        "privacy": {"follows": False, "likes": False, "favorites": False, "downloads": False},
    })
    users_db[user.account] = new_user
    db.save_data("users.json", users_db)
    return {"status": "success", "message": "注册成功", "data": {k: v for k, v in new_user.items() if k != "password"}}
async def login_user(user: UserLogin):
    """Check an account/password pair and return a session payload.

    Args:
        user: Login payload carrying ``account`` and ``password``.

    Returns:
        A dict with a ``mock_token_<account>`` token, the account id, the
        stored display name and the avatar (a placeholder URL when the record
        has no ``avatarDataUrl``).

    Raises:
        HTTPException: 404 when the account is unknown, 401 when the supplied
            password differs from the stored one.
    """
    users_db = db.load_data("users.json", default_data={})
    try:
        record = users_db[user.account]
    except KeyError:
        raise HTTPException(status_code=404, detail="账号不存在") from None

    # NOTE(review): the stored password is compared directly with the
    # submitted one, i.e. it is kept unhashed.
    if record.get("password") != user.password:
        raise HTTPException(status_code=401, detail="密码错误")

    avatar = record.get("avatarDataUrl", "https://via.placeholder.com/150")
    return {
        "status": "success",
        "token": f"mock_token_{user.account}",
        "account": user.account,
        "name": record["name"],
        "avatar": avatar,
    }
| # ========================================== | |
| # 补充:发送验证码的 API 接口 | |
| # ========================================== | |
async def send_code_api(req: SendCodeRequest):
    """Issue a 6-digit verification code and deliver it before responding.

    The code is cached in ``VERIFY_CODES`` under ``"<contact>_<action_type>"``
    (for example ``test@qq.com_register``) with a 600-second lifetime, stored
    under the ``expires_at`` key that ``register_user`` and ``reset_password``
    read.

    Args:
        req: Request carrying ``action_type``, ``account``, ``contact_type``
            and ``contact``.

    Returns:
        A success dict naming the channel the code was handed to. The sender
        functions print delivery failures rather than raising, so "success"
        means the send was attempted.

    Raises:
        HTTPException: 400 for an unsupported ``contact_type``, a reset
            request without an account, or a contact that does not match the
            account; 404 when the reset account does not exist; 500 when the
            sender raises.
    """
    # Function-scope imports keep this block self-contained.
    import asyncio
    import secrets

    senders = {"email": send_email_code, "phone": send_sms_code}
    success_messages = {
        "email": "验证码已成功发送至邮箱",
        "phone": "验证码已成功发送至手机",
    }
    failure_prefixes = {"email": "邮件发送失败", "phone": "短信发送失败"}

    # Reject unknown channels before a code is generated or cached.
    if req.contact_type not in senders:
        raise HTTPException(status_code=400, detail="不支持的验证方式")

    # Same ownership rules as send_verify_code: a reset code may only go to
    # the contact already bound to the named account.
    if req.action_type == "reset":
        if not req.account:
            raise HTTPException(status_code=400, detail="找回密码需先填写当前账号")
        users_db = db.load_data("users.json", default_data={})
        user = users_db.get(req.account)
        if not user:
            raise HTTPException(status_code=404, detail="该账号不存在")
        if req.contact_type == "email" and user.get("email") != req.contact:
            raise HTTPException(status_code=400, detail="填写的邮箱与该账号绑定的邮箱不一致")
        if req.contact_type == "phone" and user.get("phone") != req.contact:
            raise HTTPException(status_code=400, detail="填写的手机号与该账号绑定的手机号不一致")

    # The code is a credential: draw it from the OS CSPRNG (100000..999999).
    code = str(secrets.randbelow(900000) + 100000)
    key = f"{req.contact}_{req.action_type}"
    VERIFY_CODES[key] = {
        "code": code,
        # Must be "expires_at" (int seconds): that is the key the code
        # consumers look up.
        "expires_at": int(time.time()) + 600,
    }

    # The senders do blocking network I/O; run them on the default executor
    # so the event loop keeps serving other requests meanwhile.
    loop = asyncio.get_running_loop()
    try:
        await loop.run_in_executor(
            None, senders[req.contact_type], req.contact, code, req.action_type
        )
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"{failure_prefixes[req.contact_type]}: {str(e)}",
        ) from e
    return {"status": "success", "message": success_messages[req.contact_type]}
async def get_user_profile(account: str):
    """Return a user's record plus totals aggregated over their items.

    ``receivedLikes``, ``receivedFavorites`` and ``receivedUses`` are the sums
    of the ``likes``, ``favorites`` and ``uses`` fields (default 0) over every
    entry of ``items.json`` whose ``author`` equals ``account``.

    Args:
        account: Account id to look up.

    Returns:
        A dict whose ``data`` is the user record without ``password`` and
        with the three aggregate counters added.

    Raises:
        HTTPException: 404 when the account does not exist.
    """
    users_db = db.load_data("users.json", default_data={})
    if account not in users_db:
        raise HTTPException(status_code=404, detail="用户不存在")

    items_db = db.load_data("items.json", default_data=[])
    user_items = [item for item in items_db if item.get("author") == account]

    # Build the response on a copy: the counters are derived values and must
    # not be written into the record handed out by the storage layer (they
    # could otherwise leak into a later save if that object is shared).
    profile = {k: v for k, v in users_db[account].items() if k != "password"}
    profile["receivedLikes"] = sum(item.get("likes", 0) for item in user_items)
    profile["receivedFavorites"] = sum(item.get("favorites", 0) for item in user_items)
    profile["receivedUses"] = sum(item.get("uses", 0) for item in user_items)
    return {"status": "success", "data": profile}
async def update_user_profile(account: str, update_data: UserUpdate):
    """Apply the explicitly supplied, non-null fields to a user record.

    Args:
        account: Account id whose record is updated.
        update_data: Partial update; fields that were not set, or were set to
            ``None``, leave the stored value untouched.

    Returns:
        A dict whose ``data`` is the updated record without ``password``.

    Raises:
        HTTPException: 404 when the account does not exist, 400 when the
            intro is longer than 100 characters.
    """
    users_db = db.load_data("users.json", default_data={})
    if account not in users_db:
        raise HTTPException(status_code=404, detail="用户不存在")

    intro = update_data.intro
    if intro and len(intro) > 100:
        raise HTTPException(status_code=400, detail="个人介绍不能超过100个字符")

    record = users_db[account]
    supplied = update_data.dict(exclude_unset=True)
    record.update(
        {field: value for field, value in supplied.items() if value is not None}
    )
    db.save_data("users.json", users_db)

    public_view = {field: value for field, value in record.items() if field != "password"}
    return {"status": "success", "data": public_view}
async def reset_password(account: str, pwd_data: PasswordReset):
    """Set a new password after verifying a code sent to a bound contact.

    The caller must prove control of the email or phone stored on the
    account. The code is looked up under ``"<verify_contact>_reset"`` and is
    consumed only once every check has passed.

    Args:
        account: Account id whose password is replaced.
        pwd_data: Payload with ``verify_type``, ``verify_contact``, ``code``
            and ``new_password``.

    Returns:
        ``{"status": "success"}``.

    Raises:
        HTTPException: 404 when the account does not exist; 400 for an
            unsupported ``verify_type``, a contact that is not the one bound
            to the account, a wrong/expired code, or an invalid new password.
    """
    users_db = db.load_data("users.json", default_data={})
    if account not in users_db:
        raise HTTPException(status_code=404, detail="用户不存在")
    user = users_db[account]

    # The verify_type doubles as the record field holding the bound contact.
    mismatch_detail = {
        "email": "填写的邮箱与该账号绑定的邮箱不匹配",
        "phone": "填写的手机号与该账号绑定的手机号不匹配",
    }
    # Any other verify_type used to skip the ownership check entirely, so a
    # code issued for an arbitrary contact could reset this account.
    if pwd_data.verify_type not in mismatch_detail:
        raise HTTPException(status_code=400, detail="不支持的验证方式")
    bound_contact = user.get(pwd_data.verify_type)
    # An account with no bound contact must not match a missing verify_contact.
    if not bound_contact or bound_contact != pwd_data.verify_contact:
        raise HTTPException(status_code=400, detail=mismatch_detail[pwd_data.verify_type])

    cache_key = f"{pwd_data.verify_contact}_reset"
    cached = VERIFY_CODES.get(cache_key)
    if cached and int(time.time()) > cached["expires_at"]:
        # Evict expired entries so the in-memory cache does not grow forever.
        VERIFY_CODES.pop(cache_key, None)
        cached = None
    if not cached or cached["code"] != pwd_data.code:
        raise HTTPException(status_code=400, detail="验证码不正确或已过期")

    if len(pwd_data.new_password) < 6:
        raise HTTPException(status_code=400, detail="新密码必须大于等于6个字符")
    # fullmatch instead of match(r'^...$'): `$` also matches just before a
    # trailing newline, which would let "secret\n" through the whitelist.
    if not re.fullmatch(r'[a-zA-Z0-9!@#$%^&*()_+\-=\[\]{};\':"\\|,.<>\/?]{6,}', pwd_data.new_password):
        raise HTTPException(status_code=400, detail="新密码包含不支持的特殊字符")

    # All checks passed: the code is single-use.
    VERIFY_CODES.pop(cache_key, None)
    user["password"] = pwd_data.new_password
    db.save_data("users.json", users_db)
    return {"status": "success"}
async def toggle_follow(follow: FollowToggle):
    """Follow or unfollow ``target_account`` on behalf of ``user_id``.

    Both sides of the relation are kept in sync: the target's ``followers``
    list and the acting user's ``following`` list. A notification is queued
    for the target only when the follower is newly added.

    Args:
        follow: Payload with ``user_id``, ``target_account`` and
            ``is_active`` (truthy to follow, falsy to unfollow).

    Returns:
        ``{"status": "success"}``.

    Raises:
        HTTPException: 404 when either account does not exist.
    """
    users_db = db.load_data("users.json", default_data={})
    follower_id = follow.user_id
    target_id = follow.target_account
    if target_id not in users_db or follower_id not in users_db:
        raise HTTPException(status_code=404, detail="用户不存在")

    # Records saved without these lists get them created on first use.
    followers = users_db[target_id].setdefault("followers", [])
    following = users_db[follower_id].setdefault("following", [])

    if not follow.is_active:
        if follower_id in followers:
            followers.remove(follower_id)
        if target_id in following:
            following.remove(target_id)
    else:
        if follower_id not in followers:
            followers.append(follower_id)
            add_notification(target_id, {"type": "follow", "from_user": follower_id})
        if target_id not in following:
            following.append(target_id)

    db.save_data("users.json", users_db)
    return {"status": "success"}
async def update_privacy(account: str, privacy: PrivacySettings):
    """Replace the stored privacy settings of an account.

    Args:
        account: Account id whose settings are overwritten.
        privacy: New settings; stored as the dict produced by ``privacy.dict()``.

    Returns:
        ``{"status": "success"}``.

    Raises:
        HTTPException: 404 when the account does not exist.
    """
    users_db = db.load_data("users.json", default_data={})
    if account in users_db:
        # The whole "privacy" mapping is replaced, not merged.
        users_db[account]["privacy"] = privacy.dict()
        db.save_data("users.json", users_db)
        return {"status": "success"}
    raise HTTPException(status_code=404, detail="用户不存在")