# Snapshot metadata: revision af4bd3a, file size 5,926 bytes.
"""
Database Module for User Authentication
========================================
SQLite-based user storage and admin configuration for the Pharma K platform.
"""
import sqlite3
import os
from datetime import datetime
from typing import Optional, List, Dict, Any
from pathlib import Path
# The SQLite file lives in a "data" directory located two levels above this
# module's own file.
DB_DIR = Path(__file__).parent.parent.joinpath("data")
DB_PATH = DB_DIR.joinpath("pharma_k.db")
def get_db_connection() -> sqlite3.Connection:
    """Open a connection to the application database, creating it if needed.

    The data directory is created when missing, rows are exposed as
    ``sqlite3.Row`` objects, and ``_init_tables`` is run on the new
    connection before it is handed out.

    Returns:
        An open connection. The caller is responsible for closing it.

    Raises:
        OSError: If the data directory cannot be created.
        sqlite3.Error: If the database cannot be opened or initialised.
    """
    # Ensure the data directory exists before SQLite tries to create the file.
    DB_DIR.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(DB_PATH))
    try:
        conn.row_factory = sqlite3.Row
        # Create tables and default records if they do not exist yet.
        _init_tables(conn)
    except Exception:
        # Initialisation failed part-way: release the handle instead of
        # leaking it, then let the caller see the original error.
        conn.close()
        raise
    return conn
def _init_tables(conn: sqlite3.Connection):
    """Ensure the schema exists, then seed the default records.

    Creates the ``users`` and ``admin_config`` tables when they are absent,
    commits, and finally delegates to the default-admin and default-LLM-config
    initialisers using the same connection.
    """
    # Account storage: one row per user, unique by email.
    users_ddl = '''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            email TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            role TEXT DEFAULT 'user',
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            last_login DATETIME
        )
    '''
    # Key/value settings managed by administrators.
    admin_config_ddl = '''
        CREATE TABLE IF NOT EXISTS admin_config (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    '''

    db = conn.cursor()
    for ddl in (users_ddl, admin_config_ddl):
        db.execute(ddl)
    conn.commit()

    # Seed order matters only in that both run after the tables exist.
    _init_default_admin(conn)
    _init_default_llm_config(conn)
def _init_default_admin(conn: sqlite3.Connection):
    """Seed the built-in ``admin`` account when no such row exists yet.

    Does nothing if a user whose email is ``'admin'`` is already present.
    Otherwise inserts that user with role ``'admin'``, commits, and prints a
    notice to stdout.
    """
    import hashlib

    already_there = conn.execute(
        "SELECT id FROM users WHERE email = 'admin'"
    ).fetchone()
    if already_there is not None:
        return

    # The admin password is stored as a plain, unsalted SHA-256 hex digest.
    # NOTE(review): hard-coded default credential and unsalted hash; the
    # original note says regular users use bcrypt (not visible here). Changing
    # the scheme requires updating whatever verifies this hash.
    digest = hashlib.sha256("pharma_admin_2026".encode()).hexdigest()
    conn.execute('''
        INSERT INTO users (email, password_hash, role)
        VALUES (?, ?, 'admin')
    ''', ('admin', digest))
    conn.commit()
    # NOTE(review): this writes the plaintext default password to stdout,
    # which may end up in logs.
    print("Default admin account created: admin / pharma_admin_2026")
def _init_default_llm_config(conn: sqlite3.Connection):
    """Seed the default LLM settings without overwriting existing values.

    Inserts ``default_provider`` and ``default_api_key`` into ``admin_config``
    using ``INSERT OR IGNORE``, so rows that already exist keep their current
    value. Commits once after both statements.
    """
    seed_statements = (
        # Provider used when nothing else has been configured.
        '''
        INSERT OR IGNORE INTO admin_config (key, value)
        VALUES ('default_provider', 'kimi')
        ''',
        # API key starts out empty; an admin is expected to set it.
        '''
        INSERT OR IGNORE INTO admin_config (key, value)
        VALUES ('default_api_key', '')
        ''',
    )
    for statement in seed_statements:
        conn.execute(statement)
    conn.commit()
