Nguyễn Quốc Vỹ
Chuyển từ dữ liệu tạm sang runtime để dữ liệu không bị mất
98f87e1
"""
Module auth: Xác thực người dùng cho chatbot.
Hỗ trợ đăng ký, đăng nhập bằng email/password và quên mật khẩu.
"""
import uuid
import random
import string
import hashlib
import sqlite3
from datetime import datetime, timedelta
from backend.admin_config import is_bootstrap_admin_email
from backend.runtime_paths import DB_PATH
from backend.db_sync import schedule_sync as _schedule_sync
def _get_connection():
"""Tạo kết nối SQLite."""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys=ON")
return conn
def _hash_password(password: str) -> str:
"""Hash password bằng SHA-256 + salt."""
salt = uuid.uuid4().hex
hashed = hashlib.sha256((salt + password).encode()).hexdigest()
return f"{salt}${hashed}"
def _verify_password(password: str, stored_hash: str) -> bool:
"""Kiểm tra password có khớp với hash không."""
if not stored_hash or "$" not in stored_hash:
return False
salt, hashed = stored_hash.split("$", 1)
return hashlib.sha256((salt + password).encode()).hexdigest() == hashed
def register_user(email: str, password: str, display_name: str = None) -> dict:
"""
Đăng ký tài khoản mới.
Returns: {"success": True/False, "message": str, "user": dict or None}
"""
email = email.strip().lower()
if not email or not password:
return {"success": False, "message": "Email và mật khẩu không được để trống"}
# Ràng buộc định dạng email: chỉ chấp nhận địa chỉ @gmail.com
if not email.endswith("@gmail.com"):
return {"success": False, "message": "Vui lòng nhập địa chỉ Email hợp lệ!"}
if len(password) < 6:
return {"success": False, "message": "Mật khẩu phải có ít nhất 6 ký tự"}
conn = _get_connection()
try:
# Kiểm tra email đã tồn tại chưa
existing = conn.execute(
"SELECT ma_nguoi_dung FROM nguoi_dung WHERE email = ?", (email,)
).fetchone()
if existing:
return {"success": False, "message": "Email này đã được đăng ký"}
user_id = str(uuid.uuid4())
password_hash = _hash_password(password)
name = display_name or email.split("@")[0]
role = "admin" if is_bootstrap_admin_email(email) else "user"
conn.execute(
"""INSERT INTO nguoi_dung (ma_nguoi_dung, email, mat_khau_bam, ten_hien_thi, vai_tro)
VALUES (?, ?, ?, ?, ?)""",
(user_id, email, password_hash, name, role),
)
conn.commit()
user = {
"id": user_id,
"email": email,
"display_name": name,
"role": role,
}
print(f"[AUTH] ✅ Đăng ký thành công: {email}")
_schedule_sync()
return {"success": True, "message": "Đăng ký thành công!", "user": user}
except Exception as e:
print(f"[AUTH] ❌ Lỗi đăng ký: {e}")
return {"success": False, "message": f"Lỗi hệ thống: {str(e)}"}
finally:
conn.close()
def login_user(email: str, password: str) -> dict:
"""
Đăng nhập bằng email/password.
Returns: {"success": True/False, "message": str, "user": dict or None}
User dict gồm: id, email, display_name, role ('user'|'admin').
"""
email = email.strip().lower()
conn = _get_connection()
try:
row = conn.execute(
"""SELECT ma_nguoi_dung, email, mat_khau_bam, ten_hien_thi,
COALESCE(vai_tro, 'user') AS vai_tro,
COALESCE(khoa_tai_khoan, 0) AS khoa_tai_khoan
FROM nguoi_dung WHERE email = ?""",
(email,),
).fetchone()
if not row:
return {"success": False, "message": "Email không tồn tại"}
if row["khoa_tai_khoan"]:
return {"success": False, "message": "Tài khoản đã bị khóa. Liên hệ quản trị viên."}
if not _verify_password(password, row["mat_khau_bam"]):
return {"success": False, "message": "Mật khẩu không đúng"}
user = {
"id": row["ma_nguoi_dung"],
"email": row["email"],
"display_name": row["ten_hien_thi"],
"role": row["vai_tro"] if row["vai_tro"] else "user",
}
print(f"[AUTH] ✅ Đăng nhập: {email} (role={user['role']})")
return {"success": True, "message": "Đăng nhập thành công!", "user": user}
finally:
conn.close()
def is_admin(user: dict) -> bool:
"""Kiểm tra user có phải admin không."""
return user and user.get("role") == "admin"
def create_reset_token(email: str) -> dict:
"""
Tạo mã OTP reset password (6 chữ số, hết hạn 15 phút).
Returns: {"success": True/False, "message": str, "token": str or None}
"""
email = email.strip().lower()
conn = _get_connection()
try:
# Kiểm tra email tồn tại
row = conn.execute(
"SELECT ma_nguoi_dung FROM nguoi_dung WHERE email = ?", (email,)
).fetchone()
if not row:
return {"success": False, "message": "Email không tồn tại trong hệ thống"}
# Tạo OTP 6 chữ số
token = "".join(random.choices(string.digits, k=6))
expires_at = (datetime.now() + timedelta(minutes=15)).isoformat()
# Xóa token cũ (nếu có)
conn.execute("DELETE FROM khoi_phuc_mat_khau WHERE email = ?", (email,))
conn.execute(
"INSERT INTO khoi_phuc_mat_khau (email, ma_otp, thoi_gian_het_han) VALUES (?, ?, ?)",
(email, token, expires_at),
)
conn.commit()
print(f"[AUTH] 🔑 Reset token created for: {email}")
return {"success": True, "message": "Mã OTP đã được tạo", "token": token}
finally:
conn.close()
def reset_password(email: str, token: str, new_password: str) -> dict:
"""
Đặt lại mật khẩu bằng mã OTP.
Returns: {"success": True/False, "message": str}
"""
email = email.strip().lower()
if len(new_password) < 6:
return {"success": False, "message": "Mật khẩu mới phải có ít nhất 6 ký tự"}
conn = _get_connection()
try:
row = conn.execute(
"SELECT ma_otp, thoi_gian_het_han, da_su_dung FROM khoi_phuc_mat_khau WHERE email = ? ORDER BY ma_khoi_phuc DESC LIMIT 1",
(email,),
).fetchone()
if not row:
return {"success": False, "message": "Không tìm thấy yêu cầu đặt lại mật khẩu"}
if row["da_su_dung"]:
return {"success": False, "message": "Mã OTP đã được sử dụng"}
if row["ma_otp"] != token:
return {"success": False, "message": "Mã OTP không đúng"}
# Kiểm tra hết hạn
expires_at = datetime.fromisoformat(row["thoi_gian_het_han"])
if datetime.now() > expires_at:
return {"success": False, "message": "Mã OTP đã hết hạn (15 phút)"}
# Cập nhật mật khẩu
password_hash = _hash_password(new_password)
conn.execute(
"UPDATE nguoi_dung SET mat_khau_bam = ? WHERE email = ?",
(password_hash, email),
)
conn.execute(
"UPDATE khoi_phuc_mat_khau SET da_su_dung = 1 WHERE email = ?", (email,)
)
conn.commit()
print(f"[AUTH] ✅ Password reset for: {email}")
_schedule_sync()
return {"success": True, "message": "Đặt lại mật khẩu thành công!"}
finally:
conn.close()