Spaces:
Sleeping
Sleeping
| """Version-aware query router with audit logging. | |
| Every query routed through this layer is: | |
| - dispatched to the correct versioned ChromaDB snapshot | |
| - logged to the query_log table (query text, version used, answer hash) | |
| This enables the "replay any query against any historical snapshot" guarantee: | |
| router.query("what is DDIM?", version=1) # before that paper was added | |
| router.query("what is DDIM?") # against latest | |
| """ | |
| from __future__ import annotations | |
| import hashlib | |
| from versioning.document_store import DocumentStore | |
| from versioning.index_manager import RAGVersionManager | |
class VersionRouter:
    """Intercepts RAG queries and routes them to a specific knowledge-base version.

    The router owns a ``RAGVersionManager`` (used for searching, rollback and
    collection checks) and a ``DocumentStore`` (used for version metadata and
    the query log). Every public method that accepts a ``version`` argument
    takes either the literal string ``"latest"`` or something ``int()`` can
    convert (e.g. ``3`` or ``"3"``).
    """

    def __init__(self) -> None:
        self.manager = RAGVersionManager()
        self.store = DocumentStore()

    # -- version resolution ------------------------------------------------

    def _resolve_version(self, version: str | int) -> int | None:
        """Turn a user-supplied version selector into a concrete version number.

        ``"latest"`` is looked up via ``self.store.get_latest()`` and may come
        back as ``None``; callers treat that as "no version available". Any
        other value is converted with ``int()``, so a non-numeric string
        raises ``ValueError``.
        """
        if version == "latest":
            return self.store.get_latest()
        return int(version)

    # -- querying ----------------------------------------------------------

    def query(
        self,
        text: str,
        version: str | int = "latest",
        k: int = 12,
        log: bool = True,
    ) -> tuple[list, int]:
        """Run a dense search against the specified snapshot.

        Parameters
        ----------
        text
            The query string, forwarded unchanged to the version manager.
        version
            ``"latest"`` or an explicit version number.
        k
            Number of hits requested from the version manager.
        log
            When true, record the query in the store's query log.

        Returns
        -------
        (hits: list[Hit], resolved_version: int)

        Raises
        ------
        RuntimeError
            If ``"latest"`` was requested but the store reports no version.
        ValueError
            If ``version`` is neither ``"latest"`` nor convertible to ``int``.
        """
        resolved = self._resolve_version(version)
        if resolved is None:
            raise RuntimeError("No versioned index exists yet. Run `python ingest.py` first.")

        hits = self.manager.query(text, version=resolved, k=k)

        if log:
            # NOTE(review): despite its name, this digest covers only the query
            # text and the resolved version -- the returned hits are not part
            # of it. Confirm whether the log is meant to fingerprint the answer.
            digest_input = (text + str(resolved)).encode()
            answer_hash = hashlib.sha256(digest_input).hexdigest()[:16]
            self.store.log_query(text, resolved, answer_hash)

        return hits, resolved

    # -- metadata helpers --------------------------------------------------

    def current_version(self) -> int | None:
        """Return whatever the store reports as the latest version."""
        return self.store.get_latest()

    def list_versions(self) -> list[dict]:
        """Return the version history as reported by the store."""
        return self.store.get_history()

    def version_info(self, version: str | int = "latest") -> dict | None:
        """Return the store's metadata for ``version``.

        Returns ``None`` when ``"latest"`` is requested and the store has no
        latest version.
        """
        resolved = self._resolve_version(version)
        if resolved is None:
            return None
        return self.store.get_version_info(resolved)

    def docs_at_version(self, version: str | int = "latest") -> list[dict]:
        """Return the store's document listing for ``version``.

        Returns an empty list when ``"latest"`` is requested and the store has
        no latest version.
        """
        resolved = self._resolve_version(version)
        if resolved is None:
            return []
        return self.store.docs_at_version(resolved)

    def get_query_log(self, limit: int = 50) -> list[dict]:
        """Return query-log entries from the store, passing ``limit`` through."""
        return self.store.get_query_log(limit)

    def rollback(self, to_version: int) -> None:
        """Delegate a rollback to ``to_version`` to the version manager."""
        self.manager.rollback(to_version)

    def collection_exists(self, version: str | int = "latest") -> bool:
        """Ask the version manager whether a collection exists for ``version``.

        NOTE(review): unlike the other methods, ``version`` is forwarded as-is
        (including the ``"latest"`` sentinel) rather than resolved here;
        presumably the manager handles it -- confirm.
        """
        return self.manager.collection_exists(version)