Spaces:
Sleeping
Sleeping
| """ | |
| VoiceVault — SQLite Metadata Store | |
| ==================================== | |
| All SQLite interactions are centralized here. | |
| Provides schema initialization and CRUD operations for: | |
| - knowledge_bases : KB registry | |
| - documents : per-KB document registry | |
| - chunks : chunk-level metadata (source, page, section) | |
| - query_log : append-only audit trail of every query | |
| Security: | |
| - All queries use parameterized statements (? placeholders). | |
| - No f-string SQL anywhere in this module. | |
| - No raw user input is ever interpolated into SQL. | |
| """ | |
| import json | |
| import logging | |
| import sqlite3 | |
| from contextlib import contextmanager | |
| from datetime import datetime, timedelta, timezone | |
| from pathlib import Path | |
| from typing import Generator, Optional | |
| logger = logging.getLogger(__name__) | |
| # ------------------------------------------------------------------ # | |
| # Schema DDL # | |
| # ------------------------------------------------------------------ # | |
# Registry of knowledge bases; a NULL password_hash marks a public KB.
_DDL_KNOWLEDGE_BASES = """
    CREATE TABLE IF NOT EXISTS knowledge_bases (
        kb_name TEXT PRIMARY KEY,
        display_name TEXT NOT NULL,
        password_hash TEXT, -- bcrypt hash; NULL = public
        owner TEXT NOT NULL DEFAULT 'default',
        doc_count INTEGER NOT NULL DEFAULT 0,
        chunk_count INTEGER NOT NULL DEFAULT 0,
        created_at TEXT NOT NULL DEFAULT (datetime('now')),
        last_updated TEXT
    );
"""

# Per-KB document registry; rows are removed when their parent KB is deleted.
_DDL_DOCUMENTS = """
    CREATE TABLE IF NOT EXISTS documents (
        doc_id TEXT PRIMARY KEY,
        kb_name TEXT NOT NULL REFERENCES knowledge_bases(kb_name) ON DELETE CASCADE,
        filename TEXT NOT NULL,
        file_hash TEXT NOT NULL, -- SHA-256 of file bytes
        page_count INTEGER NOT NULL DEFAULT 0,
        chunk_count INTEGER NOT NULL DEFAULT 0,
        is_private INTEGER NOT NULL DEFAULT 0, -- 0 = public, 1 = private
        ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
    );
    CREATE INDEX IF NOT EXISTS idx_documents_kb ON documents(kb_name);
    CREATE INDEX IF NOT EXISTS idx_documents_hash ON documents(file_hash);
"""

# Chunk-level metadata; rows are removed when their parent document is deleted.
_DDL_CHUNKS = """
    CREATE TABLE IF NOT EXISTS chunks (
        chunk_id TEXT PRIMARY KEY,
        kb_name TEXT NOT NULL,
        doc_id TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE,
        source_file TEXT NOT NULL,
        page_number INTEGER NOT NULL DEFAULT 0,
        section TEXT NOT NULL DEFAULT '',
        chunk_index INTEGER NOT NULL DEFAULT 0,
        text_hash TEXT NOT NULL, -- SHA-256 of chunk text
        token_count INTEGER NOT NULL DEFAULT 0,
        language TEXT NOT NULL DEFAULT 'en',
        ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
    );
    CREATE INDEX IF NOT EXISTS idx_chunks_kb ON chunks(kb_name);
    CREATE INDEX IF NOT EXISTS idx_chunks_doc ON chunks(doc_id);
    CREATE INDEX IF NOT EXISTS idx_chunks_hash ON chunks(text_hash);
"""

# Audit trail of queries, indexed by session and by timestamp.
_DDL_QUERY_LOG = """
    CREATE TABLE IF NOT EXISTS query_log (
        id TEXT PRIMARY KEY,
        session_id TEXT NOT NULL,
        kb_names TEXT NOT NULL, -- JSON array of kb names
        voice_query_hash TEXT NOT NULL, -- SHA-256 of query (anonymized)
        processed_query TEXT NOT NULL,
        query_type TEXT NOT NULL DEFAULT 'factual',
        answer_length INTEGER NOT NULL DEFAULT 0,
        citation_count INTEGER NOT NULL DEFAULT 0,
        latency_asr_ms INTEGER NOT NULL DEFAULT 0,
        latency_ret_ms INTEGER NOT NULL DEFAULT 0,
        latency_llm_ms INTEGER NOT NULL DEFAULT 0,
        total_latency_ms INTEGER NOT NULL DEFAULT 0,
        groq_tokens_used INTEGER NOT NULL DEFAULT 0,
        timestamp TEXT NOT NULL DEFAULT (datetime('now'))
    );
    CREATE INDEX IF NOT EXISTS idx_query_log_session ON query_log(session_id);
    CREATE INDEX IF NOT EXISTS idx_query_log_ts ON query_log(timestamp);
"""

