[KM-556][KM-562] fix tabular catalog pipeline: trigger wiring, parquet upload, lazy Azure init, backfill script
59b14af | """Backfill catalogs for existing users. | |
| One-off script. For each user that already has registered DB connections or | |
| uploaded tabular files, run the structured pipeline to build their catalog. | |
| Run once against the live DB after deploying this branch to populate catalog | |
| rows for data registered before the catalog pipeline landed. | |
| Note: enrich_all_sources.py is not needed — LLM enrichment was removed in | |
| KM-557. The pipeline is now introspect → merge → validate → upsert. | |
| Usage: | |
| uv run python scripts/build_initial_catalogs.py [--user-id USER_ID] | |
| """ | |
| import asyncio | |
| import sys | |
| import os | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from sqlalchemy import select | |
| from src.db.postgres.connection import AsyncSessionLocal | |
| from src.db.postgres.models import DatabaseClient, Document | |
| from src.pipeline.triggers import on_db_registered, on_tabular_uploaded | |
def _parse_user_id_filter(argv: list) -> "str | None":
    """Return the value that follows ``--user-id`` in *argv*, or ``None`` if the flag is absent.

    Raises:
        SystemExit: if ``--user-id`` is the last argument, i.e. no value follows it.
    """
    if "--user-id" not in argv:
        return None
    value_pos = argv.index("--user-id") + 1
    if value_pos >= len(argv):
        # Fail with a readable usage error instead of a bare IndexError.
        raise SystemExit("error: --user-id requires a USER_ID value")
    return argv[value_pos]


async def main() -> None:
    """Run the catalog triggers for already-registered data.

    Reads an optional ``--user-id USER_ID`` filter from ``sys.argv``, then:

    1. selects every ``DatabaseClient`` whose status is ``"active"`` and awaits
       ``on_db_registered(client.id, client.user_id)`` for each;
    2. selects every ``Document`` whose ``file_type`` is ``csv`` or ``xlsx`` and
       whose status is ``"completed"`` and awaits
       ``on_tabular_uploaded(doc.id, doc.user_id)`` for each.

    A failure on one record is printed and does not stop the remaining records.

    Raises:
        SystemExit: if ``--user-id`` is given without a value.
    """
    user_id_filter = _parse_user_id_filter(sys.argv)
    if user_id_filter is not None:
        print(f"Filtering to user_id: {user_id_filter}")

    async with AsyncSessionLocal() as db:
        # ── 1. DB clients ──────────────────────────────────────────────
        query = select(DatabaseClient).where(DatabaseClient.status == "active")
        if user_id_filter:
            query = query.where(DatabaseClient.user_id == user_id_filter)
        result = await db.execute(query)
        db_clients = result.scalars().all()
        print(f"\nFound {len(db_clients)} active DB client(s)")
        for client in db_clients:
            try:
                await on_db_registered(client.id, client.user_id)
                print(f" ✓ db_client {client.id} ({client.name})")
            except Exception as e:
                # Best-effort backfill: report the failure and keep going.
                print(f" ✗ db_client {client.id} ({client.name}): {e}")

        # ── 2. Tabular files ───────────────────────────────────────────
        query = select(Document).where(
            Document.file_type.in_(["csv", "xlsx"]),
            Document.status == "completed",
        )
        if user_id_filter:
            query = query.where(Document.user_id == user_id_filter)
        result = await db.execute(query)
        docs = result.scalars().all()
        print(f"\nFound {len(docs)} completed tabular file(s)")
        for doc in docs:
            try:
                await on_tabular_uploaded(doc.id, doc.user_id)
                print(f" ✓ {doc.file_type} {doc.id} ({doc.filename})")
            except Exception as e:
                # Best-effort backfill: report the failure and keep going.
                print(f" ✗ {doc.file_type} {doc.id} ({doc.filename}): {e}")

    print("\nDone.")
# Script entry point: only drive the backfill when executed directly,
# never as a side effect of importing this module.
if __name__ == "__main__":
    backfill = main()
    asyncio.run(backfill)