Spaces:
Running
Running
| """ | |
| Module auth: Xác thực người dùng cho chatbot. | |
| Hỗ trợ đăng ký, đăng nhập bằng email/password và quên mật khẩu. | |
| """ | |
| import uuid | |
| import random | |
| import string | |
| import hashlib | |
| import sqlite3 | |
| from datetime import datetime, timedelta | |
| from backend.admin_config import is_bootstrap_admin_email | |
| from backend.runtime_paths import DB_PATH | |
| from backend.db_sync import schedule_sync as _schedule_sync | |
| def _get_connection(): | |
| """Tạo kết nối SQLite.""" | |
| conn = sqlite3.connect(DB_PATH) | |
| conn.row_factory = sqlite3.Row | |
| conn.execute("PRAGMA foreign_keys=ON") | |
| return conn | |
| def _hash_password(password: str) -> str: | |
| """Hash password bằng SHA-256 + salt.""" | |
| salt = uuid.uuid4().hex | |
| hashed = hashlib.sha256((salt + password).encode()).hexdigest() | |
| return f"{salt}${hashed}" | |
| def _verify_password(password: str, stored_hash: str) -> bool: | |
| """Kiểm tra password có khớp với hash không.""" | |
| if not stored_hash or "$" not in stored_hash: | |
| return False | |
| salt, hashed = stored_hash.split("$", 1) | |
| return hashlib.sha256((salt + password).encode()).hexdigest() == hashed | |
| def register_user(email: str, password: str, display_name: str = None) -> dict: | |
| """ | |
| Đăng ký tài khoản mới. | |
| Returns: {"success": True/False, "message": str, "user": dict or None} | |
| """ | |
| email = email.strip().lower() | |
| if not email or not password: | |
| return {"success": False, "message": "Email và mật khẩu không được để trống"} | |
| # Ràng buộc định dạng email: chỉ chấp nhận địa chỉ @gmail.com | |
| if not email.endswith("@gmail.com"): | |
| return {"success": False, "message": "Vui lòng nhập địa chỉ Email hợp lệ!"} | |
| if len(password) < 6: | |
| return {"success": False, "message": "Mật khẩu phải có ít nhất 6 ký tự"} | |
| conn = _get_connection() | |
| try: | |
| # Kiểm tra email đã tồn tại chưa | |
| existing = conn.execute( | |
| "SELECT ma_nguoi_dung FROM nguoi_dung WHERE email = ?", (email,) | |
| ).fetchone() | |
| if existing: | |
| return {"success": False, "message": "Email này đã được đăng ký"} | |
| user_id = str(uuid.uuid4()) | |
| password_hash = _hash_password(password) | |
| name = display_name or email.split("@")[0] | |
| role = "admin" if is_bootstrap_admin_email(email) else "user" | |
| conn.execute( | |
| """INSERT INTO nguoi_dung (ma_nguoi_dung, email, mat_khau_bam, ten_hien_thi, vai_tro) | |
| VALUES (?, ?, ?, ?, ?)""", | |
| (user_id, email, password_hash, name, role), | |
| ) | |
| conn.commit() | |
| user = { | |
| "id": user_id, | |
| "email": email, | |
| "display_name": name, | |
| "role": role, | |
| } | |
| print(f"[AUTH] ✅ Đăng ký thành công: {email}") | |
| _schedule_sync() | |
| return {"success": True, "message": "Đăng ký thành công!", "user": user} | |
| except Exception as e: | |
| print(f"[AUTH] ❌ Lỗi đăng ký: {e}") | |
| return {"success": False, "message": f"Lỗi hệ thống: {str(e)}"} | |
| finally: | |
| conn.close() | |
| def login_user(email: str, password: str) -> dict: | |
| """ | |
| Đăng nhập bằng email/password. | |
| Returns: {"success": True/False, "message": str, "user": dict or None} | |
| User dict gồm: id, email, display_name, role ('user'|'admin'). | |
| """ | |
| email = email.strip().lower() | |
| conn = _get_connection() | |
| try: | |
| row = conn.execute( | |
| """SELECT ma_nguoi_dung, email, mat_khau_bam, ten_hien_thi, | |
| COALESCE(vai_tro, 'user') AS vai_tro, | |
| COALESCE(khoa_tai_khoan, 0) AS khoa_tai_khoan | |
| FROM nguoi_dung WHERE email = ?""", | |
| (email,), | |
| ).fetchone() | |
| if not row: | |
| return {"success": False, "message": "Email không tồn tại"} | |
| if row["khoa_tai_khoan"]: | |
| return {"success": False, "message": "Tài khoản đã bị khóa. Liên hệ quản trị viên."} | |
| if not _verify_password(password, row["mat_khau_bam"]): | |
| return {"success": False, "message": "Mật khẩu không đúng"} | |
| user = { | |
| "id": row["ma_nguoi_dung"], | |
| "email": row["email"], | |
| "display_name": row["ten_hien_thi"], | |
| "role": row["vai_tro"] if row["vai_tro"] else "user", | |
| } | |
| print(f"[AUTH] ✅ Đăng nhập: {email} (role={user['role']})") | |
| return {"success": True, "message": "Đăng nhập thành công!", "user": user} | |
| finally: | |
| conn.close() | |
| def is_admin(user: dict) -> bool: | |
| """Kiểm tra user có phải admin không.""" | |
| return user and user.get("role") == "admin" | |
| def create_reset_token(email: str) -> dict: | |
| """ | |
| Tạo mã OTP reset password (6 chữ số, hết hạn 15 phút). | |
| Returns: {"success": True/False, "message": str, "token": str or None} | |
| """ | |
| email = email.strip().lower() | |
| conn = _get_connection() | |
| try: | |
| # Kiểm tra email tồn tại | |
| row = conn.execute( | |
| "SELECT ma_nguoi_dung FROM nguoi_dung WHERE email = ?", (email,) | |
| ).fetchone() | |
| if not row: | |
| return {"success": False, "message": "Email không tồn tại trong hệ thống"} | |
| # Tạo OTP 6 chữ số | |
| token = "".join(random.choices(string.digits, k=6)) | |
| expires_at = (datetime.now() + timedelta(minutes=15)).isoformat() | |
| # Xóa token cũ (nếu có) | |
| conn.execute("DELETE FROM khoi_phuc_mat_khau WHERE email = ?", (email,)) | |
| conn.execute( | |
| "INSERT INTO khoi_phuc_mat_khau (email, ma_otp, thoi_gian_het_han) VALUES (?, ?, ?)", | |
| (email, token, expires_at), | |
| ) | |
| conn.commit() | |
| print(f"[AUTH] 🔑 Reset token created for: {email}") | |
| return {"success": True, "message": "Mã OTP đã được tạo", "token": token} | |
| finally: | |
| conn.close() | |
| def reset_password(email: str, token: str, new_password: str) -> dict: | |
| """ | |
| Đặt lại mật khẩu bằng mã OTP. | |
| Returns: {"success": True/False, "message": str} | |
| """ | |
| email = email.strip().lower() | |
| if len(new_password) < 6: | |
| return {"success": False, "message": "Mật khẩu mới phải có ít nhất 6 ký tự"} | |
| conn = _get_connection() | |
| try: | |
| row = conn.execute( | |
| "SELECT ma_otp, thoi_gian_het_han, da_su_dung FROM khoi_phuc_mat_khau WHERE email = ? ORDER BY ma_khoi_phuc DESC LIMIT 1", | |
| (email,), | |
| ).fetchone() | |
| if not row: | |
| return {"success": False, "message": "Không tìm thấy yêu cầu đặt lại mật khẩu"} | |
| if row["da_su_dung"]: | |
| return {"success": False, "message": "Mã OTP đã được sử dụng"} | |
| if row["ma_otp"] != token: | |
| return {"success": False, "message": "Mã OTP không đúng"} | |
| # Kiểm tra hết hạn | |
| expires_at = datetime.fromisoformat(row["thoi_gian_het_han"]) | |
| if datetime.now() > expires_at: | |
| return {"success": False, "message": "Mã OTP đã hết hạn (15 phút)"} | |
| # Cập nhật mật khẩu | |
| password_hash = _hash_password(new_password) | |
| conn.execute( | |
| "UPDATE nguoi_dung SET mat_khau_bam = ? WHERE email = ?", | |
| (password_hash, email), | |
| ) | |
| conn.execute( | |
| "UPDATE khoi_phuc_mat_khau SET da_su_dung = 1 WHERE email = ?", (email,) | |
| ) | |
| conn.commit() | |
| print(f"[AUTH] ✅ Password reset for: {email}") | |
| _schedule_sync() | |
| return {"success": True, "message": "Đặt lại mật khẩu thành công!"} | |
| finally: | |
| conn.close() | |