# Executed in this order: referenced tables are created before the tables
# whose foreign keys point at them.
_ALL_DDL = [_DDL_KNOWLEDGE_BASES, _DDL_DOCUMENTS, _DDL_CHUNKS, _DDL_QUERY_LOG]
| # ------------------------------------------------------------------ # | |
| # Connection Helper # | |
| # ------------------------------------------------------------------ # | |
@contextmanager
def _connect(db_path: Path) -> Generator[sqlite3.Connection, None, None]:
    """
    Context manager yielding a configured SQLite connection.

    The connection uses ``sqlite3.Row`` as its row factory, so rows support
    access by column name. On entry it requests WAL journal mode and turns
    foreign-key enforcement on (required for the ON DELETE CASCADE clauses
    in the schema to take effect).

    Commits when the ``with`` body finishes cleanly; rolls back and re-raises
    when the body (or the commit) raises. The connection is closed in every
    case.

    Args:
        db_path: Path to the SQLite database file.

    Yields:
        An open ``sqlite3.Connection``.
    """
    # Without the @contextmanager decorator this function would return a bare
    # generator, and every `with _connect(...) as conn:` would fail because a
    # generator does not implement the context-manager protocol.
    conn = sqlite3.connect(str(db_path), check_same_thread=False)
    conn.row_factory = sqlite3.Row
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute("PRAGMA foreign_keys=ON;")
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
| # ------------------------------------------------------------------ # | |
| # Schema Initialization # | |
| # ------------------------------------------------------------------ # | |
def initialize_database(db_path: Path) -> None:
    """
    Run every schema DDL script against the database at ``db_path``.

    All statements use CREATE ... IF NOT EXISTS, so calling this repeatedly
    (for example on each application start) is harmless.

    Args:
        db_path: Location of the .db file. Its parent directory must exist.

    Raises:
        FileNotFoundError: If the parent directory of ``db_path`` is missing.
    """
    parent_dir = db_path.parent
    if not parent_dir.exists():
        raise FileNotFoundError(
            f"Parent directory does not exist: {db_path.parent}. "
            "Call cfg.ensure_directories() first."
        )

    with _connect(db_path) as db:
        for script in _ALL_DDL:
            db.executescript(script)

    logger.info("SQLite schema initialized at %s", db_path)
| # ------------------------------------------------------------------ # | |
| # Knowledge Base CRUD # | |
| # ------------------------------------------------------------------ # | |
def create_kb(
    db_path: Path,
    kb_name: str,
    display_name: str,
    owner: str = "default",
    password_hash: Optional[str] = None,
) -> None:
    """
    Add a row to ``knowledge_bases``.

    ``kb_name`` is the table's primary key, so inserting a name that already
    exists fails with the error raised by SQLite. A ``password_hash`` of
    None is stored as NULL.
    """
    sql = """
        INSERT INTO knowledge_bases (kb_name, display_name, owner, password_hash)
        VALUES (?, ?, ?, ?)
    """
    values = (kb_name, display_name, owner, password_hash)

    with _connect(db_path) as db:
        db.execute(sql, values)

    logger.info("Created knowledge base '%s'", kb_name)
def get_kb(db_path: Path, kb_name: str) -> Optional[dict]:
    """Look up one knowledge base by name; returns its row as a dict, or None when absent."""
    sql = "SELECT * FROM knowledge_bases WHERE kb_name = ?"
    with _connect(db_path) as db:
        match = db.execute(sql, (kb_name,)).fetchone()

    if not match:
        return None
    return dict(match)
def list_kbs(db_path: Path) -> list[dict]:
    """Fetch every knowledge base as a dict, most recently created first."""
    sql = "SELECT * FROM knowledge_bases ORDER BY created_at DESC"
    with _connect(db_path) as db:
        found = db.execute(sql).fetchall()
    return list(map(dict, found))
def update_kb_counts(db_path: Path, kb_name: str, doc_count: int, chunk_count: int) -> None:
    """
    Overwrite a KB's stored document/chunk counts and stamp ``last_updated``.

    The timestamp is produced by SQLite's ``datetime('now')``. If no row
    matches ``kb_name`` the UPDATE simply affects nothing.
    """
    sql = """
        UPDATE knowledge_bases
        SET doc_count = ?, chunk_count = ?, last_updated = datetime('now')
        WHERE kb_name = ?
    """
    values = (doc_count, chunk_count, kb_name)
    with _connect(db_path) as db:
        db.execute(sql, values)
