rohitdeshmukh318's picture
new version
c5f9c5f
"""
api/routers/upload.py
POST /api/upload/file       – upload CSV or SQLite, store in Supabase, ingest schema
POST /api/upload/connect-db – validate a PostgreSQL URL, encrypt it, ingest schema
"""
import io
from typing import Literal
from fastapi import APIRouter, File, Form, HTTPException, UploadFile
from pydantic import BaseModel
from storage.supabase_storage import upload_file
from schema.ingestor import ingest_schema
# Router that the endpoints in this module register on. Route paths below are
# relative; presumably the app mounts this under /api/upload (per the module
# docstring) — confirm against the application setup.
router = APIRouter()
# Accepted upload types, keyed by MIME type. Each value is a
# (file-kind label, MIME type) pair; the MIME type in the pair always equals
# the key, so the table is derived from the (kind, mime) pairs directly.
ALLOWED_TYPES = {
    mime: (kind, mime)
    for kind, mime in (
        ("csv", "text/csv"),
        ("sqlite", "application/x-sqlite3"),
    )
}

# Filename extensions that identify a file kind even when the client sends a
# generic content-type.
_SQLITE_EXTENSIONS = (".sqlite", ".db")
_CSV_EXTENSIONS = (".csv",)

# Upload size cap: 50 MB counted in binary units (52,428,800 bytes).
MAX_SIZE_BYTES = 50 * 1024 ** 2
# Response body returned by the POST /file endpoint.
class UploadResponse(BaseModel):
    connector_id: str  # "<file_kind>:<public_url>", as built by upload_data_file
    public_url: str  # URL returned by upload_file for the stored object
    file_type: str  # file-kind label from ALLOWED_TYPES ("csv" or "sqlite")
    tables_ingested: int  # value returned by ingest_schema for this connector
@router.post("/file", response_model=UploadResponse)
async def upload_data_file(
    file: UploadFile = File(...),
    user_id: str = Form(default="anonymous"),
):
    """Accept a CSV or SQLite upload, store it, and ingest its schema.

    The file kind is decided by filename extension first (browsers often send
    a generic content-type) and by the declared content-type otherwise.

    Args:
        file: The uploaded file from the multipart form.
        user_id: Optional form field; accepted but not used by this handler.

    Returns:
        UploadResponse carrying the connector id, the public URL returned by
        storage, the detected file kind, and the value returned by
        ``ingest_schema``.

    Raises:
        HTTPException: 415 for an unsupported type, 413 when the file exceeds
            ``MAX_SIZE_BYTES``, 400 for an empty file, and 500 when schema
            ingestion fails.
    """
    # The client-supplied filename is untrusted: keep only its final path
    # component so directory parts ("../", "C:\\dir\\") never reach storage.
    raw_name = (file.filename or "").replace("\\", "/")
    fname = raw_name.rsplit("/", 1)[-1].lower()

    # MIME types are case-insensitive and may carry parameters
    # (e.g. "text/csv; charset=utf-8"); compare on the bare type only.
    declared = file.content_type or "application/octet-stream"
    content_type = declared.split(";", 1)[0].strip().lower()

    # Override content-type by extension (browsers often send generic type)
    if fname.endswith(_CSV_EXTENSIONS):
        content_type = "text/csv"
    elif fname.endswith(_SQLITE_EXTENSIONS):
        content_type = "application/x-sqlite3"

    if content_type not in ALLOWED_TYPES:
        raise HTTPException(
            status_code=415,
            detail=f"Unsupported file type: {content_type}. Supported: CSV, SQLite",
        )

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without pulling an arbitrarily large payload into memory.
    file_bytes = await file.read(MAX_SIZE_BYTES + 1)
    if len(file_bytes) > MAX_SIZE_BYTES:
        raise HTTPException(status_code=413, detail="File exceeds 50 MB limit")
    if not file_bytes:
        # Nothing to store or ingest; reject before touching storage.
        raise HTTPException(status_code=400, detail="Uploaded file is empty")

    file_kind, mime = ALLOWED_TYPES[content_type]

    # NOTE(review): upload_file and ingest_schema are called synchronously
    # from this coroutine; if they perform blocking I/O they stall the event
    # loop. Consider fastapi.concurrency.run_in_threadpool once their
    # thread-safety is confirmed.

    # Upload to Supabase Storage
    public_url = upload_file(
        file_bytes=file_bytes,
        filename=fname or f"upload.{file_kind}",
        content_type=mime,
    )
    connector_id = f"{file_kind}:{public_url}"

    # Ingest schema into pgvector
    try:
        tables_ingested = ingest_schema(connector_id)
    except Exception as exc:
        # Chain the cause so server logs keep the original traceback.
        raise HTTPException(
            status_code=500, detail=f"Schema ingestion failed: {exc}"
        ) from exc

    return UploadResponse(
        connector_id=connector_id,
        public_url=public_url,
        file_type=file_kind,
        tables_ingested=tables_ingested,
    )
