""" Database Module for User Authentication ======================================== SQLite-based user storage for Pharma K platform. """ import sqlite3 import os from datetime import datetime from typing import Optional, List, Dict, Any from pathlib import Path # Database file location DB_DIR = Path(__file__).parent.parent / "data" DB_PATH = DB_DIR / "pharma_k.db" def get_db_connection() -> sqlite3.Connection: """Get database connection, creating tables if needed.""" # Ensure data directory exists DB_DIR.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(DB_PATH)) conn.row_factory = sqlite3.Row # Create tables if not exist _init_tables(conn) return conn def _init_tables(conn: sqlite3.Connection): """Initialize database tables.""" cursor = conn.cursor() # Users table cursor.execute(''' CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, role TEXT DEFAULT 'user', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, last_login DATETIME ) ''') # Admin config table cursor.execute(''' CREATE TABLE IF NOT EXISTS admin_config ( key TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ) ''') conn.commit() # Initialize default admin if not exists _init_default_admin(conn) # Initialize default LLM config _init_default_llm_config(conn) def _init_default_admin(conn: sqlite3.Connection): """Create default admin account if not exists.""" import hashlib cursor = conn.cursor() cursor.execute("SELECT id FROM users WHERE email = 'admin'") if cursor.fetchone() is None: # Use simple hash for admin (bcrypt will be used for regular users) password_hash = hashlib.sha256("pharma_admin_2026".encode()).hexdigest() cursor.execute(''' INSERT INTO users (email, password_hash, role) VALUES (?, ?, 'admin') ''', ('admin', password_hash)) conn.commit() print("Default admin account created: admin / pharma_admin_2026") def _init_default_llm_config(conn: sqlite3.Connection): """Initialize default LLM configuration.""" cursor = conn.cursor() # Default provider cursor.execute(''' INSERT OR IGNORE INTO admin_config (key, value) VALUES ('default_provider', 'kimi') ''') # Default API key (empty - admin needs to set) cursor.execute(''' INSERT OR IGNORE INTO admin_config (key, value) VALUES ('default_api_key', '') ''') conn.commit() # ============================================================================= # User Operations # ============================================================================= def create_user(email: str, password_hash: str) -> bool: """Create a new user account.""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute(''' INSERT INTO users (email, password_hash, role) VALUES (?, ?, 'user') ''', (email, password_hash)) conn.commit() conn.close() return True except sqlite3.IntegrityError: return False # Email already exists def get_user_by_email(email: str) -> Optional[Dict[str, Any]]: """Get user by email.""" conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE email = ?", (email,)) row = cursor.fetchone() conn.close() if row: return dict(row) return None def update_last_login(email: str): """Update user's last login time.""" conn = get_db_connection() cursor = conn.cursor() cursor.execute(''' UPDATE users SET last_login = ? WHERE email = ? ''', (datetime.now().isoformat(), email)) conn.commit() conn.close() def get_all_users() -> List[Dict[str, Any]]: """Get all users (for admin).""" conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT id, email, role, created_at, last_login FROM users ORDER BY created_at DESC") rows = cursor.fetchall() conn.close() return [dict(row) for row in rows] # ============================================================================= # Admin Config Operations # ============================================================================= def get_admin_config(key: str) -> Optional[str]: """Get admin configuration value.""" conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT value FROM admin_config WHERE key = ?", (key,)) row = cursor.fetchone() conn.close() if row: return row['value'] return None def set_admin_config(key: str, value: str): """Set admin configuration value.""" conn = get_db_connection() cursor = conn.cursor() cursor.execute(''' INSERT OR REPLACE INTO admin_config (key, value, updated_at) VALUES (?, ?, ?) ''', (key, value, datetime.now().isoformat())) conn.commit() conn.close() def get_default_llm_config() -> Dict[str, str]: """Get default LLM configuration.""" return { 'provider': get_admin_config('default_provider') or 'kimi', 'api_key': get_admin_config('default_api_key') or '' } def set_default_llm_config(provider: str, api_key: str): """Set default LLM configuration.""" set_admin_config('default_provider', provider) set_admin_config('default_api_key', api_key)