# NOTE(review): the three lines below ("Spaces:" / "Sleeping" / "Sleeping") appear to be
# dashboard/paste residue, not Python source — commented out so the module stays importable.
# Spaces:
# Sleeping
# Sleeping
| from ..models.requestModels import DeleteTable, LoadMySQLorPostgreSQL, LoadMongoDB | |
| from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer | |
| from fastapi import APIRouter, Depends, UploadFile, File, Form | |
| from fastapi.exceptions import HTTPException | |
| from pymongo.mongo_client import MongoClient | |
| from fastapi.responses import JSONResponse | |
| from ..utils.functions import verifyToken | |
| from pymongo.server_api import ServerApi | |
| from sqlalchemy import create_engine | |
| from urllib.request import urlopen | |
| from supabase import create_client | |
| from fastapi import APIRouter | |
| import pandas as pd | |
| from typing import Annotated | |
| import tempfile | |
| import json | |
| import time | |
| import io | |
| import os | |
# Router instance for this module's endpoints (no routes are attached in this
# file — presumably registered elsewhere; verify against the app factory).
router = APIRouter()
# HTTP Bearer scheme; the extracted credentials are checked with verifyToken().
security = HTTPBearer()
# Module-level Supabase client. Requires SUPABASE_URL / SUPABASE_KEY environment
# variables at import time — a missing variable raises KeyError on import.
client = create_client(
    supabase_url = os.environ["SUPABASE_URL"],
    supabase_key = os.environ["SUPABASE_KEY"]
)
async def loadCsvData(projectId: Annotated[str, Form()], file: Annotated[UploadFile, File()], credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]):
    """Convert an uploaded CSV into a snappy parquet file and register it on a project.

    The parquet object is stored at ``<projectId>/<filename-stem>.parquet`` in the
    "AnalyticsHub" bucket, and the filename stem (no extension) is appended to the
    project's comma-separated ``dataTables`` column.

    Returns 200 on success, 498 when the bearer token is invalid; any other
    failure is re-raised as a 500 HTTPException.
    """
    try:
        if verifyToken(token = credentials.credentials):
            project = client.table("Projects").select("projectId", "projectName", "dataTables").eq("projectId", projectId).execute().data[0]
            # Filename stem doubles as the table name; computed once instead of three times.
            tableName = os.path.splitext(file.filename)[0]
            with tempfile.NamedTemporaryFile(delete = True, suffix = ".parquet") as temp:
                pd.read_csv(io.BytesIO(await file.read())).to_parquet(temp.name, compression = "snappy")
                _ = client.storage.from_("AnalyticsHub").upload(
                    file = temp.name,
                    path = f"{projectId}/{tableName}.parquet"
                )
                # dataTables is a comma-separated list of table names WITHOUT extension
                # (deleteTable appends ".parquet" itself when removing the object).
                if project["dataTables"]:
                    projectData = project["dataTables"] + f", {tableName}"
                else:
                    projectData = tableName
                _ = client.table("Projects").update({"dataTables": projectData}).eq("projectId", projectId).execute()
                # No explicit temp.close(): the context manager closes and deletes the file.
            return JSONResponse(status_code = 200, content = {"status": "SUCCESS", "message": "Data loaded successfully"})
        else:
            return JSONResponse(status_code = 498, content = {"status": "ERROR", "errorDetail": "Invalid Token"})
    except Exception as e:
        raise HTTPException(status_code = 500, detail = f"Endpoint says: {e}")
async def loadExcelData(projectId: Annotated[str, Form()], file: Annotated[UploadFile, File()], credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)], sheetName: Annotated[str | None, Form()] = None):
    """Convert one sheet of an uploaded Excel workbook to parquet and register it.

    When ``sheetName`` is omitted the first sheet is used and the table is named
    after the filename stem; otherwise the table is named ``<stem>_<sheetName>``.
    The parquet object is stored under ``<projectId>/<tableName>.parquet`` and the
    table name (no extension) is appended to the project's ``dataTables`` list.

    Returns 200 on success, 498 when the bearer token is invalid; any other
    failure is re-raised as a 500 HTTPException.
    """
    try:
        if verifyToken(token = credentials.credentials):
            project = client.table("Projects").select("projectId", "projectName", "dataTables").eq("projectId", projectId).execute().data[0]
            stem = os.path.splitext(file.filename)[0]
            # Fix: registered names must be extension-free to match the CSV loader and
            # deleteTable (which appends ".parquet" itself); previously the ".parquet"
            # suffix leaked into dataTables here.
            tableName = stem if sheetName is None else f"{stem}_{sheetName}"
            with tempfile.NamedTemporaryFile(delete = True, suffix = ".parquet") as temp:
                # Fix: sheet_name = None makes read_excel return a dict of ALL sheets,
                # which has no .to_parquet — default to the first sheet (index 0).
                sheet = 0 if sheetName is None else sheetName
                pd.read_excel(io.BytesIO(await file.read()), sheet_name = sheet).to_parquet(temp.name, compression = "snappy")
                _ = client.storage.from_("AnalyticsHub").upload(
                    file = temp.name,
                    path = f"{projectId}/{tableName}.parquet"
                )
                if project["dataTables"]:
                    projectData = project["dataTables"] + f", {tableName}"
                else:
                    projectData = tableName
                _ = client.table("Projects").update({"dataTables": projectData}).eq("projectId", projectId).execute()
                # No explicit temp.close(): the context manager closes and deletes the file.
            return JSONResponse(status_code = 200, content = {"status": "SUCCESS", "message": "Data loaded successfully"})
        else:
            return JSONResponse(status_code = 498, content = {"status": "ERROR", "errorDetail": "Invalid Token"})
    except Exception as e:
        raise HTTPException(status_code = 500, detail = f"Endpoint says: {e}")
