import os import sqlite3 import json import time """ Simple SQLite helper for persisting resources and conversation messages. This module centralises all database access used by the Course Creator agent. It defines three tables: resources (id, url, title, source, published_at, retrieved_at, content_excerpt, meta_json) chats (id, chat_key, title, created_at) messages (id, chat_key, role, content, status, created_at) Resources are de-duplicated by URL. Chats are keyed by a unique string (UUID-like) generated externally. Messages are stored in the order received and may be soft-deleted by updating their status column. """ # Determine database path. Use environment override or default to a local data dir. DB_PATH = os.getenv("COURSECREATOR_DB", os.path.join(os.path.dirname(__file__), "data", "course_creator.db")) def _ensure_db(): """Initialise the SQLite database with the required tables if they don't exist.""" os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) conn = sqlite3.connect(DB_PATH) conn.execute("PRAGMA journal_mode=WAL;") conn.executescript( """ CREATE TABLE IF NOT EXISTS resources ( id INTEGER PRIMARY KEY, url TEXT UNIQUE, title TEXT, source TEXT, published_at TEXT, retrieved_at INTEGER, content_excerpt TEXT, meta_json TEXT ); CREATE TABLE IF NOT EXISTS chats ( id INTEGER PRIMARY KEY, chat_key TEXT UNIQUE, title TEXT, created_at INTEGER ); CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY, chat_key TEXT, role TEXT, content TEXT, status TEXT DEFAULT 'normal', created_at INTEGER, FOREIGN KEY(chat_key) REFERENCES chats(chat_key) ); -- Table to track files generated during chat sessions. Each record stores -- the chat_key to which the file belongs, the original file name, -- the absolute file path on disk, and the timestamp when it was -- created. These attachments allow the UI to show downloadable files -- generated in earlier chat interactions. CREATE TABLE IF NOT EXISTS attachments ( id INTEGER PRIMARY KEY, chat_key TEXT, file_name TEXT, file_path TEXT, created_at INTEGER, FOREIGN KEY(chat_key) REFERENCES chats(chat_key) ); """ ) conn.commit() conn.close() def get_conn(): """Return a connection with WAL mode enabled and ensure tables exist.""" _ensure_db() conn = sqlite3.connect(DB_PATH) conn.execute("PRAGMA journal_mode=WAL;") return conn def upsert_resource(url: str, title: str, source: str, content_excerpt: str, meta: dict | None = None) -> None: """Insert or update a resource record based on its URL. Args: url: The canonical URL of the resource. title: Title or headline. source: Domain or source label. content_excerpt: A short excerpt of the page content. meta: Optional dictionary of additional metadata. """ now = int(time.time()) meta_json = json.dumps(meta or {}) with get_conn() as conn: conn.execute( """ INSERT INTO resources (url, title, source, retrieved_at, content_excerpt, meta_json) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(url) DO UPDATE SET title = excluded.title, source = excluded.source, retrieved_at = excluded.retrieved_at, content_excerpt = excluded.content_excerpt, meta_json = excluded.meta_json """, (url, title, source, now, content_excerpt, meta_json), ) def get_resource(url: str) -> dict | None: """Retrieve a resource by URL, returning a dictionary or None.""" with get_conn() as conn: row = conn.execute( "SELECT url, title, source, published_at, retrieved_at, content_excerpt, meta_json FROM resources WHERE url=?", (url,), ).fetchone() if not row: return None url, title, source, published_at, retrieved_at, content_excerpt, meta_json = row meta = json.loads(meta_json or "{}") return { "url": url, "title": title, "source": source, "published_at": published_at, "retrieved_at": retrieved_at, "excerpt": content_excerpt, "meta": meta, } def list_resources(limit: int = 200) -> list[dict]: """List recently retrieved resources.""" with get_conn() as conn: rows = conn.execute( "SELECT url, title, source, retrieved_at FROM resources ORDER BY retrieved_at DESC LIMIT ?", (limit,), ).fetchall() return [{"url": url, "title": title, "source": source, "retrieved_at": retrieved_at} for url, title, source, retrieved_at in rows] def new_chat(title: str = "Untitled") -> str: """Create a new chat and return its key.""" import uuid chat_key = str(uuid.uuid4()) now = int(time.time()) with get_conn() as conn: conn.execute("INSERT INTO chats (chat_key, title, created_at) VALUES (?, ?, ?)", (chat_key, title, now)) return chat_key def append_message(chat_key: str, role: str, content: str, status: str = "normal") -> None: """Append a message to a chat.""" now = int(time.time()) with get_conn() as conn: conn.execute( "INSERT INTO messages (chat_key, role, content, status, created_at) VALUES (?, ?, ?, ?, ?)", (chat_key, role, content, status, now), ) def load_chat(chat_key: str) -> list[dict]: """Load all non-deleted messages for a chat key.""" with get_conn() as conn: rows = conn.execute( "SELECT rowid, role, content, status FROM messages WHERE chat_key=? ORDER BY id ASC", (chat_key,), ).fetchall() messages = [] for rowid, role, content, status in rows: if status != "deleted": messages.append({"id": rowid, "role": role, "content": content}) return messages def soft_delete_message(message_id: int) -> None: """Mark a message as deleted without removing it.""" with get_conn() as conn: conn.execute("UPDATE messages SET status='deleted' WHERE id=?", (message_id,)) # --------------------------------------------------------------------------- # Additional chat management helpers # # The following functions support listing, renaming, and deleting entire chat # sessions. They complement the existing new_chat(), append_message(), and # load_chat() functions above. Each chat is keyed by a unique chat_key. def list_chats(limit: int = 100) -> list[dict]: """Return a list of chat summaries ordered by most recent. Args: limit: Maximum number of chats to return. Returns: A list of dictionaries with keys 'key', 'title' and 'created_at'. """ with get_conn() as conn: rows = conn.execute( "SELECT chat_key, COALESCE(title, 'Untitled'), created_at FROM chats " "ORDER BY created_at DESC LIMIT ?", (limit,), ).fetchall() return [ {"key": chat_key, "title": title, "created_at": created_at} for chat_key, title, created_at in rows ] def rename_chat(chat_key: str, new_title: str) -> None: """Rename a chat session given its key. Args: chat_key: The unique key of the chat to rename. new_title: The new title to set. """ with get_conn() as conn: conn.execute( "UPDATE chats SET title=? WHERE chat_key=?", (new_title, chat_key), ) def delete_chat(chat_key: str) -> None: """Delete a chat session and all its messages. Args: chat_key: The unique key of the chat to delete. """ with get_conn() as conn: conn.execute("DELETE FROM messages WHERE chat_key=?", (chat_key,)) conn.execute("DELETE FROM chats WHERE chat_key=?", (chat_key,)) # --------------------------------------------------------------------------- # Attachment management helpers # # These helpers allow the app to record files generated during chat sessions # (such as outlines, scripts, zip packages) so they persist and can be # downloaded later. Each attachment is tied to a chat via its chat_key. def add_attachment(chat_key: str, file_path: str, file_name: str) -> None: """Store a new file attachment for a chat. Args: chat_key: The unique chat key that the file belongs to. file_path: The absolute path to the file on disk. file_name: The base name of the file (for display). """ now = int(time.time()) with get_conn() as conn: conn.execute( "INSERT INTO attachments (chat_key, file_name, file_path, created_at) VALUES (?, ?, ?, ?)", (chat_key, file_name, file_path, now), ) def list_attachments(chat_key: str) -> list[dict]: """List all attachments for a given chat. Args: chat_key: The chat to list files for. Returns: A list of dictionaries, each with keys 'file_name', 'file_path' and 'created_at'. """ with get_conn() as conn: rows = conn.execute( "SELECT file_name, file_path, created_at FROM attachments WHERE chat_key=? ORDER BY created_at ASC", (chat_key,), ).fetchall() return [ {"file_name": fname, "file_path": fpath, "created_at": ts} for fname, fpath, ts in rows ]