Spaces:
Sleeping
Sleeping
| import sqlite3 | |
| import json | |
| import logging | |
| from datetime import datetime | |
| from typing import List, Optional | |
| from pathlib import Path | |
| logger = logging.getLogger(__name__) | |
| DB_PATH = Path(__file__).parent.parent / "nexusai_history.db" | |
| def _safe_json_loads(data: str) -> List[dict]: | |
| if not data: | |
| return [] | |
| try: | |
| return json.loads(data) | |
| except (json.JSONDecodeError, TypeError) as e: | |
| logger.warning(f"Failed to parse JSON from database: {e}") | |
| return [] | |
| def get_connection(): | |
| return sqlite3.connect(DB_PATH, check_same_thread=False) | |
| def init_db(): | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS research_history ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| query TEXT NOT NULL, | |
| answer TEXT, | |
| sources TEXT, | |
| trace TEXT, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """) | |
| conn.commit() | |
| conn.close() | |
| def save_research(query: str, answer: str, sources: List[dict], trace: List[str] = None) -> int: | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| sources_json = json.dumps(sources) | |
| trace_json = json.dumps(trace if trace else []) | |
| cursor.execute( | |
| "INSERT INTO research_history (query, answer, sources, trace) VALUES (?, ?, ?, ?)", | |
| (query, answer, sources_json, trace_json), | |
| ) | |
| conn.commit() | |
| research_id = cursor.lastrowid | |
| conn.close() | |
| return research_id | |
| def get_all_research(limit: int = 50) -> List[dict]: | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| "SELECT id, query, answer, sources, created_at, trace FROM research_history ORDER BY created_at DESC LIMIT ?", | |
| (limit,), | |
| ) | |
| rows = cursor.fetchall() | |
| conn.close() | |
| results = [] | |
| for row in rows: | |
| results.append( | |
| { | |
| "id": row[0], | |
| "query": row[1], | |
| "answer": row[2], | |
| "sources": _safe_json_loads(row[3]), | |
| "created_at": row[4], | |
| "trace": _safe_json_loads(row[5]), | |
| } | |
| ) | |
| return results | |
| def get_research_by_id(research_id: int) -> Optional[dict]: | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| "SELECT id, query, answer, sources, created_at, trace FROM research_history WHERE id = ?", | |
| (research_id,), | |
| ) | |
| row = cursor.fetchone() | |
| conn.close() | |
| if row: | |
| return { | |
| "id": row[0], | |
| "query": row[1], | |
| "answer": row[2], | |
| "sources": _safe_json_loads(row[3]), | |
| "created_at": row[4], | |
| "trace": _safe_json_loads(row[5]), | |
| } | |
| return None | |
| def delete_research(research_id: int) -> bool: | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute("DELETE FROM research_history WHERE id = ?", (research_id,)) | |
| deleted = cursor.rowcount > 0 | |
| conn.commit() | |
| conn.close() | |
| return deleted | |
| def search_history(search_term: str, limit: int = 20) -> List[dict]: | |
| conn = get_connection() | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| "SELECT id, query, answer, sources, created_at, trace FROM research_history WHERE query LIKE ? ORDER BY created_at DESC LIMIT ?", | |
| (f"%{search_term}%", limit), | |
| ) | |
| rows = cursor.fetchall() | |
| conn.close() | |
| results = [] | |
| for row in rows: | |
| results.append( | |
| { | |
| "id": row[0], | |
| "query": row[1], | |
| "answer": row[2], | |
| "sources": _safe_json_loads(row[3]), | |
| "created_at": row[4], | |
| "trace": _safe_json_loads(row[5]), | |
| } | |
| ) | |
| return results | |
| init_db() | |