"""CatalogReader — loads + filters catalog by source_hint.

For typical users (≤50 tables), returns the FULL catalog with no slicing.
Catalog-level search is added later if catalog grows past the limit.
"""

from datetime import UTC, datetime
from typing import Literal

from .models import Catalog
from .store import CatalogStore

SourceHint = Literal["chat", "unstructured", "structured"]


class CatalogReader:
    """Loads the user's catalog and filters by source_hint.

    On miss, returns an empty Catalog (never raises) — query path is
    responsible for handling "no data registered yet" gracefully.

    Returned Catalog is always a copy; the underlying stored catalog is
    never mutated.
    """

    def __init__(self, store: CatalogStore) -> None:
        """Bind the reader to the store that catalogs are fetched from."""
        self._store = store

    async def read(self, user_id: str, source_hint: SourceHint) -> Catalog:
        """Return the catalog for ``user_id`` restricted by ``source_hint``.

        Args:
            user_id: Key passed to the store's ``get``.
            source_hint: Selects which sources survive the filter:
                ``"chat"`` keeps none, ``"structured"`` keeps sources whose
                ``source_type`` is ``"schema"`` or ``"tabular"``, and any
                other value is treated as ``"unstructured"``.

        Returns:
            A copy of the stored catalog with ``sources`` replaced by the
            filtered list, or a fresh empty ``Catalog`` stamped with the
            current UTC time when the store has nothing for this user.
        """
        catalog = await self._store.get(user_id)
        if catalog is None:
            return Catalog(user_id=user_id, generated_at=datetime.now(UTC))

        if source_hint == "chat":
            filtered: list = []
        elif source_hint == "structured":
            filtered = [
                s for s in catalog.sources if s.source_type in {"schema", "tabular"}
            ]
        else:  # "unstructured"
            filtered = [s for s in catalog.sources if s.source_type == "unstructured"]

        # model_copy leaves the stored catalog untouched; only the returned
        # copy carries the filtered source list.
        return catalog.model_copy(update={"sources": filtered})


class MemoizingCatalogReader(CatalogReader):
    """Request-scoped CatalogReader that caches each ``read`` result.

    One per request. The same per-user catalog is otherwise fetched from the
    catalog DB 4-5x during a single slow-path run (planner load, then
    check_data's structured read + check_knowledge's unstructured read, then
    retrieve_data's structured read). Wrapping the base reader collapses
    those to one round-trip per distinct source_hint and pins a single
    consistent snapshot for the whole request (plan-time and execution-time
    catalogs can no longer diverge).

    Entries are keyed by ``(user_id, source_hint)`` so that a reader which is
    ever asked about a second user cannot hand back the first user's catalog.
    """

    def __init__(self, inner: CatalogReader) -> None:
        """Wrap ``inner`` and start with an empty cache.

        Args:
            inner: The reader whose ``read`` results are memoized.
        """
        # `read` is fully overridden below and delegates to `inner`, so the parent's
        # `_store` is never used — carry it through only so this stays a real
        # CatalogReader (any inner with a `read` works, including test fakes).
        super().__init__(getattr(inner, "_store", None))
        self._inner = inner
        self._cache: dict[tuple[str, SourceHint], Catalog] = {}

    async def read(self, user_id: str, source_hint: SourceHint) -> Catalog:
        """Return the cached catalog for this user/hint, fetching it on first use.

        The same ``Catalog`` object is returned on every hit for a given
        ``(user_id, source_hint)`` pair.
        """
        # Include user_id in the key: keying on source_hint alone would serve
        # one user's catalog to another if the instance saw both.
        key = (user_id, source_hint)
        cached = self._cache.get(key)
        if cached is None:
            cached = await self._inner.read(user_id, source_hint)
            self._cache[key] = cached
        return cached