AdaptiveRAG / versioning /document_store.py
NoobNovel's picture
Add Knowledge Base Versioning layer with full UI tab
66bd94a
"""SQLite-backed document version registry.
Tables
------
documents β€” one row per (doc_id, kb_version); tracks checksum + status
kb_versions β€” one row per snapshot; docs added/changed/unchanged counts
query_log β€” append-only audit trail of every versioned query
latest_version β€” single-row table: which version is "current"
"""
from __future__ import annotations
import datetime
import sqlite3
from pathlib import Path
from config import PATHS
_DB_PATH: Path = PATHS.get("versions_db", # type: ignore[arg-type]
Path(__file__).parent.parent / "storage" / "versions.db")
class DocumentStore:
def __init__(self, db_path: Path | str | None = None) -> None:
self.db_path = Path(db_path or _DB_PATH)
self._init_db()
def _conn(self) -> sqlite3.Connection:
self.db_path.parent.mkdir(parents=True, exist_ok=True)
return sqlite3.connect(str(self.db_path), check_same_thread=False)
def _init_db(self) -> None:
with self._conn() as conn:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS documents (
doc_id TEXT NOT NULL,
version INTEGER NOT NULL,
checksum TEXT NOT NULL,
timestamp TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active',
source_path TEXT,
title TEXT,
PRIMARY KEY (doc_id, version)
);
CREATE TABLE IF NOT EXISTS kb_versions (
version INTEGER PRIMARY KEY,
timestamp TEXT NOT NULL,
batch_name TEXT,
docs_added INTEGER DEFAULT 0,
docs_changed INTEGER DEFAULT 0,
docs_unchanged INTEGER DEFAULT 0,
reason TEXT,
collection_name TEXT
);
CREATE TABLE IF NOT EXISTS query_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
query TEXT NOT NULL,
version_used INTEGER,
answer_hash TEXT
);
CREATE TABLE IF NOT EXISTS latest_version (
id INTEGER PRIMARY KEY CHECK (id = 1),
version INTEGER NOT NULL
);
"""
)
# ── document operations ──────────────────────────────────────────
def add_doc(
self,
doc_id: str,
version: int,
checksum: str,
status: str,
source_path: str,
title: str,
) -> None:
ts = datetime.datetime.utcnow().isoformat()
with self._conn() as conn:
conn.execute(
"INSERT OR REPLACE INTO documents "
"(doc_id, version, checksum, timestamp, status, source_path, title) "
"VALUES (?,?,?,?,?,?,?)",
(doc_id, version, checksum, ts, status, source_path, title),
)
def get_checksum(self, doc_id: str) -> str | None:
"""Return the checksum of the highest-version record for this doc_id."""
with self._conn() as conn:
row = conn.execute(
"SELECT checksum FROM documents WHERE doc_id=? "
"ORDER BY version DESC LIMIT 1",
(doc_id,),
).fetchone()
return row[0] if row else None
def get_doc_history(self, doc_id: str) -> list[dict]:
with self._conn() as conn:
rows = conn.execute(
"SELECT doc_id, version, checksum, timestamp, status, source_path, title "
"FROM documents WHERE doc_id=? ORDER BY version DESC",
(doc_id,),
).fetchall()
cols = ["doc_id", "version", "checksum", "timestamp",
"status", "source_path", "title"]
return [dict(zip(cols, r)) for r in rows]
def get_all_doc_ids(self) -> list[str]:
with self._conn() as conn:
rows = conn.execute(
"SELECT DISTINCT doc_id FROM documents"
).fetchall()
return [r[0] for r in rows]
# ── version operations ───────────────────────────────────────────
def bump_version(self) -> int:
"""Return next version number (max existing + 1, or 1 for first run)."""
with self._conn() as conn:
row = conn.execute("SELECT MAX(version) FROM kb_versions").fetchone()
return (row[0] or 0) + 1
def log_version(
self,
version: int,
batch_name: str,
docs_added: int,
docs_changed: int,
docs_unchanged: int,
reason: str,
collection_name: str,
) -> None:
ts = datetime.datetime.utcnow().isoformat()
with self._conn() as conn:
conn.execute(
"INSERT OR REPLACE INTO kb_versions "
"(version, timestamp, batch_name, docs_added, docs_changed, "
"docs_unchanged, reason, collection_name) VALUES (?,?,?,?,?,?,?,?)",
(version, ts, batch_name, docs_added, docs_changed,
docs_unchanged, reason, collection_name),
)
def set_latest(self, version: int) -> None:
with self._conn() as conn:
conn.execute(
"INSERT OR REPLACE INTO latest_version (id, version) VALUES (1,?)",
(version,),
)
def get_latest(self) -> int | None:
with self._conn() as conn:
row = conn.execute(
"SELECT version FROM latest_version WHERE id=1"
).fetchone()
return row[0] if row else None
def get_history(self) -> list[dict]:
with self._conn() as conn:
rows = conn.execute(
"SELECT version, timestamp, batch_name, docs_added, docs_changed, "
"docs_unchanged, reason, collection_name "
"FROM kb_versions ORDER BY version DESC"
).fetchall()
cols = ["version", "timestamp", "batch_name", "docs_added",
"docs_changed", "docs_unchanged", "reason", "collection_name"]
return [dict(zip(cols, r)) for r in rows]
def get_version_info(self, version: int) -> dict | None:
with self._conn() as conn:
row = conn.execute(
"SELECT version, timestamp, batch_name, docs_added, docs_changed, "
"docs_unchanged, reason, collection_name "
"FROM kb_versions WHERE version=?",
(version,),
).fetchone()
if not row:
return None
cols = ["version", "timestamp", "batch_name", "docs_added",
"docs_changed", "docs_unchanged", "reason", "collection_name"]
return dict(zip(cols, row))
def docs_at_version(self, version: int) -> list[dict]:
"""All docs that were active at or before this version (latest entry ≀ version)."""
with self._conn() as conn:
rows = conn.execute(
"""
SELECT d.doc_id, d.version, d.checksum, d.timestamp,
d.status, d.source_path, d.title
FROM documents d
INNER JOIN (
SELECT doc_id, MAX(version) AS mv
FROM documents WHERE version <= ?
GROUP BY doc_id
) latest ON d.doc_id = latest.doc_id AND d.version = latest.mv
WHERE d.status = 'active'
ORDER BY d.doc_id
""",
(version,),
).fetchall()
cols = ["doc_id", "version", "checksum", "timestamp",
"status", "source_path", "title"]
return [dict(zip(cols, r)) for r in rows]
# ── query audit log ──────────────────────────────────────────────
def log_query(self, query: str, version_used: int, answer_hash: str) -> None:
ts = datetime.datetime.utcnow().isoformat()
with self._conn() as conn:
conn.execute(
"INSERT INTO query_log (timestamp, query, version_used, answer_hash) "
"VALUES (?,?,?,?)",
(ts, query, version_used, answer_hash),
)
def get_query_log(self, limit: int = 50) -> list[dict]:
with self._conn() as conn:
rows = conn.execute(
"SELECT timestamp, query, version_used, answer_hash "
"FROM query_log ORDER BY id DESC LIMIT ?",
(limit,),
).fetchall()
cols = ["timestamp", "query", "version_used", "answer_hash"]
return [dict(zip(cols, r)) for r in rows]