# backend/database.py
"""SQLite persistence helpers for user accounts and their projects."""

import os
import sqlite3
import time
from contextlib import closing
from typing import Any, Dict, List, Optional

import bcrypt

DATABASE_PATH = 'data.db'


def get_db_connection() -> sqlite3.Connection:
    """Open and return a new connection to the database at DATABASE_PATH.

    The caller owns the returned connection and must close it.

    NOTE: SQLite by default only enforces FOREIGN KEY constraints on
    connections that issue ``PRAGMA foreign_keys = ON``. This function does
    not, so the ``projects.user_id`` reference is declarative only.
    """
    return sqlite3.connect(DATABASE_PATH)


def _select(query: str, params: tuple) -> List[Dict[str, Any]]:
    """Run a SELECT and return each row as a column-name -> value dict."""
    with closing(get_db_connection()) as conn:
        cursor = conn.execute(query, params)
        # Column names are read while the connection is still open.
        columns = [column[0] for column in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]


def init_db() -> None:
    """Create the ``users`` and ``projects`` tables if they do not exist.

    This function should be called once on application startup. It is safe
    to call again: both statements use CREATE TABLE IF NOT EXISTS.
    """
    with closing(get_db_connection()) as conn:
        with conn:  # commit on success, roll back if a statement fails
            conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    password_hash TEXT NOT NULL
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS projects (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER NOT NULL,
                    title TEXT NOT NULL,
                    prompt TEXT NOT NULL,
                    status TEXT NOT NULL,
                    created_at INTEGER NOT NULL,
                    logs TEXT,
                    zip_path TEXT,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            """)


def create_user(username: str, password: str) -> Optional[int]:
    """Create a new user and return its row ID.

    The password is stored as a bcrypt hash, never in plain text.

    Returns:
        The new user's ID, or None when the insert violates a table
        constraint (typically: the username is already taken).
    """
    # Hash before opening the connection so a hashing error cannot leak it.
    hashed_password = bcrypt.hashpw(
        password.encode('utf-8'), bcrypt.gensalt()
    ).decode('utf-8')
    with closing(get_db_connection()) as conn:
        try:
            with conn:
                cursor = conn.execute(
                    "INSERT INTO users (username, password_hash) VALUES (?, ?)",
                    (username, hashed_password),
                )
        except sqlite3.IntegrityError:
            return None
        return cursor.lastrowid


def get_user_by_username(username: str) -> Optional[Dict[str, Any]]:
    """Return the user row for ``username`` as a dict, or None if absent."""
    rows = _select("SELECT * FROM users WHERE username = ?", (username,))
    return rows[0] if rows else None


def verify_password(password: str, password_hash: str) -> bool:
    """Return True when ``password`` matches the stored bcrypt hash."""
    return bcrypt.checkpw(
        password.encode('utf-8'), password_hash.encode('utf-8')
    )


def create_project(user_id: int, title: str, prompt: str) -> int:
    """Insert a new project in the "queued" state and return its ID.

    ``created_at`` is set to the current Unix time in whole seconds.
    """
    with closing(get_db_connection()) as conn:
        with conn:
            cursor = conn.execute(
                "INSERT INTO projects (user_id, title, prompt, status, created_at) "
                "VALUES (?, ?, ?, ?, ?)",
                (user_id, title, prompt, "queued", int(time.time())),
            )
        return cursor.lastrowid


def get_user_projects(user_id: int) -> List[Dict[str, Any]]:
    """Return all projects owned by ``user_id`` as dicts, newest first.

    Returns an empty list when the user has no projects.
    """
    return _select(
        "SELECT * FROM projects WHERE user_id = ? ORDER BY created_at DESC",
        (user_id,),
    )


def get_project(project_id: int) -> Optional[Dict[str, Any]]:
    """Return the project with the given ID as a dict, or None if absent.

    This function is used by the front end for live log updates.
    """
    rows = _select("SELECT * FROM projects WHERE id = ?", (project_id,))
    return rows[0] if rows else None


def update_project_status(project_id, status, logs=None, zip_path=None):
    """Update a project's status and, optionally, its logs and zip path.

    ``logs`` and ``zip_path`` are each written only when not None; passing
    None leaves the stored value untouched. The two are independent, so a
    zip path can be recorded without also supplying logs.
    """
    # Column names below are fixed literals; only values are parameterized.
    assignments = ["status = ?"]
    params = [status]
    if logs is not None:
        assignments.append("logs = ?")
        params.append(logs)
    if zip_path is not None:
        assignments.append("zip_path = ?")
        params.append(zip_path)
    params.append(project_id)

    query = "UPDATE projects SET " + ", ".join(assignments) + " WHERE id = ?"
    with closing(get_db_connection()) as conn:
        with conn:
            conn.execute(query, params)