"""
VoiceVault — SQLite Metadata Store
====================================
All SQLite interactions are centralized here.

Provides schema initialization and CRUD operations for:
  - knowledge_bases : KB registry
  - documents       : per-KB document registry
  - chunks          : chunk-level metadata (source, page, section)
  - query_log       : append-only audit trail of every query

Security:
  - All queries use parameterized statements (? placeholders).
  - No f-string SQL anywhere in this module.
  - No raw user input is ever interpolated into SQL.
"""

import json
import logging
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Generator, Optional

logger = logging.getLogger(__name__)


# ------------------------------------------------------------------ #
#  Schema DDL                                                          #
# ------------------------------------------------------------------ #

_DDL_KNOWLEDGE_BASES = """
CREATE TABLE IF NOT EXISTS knowledge_bases (
    kb_name       TEXT PRIMARY KEY,
    display_name  TEXT NOT NULL,
    password_hash TEXT,                 -- bcrypt hash; NULL = public
    owner         TEXT NOT NULL DEFAULT 'default',
    doc_count     INTEGER NOT NULL DEFAULT 0,
    chunk_count   INTEGER NOT NULL DEFAULT 0,
    created_at    TEXT NOT NULL DEFAULT (datetime('now')),
    last_updated  TEXT
);
"""

_DDL_DOCUMENTS = """
CREATE TABLE IF NOT EXISTS documents (
    doc_id      TEXT PRIMARY KEY,
    kb_name     TEXT NOT NULL REFERENCES knowledge_bases(kb_name) ON DELETE CASCADE,
    filename    TEXT NOT NULL,
    file_hash   TEXT NOT NULL,          -- SHA-256 of file bytes
    page_count  INTEGER NOT NULL DEFAULT 0,
    chunk_count INTEGER NOT NULL DEFAULT 0,
    is_private  INTEGER NOT NULL DEFAULT 0,  -- 0 = public, 1 = private
    ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_documents_kb   ON documents(kb_name);
CREATE INDEX IF NOT EXISTS idx_documents_hash ON documents(file_hash);
"""

_DDL_CHUNKS = """
CREATE TABLE IF NOT EXISTS chunks (
    chunk_id    TEXT PRIMARY KEY,
    kb_name     TEXT NOT NULL,
    doc_id      TEXT NOT NULL REFERENCES documents(doc_id) ON DELETE CASCADE,
    source_file TEXT NOT NULL,
    page_number INTEGER NOT NULL DEFAULT 0,
    section     TEXT NOT NULL DEFAULT '',
    chunk_index INTEGER NOT NULL DEFAULT 0,
    text_hash   TEXT NOT NULL,          -- SHA-256 of chunk text
    token_count INTEGER NOT NULL DEFAULT 0,
    language    TEXT NOT NULL DEFAULT 'en',
    ingested_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_chunks_kb   ON chunks(kb_name);
CREATE INDEX IF NOT EXISTS idx_chunks_doc  ON chunks(doc_id);
CREATE INDEX IF NOT EXISTS idx_chunks_hash ON chunks(text_hash);
"""

_DDL_QUERY_LOG = """
CREATE TABLE IF NOT EXISTS query_log (
    id               TEXT PRIMARY KEY,
    session_id       TEXT NOT NULL,
    kb_names         TEXT NOT NULL,     -- JSON array of kb names
    voice_query_hash TEXT NOT NULL,     -- SHA-256 of query (anonymized)
    processed_query  TEXT NOT NULL,
    query_type       TEXT NOT NULL DEFAULT 'factual',
    answer_length    INTEGER NOT NULL DEFAULT 0,
    citation_count   INTEGER NOT NULL DEFAULT 0,
    latency_asr_ms   INTEGER NOT NULL DEFAULT 0,
    latency_ret_ms   INTEGER NOT NULL DEFAULT 0,
    latency_llm_ms   INTEGER NOT NULL DEFAULT 0,
    total_latency_ms INTEGER NOT NULL DEFAULT 0,
    groq_tokens_used INTEGER NOT NULL DEFAULT 0,
    timestamp        TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_query_log_session ON query_log(session_id);
CREATE INDEX IF NOT EXISTS idx_query_log_ts      ON query_log(timestamp);
"""

