import sqlite3 import os from utils.security import hash_string from config import DB_DEBUG_MODE from pathlib import Path import re # Define the database path relative to the script's directory BASE_DIR = Path(__file__).resolve().parents[2] # goes up from db/users/ DB_PATH = os.path.join(BASE_DIR, 'data', 'users.db') def init_db(): """Initialize the database, and create the table in case it doesn't already exist.""" with sqlite3.connect(DB_PATH) as conn: cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, real_name TEXT NOT NULL UNIQUE, access_name TEXT NOT NULL UNIQUE, jarvis_name TEXT NOT NULL, is_female BOOLEAN NOT NULL, admin BOOLEAN NOT NULL DEFAULT 0 ) """) conn.commit() def insert_user( real_name: str, access_name: str, jarvis_name: str, is_female: bool, admin: bool = False ): """Insert a new user into the database.""" try: with sqlite3.connect(DB_PATH) as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO users ( real_name, access_name, jarvis_name, is_female, admin ) VALUES (?, ?, ?, ?, ?) """, ( real_name, hash_string(access_name), jarvis_name, int(is_female), int(admin) )) conn.commit() except sqlite3.IntegrityError: print(f"[Warning] IntegrityError: User with real_name '{real_name}' or access_name '{access_name}' already exists.") def insert_user_list(user_list: list[dict]): """ Insert multiple users into the database using the insert_user function. Each dictionary in the list must contain the keys: - real_name (str) - access_name (str) - jarvis_name (str) - is_female (bool) - admin (bool, optional) """ for user in user_list: try: insert_user( real_name=user["real_name"], access_name=user["access_name"], jarvis_name=user["jarvis_name"], is_female=user["is_female"], admin=user.get("admin", False) ) except KeyError as e: print(f"[Error] Missing required field: {e} in user data: {user}") def get_user_by_field(field: str, value: str, is_sensitive: bool = False) -> dict | None: """ Returns a user record as a dictionary based on the given field. Parameters: - field: The column to query by ('real_name' or 'access_name'). - value: The value to match (plaintext; will be hashed if is_sensitive is True). - is_sensitive: Whether to hash the value before querying (e.g., for access_name). """ if field not in {"real_name", "access_name"}: raise ValueError(f"Field '{field}' is not allowed for querying.") actual_value = hash_string(value) if is_sensitive else value with sqlite3.connect(DB_PATH) as conn: conn.row_factory = sqlite3.Row cursor = conn.cursor() query = f"SELECT * FROM users WHERE {field} = ?" cursor.execute(query, (actual_value,)) user = cursor.fetchone() return dict(user) if user else None def find_user_by_prompt(prompt: str) -> str | None: """ Searches for a valid identifier within the prompt (in Spanish: looks for phrases like 'soy Verkk') and returns the registered name for Jarvis if found. """ match = re.search(r"\bsoy\s+([^\s,.!?]+)", prompt, re.IGNORECASE) if not match: return None access_candidate = match.group(1).strip().lower() user = get_user_by_field("access_name", access_candidate, is_sensitive=True) return user def delete_user_by_field(field: str, value: str, is_sensitive: bool = False) -> bool: """ Delete a user by the value of one of their fields. Parameters: - field: The column to match ('real_name' or 'access_name'). - value: The value to match (plaintext; will be hashed if is_sensitive is True). - is_sensitive: If True, the value will be hashed (e.g., for access_name). """ if field not in {"real_name", "access_name"}: raise ValueError(f"Field '{field}' not allowed for deletion.") actual_value = hash_string(value) if is_sensitive else value with sqlite3.connect(DB_PATH) as conn: cursor = conn.cursor() query = f"DELETE FROM users WHERE {field} = ?" cursor.execute(query, (actual_value,)) conn.commit() return cursor.rowcount > 0 def get_all_users() -> list[tuple]: """ Return all users in the database (for debugging only). WARNING: Should not be exposed in production. """ if not DB_DEBUG_MODE: raise PermissionError("Access to user list is disabled in production.") with sqlite3.connect(DB_PATH) as conn: cursor = conn.cursor() cursor.execute(""" SELECT id, real_name, access_name, jarvis_name, is_female, admin FROM users """) return cursor.fetchall() def is_user_admin(access_name: str) -> bool: """Check if a user has admin privileges based on their plaintext access_name.""" hashed_name = hash_string(access_name) with sqlite3.connect(DB_PATH) as conn: cursor = conn.cursor() cursor.execute("SELECT admin FROM users WHERE access_name = ?", (hashed_name,)) result = cursor.fetchone() return bool(result[0]) if result else False