Spaces:
Running
Running
| import sqlite3 | |
| from datetime import datetime, timedelta | |
| import hashlib | |
| import hmac | |
| import os | |
| import secrets | |
| _DB_PATH = "history.db" | |
| _PWD_ITERATIONS = 200_000 | |
| _RESET_CODE_ITERATIONS = 120_000 | |
| _RESET_CODE_TTL_MINUTES = 10 | |
| _RESET_MAX_ATTEMPTS = 5 | |
| def _utc_now_str(): | |
| return datetime.utcnow().strftime("%d/%m/%Y %H:%M:%S") | |
| def _hash_password(password: str, salt: bytes) -> bytes: | |
| return hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, _PWD_ITERATIONS) | |
| def _hash_reset_code(code: str, salt: bytes) -> bytes: | |
| return hashlib.pbkdf2_hmac("sha256", code.encode("utf-8"), salt, _RESET_CODE_ITERATIONS) | |
| def init_db(): | |
| conn = sqlite3.connect(_DB_PATH) | |
| c = conn.cursor() | |
| c.execute(''' | |
| CREATE TABLE IF NOT EXISTS summary_history ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| created_at TEXT, | |
| method TEXT, | |
| original_length INTEGER, | |
| summary_length INTEGER, | |
| process_time REAL, | |
| original_text TEXT, | |
| summary_text TEXT, | |
| novelty_score REAL, | |
| rouge_l_score REAL | |
| ) | |
| ''') | |
| c.execute(''' | |
| CREATE TABLE IF NOT EXISTS users ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| created_at TEXT NOT NULL, | |
| username TEXT NOT NULL UNIQUE, | |
| email TEXT, | |
| password_salt TEXT NOT NULL, | |
| password_hash TEXT NOT NULL | |
| ) | |
| ''') | |
| c.execute(''' | |
| CREATE TABLE IF NOT EXISTS password_reset_codes ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| created_at TEXT NOT NULL, | |
| user_id INTEGER NOT NULL, | |
| code_salt TEXT NOT NULL, | |
| code_hash TEXT NOT NULL, | |
| expires_at TEXT NOT NULL, | |
| attempts INTEGER NOT NULL DEFAULT 0, | |
| used INTEGER NOT NULL DEFAULT 0, | |
| FOREIGN KEY(user_id) REFERENCES users(id) | |
| ) | |
| ''') | |
| conn.commit() | |
| conn.close() | |
| def save_summary(method, orig_len, sum_len, p_time, orig_text, sum_text, novelty=0.0, rouge_l=0.0): | |
| conn = sqlite3.connect(_DB_PATH) | |
| c = conn.cursor() | |
| date_str = datetime.now().strftime("%d/%m/%Y %H:%M:%S") | |
| c.execute(''' | |
| INSERT INTO summary_history | |
| (created_at, method, original_length, summary_length, process_time, original_text, summary_text, novelty_score, rouge_l_score) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', (date_str, method, orig_len, sum_len, p_time, orig_text, sum_text, novelty, rouge_l)) | |
| conn.commit() | |
| conn.close() | |
| def get_history(): | |
| conn = sqlite3.connect(_DB_PATH) | |
| c = conn.cursor() | |
| c.execute("SELECT * FROM summary_history ORDER BY id DESC LIMIT 50") | |
| rows = c.fetchall() | |
| conn.close() | |
| return rows | |
| def create_user(username: str, password: str, email: str | None = None): | |
| username = (username or "").strip() | |
| email = (email or "").strip() or None | |
| if not username or not password: | |
| return False, "Thiếu username hoặc password." | |
| salt = os.urandom(16) | |
| pwd_hash = _hash_password(password, salt) | |
| try: | |
| conn = sqlite3.connect(_DB_PATH) | |
| c = conn.cursor() | |
| c.execute( | |
| "INSERT INTO users (created_at, username, email, password_salt, password_hash) VALUES (?, ?, ?, ?, ?)", | |
| (_utc_now_str(), username, email, salt.hex(), pwd_hash.hex()), | |
| ) | |
| conn.commit() | |
| return True, "Đăng ký thành công." | |
| except sqlite3.IntegrityError: | |
| return False, "Username đã tồn tại." | |
| finally: | |
| try: | |
| conn.close() | |
| except Exception: | |
| pass | |
| def authenticate_user(username: str, password: str): | |
| username = (username or "").strip() | |
| if not username or not password: | |
| return False, None, "Thiếu username hoặc password." | |
| conn = sqlite3.connect(_DB_PATH) | |
| c = conn.cursor() | |
| c.execute("SELECT id, username, email, password_salt, password_hash FROM users WHERE username = ?", (username,)) | |
| row = c.fetchone() | |
| conn.close() | |
| if not row: | |
| return False, None, "Sai username hoặc password." | |
| user_id, uname, email, salt_hex, hash_hex = row | |
| salt = bytes.fromhex(salt_hex) | |
| expected = bytes.fromhex(hash_hex) | |
| actual = _hash_password(password, salt) | |
| if not hmac.compare_digest(expected, actual): | |
| return False, None, "Sai username hoặc password." | |
| return True, {"id": user_id, "username": uname, "email": email}, "Đăng nhập thành công." | |
| def _get_user_by_identifier(identifier: str): | |
| identifier = (identifier or "").strip() | |
| if not identifier: | |
| return None | |
| conn = sqlite3.connect(_DB_PATH) | |
| c = conn.cursor() | |
| if "@" in identifier: | |
| c.execute("SELECT id, username, email FROM users WHERE email = ?", (identifier,)) | |
| else: | |
| c.execute("SELECT id, username, email FROM users WHERE username = ?", (identifier,)) | |
| row = c.fetchone() | |
| conn.close() | |
| if not row: | |
| return None | |
| user_id, username, email = row | |
| return {"id": user_id, "username": username, "email": email} | |
| def create_password_reset_code(identifier: str): | |
| """ | |
| Create a one-time reset code for a user (by username or email). | |
| Returns: (ok: bool, email: str|None, msg: str) | |
| """ | |
| user = _get_user_by_identifier(identifier) | |
| if not user: | |
| # Don't reveal whether user exists | |
| return False, None, "Nếu tài khoản tồn tại và có email, hệ thống sẽ gửi mã đặt lại mật khẩu." | |
| email = (user.get("email") or "").strip() | |
| if not email: | |
| return False, None, "Tài khoản này chưa có email nên không thể đặt lại mật khẩu." | |
| code = f"{secrets.randbelow(1_000_000):06d}" | |
| salt = os.urandom(16) | |
| code_hash = _hash_reset_code(code, salt) | |
| now = datetime.utcnow() | |
| expires = now + timedelta(minutes=_RESET_CODE_TTL_MINUTES) | |
| conn = sqlite3.connect(_DB_PATH) | |
| c = conn.cursor() | |
| # Invalidate previous unused codes for this user | |
| c.execute("UPDATE password_reset_codes SET used = 1 WHERE user_id = ? AND used = 0", (user["id"],)) | |
| c.execute( | |
| "INSERT INTO password_reset_codes (created_at, user_id, code_salt, code_hash, expires_at, attempts, used) VALUES (?, ?, ?, ?, ?, 0, 0)", | |
| (_utc_now_str(), user["id"], salt.hex(), code_hash.hex(), expires.isoformat()), | |
| ) | |
| conn.commit() | |
| conn.close() | |
| return True, email, code | |
| def reset_password_with_code(identifier: str, code: str, new_password: str): | |
| """ | |
| Verify reset code and update password. | |
| Returns: (ok: bool, msg: str) | |
| """ | |
| user = _get_user_by_identifier(identifier) | |
| if not user: | |
| return False, "Mã không hợp lệ hoặc đã hết hạn." | |
| code = (code or "").strip() | |
| if not code or not new_password: | |
| return False, "Thiếu mã hoặc mật khẩu mới." | |
| if len(new_password) < 6: | |
| return False, "Password tối thiểu 6 ký tự." | |
| conn = sqlite3.connect(_DB_PATH) | |
| c = conn.cursor() | |
| c.execute( | |
| """ | |
| SELECT id, code_salt, code_hash, expires_at, attempts, used | |
| FROM password_reset_codes | |
| WHERE user_id = ? | |
| ORDER BY id DESC | |
| LIMIT 1 | |
| """, | |
| (user["id"],), | |
| ) | |
| row = c.fetchone() | |
| if not row: | |
| conn.close() | |
| return False, "Mã không hợp lệ hoặc đã hết hạn." | |
| reset_id, salt_hex, hash_hex, expires_at, attempts, used = row | |
| if used: | |
| conn.close() | |
| return False, "Mã không hợp lệ hoặc đã hết hạn." | |
| try: | |
| expires_dt = datetime.fromisoformat(expires_at) | |
| except Exception: | |
| expires_dt = datetime.utcnow() - timedelta(days=1) | |
| if datetime.utcnow() > expires_dt: | |
| c.execute("UPDATE password_reset_codes SET used = 1 WHERE id = ?", (reset_id,)) | |
| conn.commit() | |
| conn.close() | |
| return False, "Mã không hợp lệ hoặc đã hết hạn." | |
| if attempts >= _RESET_MAX_ATTEMPTS: | |
| c.execute("UPDATE password_reset_codes SET used = 1 WHERE id = ?", (reset_id,)) | |
| conn.commit() | |
| conn.close() | |
| return False, "Bạn đã nhập sai quá nhiều lần. Vui lòng yêu cầu mã mới." | |
| salt = bytes.fromhex(salt_hex) | |
| expected = bytes.fromhex(hash_hex) | |
| actual = _hash_reset_code(code, salt) | |
| if not hmac.compare_digest(expected, actual): | |
| c.execute("UPDATE password_reset_codes SET attempts = attempts + 1 WHERE id = ?", (reset_id,)) | |
| conn.commit() | |
| conn.close() | |
| return False, "Mã không hợp lệ hoặc đã hết hạn." | |
| # Update password | |
| pwd_salt = os.urandom(16) | |
| pwd_hash = _hash_password(new_password, pwd_salt) | |
| c.execute( | |
| "UPDATE users SET password_salt = ?, password_hash = ? WHERE id = ?", | |
| (pwd_salt.hex(), pwd_hash.hex(), user["id"]), | |
| ) | |
| c.execute("UPDATE password_reset_codes SET used = 1 WHERE id = ?", (reset_id,)) | |
| conn.commit() | |
| conn.close() | |
| return True, "Đổi mật khẩu thành công. Bạn có thể đăng nhập lại." | |