Agentic-Service-Data-Eyond-Catalog / src /pipeline /structured_pipeline.py
ishaq101's picture
feat/Catalog Retrieval System (#1)
6bff5d9
"""StructuredPipeline β€” builds a catalog for DB / tabular sources.
Steps (per source, end-to-end):
1. introspect (caller-supplied β€” DatabaseIntrospector or TabularIntrospector)
2. merge (replace any existing source with the same source_id)
3. validate (catalog/validator.py)
4. upsert (catalog/store.py)
LLM-driven enrichment was removed: the planner relies on stats + sample
rows + column names directly. Source/table/column `description` fields stay
in the model but are not populated by this pipeline.
Source-type-agnostic: the caller picks the introspector. Triggers in
`pipeline/triggers.py` know which one to use based on the upload event.
"""
from __future__ import annotations
from datetime import UTC, datetime
from typing import TYPE_CHECKING
from src.catalog.introspect.base import BaseIntrospector
from src.catalog.models import Catalog, Source
from src.middlewares.logging import get_logger
if TYPE_CHECKING:
from src.catalog.store import CatalogStore
from src.catalog.validator import CatalogValidator
# Module-level logger for this pipeline, registered under the name "structured_pipeline".
logger = get_logger("structured_pipeline")
class StructuredPipeline:
    """Orchestrates introspect → merge → validate → store.

    Dependencies are injected (no concrete imports at class-definition time)
    so tests can pass mocks without constructing Settings or opening DB
    connections.
    """

    def __init__(
        self,
        validator: CatalogValidator,
        store: CatalogStore,
    ) -> None:
        """Keep references to the injected collaborators.

        Args:
            validator: Object whose ``validate(catalog)`` is called on the
                merged catalog before it is persisted.
            store: Object providing awaitable ``get(user_id)`` and
                ``upsert(catalog)``.
        """
        self._validator = validator
        self._store = store

    async def run(
        self,
        introspector: BaseIntrospector,
        location_ref: str,
        user_id: str,
    ) -> Source:
        """Introspect one source and persist it into the user's catalog.

        Args:
            introspector: Caller-chosen introspector for this source type.
            location_ref: Passed verbatim to ``introspector.introspect``.
            user_id: Owner of the catalog the source is merged into.

        Returns:
            The freshly introspected ``Source`` (not the merged catalog).

        Raises:
            Whatever the introspector, validator or store raise. The return
            value of ``validate`` is ignored, so validation presumably signals
            failure by raising; in that case ``upsert`` is never reached.
        """
        source = await introspector.introspect(location_ref)
        merged = await self._merge_with_existing(user_id, source)
        # Validate the whole merged catalog, not just the new source, before
        # anything is written.
        self._validator.validate(merged)
        await self._store.upsert(merged)
        logger.info(
            "structured pipeline complete",
            user_id=user_id,
            source_id=source.source_id,
            source_type=source.source_type,
            tables=len(source.tables),
        )
        return source

    async def _merge_with_existing(self, user_id: str, new_source: Source) -> Catalog:
        """Return the user's catalog with ``new_source`` added or replaced.

        If the user has no stored catalog, a new one containing only
        ``new_source`` is created. Otherwise any existing source with the same
        ``source_id`` is dropped and ``new_source`` is appended at the end.
        Other sources keep their relative order.

        In both cases ``generated_at`` is set to the current UTC time, so the
        timestamp reflects the latest rebuild rather than the first one.
        """
        existing = await self._store.get(user_id)
        now = datetime.now(UTC)
        if existing is None:
            return Catalog(user_id=user_id, generated_at=now, sources=[new_source])
        kept = [s for s in existing.sources if s.source_id != new_source.source_id]
        # model_copy leaves the stored object untouched. The timestamp is
        # refreshed together with the sources; previously `now` was computed
        # but only used in the fresh-catalog branch.
        return existing.model_copy(
            update={"generated_at": now, "sources": [*kept, new_source]}
        )
def default_structured_pipeline() -> StructuredPipeline:
    """Return a StructuredPipeline wired with the production validator and store.

    The concrete collaborators are imported inside the function body, so
    importing `src.pipeline.structured_pipeline` stays cheap and free of side
    effects (which keeps it test-friendly).
    """
    from src.catalog.store import CatalogStore
    from src.catalog.validator import CatalogValidator

    # Build the validator first, then the store (same order as keyword
    # evaluation in a direct constructor call).
    production_validator = CatalogValidator()
    production_store = CatalogStore()
    return StructuredPipeline(validator=production_validator, store=production_store)