| """ |
| Database operations for SQLite chat history |
| """ |
| import sqlite3 |
| import json |
| from datetime import datetime |
| from typing import List, Optional, Dict, Any |
| from contextlib import contextmanager |
| from config import settings |
|
|
@contextmanager
def get_db_connection():
    """Context manager that yields a SQLite connection and always closes it.

    The connection is opened against ``settings.DATABASE_PATH``, returns rows
    as ``sqlite3.Row`` (so columns can be read by name), and has foreign-key
    enforcement switched on.

    Nothing is committed here: callers must call ``conn.commit()`` themselves
    before the ``with`` block ends if they want their writes persisted.

    Yields:
        An open ``sqlite3.Connection``.
    """
    conn = sqlite3.connect(str(settings.DATABASE_PATH))
    try:
        conn.row_factory = sqlite3.Row
        # SQLite ignores FOREIGN KEY clauses (including ON DELETE CASCADE)
        # unless enforcement is enabled on every new connection. Without this
        # the cascades declared in the schema never fire.
        conn.execute("PRAGMA foreign_keys = ON")
        yield conn
    finally:
        conn.close()
|
|
def init_database():
    """Create the chat-history schema if it is not already present.

    Runs, in order, the ``CREATE ... IF NOT EXISTS`` statements for the
    ``users``, ``conversations`` and ``messages`` tables and for the two
    lookup indexes, commits, and prints a confirmation line.
    """
    schema_statements = (
        # Users
        """
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """,
        # Conversations, owned by a user
        """
            CREATE TABLE IF NOT EXISTS conversations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                title TEXT DEFAULT 'New Chat',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
            )
        """,
        # Messages, owned by a conversation
        """
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                conversation_id INTEGER NOT NULL,
                role TEXT NOT NULL CHECK (role IN ('user', 'assistant', 'system')),
                content TEXT NOT NULL,
                files_json TEXT,  -- JSON array of file references
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (conversation_id) REFERENCES conversations (id) ON DELETE CASCADE
            )
        """,
        # Indexes for the two most common lookups
        """
            CREATE INDEX IF NOT EXISTS idx_messages_conversation
            ON messages (conversation_id)
        """,
        """
            CREATE INDEX IF NOT EXISTS idx_conversations_user
            ON conversations (user_id)
        """,
    )

    with get_db_connection() as conn:
        ddl_cursor = conn.cursor()
        for statement in schema_statements:
            ddl_cursor.execute(statement)

        conn.commit()
        print("Database initialized successfully")
|
|
| |
def get_or_create_user(username: str) -> int:
    """Return the id of the user named ``username``, creating the row if needed.

    Args:
        username: Unique user name to look up or insert.

    Returns:
        The ``users.id`` of the existing or newly created row.

    Raises:
        sqlite3.IntegrityError: If the insert is rejected for a reason other
            than the username already existing (e.g. ``username`` is None,
            which violates the NOT NULL constraint).
    """
    lookup_sql = "SELECT id FROM users WHERE username = ?"

    with get_db_connection() as conn:
        existing = conn.execute(lookup_sql, (username,)).fetchone()
        if existing:
            return existing["id"]

        try:
            inserted = conn.execute(
                "INSERT INTO users (username) VALUES (?)",
                (username,)
            )
        except sqlite3.IntegrityError:
            # Another connection may have inserted the same username between
            # the lookup and the insert (UNIQUE violation). In that case the
            # row exists now, so return it instead of failing the caller.
            conn.rollback()
            existing = conn.execute(lookup_sql, (username,)).fetchone()
            if existing is None:
                # Not a duplicate after all: surface the original error.
                raise
            return existing["id"]

        conn.commit()
        return inserted.lastrowid
|
|
def get_user_by_username(username: str) -> Optional[Dict[str, Any]]:
    """Look up a user row by username.

    Returns the row as a plain dict, or None when no such user exists.
    """
    with get_db_connection() as conn:
        user_row = conn.execute(
            "SELECT * FROM users WHERE username = ?", (username,)
        ).fetchone()
        if not user_row:
            return None
        return dict(user_row)
|
|
| |
def create_conversation(user_id: int, title: str = "New Chat") -> int:
    """Insert a conversation for ``user_id`` and return the new row's id."""
    insert_sql = "INSERT INTO conversations (user_id, title) VALUES (?, ?)"
    with get_db_connection() as conn:
        inserted = conn.execute(insert_sql, (user_id, title))
        conn.commit()
        return inserted.lastrowid
|
|
def get_conversations(user_id: int) -> List[Dict[str, Any]]:
    """List every conversation owned by ``user_id``, most recently updated first.

    Each conversation row is returned as a plain dict.
    """
    with get_db_connection() as conn:
        rows = conn.execute("""
            SELECT * FROM conversations
            WHERE user_id = ?
            ORDER BY updated_at DESC
        """, (user_id,)).fetchall()
        return list(map(dict, rows))
