"""Authentication helpers: password hashing, JWT creation, e-mail verification
codes, and SMTP delivery.

All configuration values (secret key, SMTP credentials, expiry windows) are
read from ``core.config``.
"""

import random  # No longer used here; retained in case other modules import it from this one.
import secrets
import smtplib
import ssl
import time
from datetime import datetime, timedelta, timezone
from email.header import Header
from email.mime.text import MIMEText
from email.utils import formataddr
from typing import Optional

import jwt
from passlib.context import CryptContext

from core.config import (
    ACCESS_TOKEN_EXPIRE_MINUTES,
    ALGORITHM,
    SECRET_KEY,
    SENDER_EMAIL,
    SENDER_NAME,
    SMTP_PASSWORD,
    SMTP_PORT,
    SMTP_SERVER,
    SMTP_USERNAME,
    VERIFICATION_CODE_EXPIRE_MINUTES,
)

# In-memory storage for verification codes (for demonstration purposes).
# In a production environment, consider using Redis or a database for
# persistence and scalability.
# Layout: {prefix + email: {"code": "123456", "expires_at": <unix timestamp>}}
verification_codes = {}

# Password hashing context.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# bcrypt only considers the first 72 bytes of its input.
_BCRYPT_MAX_BYTES = 72


def _truncate_for_bcrypt(password: str) -> str:
    """Return *password* cut to at most 72 UTF-8 bytes.

    A multi-byte character that would be split by the cut is dropped entirely
    (``errors="ignore"``), so the result is always valid text.
    """
    return password.encode("utf-8")[:_BCRYPT_MAX_BYTES].decode("utf-8", "ignore")


