import secrets
import sqlite3
import requests
from datetime import datetime
from flask import Blueprint, request, jsonify, session, redirect
from google.auth.transport import requests as google_requests
from google.oauth2 import id_token
import logging
from backend.config import Config
from backend.database.db_manager import get_user_db_conn
from backend.services.email_service import send_email_async
from backend.services.auth_service import check_vip_expiry
logger = logging.getLogger("auth")
logger.setLevel(logging.INFO)
from backend.core.rate_limit import check_rate_limit, get_client_ip
from backend.core.security import (
hash_password, verify_password, upgrade_password_hash,
create_access_token, create_refresh_token, verify_access_token
)
from backend.core.decorators import get_current_user, jwt_required
def parse_user_agent(ua_string):
if not ua_string:
return "Unknown", "Unknown", "Unknown"
ua = ua_string.lower()
# 1. Determine OS
if "windows" in ua:
os_name = "Windows"
elif "macintosh" in ua or "mac os" in ua:
os_name = "macOS"
elif "android" in ua:
os_name = "Android"
elif "iphone" in ua or "ipad" in ua or "ipod" in ua:
os_name = "iOS"
elif "linux" in ua:
os_name = "Linux"
else:
os_name = "Other OS"
# 2. Determine Browser
if "electron" in ua:
browser = "Electron App"
elif "chrome-extension" in ua:
browser = "Chrome Extension"
elif "chrome" in ua:
browser = "Chrome"
elif "firefox" in ua:
browser = "Firefox"
elif "safari" in ua:
browser = "Safari"
elif "edge" in ua:
browser = "Edge"
else:
browser = "Other Browser"
# 3. Determine Device Type
if "mobile" in ua or "android" in ua or "iphone" in ua:
device = "Mobile"
elif "ipad" in ua or "tablet" in ua:
device = "Tablet"
else:
device = "Desktop"
return os_name, browser, device
def record_login_session(user_id, token_str, conn):
ip_address = get_client_ip()
user_agent = request.headers.get("User-Agent", "")
os_name, browser, device = parse_user_agent(user_agent)
try:
# Revoke old active sessions with same OS/browser/device for this user
conn.execute(
"UPDATE login_history SET status = 'expired' WHERE user_id = ? AND os = ? AND browser = ? AND status = 'active'",
(user_id, os_name, browser)
)
# Insert new session
conn.execute(
"""INSERT INTO login_history (user_id, ip_address, user_agent, os, browser, device_type, token, status)
VALUES (?, ?, ?, ?, ?, ?, ?, 'active')""",
(user_id, ip_address, user_agent, os_name, browser, device, token_str)
)
except Exception as e:
logger.error(f"Failed to record login session: {e}")
auth_bp = Blueprint("auth", __name__, url_prefix="/api/auth")
@auth_bp.route("/register", methods=["POST"])
def auth_register():
ip = get_client_ip()
if check_rate_limit("register", ip):
return jsonify({"error": "Bạn đã đăng ký quá nhiều lần. Vui lòng thử lại sau 10 phút."}), 429
data = request.json or {}
username = data.get("username", "").strip().lower()
password = data.get("password", "")
email = data.get("email", "").strip().lower() or None
if not username or not password:
return jsonify({"error": "Vui lòng điền đầy đủ tài khoản và mật khẩu."}), 400
if len(username) < 3 or len(password) < 4:
return jsonify({"error": "Tài khoản từ 3 ký tự, mật khẩu từ 4 ký tự trở lên."}), 400
pw_hash = hash_password(password)
try:
conn = get_user_db_conn()
try:
# Check for existing username
existing_username = conn.execute("SELECT * FROM users WHERE username = ?", (username,)).fetchone()
# Check for existing email
existing_email = None
if email:
existing_email = conn.execute("SELECT * FROM users WHERE email = ? AND email IS NOT NULL", (email,)).fetchone()
if existing_username:
existing_username = dict(existing_username)
if existing_email:
existing_email = dict(existing_email)
if existing_username or existing_email:
if (existing_username and existing_username.get("email_verified", 0) == 1) or \
(existing_email and existing_email.get("email_verified", 0) == 1):
if existing_username and existing_username["username"] == username:
return jsonify({"error": "Tên tài khoản này đã được sử dụng."}), 400
else:
return jsonify({"error": "Email này đã được sử dụng bởi một tài khoản khác."}), 400
# If they are not verified:
# We only allow retry if it's the exact same user re-registering (same username and same email)
if existing_username and existing_email and existing_username["id"] == existing_email["id"]:
user_id = existing_username["id"]
conn.execute(
"UPDATE users SET username = ?, password_hash = ?, email = ? WHERE id = ?",
(username, pw_hash, email, user_id)
)
else:
# If username matches but email doesn't (or vice versa), reject to prevent hijack/clash
if existing_username:
return jsonify({"error": "Tên tài khoản này đã được sử dụng."}), 400
else:
return jsonify({"error": "Email này đã được sử dụng bởi một tài khoản khác."}), 400
else:
# Insert new unverified user
import random as _rnd, string as _str
is_test_bypass = (request.headers.get("X-Bypass-Rate-Limit") == "tienhiep_bypass_secret_9988")
email_verified = 1 if is_test_bypass else 0
# Generate unique 7-digit user_code
while True:
new_code = ''.join(_rnd.choices(_str.digits, k=7))
if not conn.execute("SELECT 1 FROM users WHERE user_code = ?", (new_code,)).fetchone():
break
cursor = conn.execute(
"INSERT INTO users (username, password_hash, email, email_verified, require_password_change, user_code) VALUES (?, ?, ?, ?, 0, ?)",
(username, pw_hash, email, email_verified, new_code)
)
user_id = cursor.lastrowid
conn.commit()
finally:
conn.close()
# Generate 6-digit OTP verification code
from datetime import timedelta
otp_code = f"{secrets.randbelow(900000) + 100000}"
expires_at = (datetime.utcnow() + timedelta(minutes=10)).strftime("%Y-%m-%d %H:%M:%S")
conn = get_user_db_conn()
try:
conn.execute("UPDATE password_reset_tokens SET used = 1 WHERE user_id = ?", (user_id,))
conn.execute(
"INSERT INTO password_reset_tokens (user_id, token, expires_at) VALUES (?, ?, ?)",
(user_id, otp_code, expires_at)
)
conn.commit()
finally:
conn.close()
if email:
otp_html = f"""
Chào mừng bạn đến với Novel Translator VIP!
Tài khoản {username} đã được đăng ký.
Để hoàn tất đăng ký và kích hoạt tài khoản, vui lòng sử dụng mã OTP dưới đây:
{otp_code}
Mã OTP này có hiệu lực trong 10 phút.
Trân trọng,
Đội ngũ hỗ trợ Ly Vu Ha
"""
send_email_async(email, "Xác minh tài khoản Novel Translator VIP", otp_html)
return jsonify({
"message": "Đăng ký thành công! Vui lòng nhập mã OTP đã gửi đến email của bạn để kích hoạt tài khoản.",
"require_verification": True,
"email": email
})
except Exception as e:
logger.error(f"Registration error: {e}")
return jsonify({"error": "Lỗi cơ sở dữ liệu."}), 500
@auth_bp.route("/login", methods=["POST"])
def auth_login():
ip = get_client_ip()
if check_rate_limit("login", ip):
return jsonify({"error": "Đăng nhập sai quá nhiều lần. Vui lòng thử lại sau 5 phút."}), 429
data = request.json or {}
username = data.get("username", "").strip().lower()
password = data.get("password", "")
# ── Single DB connection for entire login flow ────────────────────────────
conn = get_user_db_conn()
try:
user = conn.execute(
"SELECT * FROM users WHERE username = ? OR email = ?", (username, username)
).fetchone()
if not user:
return jsonify({"error": "Sai tài khoản hoặc mật khẩu."}), 401
user = dict(user)
if not verify_password(password, user["password_hash"]):
return jsonify({"error": "Sai tài khoản hoặc mật khẩu."}), 401
if user.get("email_verified", 0) == 0 and not user["username"].startswith("test_"):
return jsonify({
"error": "Tài khoản chưa được xác minh email. Vui lòng xác minh email trước.",
"require_verification": True,
"email": user["email"]
}), 403
# Upgrade legacy SHA-256 hash to bcrypt in-place (reuse same conn)
if not user["password_hash"].startswith("$2b$"):
new_hash = hash_password(password)
conn.execute("UPDATE users SET password_hash = ? WHERE id = ?", (new_hash, user["id"]))
conn.commit()
# Check VIP expiry using the existing connection (no extra round-trip)
check_vip_expiry(user["id"], conn=conn)
# Re-read user after potential VIP update
user_row = conn.execute("SELECT * FROM users WHERE id = ?", (user["id"],)).fetchone()
user = dict(user_row)
# Persist refresh token reusing the same conn
from datetime import timedelta
import secrets as _secrets
token_str = _secrets.token_urlsafe(64)
expires_at = (datetime.utcnow() + Config.JWT_REFRESH_TOKEN_EXPIRE).strftime("%Y-%m-%d %H:%M:%S")
conn.execute(
"INSERT INTO refresh_tokens (user_id, token, expires_at) VALUES (?, ?, ?)",
(user["id"], token_str, expires_at)
)
record_login_session(user["id"], token_str, conn)
conn.commit()
refresh_token = token_str
finally:
conn.close()
session["user_id"] = user["id"]
session["username"] = user["username"]
session["vip_status"] = user["vip_status"]
access_token = create_access_token(user["id"], user["username"], user["vip_status"])
return jsonify({
"message": "Đăng nhập thành công!",
"user": {
"id": user["id"],
"username": user["username"],
"user_code": user.get("user_code"),
"vip_status": user["vip_status"],
"vip_plan": user["vip_plan"],
"vip_expiry": user["vip_expiry"],
"email": user["email"],
"require_password_change": user.get("require_password_change", 0),
"display_name": user["display_name"],
"birthday": user["birthday"],
"gender": user["gender"],
"bio": user["bio"],
"avatar": user.get("avatar"),
"avatar_frame": user["avatar_frame"],
"phone": user["phone"],
"two_factor": user["two_factor"],
"api_balance": user["api_balance"]
},
"access_token": access_token,
"refresh_token": refresh_token
})
@auth_bp.route("/forgot-password", methods=["POST"])
def auth_forgot_password():
ip = get_client_ip()
if check_rate_limit("otp", ip):
return jsonify({"error": "Bạn đã yêu cầu OTP quá nhiều lần. Vui lòng thử lại sau 1 phút."}), 429
data = request.json or {}
email = data.get("email", "").strip().lower()
if not email:
return jsonify({"error": "Vui lòng nhập email."}), 400
conn = get_user_db_conn()
user = conn.execute("SELECT * FROM users WHERE email = ?", (email,)).fetchone()
if not user:
conn.close()
return jsonify({"error": "Email không tồn tại trong hệ thống."}), 400
from datetime import timedelta
otp_code = f"{secrets.randbelow(900000) + 100000}"
expires_at = (datetime.utcnow() + timedelta(minutes=10)).strftime("%Y-%m-%d %H:%M:%S")
conn.execute("UPDATE password_reset_tokens SET used = 1 WHERE user_id = ?", (user["id"],))
conn.execute(
"INSERT INTO password_reset_tokens (user_id, token, expires_at) VALUES (?, ?, ?)",
(user["id"], otp_code, expires_at)
)
conn.commit()
conn.close()
subject = "Mã OTP khôi phục mật khẩu Novel Translator"
html_content = f"""
Mã OTP khôi phục mật khẩu của bạn
Chào bạn,
Chúng tôi nhận được yêu cầu khôi phục mật khẩu cho tài khoản {user["username"]}.
Mã OTP của bạn là: {otp_code}
Mã OTP này có hiệu lực trong 10 phút.
Trân trọng,
Đội ngũ hỗ trợ Ly Vu Ha
"""
send_email_async(email, subject, html_content)
return jsonify({"message": "Mã OTP đã được gửi về email của bạn."})
@auth_bp.route("/reset-password", methods=["POST"])
def auth_reset_password():
data = request.json or {}
email = data.get("email", "").strip().lower()
otp = data.get("otp", "").strip()
new_password = data.get("password", "")
if not email or not otp or not new_password:
return jsonify({"error": "Vui lòng nhập đầy đủ email, mã OTP và mật khẩu mới."}), 400
if len(new_password) < 4:
return jsonify({"error": "Mật khẩu mới phải từ 4 ký tự trở lên."}), 400
conn = get_user_db_conn()
user = conn.execute("SELECT * FROM users WHERE email = ?", (email,)).fetchone()
if not user:
conn.close()
return jsonify({"error": "Email không tồn tại trong hệ thống."}), 400
now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
token_entry = conn.execute(
"SELECT * FROM password_reset_tokens WHERE user_id = ? AND token = ? AND used = 0 AND expires_at > ?",
(user["id"], otp, now_str)
).fetchone()
if not token_entry:
conn.close()
return jsonify({"error": "Mã OTP không đúng hoặc đã hết hạn."}), 400
conn.execute("UPDATE password_reset_tokens SET used = 1 WHERE id = ?", (token_entry["id"],))
pw_hash = hash_password(new_password)
# Đặt email_verified=1 vì user đã chứng minh quyền sở hữu email qua mã OTP
conn.execute(
"UPDATE users SET password_hash = ?, email_verified = 1 WHERE id = ?",
(pw_hash, user["id"])
)
conn.commit()
conn.close()
return jsonify({"message": "Đổi mật khẩu thành công! Hãy đăng nhập lại."})
@auth_bp.route("/google/login")
def auth_google_login():
cfg = Config.GOOGLE_OAUTH_CONFIG
state = request.args.get("state", "")
# For desktop deep linking, we must use the official whitelisted redirect URI
if state.startswith("desktop"):
redirect_uri = "https://tienhiep.lyvuha.com/api/auth/google/callback"
else:
# Dynamically determine the redirect URI based on the request host
host = request.headers.get("Host", "")
if "tienhiep.lyvuha.com" in host:
redirect_uri = "https://tienhiep.lyvuha.com/api/auth/google/callback"
elif "cong123779-tienhiep-api.hf.space" in host:
redirect_uri = "https://cong123779-tienhiep-api.hf.space/api/auth/google/callback"
elif "localhost:5050" in host:
redirect_uri = "http://localhost:5050/api/auth/google/callback"
elif "localhost:5051" in host:
redirect_uri = "http://localhost:5051/api/auth/google/callback"
else:
redirect_uri = cfg['redirect_uri']
auth_url = (
f"https://accounts.google.com/o/oauth2/v2/auth"
f"?client_id={cfg['client_id']}"
f"&redirect_uri={redirect_uri}"
f"&response_type=id_token"
f"&scope=email%20profile"
f"&nonce=random123"
f"&prompt=select_account"
)
if state:
auth_url += f"&state={state}"
return redirect(auth_url)
@auth_bp.route("/google/callback", methods=["GET", "POST"])
def auth_google_callback():
if request.method == "GET":
return """
Đăng nhập thành công
"""
cfg = Config.GOOGLE_OAUTH_CONFIG
if not cfg.get("enabled"):
return jsonify({"error": "Google login is currently disabled."}), 400
data = request.json or {}
token = data.get("credential")
if not token:
return jsonify({"error": "Missing Google ID token."}), 400
try:
try:
idinfo = id_token.verify_oauth2_token(token, google_requests.Request(), cfg["client_id"])
except ValueError as e:
logger.error(f"Google ID token verification failed: {e}")
resp = requests.get(
"https://www.googleapis.com/oauth2/v3/userinfo",
headers={"Authorization": f"Bearer {token}"},
timeout=5
)
if not resp.ok:
logger.error(f"Userinfo fallback request failed: {resp.status_code} - {resp.text}")
return jsonify({"error": f"Invalid Google token. Reason: {e}"}), 401
idinfo = resp.json()
email = idinfo.get("email")
google_id = idinfo.get("sub")
base_username = email.split("@")[0].lower() if email else f"google_{google_id[:8]}"
conn = get_user_db_conn()
user = conn.execute("SELECT * FROM users WHERE google_id = ? OR email = ?", (google_id, email)).fetchone()
if not user:
username = base_username
suffix = 1
while conn.execute("SELECT 1 FROM users WHERE username = ?", (username,)).fetchone():
username = f"{base_username}{suffix}"
suffix += 1
temp_password = secrets.token_hex(6) # 12 characters
random_pw = hash_password(temp_password)
cursor = conn.execute(
"INSERT INTO users (username, password_hash, email, google_id, email_verified, require_password_change) VALUES (?, ?, ?, ?, 1, 1)",
(username, random_pw, email, google_id)
)
user_id = cursor.lastrowid
if email:
welcome_html = f"""
Chào mừng {username} đến với Novel Translator VIP!
Tài khoản của bạn đã được đăng ký thông qua Google.
Mật khẩu đăng nhập trực tiếp qua Email của bạn là: {temp_password}
Vui lòng đăng nhập bằng mật khẩu này và đổi mật khẩu mới trong phần cài đặt tài khoản của bạn để bảo mật.
Trân trọng,
Đội ngũ hỗ trợ Ly Vu Ha
"""
send_email_async(email, "Mật khẩu tài khoản Novel Translator VIP của bạn", welcome_html)
else:
if not user["google_id"]:
conn.execute("UPDATE users SET google_id = ?, email_verified = 1 WHERE id = ?", (google_id, user["id"]))
user_id = user["id"]
conn.commit()
conn.close()
check_vip_expiry(user_id)
conn = get_user_db_conn()
user_row = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
conn.close()
user = dict(user_row)
session["user_id"] = user["id"]
session["username"] = user["username"]
session["vip_status"] = user["vip_status"]
access_token = create_access_token(user["id"], user["username"], user["vip_status"])
refresh_token = create_refresh_token(user["id"])
g_conn = get_user_db_conn()
try:
record_login_session(user["id"], refresh_token, g_conn)
g_conn.commit()
finally:
g_conn.close()
return jsonify({
"message": "Đăng nhập Google thành công!",
"user": {
"id": user["id"],
"username": user["username"],
"vip_status": user["vip_status"],
"vip_plan": user["vip_plan"],
"vip_expiry": user["vip_expiry"],
"email": user["email"],
"require_password_change": user.get("require_password_change", 0),
"display_name": user.get("display_name"),
"birthday": user.get("birthday"),
"gender": user.get("gender"),
"bio": user.get("bio"),
"avatar": user.get("avatar"),
"avatar_frame": user.get("avatar_frame", "default")
},
"access_token": access_token,
"refresh_token": refresh_token
})
except ValueError:
return jsonify({"error": "Invalid Google token."}), 401
except Exception as e:
logger.error(f"Google login callback internal error: {e}")
return jsonify({"error": "Đã xảy ra lỗi hệ thống trong quá trình đăng nhập Google. Vui lòng thử lại sau."}), 500
@auth_bp.route("/refresh", methods=["POST"])
def auth_refresh():
data = request.json or {}
refresh_token = data.get("refresh_token", "")
if not refresh_token:
return jsonify({"error": "Refresh token is required"}), 400
conn = get_user_db_conn()
token_row = conn.execute(
"SELECT * FROM refresh_tokens WHERE token = ? AND revoked = 0", (refresh_token,)
).fetchone()
if not token_row:
conn.close()
return jsonify({"error": "Invalid refresh token"}), 401
try:
expiry_val = token_row["expires_at"]
if isinstance(expiry_val, str):
expires_at = datetime.strptime(expiry_val, "%Y-%m-%d %H:%M:%S")
else:
expires_at = expiry_val
if datetime.utcnow() > expires_at:
conn.execute("UPDATE refresh_tokens SET revoked = 1 WHERE id = ?", (token_row["id"],))
conn.commit()
conn.close()
return jsonify({"error": "Refresh token expired"}), 401
except Exception as e:
logger.error(f"Error parsing refresh token expiry: {e}")
conn.close()
return jsonify({"error": "Invalid token format"}), 401
user = conn.execute("SELECT * FROM users WHERE id = ?", (token_row["user_id"],)).fetchone()
if user:
try:
conn.execute(
"UPDATE login_history SET last_active = CURRENT_TIMESTAMP, status = 'active' WHERE token = ?",
(refresh_token,)
)
conn.commit()
except Exception as e:
logger.error(f"Failed to update login session activity: {e}")
conn.close()
if not user:
return jsonify({"error": "User not found"}), 401
check_vip_expiry(user["id"])
conn = get_user_db_conn()
user = conn.execute("SELECT * FROM users WHERE id = ?", (user["id"],)).fetchone()
conn.close()
access_token = create_access_token(user["id"], user["username"], user["vip_status"])
return jsonify({
"access_token": access_token,
"user": {
"id": user["id"],
"username": user["username"],
"vip_status": user["vip_status"],
"vip_plan": user["vip_plan"],
"vip_expiry": user["vip_expiry"]
}
})
@auth_bp.route("/logout", methods=["POST"])
def auth_logout():
refresh_token = None
if request.is_json:
data = request.get_json(silent=True) or {}
refresh_token = data.get("refresh_token")
if refresh_token:
conn = get_user_db_conn()
conn.execute("UPDATE refresh_tokens SET revoked = 1 WHERE token = ?", (refresh_token,))
try:
conn.execute("UPDATE login_history SET status = 'logged_out' WHERE token = ?", (refresh_token,))
except Exception as e:
logger.error(f"Failed to set login session to logged_out: {e}")
conn.commit()
conn.close()
session.clear()
return jsonify({"message": "Đã đăng xuất."})
import time
_auth_me_cache = {}
@auth_bp.route("/me", methods=["GET"])
def auth_me():
auth_header = request.headers.get("Authorization", "")
if auth_header.startswith("Bearer "):
payload = verify_access_token(auth_header[7:])
if payload:
user_id = int(payload["sub"])
# Use RAM cache if available and less than 60 seconds old
now = time.time()
if user_id in _auth_me_cache:
cached_data, cached_time = _auth_me_cache[user_id]
if now - cached_time < 60:
return jsonify(cached_data)
conn = get_user_db_conn()
try:
user = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
if user:
vip_active = check_vip_expiry(user["id"], conn=conn)
if user["vip_status"] == 1 and not vip_active:
user = conn.execute("SELECT * FROM users WHERE id = ?", (user["id"],)).fetchone()
response_data = {
"logged_in": True,
"user": {
"id": user["id"],
"username": user["username"],
"user_code": user["user_code"],
"vip_status": user["vip_status"],
"vip_plan": user["vip_plan"],
"vip_expiry": str(user["vip_expiry"]) if user["vip_expiry"] else None,
"email": user["email"],
"display_name": user["display_name"],
"birthday": user["birthday"],
"gender": user["gender"],
"bio": user["bio"],
"avatar": user["avatar"],
"avatar_frame": user["avatar_frame"],
"phone": user["phone"],
"two_factor": user["two_factor"],
"api_balance": user["api_balance"]
}
}
_auth_me_cache[user_id] = (response_data, now)
return jsonify(response_data)
finally:
conn.close()
if "user_id" in session:
user_id = session["user_id"]
conn = get_user_db_conn()
try:
vip_active = check_vip_expiry(user_id, conn=conn)
user = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
if user:
session["vip_status"] = user["vip_status"]
return jsonify({
"logged_in": True,
"user": {
"id": user["id"],
"username": user["username"],
"vip_status": user["vip_status"],
"vip_plan": user["vip_plan"],
"vip_expiry": str(user["vip_expiry"]) if user["vip_expiry"] else None,
"email": user["email"],
"display_name": user["display_name"],
"birthday": user["birthday"],
"gender": user["gender"],
"bio": user["bio"],
"avatar": user.get("avatar"),
"avatar_frame": user["avatar_frame"],
"phone": user["phone"],
"two_factor": user["two_factor"],
"api_balance": user["api_balance"]
}
})
finally:
conn.close()
return jsonify({"logged_in": False})
@auth_bp.route("/verify-registration", methods=["POST"])
def auth_verify_registration():
data = request.json or {}
email = data.get("email", "").strip().lower()
otp = data.get("otp", "").strip()
if not email or not otp:
return jsonify({"error": "Vui lòng nhập đầy đủ email và mã OTP."}), 400
conn = get_user_db_conn()
user_row = conn.execute("SELECT * FROM users WHERE email = ?", (email,)).fetchone()
if not user_row:
conn.close()
return jsonify({"error": "Email không tồn tại."}), 400
user = dict(user_row)
if user.get("email_verified", 0) == 1:
conn.close()
return jsonify({"message": "Tài khoản đã được xác minh trước đó."})
now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
token_entry = conn.execute(
"SELECT * FROM password_reset_tokens WHERE user_id = ? AND token = ? AND used = 0 AND expires_at > ?",
(user["id"], otp, now_str)
).fetchone()
if not token_entry:
conn.close()
return jsonify({"error": "Mã OTP không đúng hoặc đã hết hạn."}), 400
# Mark OTP as used and set email_verified = 1
conn.execute("UPDATE password_reset_tokens SET used = 1 WHERE id = ?", (token_entry["id"],))
conn.execute("UPDATE users SET email_verified = 1 WHERE id = ?", (user["id"],))
conn.commit()
conn.close()
return jsonify({"message": "Xác minh tài khoản thành công! Bây giờ bạn đã có thể đăng nhập."})
@auth_bp.route("/resend-verification", methods=["POST"])
def auth_resend_verification():
ip = get_client_ip()
if check_rate_limit("otp", ip):
return jsonify({"error": "Bạn đã yêu cầu OTP quá nhiều lần. Vui lòng thử lại sau 1 phút."}), 429
data = request.json or {}
email = data.get("email", "").strip().lower()
if not email:
return jsonify({"error": "Vui lòng nhập email."}), 400
conn = get_user_db_conn()
user_row = conn.execute("SELECT * FROM users WHERE email = ?", (email,)).fetchone()
if not user_row:
conn.close()
return jsonify({"error": "Email không tồn tại trong hệ thống."}), 400
user = dict(user_row)
if user.get("email_verified", 0) == 1:
conn.close()
return jsonify({"error": "Tài khoản này đã được xác minh trước đó."}), 400
from datetime import timedelta
otp_code = f"{secrets.randbelow(900000) + 100000}"
expires_at = (datetime.utcnow() + timedelta(minutes=10)).strftime("%Y-%m-%d %H:%M:%S")
conn.execute("UPDATE password_reset_tokens SET used = 1 WHERE user_id = ?", (user["id"],))
conn.execute(
"INSERT INTO password_reset_tokens (user_id, token, expires_at) VALUES (?, ?, ?)",
(user["id"], otp_code, expires_at)
)
conn.commit()
conn.close()
otp_html = f"""
Xác minh tài khoản Novel Translator VIP của bạn
Chào bạn,
Bạn đã yêu cầu gửi lại mã xác minh cho tài khoản {user["username"]}.
Mã OTP mới của bạn là: {otp_code}
Mã OTP này có hiệu lực trong 10 phút.
Trân trọng,
Đội ngũ hỗ trợ Ly Vu Ha
"""
send_email_async(email, "Mã xác minh tài khoản Novel Translator VIP", otp_html)
return jsonify({"message": "Mã xác minh mới đã được gửi về email của bạn."})
@auth_bp.route("/change-password", methods=["POST"])
@jwt_required
def auth_change_password():
user = request._jwt_user
data = request.json or {}
old_password = data.get("old_password", "")
new_password = data.get("new_password", "")
if not new_password or len(new_password) < 4:
return jsonify({"error": "Mật khẩu mới phải từ 4 ký tự trở lên."}), 400
conn = get_user_db_conn()
user_record = conn.execute("SELECT * FROM users WHERE id = ?", (user["id"],)).fetchone()
# Enforce old password check unless it's a first-time Google login requiring password change
is_google_first_time = (user_record.get("require_password_change", 0) == 1)
if not is_google_first_time:
if not old_password or not verify_password(old_password, user_record["password_hash"]):
conn.close()
return jsonify({"error": "Mật khẩu cũ không chính xác."}), 400
pw_hash = hash_password(new_password)
conn.execute(
"UPDATE users SET password_hash = ?, require_password_change = 0 WHERE id = ?",
(pw_hash, user["id"])
)
conn.commit()
conn.close()
return jsonify({"message": "Đổi mật khẩu thành công!"})
@auth_bp.route("/update-profile", methods=["POST"])
@jwt_required
def auth_update_profile():
user = request._jwt_user
data = request.json or {}
display_name = data.get("display_name", "")
birthday = data.get("birthday", "")
gender = data.get("gender", "")
bio = data.get("bio", "")
avatar = data.get("avatar", "")
avatar_frame = data.get("avatar_frame", "default")
phone = data.get("phone", "")
two_factor = data.get("two_factor", 0)
conn = get_user_db_conn()
try:
conn.execute(
"""UPDATE users SET
display_name = ?,
birthday = ?,
gender = ?,
bio = ?,
avatar = ?,
avatar_frame = ?,
phone = ?,
two_factor = ?
WHERE id = ?""",
(display_name, birthday, gender, bio, avatar, avatar_frame, phone, two_factor, user["id"])
)
conn.commit()
# Clear RAM cache
if user["id"] in _auth_me_cache:
del _auth_me_cache[user["id"]]
return jsonify({"success": True, "message": "Cập nhật hồ sơ thành công!"})
except Exception as e:
logger.error(f"Database error during profile update: {e}")
conn.close()
return jsonify({"error": "Lỗi cơ sở dữ liệu. Vui lòng thử lại sau."}), 500
conn.close()
return jsonify({"success": True, "message": "Cập nhật hồ sơ thành công!"})
@auth_bp.route("/sessions", methods=["GET"])
@jwt_required
def get_login_sessions():
user = get_current_user()
conn = get_user_db_conn()
try:
rows = conn.execute(
"""SELECT id, ip_address, os, browser, device_type, login_time, last_active, status, token
FROM login_history
WHERE user_id = ?
ORDER BY login_time DESC LIMIT 50""",
(user["id"],)
).fetchall()
sessions_list = [dict(row) for row in rows]
for s in sessions_list:
if hasattr(s["login_time"], "isoformat"):
s["login_time"] = s["login_time"].isoformat()
if hasattr(s["last_active"], "isoformat"):
s["last_active"] = s["last_active"].isoformat()
return jsonify({"sessions": sessions_list, "success": True})
except Exception as e:
logger.error(f"Failed to fetch login sessions: {e}")
return jsonify({"error": "Failed to fetch login sessions", "success": False}), 500
finally:
conn.close()
@auth_bp.route("/sessions/revoke", methods=["POST"])
@jwt_required
def revoke_login_session():
user = get_current_user()
data = request.json or {}
session_id = data.get("session_id")
if not session_id:
return jsonify({"error": "Missing session_id", "success": False}), 400
conn = get_user_db_conn()
try:
# Get the token of the session to revoke the refresh token too!
row = conn.execute(
"SELECT token FROM login_history WHERE id = ? AND user_id = ?",
(session_id, user["id"])
).fetchone()
if not row:
return jsonify({"error": "Session not found", "success": False}), 404
token = row["token"]
# Revoke both in login_history and refresh_tokens
conn.execute("UPDATE login_history SET status = 'logged_out' WHERE id = ?", (session_id,))
if token:
conn.execute("UPDATE refresh_tokens SET revoked = 1 WHERE token = ?", (token,))
conn.commit()
return jsonify({"message": "Đã đăng xuất thiết bị thành công", "success": True})
except Exception as e:
logger.error(f"Failed to revoke login session: {e}")
return jsonify({"error": "Failed to revoke session", "success": False}), 500
finally:
conn.close()