# Order matters: referenced tables are created before the tables that
# declare REFERENCES to them.
_ALL_DDL = [
    _DDL_KNOWLEDGE_BASES,
    _DDL_DOCUMENTS,
    _DDL_CHUNKS,
    _DDL_QUERY_LOG,
]


# ------------------------------------------------------------------ #
#  Connection Helper                                                   #
# ------------------------------------------------------------------ #

@contextmanager
def _connect(db_path: Path) -> Generator[sqlite3.Connection, None, None]:
    """
    Context manager for SQLite connections.

    Enforces WAL mode for concurrent readers and enables foreign keys.
    Always commits on clean exit, always rolls back on exception.
    A fresh connection is opened on entry and closed on exit.

    Args:
        db_path: Path to the SQLite database file.

    Yields:
        An open connection whose rows are ``sqlite3.Row`` objects
        (convertible with ``dict(row)``).
    """
    conn = sqlite3.connect(str(db_path), check_same_thread=False)
    conn.row_factory = sqlite3.Row
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
        # Foreign-key enforcement is per-connection in SQLite; the
        # ON DELETE CASCADE clauses in the schema only fire when it is ON.
        conn.execute("PRAGMA foreign_keys=ON;")
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


# ------------------------------------------------------------------ #
#  Schema Initialization                                               #
# ------------------------------------------------------------------ #

def initialize_database(db_path: Path) -> None:
    """
    Create all tables and indexes if they do not already exist.

    Safe to call on every application startup — idempotent (every
    statement uses ``IF NOT EXISTS``).

    Args:
        db_path: Absolute path to the .db file. The parent directory must
            already exist.

    Raises:
        FileNotFoundError: If ``db_path.parent`` does not exist.
    """
    if not db_path.parent.exists():
        raise FileNotFoundError(
            f"Parent directory does not exist: {db_path.parent}. "
            "Call cfg.ensure_directories() first."
        )
    with _connect(db_path) as conn:
        for ddl in _ALL_DDL:
            # executescript is required because each DDL string holds
            # several statements (CREATE TABLE + CREATE INDEX).
            conn.executescript(ddl)
    logger.info("SQLite schema initialized at %s", db_path)


# ------------------------------------------------------------------ #
#  Knowledge Base CRUD                                                 #
# ------------------------------------------------------------------ #

def create_kb(
    db_path: Path,
    kb_name: str,
    display_name: str,
    owner: str = "default",
    password_hash: Optional[str] = None,
) -> None:
    """
    Insert a new knowledge base row. Raises if kb_name already exists.

    Args:
        db_path: Path to the SQLite database file.
        kb_name: Unique KB identifier (primary key).
        display_name: Human-readable name.
        owner: Owner label; defaults to ``"default"``.
        password_hash: Optional password hash; ``None`` marks the KB public.

    Raises:
        sqlite3.IntegrityError: If a KB with ``kb_name`` already exists.
    """
    with _connect(db_path) as conn:
        conn.execute(
            """
            INSERT INTO knowledge_bases (kb_name, display_name, owner, password_hash)
            VALUES (?, ?, ?, ?)
            """,
            (kb_name, display_name, owner, password_hash),
        )
    logger.info("Created knowledge base '%s'", kb_name)


def get_kb(db_path: Path, kb_name: str) -> Optional[dict]:
    """Return a KB row as a dict, or None if not found."""
    with _connect(db_path) as conn:
        row = conn.execute(
            "SELECT * FROM knowledge_bases WHERE kb_name = ?", (kb_name,)
        ).fetchone()
    return dict(row) if row else None


def list_kbs(db_path: Path) -> list[dict]:
    """Return all knowledge bases ordered by creation time, newest first."""
    with _connect(db_path) as conn:
        rows = conn.execute(
            "SELECT * FROM knowledge_bases ORDER BY created_at DESC"
        ).fetchall()
    return [dict(r) for r in rows]


def update_kb_counts(db_path: Path, kb_name: str, doc_count: int, chunk_count: int) -> None:
    """
    Update document and chunk counts + last_updated timestamp for a KB.

    The counts are written as given (not recomputed from the documents /
    chunks tables). Silently does nothing if ``kb_name`` does not exist.
    """
    with _connect(db_path) as conn:
        conn.execute(
            """
            UPDATE knowledge_bases
            SET doc_count = ?, chunk_count = ?, last_updated = datetime('now')
            WHERE kb_name = ?
            """,
            (doc_count, chunk_count, kb_name),
        )


