huytrao123's picture
Upload 103 files
ced61cd verified
import sqlite3
import hashlib
import secrets
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import os
class UserAuthManager:
"this class use for user authentication and session management"
def __init__(self, db_path: str = "./user_db/auth.db"):
self.db_path = db_path
self.session_timeout = timedelta(hours=24)
self._init_auth_database()
def _init_auth_database(self):
# Create directory if it doesn't exist
db_dir = os.path.dirname(self.db_path)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True)
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
salt TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_login TIMESTAMP,
is_active BOOLEAN DEFAULT 1
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS user_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
session_token TEXT UNIQUE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
expires_at TIMESTAMP NOT NULL,
is_active BOOLEAN DEFAULT 1
)
''')
conn.commit()
conn.close()
def _hash_password(self, password: str, salt: str = None):
if salt is None:
salt = secrets.token_hex(32)
password_hash = hashlib.pbkdf2_hmac(
'sha256', password.encode('utf-8'), salt.encode('utf-8'), 100000
)
return password_hash.hex(), salt
def register_user(self, username: str, email: str, password: str):
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('SELECT id FROM users WHERE username = ? OR email = ?', (username, email))
if cursor.fetchone():
return {"success": False, "message": "User already exists"}
password_hash, salt = self._hash_password(password)
cursor.execute('''
INSERT INTO users (username, email, password_hash, salt)
VALUES (?, ?, ?, ?)
''', (username, email, password_hash, salt))
user_id = cursor.lastrowid
conn.commit()
conn.close()
return {"success": True, "message": "Registration successful", "user_id": user_id}
except Exception as e:
return {"success": False, "message": str(e)}
def login_user(self, username: str, password: str):
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT id, username, email, password_hash, salt
FROM users WHERE (username = ? OR email = ?) AND is_active = 1
''', (username, username))
user = cursor.fetchone()
if not user:
return {"success": False, "message": "Invalid credentials"}
user_id, user_username, user_email, stored_hash, salt = user
password_hash, _ = self._hash_password(password, salt)
if password_hash != stored_hash:
return {"success": False, "message": "Invalid credentials"}
session_token = secrets.token_urlsafe(32)
expires_at = datetime.now() + self.session_timeout
cursor.execute('UPDATE user_sessions SET is_active = 0 WHERE user_id = ?', (user_id,))
cursor.execute('''
INSERT INTO user_sessions (user_id, session_token, expires_at)
VALUES (?, ?, ?)
''', (user_id, session_token, expires_at))
conn.commit()
conn.close()
return {
"success": True,
"session_token": session_token,
"user": {"id": user_id, "username": user_username, "email": user_email}
}
except Exception as e:
return {"success": False, "message": str(e)}
def validate_session(self, session_token: str):
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT u.id, u.username, u.email, s.expires_at
FROM user_sessions s
JOIN users u ON s.user_id = u.id
WHERE s.session_token = ? AND s.is_active = 1
''', (session_token,))
result = cursor.fetchone()
if not result:
return None
user_id, username, email, expires_at = result
expires_datetime = datetime.fromisoformat(expires_at)
if datetime.now() > expires_datetime:
cursor.execute('UPDATE user_sessions SET is_active = 0 WHERE session_token = ?', (session_token,))
conn.commit()
conn.close()
return None
conn.close()
return {"id": user_id, "username": username, "email": email}
except Exception:
return None
def logout_user(self, session_token: str):
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('UPDATE user_sessions SET is_active = 0 WHERE session_token = ?', (session_token,))
conn.commit()
conn.close()
return True
except Exception:
return False
def authenticate_user(self, username_or_email: str, password: str):
"""Authenticate user and return success, message, and user data"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT id, username, email, password_hash, salt
FROM users WHERE (username = ? OR email = ?) AND is_active = 1
''', (username_or_email, username_or_email))
user = cursor.fetchone()
if not user:
conn.close()
return False, "Invalid credentials", None
user_id, username, email, stored_hash, salt = user
password_hash, _ = self._hash_password(password, salt)
if password_hash != stored_hash:
conn.close()
return False, "Invalid credentials", None
# Update last login
cursor.execute('UPDATE users SET last_login = CURRENT_TIMESTAMP WHERE id = ?', (user_id,))
conn.commit()
conn.close()
user_data = {"id": user_id, "username": username, "email": email}
return True, "Login successful", user_data
except Exception as e:
return False, f"Authentication error: {str(e)}", None
def create_session(self, user_id: int):
"""Create a new session for the user"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
session_token = secrets.token_urlsafe(32)
expires_at = datetime.now() + self.session_timeout
# Deactivate old sessions
cursor.execute('UPDATE user_sessions SET is_active = 0 WHERE user_id = ?', (user_id,))
# Create new session
cursor.execute('''
INSERT INTO user_sessions (user_id, session_token, expires_at)
VALUES (?, ?, ?)
''', (user_id, session_token, expires_at))
conn.commit()
conn.close()
return session_token
except Exception as e:
return None
def change_password(self, user_id: int, old_password: str, new_password: str):
"""Change user password"""
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Verify old password
cursor.execute('SELECT password_hash, salt FROM users WHERE id = ?', (user_id,))
result = cursor.fetchone()
if not result:
conn.close()
return False, "User not found"
stored_hash, salt = result
old_password_hash, _ = self._hash_password(old_password, salt)
if old_password_hash != stored_hash:
conn.close()
return False, "Current password is incorrect"
# Update with new password
new_password_hash, new_salt = self._hash_password(new_password)
cursor.execute('''
UPDATE users SET password_hash = ?, salt = ?
WHERE id = ?
''', (new_password_hash, new_salt, user_id))
conn.commit()
conn.close()
return True, "Password changed successfully"
except Exception as e:
return False, f"Error changing password: {str(e)}"