def verify_password(plain_password, hashed_password):
    """Check *plain_password* against *hashed_password*.

    The candidate is truncated exactly the way ``get_password_hash`` truncates
    before hashing. Without this, a password longer than 72 bytes whose cut
    point falls inside a multi-byte character would be hashed from one byte
    sequence and verified against another, and could never match.

    Returns:
        Whatever ``pwd_context.verify`` returns (truthy on a match).
    """
    if isinstance(plain_password, str):
        plain_password = _truncate_for_bcrypt(plain_password)
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    """Hash *password* with the configured ``CryptContext``.

    The password is truncated to 72 bytes first because bcrypt has that
    limit; this handles users who enter very long passwords.
    """
    return pwd_context.hash(_truncate_for_bcrypt(password))


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed; the dict is shallow-copied, not mutated.
        expires_delta: Lifetime of the token. When omitted (or falsy), the
            token lives for ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The token produced by ``jwt.encode`` with ``SECRET_KEY``/``ALGORITHM``.
    """
    lifetime = expires_delta if expires_delta else timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = data.copy()
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    claims.update({"exp": datetime.now(timezone.utc) + lifetime})
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)


def generate_verification_code():
    """Generate a 6-digit numeric verification code (100000-999999).

    Uses the ``secrets`` CSPRNG rather than ``random``: the code is an
    authentication secret and must not be predictable.
    """
    return str(secrets.randbelow(900000) + 100000)


def store_verification_code(email: str, code: str, prefix: str = ""):
    """Store *code* for *email* together with its expiration time.

    Args:
        email: Address the code was issued for.
        code: The verification code to remember.
        prefix: Optional namespace prepended to *email* to form the storage key.
    """
    key = prefix + email
    expires_at = time.time() + VERIFICATION_CODE_EXPIRE_MINUTES * 60
    verification_codes[key] = {"code": code, "expires_at": expires_at}
    # NOTE(review): this prints the plaintext code to stdout. Acceptable for a
    # demo, but it should be removed or redacted before production use.
    print(f"Stored verification code for {key}: {code}, expires at {datetime.fromtimestamp(expires_at)}")


def verify_stored_code(email: str, code: str, prefix: str = "") -> bool:
    """Check *code* against the stored code for *email*.

    Args:
        email: Address the code was issued for.
        code: Code supplied by the user.
        prefix: Optional namespace prepended to *email* to form the storage key.

    Returns:
        ``True`` if a non-expired code is stored under the key and matches;
        ``False`` if nothing is stored, the code has expired (the expired
        entry is removed), or the codes differ.
    """
    key = prefix + email
    stored_data = verification_codes.get(key)
    if not stored_data:
        return False  # No code stored for this key.

    if time.time() > stored_data["expires_at"]:
        verification_codes.pop(key, None)  # Remove the expired code.
        return False

    expected = stored_data["code"]
    if not isinstance(expected, str) or not isinstance(code, str):
        # Non-string values cannot go through compare_digest; fall back to
        # plain equality so the result matches a simple `==` check.
        return expected == code

    # Constant-time comparison so response timing does not leak how many
    # leading characters matched. Compare bytes: compare_digest rejects
    # non-ASCII str arguments.
    # NOTE(review): a code stays valid after a successful check until it
    # expires; consider making codes single-use once callers are confirmed.
    return secrets.compare_digest(expected.encode("utf-8"), code.encode("utf-8"))


def send_email(to_email: str, subject: str, body: str) -> bool:
    """Send a plain-text UTF-8 e-mail through the configured SMTP server.

    Port 587 uses STARTTLS and port 465 uses implicit TLS; any other port is
    reported as unsupported. In both cases the server certificate is verified
    with the default SSL context. All failures are printed, not raised.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        body: Plain-text message body.

    Returns:
        ``True`` if the message was handed to the server, ``False`` otherwise
        (incomplete configuration, unsupported port, or any error while
        connecting, authenticating, or sending).
    """
    if not all([SMTP_SERVER, SMTP_USERNAME, SMTP_PASSWORD, SENDER_EMAIL]):
        print(f"错误: SMTP配置不完整。邮件未发送至 {to_email}。请检查 .env 文件中的 SMTP_SERVER, SMTP_USERNAME, SMTP_PASSWORD, SENDER_EMAIL。")
        return False

    msg = MIMEText(body, 'plain', 'utf-8')
    # formataddr encodes only the display name when it is non-ASCII and
    # leaves the address itself untouched.
    msg['From'] = formataddr((SENDER_NAME, SENDER_EMAIL))
    # The recipient address must be a plain string: wrapping it in a UTF-8
    # Header would RFC 2047-encode the address itself, which is not valid.
    msg['To'] = to_email
    msg['Subject'] = Header(subject, 'utf-8')

    email_sent_successfully = False
    try:
        # Verifying context (certificate + hostname) for both transports, so
        # the SMTP credentials are never sent to an unauthenticated server.
        tls_context = ssl.create_default_context()

        # NOTE(review): SMTP_PORT is compared against ints; if the config
        # supplies it as a string, both branches are skipped — confirm.
        if SMTP_PORT == 587:
            with smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=10) as server:
                server.starttls(context=tls_context)
                server.login(SMTP_USERNAME, SMTP_PASSWORD)
                server.sendmail(SENDER_EMAIL, to_email, msg.as_string())
                print(f"邮件发送成功至 {to_email} (端口: {SMTP_PORT})。")
                email_sent_successfully = True
        elif SMTP_PORT == 465:
            # The context manager sends QUIT on exit; no explicit quit() needed.
            with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, context=tls_context, timeout=10) as server:
                server.login(SMTP_USERNAME, SMTP_PASSWORD)
                server.sendmail(SENDER_EMAIL, to_email, msg.as_string())
                print(f"邮件发送成功至 {to_email} (端口: {SMTP_PORT})。")
                email_sent_successfully = True
        else:
            print(f"错误:不支持的端口 {SMTP_PORT}。邮件未发送至 {to_email}。目前只支持 465 (SSL) 和 587 (STARTTLS)。")
    # Specific SMTP failures first (each has its own hint), then the generic
    # SMTP base class, then anything else (e.g. socket or TLS errors).
    except smtplib.SMTPAuthenticationError:
        print(f"认证失败:邮件未发送至 {to_email}。请检查 .env 文件中的 SMTP_USERNAME 和 SMTP_PASSWORD。对于 QQ 邮箱,请确保使用的是授权码而非登录密码。")
    except smtplib.SMTPConnectError as e:
        print(f"连接失败:邮件未发送至 {to_email}。请检查 SMTP_SERVER 地址、SMTP_PORT 端口是否正确,以及网络防火墙设置。错误详情: {e}")
    except smtplib.SMTPServerDisconnected as e:
        print(f"SMTP 服务器意外断开连接:邮件未发送至 {to_email}。错误详情: {e}")
    except smtplib.SMTPRecipientsRefused as e:
        print(f"收件人被拒绝:邮件未发送至 {to_email}。错误详情: {e}")
    except smtplib.SMTPSenderRefused as e:
        print(f"发件人被拒绝:邮件未发送至 {to_email}。错误详情: {e}")
    except smtplib.SMTPDataError as e:
        print(f"SMTP 数据错误:邮件未发送至 {to_email}。错误详情: {e}")
    except smtplib.SMTPException as e:
        print(f"SMTP 协议错误:邮件未发送至 {to_email}。错误详情: {e}")
    except Exception as e:
        print(f"发生未知错误:邮件未发送至 {to_email}。错误详情: {e}")

    return email_sent_successfully