Spaces:
Sleeping
Sleeping
| """SQLite-backed document version registry. | |
| Tables | |
| ------ | |
| documents — one row per (doc_id, kb_version); tracks checksum + status | |
| kb_versions — one row per snapshot; docs added/changed/unchanged counts | |
| query_log — append-only audit trail of every versioned query | |
| latest_version — single-row table: which version is "current" | |
| """ | |
| from __future__ import annotations | |
| import datetime | |
| import sqlite3 | |
| from pathlib import Path | |
| from config import PATHS | |
# Default location of the registry database: the "versions_db" entry of
# PATHS (imported from config) when it has one, otherwise
# storage/versions.db two directories above this file.
_DB_PATH: Path = PATHS.get(  # type: ignore[arg-type]
    "versions_db",
    Path(__file__).parent.parent / "storage" / "versions.db",
)
class DocumentStore:
    """SQLite-backed registry of document versions, KB snapshots and queries.

    Each operation opens its own short-lived connection, runs a single
    statement inside a transaction, and closes the connection again.  Note
    that ``with connection:`` on a ``sqlite3.Connection`` only commits or
    rolls back -- it does not close -- so closing is done explicitly here.
    """

    # Column orders shared by the SELECT statements and the dicts built from
    # their rows; keeping them in one place stops the two drifting apart.
    _DOC_COLS = (
        "doc_id", "version", "checksum", "timestamp",
        "status", "source_path", "title",
    )
    _KB_COLS = (
        "version", "timestamp", "batch_name", "docs_added",
        "docs_changed", "docs_unchanged", "reason", "collection_name",
    )
    _QUERY_COLS = ("timestamp", "query", "version_used", "answer_hash")

    _SCHEMA = """
        CREATE TABLE IF NOT EXISTS documents (
            doc_id      TEXT    NOT NULL,
            version     INTEGER NOT NULL,
            checksum    TEXT    NOT NULL,
            timestamp   TEXT    NOT NULL,
            status      TEXT    NOT NULL DEFAULT 'active',
            source_path TEXT,
            title       TEXT,
            PRIMARY KEY (doc_id, version)
        );
        CREATE TABLE IF NOT EXISTS kb_versions (
            version         INTEGER PRIMARY KEY,
            timestamp       TEXT    NOT NULL,
            batch_name      TEXT,
            docs_added      INTEGER DEFAULT 0,
            docs_changed    INTEGER DEFAULT 0,
            docs_unchanged  INTEGER DEFAULT 0,
            reason          TEXT,
            collection_name TEXT
        );
        CREATE TABLE IF NOT EXISTS query_log (
            id           INTEGER PRIMARY KEY AUTOINCREMENT,
            timestamp    TEXT    NOT NULL,
            query        TEXT    NOT NULL,
            version_used INTEGER,
            answer_hash  TEXT
        );
        CREATE TABLE IF NOT EXISTS latest_version (
            id      INTEGER PRIMARY KEY CHECK (id = 1),
            version INTEGER NOT NULL
        );
    """

    def __init__(self, db_path: Path | str | None = None) -> None:
        """Open (creating if needed) the registry at *db_path*.

        Args:
            db_path: Database file to use; a falsy value selects the
                module-level default ``_DB_PATH``.
        """
        self.db_path = Path(db_path or _DB_PATH)
        self._init_db()

    # ── internals ────────────────────────────────────────────────────

    def _conn(self) -> sqlite3.Connection:
        """Return a new connection, creating the parent directory first.

        The caller owns the connection and is responsible for closing it.
        """
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        return sqlite3.connect(str(self.db_path), check_same_thread=False)

    def _run(self, sql: str, params: tuple = ()) -> list[tuple]:
        """Execute one statement in its own transaction and return all rows.

        Commits on success, rolls back if the statement raises, and always
        closes the connection afterwards.  Statements that produce no rows
        (INSERT etc.) yield an empty list.
        """
        conn = self._conn()
        try:
            with conn:  # commit / rollback only -- does not close
                return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    def _init_db(self) -> None:
        """Create the four registry tables if they do not exist yet."""
        conn = self._conn()
        try:
            with conn:
                conn.executescript(self._SCHEMA)
        finally:
            conn.close()

    @staticmethod
    def _now() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same text as the deprecated
        ``datetime.utcnow().isoformat()`` (no UTC offset suffix), so new
        rows stay format-compatible with timestamps already stored.
        """
        aware = datetime.datetime.now(datetime.timezone.utc)
        return aware.replace(tzinfo=None).isoformat()

    @staticmethod
    def _as_dicts(cols: tuple, rows: list[tuple]) -> list[dict]:
        """Pair each row's values with *cols*, one dict per row."""
        return [dict(zip(cols, row)) for row in rows]

    # ── document operations ──────────────────────────────────────────

    def add_doc(
        self,
        doc_id: str,
        version: int,
        checksum: str,
        status: str,
        source_path: str,
        title: str,
    ) -> None:
        """Insert the record for (doc_id, version), replacing any existing one.

        The row's timestamp is set to the current UTC time.
        """
        self._run(
            "INSERT OR REPLACE INTO documents "
            "(doc_id, version, checksum, timestamp, status, source_path, title) "
            "VALUES (?,?,?,?,?,?,?)",
            (doc_id, version, checksum, self._now(), status, source_path, title),
        )

    def get_checksum(self, doc_id: str) -> str | None:
        """Return the checksum of the highest-version record for this doc_id.

        Returns ``None`` when the document has no records.
        """
        rows = self._run(
            "SELECT checksum FROM documents WHERE doc_id=? "
            "ORDER BY version DESC LIMIT 1",
            (doc_id,),
        )
        return rows[0][0] if rows else None

    def get_doc_history(self, doc_id: str) -> list[dict]:
        """Return every record of *doc_id* as dicts, newest version first."""
        rows = self._run(
            f"SELECT {', '.join(self._DOC_COLS)} "
            "FROM documents WHERE doc_id=? ORDER BY version DESC",
            (doc_id,),
        )
        return self._as_dicts(self._DOC_COLS, rows)

    def get_all_doc_ids(self) -> list[str]:
        """Return each distinct doc_id present in the documents table."""
        rows = self._run("SELECT DISTINCT doc_id FROM documents")
        return [row[0] for row in rows]

    # ── version operations ───────────────────────────────────────────

    def bump_version(self) -> int:
        """Return next version number (max existing + 1, or 1 for first run).

        This only reads ``kb_versions``; nothing is written, so the number
        is not reserved until a row for it is logged.
        """
        rows = self._run("SELECT MAX(version) FROM kb_versions")
        # MAX() over an empty table yields a single NULL row.
        return (rows[0][0] or 0) + 1

    def log_version(
        self,
        version: int,
        batch_name: str,
        docs_added: int,
        docs_changed: int,
        docs_unchanged: int,
        reason: str,
        collection_name: str,
    ) -> None:
        """Record a snapshot row for *version*, replacing any existing one.

        The row's timestamp is set to the current UTC time.
        """
        self._run(
            "INSERT OR REPLACE INTO kb_versions "
            "(version, timestamp, batch_name, docs_added, docs_changed, "
            "docs_unchanged, reason, collection_name) VALUES (?,?,?,?,?,?,?,?)",
            (version, self._now(), batch_name, docs_added, docs_changed,
             docs_unchanged, reason, collection_name),
        )

    def set_latest(self, version: int) -> None:
        """Store *version* in the single-row ``latest_version`` table."""
        self._run(
            "INSERT OR REPLACE INTO latest_version (id, version) VALUES (1,?)",
            (version,),
        )

    def get_latest(self) -> int | None:
        """Return the stored latest version, or ``None`` if none was set."""
        rows = self._run("SELECT version FROM latest_version WHERE id=1")
        return rows[0][0] if rows else None

    def get_history(self) -> list[dict]:
        """Return all snapshot rows as dicts, newest version first."""
        rows = self._run(
            f"SELECT {', '.join(self._KB_COLS)} "
            "FROM kb_versions ORDER BY version DESC"
        )
        return self._as_dicts(self._KB_COLS, rows)

    def get_version_info(self, version: int) -> dict | None:
        """Return the snapshot row for *version* as a dict, or ``None``."""
        rows = self._run(
            f"SELECT {', '.join(self._KB_COLS)} "
            "FROM kb_versions WHERE version=?",
            (version,),
        )
        if not rows:
            return None
        return dict(zip(self._KB_COLS, rows[0]))

    def docs_at_version(self, version: int) -> list[dict]:
        """All docs that were active at or before this version (latest entry ≤ version).

        For each doc_id the record with the highest version not exceeding
        *version* is selected; it is returned only if its status is
        ``'active'``.  Results are ordered by doc_id.
        """
        rows = self._run(
            """
            SELECT d.doc_id, d.version, d.checksum, d.timestamp,
                   d.status, d.source_path, d.title
            FROM documents d
            INNER JOIN (
                SELECT doc_id, MAX(version) AS mv
                FROM documents WHERE version <= ?
                GROUP BY doc_id
            ) latest ON d.doc_id = latest.doc_id AND d.version = latest.mv
            WHERE d.status = 'active'
            ORDER BY d.doc_id
            """,
            (version,),
        )
        return self._as_dicts(self._DOC_COLS, rows)

    # ── query audit log ──────────────────────────────────────────────

    def log_query(self, query: str, version_used: int, answer_hash: str) -> None:
        """Append one entry to the query log, stamped with the current UTC time."""
        self._run(
            "INSERT INTO query_log (timestamp, query, version_used, answer_hash) "
            "VALUES (?,?,?,?)",
            (self._now(), query, version_used, answer_hash),
        )

    def get_query_log(self, limit: int = 50) -> list[dict]:
        """Return up to *limit* query-log entries as dicts, most recent first."""
        rows = self._run(
            f"SELECT {', '.join(self._QUERY_COLS)} "
            "FROM query_log ORDER BY id DESC LIMIT ?",
            (limit,),
        )
        return self._as_dicts(self._QUERY_COLS, rows)