File size: 4,196 Bytes
2334370
f3f0c98
472833f
 
72194b1
2334370
472833f
 
 
2334370
b9f0345
b143861
e52a29f
c61c488
e52a29f
472833f
 
 
 
 
 
1931298
 
472833f
 
1931298
472833f
38447a4
472833f
 
c695b18
 
 
72194b1
5fb9e55
472833f
0bd628a
aab8f97
 
72194b1
472833f
1931298
 
 
 
 
 
472833f
 
1931298
472833f
1931298
 
 
 
472833f
1931298
472833f
1931298
aae0618
98106cb
 
 
 
409b44f
b9f0345
98106cb
409b44f
84ca561
7b246f4
3ee7700
 
 
 
 
 
 
 
 
6b15a3d
b143861
935eaff
1931298
472833f
1931298
472833f
 
 
 
1931298
 
 
 
 
 
472833f
 
 
1931298
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
472833f
1931298
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from fastapi import APIRouter, Query, Depends, HTTPException
from typing import Dict, Any, List, Union 
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from app.deps import verify_api_key
from app.db import bootstrap
from app.mapper import canonify_df
import pandas as pd
import json
from datetime import datetime
from app.core.event_hub import event_hub
import logging
logger = logging.getLogger(__name__)  

# Router for data-source ingestion endpoints; the tag groups them under
# "datasources" in the generated OpenAPI docs.
router = APIRouter(tags=["datasources"])  



# =======================================================================
# 2️⃣  SMART JSON ENDPOINT – fully schema-agnostic and multi-table aware
# =======================================================================
# app/routers/datasources.py

class JsonPayload(BaseModel):
    """Request body for POST /json.

    `config` carries arbitrary per-source settings from the caller;
    `data` carries the raw rows to ingest.
    """
    # Arbitrary per-source configuration supplied by the caller.
    config: Dict[str, Any]
    # Raw rows: either a flat list of records, or a dict for multi-table
    # payloads (e.g. { "tables": {...} }).
    data: Union[List[Any], Dict[str, Any]]  # Flexible: list or { "tables": {...} }

@router.post("/json")
async def create_source_json(
    payload: JsonPayload,
    orgId: str = Query(...),      # βœ… From Vercel
    sourceId: str = Query(...),   # βœ… From Vercel
    type: str = Query(...),       # βœ… From Vercel

    _: str = Depends(verify_api_key),
):
    """
    Enterprise ingestion endpoint:
    - Stores raw audit trail
    - Normalizes to canonical schema
    - Auto-detects industry
    - Broadcasts real-time updates
    - Returns comprehensive metadata

    Returns 200 with processing metadata (or a `no_data` status for empty
    payloads), 400 for a missing body, and 500 for pipeline failures.
    """
    # Snake_case aliases for the camelCase query params sent by the frontend.
    org_id = orgId
    source_id = sourceId
    started_at = datetime.now()  # for processingTimeMs in the response

    try:
        # βœ… Validate payload
        if not payload or not payload.data:
            raise HTTPException(
                status_code=400, 
                detail="Missing payload.data. Expected list or dict."
            )

        # 1. πŸ’Ύ Store raw data for audit & lineage
        bootstrap(org_id, payload.data)
        logger.info("[api/json] βœ… Raw data stored for org: %s", org_id)

        # 2. Queue async industry detection; the timestamp makes the task id
        # unique per ingestion so retries don't collide.
        industry_task = {
            "id": f"detect_industry:{org_id}:{source_id}:{int(datetime.now().timestamp())}",
            "function": "detect_industry",
            "args": {"org_id": org_id, "source_id": source_id},
        }
        event_hub.lpush("python:task_queue", json.dumps(industry_task))
        #  Entity will be auto-queued by process_detect_industry()

        # 3. Normalize raw rows into the canonical schema.
        df, industry, confidence = canonify_df(org_id, source_id)

        # 4. Build a small JSON-safe preview: datetime/timedelta columns are
        # not JSON-serializable, so stringify them first.
        preview_df = df.head(3).copy()
        for col in preview_df.columns:
            if pd.api.types.is_datetime64_any_dtype(preview_df[col]):
                preview_df[col] = preview_df[col].dt.strftime('%Y-%m-%d %H:%M:%S')
            elif pd.api.types.is_timedelta64_dtype(preview_df[col]):
                preview_df[col] = preview_df[col].astype(str)

        preview_rows = preview_df.to_dict("records") if not preview_df.empty else []

        elapsed_ms = int((datetime.now() - started_at).total_seconds() * 1000)

        # 5. βœ… Return comprehensive response
        return JSONResponse(
            status_code=200,
            content={
                "id": sourceId,
                "status": "processed",
                "industry": industry,
                "confidence": round(confidence, 4),
                "recentRows": preview_rows,
                "message": "βœ… Data ingested and normalized successfully",
                "rowsProcessed": len(df),
                "schemaColumns": list(df.columns) if not df.empty else [],
                "processingTimeMs": elapsed_ms,
            }
        )

    except HTTPException:
        raise  # Re-raise FastAPI errors as-is

    except pd.errors.EmptyDataError:
        logger.warning("[api/json] ⚠️ Empty data for org: %s", org_id)
        return JSONResponse(
            status_code=200,  # Not an error - just no data
            content={
                "id": sourceId,
                "status": "no_data",
                "industry": "unknown",
                "confidence": 0.0,
                "message": "⚠️ No valid data rows found",
                "rowsProcessed": 0,
            }
        )

    except Exception as e:
        # logger.exception preserves the traceback, unlike the old print().
        logger.exception("[api/json] ❌ Unexpected error: %s", e)
        raise HTTPException(
            status_code=500, 
            detail=f"Ingestion pipeline failed: {str(e)}"
        )