|
|
import jwt |
|
|
import smtplib |
|
|
import secrets |
|
|
import ssl |
|
|
import random |
|
|
import time |
|
|
from datetime import datetime, timedelta |
|
|
from email.mime.text import MIMEText |
|
|
from email.header import Header |
|
|
from passlib.context import CryptContext |
|
|
from typing import Optional |
|
|
|
|
|
from core.config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES, \ |
|
|
SMTP_SERVER, SMTP_PORT, SMTP_USERNAME, SMTP_PASSWORD, SENDER_EMAIL, SENDER_NAME, \ |
|
|
VERIFICATION_CODE_EXPIRE_MINUTES |
|
|
|
|
|
|
|
|
|
|
|
verification_codes = {} |
|
|
|
|
|
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") |
|
|
|
|
|
|
|
|
def verify_password(plain_password, hashed_password): |
|
|
return pwd_context.verify(plain_password, hashed_password) |
|
|
|
|
|
def get_password_hash(password): |
|
|
|
|
|
|
|
|
truncated_password = password.encode('utf-8')[:72].decode('utf-8', 'ignore') |
|
|
return pwd_context.hash(truncated_password) |
|
|
|
|
|
|
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): |
|
|
to_encode = data.copy() |
|
|
if expires_delta: |
|
|
expire = datetime.utcnow() + expires_delta |
|
|
else: |
|
|
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) |
|
|
to_encode.update({"exp": expire}) |
|
|
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) |
|
|
return encoded_jwt |
|
|
|
|
|
def generate_verification_code(): |
|
|
"""Generates a 6-digit numeric verification code.""" |
|
|
return str(random.randint(100000, 999999)) |
|
|
|
|
|
def store_verification_code(email: str, code: str, prefix: str = ""): |
|
|
"""Stores the verification code with an expiration time, using an optional prefix for the key.""" |
|
|
key = prefix + email |
|
|
expires_at = time.time() + VERIFICATION_CODE_EXPIRE_MINUTES * 60 |
|
|
verification_codes[key] = {"code": code, "expires_at": expires_at} |
|
|
print(f"Stored verification code for {key}: {code}, expires at {datetime.fromtimestamp(expires_at)}") |
|
|
|
|
|
def verify_stored_code(email: str, code: str, prefix: str = "") -> bool: |
|
|
"""Verifies the provided code against the stored one and checks for expiration, using an optional prefix for the key.""" |
|
|
key = prefix + email |
|
|
stored_data = verification_codes.get(key) |
|
|
if not stored_data: |
|
|
return False |
|
|
|
|
|
if time.time() > stored_data["expires_at"]: |
|
|
del verification_codes[key] |
|
|
return False |
|
|
|
|
|
return stored_data["code"] == code |
|
|
|
|
|
|
|
|
def send_email(to_email: str, subject: str, body: str): |
|
|
if not all([SMTP_SERVER, SMTP_USERNAME, SMTP_PASSWORD, SENDER_EMAIL]): |
|
|
print(f"错误: SMTP配置不完整。邮件未发送至 {to_email}。请检查 .env 文件中的 SMTP_SERVER, SMTP_USERNAME, SMTP_PASSWORD, SENDER_EMAIL。") |
|
|
return False |
|
|
|
|
|
msg = MIMEText(body, 'plain', 'utf-8') |
|
|
msg['From'] = f"{SENDER_NAME} <{SENDER_EMAIL}>" |
|
|
msg['To'] = Header(to_email, 'utf-8') |
|
|
msg['Subject'] = Header(subject, 'utf-8') |
|
|
|
|
|
email_sent_successfully = False |
|
|
|
|
|
try: |
|
|
if SMTP_PORT == 587: |
|
|
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT, timeout=10) as server: |
|
|
|
|
|
server.starttls(context=ssl.create_default_context()) |
|
|
server.login(SMTP_USERNAME, SMTP_PASSWORD) |
|
|
server.sendmail(SENDER_EMAIL, to_email, msg.as_string()) |
|
|
print(f"邮件发送成功至 {to_email} (端口: {SMTP_PORT})。") |
|
|
email_sent_successfully = True |
|
|
|
|
|
elif SMTP_PORT == 465: |
|
|
context = ssl.create_default_context() |
|
|
context.check_hostname = False |
|
|
context.verify_mode = ssl.CERT_NONE |
|
|
with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, context=context, timeout=10) as server: |
|
|
|
|
|
server.login(SMTP_USERNAME, SMTP_PASSWORD) |
|
|
server.sendmail(SENDER_EMAIL, to_email, msg.as_string()) |
|
|
server.quit() |
|
|
print(f"邮件发送成功至 {to_email} (端口: {SMTP_PORT})。") |
|
|
email_sent_successfully = True |
|
|
else: |
|
|
print(f"错误:不支持的端口 {SMTP_PORT}。邮件未发送至 {to_email}。目前只支持 465 (SSL) 和 587 (STARTTLS)。") |
|
|
|
|
|
except smtplib.SMTPAuthenticationError: |
|
|
print(f"认证失败:邮件未发送至 {to_email}。请检查 .env 文件中的 SMTP_USERNAME 和 SMTP_PASSWORD。对于 QQ 邮箱,请确保使用的是授权码而非登录密码。") |
|
|
except smtplib.SMTPConnectError as e: |
|
|
print(f"连接失败:邮件未发送至 {to_email}。请检查 SMTP_SERVER 地址、SMTP_PORT 端口是否正确,以及网络防火墙设置。错误详情: {e}") |
|
|
except smtplib.SMTPServerDisconnected as e: |
|
|
print(f"SMTP 服务器意外断开连接:邮件未发送至 {to_email}。错误详情: {e}") |
|
|
except smtplib.SMTPRecipientsRefused as e: |
|
|
print(f"收件人被拒绝:邮件未发送至 {to_email}。错误详情: {e}") |
|
|
except smtplib.SMTPSenderRefused as e: |
|
|
print(f"发件人被拒绝:邮件未发送至 {to_email}。错误详情: {e}") |
|
|
except smtplib.SMTPDataError as e: |
|
|
print(f"SMTP 数据错误:邮件未发送至 {to_email}。错误详情: {e}") |
|
|
except smtplib.SMTPException as e: |
|
|
print(f"SMTP 协议错误:邮件未发送至 {to_email}。错误详情: {e}") |
|
|
except Exception as e: |
|
|
print(f"发生未知错误:邮件未发送至 {to_email}。错误详情: {e}") |
|
|
|
|
|
return email_sent_successfully |
|
|
|