def delete_kb(db_path: Path, kb_name: str) -> None:
    """
    Delete a knowledge base and cascade-delete its documents and chunks.

    The cascade relies on the ON DELETE CASCADE foreign keys
    (knowledge_bases -> documents -> chunks) enabled by ``_connect``.
    """
    with _connect(db_path) as conn:
        conn.execute("DELETE FROM knowledge_bases WHERE kb_name = ?", (kb_name,))
    logger.info("Deleted knowledge base '%s'", kb_name)


# ------------------------------------------------------------------ #
#  Document CRUD                                                       #
# ------------------------------------------------------------------ #

def register_document(
    db_path: Path,
    doc_id: str,
    kb_name: str,
    filename: str,
    file_hash: str,
    page_count: int = 0,
    chunk_count: int = 0,
    is_private: bool = False,
) -> None:
    """
    Register a document in the documents table.

    Raises:
        sqlite3.IntegrityError: If ``doc_id`` already exists or ``kb_name``
            does not reference an existing knowledge base.
    """
    with _connect(db_path) as conn:
        conn.execute(
            """
            INSERT INTO documents
                (doc_id, kb_name, filename, file_hash, page_count, chunk_count, is_private)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            # SQLite has no boolean type; the schema stores 0/1.
            (doc_id, kb_name, filename, file_hash, page_count, chunk_count, int(is_private)),
        )


def get_document_by_hash(db_path: Path, file_hash: str, kb_name: str) -> Optional[dict]:
    """Return a document row if a file with the same hash is already indexed in this KB."""
    with _connect(db_path) as conn:
        row = conn.execute(
            "SELECT * FROM documents WHERE file_hash = ? AND kb_name = ?",
            (file_hash, kb_name),
        ).fetchone()
    return dict(row) if row else None


def list_documents(db_path: Path, kb_name: str) -> list[dict]:
    """Return all documents for a KB ordered by ingestion time, newest first."""
    with _connect(db_path) as conn:
        rows = conn.execute(
            "SELECT * FROM documents WHERE kb_name = ? ORDER BY ingested_at DESC",
            (kb_name,),
        ).fetchall()
    return [dict(r) for r in rows]


def delete_document(db_path: Path, doc_id: str) -> None:
    """
    Delete a document and cascade-delete its chunks.

    Does not touch the owning KB's doc_count / chunk_count; call
    ``update_kb_counts`` to refresh them.
    """
    with _connect(db_path) as conn:
        conn.execute("DELETE FROM documents WHERE doc_id = ?", (doc_id,))


# ------------------------------------------------------------------ #
#  Chunk CRUD                                                          #
# ------------------------------------------------------------------ #

def register_chunk(
    db_path: Path,
    chunk_id: str,
    kb_name: str,
    doc_id: str,
    source_file: str,
    page_number: int,
    section: str,
    chunk_index: int,
    text_hash: str,
    token_count: int,
    language: str = "en",
) -> None:
    """
    Insert a single chunk metadata row.

    Raises:
        sqlite3.IntegrityError: If ``chunk_id`` already exists or ``doc_id``
            does not reference an existing document.
    """
    with _connect(db_path) as conn:
        conn.execute(
            """
            INSERT INTO chunks
                (chunk_id, kb_name, doc_id, source_file, page_number,
                 section, chunk_index, text_hash, token_count, language)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                chunk_id, kb_name, doc_id, source_file, page_number,
                section, chunk_index, text_hash, token_count, language,
            ),
        )


def chunk_hash_exists(db_path: Path, text_hash: str, kb_name: str) -> bool:
    """Return True if a chunk with this text_hash already exists in the KB (deduplication)."""
    with _connect(db_path) as conn:
        row = conn.execute(
            "SELECT 1 FROM chunks WHERE text_hash = ? AND kb_name = ? LIMIT 1",
            (text_hash, kb_name),
        ).fetchone()
    return row is not None


def get_chunk_count(db_path: Path, kb_name: str) -> int:
    """Return total chunk count for a KB (0 if the KB has no chunks)."""
    with _connect(db_path) as conn:
        row = conn.execute(
            "SELECT COUNT(*) AS cnt FROM chunks WHERE kb_name = ?", (kb_name,)
        ).fetchone()
    return int(row["cnt"]) if row else 0