|
|
def get_conversation(conversation_id: int, user_id: int) -> Optional[Dict[str, Any]]:
    """Fetch one conversation, but only if it belongs to ``user_id``.

    Returns the row as a plain dict, or None when the id does not exist or is
    owned by a different user.
    """
    with get_db_connection() as conn:
        found = conn.execute("""
            SELECT * FROM conversations
            WHERE id = ? AND user_id = ?
        """, (conversation_id, user_id)).fetchone()
        if not found:
            return None
        return dict(found)
|
|
def update_conversation_title(conversation_id: int, user_id: int, title: str) -> bool:
    """Rename a conversation owned by ``user_id`` and refresh its ``updated_at``.

    Returns True when a row was updated, False when no conversation matched
    both the id and the owner.
    """
    with get_db_connection() as conn:
        rows_changed = conn.execute("""
            UPDATE conversations
            SET title = ?, updated_at = CURRENT_TIMESTAMP
            WHERE id = ? AND user_id = ?
        """, (title, conversation_id, user_id)).rowcount
        conn.commit()
        return rows_changed > 0
|
|
def delete_conversation(conversation_id: int, user_id: int) -> bool:
    """Remove a conversation row, provided it belongs to ``user_id``.

    Returns True when a row was deleted, False when no conversation matched
    both the id and the owner.
    """
    with get_db_connection() as conn:
        rows_removed = conn.execute("""
            DELETE FROM conversations
            WHERE id = ? AND user_id = ?
        """, (conversation_id, user_id)).rowcount
        conn.commit()
        return rows_removed > 0
|
|
def update_conversation_timestamp(conversation_id: int):
    """Set the conversation's ``updated_at`` column to the current timestamp.

    Does nothing (and reports nothing) when the id matches no row.
    """
    touch_sql = """
        UPDATE conversations
        SET updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    """
    with get_db_connection() as conn:
        conn.execute(touch_sql, (conversation_id,))
        conn.commit()
|
|
| |
def add_message(
    conversation_id: int,
    role: str,
    content: str,
    files: Optional[List[Dict[str, Any]]] = None
) -> int:
    """Add a message to a conversation and bump the conversation's timestamp.

    The insert and the ``updated_at`` refresh are committed together in one
    transaction, so a message is never stored without the conversation being
    marked as updated.

    Args:
        conversation_id: Conversation the message belongs to.
        role: Message role; the schema only accepts 'user', 'assistant' or
            'system'.
        content: Message text.
        files: Optional list of file-reference dicts, stored as a JSON string.
            None or an empty list is stored as NULL.

    Returns:
        The id of the newly inserted message row.
    """
    files_json = json.dumps(files) if files else None

    with get_db_connection() as conn:
        inserted = conn.execute(
            """
            INSERT INTO messages (conversation_id, role, content, files_json)
            VALUES (?, ?, ?, ?)
            """,
            (conversation_id, role, content, files_json)
        )
        # Capture the id before running any further statement.
        message_id = inserted.lastrowid

        # Same connection, same transaction: both changes land atomically.
        conn.execute(
            """
            UPDATE conversations
            SET updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (conversation_id,)
        )
        conn.commit()

    return message_id
|
|
def append_to_last_assistant_message(conversation_id: int, content_suffix: str) -> bool:
    """Append text to the most recent assistant message in a conversation.

    The append is done by a single SQL ``UPDATE ... SET content = content || ?``
    so the stored text is never read into Python and written back; two
    concurrent appends cannot overwrite each other. The conversation's
    ``updated_at`` is refreshed in the same transaction.

    Args:
        conversation_id: Conversation whose latest assistant message is extended.
        content_suffix: Text to append.

    Returns:
        True if a message was updated; False if ``content_suffix`` is empty or
        the conversation has no assistant message.
    """
    if not content_suffix:
        return False

    with get_db_connection() as conn:
        updated = conn.execute(
            """
            UPDATE messages
            SET content = content || ?
            WHERE id = (
                SELECT id
                FROM messages
                WHERE conversation_id = ? AND role = 'assistant'
                ORDER BY id DESC
                LIMIT 1
            )
            """,
            (content_suffix, conversation_id)
        )
        if updated.rowcount == 0:
            # No assistant message exists; leave the conversation untouched.
            return False

        conn.execute(
            """
            UPDATE conversations
            SET updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (conversation_id,)
        )
        conn.commit()

    return True
