NextQuest.ai / src /database.py
ajeet9843
Deploy to Hugging Face Spaces
f10fe83
import sqlite3
import json
import logging
from datetime import datetime
from typing import List, Optional
from pathlib import Path
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# SQLite database file; it lives in the parent of the directory that
# contains this module.
DB_PATH = Path(__file__).parent.parent / "nexusai_history.db"
def _safe_json_loads(data: str) -> List[dict]:
if not data:
return []
try:
return json.loads(data)
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"Failed to parse JSON from database: {e}")
return []
def get_connection():
    """Open and return a new SQLite connection to the file at ``DB_PATH``.

    The connection is created with ``check_same_thread=False``. Each call
    opens a fresh connection; the caller is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH, check_same_thread=False)
    return connection
def init_db():
    """Create the ``research_history`` table if it does not already exist.

    The connection is always closed, even when creating the table fails,
    so a failed start-up does not leak an open database handle.
    """
    conn = get_connection()
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS research_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                query TEXT NOT NULL,
                answer TEXT,
                sources TEXT,
                trace TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
            """
        )
        conn.commit()
    finally:
        conn.close()
def save_research(
    query: str,
    answer: str,
    sources: List[dict],
    trace: Optional[List[str]] = None,
) -> int:
    """Insert one research record and return its row id.

    Args:
        query: The research question; stored in a ``NOT NULL`` column.
        answer: The generated answer text.
        sources: Source entries; stored as a JSON array.
        trace: Optional trace entries; stored as a JSON array. A missing or
            empty value is stored as ``[]``.

    Returns:
        The ``id`` of the newly inserted row.
    """
    # Normalise missing values to an empty list so the stored JSON is always
    # an array (previously ``sources=None`` was stored as ``null``).
    sources_json = json.dumps(sources if sources is not None else [])
    trace_json = json.dumps(trace if trace else [])

    conn = get_connection()
    try:
        cursor = conn.execute(
            "INSERT INTO research_history (query, answer, sources, trace) VALUES (?, ?, ?, ?)",
            (query, answer, sources_json, trace_json),
        )
        conn.commit()
        return cursor.lastrowid
    finally:
        # Close the connection even if the insert or commit raised.
        conn.close()
def get_all_research(limit: int = 50) -> List[dict]:
    """Return the most recent research records, newest first.

    Args:
        limit: Maximum number of records to return.

    Returns:
        A list of dicts with the keys ``id``, ``query``, ``answer``,
        ``sources``, ``created_at`` and ``trace``. ``sources`` and ``trace``
        are decoded from their stored JSON text.
    """
    conn = get_connection()
    try:
        # ``id DESC`` breaks ties between rows that share the same
        # ``created_at`` value, keeping the order deterministic.
        rows = conn.execute(
            "SELECT id, query, answer, sources, created_at, trace "
            "FROM research_history "
            "ORDER BY created_at DESC, id DESC LIMIT ?",
            (limit,),
        ).fetchall()
    finally:
        # Close the connection even if the query raised.
        conn.close()

    return [
        {
            "id": row_id,
            "query": query,
            "answer": answer,
            "sources": _safe_json_loads(sources),
            "created_at": created_at,
            "trace": _safe_json_loads(trace),
        }
        for row_id, query, answer, sources, created_at, trace in rows
    ]
def get_research_by_id(research_id: int) -> Optional[dict]:
    """Fetch a single research record by its primary key.

    Args:
        research_id: The ``id`` of the record to load.

    Returns:
        A dict with the keys ``id``, ``query``, ``answer``, ``sources``,
        ``created_at`` and ``trace``, or ``None`` when no row has that id.
    """
    conn = get_connection()
    try:
        row = conn.execute(
            "SELECT id, query, answer, sources, created_at, trace "
            "FROM research_history WHERE id = ?",
            (research_id,),
        ).fetchone()
    finally:
        # Close the connection even if the query raised.
        conn.close()

    if row is None:
        return None

    row_id, query, answer, sources, created_at, trace = row
    return {
        "id": row_id,
        "query": query,
        "answer": answer,
        "sources": _safe_json_loads(sources),
        "created_at": created_at,
        "trace": _safe_json_loads(trace),
    }
def delete_research(research_id: int) -> bool:
    """Delete the research record with the given id.

    Args:
        research_id: The ``id`` of the record to remove.

    Returns:
        ``True`` if a row was deleted, ``False`` if no row had that id.
    """
    conn = get_connection()
    try:
        cursor = conn.execute(
            "DELETE FROM research_history WHERE id = ?", (research_id,)
        )
        deleted = cursor.rowcount > 0
        conn.commit()
        return deleted
    finally:
        # Close the connection even if the delete or commit raised.
        conn.close()
def search_history(search_term: str, limit: int = 20) -> List[dict]:
    """Find research records whose query contains ``search_term``.

    The term is matched literally as a substring: ``%`` and ``_`` in the
    term are escaped so they are not interpreted as LIKE wildcards.

    Args:
        search_term: Text to look for inside the stored ``query`` column.
        limit: Maximum number of records to return.

    Returns:
        Matching records, newest first, as dicts with the keys ``id``,
        ``query``, ``answer``, ``sources``, ``created_at`` and ``trace``.
    """
    # Escape the escape character first, then the two LIKE wildcards.
    escaped_term = (
        str(search_term)
        .replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )

    conn = get_connection()
    try:
        # ``id DESC`` breaks ties between rows that share the same
        # ``created_at`` value, keeping the order deterministic.
        rows = conn.execute(
            "SELECT id, query, answer, sources, created_at, trace "
            "FROM research_history "
            "WHERE query LIKE ? ESCAPE '\\' "
            "ORDER BY created_at DESC, id DESC LIMIT ?",
            (f"%{escaped_term}%", limit),
        ).fetchall()
    finally:
        # Close the connection even if the query raised.
        conn.close()

    return [
        {
            "id": row_id,
            "query": query,
            "answer": answer,
            "sources": _safe_json_loads(sources),
            "created_at": created_at,
            "trace": _safe_json_loads(trace),
        }
        for row_id, query, answer, sources, created_at, trace in rows
    ]
# Import-time side effect: create the research_history table if it is missing
# as soon as this module is loaded.
init_db()