Spaces:
Sleeping
Sleeping
File size: 5,671 Bytes
bb270f0 25baa3d bb270f0 25baa3d bb270f0 25baa3d bb270f0 25baa3d bb270f0 25baa3d bb270f0 25baa3d 809d18f 25baa3d bb270f0 25baa3d bb270f0 25baa3d bb270f0 25baa3d bb270f0 25baa3d bb270f0 25baa3d bb270f0 25baa3d bb270f0 25baa3d bb270f0 25baa3d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 | import sqlite3
import os
from utils.security import hash_string
from config import DB_DEBUG_MODE
from pathlib import Path
import re
# Resolve the database location from this module's own path so that it does
# not depend on the current working directory.
# BASE_DIR is the third ancestor of the resolved file path; DB_PATH is the
# plain string "<BASE_DIR>/data/users.db" handed to sqlite3.connect().
BASE_DIR = Path(__file__).resolve().parents[2] # third ancestor; module is expected to live in db/users/
DB_PATH = os.path.join(BASE_DIR, 'data', 'users.db')
def init_db():
    """Create the users database file and the ``users`` table if missing.

    Safe to call repeatedly: the table is created with ``IF NOT EXISTS``.
    The parent directory of ``DB_PATH`` is created first, because SQLite
    creates a missing database file but not a missing directory.
    """
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        os.makedirs(db_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` only scopes the transaction (commit on success,
        # rollback on error); it does not close the connection, hence the
        # explicit close() in ``finally``.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    real_name TEXT NOT NULL UNIQUE,
                    access_name TEXT NOT NULL UNIQUE,
                    jarvis_name TEXT NOT NULL,
                    is_female BOOLEAN NOT NULL,
                    admin BOOLEAN NOT NULL DEFAULT 0
                )
            """)
    finally:
        conn.close()
def insert_user(
    real_name: str,
    access_name: str,
    jarvis_name: str,
    is_female: bool,
    admin: bool = False
):
    """Insert a new user into the database.

    Parameters:
    - real_name: The user's real name (stored as given; must be unique).
    - access_name: Plaintext access name; only ``hash_string(access_name)``
      is stored (must be unique).
    - jarvis_name: The name stored in the ``jarvis_name`` column.
    - is_female: Stored as 0/1.
    - admin: Stored as 0/1; defaults to False.

    A constraint violation (for example a duplicate real_name or
    access_name) is not raised to the caller: a warning is printed and the
    function returns without inserting anything.
    """
    row = (
        real_name,
        hash_string(access_name),
        jarvis_name,
        int(is_female),
        int(admin),
    )
    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` commits on success and rolls back on error, but does
        # not close the connection; that happens in ``finally``.
        with conn:
            conn.execute("""
                INSERT INTO users (
                    real_name, access_name, jarvis_name, is_female, admin
                ) VALUES (?, ?, ?, ?, ?)
            """, row)
    except sqlite3.IntegrityError as exc:
        # The plaintext access_name is deliberately left out of the message:
        # it is hashed before storage, so it should not leak into logs.
        print(
            f"[Warning] IntegrityError: User with real_name '{real_name}' "
            f"or the given access_name already exists ({exc})."
        )
    finally:
        conn.close()
def insert_user_list(user_list: list[dict]):
    """
    Insert multiple users into the database using the insert_user function.

    Each dictionary in the list must contain the keys:
    - real_name (str)
    - access_name (str)
    - jarvis_name (str)
    - is_female (bool)
    - admin (bool, optional; defaults to False)

    A record that lacks a required key is skipped with a printed error and
    processing continues with the next record.
    """
    required = ("real_name", "access_name", "jarvis_name", "is_female")
    for user in user_list:
        # Only the field extraction is guarded, so a KeyError raised inside
        # insert_user is not misreported as a missing field.
        try:
            fields = {key: user[key] for key in required}
        except KeyError as e:
            # Do not echo the plaintext access_name into the output.
            shown = {
                k: ("<redacted>" if k == "access_name" else v)
                for k, v in user.items()
            }
            print(f"[Error] Missing required field: {e} in user data: {shown}")
            continue
        insert_user(**fields, admin=user.get("admin", False))
def get_user_by_field(field: str, value: str, is_sensitive: bool = False) -> dict | None:
"""
Returns a user record as a dictionary based on the given field.
Parameters:
- field: The column to query by ('real_name' or 'access_name').
- value: The value to match (plaintext; will be hashed if is_sensitive is True).
- is_sensitive: Whether to hash the value before querying (e.g., for access_name).
"""
if field not in {"real_name", "access_name"}:
raise ValueError(f"Field '{field}' is not allowed for querying.")
actual_value = hash_string(value) if is_sensitive else value
with sqlite3.connect(DB_PATH) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query = f"SELECT * FROM users WHERE {field} = ?"
cursor.execute(query, (actual_value,))
user = cursor.fetchone()
return dict(user) if user else None
def find_user_by_prompt(prompt: str) -> str | None:
"""
Searches for a valid identifier within the prompt
(in Spanish: looks for phrases like 'soy Verkk')
and returns the registered name for Jarvis if found.
"""
match = re.search(r"\bsoy\s+([^\s,.!?]+)", prompt, re.IGNORECASE)
if not match:
return None
access_candidate = match.group(1).strip().lower()
user = get_user_by_field("access_name", access_candidate, is_sensitive=True)
return user
def delete_user_by_field(field: str, value: str, is_sensitive: bool = False) -> bool:
    """
    Delete a user by the value of one of their fields.

    Parameters:
    - field: The column to match ('real_name' or 'access_name').
    - value: The value to match (plaintext; will be hashed if is_sensitive is True).
    - is_sensitive: If True, the value will be hashed (e.g., for access_name).

    Returns True if at least one row was deleted, False otherwise.

    Raises ValueError if ``field`` is not one of the allowed columns.
    """
    # The column name is interpolated into the SQL text, so it must come from
    # this fixed whitelist; the value itself is always bound as a parameter.
    if field not in {"real_name", "access_name"}:
        raise ValueError(f"Field '{field}' not allowed for deletion.")

    actual_value = hash_string(value) if is_sensitive else value

    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` commits the delete (or rolls back on error) but does
        # not close the connection; that happens in ``finally``.
        with conn:
            deleted = conn.execute(
                f"DELETE FROM users WHERE {field} = ?", (actual_value,)
            ).rowcount
    finally:
        conn.close()

    return deleted > 0
def get_all_users() -> list[tuple]:
    """
    Return all users in the database (for debugging only).

    Each row is a tuple of
    (id, real_name, access_name, jarvis_name, is_female, admin), with the
    values exactly as stored in the table.

    Raises PermissionError when DB_DEBUG_MODE is falsy.

    WARNING: Should not be exposed in production.
    """
    if not DB_DEBUG_MODE:
        raise PermissionError("Access to user list is disabled in production.")

    conn = sqlite3.connect(DB_PATH)
    try:
        return conn.execute("""
            SELECT id, real_name, access_name, jarvis_name, is_female, admin
            FROM users
        """).fetchall()
    finally:
        # The sqlite3 connection context manager does not close the
        # connection, so close it explicitly.
        conn.close()
def is_user_admin(access_name: str) -> bool:
    """Check if a user has admin privileges based on their plaintext access_name.

    The access name is hashed with hash_string before the lookup. Returns
    False when no user matches, otherwise the truthiness of the stored
    ``admin`` flag.
    """
    hashed_name = hash_string(access_name)

    conn = sqlite3.connect(DB_PATH)
    try:
        result = conn.execute(
            "SELECT admin FROM users WHERE access_name = ?", (hashed_name,)
        ).fetchone()
    finally:
        # The sqlite3 connection context manager does not close the
        # connection, so close it explicitly.
        conn.close()

    return bool(result[0]) if result else False
|