import os from supabase import create_client, Client from fastapi import HTTPException from functools import cache from models.competitor_analysis_model import CompetitorAnalysisModel from models.pain_point_model import PainPointAnalysisModel from models.session_model import InputInfoModel, UserSessionModel @cache def get_db_client() -> Client: try: SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY) return supabase except Exception as e: print(f"Failed to connect to the database: {e}", flush=True) raise HTTPException(status_code=500, detail="Failed to connect to the database") db_client = get_db_client() def get_user_with_id(external_id): data = db_client.table("user_info").select("*").eq("external_id",external_id).execute() user_list = data.data return user_list def create_user_with_id(external_id,email): nuser_obj = {"external_id": external_id,"email":email} user = db_client.table("user_info").insert(nuser_obj).execute().data return user # create user session def create_user_session(user_id: int, input_info: InputInfoModel): """ Creates a new user session in the database. Args: user_id (int): The ID of the user. input_info (InputInfoModel): The input information provided by the user. Raises: HTTPException: If the user session creation fails. """ # Initialize session_info session_info = {} # Iterate over the field inputs and create a list of dictionaries for key in input_info.field_inputs.keys(): session_info[key] = [] for val in input_info.field_inputs[key]: session_info[key].append({val: -1}) # Create the user session model data = UserSessionModel( user_id=user_id, input_info=input_info.model_dump(), session_info=session_info ) # Insert the user session into the database user_session = db_client.table("sessions").insert(data.model_dump()).execute().data # Check if the user session was created successfully if not user_session: raise HTTPException(status_code=500, detail="Failed to create user session") return user_session[0] def update_user_session(user_session: dict,session_info:dict=None, process_info:dict=None) -> None: """ Updates a user session in the database. Args: user_session (dict): user session got from previous steps. session_info (dict): The updated session information. process_info (dict): The updated process information. Raises: HTTPException: If the user session update fails. """ # Update the user session in the database if session_info is None: session_info = user_session["session_info"] if process_info is None: process_info = user_session["process_info"] session = db_client.table("sessions").update({"session_info": session_info,"process_info": process_info}).eq("id", user_session["id"]).execute().data # Check if the user session was updated successfully if not session: raise HTTPException(status_code=500, detail="Failed to update user session") def save_pain_point_analysis(data:PainPointAnalysisModel)->None: try: data= db_client.table("pain_point_analysis").insert(data.model_dump()).execute().data return data[0] except Exception as e: print("Failed to save pain point analysis:", e) raise HTTPException(status_code=500, detail="Failed to save pain point analysis") def save_competitor_analysis(data:CompetitorAnalysisModel)->None: try: data= db_client.table("competitor_analysis").insert(data.model_dump()).execute().data return data[0] except Exception as e: print("Failed to save pain point analysis:", e) raise HTTPException(status_code=500, detail="Failed to save pain point analysis")