spwebsite / core /utils.py
geqintan's picture
update
133609a
import jwt
import smtplib
import secrets
import ssl # Import ssl
import random # Import random for verification code generation
import time # Import time for verification code expiration
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.header import Header
from passlib.context import CryptContext
from typing import Optional # Import Optional
from core.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, \
SMTP_SERVER, SMTP_PORT, SMTP_USERNAME, SMTP_PASSWORD, SENDER_EMAIL, SENDER_NAME, \
VERIFICATION_CODE_EXPIRE_MINUTES
# In-memory storage for verification codes (for demonstration purposes).
# The dict lives only in this process, so entries are lost on restart.
# In a production environment, consider using Redis or a database for persistence and scalability.
# Layout: {prefix + email: {"code": "123456", "expires_at": <time.time()-based epoch seconds>}}
verification_codes = {}
# Password hashing context (bcrypt is the only configured scheme)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Helper functions for password hashing
def verify_password(plain_password, hashed_password):
    """Check a plain-text password against a stored hash.

    get_password_hash cuts the password to bcrypt's 72-byte limit (dropping a
    multi-byte character that the cut would split) before hashing. The same
    cut is applied here so that the bytes being verified are exactly the bytes
    that were hashed; without it a long multi-byte password can fail to match
    its own hash, or be rejected outright by bcrypt backends that refuse
    inputs longer than 72 bytes.

    Args:
        plain_password: Candidate password supplied by the user.
        hashed_password: Hash previously produced for the account.

    Returns:
        The result of ``pwd_context.verify`` for the (truncated) candidate.
    """
    if isinstance(plain_password, str):
        # Same truncation rule as get_password_hash; passwords of 72 bytes or
        # fewer pass through unchanged.
        plain_password = plain_password.encode('utf-8')[:72].decode('utf-8', 'ignore')
    return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
    """Hash a password with the module's bcrypt context.

    bcrypt only accepts 72 bytes of input, so the UTF-8 encoding of the
    password is cut to that length first. A multi-byte character split by the
    cut is dropped rather than raising a decode error.

    Args:
        password: Plain-text password (str).

    Returns:
        The value returned by ``pwd_context.hash`` for the truncated password.
    """
    utf8_bytes = password.encode('utf-8')
    # 'ignore' discards a trailing partial character left by the byte slice.
    clipped = utf8_bytes[:72].decode('utf-8', 'ignore')
    return pwd_context.hash(clipped)