def delete_kb(db_path: Path, kb_name: str) -> None:
    """
    Remove a knowledge base row.

    Documents reference the KB with ON DELETE CASCADE, and chunks reference
    documents the same way, so dependent rows are removed along with it.
    """
    sql = "DELETE FROM knowledge_bases WHERE kb_name = ?"
    with _connect(db_path) as db:
        db.execute(sql, (kb_name,))

    logger.info("Deleted knowledge base '%s'", kb_name)
| # ------------------------------------------------------------------ # | |
| # Document CRUD # | |
| # ------------------------------------------------------------------ # | |
def register_document(
    db_path: Path,
    doc_id: str,
    kb_name: str,
    filename: str,
    file_hash: str,
    page_count: int = 0,
    chunk_count: int = 0,
    is_private: bool = False,
) -> None:
    """
    Insert one row into ``documents``.

    ``is_private`` is stored as an integer (0 or 1), matching the column
    definition in the schema.
    """
    sql = """
        INSERT INTO documents
        (doc_id, kb_name, filename, file_hash, page_count, chunk_count, is_private)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """
    private_flag = int(is_private)
    values = (
        doc_id,
        kb_name,
        filename,
        file_hash,
        page_count,
        chunk_count,
        private_flag,
    )
    with _connect(db_path) as db:
        db.execute(sql, values)
def get_document_by_hash(db_path: Path, file_hash: str, kb_name: str) -> Optional[dict]:
    """Find a document in ``kb_name`` whose file hash equals ``file_hash``; None if there is none."""
    sql = "SELECT * FROM documents WHERE file_hash = ? AND kb_name = ?"
    with _connect(db_path) as db:
        match = db.execute(sql, (file_hash, kb_name)).fetchone()

    if not match:
        return None
    return dict(match)
def list_documents(db_path: Path, kb_name: str) -> list[dict]:
    """Fetch the documents of one KB as dicts, most recently ingested first."""
    sql = "SELECT * FROM documents WHERE kb_name = ? ORDER BY ingested_at DESC"
    with _connect(db_path) as db:
        found = db.execute(sql, (kb_name,)).fetchall()
    return list(map(dict, found))
def delete_document(db_path: Path, doc_id: str) -> None:
    """Remove one document row; its chunks go with it via ON DELETE CASCADE."""
    sql = "DELETE FROM documents WHERE doc_id = ?"
    with _connect(db_path) as db:
        db.execute(sql, (doc_id,))
