"""Data-loading routes: CSV/Excel uploads and MySQL/PostgreSQL/MongoDB imports into Supabase storage."""
from ..models.requestModels import DeleteTable, LoadMySQLorPostgreSQL, LoadMongoDB
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from fastapi import APIRouter, Depends, UploadFile, File, Form
from fastapi.exceptions import HTTPException
from pymongo.mongo_client import MongoClient
from fastapi.responses import JSONResponse
from ..utils.functions import verifyToken
from pymongo.server_api import ServerApi
from sqlalchemy import create_engine
from urllib.request import urlopen
from supabase import create_client
from fastapi import APIRouter
import pandas as pd
from typing import Annotated
import tempfile
import json
import time
import io
import os
# Router mounted by the parent FastAPI app; all endpoints below hang off it.
router = APIRouter()
# Bearer-token scheme; every endpoint checks the token via verifyToken().
security = HTTPBearer()
# Supabase client shared by all endpoints: reads/writes the "Projects" table
# and uploads parquet files to the "AnalyticsHub" storage bucket.
# NOTE(review): os.environ[...] raises KeyError at import time if either
# SUPABASE_URL or SUPABASE_KEY is unset — confirm both are always provided.
client = create_client(
supabase_url = os.environ["SUPABASE_URL"],
supabase_key = os.environ["SUPABASE_KEY"]
)
@router.post("/loadCsvData")
async def loadCsvData(projectId: Annotated[str, Form()], file: Annotated[UploadFile, File()], credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]):
try:
if verifyToken(token = credentials.credentials):
project = client.table("Projects").select("projectId", "projectName", "dataTables").eq("projectId", projectId).execute().data[0]
with tempfile.NamedTemporaryFile(delete = True, suffix = ".parquet") as temp:
pd.read_csv(io.BytesIO(await file.read())).to_parquet(temp.name, compression = "snappy")
_ = client.storage.from_("AnalyticsHub").upload(
file = temp.name,
path = f"{projectId}/{os.path.splitext(file.filename)[0] + '.parquet'}"
)
if project["dataTables"]:
projectData = project["dataTables"] + f", {os.path.splitext(file.filename)[0]}"
else:
projectData = os.path.splitext(file.filename)[0]
_ = client.table("Projects").update({"dataTables": projectData}).eq("projectId", projectId).execute()
temp.close()
return JSONResponse(status_code = 200, content = {"status": "SUCCESS", "message": "Data loaded successfully"})
else:
return JSONResponse(status_code = 498, content = {"status": "ERROR", "errorDetail": "Invalid Token"})
except Exception as e:
raise HTTPException(status_code = 500, detail = f"Endpoint says: {e}")
@router.post("/loadExcelData")
async def loadExcelData(projectId: Annotated[str, Form()], file: Annotated[UploadFile, File()], credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)], sheetName: Annotated[str | None, Form()] = None):
try:
if verifyToken(token = credentials.credentials):
project = client.table("Projects").select("projectId", "projectName", "dataTables").eq("projectId", projectId).execute().data[0]
with tempfile.NamedTemporaryFile(delete = True, suffix = ".parquet") as temp:
pd.read_excel(io.BytesIO(await file.read()), sheet_name = sheetName).to_parquet(temp.name, compression = "snappy")
if sheetName == None:
fileName = f"{os.path.splitext(file.filename)[0] + '.parquet'}"
else:
fileName = f"{os.path.splitext(file.filename)[0] + '_' + sheetName + '.parquet'}"
_ = client.storage.from_("AnalyticsHub").upload(
file = temp.name,
path = f"{projectId}/{fileName}"
)
if project["dataTables"]:
projectData = project["dataTables"] + f", {fileName}"
else:
projectData = fileName
_ = client.table("Projects").update({"dataTables": projectData}).eq("projectId", projectId).execute()
temp.close()
return JSONResponse(status_code = 200, content = {"status": "SUCCESS", "message": "Data loaded successfully"})
else:
return JSONResponse(status_code = 498, content = {"status": "ERROR", "errorDetail": "Invalid Token"})
except Exception as e:
raise HTTPException(status_code = 500, detail = f"Endpoint says: {e}")
@router.post("/loadMySql")
async def loadMySql(connection: LoadMySQLorPostgreSQL, credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]):
try:
if verifyToken(token = credentials.credentials):
project = client.table("Projects").select("projectId", "projectName", "dataTables").eq("projectId", connection.projectId).execute().data[0]
with tempfile.NamedTemporaryFile(delete = True, suffix = ".parquet") as temp:
connStr = f"mysql+pymysql://{connection.user}:{connection.password}@{connection.host}:{connection.port}/{connection.db}"
engine = create_engine(connStr)
pd.read_sql(f"SELECT * FROM {connection.table}", engine).to_parquet(temp.name, compression = "snappy")
_ = client.storage.from_("AnalyticsHub").upload(
file = temp.name,
path = f"{connection.projectId}/{connection.table + '.parquet'}"
)
if project["dataTables"]:
projectData = project["dataTables"] + f", {connection.table + '.parquet'}"
else:
projectData = connection.table
_ = client.table("Projects").update({"dataTables": projectData}).eq("projectId", connection.projectId).execute()
temp.close()
return JSONResponse(status_code = 200, content = {"status": "SUCCESS", "message": "Data loaded successfully"})
else:
return JSONResponse(status_code = 498, content = {"status": "ERROR", "errorDetail": "Invalid Token"})
except Exception as e:
raise HTTPException(status_code = 500, detail = f"Endpoint says: {e}")
@router.post("/loadPostgreSQL")
async def loadPostgreSQL(connection: LoadMySQLorPostgreSQL, credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]):
try:
if verifyToken(token = credentials.credentials):
project = client.table("Projects").select("projectId", "projectName", "dataTables").eq("projectId", connection.projectId).execute().data[0]
with tempfile.NamedTemporaryFile(delete = True, suffix = ".parquet") as temp:
connStr = f"postgresql+psycopg2://{connection.user}:{connection.password}@{connection.host}:{connection.port}/{connection.db}"
engine = create_engine(connStr)
pd.read_sql(f"SELECT * FROM {connection.table}", engine).to_parquet(temp.name, compression = "snappy")
_ = client.storage.from_("AnalyticsHub").upload(
file = temp.name,
path = f"{connection.projectId}/{connection.table + '.parquet'}"
)
if project["dataTables"]:
projectData = project["dataTables"] + f", {connection.table + '.parquet'}"
else:
projectData = connection.table
_ = client.table("Projects").update({"dataTables": projectData}).eq("projectId", connection.projectId).execute()
temp.close()
return JSONResponse(status_code = 200, content = {"status": "SUCCESS", "message": "Data loaded successfully"})
else:
return JSONResponse(status_code = 498, content = {"status": "ERROR", "errorDetail": "Invalid Token"})
except Exception as e:
raise HTTPException(status_code = 500, detail = f"Endpoint says: {e}")
@router.post("/loadMongoDB")
async def loadMongoDB(connection: LoadMongoDB, credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]):
try:
if verifyToken(token = credentials.credentials):
project = client.table("Projects").select("projectId", "projectName", "dataTables").eq("projectId", connection.projectId).execute().data[0]
with tempfile.NamedTemporaryFile(delete = True, suffix = ".parquet") as temp:
mongoClient = MongoClient(connection.connectionString, server_api=ServerApi('1'))
records = list(mongoClient[connection.db][connection.collection].find())
for record in records: record.pop("_id")
pd.DataFrame(records).to_parquet(temp.name, compression = "snappy")
_ = client.storage.from_("AnalyticsHub").upload(
file = temp.name,
path = f"{connection.projectId}/{connection.collection + '.parquet'}"
)
if project["dataTables"]:
projectData = project["dataTables"] + f", {connection.collection + '.parquet'}"
else:
projectData = connection.collection
_ = client.table("Projects").update({"dataTables": projectData}).eq("projectId", connection.projectId).execute()
temp.close()
return JSONResponse(status_code = 200, content = {"status": "SUCCESS", "message": "Data loaded successfully"})
else:
return JSONResponse(status_code = 498, content = {"status": "ERROR", "errorDetail": "Invalid Token"})
except Exception as e:
raise HTTPException(status_code = 500, detail = f"Endpoint says: {e}")
@router.delete("/deleteTable")
async def deleteTable(tableDetails: DeleteTable, credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]):
try:
if verifyToken(token = credentials.credentials):
_ = client.storage.from_("AnalyticsHub").remove(f"{tableDetails.projectId}/{tableDetails.tableName}" + ".parquet")
projectTables = client.table("Projects").select("dataTables").eq("projectId", tableDetails.projectId).execute().data[0]["dataTables"]
projectTables = projectTables.split(", ")
projectTables.remove(tableDetails.tableName)
projectTables = ", ".join(projectTables)
_ = client.table("Projects").update({"dataTables": projectTables}).eq("projectId", tableDetails.projectId).execute()
fileUrl = os.environ["FILE_URL"].format(projectId = tableDetails.projectId, fileName = "metadata.json").replace(".parquet", "") + f"?cb={int(time.time())}"
jsonData = json.loads(urlopen(fileUrl).read())
jsonData.pop(tableDetails.tableName)
with io.BytesIO() as buffer:
buffer.write(json.dumps(jsonData, indent=4).encode("utf-8"))
buffer.seek(0)
client.storage.from_("AnalyticsHub").upload(path = f"{tableDetails.projectId}/metadata.json", file = buffer.getvalue(), file_options = {"upsert": "true"})
return JSONResponse(status_code = 200, content = {"status": "SUCCESS", "message": "Table deleted successfully"})
else:
return JSONResponse(status_code = 498, content = {"status": "ERROR", "errorDetail": "Invalid Token"})
except Exception as e:
raise HTTPException(status_code = 500, detail = f"Endpoint says: {e}")