# Helper function for JWT
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is shallow-copied, so the caller's
            object is not modified.
        expires_delta: Token lifetime. When it is falsy (None or a zero
            timedelta) the lifetime defaults to ACCESS_TOKEN_EXPIRE_MINUTES.

    Returns:
        The token produced by ``jwt.encode`` using SECRET_KEY and ALGORITHM.
    """
    # Local import: the module's top-level import only brings in datetime and timedelta.
    from datetime import timezone

    to_encode = data.copy()
    if expires_delta:
        lifetime = expires_delta
    else:
        lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Use an aware UTC timestamp: datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value that is easy to misinterpret as
    # local time. The resulting "exp" instant is the same.
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def generate_verification_code():
    """Generate a 6-digit numeric verification code.

    Returns:
        A string of six decimal digits between "100000" and "999999"
        inclusive, so the first digit is never zero.
    """
    # The code acts as a short-lived secret, so draw it from the secrets
    # module (OS-backed CSPRNG) rather than the predictable `random` generator.
    return str(secrets.randbelow(900000) + 100000)
def store_verification_code(email: str, code: str, prefix: str = ""):
    """Remember ``code`` for ``email`` until its expiration time.

    The entry is stored in the module-level ``verification_codes`` dict under
    ``prefix + email`` and replaces any earlier entry for the same key.

    Args:
        email: Address the code belongs to.
        code: Verification code to store.
        prefix: Optional key prefix, prepended to the email.
    """
    ttl_seconds = VERIFICATION_CODE_EXPIRE_MINUTES * 60
    key = prefix + email
    expires_at = time.time() + ttl_seconds
    verification_codes[key] = dict(code=code, expires_at=expires_at)
    # NOTE(review): this prints the code itself to stdout; confirm that is
    # acceptable wherever the process output is collected.
    print(f"Stored verification code for {key}: {code}, expires at {datetime.fromtimestamp(expires_at)}")
def verify_stored_code(email: str, code: str, prefix: str = "") -> bool:
    """Check ``code`` against the stored verification code for ``email``.

    The lookup key is ``prefix + email``. An expired entry is removed from
    ``verification_codes``. A successful match does NOT remove the entry, so
    the same code keeps verifying until it expires.

    Args:
        email: Address the code was issued for.
        code: Code supplied by the user.
        prefix: Optional key prefix, prepended to the email.

    Returns:
        True if an unexpired entry exists and its code equals ``code``,
        False otherwise.
    """
    key = prefix + email
    stored_data = verification_codes.get(key)
    if not stored_data:
        return False  # No code stored for this key
    if time.time() > stored_data["expires_at"]:
        # pop() instead of del: harmless if another caller already removed it.
        verification_codes.pop(key, None)
        return False  # Code expired
    expected = stored_data["code"]
    if isinstance(expected, str) and isinstance(code, str):
        # Constant-time comparison so response timing does not reveal how many
        # leading characters matched. Comparing encoded bytes keeps this
        # equivalent to == for any str (compare_digest on str is ASCII-only).
        return secrets.compare_digest(
            expected.encode("utf-8", "surrogatepass"),
            code.encode("utf-8", "surrogatepass"),
        )
    # Non-string values keep the plain equality semantics.
    return expected == code
# Helper function for sending emails
def send_email(to_email: str, subject: str, body: str):
    """Send a plain-text UTF-8 email through the configured SMTP server.

    Port 587 is handled with STARTTLS and port 465 with implicit TLS; any
    other port is reported as unsupported. Both modes verify the server
    certificate and hostname. Every failure is printed and turned into a
    False return value instead of being raised.

    Args:
        to_email: Recipient address.
        subject: Subject line; encoded as UTF-8 in the header.
        body: Plain-text message body.

    Returns:
        True once the message has been handed to the server, False if the
        SMTP configuration is incomplete, the port is unsupported, or
        building/sending the message failed.
    """
    # Imported locally because this module's top-level imports do not include it.
    from email.utils import formataddr

    if not all([SMTP_SERVER, SMTP_USERNAME, SMTP_PASSWORD, SENDER_EMAIL]):
        print(f"错误: SMTP配置不完整。邮件未发送至 {to_email}。请检查 .env 文件中的 SMTP_SERVER, SMTP_USERNAME, SMTP_PASSWORD, SENDER_EMAIL。")
        return False

    email_sent_successfully = False  # Flipped only after sendmail() returns
    try:
        msg = MIMEText(body, 'plain', 'utf-8')
        # formataddr encodes only the display name (RFC 2047) and leaves the
        # address itself literal, so a non-ASCII sender name cannot swallow
        # the address into an encoded word.
        msg['From'] = formataddr((SENDER_NAME, SENDER_EMAIL))
        # Addresses must stay literal; wrapping them in Header(..., 'utf-8')
        # turns the whole address into an encoded word.
        msg['To'] = to_email
        msg['Subject'] = Header(subject, 'utf-8')

        if SMTP_PORT == 587:
            with smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=10) as server:
                # server.set_debuglevel(1)  # Debug output stays off in production
                server.starttls(context=ssl.create_default_context())
                server.login(SMTP_USERNAME, SMTP_PASSWORD)
                server.sendmail(SENDER_EMAIL, to_email, msg.as_string())
                print(f"邮件发送成功至 {to_email} (端口: {SMTP_PORT})。")
                email_sent_successfully = True
        elif SMTP_PORT == 465:
            # Keep certificate and hostname verification enabled: credentials
            # are sent over this connection, and disabling the checks would
            # let any intermediary impersonate the server.
            context = ssl.create_default_context()
            with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, context=context, timeout=10) as server:
                # server.set_debuglevel(1)  # Debug output stays off in production
                server.login(SMTP_USERNAME, SMTP_PASSWORD)
                server.sendmail(SENDER_EMAIL, to_email, msg.as_string())
                # No explicit quit(): leaving the with-block sends QUIT and closes.
                print(f"邮件发送成功至 {to_email} (端口: {SMTP_PORT})。")
                email_sent_successfully = True
        else:
            print(f"错误:不支持的端口 {SMTP_PORT}。邮件未发送至 {to_email}。目前只支持 465 (SSL) 和 587 (STARTTLS)。")
    except smtplib.SMTPAuthenticationError:
        print(f"认证失败:邮件未发送至 {to_email}。请检查 .env 文件中的 SMTP_USERNAME 和 SMTP_PASSWORD。对于 QQ 邮箱,请确保使用的是授权码而非登录密码。")
    except smtplib.SMTPConnectError as e:
        print(f"连接失败:邮件未发送至 {to_email}。请检查 SMTP_SERVER 地址、SMTP_PORT 端口是否正确,以及网络防火墙设置。错误详情: {e}")
    except smtplib.SMTPServerDisconnected as e:
        print(f"SMTP 服务器意外断开连接:邮件未发送至 {to_email}。错误详情: {e}")
    except smtplib.SMTPRecipientsRefused as e:
        print(f"收件人被拒绝:邮件未发送至 {to_email}。错误详情: {e}")
    except smtplib.SMTPSenderRefused as e:
        print(f"发件人被拒绝:邮件未发送至 {to_email}。错误详情: {e}")
    except smtplib.SMTPDataError as e:
        print(f"SMTP 数据错误:邮件未发送至 {to_email}。错误详情: {e}")
    except smtplib.SMTPException as e:
        print(f"SMTP 协议错误:邮件未发送至 {to_email}。错误详情: {e}")
    except Exception as e:
        # Last-resort boundary (TLS/socket/encoding errors): report and return False.
        print(f"发生未知错误:邮件未发送至 {to_email}。错误详情: {e}")
    return email_sent_successfully