| # ------------------------------------------------------------------ # | |
| # Chunk CRUD # | |
| # ------------------------------------------------------------------ # | |
def register_chunk(
    db_path: Path,
    chunk_id: str,
    kb_name: str,
    doc_id: str,
    source_file: str,
    page_number: int,
    section: str,
    chunk_index: int,
    text_hash: str,
    token_count: int,
    language: str = "en",
) -> None:
    """
    Store the metadata row for one chunk in ``chunks``.

    Each call opens its own connection and commits on its own.
    """
    sql = """
        INSERT INTO chunks
        (chunk_id, kb_name, doc_id, source_file, page_number,
        section, chunk_index, text_hash, token_count, language)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    # Order must mirror the column list in the INSERT above.
    values = (
        chunk_id,
        kb_name,
        doc_id,
        source_file,
        page_number,
        section,
        chunk_index,
        text_hash,
        token_count,
        language,
    )
    with _connect(db_path) as db:
        db.execute(sql, values)
def chunk_hash_exists(db_path: Path, text_hash: str, kb_name: str) -> bool:
    """Report whether ``kb_name`` already holds a chunk with this ``text_hash`` (used for dedup)."""
    sql = "SELECT 1 FROM chunks WHERE text_hash = ? AND kb_name = ? LIMIT 1"
    with _connect(db_path) as db:
        hit = db.execute(sql, (text_hash, kb_name)).fetchone()

    if hit is None:
        return False
    return True
def get_chunk_count(db_path: Path, kb_name: str) -> int:
    """Count the chunk rows stored for ``kb_name``."""
    sql = "SELECT COUNT(*) AS cnt FROM chunks WHERE kb_name = ?"
    with _connect(db_path) as db:
        result = db.execute(sql, (kb_name,)).fetchone()

    if not result:
        return 0
    return int(result["cnt"])
def get_chunks_for_doc(db_path: Path, doc_id: str) -> list[dict]:
    """Fetch the chunk metadata of one document as dicts, sorted by ``chunk_index``."""
    sql = "SELECT * FROM chunks WHERE doc_id = ? ORDER BY chunk_index"
    with _connect(db_path) as db:
        found = db.execute(sql, (doc_id,)).fetchall()
    return list(map(dict, found))
def delete_chunks_for_doc(db_path: Path, doc_id: str) -> None:
    """Remove every chunk row of one document (intended for use before re-indexing)."""
    sql = "DELETE FROM chunks WHERE doc_id = ?"
    with _connect(db_path) as db:
        db.execute(sql, (doc_id,))
| # ------------------------------------------------------------------ # | |
| # Query Audit Log # | |
| # ------------------------------------------------------------------ # | |
def log_query(
    db_path: Path,
    log_id: str,
    session_id: str,
    kb_names: list[str],
    voice_query_hash: str,
    processed_query: str,
    query_type: str,
    answer_length: int,
    citation_count: int,
    latency_asr_ms: int,
    latency_ret_ms: int,
    latency_llm_ms: int,
    total_latency_ms: int,
    groq_tokens_used: int,
) -> None:
    """
    Write one record to ``query_log``.

    ``kb_names`` is serialized with ``json.dumps`` and stored as text; the
    ``timestamp`` column is filled by its schema default.
    """
    sql = """
        INSERT INTO query_log (
        id, session_id, kb_names, voice_query_hash,
        processed_query, query_type, answer_length, citation_count,
        latency_asr_ms, latency_ret_ms, latency_llm_ms,
        total_latency_ms, groq_tokens_used
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    kb_names_json = json.dumps(kb_names)
    # Order must mirror the column list in the INSERT above.
    record = (
        log_id,
        session_id,
        kb_names_json,
        voice_query_hash,
        processed_query,
        query_type,
        answer_length,
        citation_count,
        latency_asr_ms,
        latency_ret_ms,
        latency_llm_ms,
        total_latency_ms,
        groq_tokens_used,
    )
    with _connect(db_path) as db:
        db.execute(sql, record)
def get_query_log(
    db_path: Path, limit: int = 100, offset: int = 0
) -> list[dict]:
    """Fetch one page of ``query_log`` rows as dicts, latest timestamp first."""
    sql = """
        SELECT * FROM query_log
        ORDER BY timestamp DESC
        LIMIT ? OFFSET ?
    """
    paging = (limit, offset)
    with _connect(db_path) as db:
        found = db.execute(sql, paging).fetchall()
    return list(map(dict, found))
def get_query_stats(db_path: Path, days: int = 7) -> dict:
    """
    Return aggregate query statistics for the Analytics tab.

    Only ``query_log`` rows whose ``timestamp`` is at or after
    "now (UTC) minus ``days``" are considered. The cutoff is formatted as
    ``YYYY-MM-DD HH:MM:SS`` so it compares as text against the values that
    the schema default ``datetime('now')`` writes.

    Args:
        db_path: Path to the SQLite database file.
        days: Size of the look-back window in days.

    Returns:
        dict with keys: total_queries, avg_latency_ms, avg_citation_count,
        queries_by_day (list of {date, count}). When the window contains no
        queries, the three numeric values are 0 and queries_by_day is empty.
    """
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S")

    with _connect(db_path) as conn:
        totals = conn.execute(
            """
            SELECT
            COUNT(*) AS total_queries,
            ROUND(AVG(total_latency_ms)) AS avg_latency_ms,
            ROUND(AVG(citation_count), 1) AS avg_citation_count
            FROM query_log
            WHERE timestamp >= ?
            """,
            (cutoff,),
        ).fetchone()
        by_day = conn.execute(
            """
            SELECT
            DATE(timestamp) AS date,
            COUNT(*) AS count
            FROM query_log
            WHERE timestamp >= ?
            GROUP BY DATE(timestamp)
            ORDER BY date
            """,
            (cutoff,),
        ).fetchall()

    # An aggregate SELECT without GROUP BY always yields one row, and AVG()
    # over zero rows is NULL. Coerce NULL/None to 0 so callers always get
    # numbers, which is what the previous `if totals else 0` guard intended.
    stats = dict(totals) if totals else {}
    return {
        "total_queries": stats.get("total_queries") or 0,
        "avg_latency_ms": stats.get("avg_latency_ms") or 0,
        "avg_citation_count": stats.get("avg_citation_count") or 0,
        "queries_by_day": [dict(r) for r in by_day],
    }