async def loadMySql(connection: LoadMySQLorPostgreSQL, credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]):
    """Snapshot one MySQL table into project storage as a snappy parquet file.

    Dumps ``connection.table`` via a pymysql engine, uploads the parquet to
    ``<projectId>/<table>.parquet`` and appends the bare table name to the
    project's comma-separated ``dataTables`` list.

    Returns 200 on success, 498 when the bearer token is invalid; any other
    failure is re-raised as a 500 HTTPException.
    """
    try:
        if verifyToken(token = credentials.credentials):
            project = client.table("Projects").select("projectId", "projectName", "dataTables").eq("projectId", connection.projectId).execute().data[0]
            with tempfile.NamedTemporaryFile(delete = True, suffix = ".parquet") as temp:
                connStr = f"mysql+pymysql://{connection.user}:{connection.password}@{connection.host}:{connection.port}/{connection.db}"
                engine = create_engine(connStr)
                try:
                    # SECURITY: connection.table comes from the request body and is
                    # interpolated into the SQL text — validate/whitelist it upstream.
                    pd.read_sql(f"SELECT * FROM {connection.table}", engine).to_parquet(temp.name, compression = "snappy")
                finally:
                    engine.dispose()  # fix: release pooled DB connections (previously leaked)
                _ = client.storage.from_("AnalyticsHub").upload(
                    file = temp.name,
                    path = f"{connection.projectId}/{connection.table + '.parquet'}"
                )
                # Fix: both branches now store the bare table name — previously the
                # non-empty branch stored "<table>.parquet", which deleteTable (it
                # appends ".parquet" itself) could never match.
                if project["dataTables"]:
                    projectData = project["dataTables"] + f", {connection.table}"
                else:
                    projectData = connection.table
                _ = client.table("Projects").update({"dataTables": projectData}).eq("projectId", connection.projectId).execute()
                # No explicit temp.close(): the context manager closes and deletes the file.
            return JSONResponse(status_code = 200, content = {"status": "SUCCESS", "message": "Data loaded successfully"})
        else:
            return JSONResponse(status_code = 498, content = {"status": "ERROR", "errorDetail": "Invalid Token"})
    except Exception as e:
        raise HTTPException(status_code = 500, detail = f"Endpoint says: {e}")
async def loadPostgreSQL(connection: LoadMySQLorPostgreSQL, credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]):
    """Snapshot one PostgreSQL table into project storage as a snappy parquet file.

    Dumps ``connection.table`` via a psycopg2 engine, uploads the parquet to
    ``<projectId>/<table>.parquet`` and appends the bare table name to the
    project's comma-separated ``dataTables`` list.

    Returns 200 on success, 498 when the bearer token is invalid; any other
    failure is re-raised as a 500 HTTPException.
    """
    try:
        if verifyToken(token = credentials.credentials):
            project = client.table("Projects").select("projectId", "projectName", "dataTables").eq("projectId", connection.projectId).execute().data[0]
            with tempfile.NamedTemporaryFile(delete = True, suffix = ".parquet") as temp:
                connStr = f"postgresql+psycopg2://{connection.user}:{connection.password}@{connection.host}:{connection.port}/{connection.db}"
                engine = create_engine(connStr)
                try:
                    # SECURITY: connection.table comes from the request body and is
                    # interpolated into the SQL text — validate/whitelist it upstream.
                    pd.read_sql(f"SELECT * FROM {connection.table}", engine).to_parquet(temp.name, compression = "snappy")
                finally:
                    engine.dispose()  # fix: release pooled DB connections (previously leaked)
                _ = client.storage.from_("AnalyticsHub").upload(
                    file = temp.name,
                    path = f"{connection.projectId}/{connection.table + '.parquet'}"
                )
                # Fix: both branches now store the bare table name — previously the
                # non-empty branch stored "<table>.parquet", which deleteTable (it
                # appends ".parquet" itself) could never match.
                if project["dataTables"]:
                    projectData = project["dataTables"] + f", {connection.table}"
                else:
                    projectData = connection.table
                _ = client.table("Projects").update({"dataTables": projectData}).eq("projectId", connection.projectId).execute()
                # No explicit temp.close(): the context manager closes and deletes the file.
            return JSONResponse(status_code = 200, content = {"status": "SUCCESS", "message": "Data loaded successfully"})
        else:
            return JSONResponse(status_code = 498, content = {"status": "ERROR", "errorDetail": "Invalid Token"})
    except Exception as e:
        raise HTTPException(status_code = 500, detail = f"Endpoint says: {e}")
