from fastapi import APIRouter, Query, Depends, HTTPException
from typing import Dict, Any, List, Union
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from app.deps import verify_api_key
from app.db import bootstrap
from app.mapper import canonify_df
import pandas as pd
import json
from datetime import datetime
from app.core.event_hub import event_hub
import logging
# Module-level logger, named after this module per standard logging convention.
logger = logging.getLogger(__name__)
# Router for data-source ingestion; "datasources" groups these routes in the OpenAPI docs.
router = APIRouter(tags=["datasources"])
# =======================================================================
# 2️⃣ SMART JSON ENDPOINT — fully schema-agnostic and multi-table aware
# =======================================================================
# app/routers/datasources.py
class JsonPayload(BaseModel):
    """Request body for the /json ingestion endpoint.

    ``data`` is deliberately flexible: either a flat list of records, or a
    mapping (e.g. ``{"tables": {...}}``) for multi-table payloads.
    """

    # Arbitrary source-specific configuration forwarded by the caller.
    config: Dict[str, Any]
    # Flat record list, or a dict wrapper such as {"tables": {...}}.
    data: Union[List[Any], Dict[str, Any]]
@router.post("/json")
async def create_source_json(
    payload: JsonPayload,
    orgId: str = Query(...),     # ✅ org identifier, supplied by the Vercel frontend
    sourceId: str = Query(...),  # ✅ data-source identifier, supplied by the Vercel frontend
    type: str = Query(...),      # ✅ source type from the frontend (accepted but unused here)
    _: str = Depends(verify_api_key),
):
    """
    Enterprise ingestion endpoint:
    - Stores raw audit trail
    - Normalizes to canonical schema
    - Auto-detects industry
    - Broadcasts real-time updates
    - Returns comprehensive metadata
    """
    # Local snake_case aliases for the camelCase query params the frontend sends.
    org_id = orgId
    source_id = sourceId
    try:
        # ✅ Validate payload: reject requests with no usable data body.
        if not payload or not payload.data:
            raise HTTPException(
                status_code=400,
                detail="Missing payload.data. Expected list or dict."
            )

        # 1. 💾 Store raw data for audit & lineage.
        bootstrap(org_id, payload.data)
        logger.info("[api/json] ✅ Raw data stored for org: %s", org_id)

        # 2. Queue asynchronous industry detection; the timestamp in the id
        #    makes the task unique per request. The entity task is auto-queued
        #    downstream by process_detect_industry().
        industry_task = {
            "id": f"detect_industry:{org_id}:{source_id}:{int(datetime.now().timestamp())}",
            "function": "detect_industry",
            "args": {"org_id": org_id, "source_id": source_id},
        }
        event_hub.lpush("python:task_queue", json.dumps(industry_task))

        # 3. Normalize to the canonical schema; also yields the detected
        #    industry and a confidence score.
        df, industry, confidence = canonify_df(org_id, source_id)

        # 4. Build a JSON-safe preview of the first rows: datetime/timedelta
        #    columns are not JSON serializable, so render them as strings.
        preview_df = df.head(3).copy()
        for col in preview_df.columns:
            if pd.api.types.is_datetime64_any_dtype(preview_df[col]):
                preview_df[col] = preview_df[col].dt.strftime('%Y-%m-%d %H:%M:%S')
            elif pd.api.types.is_timedelta64_dtype(preview_df[col]):
                preview_df[col] = preview_df[col].astype(str)
        preview_rows = preview_df.to_dict("records") if not preview_df.empty else []

        # 5. ✅ Return comprehensive response.
        return JSONResponse(
            status_code=200,
            content={
                "id": sourceId,
                "status": "processed",
                "industry": industry,
                "confidence": round(confidence, 4),
                "recentRows": preview_rows,
                "message": "✅ Data ingested and normalized successfully",
                "rowsProcessed": len(df),
                "schemaColumns": list(df.columns) if not df.empty else [],
                "processingTimeMs": 0,  # You can add timing if needed
            }
        )
    except HTTPException:
        raise  # Re-raise FastAPI errors as-is
    except pd.errors.EmptyDataError:
        # Empty payloads are expected in normal operation, not an error.
        logger.warning("[api/json] ⚠️ Empty data for org: %s", org_id)
        return JSONResponse(
            status_code=200,  # Not an error - just no data
            content={
                "id": sourceId,
                "status": "no_data",
                "industry": "unknown",
                "confidence": 0.0,
                "message": "⚠️ No valid data rows found",
                "rowsProcessed": 0,
            }
        )
    except Exception as e:
        # Boundary handler: log with traceback, surface as a clean 500.
        logger.exception("[api/json] ❌ Unexpected error: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Ingestion pipeline failed: {str(e)}"
        )