"""StructuredPipeline — builds a catalog for DB / tabular sources. Steps (per source, end-to-end): 1. introspect (caller-supplied — DatabaseIntrospector or TabularIntrospector) 2. merge (replace any existing source with the same source_id) 3. validate (catalog/validator.py) 4. upsert (catalog/store.py) LLM-driven enrichment was removed: the planner relies on stats + sample rows + column names directly. Source/table/column `description` fields stay in the model but are not populated by this pipeline. Source-type-agnostic: the caller picks the introspector. Triggers in `pipeline/triggers.py` know which one to use based on the upload event. """ from __future__ import annotations from datetime import UTC, datetime from typing import TYPE_CHECKING from src.catalog.introspect.base import BaseIntrospector from src.catalog.models import Catalog, Source from src.middlewares.logging import get_logger if TYPE_CHECKING: from src.catalog.store import CatalogStore from src.catalog.validator import CatalogValidator logger = get_logger("structured_pipeline") class StructuredPipeline: """Orchestrates introspect → merge → validate → store. Dependencies are injected (no concrete imports at class-definition time) so tests can pass mocks without constructing Settings or opening DB connections. """ def __init__( self, validator: CatalogValidator, store: CatalogStore, ) -> None: self._validator = validator self._store = store async def run( self, introspector: BaseIntrospector, location_ref: str, user_id: str, ) -> Source: source = await introspector.introspect(location_ref) merged = await self._merge_with_existing(user_id, source) self._validator.validate(merged) await self._store.upsert(merged) logger.info( "structured pipeline complete", user_id=user_id, source_id=source.source_id, source_type=source.source_type, tables=len(source.tables), ) return source async def _merge_with_existing(self, user_id: str, new_source: Source) -> Catalog: existing = await self._store.get(user_id) now = datetime.now(UTC) if existing is None: return Catalog(user_id=user_id, generated_at=now, sources=[new_source]) kept = [s for s in existing.sources if s.source_id != new_source.source_id] return existing.model_copy( update={"sources": [*kept, new_source]} ) def default_structured_pipeline() -> StructuredPipeline: """Build the production pipeline with default deps. Lazy imports keep `from src.pipeline.structured_pipeline import …` cheap and side-effect-free for tests. """ from src.catalog.store import CatalogStore from src.catalog.validator import CatalogValidator return StructuredPipeline( validator=CatalogValidator(), store=CatalogStore(), )