| """StructuredPipeline — builds a catalog for DB / tabular sources. |
| |
| Steps (per source, end-to-end): |
| 1. introspect (caller-supplied — DatabaseIntrospector or TabularIntrospector) |
| 2. merge (replace any existing source with the same source_id) |
| 3. validate (catalog/validator.py) |
| 4. upsert (catalog/store.py) |
| |
| LLM-driven enrichment was removed: the planner relies on stats + sample |
| rows + column names directly. Source/table/column `description` fields stay |
| in the model but are not populated by this pipeline. |
| |
| Source-type-agnostic: the caller picks the introspector. Triggers in |
| `pipeline/triggers.py` know which one to use based on the upload event. |
| """ |
|
|
| from __future__ import annotations |
|
|
| from datetime import UTC, datetime |
| from typing import TYPE_CHECKING |
|
|
| from src.catalog.introspect.base import BaseIntrospector |
| from src.catalog.models import Catalog, Source |
| from src.middlewares.logging import get_logger |
|
|
| if TYPE_CHECKING: |
| from src.catalog.store import CatalogStore |
| from src.catalog.validator import CatalogValidator |
|
|
| logger = get_logger("structured_pipeline") |
|
|
|
|
| class StructuredPipeline: |
| """Orchestrates introspect β merge β validate β store. |
| |
| Dependencies are injected (no concrete imports at class-definition time) |
| so tests can pass mocks without constructing Settings or opening DB |
| connections. |
| """ |
|
|
| def __init__( |
| self, |
| validator: CatalogValidator, |
| store: CatalogStore, |
| ) -> None: |
| self._validator = validator |
| self._store = store |
|
|
| async def run( |
| self, |
| introspector: BaseIntrospector, |
| location_ref: str, |
| user_id: str, |
| ) -> Source: |
| source = await introspector.introspect(location_ref) |
| merged = await self._merge_with_existing(user_id, source) |
| self._validator.validate(merged) |
| await self._store.upsert(merged) |
| logger.info( |
| "structured pipeline complete", |
| user_id=user_id, |
| source_id=source.source_id, |
| source_type=source.source_type, |
| tables=len(source.tables), |
| ) |
| return source |
|
|
| async def _merge_with_existing(self, user_id: str, new_source: Source) -> Catalog: |
| existing = await self._store.get(user_id) |
| now = datetime.now(UTC) |
| if existing is None: |
| return Catalog(user_id=user_id, generated_at=now, sources=[new_source]) |
| kept = [s for s in existing.sources if s.source_id != new_source.source_id] |
| return existing.model_copy( |
| update={"sources": [*kept, new_source]} |
| ) |
|
|
|
|
def default_structured_pipeline() -> StructuredPipeline:
    """Assemble a StructuredPipeline wired to the default validator and store.

    The collaborator classes are imported inside the function rather than at
    module level, which keeps `from src.pipeline.structured_pipeline import …`
    cheap and side-effect-free for tests.

    Returns:
        A pipeline built from a no-argument ``CatalogValidator`` and a
        no-argument ``CatalogStore``.
    """
    from src.catalog.store import CatalogStore
    from src.catalog.validator import CatalogValidator

    # Validator first, then store: same construction order as the keyword
    # arguments are evaluated in a direct call.
    catalog_validator = CatalogValidator()
    catalog_store = CatalogStore()
    return StructuredPipeline(validator=catalog_validator, store=catalog_store)
|
|