hethongtomtat / database.py
thinh21's picture
Upload 10 files
a9b0b3a verified
import sqlite3
from datetime import datetime, timedelta
import hashlib
import hmac
import os
import secrets
_DB_PATH = "history.db"
_PWD_ITERATIONS = 200_000
_RESET_CODE_ITERATIONS = 120_000
_RESET_CODE_TTL_MINUTES = 10
_RESET_MAX_ATTEMPTS = 5
def _utc_now_str():
return datetime.utcnow().strftime("%d/%m/%Y %H:%M:%S")
def _hash_password(password: str, salt: bytes) -> bytes:
return hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, _PWD_ITERATIONS)
def _hash_reset_code(code: str, salt: bytes) -> bytes:
return hashlib.pbkdf2_hmac("sha256", code.encode("utf-8"), salt, _RESET_CODE_ITERATIONS)
def init_db():
conn = sqlite3.connect(_DB_PATH)
c = conn.cursor()
c.execute('''
CREATE TABLE IF NOT EXISTS summary_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT,
method TEXT,
original_length INTEGER,
summary_length INTEGER,
process_time REAL,
original_text TEXT,
summary_text TEXT,
novelty_score REAL,
rouge_l_score REAL
)
''')
c.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT NOT NULL,
username TEXT NOT NULL UNIQUE,
email TEXT,
password_salt TEXT NOT NULL,
password_hash TEXT NOT NULL
)
''')
c.execute('''
CREATE TABLE IF NOT EXISTS password_reset_codes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT NOT NULL,
user_id INTEGER NOT NULL,
code_salt TEXT NOT NULL,
code_hash TEXT NOT NULL,
expires_at TEXT NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0,
used INTEGER NOT NULL DEFAULT 0,
FOREIGN KEY(user_id) REFERENCES users(id)
)
''')
conn.commit()
conn.close()
def save_summary(method, orig_len, sum_len, p_time, orig_text, sum_text, novelty=0.0, rouge_l=0.0):
conn = sqlite3.connect(_DB_PATH)
c = conn.cursor()
date_str = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
c.execute('''
INSERT INTO summary_history
(created_at, method, original_length, summary_length, process_time, original_text, summary_text, novelty_score, rouge_l_score)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (date_str, method, orig_len, sum_len, p_time, orig_text, sum_text, novelty, rouge_l))
conn.commit()
conn.close()
def get_history():
conn = sqlite3.connect(_DB_PATH)
c = conn.cursor()
c.execute("SELECT * FROM summary_history ORDER BY id DESC LIMIT 50")
rows = c.fetchall()
conn.close()
return rows
def create_user(username: str, password: str, email: str | None = None):
username = (username or "").strip()
email = (email or "").strip() or None
if not username or not password:
return False, "Thiếu username hoặc password."
salt = os.urandom(16)
pwd_hash = _hash_password(password, salt)
try:
conn = sqlite3.connect(_DB_PATH)
c = conn.cursor()
c.execute(
"INSERT INTO users (created_at, username, email, password_salt, password_hash) VALUES (?, ?, ?, ?, ?)",
(_utc_now_str(), username, email, salt.hex(), pwd_hash.hex()),
)
conn.commit()
return True, "Đăng ký thành công."
except sqlite3.IntegrityError:
return False, "Username đã tồn tại."
finally:
try:
conn.close()
except Exception:
pass
def authenticate_user(username: str, password: str):
username = (username or "").strip()
if not username or not password:
return False, None, "Thiếu username hoặc password."
conn = sqlite3.connect(_DB_PATH)
c = conn.cursor()
c.execute("SELECT id, username, email, password_salt, password_hash FROM users WHERE username = ?", (username,))
row = c.fetchone()
conn.close()
if not row:
return False, None, "Sai username hoặc password."
user_id, uname, email, salt_hex, hash_hex = row
salt = bytes.fromhex(salt_hex)
expected = bytes.fromhex(hash_hex)
actual = _hash_password(password, salt)
if not hmac.compare_digest(expected, actual):
return False, None, "Sai username hoặc password."
return True, {"id": user_id, "username": uname, "email": email}, "Đăng nhập thành công."
def _get_user_by_identifier(identifier: str):
identifier = (identifier or "").strip()
if not identifier:
return None
conn = sqlite3.connect(_DB_PATH)
c = conn.cursor()
if "@" in identifier:
c.execute("SELECT id, username, email FROM users WHERE email = ?", (identifier,))
else:
c.execute("SELECT id, username, email FROM users WHERE username = ?", (identifier,))
row = c.fetchone()
conn.close()
if not row:
return None
user_id, username, email = row
return {"id": user_id, "username": username, "email": email}
def create_password_reset_code(identifier: str):
"""
Create a one-time reset code for a user (by username or email).
Returns: (ok: bool, email: str|None, msg: str)
"""
user = _get_user_by_identifier(identifier)
if not user:
# Don't reveal whether user exists
return False, None, "Nếu tài khoản tồn tại và có email, hệ thống sẽ gửi mã đặt lại mật khẩu."
email = (user.get("email") or "").strip()
if not email:
return False, None, "Tài khoản này chưa có email nên không thể đặt lại mật khẩu."
code = f"{secrets.randbelow(1_000_000):06d}"
salt = os.urandom(16)
code_hash = _hash_reset_code(code, salt)
now = datetime.utcnow()
expires = now + timedelta(minutes=_RESET_CODE_TTL_MINUTES)
conn = sqlite3.connect(_DB_PATH)
c = conn.cursor()
# Invalidate previous unused codes for this user
c.execute("UPDATE password_reset_codes SET used = 1 WHERE user_id = ? AND used = 0", (user["id"],))
c.execute(
"INSERT INTO password_reset_codes (created_at, user_id, code_salt, code_hash, expires_at, attempts, used) VALUES (?, ?, ?, ?, ?, 0, 0)",
(_utc_now_str(), user["id"], salt.hex(), code_hash.hex(), expires.isoformat()),
)
conn.commit()
conn.close()
return True, email, code
def reset_password_with_code(identifier: str, code: str, new_password: str):
"""
Verify reset code and update password.
Returns: (ok: bool, msg: str)
"""
user = _get_user_by_identifier(identifier)
if not user:
return False, "Mã không hợp lệ hoặc đã hết hạn."
code = (code or "").strip()
if not code or not new_password:
return False, "Thiếu mã hoặc mật khẩu mới."
if len(new_password) < 6:
return False, "Password tối thiểu 6 ký tự."
conn = sqlite3.connect(_DB_PATH)
c = conn.cursor()
c.execute(
"""
SELECT id, code_salt, code_hash, expires_at, attempts, used
FROM password_reset_codes
WHERE user_id = ?
ORDER BY id DESC
LIMIT 1
""",
(user["id"],),
)
row = c.fetchone()
if not row:
conn.close()
return False, "Mã không hợp lệ hoặc đã hết hạn."
reset_id, salt_hex, hash_hex, expires_at, attempts, used = row
if used:
conn.close()
return False, "Mã không hợp lệ hoặc đã hết hạn."
try:
expires_dt = datetime.fromisoformat(expires_at)
except Exception:
expires_dt = datetime.utcnow() - timedelta(days=1)
if datetime.utcnow() > expires_dt:
c.execute("UPDATE password_reset_codes SET used = 1 WHERE id = ?", (reset_id,))
conn.commit()
conn.close()
return False, "Mã không hợp lệ hoặc đã hết hạn."
if attempts >= _RESET_MAX_ATTEMPTS:
c.execute("UPDATE password_reset_codes SET used = 1 WHERE id = ?", (reset_id,))
conn.commit()
conn.close()
return False, "Bạn đã nhập sai quá nhiều lần. Vui lòng yêu cầu mã mới."
salt = bytes.fromhex(salt_hex)
expected = bytes.fromhex(hash_hex)
actual = _hash_reset_code(code, salt)
if not hmac.compare_digest(expected, actual):
c.execute("UPDATE password_reset_codes SET attempts = attempts + 1 WHERE id = ?", (reset_id,))
conn.commit()
conn.close()
return False, "Mã không hợp lệ hoặc đã hết hạn."
# Update password
pwd_salt = os.urandom(16)
pwd_hash = _hash_password(new_password, pwd_salt)
c.execute(
"UPDATE users SET password_salt = ?, password_hash = ? WHERE id = ?",
(pwd_salt.hex(), pwd_hash.hex(), user["id"]),
)
c.execute("UPDATE password_reset_codes SET used = 1 WHERE id = ?", (reset_id,))
conn.commit()
conn.close()
return True, "Đổi mật khẩu thành công. Bạn có thể đăng nhập lại."