ClariDoc / app /database /database.py
Kshitijk20's picture
db fix
e9b9c34
import sqlite3
import os
from datetime import datetime
from typing import Optional, List, Dict
import json
class SessionDatabase:
def __init__(self, db_path: str = "app/database/sessions.db"):
self.db_path = db_path
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self.init_db()
self.migrate_db()
def init_db(self):
"""Initialize the database with required tables"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Users table
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Sessions table
cursor.execute("""
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT UNIQUE NOT NULL,
user_id INTEGER,
username TEXT,
document_name TEXT,
document_type TEXT,
document_path TEXT,
document_url TEXT,
pinecone_index TEXT,
pinecone_namespace TEXT,
chunks_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT 1,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
# Chat history table
cursor.execute("""
CREATE TABLE IF NOT EXISTS chat_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
question TEXT NOT NULL,
answer TEXT NOT NULL,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES sessions (session_id)
)
""")
conn.commit()
def migrate_db(self):
"""Handle database migrations for existing databases"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Check if document_name column exists in sessions table
cursor.execute("PRAGMA table_info(sessions)")
columns = [column[1] for column in cursor.fetchall()]
if 'document_name' not in columns:
print("[Database] Adding document_name column to sessions table...")
cursor.execute("ALTER TABLE sessions ADD COLUMN document_name TEXT")
conn.commit()
print("[Database] Migration completed: document_name column added")
# Check if document_type column exists
if 'document_type' not in columns:
print("[Database] Adding document_type column to sessions table...")
cursor.execute("ALTER TABLE sessions ADD COLUMN document_type TEXT")
conn.commit()
print("[Database] Migration completed: document_type column added")
# Check if document_path column exists
if 'document_path' not in columns:
print("[Database] Adding document_path column to sessions table...")
cursor.execute("ALTER TABLE sessions ADD COLUMN document_path TEXT")
conn.commit()
print("[Database] Migration completed: document_path column added")
# Check if document_url column exists
if 'document_url' not in columns:
print("[Database] Adding document_url column to sessions table...")
cursor.execute("ALTER TABLE sessions ADD COLUMN document_url TEXT")
conn.commit()
print("[Database] Migration completed: document_url column added")
# Check if pinecone_index column exists
if 'pinecone_index' not in columns:
print("[Database] Adding pinecone_index column to sessions table...")
cursor.execute("ALTER TABLE sessions ADD COLUMN pinecone_index TEXT")
conn.commit()
print("[Database] Migration completed: pinecone_index column added")
# Check if pinecone_namespace column exists
if 'pinecone_namespace' not in columns:
print("[Database] Adding pinecone_namespace column to sessions table...")
cursor.execute("ALTER TABLE sessions ADD COLUMN pinecone_namespace TEXT")
conn.commit()
print("[Database] Migration completed: pinecone_namespace column added")
# Check if chunks_count column exists
if 'chunks_count' not in columns:
print("[Database] Adding chunks_count column to sessions table...")
cursor.execute("ALTER TABLE sessions ADD COLUMN chunks_count INTEGER DEFAULT 0")
conn.commit()
print("[Database] Migration completed: chunks_count column added")
# Check if is_active column exists
if 'is_active' not in columns:
print("[Database] Adding is_active column to sessions table...")
cursor.execute("ALTER TABLE sessions ADD COLUMN is_active BOOLEAN DEFAULT 1")
conn.commit()
print("[Database] Migration completed: is_active column added")
def create_user(self, username: str, email: str = None) -> int:
"""Create a new user"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
try:
cursor.execute(
"INSERT INTO users (username, email) VALUES (?, ?)",
(username, email)
)
conn.commit()
return cursor.lastrowid
except sqlite3.IntegrityError:
# User already exists, return existing user_id
cursor.execute("SELECT id FROM users WHERE username = ?", (username,))
result = cursor.fetchone()
return result[0] if result else None
def get_user(self, username: str) -> Optional[Dict]:
"""Get user by username"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT id, username, email, created_at FROM users WHERE username = ?",
(username,)
)
result = cursor.fetchone()
if result:
return {
'id': result[0],
'username': result[1],
'email': result[2],
'created_at': result[3]
}
return None
def create_session(self, session_id: str, username: str,
document_name: str = None, document_type: str = None,
document_path: str = None, document_url: str = None) -> bool:
"""Create a new session"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Get user_id
user = self.get_user(username)
if not user:
user_id = self.create_user(username)
else:
user_id = user['id']
cursor.execute("""
INSERT INTO sessions
(session_id, user_id, username, document_name, document_type,
document_path, document_url, last_accessed)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (session_id, user_id, username, document_name, document_type,
document_path, document_url, datetime.now()))
conn.commit()
return True
def update_session(self, session_id: str, **kwargs) -> bool:
"""Update session with new information"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Build dynamic update query
update_fields = []
values = []
for key, value in kwargs.items():
if key in ['pinecone_index', 'pinecone_namespace', 'chunks_count',
'document_name', 'document_type', 'document_path', 'document_url']:
update_fields.append(f"{key} = ?")
values.append(value)
if update_fields:
update_fields.append("last_accessed = ?")
values.append(datetime.now())
values.append(session_id)
query = f"UPDATE sessions SET {', '.join(update_fields)} WHERE session_id = ?"
cursor.execute(query, values)
conn.commit()
return cursor.rowcount > 0
return False
def get_user_sessions(self, username: str) -> List[Dict]:
"""Get all sessions for a user"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT session_id, document_name, document_type, chunks_count,
created_at, last_accessed, is_active, pinecone_index, pinecone_namespace
FROM sessions
WHERE username = ? AND is_active = 1
ORDER BY last_accessed DESC
""", (username,))
results = cursor.fetchall()
return [
{
'session_id': row[0],
'document_name': row[1],
'document_type': row[2],
'chunks_count': row[3],
'created_at': row[4],
'last_accessed': row[5],
'is_active': row[6],
'pinecone_index': row[7],
'pinecone_namespace': row[8]
}
for row in results
]
def get_session(self, session_id: str) -> Optional[Dict]:
"""Get session by session_id"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT session_id, username, document_name, document_type,
document_path, document_url, pinecone_index, pinecone_namespace,
chunks_count, created_at, last_accessed, is_active
FROM sessions
WHERE session_id = ?
""", (session_id,))
result = cursor.fetchone()
if result:
return {
'session_id': result[0],
'username': result[1],
'document_name': result[2],
'document_type': result[3],
'document_path': result[4],
'document_url': result[5],
'pinecone_index': result[6],
'pinecone_namespace': result[7],
'chunks_count': result[8],
'created_at': result[9],
'last_accessed': result[10],
'is_active': result[11]
}
return None
def add_chat_message(self, session_id: str, question: str, answer: str) -> bool:
"""Add a chat message to history"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO chat_history (session_id, question, answer)
VALUES (?, ?, ?)
""", (session_id, question, answer))
conn.commit()
return True
def get_chat_history(self, session_id: str) -> List[Dict]:
"""Get chat history for a session"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT question, answer, timestamp
FROM chat_history
WHERE session_id = ?
ORDER BY timestamp ASC
""", (session_id,))
results = cursor.fetchall()
return [
{
'question': row[0],
'answer': row[1],
'timestamp': row[2]
}
for row in results
]
def deactivate_session(self, session_id: str) -> bool:
"""Deactivate a session"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE sessions SET is_active = 0 WHERE session_id = ?
""", (session_id,))
conn.commit()
return cursor.rowcount > 0