def get_chunks_for_doc(db_path: Path, doc_id: str) -> list[dict]:
    """Return all chunk metadata rows for a document, ordered by chunk_index."""
    with _connect(db_path) as conn:
        rows = conn.execute(
            "SELECT * FROM chunks WHERE doc_id = ? ORDER BY chunk_index",
            (doc_id,),
        ).fetchall()
    return [dict(r) for r in rows]


def delete_chunks_for_doc(db_path: Path, doc_id: str) -> None:
    """Delete all chunk rows for a document (called before re-indexing)."""
    with _connect(db_path) as conn:
        conn.execute("DELETE FROM chunks WHERE doc_id = ?", (doc_id,))


# ------------------------------------------------------------------ #
#  Query Audit Log                                                     #
# ------------------------------------------------------------------ #

def log_query(
    db_path: Path,
    log_id: str,
    session_id: str,
    kb_names: list[str],
    voice_query_hash: str,
    processed_query: str,
    query_type: str,
    answer_length: int,
    citation_count: int,
    latency_asr_ms: int,
    latency_ret_ms: int,
    latency_llm_ms: int,
    total_latency_ms: int,
    groq_tokens_used: int,
) -> None:
    """
    Append a query session record to the audit log.

    ``kb_names`` is stored as a JSON array string; the timestamp column is
    filled by the schema default (``datetime('now')``, UTC).

    Raises:
        sqlite3.IntegrityError: If ``log_id`` already exists.
    """
    with _connect(db_path) as conn:
        conn.execute(
            """
            INSERT INTO query_log (
                id, session_id, kb_names, voice_query_hash, processed_query,
                query_type, answer_length, citation_count,
                latency_asr_ms, latency_ret_ms, latency_llm_ms,
                total_latency_ms, groq_tokens_used
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                log_id, session_id, json.dumps(kb_names), voice_query_hash,
                processed_query, query_type, answer_length, citation_count,
                latency_asr_ms, latency_ret_ms, latency_llm_ms,
                total_latency_ms, groq_tokens_used,
            ),
        )


def get_query_log(
    db_path: Path, limit: int = 100, offset: int = 0
) -> list[dict]:
    """
    Return recent query log entries, newest first.

    Args:
        db_path: Path to the SQLite database file.
        limit: Maximum number of rows (SQLite treats a negative LIMIT as
            "no limit").
        offset: Number of newest rows to skip, for pagination.
    """
    with _connect(db_path) as conn:
        rows = conn.execute(
            """
            SELECT * FROM query_log
            ORDER BY timestamp DESC
            LIMIT ? OFFSET ?
            """,
            (limit, offset),
        ).fetchall()
    return [dict(r) for r in rows]


def get_query_stats(db_path: Path, days: int = 7) -> dict:
    """
    Return aggregate query statistics for the Analytics tab.

    Args:
        db_path: Path to the SQLite database file.
        days: Size of the look-back window, counted from now (UTC).

    Returns:
        dict with keys:
            total_queries, avg_latency_ms, avg_citation_count,
            queries_by_day (list of {date, count})
        The averages are 0 (not None) when the window contains no queries.
    """
    # Same 'YYYY-MM-DD HH:MM:SS' UTC text format that datetime('now')
    # writes into query_log.timestamp, so string comparison is chronological.
    cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S")
    with _connect(db_path) as conn:
        # An aggregate query always yields exactly one row, but AVG() over
        # zero rows is NULL; COALESCE keeps the averages numeric.
        totals = conn.execute(
            """
            SELECT
                COUNT(*)                                   AS total_queries,
                COALESCE(ROUND(AVG(total_latency_ms)), 0)  AS avg_latency_ms,
                COALESCE(ROUND(AVG(citation_count), 1), 0) AS avg_citation_count
            FROM query_log
            WHERE timestamp >= ?
            """,
            (cutoff,),
        ).fetchone()

        by_day = conn.execute(
            """
            SELECT DATE(timestamp) AS date, COUNT(*) AS count
            FROM query_log
            WHERE timestamp >= ?
            GROUP BY DATE(timestamp)
            ORDER BY date
            """,
            (cutoff,),
        ).fetchall()

    return {
        "total_queries": totals["total_queries"] if totals else 0,
        "avg_latency_ms": totals["avg_latency_ms"] if totals else 0,
        "avg_citation_count": totals["avg_citation_count"] if totals else 0,
        "queries_by_day": [dict(r) for r in by_day],
    }