Spaces:
Running
Running
| """SQLite + numpy vector store. | |
| Zero external dependencies (SQLite is built into Python). For our scale | |
| (< 10K chunks total per user), a brute-force cosine-sim scan in numpy is | |
| ~5-20ms — there's no need for an ANN index. If we ever need it, we can | |
| swap in FAISS or Chroma without touching the API. | |
| """ | |
| from __future__ import annotations | |
| import sqlite3 | |
| import uuid | |
| from contextlib import contextmanager | |
| from datetime import datetime | |
| from pathlib import Path | |
| import numpy as np | |
DEFAULT_DB = Path("data/knowledge.sqlite")


class KnowledgeStore:
    """SQLite-backed store of documents, their text chunks and embeddings.

    Every operation opens its own short-lived connection through
    :meth:`_conn`, so an instance holds no open database handle between
    calls. Embeddings are persisted as raw float32 bytes and searched with
    a brute-force dot product in numpy.
    """

    def __init__(self, db_path: str | Path = DEFAULT_DB) -> None:
        """Prepare the database file at *db_path*.

        The parent directory is created if it is missing, and the schema is
        created with ``IF NOT EXISTS`` so re-opening an existing database is
        safe.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_schema()

    @contextmanager
    def _conn(self):
        """Yield a configured connection; commit on success, always close.

        Rows are returned as ``sqlite3.Row`` and foreign-key enforcement is
        switched on for the connection. If the ``with`` body raises, the
        commit is skipped and the connection is closed without committing.
        """
        c = sqlite3.connect(self.db_path)
        try:
            c.row_factory = sqlite3.Row
            c.execute("PRAGMA foreign_keys = ON")
            yield c
            c.commit()
        finally:
            c.close()

    def _init_schema(self) -> None:
        """Create the ``documents`` and ``chunks`` tables and index if absent."""
        with self._conn() as c:
            c.executescript(
                """
                CREATE TABLE IF NOT EXISTS documents (
                    id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    format TEXT NOT NULL,
                    size_bytes INTEGER DEFAULT 0,
                    chunk_count INTEGER DEFAULT 0,
                    uploaded_at TEXT NOT NULL
                );
                CREATE TABLE IF NOT EXISTS chunks (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    document_id TEXT NOT NULL,
                    chunk_index INTEGER NOT NULL,
                    text TEXT NOT NULL,
                    embedding BLOB NOT NULL,
                    FOREIGN KEY (document_id) REFERENCES documents(id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_chunks_doc ON chunks(document_id);
                """
            )

    # ── writes ──────────────────────────────────────────────────────────

    def add_document(
        self,
        name: str,
        fmt: str,
        chunks: list[str],
        embeddings: np.ndarray,
        size_bytes: int = 0,
    ) -> str:
        """Insert a document together with its chunks and embeddings.

        Args:
            name: Display name of the document.
            fmt: Format label stored in the ``format`` column.
            chunks: Chunk texts, in document order.
            embeddings: One embedding per chunk; each row is stored as
                float32 bytes. Any array-like of rows is accepted.
            size_bytes: Size recorded for the document.

        Returns:
            The generated 12-character hexadecimal document id.

        Raises:
            ValueError: If ``chunks`` and ``embeddings`` differ in length.
        """
        # Local import: the module's import block only brings in `datetime`.
        from datetime import timezone

        if len(chunks) != len(embeddings):
            raise ValueError(
                f"chunks ({len(chunks)}) and embeddings ({len(embeddings)}) must match"
            )
        doc_id = uuid.uuid4().hex[:12]
        # Naive-UTC ISO string, the same format `datetime.utcnow()` produced,
        # so new rows sort consistently with previously stored ones.
        uploaded_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        with self._conn() as c:
            c.execute(
                "INSERT INTO documents (id, name, format, size_bytes, chunk_count, uploaded_at) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                (doc_id, name, fmt, size_bytes, len(chunks), uploaded_at),
            )
            c.executemany(
                "INSERT INTO chunks (document_id, chunk_index, text, embedding) "
                "VALUES (?, ?, ?, ?)",
                (
                    (doc_id, i, chunk, np.asarray(emb, dtype=np.float32).tobytes())
                    for i, (chunk, emb) in enumerate(zip(chunks, embeddings))
                ),
            )
        return doc_id

    def delete_document(self, document_id: str) -> bool:
        """Delete a document and its chunks.

        Returns:
            ``True`` if a document row was removed, ``False`` if no document
            with that id existed.
        """
        with self._conn() as c:
            # Chunks are removed explicitly rather than relying only on the
            # ON DELETE CASCADE declared in the schema.
            c.execute("DELETE FROM chunks WHERE document_id = ?", (document_id,))
            cur = c.execute("DELETE FROM documents WHERE id = ?", (document_id,))
            return cur.rowcount > 0

    def clear_all(self) -> None:
        """Remove every chunk and every document."""
        with self._conn() as c:
            c.execute("DELETE FROM chunks")
            c.execute("DELETE FROM documents")

    # ── reads ───────────────────────────────────────────────────────────

    def list_documents(self) -> list[dict]:
        """Return all document rows as dicts, most recently uploaded first."""
        with self._conn() as c:
            rows = c.execute(
                "SELECT id, name, format, size_bytes, chunk_count, uploaded_at "
                "FROM documents ORDER BY uploaded_at DESC"
            ).fetchall()
        return [dict(r) for r in rows]

    def stats(self) -> dict:
        """Return document count, chunk count and summed ``size_bytes``."""
        with self._conn() as c:
            doc_count = c.execute("SELECT COUNT(*) FROM documents").fetchone()[0]
            chunk_count = c.execute("SELECT COUNT(*) FROM chunks").fetchone()[0]
            total_size = c.execute(
                "SELECT COALESCE(SUM(size_bytes), 0) FROM documents"
            ).fetchone()[0]
        return {
            "document_count": doc_count,
            "chunk_count": chunk_count,
            "total_bytes": total_size,
        }

    def search(self, query_embedding: np.ndarray, top_k: int = 3) -> list[dict]:
        """Cosine-similarity search across all chunks.

        The query is flattened to one dimension and L2-normalised here;
        stored embeddings are assumed to be L2-normalised already, so the
        dot product equals cosine similarity under that assumption.

        Args:
            query_embedding: Query vector (any array-like; flattened to 1-D).
            top_k: Maximum number of hits to return. Values ``<= 0`` yield
                an empty list.

        Returns:
            Up to ``top_k`` dicts ordered by descending score, each holding
            the chunk id, document id/name/format, chunk index, text and
            score.

        Raises:
            ValueError: If the query dimensionality differs from that of the
                stored embeddings.
        """
        # A negative top_k would otherwise slice off the tail instead of
        # returning nothing.
        if top_k <= 0:
            return []
        with self._conn() as c:
            rows = c.execute(
                """
                SELECT chunks.id, chunks.document_id, chunks.chunk_index, chunks.text,
                       chunks.embedding, documents.name AS doc_name,
                       documents.format AS doc_format
                FROM chunks
                JOIN documents ON chunks.document_id = documents.id
                """
            ).fetchall()
        if not rows:
            return []
        emb_matrix = np.stack(
            [np.frombuffer(r["embedding"], dtype=np.float32) for r in rows]
        )
        q = np.asarray(query_embedding, dtype=np.float32).ravel()
        if q.shape[0] != emb_matrix.shape[1]:
            raise ValueError(
                f"query embedding has {q.shape[0]} dimensions but stored "
                f"embeddings have {emb_matrix.shape[1]}"
            )
        q_norm = np.linalg.norm(q)
        if q_norm > 0:
            q = q / q_norm
        scores = emb_matrix @ q
        top = np.argsort(scores)[::-1][:top_k]
        results = []
        for idx in top:
            row = rows[int(idx)]
            results.append(
                {
                    "chunk_id": int(row["id"]),
                    "document_id": row["document_id"],
                    "document_name": row["doc_name"],
                    "document_format": row["doc_format"],
                    "chunk_index": int(row["chunk_index"]),
                    "text": row["text"],
                    "score": float(scores[idx]),
                }
            )
        return results