import logging import boto3 from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError import os from dotenv import load_dotenv from fastapi import FastAPI, HTTPException, Security, Query, status from fastapi.security import APIKeyHeader from openai import OpenAI import pandas as pd from pydantic import BaseModel import os import logging import json import psycopg2 from psycopg2 import sql import os from dotenv import load_dotenv import datetime import threading import pickle # Replace dill with pickle from cachetools import TTLCache import threading import time import uuid from app.cache import CustomTTLCache, upload_file_to_s3 import pdfkit import PyPDF2 import secrets import string load_dotenv() # Environment Variables for API Keys api_keys = [os.getenv('FASTAPI_KEY')] api_key_header = APIKeyHeader(name="X-API-Key") load_dotenv() AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY') AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY') REGION = os.getenv('AWS_REGION') logger = logging.getLogger(__name__) # Replace the simple TTLCache with our custom implementation user_cache = CustomTTLCache(ttl=120, cleanup_interval=30) # 2 minutes TTL def force_file_move(source, destination): function_name = force_file_move.__name__ logger.info(f"Attempting to move file from {source} to {destination}", extra={'endpoint': function_name}) try: # Ensure the destination directory exists os.makedirs(os.path.dirname(destination), exist_ok=True) # Move the file, replacing if it already exists os.replace(source, destination) logger.info(f"File moved successfully: {source} -> {destination}", extra={'endpoint': function_name}) except FileNotFoundError: logger.error(f"Source file not found: {source}", extra={'endpoint': function_name}) except Exception as e: logger.error(f"An error occurred while moving file: {e}", extra={'endpoint': function_name}) def get_user(user_id): function_name = get_user.__name__ logger.info(f"Fetching user {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) logger.info(f"[CACHE]: {user_cache}", extra={'user_id': user_id, 'endpoint': function_name}) if user_id in user_cache: logger.info(f"User {user_id} found in cache", extra={'user_id': user_id, 'endpoint': function_name}) return user_cache[user_id] else: client = OpenAI(api_key=os.getenv('OPENAI_API_KEY')) user_file = os.path.join('users', 'data', f'{user_id}.pkl') # if os.path.exists(user_file): # with open(user_file, 'rb') as f: # user = pickle.load(f) # user.client = client # user.conversations.client = client # with cache_lock: # user_cache[user_id] = user # return user logger.warning(f"User {user_id} not found locally. Attempting to download from S3", extra={'user_id': user_id, 'endpoint': function_name}) download = download_file_from_s3(f'{user_id}.pkl', 'core-ai-assets') logger.info(f"Download success: {download}", extra={'user_id': user_id, 'endpoint': function_name}) if (download): with open(user_file, 'rb') as f: user = pickle.load(f) user.client = client user.conversations.client = client user_cache[user_id] = user # No need for lock here os.remove(user_file) logger.info(f"User {user_id} loaded successfully from S3", extra={'user_id': user_id, 'endpoint': function_name}) return user else: logger.error(f"User {user_id} pickle does not exist in S3", extra={'user_id': user_id, 'endpoint': function_name}) # check if user_info exists user_info = get_user_info(user_id) if (user_info): # user has done onboarding but pickle file not created raise ReferenceError(f"User {user_id} pickle still being created") raise LookupError(f"User [{user_id}] has not onboarded yet") def generate_html(json_data, coach_name='Growth Guide', booking_id = None): function_name = generate_html.__name__ data = json_data["pre_growth_guide_session_report"] user_overview = data["user_overview"] personality_insights = data["personality_insights"] progress_snapshot = data["progress_snapshot"] preparation_brief = json_data.get("users_growth_guide_preparation_brief", []) session_script = json_data["30_minute_coaching_session_script"] # Extract user name user_name = user_overview["name"] # Build Progress Snapshot progress_items = "" for key, value in progress_snapshot.items(): # Convert key to title case with spaces formatted_key = key.replace("_", " ").title() progress_items += f'
  • {formatted_key}: {value}
  • \n' # Build Personality Insights love_languages = "".join(f"
  • {lang}
  • " for lang in personality_insights["top_love_languages"]) # Build Preparation Brief preparation_items = "".join( f'
  • {item["key"].replace("_", " ").title()}: {item["value"]}
  • \n' for item in preparation_brief) # Build Session Overview session_overview_list = session_script["session_overview"] session_overview = "
      \n" for item in session_overview_list: session_overview += f"
    1. {item}
    2. \n" session_overview += "
    " # Build Detailed Segments detailed_segments = "" for segment in session_script["detailed_segments"]: segment_title = segment["segment_title"] # Build Coach Dialogue list coach_dialogue_list = segment.get("coach_dialogue", []) coach_dialogue_html = "" # Build Guidance list guidance_list = segment.get("guidance", []) guidance_html = "" detailed_segments += f'''

    {segment_title}

    Coach Dialogue:{coach_dialogue_html}

    Guidance:{guidance_html}

    ''' # Build Final HTML html_content = f''' User Profile - {user_name}

    Dear {coach_name},

    Here is the User Profile - {user_name} and the 30-Minute Coaching Session Script for your upcoming session with {user_name}:

    User Profile - {user_name}

    User Overview

    Personality Insights

    Progress Snapshot

    30-Minute Coaching Session Script

    Session Overview (30 Minutes)

    {session_overview}

    Detailed Segments

    {detailed_segments}
    ''' file_path = os.path.join("bookings", "data",f"{booking_id}.html") path_to_upload = os.path.join("bookings", "to_upload",f"{booking_id}.pdf") password = "Ourcoach2024!" ## SAVING HTML FILE try: # Open the file in write mode with open(file_path, 'w', encoding='utf-8') as html_file: html_file.write(html_content) logger.info(f"File '{booking_id}.html' has been created successfully.", extra={'booking_id': booking_id, 'endpoint': function_name}) # Saving as PDF File pdfkit.from_file(file_path, path_to_upload, options={'encoding': 'UTF-8'}) logger.info(f"File '{booking_id}.pdf' has been created successfully.", extra={'booking_id': booking_id, 'endpoint': function_name}) ## ENCRYPTING PDF logger.info(f"Encrypting '{booking_id}.pdf'...", extra={'booking_id': booking_id, 'endpoint': function_name}) with open(path_to_upload, 'rb') as file: pdf_reader = PyPDF2.PdfReader(file) pdf_writer = PyPDF2.PdfWriter() # Add all pages to the writer for page_num in range(len(pdf_reader.pages)): pdf_writer.add_page(pdf_reader.pages[page_num]) # Encrypt the PDF with the given password pdf_writer.encrypt(password) with open(path_to_upload, 'wb') as encrypted_file: pdf_writer.write(encrypted_file) logger.info(f"Succesfully encrypted '{booking_id}.pdf'", extra={'booking_id': booking_id, 'endpoint': function_name}) except Exception as e: logger.error(f"An error occurred: {e}", extra={'booking_id': booking_id, 'endpoint': function_name}) raise filename = booking_id logger.info(f"Uploading file {filename} to S3", extra={'booking_id': booking_id, 'endpoint': function_name}) bucket = 'core-ai-assets' try: if (AWS_ACCESS_KEY and AWS_SECRET_KEY): session = boto3.session.Session(aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=REGION) else: session = boto3.session.Session() s3_client = session.client('s3') with open(path_to_upload, "rb") as f: ## Upload to Production Folder s3_client.upload_fileobj(f, bucket, f'staging/pre_gg_reports/{filename}.pdf') logger.info(f"File {filename} uploaded successfully to S3", extra={'booking_id': booking_id, 'endpoint': function_name}) # Removing files for file in os.listdir(os.path.join('bookings', 'data')): os.remove(os.path.join('bookings', 'data', file)) for file in os.listdir(os.path.join('bookings', 'to_upload')): os.remove(os.path.join('bookings', 'to_upload', file)) # force_file_move(os.path.join('users', 'to_upload', filename), os.path.join('users', 'data', filename)) except (FileNotFoundError, NoCredentialsError, PartialCredentialsError) as e: logger.error(f"S3 upload failed for {filename}: {e}", extra={'booking_id': booking_id, 'endpoint': function_name}) raise def get_user_summary(user_id): function_name = get_user_summary.__name__ logger.info(f"Generating user summary for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) # Step 1: Call get_user to get user's info try: user = get_user(user_id) user_info = user.user_info user_messages = user.get_messages() except LookupError as e: logger.error(f"Error fetching user data: {e}", extra={'user_id': user_id, 'endpoint': function_name}) raise e # Step 2: Construct the Prompt chat_history = "\n".join( [f"{message['role'].capitalize()}: {message['content']}" for message in user_messages] ) # Build the system prompt according to the provided instructions system_prompt = """ You are an AI language model designed to generate three outputs based on the user's profile and chat history: 1. **Pre-Growth Guide Session Report**: A comprehensive summary of the user's profile and life context for the Growth Guide (a human coach), covering five key areas: **mental well-being**, **physical health and wellness**, **relationships**, **career growth**, and **personal growth**. 2. **User's Growth Guide Preparation Brief**: A comprehensive brief guiding the user on what to discuss with the Growth Guide, providing actionable advice and highlighting key areas to focus on during their session, covering the same five key areas. 3. **30-Minute Coaching Session Script**: A detailed, partitioned script to help the coach prepare for the session, including dialogue, questions, and guidance tailored to the client's needs, covering the five key areas. The script should be partitioned into several sections in the JSON output, similar to the structure provided for the Pre-Growth Guide Session Report. --- **Instructions:** - **Comprehensive Coverage**: Ensure that all three outputs cover the following five key areas: 1. **Mental Well-being** 2. **Physical Health and Wellness** 3. **Relationships** 4. **Career Growth** 5. **Personal Growth** If the chat history provided by the user does not touch on one or more of these areas, the report should state: "The user hasn't discussed this area yet. Maybe you can cover this during the Growth Guide session." - **Output Format**: Output the result in JSON format following the specified JSON schema. The outputs for the **Pre-Growth Guide Session Report** and the **30-Minute Coaching Session Script** should be partitioned into several JSON keys, similar to the structure provided for the Pre-Growth Guide Session Report. --- ### **1. Pre-Growth Guide Session Report** **Objective**: Provide a comprehensive summary of the user's profile and life context for the Growth Guide, covering the five key areas. **Format**: - **user_overview**: - **name**: The user's full name. - **age_group**: The user's age range (e.g., "30-39"). - **primary_goals**: The main goals the user is focusing on. - **preferred_coaching_style**: The coaching style the user prefers. - **personality_insights**: - **mbti**: The user's Myers-Briggs Type Indicator personality type. - **top_love_languages**: A list of the user's top two love languages. - **belief_in_astrology**: Whether the user believes in horoscope/astrology. - **progress_snapshot**: - **mental_well_being**: Summary of the user's mental well-being. - **physical_health_and_wellness**: Summary of the user's physical health and wellness. - **relationships**: Summary of the user's relationships. - **career_growth**: Summary of the user's career growth. - **personal_growth**: Summary of the user's personal growth. If any of the key areas are not discussed, include a note: "The user hasn't discussed this area yet. Maybe you can cover this during the Growth Guide session." --- ### **2. User's Growth Guide Preparation Brief** **Objective**: Guide the user on what to discuss with the Growth Guide, providing actionable advice and highlighting key areas to focus on during their session, covering the five key areas. **Format**: Structure the brief with the following sections, and output it as a JSON object with these keys: - **reflect**: Encourage the user to consider what each focus area means to them and which aspects they want to improve. - **recall_successes**: Prompt the user to think of times when they effectively managed or improved in the five key areas, and what strategies worked for them then. - **identify_challenges**: Encourage the user to be ready to discuss any obstacles they're facing in each area, and to consider possible solutions or resources that might help. - **set_goals**: Ask the user to decide what they hope to achieve from the session and how improvements in each area will impact their life. - **additional_tips**: Provide practical advice for the session, such as choosing a quiet space, having materials ready, and being open to sharing thoughts honestly. --- ### **3. 30-Minute Coaching Session Script** **Objective**: Help the coach prepare for the session by providing a detailed, partitioned script tailored to the client's specific needs and goals, following a specific session order and focusing on the user's top three most important areas. **IMPORTANT**: BE VERY COMPREHENSIVE IN THE "GUIDANCE" SECTION OF DETAILED SEGMENT!!! **IMPORTANT**: NO NEED TO MENTION THE NAME OF THE COACH!!! **Instructions**: - **Session Overview (30 mins)**: The session should follow this specific order: 1. **Warm Welcome and Rapport Building** (10 mins) 2. **Exploring X Goals** (10 mins) 3. **Developing X Strategies** (5 mins) 4. **Wrap-Up and Commitment** (5 mins) The "X" in "Exploring X Goals" and "Developing X Strategies" should be replaced with the user's top three most important areas from the five key areas. Focus on one area per session. If possible, prioritize the areas based on the user's expressed concerns or goals. - **Detailed Segments**: For each segment, include: - **Numbered Title**: Number and title of the session segment (e.g., `1. Warm Welcome and Trust Building (10 Minutes)`). - **Coach Dialogue**: Provide the coach's dialogue for the segment, including initial statements, follow-up questions, and closing remarks. Present the dialogues as direct quotes, ensuring they align with the client's context and goals. In the coach dialogue, especially during the warm welcome session, you may ask opening question and mention disclaimers that include: - Opening question: To ask the user if there's anything he/she would like to talk about - Mention confidentiality: To tell the user that at ourcoach, we prioritize the privacy and confidentiality of our clients. All information shared during the coaching session will remain strictly confidential and used solely for your personal development. - What to expect from this session: To tell the user what can they expect from this session - Remind them that Zoom has recording turned on, so that they can receive an AI assisted report later: To tell the user to note that this session will be recorded on Zoom to provide you with a comprehensive AI-assisted report afterward. This report will include key takeaways and action steps to help you achieve your goals. And, in the coach dialogue during the "Exploring X Goals" session, you may ask the user if they have any other goals they want to explore, else if they don't, we can focus on the chosen goal! And, in the coach dialogue during the "Wrap-Up and Commitment" session, based on today’s session, ask the user: Would you say that X is your biggest priority right now? Or are there any specific goals or areas you’d like to focus on in the coming weeks? - **Guidance**: Offer specific and comprehensive suggestions for the coach on how to navigate the session, including actionable points and strategies. Use bullet points to clearly present each guidance item. Note: For the "Plan Follow-up" part, it has to be next **month** - **Additional Instructions**: - Ensure that the **Coach Dialogue** is personalized and reflects the client's experiences and aspirations. - The **Guidance** should include actionable suggestions, emphasizing techniques like creating safety, setting expectations, building rapport, encouraging reflection, focusing on synergies, and action planning. Be very comprehensive in this part! And use tag to bold the headers of each guidance points/items! **Style Guidelines**: - Use empathetic and supportive language. - Encourage open-ended dialogue. - Focus on actionable and achievable steps. - Personalize the script to align with the client's experiences and aspirations. - Present information in a clear, organized manner, using numbering and bullet points where appropriate. --- **Note**: - If the user hasn't discussed one or more of the key areas, the outputs should note this and suggest that these areas can be covered during the Growth Guide session. --- ** JSON OUTPUT FORMAT EXAMPLE **: **IMPORTANT**: BE VERY COMPREHENSIVE IN THE "GUIDANCE" SECTION OF DETAILED SEGMENT!!! **IMPORTANT**: NO NEED TO MENTION THE NAME OF THE COACH!!! { "pre_growth_guide_session_report": { "user_overview": { "name": "Alex Johnson", "age_group": "25-34", "primary_goals": "Improve mental well-being, advance career, enhance relationships", "preferred_coaching_style": "Supportive and goal-oriented" }, "personality_insights": { "mbti": "ENFP", "top_love_languages": ["Quality Time", "Words of Affirmation"], "belief_in_astrology": "No" }, "progress_snapshot": { "mental_well_being": "Alex has been experiencing increased stress due to workload and is seeking ways to manage anxiety and improve overall mental health.", "physical_health_and_wellness": "Maintains a regular exercise routine but wants to incorporate healthier eating habits.", "relationships": "Feels disconnected from friends and family due to busy schedule; wishes to rebuild social connections.", "career_growth": "Aiming for a promotion but feels uncertain about the necessary skills and how to stand out.", "personal_growth": "Interested in learning new skills like photography and improving time management." } }, "users_growth_guide_preparation_brief": [ { "key": "reflect", "value": "Consider what mental well-being means to you. Which aspects of your life—such as stress management, anxiety reduction, or emotional balance—do you want to improve?" }, { "key": "recall_successes", "value": "Think of times when you effectively managed stress or maintained a positive mindset. What strategies or habits helped you during those times?" }, { "key": "identify_challenges", "value": "Be ready to discuss current obstacles you're facing in managing stress and anxiety. Consider any patterns or triggers you've noticed." }, { "key": "set_goals", "value": "Decide what you hope to achieve from this session. How would improving your mental well-being impact your daily life and long-term goals?" }, { "key": "additional_tips", "value": "Environment: Choose a quiet, comfortable space.\nMaterials: Have a notebook and pen ready to jot down insights.\nOpenness: Be prepared to share your thoughts honestly and openly." } ], "30_minute_coaching_session_script": { "session_overview": ["Warm Welcome and Trust Building (10 Minutes)","Exploring Holistic Life Goals and Aspirations (10 Minutes)","Identifying Interconnections and Priorities (5 Minutes)","Wrap-Up and Next Steps (5 Minutes)"], "detailed_segments": [ { "segment_title": "1. Warm Welcome and Trust Building (10 Minutes)", "coach_dialogue": ["...","..."], "guidance": ["Create Safety: Reassure Yew Wai by emphasizing confidentiality.","Set Expectations: Clearly outline the session’s structure to provide clarity and ease.\nBuild Rapport: Show genuine curiosity about his recent experiences and emotions.\nValidation: Acknowledge his efforts with empathy, e.g., “That’s a lot to manage, but it’s incredible how committed you are to each aspect of your life.”] }, { "segment_title": "2. Exploring Holistic Life Goals and Aspirations (10 Minutes)", "coach_dialogue": ["...","..."], "guidance": ["Encourage Reflection: Prompt Yew Wai to elaborate on his goals, covering areas like:","Career: Enhancing ourcoach user engagement and chat functionality.","Health: Preparing for the marathon and improving sleep.","Relationships: Nurturing his connection with Karina.","Personal Growth: Strengthening self-discipline.","Connect Goals: Highlight how goals may overlap, e.g., better sleep could enhance productivity at work.","Acknowledge Motivations: Reflect back his drivers for pursuing these goals, such as his desire for impact or balance."] }, { "segment_title": "3. Identifying Interconnections and Priorities (5 Minutes)", "coach_dialogue": ["...","..."], "guidance": ["Focus on Synergies: Show how one priority could impact other areas positively.","Example: A consistent morning routine could improve both health and work productivity.","Prioritize Actionable Areas: Help Yew Wai narrow his focus to one or two priorities.","Use Probing Questions: For example, “How could focusing on better sleep contribute to your overall energy and productivity?”"] }, { "segment_title": "4. Wrap-Up and Next Steps (5 Minutes)", "coach_dialogue": ["...","..."], "guidance": ["Action Planning: Collaborate with Yew Wai to define specific actions, e.g.:","Scheduling a 30-minute morning routine.","Blocking focused hours for ourcoach work.","Planning a date night with Karina.","Encouragement: Reinforce the value of small, consistent steps. For example, “It’s incredible how even small habits can create big changes over time.”","Plan Follow-Up: Suggest reconnecting in a month to reflect on progress.","Close Positively: End with a motivational statement, e.g., “You’re on a path to amazing things, and it’s inspiring to see your dedication.”"] } ] } } """ # Combine user information and chat history for context user_context = f""" Based on the following user profile and chat history, generate the required reports. ### USER PROFILE ### {user_info} ### CHAT HISTORY ### {chat_history} """ # Step 3: Call the OpenAI API using the specified function client = OpenAI(api_key=os.getenv('OPENAI_API_KEY')) try: response = client.chat.completions.create( model="gpt-4o-mini", messages=[ { "role": "system", "content": [ { "type": "text", "text": system_prompt } ] }, { "role": "user", "content": [ { "type": "text", "text": user_context } ] } ], response_format={ "type": "json_schema", "json_schema": { "name": "growth_guide_session", "strict": True, "schema": { "type": "object", "properties": { "pre_growth_guide_session_report": { "type": "object", "description": "A comprehensive summary of the user's profile and life context for the Growth Guide.", "properties": { "user_overview": { "type": "object", "properties": { "name": { "type": "string", "description": "The user's full name." }, "age_group": { "type": "string", "description": "The user's age range (e.g., '30-39')." }, "primary_goals": { "type": "string", "description": "The main goals the user is focusing on." }, "preferred_coaching_style": { "type": "string", "description": "The coaching style the user prefers." } }, "required": ["name", "age_group", "primary_goals", "preferred_coaching_style"], "additionalProperties": False }, "personality_insights": { "type": "object", "properties": { "mbti": { "type": "string", "description": "The user's Myers-Briggs Type Indicator personality type." }, "top_love_languages": { "type": "array", "items": { "type": "string" }, "description": "A list of the user's top two love languages." }, "belief_in_astrology": { "type": "string", "description": "Whether the user believes in horoscope/astrology." } }, "required": ["mbti", "top_love_languages", "belief_in_astrology"], "additionalProperties": False }, "progress_snapshot": { "type": "object", "properties": { "mental_well_being": { "type": "string", "description": "Summary of the user's mental well-being." }, "physical_health_and_wellness": { "type": "string", "description": "Summary of the user's physical health and wellness." }, "relationships": { "type": "string", "description": "Summary of the user's relationships." }, "career_growth": { "type": "string", "description": "Summary of the user's career growth." }, "personal_growth": { "type": "string", "description": "Summary of the user's personal growth." } }, "required": [ "mental_well_being", "physical_health_and_wellness", "relationships", "career_growth", "personal_growth" ], "additionalProperties": False } }, "required": ["user_overview", "personality_insights", "progress_snapshot"], "additionalProperties": False }, "users_growth_guide_preparation_brief": { "type": "array", "description": "A brief guiding the user on what to discuss with the Growth Guide, providing actionable advice and highlighting key areas to focus on.", "items": { "type": "object", "properties": { "key": { "type": "string", "description": "The section heading." }, "value": { "type": "string", "description": "Content for the section." } }, "required": [ "key", "value" ], "additionalProperties": False } }, "30_minute_coaching_session_script": { "type": "object", "description": "A detailed, partitioned script to help the coach prepare for the session, following the specified session order and focusing on the user's top three most important areas.", "properties": { "session_overview": { "type": "array", "items": { "type": "string" }, "description": "Breakdown of the session segments with time frames." }, "detailed_segments": { "type": "array", "items": { "type": "object", "properties": { "segment_title": { "type": "string", "description": "Title of the session segment." }, "coach_dialogue": { "type": "array", "items": { "type": "string" }, "description": "Suggested coach dialogue during the session" }, "guidance": { "type": "array", "items": { "type": "string" }, "description": "Suggestions for the coach on how to navigate responses." } }, "required": ["segment_title", "coach_dialogue", "guidance"], "additionalProperties": False }, "description": "Detailed information for each session segment." } }, "required": [ "session_overview", "detailed_segments" ], "additionalProperties": False } }, "required": [ "pre_growth_guide_session_report", "users_growth_guide_preparation_brief", "30_minute_coaching_session_script" ], "additionalProperties": False } } } , temperature=0.5, max_tokens=3000, top_p=1, frequency_penalty=0, presence_penalty=0 ) # Get response and convert into dictionary reports = json.loads(response.choices[0].message.content) # html_output = generate_html(reports, coach_name) # reports['html_report'] = html_output except Exception as e: logger.error(f"OpenAI API call failed: {e}", extra={'user_id': user_id, 'endpoint': function_name}) raise e # Step 4: Return the JSON reports logger.info(f"User summary generated successfully for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) return reports def create_pre_gg_report(booking_id): function_name = create_pre_gg_report.__name__ # Get user_id from booking_id try: logger.info(f"Retrieving booking details for {booking_id}", extra={'booking_id': booking_id, 'endpoint': function_name}) db_params = { 'dbname': 'ourcoach', 'user': 'ourcoach', 'password': 'hvcTL3kN3pOG5KteT17T', 'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com', 'port': '5432' } try: with psycopg2.connect(**db_params) as conn: with conn.cursor() as cursor: query = sql.SQL(""" select user_id from {table} where id = %s """ ).format(table=sql.Identifier('public', 'booking')) cursor.execute(query, (booking_id,)) row = cursor.fetchone() if (row): colnames = [desc[0] for desc in cursor.description] booking_data = dict(zip(colnames, row)) ### MODIFY THE FORMAT OF USER DATA user_id = booking_data['user_id'] logger.info(f"User info retrieved successfully for {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) else: logger.warning(f"No user info found for {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) except psycopg2.Error as e: logger.error(f"Database error while retrieving user info for {user_id}: {e}", extra={'user_id': user_id, 'endpoint': function_name}) raise # Run get_user_summary user_report = get_user_summary(user_id) # Run generate_html generate_html(user_report, booking_id=booking_id) return True except Exception as e: logger.error(f"An error occured: {e}", extra={'booking_id': booking_id, 'endpoint': function_name}) raise def get_user_life_status(user_id): function_name = get_user_life_status.__name__ logger.info(f"Generating user life status for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) # Step 1: Call get_user to get user's info try: user = get_user(user_id) user_info = user.user_info user_messages = user.get_messages() except LookupError as e: logger.error(f"Error fetching user data: {e}", extra={'user_id': user_id, 'endpoint': function_name}) raise e # Step 2: Construct the Prompt chat_history = "\n".join( [f"{message['role'].capitalize()}: {message['content']}" for message in user_messages] ) logger.info(f"Fetched user data for: {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) # Build the system prompt according to the provided instructions system_prompt = """ You are an AI assistant that generates a personalized life status report for users based on their profile and chat history. Your task is to analyze the provided user data and produce a JSON output following the specified schema. **Instructions:** 1. **Mantra of the Week:** - Create a very short encouragement quote that encapsulates the user's journey toward achieving their goals. - The mantra **MUST** be a single sentence with fewer than 5 words. **Output Format:** Produce your response in JSON format adhering to the following schema: ```json { "mantra_of_the_week": str } ``` **Guidelines:** - The `mantra_of_the_week` should be personalized, positive, and encouraging. It **MUST** be a single sentence with fewer than 5 words. """ # Combine user information and chat history for context user_context = f""" Based on the following user profile and chat history, generate the life status! ### USER PROFILE ### {user_info} ### CHAT HISTORY ### {chat_history} """ # Step 3: Call the OpenAI API using the specified function client = OpenAI(api_key=os.getenv('OPENAI_API_KEY')) try: response = client.chat.completions.create( model="gpt-4o-mini", messages=[ { "role": "system", "content": [ { "type": "text", "text": system_prompt } ] }, { "role": "user", "content": [ { "type": "text", "text": user_context } ] } ], response_format={ "type": "json_schema", "json_schema": { "name": "life_status_report", "strict": True, "schema": { "type": "object", "properties": { "mantra_of_the_week": { "type": "string", "description": "A very short encouragement quote that encapsulates the user's journey to achieve their goals." } }, "required": [ "mantra_of_the_week" ], "additionalProperties": False } } } , temperature=0.5, max_tokens=3000, top_p=1, frequency_penalty=0, presence_penalty=0 ) # Get response and convert into dictionary mantra = json.loads(response.choices[0].message.content)["mantra_of_the_week"] except Exception as e: logger.error(f"OpenAI API call failed: {e}", extra={'user_id': user_id, 'endpoint': function_name}) raise e # Get current life score life_score = { "overall": user.personal_growth_score + user.career_growth_score + user.relationship_score + user.mental_well_being_score + user.health_and_wellness_score, "personal_growth": user.personal_growth_score, "health_and_wellness": user.health_and_wellness_score, "mental_well_being": user.mental_well_being_score, "career_growth": user.career_growth_score, "relationship": user.relationship_score } # Get current goal current_goal = user.goal if not user.goal else user.goal[-1].content # Get life score achievements in list recent_wins = user.recent_wins # Combine everything reports = { "life_score": life_score, "mantra_of_the_week": mantra, "goal": current_goal, "recent_wins": recent_wins } # Step 4: Return the JSON reports logger.info(f"User life status generated successfully for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) return reports def get_api_key(api_key_header: str = Security(api_key_header)) -> str: if api_key_header == os.getenv("FASTAPI_KEY"): return api_key_header raise HTTPException( status_code=403, detail="Could not validate credentials" ) def get_user_info(user_id): function_name = get_user_info.__name__ logger.info(f"Retrieving user info for {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) db_params = { 'dbname': 'ourcoach', 'user': 'ourcoach', 'password': 'hvcTL3kN3pOG5KteT17T', 'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com', 'port': '5432' } try: with psycopg2.connect(**db_params) as conn: with conn.cursor() as cursor: query = sql.SQL("SELECT * FROM {table} WHERE id = %s").format(table=sql.Identifier('public', 'users')) cursor.execute(query, (user_id,)) row = cursor.fetchone() if (row): colnames = [desc[0] for desc in cursor.description] user_data = dict(zip(colnames, row)) ### MODIFY THE FORMAT OF USER DATA user_data_clean = json.loads(user_data['onboarding']) # doLiving = "\n".join([f"- {item['question']} : {item['answer']}" for item in user_data_clean.get('doLiving', [])]) doLiving = user_data_clean.get('mySituation', '') whoImportant = "\n".join([f"- {item['question']} : {item['answer']}" for item in user_data_clean.get('whoImportant', [])]) challenges = "\n".join([f"- {item['question']} : {item['answer']}" for item in user_data_clean.get('challenges', [])]) user_data_formatted = f""" ### USER PROFILE ### Name: {user_data_clean.get('firstName', '')} {user_data_clean.get('firstName', '')}'s challenges (You **must** use this information for the PLANNING STATE): {challenges} {user_data_clean.get('firstName', '')}'s Legendary Persona: {user_data_clean.get('legendPersona', '')} Pronouns: {user_data_clean.get('pronouns', '')} Birthday: {user_data_clean.get('birthDate', '')} {user_data_clean.get('firstName', '')}'s MBTI: {user_data_clean.get('mbti', '')} {user_data_clean.get('firstName', '')}'s Love Language: {user_data_clean.get('loveLanguage', '')} Has {user_data_clean.get('firstName', '')} tried coaching before: {user_data_clean.get('triedCoaching', '')} Belief in Astrology: {user_data_clean.get('astrology', '')} The most important area in {user_data_clean.get('firstName', '')}'s life: {user_data_clean.get('mattersMost', ['', '', '', '', ''])[0]} The second most important area in {user_data_clean.get('firstName', '')}'s life: {user_data_clean.get('mattersMost', ['', '', '', '', ''])[1]} The third most important area in {user_data_clean.get('firstName', '')}'s life: {user_data_clean.get('mattersMost', ['', '', '', '', ''])[2]} The fourth most important area in {user_data_clean.get('firstName', '')}'s life: {user_data_clean.get('mattersMost', ['', '', '', '', ''])[3]} The fifth most important area in {user_data_clean.get('firstName', '')}'s life: {user_data_clean.get('mattersMost', ['', '', '', '', ''])[4]} (Matters the least) What does {user_data_clean.get('firstName', '')} do for a living: {doLiving} {user_data_clean.get('firstName', '')}'s current situation: {user_data_clean.get('mySituation', '')} {user_data_clean.get('firstName', '')}'s most important person: {whoImportant} """ logger.info(f"User info retrieved successfully for {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) return user_data_formatted, user_data_clean.get('mattersMost', ['', '', '', '', '']) else: logger.warning(f"No user info found for {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) return None except psycopg2.Error as e: logger.error(f"Database error while retrieving user info for {user_id}: {e}", extra={'user_id': user_id, 'endpoint': function_name}) return None def get_growth_guide_summary(user_id, session_id): function_name = get_growth_guide_summary.__name__ logger.info(f"Retrieving growth guide summary for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name}) db_params = { 'dbname': 'ourcoach', 'user': 'ourcoach', 'password': 'hvcTL3kN3pOG5KteT17T', 'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com', 'port': '5432' } try: with psycopg2.connect(**db_params) as conn: with conn.cursor() as cursor: query = sql.SQL("SELECT * FROM {table} WHERE user_id = %s AND booking_id = %s").format(table=sql.Identifier('public', 'user_notes')) cursor.execute(query, (user_id, session_id)) row = cursor.fetchone() if (row): colnames = [desc[0] for desc in cursor.description] summary_data = dict(zip(colnames, row)) logger.info(f"Growth guide summary retrieved successfully for user {user_id} and session {session_id}: {summary_data}", extra={'user_id': user_id, 'endpoint': function_name}) return summary_data else: logger.warning(f"No growth guide summary found for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name}) return None except psycopg2.Error as e: logger.error(f"Database error while retrieving growth guide summary for user {user_id} and session {session_id}: {e}", extra={'user_id': user_id, 'endpoint': function_name}) return None def get_all_bookings(): function_name = get_all_bookings.__name__ logger.info(f"Retrieving all bookings", extra={'endpoint': function_name}) db_params = { 'dbname': 'ourcoach', 'user': 'ourcoach', 'password': 'hvcTL3kN3pOG5KteT17T', 'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com', 'port': '5432' } try: with psycopg2.connect(**db_params) as conn: with conn.cursor() as cursor: query = sql.SQL("SELECT id, user_id FROM {table}").format(table=sql.Identifier('public', 'booking')) cursor.execute(query) rows = cursor.fetchall() bookings = [{'booking_id': row[0], 'user_id': row[1]} for row in rows] logger.info(f"Retrieved {len(bookings)} bookings", extra={'endpoint': function_name}) return bookings except psycopg2.Error as e: logger.error(f"Database error while retrieving bookings: {e}", extra={'endpoint': function_name}) return [] def update_growth_guide_summary(user_id, session_id, ourcoach_summary): function_name = update_growth_guide_summary.__name__ logger.info(f"Updating growth guide summary for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name}) db_params = { 'dbname': 'ourcoach', 'user': 'ourcoach', 'password': 'hvcTL3kN3pOG5KteT17T', 'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com', 'port': '5432' } try: with psycopg2.connect(**db_params) as conn: with conn.cursor() as cursor: query = sql.SQL(""" UPDATE {table} SET ourcoach_summary = %s WHERE user_id = %s AND booking_id = %s """).format(table=sql.Identifier('public', 'user_notes')) cursor.execute(query, (json.dumps(ourcoach_summary), user_id, session_id)) conn.commit() logger.info(f"Growth guide summary updated successfully for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name}) except psycopg2.Error as e: logger.error(f"Database error while updating growth guide summary: {e}", extra={'user_id': user_id, 'endpoint': function_name}) raise e def add_growth_guide_session(user_id, session_id, coach_id, session_started_at, zoom_ai_summary, gg_report, ourcoach_summary): function_name = add_growth_guide_session.__name__ logger.info(f"Adding growth guide session for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name}) db_params = { 'dbname': 'ourcoach', 'user': 'ourcoach', 'password': 'hvcTL3kN3pOG5KteT17T', 'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com', 'port': '5432' } try: with psycopg2.connect(**db_params) as conn: with conn.cursor() as cursor: query = sql.SQL(""" INSERT INTO {table} (booking_id, coach_id, session_started_at, user_id, updated_at, gg_report, ourcoach_summary, created_at, zoom_ai_summary) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) """).format(table=sql.Identifier('public', 'user_notes')) current_time = datetime.datetime.now() cursor.execute(query, ( session_id, coach_id, session_started_at, user_id, current_time, json.dumps(gg_report), json.dumps(ourcoach_summary), current_time, json.dumps(zoom_ai_summary) )) conn.commit() logger.info(f"Growth guide session added successfully for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name}) except psycopg2.Error as e: logger.error(f"Database error while adding growth guide session: {e}", extra={'user_id': user_id, 'endpoint': function_name}) raise e def get_growth_guide_session(user_id, session_id): # returns the zoom_ai_summary and the gg_report columns from the POST_GG table function_name = get_growth_guide_session.__name__ logger.info(f"Retrieving growth guide session for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name}) db_params = { 'dbname': 'ourcoach', 'user': 'ourcoach', 'password': 'hvcTL3kN3pOG5KteT17T', 'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com', 'port': '5432' } try: with psycopg2.connect(**db_params) as conn: with conn.cursor() as cursor: query = sql.SQL("SELECT * FROM {table} WHERE user_id = %s AND booking_id = %s").format(table=sql.Identifier('public', 'user_notes')) cursor.execute(query, (user_id, session_id)) row = cursor.fetchone() if (row): colnames = [desc[0] for desc in cursor.description] session_data = dict(zip(colnames, row)) logger.info(f"Growth guide session retrieved successfully for user {user_id} and session {session_id}: {session_data}", extra={'user_id': user_id, 'endpoint': function_name}) return session_data else: logger.warning(f"No growth guide session found for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name}) return None except psycopg2.Error as e: logger.error(f"Database error while retrieving growth guide session for user {user_id} and session {session_id}: {e}", extra={'user_id': user_id, 'endpoint': function_name}) return None def download_file_from_s3(filename, bucket): user_id = filename.split('.')[0] function_name = download_file_from_s3.__name__ logger.info(f"Downloading file {filename} from S3 bucket {bucket}", extra={'user_id': user_id, 'endpoint': function_name}) file_path = os.path.join('users', 'data', filename) try: if (AWS_ACCESS_KEY and AWS_SECRET_KEY): session = boto3.session.Session(aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=REGION) else: session = boto3.session.Session() s3_client = session.client('s3') with open(file_path, 'wb') as f: ## Upload to Production Folder s3_client.download_fileobj(bucket, f"staging/users/{filename}", f) logger.info(f"File {filename} downloaded successfully from S3", extra={'user_id': user_id, 'endpoint': function_name}) return True except Exception as e: logger.error(f"Error downloading file {filename} from S3: {e}", extra={'user_id': user_id, 'endpoint': function_name}) if (os.path.exists(file_path)): os.remove(file_path) return False def add_to_cache(user): user_id = user.user_id function_name = add_to_cache.__name__ logger.info(f"Adding user {user_id} to the cache", extra={'user_id': user_id, 'endpoint': function_name}) user_cache[user_id] = user logger.info(f"User {user_id} added to the cache", extra={'user_id': user_id, 'endpoint': function_name}) return True def pop_cache(user_id): if user_id == 'all': user_cache.reset_cache() return True if user_id not in user_cache: logger.warning(f"[POPPING] User {user_id} not found in the cache", extra={'user_id': user_id, 'endpoint': 'pop_cache'}) # check if file exists if os.path.exists(os.path.join("users", "to_upload", f"{user_id}.pkl")): # upload file logger.info(f"Attempting upload file {user_id}.json to S3", extra={'user_id': user_id, 'endpoint': 'pop_cache'}) upload_file_to_s3(f"{user_id}.pkl") try: user_cache.pop(user_id, None) logger.info(f"User {user_id} has been removed from the cache", extra={'user_id': user_id, 'endpoint': 'pop_cache'}) return True except: return False def update_user(user): user_id = user.user_id function_name = update_user.__name__ logger.info(f"Updating user {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) # remove from cache, which will also upload the file pop_cache(user_id) logger.info(f"User {user_id} has been removed from the cache", extra={'user_id': user_id, 'endpoint': function_name}) logger.info(f"User {user.user_id} updated successfully in S3", extra={'user_id': user_id, 'endpoint': function_name}) return True def upload_mementos_to_db(user_id): function_name = upload_mementos_to_db.__name__ logger.info(f"Uploading mementos to DB for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) db_params = { 'dbname': 'ourcoach', 'user': 'ourcoach', 'password': 'hvcTL3kN3pOG5KteT17T', 'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com', 'port': '5432' } folder_path = os.path.join("mementos", "to_upload", user_id) if (not os.path.exists(folder_path)): logger.warning(f"No mementos folder found for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) return True # Return True as this is not an error condition try: memento_files = [f for f in os.listdir(folder_path) if f.endswith('.json')] if (not memento_files): logger.info(f"No memento files found for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name}) return True with psycopg2.connect(**db_params) as conn: with conn.cursor() as cursor: base_query = """ INSERT INTO public.user_memento (user_id, type, title, description, tags, priority, mood, status, location, recurrence, context, created_at, follow_up_on) VALUES (%s, %s, %s, %s, %s::jsonb, %s, %s, %s, %s, %s, %s, %s, %s) """ for filename in memento_files: file_path = os.path.join(folder_path, filename) try: with open(file_path, 'r', encoding='utf-8') as json_file: data = json.load(json_file) # Convert tags array to proper JSON string tags_json = json.dumps(data.get('tags', [])) # Prepare data with proper defaults and transformations memento_data = [ user_id, # Replace the user_id from JSON with the actual user_id data.get('type', ''), data.get('title', ''), data.get('description', ''), tags_json, # Send tags as JSON string data.get('priority', ''), data.get('mood', ''), data.get('status', ''), data.get('location', ''), data.get('recurrence', ''), data.get('context', ''), datetime.datetime.now(), pd.to_datetime(data.get('follow_up_on', '')) ] cursor.execute(base_query, memento_data) conn.commit() # Remove file after successful insert os.remove(file_path) logger.info(f"Successfully processed memento {filename}", extra={'user_id': user_id, 'endpoint': function_name}) except json.JSONDecodeError as e: logger.error(f"Invalid JSON in file {filename}: {str(e)}", extra={'user_id': user_id, 'endpoint': function_name}) continue except Exception as e: logger.error(f"Error processing memento {filename}: {str(e)}", extra={'user_id': user_id, 'endpoint': function_name}) continue # Try to remove the directory after processing all files try: os.rmdir(folder_path) except OSError: pass # Ignore if directory is not empty or already removed return True except psycopg2.Error as e: logger.error(f"Database error while uploading mementos: {str(e)}", extra={'user_id': user_id, 'endpoint': function_name}) raise ConnectionError(f"Database error: {str(e)}") except Exception as e: logger.error(f"Unexpected error uploading mementos: {str(e)}", extra={'user_id': user_id, 'endpoint': function_name}) return False def get_users_mementos(user_id, date): function_name = get_users_mementos.__name__ db_params = { 'dbname': 'ourcoach', 'user': 'ourcoach', 'password': 'hvcTL3kN3pOG5KteT17T', 'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com', 'port': '5432' } logger.info(f"Retrieving mementos for user {user_id} on date {date}", extra={'endpoint': function_name, 'user_id': user_id}) # Convert date string to PostgreSQL compatible format parsed_date = date logger.info(f"Retrieving mementos for user {user_id} on date {parsed_date}", extra={'endpoint': function_name, 'user_id': user_id}) try: with psycopg2.connect(**db_params) as conn: with conn.cursor() as cursor: query = sql.SQL(""" SELECT * FROM public.user_memento WHERE user_id = %s AND DATE(follow_up_on) = %s """) cursor.execute(query, (user_id, parsed_date)) rows = cursor.fetchall() if rows: colnames = [desc[0] for desc in cursor.description] mementos = [dict(zip(colnames, row)) for row in rows] logger.info(f"Retrieved {len(mementos)} mementos for user {user_id} on date {date}", extra={'endpoint': function_name, 'user_id': user_id}) return mementos else: logger.info(f"No mementos found for user {user_id} on date {date}", extra={'endpoint': function_name, 'user_id': user_id}) return [] except psycopg2.Error as e: logger.error(f"Database error while retrieving mementos: {e}", extra={'endpoint': function_name, 'user_id': user_id}) return [] def generate_uuid(): return str(uuid.uuid4()) def print_log(level, message, **kwargs): """ Print log in JSON format for better readability in CloudWatch. Parameters: level (str): The log level (e.g., "INFO", "ERROR", "DEBUG"). message (str): The log message. **kwargs: Additional key-value pairs to include in the log. example: print_log("INFO", "User logged in", user_id=123, action="login") """ log_entry = { "timestamp": datetime.datetime.utcnow().isoformat() + "Z", "level": level, "message": message, } log_entry.update(kwargs) print(json.dumps(log_entry, ensure_ascii=False))