from fastapi import APIRouter, Query, Depends, HTTPException
from typing import Dict, Any, List, Union
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from app.deps import verify_api_key
from app.db import bootstrap
from app.mapper import canonify_df
import pandas as pd
import json
from datetime import datetime
from app.core.event_hub import event_hub
import logging
logger = logging.getLogger(__name__)
router = APIRouter(tags=["datasources"])
# =======================================================================
# 2️⃣ SMART JSON ENDPOINT - fully schema-agnostic and multi-table aware
# =======================================================================
# app/routers/datasources.py
class JsonPayload(BaseModel):
config: Dict[str, Any]
data: Union[List[Any], Dict[str, Any]] # Flexible: list or { "tables": {...} }
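    # Illustrative payload shapes (field names and values below are made-up examples):
    #   {"config": {...}, "data": [{"date": "2024-01-01", "amount": 100}, ...]}
    #   {"config": {...}, "data": {"tables": {"orders": [...], "customers": [...]}}}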
@router.post("/json")
async def create_source_json(
payload: JsonPayload,
    orgId: str = Query(...),     # ✅ From Vercel
    sourceId: str = Query(...),  # ✅ From Vercel
    type: str = Query(...),      # ✅ From Vercel
_: str = Depends(verify_api_key),
):
    """
    Enterprise ingestion endpoint:
    - Stores raw audit trail
    - Normalizes to canonical schema
    - Auto-detects industry
    - Broadcasts real-time updates
    - Returns comprehensive metadata
    """
    org_id = orgId
    source_id = sourceId
try:
        # ✅ Validate payload
if not payload or not payload.data:
raise HTTPException(
status_code=400,
detail="Missing payload.data. Expected list or dict."
)
        # 1. 💾 Store raw data for audit & lineage
        bootstrap(orgId, payload.data)
        print(f"[api/json] ✅ Raw data stored for org: {orgId}")
industry_task = {
"id": f"detect_industry:{org_id}:{source_id}:{int(datetime.now().timestamp())}",
"function": "detect_industry",
"args": {"org_id": org_id, "source_id": source_id}
}
event_hub.lpush("python:task_queue", json.dumps(industry_task))
# Entity will be auto-queued by process_detect_industry()
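        # The task above is JSON-encoded and pushed onto the "python:task_queue" list;
        # event_hub is assumed to expose a Redis-style lpush, with a background worker
        # popping entries and running the function named in "function" with "args".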
df, industry, confidence = canonify_df(org_id, source_id)
# Convert DataFrame to JSON-safe format
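        # (pandas Timestamp/Timedelta values are not JSON-serializable by the standard
        # json encoder, so datetime columns are formatted and timedeltas stringified.)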
preview_df = df.head(3).copy()
for col in preview_df.columns:
if pd.api.types.is_datetime64_any_dtype(preview_df[col]):
preview_df[col] = preview_df[col].dt.strftime('%Y-%m-%d %H:%M:%S')
elif pd.api.types.is_timedelta64_dtype(preview_df[col]):
preview_df[col] = preview_df[col].astype(str)
preview_rows = preview_df.to_dict("records") if not preview_df.empty else []
        # 5. ✅ Return comprehensive response
return JSONResponse(
status_code=200,
content={
"id": sourceId,
"status": "processed",
"industry": industry,
"confidence": round(confidence, 4),
"recentRows": preview_rows,
"message": "β
Data ingested and normalized successfully",
"rowsProcessed": len(df),
"schemaColumns": list(df.columns) if not df.empty else [],
"processingTimeMs": 0, # You can add timing if needed
}
)
except HTTPException:
raise # Re-raise FastAPI errors as-is
except pd.errors.EmptyDataError:
print(f"[api/json] β οΈ Empty data for org: {orgId}")
return JSONResponse(
status_code=200, # Not an error - just no data
content={
"id": sourceId,
"status": "no_data",
"industry": "unknown",
"confidence": 0.0,
"message": "β οΈ No valid data rows found",
"rowsProcessed": 0,
}
)
except Exception as e:
print(f"[api/json] β Unexpected error: {e}")
raise HTTPException(
status_code=500,
detail=f"Ingestion pipeline failed: {str(e)}"
) |