|
|
def append_to_assistant_message(
    conversation_id: int,
    message_id: int,
    content_suffix: str
) -> bool:
    """Append text to a specific assistant message by ID within a conversation.

    The append is done by a single SQL ``UPDATE ... SET content = content || ?``
    so the stored text is never read into Python and written back; two
    concurrent appends cannot overwrite each other. The conversation's
    ``updated_at`` is refreshed in the same transaction.

    Args:
        conversation_id: Conversation the message must belong to.
        message_id: Id of the message to extend.
        content_suffix: Text to append.

    Returns:
        True if the message was updated; False if ``content_suffix`` is empty
        or no assistant message with that id exists in the conversation.
    """
    if not content_suffix:
        return False

    with get_db_connection() as conn:
        updated = conn.execute(
            """
            UPDATE messages
            SET content = content || ?
            WHERE id = ? AND conversation_id = ? AND role = 'assistant'
            """,
            (content_suffix, message_id, conversation_id)
        )
        if updated.rowcount == 0:
            # Wrong id, wrong conversation, or not an assistant message.
            return False

        conn.execute(
            """
            UPDATE conversations
            SET updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
            """,
            (conversation_id,)
        )
        conn.commit()

    return True
|
|
def get_messages(conversation_id: int, limit: Optional[int] = None) -> List[Dict[str, Any]]:
    """Return a conversation's messages in ascending id order.

    When ``limit`` is given, only the first ``limit`` rows of that ordering
    are returned. Each message dict gets a ``files`` list decoded from the
    stored ``files_json`` column (empty when nothing was stored); the raw
    ``files_json`` key is removed.
    """
    if limit is not None:
        query = """
            SELECT * FROM messages
            WHERE conversation_id = ?
            ORDER BY id ASC
            LIMIT ?
        """
        params = (conversation_id, limit)
    else:
        query = """
            SELECT * FROM messages
            WHERE conversation_id = ?
            ORDER BY id ASC
        """
        params = (conversation_id,)

    with get_db_connection() as conn:
        decoded = []
        for row in conn.execute(query, params).fetchall():
            record = dict(row)
            raw_files = record.pop("files_json")
            record["files"] = json.loads(raw_files) if raw_files else []
            decoded.append(record)

        return decoded
|
|
def get_recent_messages_for_context(
    conversation_id: int,
    max_messages: Optional[int] = 20
) -> List[Dict[str, str]]:
    """Get the most recent messages, formatted for model context.

    Only 'user' and 'assistant' messages are included. The newest
    ``max_messages`` of them are selected and returned oldest-first, so the
    list reads in conversation order and ends with the latest turn.

    Args:
        conversation_id: Conversation to read from.
        max_messages: Maximum number of messages to return; None returns all
            user/assistant messages.

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts in chronological
        order.
    """
    # Select newest-first so LIMIT keeps the tail of the conversation rather
    # than its beginning, then flip back to chronological order below.
    query = """
        SELECT role, content
        FROM messages
        WHERE conversation_id = ? AND role IN ('user', 'assistant')
        ORDER BY id DESC
    """
    params = [conversation_id]
    if max_messages is not None:
        query += " LIMIT ?"
        params.append(max_messages)

    with get_db_connection() as conn:
        newest_first = conn.execute(query, params).fetchall()
        return [
            {"role": row["role"], "content": row["content"]}
            for row in reversed(newest_first)
        ]
|
|
def delete_message(message_id: int, conversation_id: int) -> bool:
    """Remove one message, provided it belongs to ``conversation_id``.

    Returns True when a row was deleted, False when no message matched both
    the id and the conversation.
    """
    with get_db_connection() as conn:
        rows_removed = conn.execute("""
            DELETE FROM messages
            WHERE id = ? AND conversation_id = ?
        """, (message_id, conversation_id)).rowcount
        conn.commit()
        return rows_removed > 0
|
|
| |
def get_user_stats(user_id: int) -> Dict[str, int]:
    """Count a user's conversations and the messages inside them.

    Returns a dict with the keys ``total_conversations`` and
    ``total_messages``.
    """
    conversations_sql = "SELECT COUNT(*) as count FROM conversations WHERE user_id = ?"
    messages_sql = """
        SELECT COUNT(*) as count FROM messages m
        JOIN conversations c ON m.conversation_id = c.id
        WHERE c.user_id = ?
    """
    owner = (user_id,)

    with get_db_connection() as conn:
        stats = {}
        stats["total_conversations"] = conn.execute(conversations_sql, owner).fetchone()["count"]
        stats["total_messages"] = conn.execute(messages_sql, owner).fetchone()["count"]
        return stats
|
|
| |
# Runs at import time: importing this module creates the tables and indexes
# (if they are missing) in the database at settings.DATABASE_PATH.
init_database()
|
|