import logging
import boto3
from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError
import os
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Security, Query, status
from fastapi.security import APIKeyHeader
from openai import OpenAI
import pandas as pd
from pydantic import BaseModel
import os
import logging
import json
import psycopg2
from psycopg2 import sql
import os
from dotenv import load_dotenv
import datetime
import threading
import pickle # Replace dill with pickle
from cachetools import TTLCache
import threading
import time
import uuid
from app.cache import CustomTTLCache, upload_file_to_s3
import pdfkit
import PyPDF2
import secrets
import string
load_dotenv()
# Environment Variables for API Keys
api_keys = [os.getenv('FASTAPI_KEY')]
api_key_header = APIKeyHeader(name="X-API-Key")
load_dotenv()
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY')
REGION = os.getenv('AWS_REGION')
logger = logging.getLogger(__name__)
# Replace the simple TTLCache with our custom implementation
user_cache = CustomTTLCache(ttl=120, cleanup_interval=30) # 2 minutes TTL
def force_file_move(source, destination):
function_name = force_file_move.__name__
logger.info(f"Attempting to move file from {source} to {destination}", extra={'endpoint': function_name})
try:
# Ensure the destination directory exists
os.makedirs(os.path.dirname(destination), exist_ok=True)
# Move the file, replacing if it already exists
os.replace(source, destination)
logger.info(f"File moved successfully: {source} -> {destination}", extra={'endpoint': function_name})
except FileNotFoundError:
logger.error(f"Source file not found: {source}", extra={'endpoint': function_name})
except Exception as e:
logger.error(f"An error occurred while moving file: {e}", extra={'endpoint': function_name})
def get_user(user_id):
function_name = get_user.__name__
logger.info(f"Fetching user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
logger.info(f"[CACHE]: {user_cache}", extra={'user_id': user_id, 'endpoint': function_name})
if user_id in user_cache:
logger.info(f"User {user_id} found in cache", extra={'user_id': user_id, 'endpoint': function_name})
return user_cache[user_id]
else:
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
user_file = os.path.join('users', 'data', f'{user_id}.pkl')
# if os.path.exists(user_file):
# with open(user_file, 'rb') as f:
# user = pickle.load(f)
# user.client = client
# user.conversations.client = client
# with cache_lock:
# user_cache[user_id] = user
# return user
logger.warning(f"User {user_id} not found locally. Attempting to download from S3", extra={'user_id': user_id, 'endpoint': function_name})
download = download_file_from_s3(f'{user_id}.pkl', 'core-ai-assets')
logger.info(f"Download success: {download}", extra={'user_id': user_id, 'endpoint': function_name})
if (download):
with open(user_file, 'rb') as f:
user = pickle.load(f)
user.client = client
user.conversations.client = client
user_cache[user_id] = user # No need for lock here
os.remove(user_file)
logger.info(f"User {user_id} loaded successfully from S3", extra={'user_id': user_id, 'endpoint': function_name})
return user
else:
logger.error(f"User {user_id} pickle does not exist in S3", extra={'user_id': user_id, 'endpoint': function_name})
# check if user_info exists
user_info = get_user_info(user_id)
if (user_info):
# user has done onboarding but pickle file not created
raise ReferenceError(f"User {user_id} pickle still being created")
raise LookupError(f"User [{user_id}] has not onboarded yet")
def generate_html(json_data, coach_name='Growth Guide', booking_id = None):
function_name = generate_html.__name__
data = json_data["pre_growth_guide_session_report"]
user_overview = data["user_overview"]
personality_insights = data["personality_insights"]
progress_snapshot = data["progress_snapshot"]
preparation_brief = json_data.get("users_growth_guide_preparation_brief", [])
session_script = json_data["30_minute_coaching_session_script"]
# Extract user name
user_name = user_overview["name"]
# Build Progress Snapshot
progress_items = ""
for key, value in progress_snapshot.items():
# Convert key to title case with spaces
formatted_key = key.replace("_", " ").title()
progress_items += f'
{formatted_key}: {value}\n'
# Build Personality Insights
love_languages = "".join(f"{lang}" for lang in personality_insights["top_love_languages"])
# Build Preparation Brief
preparation_items = "".join(
f'{item["key"].replace("_", " ").title()}: {item["value"]}\n'
for item in preparation_brief)
# Build Session Overview
session_overview_list = session_script["session_overview"]
session_overview = "\n"
for item in session_overview_list:
session_overview += f"- {item}
\n"
session_overview += "
"
# Build Detailed Segments
detailed_segments = ""
for segment in session_script["detailed_segments"]:
segment_title = segment["segment_title"]
# Build Coach Dialogue list
coach_dialogue_list = segment.get("coach_dialogue", [])
coach_dialogue_html = "\n"
for dialogue in coach_dialogue_list:
coach_dialogue_html += f"- {dialogue}
\n"
coach_dialogue_html += "
"
# Build Guidance list
guidance_list = segment.get("guidance", [])
guidance_html = "\n"
for guidance_point in guidance_list:
guidance_html += f"- {guidance_point}
\n"
guidance_html += "
"
detailed_segments += f'''
{segment_title}
Coach Dialogue:{coach_dialogue_html}
Guidance:{guidance_html}
'''
# Build Final HTML
html_content = f'''
User Profile - {user_name}
User Profile - {user_name}
User Overview
- Name: {user_overview["name"]}
- Age Group: {user_overview["age_group"]}
- Primary Goals: {user_overview["primary_goals"]}
- Preferred Coaching Style: {user_overview["preferred_coaching_style"]}
Personality Insights
- MBTI: {personality_insights["mbti"]}
- Top Love Languages:
{love_languages}
- Belief in Astrology: {personality_insights["belief_in_astrology"]}
Progress Snapshot
30-Minute Coaching Session Script
Session Overview (30 Minutes)
{session_overview}
Detailed Segments
{detailed_segments}
'''
file_path = os.path.join("bookings", "data",f"{booking_id}.html")
path_to_upload = os.path.join("bookings", "to_upload",f"{booking_id}.pdf")
password = "Ourcoach2024!"
## SAVING HTML FILE
try:
# Open the file in write mode
with open(file_path, 'w', encoding='utf-8') as html_file:
html_file.write(html_content)
logger.info(f"File '{booking_id}.html' has been created successfully.", extra={'booking_id': booking_id, 'endpoint': function_name})
# Saving as PDF File
pdfkit.from_file(file_path, path_to_upload, options={'encoding': 'UTF-8'})
logger.info(f"File '{booking_id}.pdf' has been created successfully.", extra={'booking_id': booking_id, 'endpoint': function_name})
## ENCRYPTING PDF
logger.info(f"Encrypting '{booking_id}.pdf'...", extra={'booking_id': booking_id, 'endpoint': function_name})
with open(path_to_upload, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
pdf_writer = PyPDF2.PdfWriter()
# Add all pages to the writer
for page_num in range(len(pdf_reader.pages)):
pdf_writer.add_page(pdf_reader.pages[page_num])
# Encrypt the PDF with the given password
pdf_writer.encrypt(password)
with open(path_to_upload, 'wb') as encrypted_file:
pdf_writer.write(encrypted_file)
logger.info(f"Succesfully encrypted '{booking_id}.pdf'", extra={'booking_id': booking_id, 'endpoint': function_name})
except Exception as e:
logger.error(f"An error occurred: {e}", extra={'booking_id': booking_id, 'endpoint': function_name})
raise
filename = booking_id
logger.info(f"Uploading file {filename} to S3", extra={'booking_id': booking_id, 'endpoint': function_name})
bucket = 'core-ai-assets'
try:
if (AWS_ACCESS_KEY and AWS_SECRET_KEY):
session = boto3.session.Session(aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=REGION)
else:
session = boto3.session.Session()
s3_client = session.client('s3')
with open(path_to_upload, "rb") as f:
## Upload to Production Folder
s3_client.upload_fileobj(f, bucket, f'staging/pre_gg_reports/{filename}.pdf')
logger.info(f"File {filename} uploaded successfully to S3", extra={'booking_id': booking_id, 'endpoint': function_name})
# Removing files
for file in os.listdir(os.path.join('bookings', 'data')):
os.remove(os.path.join('bookings', 'data', file))
for file in os.listdir(os.path.join('bookings', 'to_upload')):
os.remove(os.path.join('bookings', 'to_upload', file))
# force_file_move(os.path.join('users', 'to_upload', filename), os.path.join('users', 'data', filename))
except (FileNotFoundError, NoCredentialsError, PartialCredentialsError) as e:
logger.error(f"S3 upload failed for {filename}: {e}", extra={'booking_id': booking_id, 'endpoint': function_name})
raise
def get_user_summary(user_id):
function_name = get_user_summary.__name__
logger.info(f"Generating user summary for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
# Step 1: Call get_user to get user's info
try:
user = get_user(user_id)
user_info = user.user_info
user_messages = user.get_messages()
except LookupError as e:
logger.error(f"Error fetching user data: {e}", extra={'user_id': user_id, 'endpoint': function_name})
raise e
# Step 2: Construct the Prompt
chat_history = "\n".join(
[f"{message['role'].capitalize()}: {message['content']}" for message in user_messages]
)
# Build the system prompt according to the provided instructions
system_prompt = """
You are an AI language model designed to generate three outputs based on the user's profile and chat history:
1. **Pre-Growth Guide Session Report**: A comprehensive summary of the user's profile and life context for the Growth Guide (a human coach), covering five key areas: **mental well-being**, **physical health and wellness**, **relationships**, **career growth**, and **personal growth**.
2. **User's Growth Guide Preparation Brief**: A comprehensive brief guiding the user on what to discuss with the Growth Guide, providing actionable advice and highlighting key areas to focus on during their session, covering the same five key areas.
3. **30-Minute Coaching Session Script**: A detailed, partitioned script to help the coach prepare for the session, including dialogue, questions, and guidance tailored to the client's needs, covering the five key areas. The script should be partitioned into several sections in the JSON output, similar to the structure provided for the Pre-Growth Guide Session Report.
---
**Instructions:**
- **Comprehensive Coverage**:
Ensure that all three outputs cover the following five key areas:
1. **Mental Well-being**
2. **Physical Health and Wellness**
3. **Relationships**
4. **Career Growth**
5. **Personal Growth**
If the chat history provided by the user does not touch on one or more of these areas, the report should state: "The user hasn't discussed this area yet. Maybe you can cover this during the Growth Guide session."
- **Output Format**:
Output the result in JSON format following the specified JSON schema. The outputs for the **Pre-Growth Guide Session Report** and the **30-Minute Coaching Session Script** should be partitioned into several JSON keys, similar to the structure provided for the Pre-Growth Guide Session Report.
---
### **1. Pre-Growth Guide Session Report**
**Objective**: Provide a comprehensive summary of the user's profile and life context for the Growth Guide, covering the five key areas.
**Format**:
- **user_overview**:
- **name**: The user's full name.
- **age_group**: The user's age range (e.g., "30-39").
- **primary_goals**: The main goals the user is focusing on.
- **preferred_coaching_style**: The coaching style the user prefers.
- **personality_insights**:
- **mbti**: The user's Myers-Briggs Type Indicator personality type.
- **top_love_languages**: A list of the user's top two love languages.
- **belief_in_astrology**: Whether the user believes in horoscope/astrology.
- **progress_snapshot**:
- **mental_well_being**: Summary of the user's mental well-being.
- **physical_health_and_wellness**: Summary of the user's physical health and wellness.
- **relationships**: Summary of the user's relationships.
- **career_growth**: Summary of the user's career growth.
- **personal_growth**: Summary of the user's personal growth.
If any of the key areas are not discussed, include a note: "The user hasn't discussed this area yet. Maybe you can cover this during the Growth Guide session."
---
### **2. User's Growth Guide Preparation Brief**
**Objective**: Guide the user on what to discuss with the Growth Guide, providing actionable advice and highlighting key areas to focus on during their session, covering the five key areas.
**Format**:
Structure the brief with the following sections, and output it as a JSON object with these keys:
- **reflect**: Encourage the user to consider what each focus area means to them and which aspects they want to improve.
- **recall_successes**: Prompt the user to think of times when they effectively managed or improved in the five key areas, and what strategies worked for them then.
- **identify_challenges**: Encourage the user to be ready to discuss any obstacles they're facing in each area, and to consider possible solutions or resources that might help.
- **set_goals**: Ask the user to decide what they hope to achieve from the session and how improvements in each area will impact their life.
- **additional_tips**: Provide practical advice for the session, such as choosing a quiet space, having materials ready, and being open to sharing thoughts honestly.
---
### **3. 30-Minute Coaching Session Script**
**Objective**: Help the coach prepare for the session by providing a detailed, partitioned script tailored to the client's specific needs and goals, following a specific session order and focusing on the user's top three most important areas.
**IMPORTANT**: BE VERY COMPREHENSIVE IN THE "GUIDANCE" SECTION OF DETAILED SEGMENT!!!
**IMPORTANT**: NO NEED TO MENTION THE NAME OF THE COACH!!!
**Instructions**:
- **Session Overview (30 mins)**:
The session should follow this specific order:
1. **Warm Welcome and Rapport Building** (10 mins)
2. **Exploring X Goals** (10 mins)
3. **Developing X Strategies** (5 mins)
4. **Wrap-Up and Commitment** (5 mins)
The "X" in "Exploring X Goals" and "Developing X Strategies" should be replaced with the user's top three most important areas from the five key areas. Focus on one area per session. If possible, prioritize the areas based on the user's expressed concerns or goals.
- **Detailed Segments**:
For each segment, include:
- **Numbered Title**: Number and title of the session segment (e.g., `1. Warm Welcome and Trust Building (10 Minutes)`).
- **Coach Dialogue**: Provide the coach's dialogue for the segment, including initial statements, follow-up questions, and closing remarks. Present the dialogues as direct quotes, ensuring they align with the client's context and goals.
In the coach dialogue, especially during the warm welcome session, you may ask opening question and mention disclaimers that include:
- Opening question:
To ask the user if there's anything he/she would like to talk about
- Mention confidentiality:
To tell the user that at ourcoach, we prioritize the privacy and confidentiality of our clients. All information shared during the coaching session will remain strictly confidential and used solely for your personal development.
- What to expect from this session:
To tell the user what can they expect from this session
- Remind them that Zoom has recording turned on, so that they can receive an AI assisted report later:
To tell the user to note that this session will be recorded on Zoom to provide you with a comprehensive AI-assisted report afterward. This report will include key takeaways and action steps to help you achieve your goals.
And, in the coach dialogue during the "Exploring X Goals" session, you may ask the user if they have any other goals they want to explore, else if they don't, we can focus on the chosen goal!
And, in the coach dialogue during the "Wrap-Up and Commitment" session, based on today’s session, ask the user: Would you say that X is your biggest priority right now? Or are there any specific goals or areas you’d like to focus on in the coming weeks?
- **Guidance**: Offer specific and comprehensive suggestions for the coach on how to navigate the session, including actionable points and strategies. Use bullet points to clearly present each guidance item.
Note: For the "Plan Follow-up" part, it has to be next **month**
- **Additional Instructions**:
- Ensure that the **Coach Dialogue** is personalized and reflects the client's experiences and aspirations.
- The **Guidance** should include actionable suggestions, emphasizing techniques like creating safety, setting expectations, building rapport, encouraging reflection, focusing on synergies, and action planning. Be very comprehensive in this part! And use tag to bold the headers of each guidance points/items!
**Style Guidelines**:
- Use empathetic and supportive language.
- Encourage open-ended dialogue.
- Focus on actionable and achievable steps.
- Personalize the script to align with the client's experiences and aspirations.
- Present information in a clear, organized manner, using numbering and bullet points where appropriate.
---
**Note**:
- If the user hasn't discussed one or more of the key areas, the outputs should note this and suggest that these areas can be covered during the Growth Guide session.
---
** JSON OUTPUT FORMAT EXAMPLE **:
**IMPORTANT**: BE VERY COMPREHENSIVE IN THE "GUIDANCE" SECTION OF DETAILED SEGMENT!!!
**IMPORTANT**: NO NEED TO MENTION THE NAME OF THE COACH!!!
{
"pre_growth_guide_session_report": {
"user_overview": {
"name": "Alex Johnson",
"age_group": "25-34",
"primary_goals": "Improve mental well-being, advance career, enhance relationships",
"preferred_coaching_style": "Supportive and goal-oriented"
},
"personality_insights": {
"mbti": "ENFP",
"top_love_languages": ["Quality Time", "Words of Affirmation"],
"belief_in_astrology": "No"
},
"progress_snapshot": {
"mental_well_being": "Alex has been experiencing increased stress due to workload and is seeking ways to manage anxiety and improve overall mental health.",
"physical_health_and_wellness": "Maintains a regular exercise routine but wants to incorporate healthier eating habits.",
"relationships": "Feels disconnected from friends and family due to busy schedule; wishes to rebuild social connections.",
"career_growth": "Aiming for a promotion but feels uncertain about the necessary skills and how to stand out.",
"personal_growth": "Interested in learning new skills like photography and improving time management."
}
},
"users_growth_guide_preparation_brief": [
{
"key": "reflect",
"value": "Consider what mental well-being means to you. Which aspects of your life—such as stress management, anxiety reduction, or emotional balance—do you want to improve?"
},
{
"key": "recall_successes",
"value": "Think of times when you effectively managed stress or maintained a positive mindset. What strategies or habits helped you during those times?"
},
{
"key": "identify_challenges",
"value": "Be ready to discuss current obstacles you're facing in managing stress and anxiety. Consider any patterns or triggers you've noticed."
},
{
"key": "set_goals",
"value": "Decide what you hope to achieve from this session. How would improving your mental well-being impact your daily life and long-term goals?"
},
{
"key": "additional_tips",
"value": "Environment: Choose a quiet, comfortable space.\nMaterials: Have a notebook and pen ready to jot down insights.\nOpenness: Be prepared to share your thoughts honestly and openly."
}
],
"30_minute_coaching_session_script": {
"session_overview": ["Warm Welcome and Trust Building (10 Minutes)","Exploring Holistic Life Goals and Aspirations (10 Minutes)","Identifying Interconnections and Priorities (5 Minutes)","Wrap-Up and Next Steps (5 Minutes)"],
"detailed_segments": [
{
"segment_title": "1. Warm Welcome and Trust Building (10 Minutes)",
"coach_dialogue": ["...","..."],
"guidance": ["Create Safety: Reassure Yew Wai by emphasizing confidentiality.","Set Expectations: Clearly outline the session’s structure to provide clarity and ease.\nBuild Rapport: Show genuine curiosity about his recent experiences and emotions.\nValidation: Acknowledge his efforts with empathy, e.g., “That’s a lot to manage, but it’s incredible how committed you are to each aspect of your life.”]
},
{
"segment_title": "2. Exploring Holistic Life Goals and Aspirations (10 Minutes)",
"coach_dialogue": ["...","..."],
"guidance": ["Encourage Reflection: Prompt Yew Wai to elaborate on his goals, covering areas like:","Career: Enhancing ourcoach user engagement and chat functionality.","Health: Preparing for the marathon and improving sleep.","Relationships: Nurturing his connection with Karina.","Personal Growth: Strengthening self-discipline.","Connect Goals: Highlight how goals may overlap, e.g., better sleep could enhance productivity at work.","Acknowledge Motivations: Reflect back his drivers for pursuing these goals, such as his desire for impact or balance."]
},
{
"segment_title": "3. Identifying Interconnections and Priorities (5 Minutes)",
"coach_dialogue": ["...","..."],
"guidance": ["Focus on Synergies: Show how one priority could impact other areas positively.","Example: A consistent morning routine could improve both health and work productivity.","Prioritize Actionable Areas: Help Yew Wai narrow his focus to one or two priorities.","Use Probing Questions: For example, “How could focusing on better sleep contribute to your overall energy and productivity?”"]
},
{
"segment_title": "4. Wrap-Up and Next Steps (5 Minutes)",
"coach_dialogue": ["...","..."],
"guidance": ["Action Planning: Collaborate with Yew Wai to define specific actions, e.g.:","Scheduling a 30-minute morning routine.","Blocking focused hours for ourcoach work.","Planning a date night with Karina.","Encouragement: Reinforce the value of small, consistent steps. For example, “It’s incredible how even small habits can create big changes over time.”","Plan Follow-Up: Suggest reconnecting in a month to reflect on progress.","Close Positively: End with a motivational statement, e.g., “You’re on a path to amazing things, and it’s inspiring to see your dedication.”"]
}
]
}
}
"""
# Combine user information and chat history for context
user_context = f"""
Based on the following user profile and chat history, generate the required reports.
### USER PROFILE ###
{user_info}
### CHAT HISTORY ###
{chat_history}
"""
# Step 3: Call the OpenAI API using the specified function
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
try:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": [
{
"type": "text",
"text": system_prompt
}
]
},
{
"role": "user",
"content": [
{
"type": "text",
"text": user_context
}
]
}
],
response_format={
"type": "json_schema",
"json_schema": {
"name": "growth_guide_session",
"strict": True,
"schema": {
"type": "object",
"properties": {
"pre_growth_guide_session_report": {
"type": "object",
"description": "A comprehensive summary of the user's profile and life context for the Growth Guide.",
"properties": {
"user_overview": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The user's full name."
},
"age_group": {
"type": "string",
"description": "The user's age range (e.g., '30-39')."
},
"primary_goals": {
"type": "string",
"description": "The main goals the user is focusing on."
},
"preferred_coaching_style": {
"type": "string",
"description": "The coaching style the user prefers."
}
},
"required": ["name", "age_group", "primary_goals", "preferred_coaching_style"],
"additionalProperties": False
},
"personality_insights": {
"type": "object",
"properties": {
"mbti": {
"type": "string",
"description": "The user's Myers-Briggs Type Indicator personality type."
},
"top_love_languages": {
"type": "array",
"items": {
"type": "string"
},
"description": "A list of the user's top two love languages."
},
"belief_in_astrology": {
"type": "string",
"description": "Whether the user believes in horoscope/astrology."
}
},
"required": ["mbti", "top_love_languages", "belief_in_astrology"],
"additionalProperties": False
},
"progress_snapshot": {
"type": "object",
"properties": {
"mental_well_being": {
"type": "string",
"description": "Summary of the user's mental well-being."
},
"physical_health_and_wellness": {
"type": "string",
"description": "Summary of the user's physical health and wellness."
},
"relationships": {
"type": "string",
"description": "Summary of the user's relationships."
},
"career_growth": {
"type": "string",
"description": "Summary of the user's career growth."
},
"personal_growth": {
"type": "string",
"description": "Summary of the user's personal growth."
}
},
"required": [
"mental_well_being",
"physical_health_and_wellness",
"relationships",
"career_growth",
"personal_growth"
],
"additionalProperties": False
}
},
"required": ["user_overview", "personality_insights", "progress_snapshot"],
"additionalProperties": False
},
"users_growth_guide_preparation_brief": {
"type": "array",
"description": "A brief guiding the user on what to discuss with the Growth Guide, providing actionable advice and highlighting key areas to focus on.",
"items": {
"type": "object",
"properties": {
"key": {
"type": "string",
"description": "The section heading."
},
"value": {
"type": "string",
"description": "Content for the section."
}
},
"required": [
"key",
"value"
],
"additionalProperties": False
}
},
"30_minute_coaching_session_script": {
"type": "object",
"description": "A detailed, partitioned script to help the coach prepare for the session, following the specified session order and focusing on the user's top three most important areas.",
"properties": {
"session_overview": {
"type": "array",
"items": {
"type": "string"
},
"description": "Breakdown of the session segments with time frames."
},
"detailed_segments": {
"type": "array",
"items": {
"type": "object",
"properties": {
"segment_title": {
"type": "string",
"description": "Title of the session segment."
},
"coach_dialogue": {
"type": "array",
"items": {
"type": "string"
},
"description": "Suggested coach dialogue during the session"
},
"guidance": {
"type": "array",
"items": {
"type": "string"
},
"description": "Suggestions for the coach on how to navigate responses."
}
},
"required": ["segment_title", "coach_dialogue", "guidance"],
"additionalProperties": False
},
"description": "Detailed information for each session segment."
}
},
"required": [
"session_overview",
"detailed_segments"
],
"additionalProperties": False
}
},
"required": [
"pre_growth_guide_session_report",
"users_growth_guide_preparation_brief",
"30_minute_coaching_session_script"
],
"additionalProperties": False
}
}
}
,
temperature=0.5,
max_tokens=3000,
top_p=1,
frequency_penalty=0,
presence_penalty=0
)
# Get response and convert into dictionary
reports = json.loads(response.choices[0].message.content)
# html_output = generate_html(reports, coach_name)
# reports['html_report'] = html_output
except Exception as e:
logger.error(f"OpenAI API call failed: {e}", extra={'user_id': user_id, 'endpoint': function_name})
raise e
# Step 4: Return the JSON reports
logger.info(f"User summary generated successfully for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
return reports
def create_pre_gg_report(booking_id):
function_name = create_pre_gg_report.__name__
# Get user_id from booking_id
try:
logger.info(f"Retrieving booking details for {booking_id}", extra={'booking_id': booking_id, 'endpoint': function_name})
db_params = {
'dbname': 'ourcoach',
'user': 'ourcoach',
'password': 'hvcTL3kN3pOG5KteT17T',
'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
'port': '5432'
}
try:
with psycopg2.connect(**db_params) as conn:
with conn.cursor() as cursor:
query = sql.SQL("""
select user_id
from {table}
where id = %s
"""
).format(table=sql.Identifier('public', 'booking'))
cursor.execute(query, (booking_id,))
row = cursor.fetchone()
if (row):
colnames = [desc[0] for desc in cursor.description]
booking_data = dict(zip(colnames, row))
### MODIFY THE FORMAT OF USER DATA
user_id = booking_data['user_id']
logger.info(f"User info retrieved successfully for {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
else:
logger.warning(f"No user info found for {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
except psycopg2.Error as e:
logger.error(f"Database error while retrieving user info for {user_id}: {e}", extra={'user_id': user_id, 'endpoint': function_name})
raise
# Run get_user_summary
user_report = get_user_summary(user_id)
# Run generate_html
generate_html(user_report, booking_id=booking_id)
return True
except Exception as e:
logger.error(f"An error occured: {e}", extra={'booking_id': booking_id, 'endpoint': function_name})
raise
def get_user_life_status(user_id):
function_name = get_user_life_status.__name__
logger.info(f"Generating user life status for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
# Step 1: Call get_user to get user's info
try:
user = get_user(user_id)
user_info = user.user_info
user_messages = user.get_messages()
except LookupError as e:
logger.error(f"Error fetching user data: {e}", extra={'user_id': user_id, 'endpoint': function_name})
raise e
# Step 2: Construct the Prompt
chat_history = "\n".join(
[f"{message['role'].capitalize()}: {message['content']}" for message in user_messages]
)
logger.info(f"Fetched user data for: {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
# Build the system prompt according to the provided instructions
system_prompt = """
You are an AI assistant that generates a personalized life status report for users based on their profile and chat history. Your task is to analyze the provided user data and produce a JSON output following the specified schema.
**Instructions:**
1. **Mantra of the Week:**
- Create a very short encouragement quote that encapsulates the user's journey toward achieving their goals.
- The mantra **MUST** be a single sentence with fewer than 5 words.
**Output Format:**
Produce your response in JSON format adhering to the following schema:
```json
{
"mantra_of_the_week": str
}
```
**Guidelines:**
- The `mantra_of_the_week` should be personalized, positive, and encouraging. It **MUST** be a single sentence with fewer than 5 words.
"""
# Combine user information and chat history for context
user_context = f"""
Based on the following user profile and chat history, generate the life status!
### USER PROFILE ###
{user_info}
### CHAT HISTORY ###
{chat_history}
"""
# Step 3: Call the OpenAI API using the specified function
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
try:
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": [
{
"type": "text",
"text": system_prompt
}
]
},
{
"role": "user",
"content": [
{
"type": "text",
"text": user_context
}
]
}
],
response_format={
"type": "json_schema",
"json_schema": {
"name": "life_status_report",
"strict": True,
"schema": {
"type": "object",
"properties": {
"mantra_of_the_week": {
"type": "string",
"description": "A very short encouragement quote that encapsulates the user's journey to achieve their goals."
}
},
"required": [
"mantra_of_the_week"
],
"additionalProperties": False
}
}
}
,
temperature=0.5,
max_tokens=3000,
top_p=1,
frequency_penalty=0,
presence_penalty=0
)
# Get response and convert into dictionary
mantra = json.loads(response.choices[0].message.content)["mantra_of_the_week"]
except Exception as e:
logger.error(f"OpenAI API call failed: {e}", extra={'user_id': user_id, 'endpoint': function_name})
raise e
# Get current life score
life_score = {
"overall": user.personal_growth_score + user.career_growth_score + user.relationship_score + user.mental_well_being_score + user.health_and_wellness_score,
"personal_growth": user.personal_growth_score,
"health_and_wellness": user.health_and_wellness_score,
"mental_well_being": user.mental_well_being_score,
"career_growth": user.career_growth_score,
"relationship": user.relationship_score
}
# Get current goal
current_goal = user.goal if not user.goal else user.goal[-1].content
# Get life score achievements in list
recent_wins = user.recent_wins
# Combine everything
reports = {
"life_score": life_score,
"mantra_of_the_week": mantra,
"goal": current_goal,
"recent_wins": recent_wins
}
# Step 4: Return the JSON reports
logger.info(f"User life status generated successfully for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
return reports
def get_api_key(api_key_header: str = Security(api_key_header)) -> str:
if api_key_header == os.getenv("FASTAPI_KEY"):
return api_key_header
raise HTTPException(
status_code=403,
detail="Could not validate credentials"
)
def get_user_info(user_id):
function_name = get_user_info.__name__
logger.info(f"Retrieving user info for {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
db_params = {
'dbname': 'ourcoach',
'user': 'ourcoach',
'password': 'hvcTL3kN3pOG5KteT17T',
'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
'port': '5432'
}
try:
with psycopg2.connect(**db_params) as conn:
with conn.cursor() as cursor:
query = sql.SQL("SELECT * FROM {table} WHERE id = %s").format(table=sql.Identifier('public', 'users'))
cursor.execute(query, (user_id,))
row = cursor.fetchone()
if (row):
colnames = [desc[0] for desc in cursor.description]
user_data = dict(zip(colnames, row))
### MODIFY THE FORMAT OF USER DATA
user_data_clean = json.loads(user_data['onboarding'])
# doLiving = "\n".join([f"- {item['question']} : {item['answer']}" for item in user_data_clean.get('doLiving', [])])
doLiving = user_data_clean.get('mySituation', '')
whoImportant = "\n".join([f"- {item['question']} : {item['answer']}" for item in user_data_clean.get('whoImportant', [])])
challenges = "\n".join([f"- {item['question']} : {item['answer']}" for item in user_data_clean.get('challenges', [])])
user_data_formatted = f"""
### USER PROFILE ###
Name: {user_data_clean.get('firstName', '')}
{user_data_clean.get('firstName', '')}'s challenges (You **must** use this information for the PLANNING STATE):
{challenges}
{user_data_clean.get('firstName', '')}'s Legendary Persona: {user_data_clean.get('legendPersona', '')}
Pronouns: {user_data_clean.get('pronouns', '')}
Birthday: {user_data_clean.get('birthDate', '')}
{user_data_clean.get('firstName', '')}'s MBTI: {user_data_clean.get('mbti', '')}
{user_data_clean.get('firstName', '')}'s Love Language: {user_data_clean.get('loveLanguage', '')}
Has {user_data_clean.get('firstName', '')} tried coaching before: {user_data_clean.get('triedCoaching', '')}
Belief in Astrology: {user_data_clean.get('astrology', '')}
The most important area in {user_data_clean.get('firstName', '')}'s life: {user_data_clean.get('mattersMost', ['', '', '', '', ''])[0]}
The second most important area in {user_data_clean.get('firstName', '')}'s life: {user_data_clean.get('mattersMost', ['', '', '', '', ''])[1]}
The third most important area in {user_data_clean.get('firstName', '')}'s life: {user_data_clean.get('mattersMost', ['', '', '', '', ''])[2]}
The fourth most important area in {user_data_clean.get('firstName', '')}'s life: {user_data_clean.get('mattersMost', ['', '', '', '', ''])[3]}
The fifth most important area in {user_data_clean.get('firstName', '')}'s life: {user_data_clean.get('mattersMost', ['', '', '', '', ''])[4]} (Matters the least)
What does {user_data_clean.get('firstName', '')} do for a living:
{doLiving}
{user_data_clean.get('firstName', '')}'s current situation: {user_data_clean.get('mySituation', '')}
{user_data_clean.get('firstName', '')}'s most important person:
{whoImportant}
"""
logger.info(f"User info retrieved successfully for {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
return user_data_formatted, user_data_clean.get('mattersMost', ['', '', '', '', ''])
else:
logger.warning(f"No user info found for {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
return None
except psycopg2.Error as e:
logger.error(f"Database error while retrieving user info for {user_id}: {e}", extra={'user_id': user_id, 'endpoint': function_name})
return None
def get_growth_guide_summary(user_id, session_id):
function_name = get_growth_guide_summary.__name__
logger.info(f"Retrieving growth guide summary for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
db_params = {
'dbname': 'ourcoach',
'user': 'ourcoach',
'password': 'hvcTL3kN3pOG5KteT17T',
'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
'port': '5432'
}
try:
with psycopg2.connect(**db_params) as conn:
with conn.cursor() as cursor:
query = sql.SQL("SELECT * FROM {table} WHERE user_id = %s AND booking_id = %s").format(table=sql.Identifier('public', 'user_notes'))
cursor.execute(query, (user_id, session_id))
row = cursor.fetchone()
if (row):
colnames = [desc[0] for desc in cursor.description]
summary_data = dict(zip(colnames, row))
logger.info(f"Growth guide summary retrieved successfully for user {user_id} and session {session_id}: {summary_data}", extra={'user_id': user_id, 'endpoint': function_name})
return summary_data
else:
logger.warning(f"No growth guide summary found for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
return None
except psycopg2.Error as e:
logger.error(f"Database error while retrieving growth guide summary for user {user_id} and session {session_id}: {e}", extra={'user_id': user_id, 'endpoint': function_name})
return None
def get_all_bookings():
function_name = get_all_bookings.__name__
logger.info(f"Retrieving all bookings", extra={'endpoint': function_name})
db_params = {
'dbname': 'ourcoach',
'user': 'ourcoach',
'password': 'hvcTL3kN3pOG5KteT17T',
'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
'port': '5432'
}
try:
with psycopg2.connect(**db_params) as conn:
with conn.cursor() as cursor:
query = sql.SQL("SELECT id, user_id FROM {table}").format(table=sql.Identifier('public', 'booking'))
cursor.execute(query)
rows = cursor.fetchall()
bookings = [{'booking_id': row[0], 'user_id': row[1]} for row in rows]
logger.info(f"Retrieved {len(bookings)} bookings", extra={'endpoint': function_name})
return bookings
except psycopg2.Error as e:
logger.error(f"Database error while retrieving bookings: {e}", extra={'endpoint': function_name})
return []
def update_growth_guide_summary(user_id, session_id, ourcoach_summary):
function_name = update_growth_guide_summary.__name__
logger.info(f"Updating growth guide summary for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
db_params = {
'dbname': 'ourcoach',
'user': 'ourcoach',
'password': 'hvcTL3kN3pOG5KteT17T',
'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
'port': '5432'
}
try:
with psycopg2.connect(**db_params) as conn:
with conn.cursor() as cursor:
query = sql.SQL("""
UPDATE {table}
SET ourcoach_summary = %s
WHERE user_id = %s AND booking_id = %s
""").format(table=sql.Identifier('public', 'user_notes'))
cursor.execute(query, (json.dumps(ourcoach_summary), user_id, session_id))
conn.commit()
logger.info(f"Growth guide summary updated successfully for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
except psycopg2.Error as e:
logger.error(f"Database error while updating growth guide summary: {e}", extra={'user_id': user_id, 'endpoint': function_name})
raise e
def add_growth_guide_session(user_id, session_id, coach_id, session_started_at, zoom_ai_summary, gg_report, ourcoach_summary):
function_name = add_growth_guide_session.__name__
logger.info(f"Adding growth guide session for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
db_params = {
'dbname': 'ourcoach',
'user': 'ourcoach',
'password': 'hvcTL3kN3pOG5KteT17T',
'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
'port': '5432'
}
try:
with psycopg2.connect(**db_params) as conn:
with conn.cursor() as cursor:
query = sql.SQL("""
INSERT INTO {table} (booking_id, coach_id, session_started_at, user_id, updated_at, gg_report, ourcoach_summary, created_at, zoom_ai_summary)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
""").format(table=sql.Identifier('public', 'user_notes'))
current_time = datetime.datetime.now()
cursor.execute(query, (
session_id,
coach_id,
session_started_at,
user_id,
current_time,
json.dumps(gg_report),
json.dumps(ourcoach_summary),
current_time,
json.dumps(zoom_ai_summary)
))
conn.commit()
logger.info(f"Growth guide session added successfully for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
except psycopg2.Error as e:
logger.error(f"Database error while adding growth guide session: {e}", extra={'user_id': user_id, 'endpoint': function_name})
raise e
def get_growth_guide_session(user_id, session_id):
# returns the zoom_ai_summary and the gg_report columns from the POST_GG table
function_name = get_growth_guide_session.__name__
logger.info(f"Retrieving growth guide session for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
db_params = {
'dbname': 'ourcoach',
'user': 'ourcoach',
'password': 'hvcTL3kN3pOG5KteT17T',
'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
'port': '5432'
}
try:
with psycopg2.connect(**db_params) as conn:
with conn.cursor() as cursor:
query = sql.SQL("SELECT * FROM {table} WHERE user_id = %s AND booking_id = %s").format(table=sql.Identifier('public', 'user_notes'))
cursor.execute(query, (user_id, session_id))
row = cursor.fetchone()
if (row):
colnames = [desc[0] for desc in cursor.description]
session_data = dict(zip(colnames, row))
logger.info(f"Growth guide session retrieved successfully for user {user_id} and session {session_id}: {session_data}", extra={'user_id': user_id, 'endpoint': function_name})
return session_data
else:
logger.warning(f"No growth guide session found for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
return None
except psycopg2.Error as e:
logger.error(f"Database error while retrieving growth guide session for user {user_id} and session {session_id}: {e}", extra={'user_id': user_id, 'endpoint': function_name})
return None
def download_file_from_s3(filename, bucket):
user_id = filename.split('.')[0]
function_name = download_file_from_s3.__name__
logger.info(f"Downloading file {filename} from S3 bucket {bucket}", extra={'user_id': user_id, 'endpoint': function_name})
file_path = os.path.join('users', 'data', filename)
try:
if (AWS_ACCESS_KEY and AWS_SECRET_KEY):
session = boto3.session.Session(aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=REGION)
else:
session = boto3.session.Session()
s3_client = session.client('s3')
with open(file_path, 'wb') as f:
## Upload to Production Folder
s3_client.download_fileobj(bucket, f"staging/users/{filename}", f)
logger.info(f"File {filename} downloaded successfully from S3", extra={'user_id': user_id, 'endpoint': function_name})
return True
except Exception as e:
logger.error(f"Error downloading file {filename} from S3: {e}", extra={'user_id': user_id, 'endpoint': function_name})
if (os.path.exists(file_path)):
os.remove(file_path)
return False
def add_to_cache(user):
user_id = user.user_id
function_name = add_to_cache.__name__
logger.info(f"Adding user {user_id} to the cache", extra={'user_id': user_id, 'endpoint': function_name})
user_cache[user_id] = user
logger.info(f"User {user_id} added to the cache", extra={'user_id': user_id, 'endpoint': function_name})
return True
def pop_cache(user_id):
if user_id == 'all':
user_cache.reset_cache()
return True
if user_id not in user_cache:
logger.warning(f"[POPPING] User {user_id} not found in the cache", extra={'user_id': user_id, 'endpoint': 'pop_cache'})
# check if file exists
if os.path.exists(os.path.join("users", "to_upload", f"{user_id}.pkl")):
# upload file
logger.info(f"Attempting upload file {user_id}.json to S3", extra={'user_id': user_id, 'endpoint': 'pop_cache'})
upload_file_to_s3(f"{user_id}.pkl")
try:
user_cache.pop(user_id, None)
logger.info(f"User {user_id} has been removed from the cache", extra={'user_id': user_id, 'endpoint': 'pop_cache'})
return True
except:
return False
def update_user(user):
user_id = user.user_id
function_name = update_user.__name__
logger.info(f"Updating user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
# remove from cache, which will also upload the file
pop_cache(user_id)
logger.info(f"User {user_id} has been removed from the cache", extra={'user_id': user_id, 'endpoint': function_name})
logger.info(f"User {user.user_id} updated successfully in S3", extra={'user_id': user_id, 'endpoint': function_name})
return True
def upload_mementos_to_db(user_id):
function_name = upload_mementos_to_db.__name__
logger.info(f"Uploading mementos to DB for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
db_params = {
'dbname': 'ourcoach',
'user': 'ourcoach',
'password': 'hvcTL3kN3pOG5KteT17T',
'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
'port': '5432'
}
folder_path = os.path.join("mementos", "to_upload", user_id)
if (not os.path.exists(folder_path)):
logger.warning(f"No mementos folder found for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
return True # Return True as this is not an error condition
try:
memento_files = [f for f in os.listdir(folder_path) if f.endswith('.json')]
if (not memento_files):
logger.info(f"No memento files found for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
return True
with psycopg2.connect(**db_params) as conn:
with conn.cursor() as cursor:
base_query = """
INSERT INTO public.user_memento
(user_id, type, title, description, tags, priority,
mood, status, location, recurrence, context, created_at, follow_up_on)
VALUES (%s, %s, %s, %s, %s::jsonb, %s, %s, %s, %s, %s, %s, %s, %s)
"""
for filename in memento_files:
file_path = os.path.join(folder_path, filename)
try:
with open(file_path, 'r', encoding='utf-8') as json_file:
data = json.load(json_file)
# Convert tags array to proper JSON string
tags_json = json.dumps(data.get('tags', []))
# Prepare data with proper defaults and transformations
memento_data = [
user_id, # Replace the user_id from JSON with the actual user_id
data.get('type', ''),
data.get('title', ''),
data.get('description', ''),
tags_json, # Send tags as JSON string
data.get('priority', ''),
data.get('mood', ''),
data.get('status', ''),
data.get('location', ''),
data.get('recurrence', ''),
data.get('context', ''),
datetime.datetime.now(),
pd.to_datetime(data.get('follow_up_on', ''))
]
cursor.execute(base_query, memento_data)
conn.commit()
# Remove file after successful insert
os.remove(file_path)
logger.info(f"Successfully processed memento {filename}", extra={'user_id': user_id, 'endpoint': function_name})
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in file {filename}: {str(e)}", extra={'user_id': user_id, 'endpoint': function_name})
continue
except Exception as e:
logger.error(f"Error processing memento {filename}: {str(e)}", extra={'user_id': user_id, 'endpoint': function_name})
continue
# Try to remove the directory after processing all files
try:
os.rmdir(folder_path)
except OSError:
pass # Ignore if directory is not empty or already removed
return True
except psycopg2.Error as e:
logger.error(f"Database error while uploading mementos: {str(e)}", extra={'user_id': user_id, 'endpoint': function_name})
raise ConnectionError(f"Database error: {str(e)}")
except Exception as e:
logger.error(f"Unexpected error uploading mementos: {str(e)}", extra={'user_id': user_id, 'endpoint': function_name})
return False
def get_users_mementos(user_id, date):
function_name = get_users_mementos.__name__
db_params = {
'dbname': 'ourcoach',
'user': 'ourcoach',
'password': 'hvcTL3kN3pOG5KteT17T',
'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
'port': '5432'
}
logger.info(f"Retrieving mementos for user {user_id} on date {date}", extra={'endpoint': function_name, 'user_id': user_id})
# Convert date string to PostgreSQL compatible format
parsed_date = date
logger.info(f"Retrieving mementos for user {user_id} on date {parsed_date}", extra={'endpoint': function_name, 'user_id': user_id})
try:
with psycopg2.connect(**db_params) as conn:
with conn.cursor() as cursor:
query = sql.SQL("""
SELECT * FROM public.user_memento
WHERE user_id = %s AND DATE(follow_up_on) = %s
""")
cursor.execute(query, (user_id, parsed_date))
rows = cursor.fetchall()
if rows:
colnames = [desc[0] for desc in cursor.description]
mementos = [dict(zip(colnames, row)) for row in rows]
logger.info(f"Retrieved {len(mementos)} mementos for user {user_id} on date {date}", extra={'endpoint': function_name, 'user_id': user_id})
return mementos
else:
logger.info(f"No mementos found for user {user_id} on date {date}", extra={'endpoint': function_name, 'user_id': user_id})
return []
except psycopg2.Error as e:
logger.error(f"Database error while retrieving mementos: {e}", extra={'endpoint': function_name, 'user_id': user_id})
return []
def generate_uuid():
return str(uuid.uuid4())
def print_log(level, message, **kwargs):
"""
Print log in JSON format for better readability in CloudWatch.
Parameters:
level (str): The log level (e.g., "INFO", "ERROR", "DEBUG").
message (str): The log message.
**kwargs: Additional key-value pairs to include in the log.
example:
print_log("INFO", "User logged in", user_id=123, action="login")
"""
log_entry = {
"timestamp": datetime.datetime.utcnow().isoformat() + "Z",
"level": level,
"message": message,
}
log_entry.update(kwargs)
print(json.dumps(log_entry, ensure_ascii=False))