sofhiaazzhr's picture
[NOTICKET] Adopt verb-first skill naming
2d6406d
Raw
History Blame
2.75 kB
"""CatalogReader — loads + filters catalog by source_hint.
For typical users (≤50 tables), returns the FULL catalog with no slicing.
Catalog-level search will be added later if the catalog grows past that limit.
"""
from datetime import UTC, datetime
from typing import Literal
from .models import Catalog
from .store import CatalogStore
# Closed set of hints a caller passes to `CatalogReader.read`; the hint decides
# which of the catalog's sources are kept in the returned copy.
SourceHint = Literal["chat", "unstructured", "structured"]
class CatalogReader:
"""Loads the user's catalog and filters by source_hint.
On miss, returns an empty Catalog (never raises) — query path is
responsible for handling "no data registered yet" gracefully.
Returned Catalog is always a copy; the underlying stored catalog
is never mutated.
"""
def __init__(self, store: CatalogStore) -> None:
self._store = store
async def read(self, user_id: str, source_hint: SourceHint) -> Catalog:
catalog = await self._store.get(user_id)
if catalog is None:
return Catalog(user_id=user_id, generated_at=datetime.now(UTC))
if source_hint == "chat":
filtered: list = []
elif source_hint == "structured":
filtered = [s for s in catalog.sources if s.source_type in {"schema", "tabular"}]
else: # "unstructured"
filtered = [s for s in catalog.sources if s.source_type == "unstructured"]
return catalog.model_copy(update={"sources": filtered})
class MemoizingCatalogReader(CatalogReader):
    """Request-scoped CatalogReader that caches each ``read`` result.

    One per request. The same per-user catalog is otherwise fetched from the
    catalog DB 4-5x during a single slow-path run (planner load, then
    check_data's structured read + check_knowledge's unstructured read, then
    retrieve_data's structured read). Wrapping the base reader collapses those
    to one round-trip per distinct ``(user_id, source_hint)`` pair and pins a
    single consistent snapshot for the whole request (plan-time and
    execution-time catalogs can no longer diverge).

    The cache key includes ``user_id`` so that an instance which is ever asked
    about two different users cannot hand one user's catalog to the other.
    Cached ``Catalog`` objects are returned as-is (no per-call copy), so every
    caller asking for the same pair sees the same object.
    """

    def __init__(self, inner: CatalogReader) -> None:
        """Wrap ``inner``; all reads are delegated to it on a cache miss.

        Args:
            inner: The reader to delegate to. Anything exposing a compatible
                async ``read`` works, including test fakes.
        """
        # `read` is fully overridden below and delegates to `inner`, so the parent's
        # `_store` is never used — carry it through only so this stays a real
        # CatalogReader (any inner with a `read` works, including test fakes).
        super().__init__(getattr(inner, "_store", None))
        self._inner = inner
        self._cache: dict[tuple[str, SourceHint], Catalog] = {}

    async def read(self, user_id: str, source_hint: SourceHint) -> Catalog:
        """Return the cached catalog for this pair, fetching it on first use.

        Args:
            user_id: User whose catalog is requested; part of the cache key.
            source_hint: Filter forwarded to the inner reader; part of the
                cache key.

        Returns:
            Whatever the inner reader returned the first time this
            ``(user_id, source_hint)`` pair was requested on this instance.
        """
        key = (user_id, source_hint)
        cached = self._cache.get(key)
        if cached is None:
            cached = await self._inner.read(user_id, source_hint)
            self._cache[key] = cached
        return cached