Commit ·
8b4794c
1
Parent(s): 887c1c4
[KM-565] sync catalog on document and db client deletion
Browse files- add CatalogStore.remove_source() to drop a single source by source_id
- add on_tabular_deleted / on_db_deleted triggers
- call triggers from delete endpoints so catalog stays in sync
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- src/api/v1/db_client.py +2 -0
- src/api/v1/document.py +2 -0
- src/catalog/store.py +12 -0
- src/pipeline/triggers.py +16 -0
src/api/v1/db_client.py
CHANGED
|
@@ -406,6 +406,8 @@ async def delete_database_client(
|
|
| 406 |
raise HTTPException(status_code=403, detail="Access denied")
|
| 407 |
|
| 408 |
await database_client_service.delete(db, client_id)
|
|
|
|
|
|
|
| 409 |
return {"status": "success", "message": "Database client deleted successfully"}
|
| 410 |
|
| 411 |
|
|
|
|
| 406 |
raise HTTPException(status_code=403, detail="Access denied")
|
| 407 |
|
| 408 |
await database_client_service.delete(db, client_id)
|
| 409 |
+
from src.pipeline.triggers import on_db_deleted
|
| 410 |
+
await on_db_deleted(client_id, user_id)
|
| 411 |
return {"status": "success", "message": "Database client deleted successfully"}
|
| 412 |
|
| 413 |
|
src/api/v1/document.py
CHANGED
|
@@ -92,6 +92,8 @@ async def delete_document(
|
|
| 92 |
):
|
| 93 |
"""Delete a document."""
|
| 94 |
await document_pipeline.delete(document_id, user_id, db)
|
|
|
|
|
|
|
| 95 |
return {"status": "success", "message": "Document deleted successfully"}
|
| 96 |
|
| 97 |
|
|
|
|
| 92 |
):
|
| 93 |
"""Delete a document."""
|
| 94 |
await document_pipeline.delete(document_id, user_id, db)
|
| 95 |
+
from src.pipeline.triggers import on_tabular_deleted
|
| 96 |
+
await on_tabular_deleted(document_id, user_id)
|
| 97 |
return {"status": "success", "message": "Document deleted successfully"}
|
| 98 |
|
| 99 |
|
src/catalog/store.py
CHANGED
|
@@ -63,6 +63,18 @@ class CatalogStore:
|
|
| 63 |
sources=len(catalog.sources),
|
| 64 |
)
|
| 65 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 66 |
async def delete(self, user_id: str) -> None:
|
| 67 |
async with AsyncSessionLocal() as session:
|
| 68 |
await session.execute(delete(CatalogRow).where(CatalogRow.user_id == user_id))
|
|
|
|
| 63 |
sources=len(catalog.sources),
|
| 64 |
)
|
| 65 |
|
| 66 |
+
async def remove_source(self, user_id: str, source_id: str) -> None:
|
| 67 |
+
existing = await self.get(user_id)
|
| 68 |
+
if existing is None:
|
| 69 |
+
logger.info("remove_source: no catalog found", user_id=user_id, source_id=source_id)
|
| 70 |
+
return
|
| 71 |
+
filtered = [s for s in existing.sources if s.source_id != source_id]
|
| 72 |
+
if len(filtered) == len(existing.sources):
|
| 73 |
+
logger.info("remove_source: source not in catalog", user_id=user_id, source_id=source_id)
|
| 74 |
+
return
|
| 75 |
+
await self.upsert(existing.model_copy(update={"sources": filtered}))
|
| 76 |
+
logger.info("remove_source: source removed", user_id=user_id, source_id=source_id)
|
| 77 |
+
|
| 78 |
async def delete(self, user_id: str) -> None:
|
| 79 |
async with AsyncSessionLocal() as session:
|
| 80 |
await session.execute(delete(CatalogRow).where(CatalogRow.user_id == user_id))
|
src/pipeline/triggers.py
CHANGED
|
@@ -69,6 +69,22 @@ async def on_document_uploaded(document_id: str, user_id: str) -> None:
|
|
| 69 |
await document_pipeline.process(document_id, user_id, db)
|
| 70 |
|
| 71 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 72 |
async def on_catalog_rebuild_requested(user_id: str) -> None:
|
| 73 |
"""Re-introspect every source in the user's catalog and upsert the result.
|
| 74 |
|
|
|
|
| 69 |
await document_pipeline.process(document_id, user_id, db)
|
| 70 |
|
| 71 |
|
| 72 |
+
async def on_tabular_deleted(document_id: str, user_id: str) -> None:
|
| 73 |
+
"""Remove a tabular source from the user's catalog when its document is deleted."""
|
| 74 |
+
from src.catalog.store import CatalogStore
|
| 75 |
+
|
| 76 |
+
logger.info("on_tabular_deleted triggered", user_id=user_id, document_id=document_id)
|
| 77 |
+
await CatalogStore().remove_source(user_id, source_id=document_id)
|
| 78 |
+
|
| 79 |
+
|
| 80 |
+
async def on_db_deleted(client_id: str, user_id: str) -> None:
|
| 81 |
+
"""Remove a schema source from the user's catalog when its DB client is deleted."""
|
| 82 |
+
from src.catalog.store import CatalogStore
|
| 83 |
+
|
| 84 |
+
logger.info("on_db_deleted triggered", user_id=user_id, client_id=client_id)
|
| 85 |
+
await CatalogStore().remove_source(user_id, source_id=client_id)
|
| 86 |
+
|
| 87 |
+
|
| 88 |
async def on_catalog_rebuild_requested(user_id: str) -> None:
|
| 89 |
"""Re-introspect every source in the user's catalog and upsert the result.
|
| 90 |
|