| """CatalogReader — loads + filters catalog by source_hint. |
| |
| For typical users (≤50 tables), the FULL catalog is returned with no slicing. |
| Catalog-level search will be added later if a catalog grows past that limit. |
| """ |
|
|
| from datetime import UTC, datetime |
| from typing import Literal |
|
|
| from .models import Catalog |
| from .store import CatalogStore |
|
|
# Closed set of hint values accepted by the readers' ``read`` methods;
# CatalogReader.read branches on this value to decide which sources to keep.
SourceHint = Literal[
    "chat",
    "unstructured",
    "structured",
]
|
|
|
|
class CatalogReader:
    """Fetch a user's catalog from the store and narrow it by source_hint.

    A missing catalog is not an error: ``read`` hands back an empty
    ``Catalog`` instead of raising, leaving the query path to deal with
    the "no data registered yet" case. When a catalog is found, the result
    is produced with ``model_copy`` so the object returned by the store is
    not modified by the filtering done here.
    """

    def __init__(self, store: CatalogStore) -> None:
        """Keep a reference to the store that ``read`` will query."""
        self._store = store

    async def read(self, user_id: str, source_hint: SourceHint) -> Catalog:
        """Return the catalog for *user_id*, restricted according to *source_hint*.

        - ``"chat"``: no sources at all.
        - ``"structured"``: sources whose ``source_type`` is ``"schema"``
          or ``"tabular"``.
        - anything else (i.e. ``"unstructured"``): sources whose
          ``source_type`` equals ``"unstructured"``.

        If the store has nothing for the user, a fresh empty ``Catalog``
        stamped with the current UTC time is returned.
        """
        stored = await self._store.get(user_id)

        # Store miss: synthesize an empty catalog rather than raising.
        if stored is None:
            return Catalog(user_id=user_id, generated_at=datetime.now(UTC))

        # Chat needs no sources, so the stored source list is never touched.
        if source_hint == "chat":
            return stored.model_copy(update={"sources": []})

        if source_hint == "structured":
            selected = [
                entry
                for entry in stored.sources
                if entry.source_type in {"schema", "tabular"}
            ]
            return stored.model_copy(update={"sources": selected})

        # Fallthrough covers "unstructured" (and any other hint value).
        selected = [
            entry
            for entry in stored.sources
            if entry.source_type == "unstructured"
        ]
        return stored.model_copy(update={"sources": selected})
|
|
|
|
class MemoizingCatalogReader(CatalogReader):
    """Request-scoped CatalogReader that caches each ``read`` result.

    One per request. The same per-user catalog is otherwise fetched from the
    catalog DB 4-5x during a single slow-path run (planner load, then
    check_data's structured read + check_knowledge's unstructured read, then
    retrieve_data's structured read). Wrapping the base reader collapses those
    to one round-trip per distinct (user_id, source_hint) pair and pins a
    single consistent snapshot for the whole request (plan-time and
    execution-time catalogs can no longer diverge).

    Results are keyed by ``(user_id, source_hint)`` so that a read for one
    user can never be answered with a catalog cached for another user, even
    if an instance is (mis)used for more than one user.

    Note that a memoized read returns the *same* ``Catalog`` object on every
    call with the same key; callers share that object and should treat it as
    read-only.
    """

    def __init__(self, inner: CatalogReader) -> None:
        """Wrap *inner*; all actual catalog loading is delegated to it.

        The base class is initialised with the inner reader's ``_store``
        (or ``None`` if the inner object does not expose one). This class's
        own ``read`` never consults that store directly.
        """
        super().__init__(getattr(inner, "_store", None))
        self._inner = inner
        self._cache: dict[tuple[str, SourceHint], Catalog] = {}

    async def read(self, user_id: str, source_hint: SourceHint) -> Catalog:
        """Return the cached catalog for the key, loading it on first use.

        The first call for a given ``(user_id, source_hint)`` awaits the
        inner reader and stores its result; later calls with the same key
        return that stored object without touching the inner reader.
        """
        # Include user_id in the key: keying on source_hint alone would
        # return the first user's catalog to any later user_id.
        key = (user_id, source_hint)
        cached = self._cache.get(key)
        if cached is None:
            cached = await self._inner.read(user_id, source_hint)
            self._cache[key] = cached
        return cached
|
|