async def loadMongoDB(connection: LoadMongoDB, credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]):
    """Snapshot one MongoDB collection into project storage as a snappy parquet file.

    Reads every document from ``connection.db/connection.collection``, drops the
    ``_id`` field (an ObjectId, not parquet-serializable), uploads the parquet to
    ``<projectId>/<collection>.parquet`` and appends the bare collection name to
    the project's comma-separated ``dataTables`` list.

    Returns 200 on success, 498 when the bearer token is invalid; any other
    failure is re-raised as a 500 HTTPException.
    """
    try:
        if verifyToken(token = credentials.credentials):
            project = client.table("Projects").select("projectId", "projectName", "dataTables").eq("projectId", connection.projectId).execute().data[0]
            with tempfile.NamedTemporaryFile(delete = True, suffix = ".parquet") as temp:
                mongoClient = MongoClient(connection.connectionString, server_api = ServerApi('1'))
                try:
                    records = list(mongoClient[connection.db][connection.collection].find())
                finally:
                    mongoClient.close()  # fix: the Mongo connection was never closed
                for record in records:
                    record.pop("_id", None)  # default avoids KeyError on odd documents
                pd.DataFrame(records).to_parquet(temp.name, compression = "snappy")
                _ = client.storage.from_("AnalyticsHub").upload(
                    file = temp.name,
                    path = f"{connection.projectId}/{connection.collection + '.parquet'}"
                )
                # Fix: both branches now store the bare collection name — previously
                # the non-empty branch stored "<collection>.parquet", which deleteTable
                # (it appends ".parquet" itself) could never match.
                if project["dataTables"]:
                    projectData = project["dataTables"] + f", {connection.collection}"
                else:
                    projectData = connection.collection
                _ = client.table("Projects").update({"dataTables": projectData}).eq("projectId", connection.projectId).execute()
                # No explicit temp.close(): the context manager closes and deletes the file.
            return JSONResponse(status_code = 200, content = {"status": "SUCCESS", "message": "Data loaded successfully"})
        else:
            return JSONResponse(status_code = 498, content = {"status": "ERROR", "errorDetail": "Invalid Token"})
    except Exception as e:
        raise HTTPException(status_code = 500, detail = f"Endpoint says: {e}")
async def deleteTable(tableDetails: DeleteTable, credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)]):
    """Delete a project table: remove the parquet object from storage, drop the
    name from the project's ``dataTables`` list and prune its metadata.json entry.

    Made idempotent: a name already absent from the list or from metadata.json is
    tolerated instead of turning the whole delete into a 500.

    Returns 200 on success, 498 when the bearer token is invalid; any other
    failure is re-raised as a 500 HTTPException.
    """
    try:
        if verifyToken(token = credentials.credentials):
            _ = client.storage.from_("AnalyticsHub").remove(f"{tableDetails.projectId}/{tableDetails.tableName}" + ".parquet")
            projectTables = client.table("Projects").select("dataTables").eq("projectId", tableDetails.projectId).execute().data[0]["dataTables"]
            tables = projectTables.split(", ")
            # Fix: list.remove raised ValueError (→ 500) when the name was missing.
            if tableDetails.tableName in tables:
                tables.remove(tableDetails.tableName)
            _ = client.table("Projects").update({"dataTables": ", ".join(tables)}).eq("projectId", tableDetails.projectId).execute()
            # Cache-busting query param so a stale metadata.json is never read;
            # FILE_URL is a template env var — TODO confirm its exact shape.
            fileUrl = os.environ["FILE_URL"].format(projectId = tableDetails.projectId, fileName = "metadata.json").replace(".parquet", "") + f"?cb={int(time.time())}"
            # Fix: the urlopen response was never closed (leaked HTTP connection).
            with urlopen(fileUrl) as response:
                jsonData = json.loads(response.read())
            jsonData.pop(tableDetails.tableName, None)  # tolerate a missing metadata entry
            with io.BytesIO() as buffer:
                buffer.write(json.dumps(jsonData, indent = 4).encode("utf-8"))
                # getvalue() is position-independent, so no seek(0) is needed
                client.storage.from_("AnalyticsHub").upload(path = f"{tableDetails.projectId}/metadata.json", file = buffer.getvalue(), file_options = {"upsert": "true"})
            return JSONResponse(status_code = 200, content = {"status": "SUCCESS", "message": "Table deleted successfully"})
        else:
            return JSONResponse(status_code = 498, content = {"status": "ERROR", "errorDetail": "Invalid Token"})
    except Exception as e:
        raise HTTPException(status_code = 500, detail = f"Endpoint says: {e}")