""" Database operations for SQLite chat history """ import sqlite3 import json from datetime import datetime from typing import List, Optional, Dict, Any from contextlib import contextmanager from config import settings @contextmanager def get_db_connection(): """Context manager for database connections""" conn = sqlite3.connect(str(settings.DATABASE_PATH)) conn.row_factory = sqlite3.Row try: yield conn finally: conn.close() def init_database(): """Initialize database tables""" with get_db_connection() as conn: cursor = conn.cursor() # Users table (references users.json by username) cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Conversations table cursor.execute(""" CREATE TABLE IF NOT EXISTS conversations ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, title TEXT DEFAULT 'New Chat', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE ) """) # Messages table cursor.execute(""" CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, conversation_id INTEGER NOT NULL, role TEXT NOT NULL CHECK (role IN ('user', 'assistant', 'system')), content TEXT NOT NULL, files_json TEXT, -- JSON array of file references timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (conversation_id) REFERENCES conversations (id) ON DELETE CASCADE ) """) # Create indexes for better performance cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_messages_conversation ON messages (conversation_id) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_conversations_user ON conversations (user_id) """) conn.commit() print("Database initialized successfully") # User operations def get_or_create_user(username: str) -> int: """Get user ID or create if not exists""" with get_db_connection() as conn: cursor = conn.cursor() # Try to get existing user cursor.execute("SELECT id FROM users WHERE username = ?", (username,)) result = cursor.fetchone() if result: return result["id"] # Create new user cursor.execute( "INSERT INTO users (username) VALUES (?)", (username,) ) conn.commit() return cursor.lastrowid def get_user_by_username(username: str) -> Optional[Dict[str, Any]]: """Get user by username""" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE username = ?", (username,)) result = cursor.fetchone() return dict(result) if result else None # Conversation operations def create_conversation(user_id: int, title: str = "New Chat") -> int: """Create a new conversation""" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute( "INSERT INTO conversations (user_id, title) VALUES (?, ?)", (user_id, title) ) conn.commit() return cursor.lastrowid def get_conversations(user_id: int) -> List[Dict[str, Any]]: """Get all conversations for a user""" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT * FROM conversations WHERE user_id = ? ORDER BY updated_at DESC """, (user_id,)) return [dict(row) for row in cursor.fetchall()] def get_conversation(conversation_id: int, user_id: int) -> Optional[Dict[str, Any]]: """Get a specific conversation""" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT * FROM conversations WHERE id = ? AND user_id = ? """, (conversation_id, user_id)) result = cursor.fetchone() return dict(result) if result else None def update_conversation_title(conversation_id: int, user_id: int, title: str) -> bool: """Update conversation title""" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" UPDATE conversations SET title = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? AND user_id = ? """, (title, conversation_id, user_id)) conn.commit() return cursor.rowcount > 0 def delete_conversation(conversation_id: int, user_id: int) -> bool: """Delete a conversation""" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" DELETE FROM conversations WHERE id = ? AND user_id = ? """, (conversation_id, user_id)) conn.commit() return cursor.rowcount > 0 def update_conversation_timestamp(conversation_id: int): """Update conversation's updated_at timestamp""" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" UPDATE conversations SET updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (conversation_id,)) conn.commit() # Message operations def add_message( conversation_id: int, role: str, content: str, files: List[Dict[str, Any]] = None ) -> int: """Add a message to a conversation""" files_json = json.dumps(files) if files else None with get_db_connection() as conn: cursor = conn.cursor() cursor.execute( """ INSERT INTO messages (conversation_id, role, content, files_json) VALUES (?, ?, ?, ?) """, (conversation_id, role, content, files_json) ) conn.commit() # Update conversation timestamp update_conversation_timestamp(conversation_id) return cursor.lastrowid def append_to_last_assistant_message(conversation_id: int, content_suffix: str) -> bool: """Append text to the most recent assistant message in a conversation""" if not content_suffix: return False with get_db_connection() as conn: cursor = conn.cursor() cursor.execute( """ SELECT id, content FROM messages WHERE conversation_id = ? AND role = 'assistant' ORDER BY id DESC LIMIT 1 """, (conversation_id,) ) row = cursor.fetchone() if not row: return False updated_content = f"{row['content']}{content_suffix}" cursor.execute( """ UPDATE messages SET content = ? WHERE id = ? """, (updated_content, row["id"]) ) conn.commit() update_conversation_timestamp(conversation_id) return True def append_to_assistant_message( conversation_id: int, message_id: int, content_suffix: str ) -> bool: """Append text to a specific assistant message by ID within a conversation.""" if not content_suffix: return False with get_db_connection() as conn: cursor = conn.cursor() cursor.execute( """ SELECT id, content FROM messages WHERE id = ? AND conversation_id = ? AND role = 'assistant' LIMIT 1 """, (message_id, conversation_id) ) row = cursor.fetchone() if not row: return False updated_content = f"{row['content']}{content_suffix}" cursor.execute( """ UPDATE messages SET content = ? WHERE id = ? """, (updated_content, row["id"]) ) conn.commit() update_conversation_timestamp(conversation_id) return True def get_messages(conversation_id: int, limit: Optional[int] = None) -> List[Dict[str, Any]]: """Get messages for a conversation""" with get_db_connection() as conn: cursor = conn.cursor() if limit is None: cursor.execute(""" SELECT * FROM messages WHERE conversation_id = ? ORDER BY id ASC """, (conversation_id,)) else: cursor.execute(""" SELECT * FROM messages WHERE conversation_id = ? ORDER BY id ASC LIMIT ? """, (conversation_id, limit)) messages = [] for row in cursor.fetchall(): msg = dict(row) if msg["files_json"]: msg["files"] = json.loads(msg["files_json"]) else: msg["files"] = [] del msg["files_json"] messages.append(msg) return messages def get_recent_messages_for_context( conversation_id: int, max_messages: Optional[int] = 20 ) -> List[Dict[str, str]]: """Get recent messages formatted for model context""" messages = get_messages(conversation_id, limit=max_messages) # Format for LLM context (only user and assistant roles) context_messages = [] for msg in messages: if msg["role"] in ["user", "assistant"]: context_messages.append({ "role": msg["role"], "content": msg["content"] }) return context_messages def delete_message(message_id: int, conversation_id: int) -> bool: """Delete a specific message""" with get_db_connection() as conn: cursor = conn.cursor() cursor.execute(""" DELETE FROM messages WHERE id = ? AND conversation_id = ? """, (message_id, conversation_id)) conn.commit() return cursor.rowcount > 0 # Statistics def get_user_stats(user_id: int) -> Dict[str, int]: """Get user statistics""" with get_db_connection() as conn: cursor = conn.cursor() # Total conversations cursor.execute( "SELECT COUNT(*) as count FROM conversations WHERE user_id = ?", (user_id,) ) total_conversations = cursor.fetchone()["count"] # Total messages cursor.execute(""" SELECT COUNT(*) as count FROM messages m JOIN conversations c ON m.conversation_id = c.id WHERE c.user_id = ? """, (user_id,)) total_messages = cursor.fetchone()["count"] return { "total_conversations": total_conversations, "total_messages": total_messages } # Initialize database on module import init_database()