File size: 3,613 Bytes
6bff5d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""API endpoints for the per-user data catalog index.

The index is a lightweight summary of every structured source registered
by a user (DB connections and tabular files). It is intended to be
consumed by the catalog refresher and by frontend listings — full
catalog payloads (tables + columns + samples + stats) are not exposed
here on purpose.
"""

from typing import List

from fastapi import APIRouter, HTTPException, Query, status

from src.catalog.store import CatalogStore
from src.middlewares.logging import get_logger, log_execution
from src.models.api.catalog import CatalogIndexEntry
from src.pipeline.triggers import on_catalog_rebuild_requested

# Module-level logger named "data_catalog_api". The endpoints below report
# failures through it and also hand it to the `log_execution` decorator.
logger = get_logger("data_catalog_api")

# Router for the data-catalog endpoints: every route declared on it is served
# under the "/api/v1" prefix and tagged "Data Catalog".
router = APIRouter(prefix="/api/v1", tags=["Data Catalog"])


# GET /api/v1/data-catalog/{user_id}
#
# Implementation notes are kept in comments rather than in the docstring
# because FastAPI publishes the handler docstring as the operation description.
#   * `user_id` comes from the URL path.
#   * A missing catalog (`CatalogStore().get` returning None) is not an error:
#     the endpoint answers 200 with an empty list.
#   * Any exception raised while reading the catalog is logged and converted
#     into an HTTP 500 whose detail carries the error text.
@router.get(
    "/data-catalog/{user_id}",
    response_model=List[CatalogIndexEntry],
    summary="List the user's data catalog index",
    response_description="One entry per registered structured source.",
    responses={
        200: {"description": "Returns an empty list if the user has no registered sources."},
        500: {"description": "Internal server error while reading the catalog."},
    },
)
@log_execution(logger)
async def list_data_catalog_index(user_id: str) -> List[CatalogIndexEntry]:
    """
    Return a lightweight index of every structured source registered by the user.

    One entry per source (DB connection or tabular file), including the
    `source_id`, `source_type`, display `name`, `location_ref`, current
    `table_count`, and `updated_at` timestamp.

    Used by the catalog refresher to decide which sources need to be
    rebuilt. Returns an empty list if the user has no catalog yet.
    """
    # Only the store read sits inside the try: it is the only step whose
    # failure is reported as "Failed to read catalog index".
    try:
        catalog = await CatalogStore().get(user_id)
    except Exception as e:
        logger.error("Failed to read catalog index", user_id=user_id, error=str(e))
        # Chain the original exception so the root cause stays attached to the
        # HTTPException as `__cause__` in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to read catalog index: {e}",
        ) from e

    # No catalog stored for this user yet: report "nothing registered".
    if catalog is None:
        return []

    # Project each source down to its summary fields; only the number of
    # tables is exposed, never the table payloads themselves.
    return [
        CatalogIndexEntry(
            source_id=s.source_id,
            source_type=s.source_type,
            name=s.name,
            location_ref=s.location_ref,
            table_count=len(s.tables),
            updated_at=s.updated_at,
        )
        for s in catalog.sources
    ]


# POST /api/v1/data-catalog/rebuild?user_id=...
#
# Implementation notes are kept in comments rather than in the docstring
# because FastAPI publishes the handler docstring as the operation description.
#   * `user_id` is a required query parameter (`Query(...)`).
#   * The work is delegated entirely to `on_catalog_rebuild_requested`; the
#     per-source isolation described in the docstring is presumably implemented
#     there — it is not visible in this module (TODO confirm).
#   * Any exception escaping that call is logged and converted into an
#     HTTP 500 whose detail carries the error text.
#   * On success the response body is {"status": "success", "user_id": <id>}.
@router.post(
    "/data-catalog/rebuild",
    status_code=status.HTTP_200_OK,
    summary="Rebuild the catalog for a user",
    response_description="Confirmation that the rebuild was triggered.",
    responses={
        200: {"description": "Rebuild completed. Per-source errors are logged but do not fail this request."},
        500: {"description": "Unexpected error before the rebuild loop started."},
    },
)
@log_execution(logger)
async def rebuild_data_catalog(
    user_id: str = Query(..., description="ID of the user whose catalog should be rebuilt."),
):
    """
    Re-introspect every source in the user's catalog and upsert the results.

    Each source (DB connection or tabular file) is processed independently.
    A failure on one source is logged but does not abort the remaining sources.
    If the user has no catalog yet, returns success with no-op.
    """
    try:
        # The rebuild is awaited, so the response is only sent once the
        # trigger coroutine has returned.
        await on_catalog_rebuild_requested(user_id)
    except Exception as e:
        logger.error("catalog rebuild failed", user_id=user_id, error=str(e))
        # Chain the original exception so the root cause stays attached to the
        # HTTPException as `__cause__` in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Catalog rebuild failed: {e}",
        ) from e
    return {"status": "success", "user_id": user_id}