nextAnalytics / databases /supabase_db.py
honey234's picture
updated backend
d1fa8b0
import os
from supabase import create_client, Client
from fastapi import HTTPException
from functools import cache
from models.competitor_analysis_model import CompetitorAnalysisModel
from models.pain_point_model import PainPointAnalysisModel
from models.session_model import InputInfoModel, UserSessionModel
@cache
def get_db_client() -> Client:
try:
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
return supabase
except Exception as e:
print(f"Failed to connect to the database: {e}", flush=True)
raise HTTPException(status_code=500, detail="Failed to connect to the database")
db_client = get_db_client()
def get_user_with_id(external_id):
data = db_client.table("user_info").select("*").eq("external_id",external_id).execute()
user_list = data.data
return user_list
def create_user_with_id(external_id,email):
nuser_obj = {"external_id": external_id,"email":email}
user = db_client.table("user_info").insert(nuser_obj).execute().data
return user
# create user session
def create_user_session(user_id: int, input_info: InputInfoModel):
"""
Creates a new user session in the database.
Args:
user_id (int): The ID of the user.
input_info (InputInfoModel): The input information provided by the user.
Raises:
HTTPException: If the user session creation fails.
"""
# Initialize session_info
session_info = {}
# Iterate over the field inputs and create a list of dictionaries
for key in input_info.field_inputs.keys():
session_info[key] = []
for val in input_info.field_inputs[key]:
session_info[key].append({val: -1})
# Create the user session model
data = UserSessionModel(
user_id=user_id, input_info=input_info.model_dump(), session_info=session_info
)
# Insert the user session into the database
user_session = db_client.table("sessions").insert(data.model_dump()).execute().data
# Check if the user session was created successfully
if not user_session:
raise HTTPException(status_code=500, detail="Failed to create user session")
return user_session[0]
def update_user_session(user_session: dict,session_info:dict=None, process_info:dict=None) -> None:
"""
Updates a user session in the database.
Args:
user_session (dict): user session got from previous steps.
session_info (dict): The updated session information.
process_info (dict): The updated process information.
Raises:
HTTPException: If the user session update fails.
"""
# Update the user session in the database
if session_info is None:
session_info = user_session["session_info"]
if process_info is None:
process_info = user_session["process_info"]
session = db_client.table("sessions").update({"session_info": session_info,"process_info": process_info}).eq("id", user_session["id"]).execute().data
# Check if the user session was updated successfully
if not session:
raise HTTPException(status_code=500, detail="Failed to update user session")
def save_pain_point_analysis(data:PainPointAnalysisModel)->None:
try:
data= db_client.table("pain_point_analysis").insert(data.model_dump()).execute().data
return data[0]
except Exception as e:
print("Failed to save pain point analysis:", e)
raise HTTPException(status_code=500, detail="Failed to save pain point analysis")
def save_competitor_analysis(data:CompetitorAnalysisModel)->None:
try:
data= db_client.table("competitor_analysis").insert(data.model_dump()).execute().data
return data[0]
except Exception as e:
print("Failed to save pain point analysis:", e)
raise HTTPException(status_code=500, detail="Failed to save pain point analysis")