# =============================================================================
# User Operations
# =============================================================================
def create_user(email: str, password_hash: str) -> bool:
    """Create a new account with role ``'user'``.

    Args:
        email: Value for the ``email`` column, which is unique per user.
        password_hash: Already-hashed password; stored exactly as given.

    Returns:
        ``True`` when the row was inserted, ``False`` when the insert violated
        an integrity constraint (in practice: the email already exists).

    Raises:
        sqlite3.Error: For database failures other than an integrity error
            on the insert.
    """
    conn = get_db_connection()
    try:
        conn.execute('''
            INSERT INTO users (email, password_hash, role)
            VALUES (?, ?, 'user')
        ''', (email, password_hash))
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        return False  # Email already exists
    finally:
        # Always release the handle, including on the duplicate-email path.
        conn.close()
def get_user_by_email(email: str) -> Optional[Dict[str, Any]]:
    """Look up a single user by email.

    Args:
        email: Email to match exactly against the ``email`` column.

    Returns:
        A dict of every column of the matching ``users`` row (including
        ``password_hash``), or ``None`` when no row matches.
    """
    conn = get_db_connection()
    try:
        row = conn.execute(
            "SELECT * FROM users WHERE email = ?", (email,)
        ).fetchone()
    finally:
        # Close even if the query raises, so the handle is not leaked.
        conn.close()
    return dict(row) if row is not None else None
def update_last_login(email: str):
    """Set ``last_login`` to the current time for the user with this email.

    Silently affects no rows when the email is unknown.

    Args:
        email: Email identifying the user to update.
    """
    # NOTE(review): this stores a naive local-time ISO-8601 string, whereas
    # ``created_at`` defaults to SQLite's CURRENT_TIMESTAMP; confirm the
    # mixed formats are intended before comparing the two columns.
    now_iso = datetime.now().isoformat()
    conn = get_db_connection()
    try:
        conn.execute('''
            UPDATE users SET last_login = ?
            WHERE email = ?
        ''', (now_iso, email))
        conn.commit()
    finally:
        # Close even if the write fails (e.g. the database is locked).
        conn.close()
def get_all_users() -> List[Dict[str, Any]]:
    """List every user for the admin view, newest account first.

    Returns:
        One dict per user with the keys ``id``, ``email``, ``role``,
        ``created_at`` and ``last_login``. The password hash is deliberately
        not selected.
    """
    conn = get_db_connection()
    try:
        rows = conn.execute(
            "SELECT id, email, role, created_at, last_login FROM users ORDER BY created_at DESC"
        ).fetchall()
    finally:
        # Close even if the query raises, so the handle is not leaked.
        conn.close()
    return [dict(row) for row in rows]
# =============================================================================
# Admin Config Operations
# =============================================================================
def get_admin_config(key: str) -> Optional[str]:
    """Read one value from the ``admin_config`` table.

    Args:
        key: Configuration key to look up.

    Returns:
        The stored value, or ``None`` when the key is not present.
    """
    conn = get_db_connection()
    try:
        row = conn.execute(
            "SELECT value FROM admin_config WHERE key = ?", (key,)
        ).fetchone()
    finally:
        # Close even if the query raises, so the handle is not leaked.
        conn.close()
    return row['value'] if row is not None else None
def set_admin_config(key: str, value: str):
    """Insert or overwrite one ``admin_config`` entry.

    An existing row for ``key`` is replaced, and ``updated_at`` is set to the
    current local time as an ISO-8601 string.

    Args:
        key: Configuration key to write.
        value: Value to store for that key.
    """
    now_iso = datetime.now().isoformat()
    conn = get_db_connection()
    try:
        conn.execute('''
            INSERT OR REPLACE INTO admin_config (key, value, updated_at)
            VALUES (?, ?, ?)
        ''', (key, value, now_iso))
        conn.commit()
    finally:
        # Close even if the write fails (e.g. the database is locked).
        conn.close()
def get_default_llm_config() -> Dict[str, str]:
    """Return the stored default LLM settings.

    Returns:
        A dict with ``'provider'`` and ``'api_key'``. A missing or empty
        provider falls back to ``'kimi'``; a missing API key becomes ``''``.
    """
    stored_provider = get_admin_config('default_provider')
    stored_key = get_admin_config('default_api_key')
    return {
        'provider': stored_provider if stored_provider else 'kimi',
        'api_key': stored_key if stored_key else '',
    }
def set_default_llm_config(provider: str, api_key: str):
    """Store the default LLM provider and API key.

    The provider is written first, then the API key. Each goes through its own
    ``set_admin_config`` call, so the two writes are not a single transaction.
    """
    settings = (
        ('default_provider', provider),
        ('default_api_key', api_key),
    )
    for config_key, config_value in settings:
        set_admin_config(config_key, config_value)
|