File size: 5,671 Bytes
bb270f0
 
 
 
 
25baa3d
bb270f0
 
25baa3d
bb270f0
 
 
 
 
 
 
 
 
25baa3d
 
 
 
 
bb270f0
 
 
 
25baa3d
 
 
 
 
 
 
bb270f0
 
 
 
 
25baa3d
 
 
 
 
 
 
 
 
 
bb270f0
 
25baa3d
 
809d18f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25baa3d
 
 
 
 
 
 
 
 
 
 
 
 
bb270f0
 
25baa3d
bb270f0
25baa3d
 
 
 
bb270f0
25baa3d
bb270f0
 
 
 
 
 
 
 
 
25baa3d
 
 
bb270f0
25baa3d
 
 
 
 
 
 
 
 
 
 
 
 
bb270f0
 
 
25baa3d
 
bb270f0
 
 
 
 
 
 
 
 
 
 
 
 
25baa3d
 
 
 
bb270f0
25baa3d
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import sqlite3
import os
from utils.security import hash_string
from config import DB_DEBUG_MODE
from pathlib import Path
import re

# Define the database path relative to the script's directory
BASE_DIR = Path(__file__).resolve().parents[2]  # goes up from db/users/
DB_PATH = os.path.join(BASE_DIR, 'data', 'users.db')

def init_db():
    """Initialize the database, and create the table in case it doesn't already exist."""
    with sqlite3.connect(DB_PATH) as conn:
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                real_name TEXT NOT NULL UNIQUE,
                access_name TEXT NOT NULL UNIQUE,
                jarvis_name TEXT NOT NULL,
                is_female BOOLEAN NOT NULL,
                admin BOOLEAN NOT NULL DEFAULT 0
            )
        """)
        conn.commit()

def insert_user(
    real_name: str,
    access_name: str,
    jarvis_name: str,
    is_female: bool,
    admin: bool = False
):
    """Insert a new user into the database."""
    try:
        with sqlite3.connect(DB_PATH) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO users (
                    real_name, access_name, jarvis_name, is_female, admin
                ) VALUES (?, ?, ?, ?, ?)
            """, (
                real_name,
                hash_string(access_name),
                jarvis_name,
                int(is_female),
                int(admin)
            ))
            conn.commit()
    except sqlite3.IntegrityError:
        print(f"[Warning] IntegrityError: User with real_name '{real_name}' or access_name '{access_name}' already exists.")

def insert_user_list(user_list: list[dict]):
    """
    Insert multiple users into the database using the insert_user function.

    Each dictionary in the list must contain the keys:
    - real_name (str)
    - access_name (str)
    - jarvis_name (str)
    - is_female (bool)
    - admin (bool, optional)
    """
    for user in user_list:
        try:
            insert_user(
                real_name=user["real_name"],
                access_name=user["access_name"],
                jarvis_name=user["jarvis_name"],
                is_female=user["is_female"],
                admin=user.get("admin", False)
            )
        except KeyError as e:
            print(f"[Error] Missing required field: {e} in user data: {user}")


def get_user_by_field(field: str, value: str, is_sensitive: bool = False) -> dict | None:
    """
    Returns a user record as a dictionary based on the given field.
    
    Parameters:
    - field: The column to query by ('real_name' or 'access_name').
    - value: The value to match (plaintext; will be hashed if is_sensitive is True).
    - is_sensitive: Whether to hash the value before querying (e.g., for access_name).
    """
    if field not in {"real_name", "access_name"}:
        raise ValueError(f"Field '{field}' is not allowed for querying.")

    actual_value = hash_string(value) if is_sensitive else value

    with sqlite3.connect(DB_PATH) as conn:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        query = f"SELECT * FROM users WHERE {field} = ?"
        cursor.execute(query, (actual_value,))
        user = cursor.fetchone()
        return dict(user) if user else None

def find_user_by_prompt(prompt: str) -> str | None:
    """
    Searches for a valid identifier within the prompt 
    (in Spanish: looks for phrases like 'soy Verkk')
    and returns the registered name for Jarvis if found.
    """
    match = re.search(r"\bsoy\s+([^\s,.!?]+)", prompt, re.IGNORECASE)
    if not match:
        return None

    access_candidate = match.group(1).strip().lower()
    user = get_user_by_field("access_name", access_candidate, is_sensitive=True)
    return user

def delete_user_by_field(field: str, value: str, is_sensitive: bool = False) -> bool:
    """
    Delete a user by the value of one of their fields.
    
    Parameters:
    - field: The column to match ('real_name' or 'access_name').
    - value: The value to match (plaintext; will be hashed if is_sensitive is True).
    - is_sensitive: If True, the value will be hashed (e.g., for access_name).
    """
    if field not in {"real_name", "access_name"}:
        raise ValueError(f"Field '{field}' not allowed for deletion.")
    
    actual_value = hash_string(value) if is_sensitive else value

    with sqlite3.connect(DB_PATH) as conn:
        cursor = conn.cursor()
        query = f"DELETE FROM users WHERE {field} = ?"
        cursor.execute(query, (actual_value,))
        conn.commit()
        return cursor.rowcount > 0

def get_all_users() -> list[tuple]:
    """
    Return all users in the database (for debugging only).
    WARNING: Should not be exposed in production.
    """
    if not DB_DEBUG_MODE:
        raise PermissionError("Access to user list is disabled in production.")
    
    with sqlite3.connect(DB_PATH) as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT id, real_name, access_name, jarvis_name, is_female, admin 
            FROM users
        """)
        return cursor.fetchall()

def is_user_admin(access_name: str) -> bool:
    """Check if a user has admin privileges based on their plaintext access_name."""
    hashed_name = hash_string(access_name)
    with sqlite3.connect(DB_PATH) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT admin FROM users WHERE access_name = ?", (hashed_name,))
        result = cursor.fetchone()
        return bool(result[0]) if result else False