# Request body accepted by the POST /connect-db endpoint.
class DatabaseConnectionRequest(BaseModel):
    url: str  # connection URL; the handler requires a postgresql:// or postgres:// prefix
    schema_name: str = "public"  # schema handed to NeonConnector; defaults to "public"
# Response body returned by the POST /connect-db endpoint.
class DatabaseConnectionResponse(BaseModel):
    connector_id: str  # "postgres-enc:<encrypted url>:<schema_name>", built by connect_database
    tables_ingested: int  # value returned by ingest_schema for this connector
    status: str  # connect_database always sets this to "connected"
@router.post("/connect-db", response_model=DatabaseConnectionResponse)
async def connect_database(req: DatabaseConnectionRequest):
    """Validate a PostgreSQL connection URL, then register and index it.

    The URL is checked for a PostgreSQL scheme, used once to fetch the schema
    as a connectivity test, encrypted, and embedded in the connector id that
    is handed to ``ingest_schema``.

    Args:
        req: Request body with the connection URL and the schema name.

    Returns:
        DatabaseConnectionResponse with the connector id, the value returned
        by ``ingest_schema``, and ``status="connected"``.

    Raises:
        HTTPException: 400 when the URL scheme is unsupported or the test
            schema fetch fails; 500 when schema ingestion fails.
    """
    url = req.url.strip()
    if not url.startswith(("postgresql://", "postgres://")):
        raise HTTPException(
            status_code=400,
            detail="Unsupported protocol. Only standard PostgreSQL/Postgres URIs are supported.",
        )

    # Imported inside the handler, as in the original code, so these modules
    # are only loaded when this endpoint is actually called.
    from connectors.crypto import encrypt_connection_string
    from connectors.neon_connector import NeonConnector

    # NOTE(review): get_schema and ingest_schema are called synchronously
    # from this coroutine; if they perform blocking I/O they stall the event
    # loop. Consider fastapi.concurrency.run_in_threadpool once their
    # thread-safety is confirmed.

    # Validate connectivity: one schema fetch proves the host is reachable
    # and the credentials work before anything is stored.
    try:
        connector = NeonConnector(schema=req.schema_name, db_url=url)
        connector.get_schema()
    except Exception as exc:
        # Chain the cause so server logs keep the original traceback.
        raise HTTPException(
            status_code=400,
            detail=f"Database connection failed: {exc}. Please ensure your database is publicly accessible and credentials are correct.",
        ) from exc

    encrypted_url = encrypt_connection_string(url)
    # NOTE(review): schema_name is embedded verbatim as the last
    # ':'-separated field; a value containing ':' may confuse whatever parses
    # this id — confirm against the consumer.
    connector_id = f"postgres-enc:{encrypted_url}:{req.schema_name}"

    # Ingest schema metadata for the new connector.
    try:
        tables_ingested = ingest_schema(connector_id)
    except Exception as exc:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to index database schemas: {exc}",
        ) from exc

    return DatabaseConnectionResponse(
        connector_id=connector_id,
        tables_ingested=tables_ingested,
        status="connected",
    )