# fastapi-v2 / app / utils.py
# Last change by BMCVRN: "Web Search and Multi-Modality (#16)"
# Commit a7c7fdb (verified)
import logging
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
import os
from dotenv import load_dotenv
from fastapi import HTTPException, Security, Query, status
from fastapi.security import APIKeyHeader
from openai import OpenAI
import openai
import pandas as pd
import os
import logging
import json
import psycopg2
from psycopg2 import sql
import os
from dotenv import load_dotenv
from datetime import datetime, timezone
import pickle # Replace dill with pickle
import uuid
import pytz
from app.cache import CustomTTLCache, upload_file_to_s3
import pdfkit
import PyPDF2
from app.exceptions import BaseOurcoachException, DBError, OpenAIRequestError, UtilsError
# Load variables from a local .env file into the process environment once,
# before any of the os.getenv() calls below read them.
load_dotenv()

# Environment Variables for API Keys
# NOTE(review): if FASTAPI_KEY is unset this list is [None] - confirm the
# consumer of `api_keys` rejects a missing key.
api_keys = [os.getenv('FASTAPI_KEY')]
api_key_header = APIKeyHeader(name="X-API-Key")

# AWS credentials and region; when the keys are absent, callers in this module
# fall back to boto3's default credential resolution.
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY')
REGION = os.getenv('AWS_REGION')

logger = logging.getLogger(__name__)

# Replace the simple TTLCache with our custom implementation
user_cache = CustomTTLCache(ttl=120, cleanup_interval=30)  # 2 minutes TTL
def catch_error(func):
    """Decorator that normalises exceptions raised by utility functions.

    - ``BaseOurcoachException`` (and subclasses) propagate unchanged.
    - ``openai.BadRequestError`` is converted to ``OpenAIRequestError``.
    - Any other exception is logged and converted to ``UtilsError``.

    The wrapper preserves the decorated function's metadata, so code that
    reads ``some_function.__name__`` (as the functions in this module do for
    their ``endpoint`` log field) sees the real name rather than ``wrapper``.

    Args:
        func: The callable to wrap.

    Returns:
        A callable with the same signature and return value as ``func``.
    """
    # Imported locally so this block does not rely on a file-level import.
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except BaseOurcoachException:
            # Already a domain error: re-raise as-is, keeping the traceback.
            raise
        except openai.BadRequestError as e:
            logger.error(f"Bad request to OpenAI in Utils: {e}")
            raise OpenAIRequestError(user_id='no-user', message="Bad Request to OpenAI", code="OpenAIError") from e
        except Exception as e:
            # Handle other exceptions
            logger.error(f"An unexpected error occurred in Utils: {e}")
            raise UtilsError(user_id='no-user', message="Unexpected error in Utils", e=str(e)) from e
    return wrapper
@catch_error
def force_file_move(source, destination):
    """Move ``source`` to ``destination``, overwriting any existing file.

    The destination's parent directory is created when it does not exist.

    Args:
        source: Path of the file to move.
        destination: Target path; replaced if it already exists.

    Raises:
        UtilsError: (via ``catch_error``) if the move fails, e.g. because
            ``source`` does not exist.
    """
    function_name = force_file_move.__name__
    logger.info(f"Attempting to move file from {source} to {destination}", extra={'endpoint': function_name})

    # Ensure the destination directory exists. A bare filename has an empty
    # dirname, and os.makedirs('') raises FileNotFoundError, so skip it then.
    destination_dir = os.path.dirname(destination)
    if destination_dir:
        os.makedirs(destination_dir, exist_ok=True)

    # Move the file, replacing if it already exists
    os.replace(source, destination)
    logger.info(f"File moved successfully: {source} -> {destination}", extra={'endpoint': function_name})
@catch_error
def get_user(user_id):
    """Return the user object for ``user_id``, from cache or from S3.

    On a cache miss the user's pickle is downloaded from the
    ``core-ai-assets`` bucket, unpickled, given a fresh OpenAI client,
    stored in ``user_cache`` and returned. The downloaded file is always
    removed afterwards, even when unpickling fails.

    Args:
        user_id: Identifier of the user to load.

    Returns:
        The cached or freshly loaded user object.

    Raises:
        OpenAIRequestError: If the OpenAI client could not be created.
        DBError: With code ``NoPickleError`` when user info exists but no
            pickle was found, or ``NoOnboardingError`` when neither exists.
    """
    function_name = get_user.__name__
    log_extra = {'user_id': user_id, 'endpoint': function_name}
    logger.info(f"Fetching user {user_id}", extra=log_extra)
    logger.info(f"[CACHE]: {user_cache}", extra=log_extra)

    if user_id in user_cache:
        logger.info(f"User {user_id} found in cache", extra=log_extra)
        return user_cache[user_id]

    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
    if not client:
        raise OpenAIRequestError(user_id=user_id, message="Error creating OpenAI client", code="OpenAIError")

    # presumably download_file_from_s3 writes the object to this path -
    # verify against its implementation.
    user_file = os.path.join('users', 'data', f'{user_id}.pkl')
    logger.warning(f"User {user_id} not found locally. Attempting to download from S3", extra=log_extra)
    download = download_file_from_s3(f'{user_id}.pkl', 'core-ai-assets')
    logger.info(f"Download success: {download}", extra=log_extra)

    if not download:
        logger.error(f"User {user_id} pickle does not exist in S3", extra=log_extra)
        # check if user_info exists
        user_info = get_user_info(user_id)
        if user_info:
            # user has done onboarding but pickle file not created
            raise DBError(user_id=user_id, message="User has done onboarding but pickle file not created", code="NoPickleError")
        raise DBError(user_id=user_id, message="User has not onboarded yet", code="NoOnboardingError")

    try:
        with open(user_file, 'rb') as f:
            # NOTE(review): pickle.load executes arbitrary code from the file;
            # this is only safe while the bucket contents are fully trusted.
            user = pickle.load(f)
    finally:
        # Never leave the downloaded user data on local disk, even when
        # unpickling fails.
        if os.path.exists(user_file):
            os.remove(user_file)

    # The client is re-attached after loading rather than taken from the pickle.
    user.client = client
    user.conversations.client = client
    user_cache[user_id] = user  # No need for lock here
    logger.info(f"User {user_id} loaded successfully from S3", extra=log_extra)
    return user
def _html_list(tag, items):
    """Render ``items`` as ``<li>`` lines wrapped in a ``<tag>`` element."""
    return f"<{tag}>\n" + "".join(f"<li>{item}</li>\n" for item in items) + f"</{tag}>"


def _render_pre_gg_report_html(json_data, coach_name):
    """Build the coach-facing HTML document from the generated report dict."""
    data = json_data["pre_growth_guide_session_report"]
    user_overview = data["user_overview"]
    personality_insights = data["personality_insights"]
    progress_snapshot = data["progress_snapshot"]
    session_script = json_data["30_minute_coaching_session_script"]

    # Extract user name
    user_name = user_overview["name"]

    # Build Progress Snapshot (keys such as "career_growth" -> "Career Growth")
    progress_items = "".join(
        f'<li><strong>{key.replace("_", " ").title()}:</strong> {value}</li>\n'
        for key, value in progress_snapshot.items()
    )

    # Build Personality Insights
    love_languages = "".join(f"<li>{lang}</li>" for lang in personality_insights["top_love_languages"])

    # Build Session Overview
    session_overview = _html_list("ol", session_script["session_overview"])

    # Build Detailed Segments
    segment_blocks = []
    for segment in session_script["detailed_segments"]:
        segment_title = segment["segment_title"]
        coach_dialogue_html = _html_list("ul", segment.get("coach_dialogue", []))
        guidance_html = _html_list("ul", segment.get("guidance", []))
        segment_blocks.append(f'''
<div class="segment">
<h4>{segment_title}</h4>
<p class="coach-dialogue"><strong>Coach Dialogue:</strong>{coach_dialogue_html}</p>
<p class="guidance"><strong>Guidance:</strong>{guidance_html}</p>
</div>
''')
    detailed_segments = "".join(segment_blocks)

    # Build Final HTML
    return f'''
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>User Profile - {user_name}</title>
<style>
body {{
font-family: Arial, sans-serif;
color: #333;
line-height: 1.6;
margin: 20px;
}}
h1, h2, h3, h4 {{
color: #2E86C1;
}}
p {{
margin: 10px 0;
}}
ul {{
margin-left: 20px;
}}
ol {{
margin-left: 20px;
}}
li {{
margin-bottom: 5px;
}}
.header {{
border-bottom: 2px solid #2E86C1;
padding-bottom: 10px;
margin-bottom: 20px;
}}
.section {{
margin-bottom: 30px;
}}
.footer {{
margin-top: 30px;
}}
/* Styles for the script */
.segment {{
background-color: #F2F3F4;
padding: 15px;
border-radius: 5px;
margin-bottom: 20px;
}}
.coach-dialogue, .guidance {{
margin-bottom: 10px;
}}
.coach-dialogue strong, .guidance strong {{
color: #2E86C1;
}}
.coach-dialogue ul, .guidance ul {{
margin-left: 20px;
}}
</style>
</head>
<body>
<div class="header">
<p>Dear {coach_name},</p>
<p>Here is the <strong>User Profile - {user_name}</strong> and the <strong>30-Minute Coaching Session Script</strong> for your upcoming session with <strong>{user_name}</strong>:</p>
</div>
<div class="section">
<h2>User Profile - {user_name}</h2>
<h3>User Overview</h3>
<ul>
<li><strong>Name:</strong> {user_overview["name"]}</li>
<li><strong>Age Group:</strong> {user_overview["age_group"]}</li>
<li><strong>Primary Goals:</strong> {user_overview["primary_goals"]}</li>
<li><strong>Preferred Coaching Style:</strong> {user_overview["preferred_coaching_style"]}</li>
</ul>
<h3>Personality Insights</h3>
<ul>
<li><strong>MBTI:</strong> {personality_insights["mbti"]}</li>
<li><strong>Top Love Languages:</strong>
<ol>
{love_languages}
</ol>
</li>
<li><strong>Belief in Astrology:</strong> {personality_insights["belief_in_astrology"]}</li>
</ul>
<h3>Progress Snapshot</h3>
<ul>
{progress_items}
</ul>
</div>
<div class="section">
<h2>30-Minute Coaching Session Script</h2>
<h3>Session Overview (30 Minutes)</h3>
{session_overview}
<h3>Detailed Segments</h3>
{detailed_segments}
</div>
<div class="footer">
<p>You may contact us at support@ourcoach.ai, if you have any questions.</p>
<p>Best regards,<br>ourcoach</p>
</div>
</body>
</html>
'''


def _encrypt_pdf_in_place(pdf_path, password):
    """Rewrite the PDF at ``pdf_path`` as a copy encrypted with ``password``."""
    # Imported locally so this block does not rely on a file-level import.
    import io

    # Load the whole document into memory first: the same path is reopened
    # for writing below, which truncates it, so the reader must not depend on
    # the on-disk file any more by then.
    with open(pdf_path, 'rb') as source:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(source.read()))

    pdf_writer = PyPDF2.PdfWriter()
    # Add all pages to the writer
    for page in pdf_reader.pages:
        pdf_writer.add_page(page)

    # Encrypt the PDF with the given password
    pdf_writer.encrypt(password)
    with open(pdf_path, 'wb') as encrypted_file:
        pdf_writer.write(encrypted_file)


def _upload_pre_gg_report(pdf_path, filename, log_extra):
    """Upload the PDF to ``core-ai-assets`` under ``dev/pre_gg_reports/``."""
    logger.info(f"Uploading file {filename} to S3", extra=log_extra)
    bucket = 'core-ai-assets'
    try:
        if AWS_ACCESS_KEY and AWS_SECRET_KEY:
            session = boto3.session.Session(aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=REGION)
        else:
            # Fall back to boto3's default credential resolution.
            session = boto3.session.Session()
        s3_client = session.client('s3')
        with open(pdf_path, "rb") as f:
            # NOTE(review): the key prefix is 'dev/', although the original
            # comment here said "Production Folder" - confirm the target.
            s3_client.upload_fileobj(f, bucket, f'dev/pre_gg_reports/{filename}.pdf')
        logger.info(f"File {filename} uploaded successfully to S3", extra=log_extra)
    except (FileNotFoundError, NoCredentialsError, PartialCredentialsError) as e:
        raise DBError(user_id="no-user", message="Error uploading file to S3", code="S3Error") from e


def _remove_if_exists(path):
    """Best-effort removal of a temporary file; a missing file is not an error."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


@catch_error
def generate_html(json_data, coach_name='Growth Guide', booking_id = None):
    """Render, encrypt and upload the pre-session report for a booking.

    The report dict is rendered to HTML, converted to PDF with pdfkit,
    encrypted with PyPDF2 and uploaded to S3 as
    ``dev/pre_gg_reports/<booking_id>.pdf``. The intermediate HTML and PDF
    files of this booking are removed afterwards, whether or not the
    pipeline succeeded; files belonging to other bookings are left alone.

    The ``users_growth_guide_preparation_brief`` part of ``json_data`` is not
    rendered into this report.

    Args:
        json_data: Report dict with the keys
            ``pre_growth_guide_session_report`` and
            ``30_minute_coaching_session_script``.
        coach_name: Name used in the greeting line.
        booking_id: Identifier used for the temporary file names and the S3
            object key.

    Raises:
        DBError: With code ``S3Error`` when the upload fails because of a
            missing file or missing/partial AWS credentials.
        UtilsError: (via ``catch_error``) for any other failure.
    """
    function_name = generate_html.__name__
    log_extra = {'booking_id': booking_id, 'endpoint': function_name}

    html_content = _render_pre_gg_report_html(json_data, coach_name)

    file_path = os.path.join("bookings", "data",f"{booking_id}.html")
    path_to_upload = os.path.join("bookings", "to_upload",f"{booking_id}.pdf")
    # NOTE(review): hard-coded PDF password committed to source - consider
    # moving it to configuration.
    password = "Ourcoach2024!"

    # The working directories may not exist on a fresh deployment.
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    os.makedirs(os.path.dirname(path_to_upload), exist_ok=True)

    try:
        ## SAVING HTML FILE
        with open(file_path, 'w', encoding='utf-8') as html_file:
            html_file.write(html_content)
        logger.info(f"File '{booking_id}.html' has been created successfully.", extra=log_extra)

        # Saving as PDF File
        pdfkit.from_file(file_path, path_to_upload, options={'encoding': 'UTF-8'})
        logger.info(f"File '{booking_id}.pdf' has been created successfully.", extra=log_extra)

        ## ENCRYPTING PDF
        logger.info(f"Encrypting '{booking_id}.pdf'...", extra=log_extra)
        _encrypt_pdf_in_place(path_to_upload, password)
        logger.info(f"Succesfully encrypted '{booking_id}.pdf'", extra=log_extra)

        _upload_pre_gg_report(path_to_upload, booking_id, log_extra)
    finally:
        # Remove only this booking's temporary files so that reports being
        # generated concurrently for other bookings are not deleted.
        for leftover in (file_path, path_to_upload):
            _remove_if_exists(leftover)
def _string_field(description):
    """JSON-schema fragment for a described string property."""
    return {"type": "string", "description": description}


def _string_list_field(description):
    """JSON-schema fragment for a described array of strings."""
    return {"type": "array", "items": {"type": "string"}, "description": description}


def _strict_object(properties, description=None):
    """JSON-schema object in which every property is required and no extras are allowed."""
    schema = {"type": "object"}
    if description is not None:
        schema["description"] = description
    schema["properties"] = properties
    schema["required"] = list(properties)
    schema["additionalProperties"] = False
    return schema


def _growth_guide_session_schema():
    """Return the JSON schema the model output must follow for the three reports."""
    user_overview = _strict_object({
        "name": _string_field("The user's full name."),
        "age_group": _string_field("The user's age range (e.g., '30-39')."),
        "primary_goals": _string_field("The main goals the user is focusing on."),
        "preferred_coaching_style": _string_field("The coaching style the user prefers."),
    })
    personality_insights = _strict_object({
        "mbti": _string_field("The user's Myers-Briggs Type Indicator personality type."),
        "top_love_languages": _string_list_field("A list of the user's top two love languages."),
        "belief_in_astrology": _string_field("Whether the user believes in horoscope/astrology."),
    })
    progress_snapshot = _strict_object({
        "mental_well_being": _string_field("Summary of the user's mental well-being."),
        "physical_health_and_wellness": _string_field("Summary of the user's physical health and wellness."),
        "relationships": _string_field("Summary of the user's relationships."),
        "career_growth": _string_field("Summary of the user's career growth."),
        "personal_growth": _string_field("Summary of the user's personal growth."),
    })
    session_report = _strict_object(
        {
            "user_overview": user_overview,
            "personality_insights": personality_insights,
            "progress_snapshot": progress_snapshot,
        },
        description="A comprehensive summary of the user's profile and life context for the Growth Guide.",
    )

    preparation_brief = {
        "type": "array",
        "description": "A brief guiding the user on what to discuss with the Growth Guide, providing actionable advice and highlighting key areas to focus on.",
        "items": _strict_object({
            "key": _string_field("The section heading."),
            "value": _string_field("Content for the section."),
        }),
    }

    detailed_segments = {
        "type": "array",
        "items": _strict_object({
            "segment_title": _string_field("Title of the session segment."),
            "coach_dialogue": _string_list_field("Suggested coach dialogue during the session"),
            "guidance": _string_list_field("Suggestions for the coach on how to navigate responses."),
        }),
        "description": "Detailed information for each session segment.",
    }
    session_script = _strict_object(
        {
            "session_overview": _string_list_field("Breakdown of the session segments with time frames."),
            "detailed_segments": detailed_segments,
        },
        description="A detailed, partitioned script to help the coach prepare for the session, following the specified session order and focusing on the user's top three most important areas.",
    )

    return _strict_object({
        "pre_growth_guide_session_report": session_report,
        "users_growth_guide_preparation_brief": preparation_brief,
        "30_minute_coaching_session_script": session_script,
    })


def _growth_guide_system_prompt():
    """Return the system prompt describing the three reports to generate."""
    return """
You are an AI language model designed to generate three outputs based on the user's profile and chat history:
1. **Pre-Growth Guide Session Report**: A comprehensive summary of the user's profile and life context for the Growth Guide (a human coach), covering five key areas: **mental well-being**, **physical health and wellness**, **relationships**, **career growth**, and **personal growth**.
2. **User's Growth Guide Preparation Brief**: A comprehensive brief guiding the user on what to discuss with the Growth Guide, providing actionable advice and highlighting key areas to focus on during their session, covering the same five key areas.
3. **30-Minute Coaching Session Script**: A detailed, partitioned script to help the coach prepare for the session, including dialogue, questions, and guidance tailored to the client's needs, covering the five key areas. The script should be partitioned into several sections in the JSON output, similar to the structure provided for the Pre-Growth Guide Session Report.
---
**Important Note**
The **chat history** shows the most updated information. Hence, if there is a difference between the goal/challenge/other key information in the user's chat history and the user's profile, you must create the reports based on the chat history!
---
**Instructions:**
- **Comprehensive Coverage**:
Ensure that all three outputs cover the following five key areas:
1. **Mental Well-being**
2. **Physical Health and Wellness**
3. **Relationships**
4. **Career Growth**
5. **Personal Growth**
If the chat history provided by the user does not touch on one or more of these areas, the report should state: "The user hasn't discussed this area yet. Maybe you can cover this during the Growth Guide session."
- **Output Format**:
Output the result in JSON format following the specified JSON schema. The outputs for the **Pre-Growth Guide Session Report** and the **30-Minute Coaching Session Script** should be partitioned into several JSON keys, similar to the structure provided for the Pre-Growth Guide Session Report.
---
### **1. Pre-Growth Guide Session Report**
**Objective**: Provide a comprehensive summary of the user's profile and life context for the Growth Guide, covering the five key areas.
**Format**:
- **user_overview**:
- **name**: The user's full name.
- **age_group**: The user's age range (e.g., "30-39").
- **primary_goals**: The main goals the user is focusing on.
- **preferred_coaching_style**: The coaching style the user prefers.
- **personality_insights**:
- **mbti**: The user's Myers-Briggs Type Indicator personality type.
- **top_love_languages**: A list of the user's top two love languages.
- **belief_in_astrology**: Whether the user believes in horoscope/astrology.
- **progress_snapshot**:
- **mental_well_being**: Summary of the user's mental well-being.
- **physical_health_and_wellness**: Summary of the user's physical health and wellness.
- **relationships**: Summary of the user's relationships.
- **career_growth**: Summary of the user's career growth.
- **personal_growth**: Summary of the user's personal growth.
If any of the key areas are not discussed, include a note: "The user hasn't discussed this area yet. Maybe you can cover this during the Growth Guide session."
---
### **2. User's Growth Guide Preparation Brief**
**Objective**: Guide the user on what to discuss with the Growth Guide, providing actionable advice and highlighting key areas to focus on during their session, covering the five key areas.
You must use the user's current **challenges** and **life goal** to make the preparation brief **personalized**! You **must** bold some words that you think is important! but it does **not** have to be the first few words!
Important Rules:
1. **ALWAYS** be succinct, valuable and personalized! Do **NOT** ask generic question. Ask a personalized question! And bold the key parts of the user brief!
2. **Session Length Awareness**: Be realistic about what can be effectively discussed in a 30-minute session. Prioritize the areas that are most pressing or offer the greatest opportunity for positive change.
3. **Guidance for Interaction**: Provide specific suggestions for topics to discuss with the **Growth Guide**, you are encouraged to use phrases like "Discuss with your Growth Guide how to...".
4. And for the second time, please be succinct and concise!!!
5. You **must** bold some words that you think is important! but it does **not** have to be the first few words!
**Format**:
Structure the brief with the following sections, and output it as a JSON object with these keys (don't forget to BE CONCISE! and you **must** bold some words that you think is important! but it does **not** have to be the first few words!):
- **reflect**: Provide personalized advice that encourages the user to contemplate their specific experiences, feelings, and thoughts related to each of the five key areas. Help them identify particular aspects they wish to improve, based on their challenges and goals.
- **recall_successes**: Prompt the user to remember past occasions when they effectively managed or made improvements in these areas. Encourage them to consider the strategies, habits, or resources that contributed to these successes, and how they might apply them now.
- **identify_challenges**: Advise the user to acknowledge current obstacles they are facing in each area. Encourage them to think critically about these challenges and consider potential solutions or support systems that could assist in overcoming them.
- **set_goals**: Encourage the user to define clear and achievable objectives for the upcoming session. Guide them to consider how making improvements in each key area can positively impact their overall well-being and life satisfaction.
- **additional_tips**: Offer practical advice to help the user prepare for the session. Suggestions may include arranging a quiet and comfortable space, gathering any relevant materials or notes, and approaching the session with openness and honesty.
---
### **3. 30-Minute Coaching Session Script**
**Objective**: Help the coach prepare for the session by providing a detailed, partitioned script tailored to the client's specific needs and goals, following a specific session order and focusing on the user's top three most important areas.
**IMPORTANT**: BE VERY COMPREHENSIVE IN THE "GUIDANCE" SECTION OF DETAILED SEGMENT!!!
**IMPORTANT**: NO NEED TO MENTION THE NAME OF THE COACH!!!
**Instructions**:
- **Session Overview (30 mins)**:
The session should follow this specific order:
1. **Warm Welcome and Rapport Building** (10 mins)
2. **Exploring X Goals** (10 mins)
3. **Developing X Strategies** (5 mins)
4. **Wrap-Up and Commitment** (5 mins)
The "X" in "Exploring X Goals" and "Developing X Strategies" should be replaced with the user's top three most important areas from the five key areas. Focus on one area per session. If possible, prioritize the areas based on the user's expressed concerns or goals.
- **Detailed Segments**:
For each segment, include:
- **Numbered Title**: Number and title of the session segment (e.g., `1. Warm Welcome and Trust Building (10 Minutes)`).
- **Coach Dialogue**: Provide the coach's dialogue for the segment, including initial statements, follow-up questions, and closing remarks. Present the dialogues as direct quotes, ensuring they align with the client's context and goals.
In the coach dialogue, especially during the warm welcome session, you may ask opening question and mention disclaimers that include:
- Opening question:
To ask the user if there's anything he/she would like to talk about
- Mention confidentiality:
To tell the user that at ourcoach, we prioritize the privacy and confidentiality of our clients. All information shared during the coaching session will remain strictly confidential and used solely for your personal development.
- What to expect from this session:
To tell the user what can they expect from this session
- Remind them that Zoom has recording turned on, so that they can receive an AI assisted report later:
To tell the user to note that this session will be recorded on Zoom to provide you with a comprehensive AI-assisted report afterward. This report will include key takeaways and action steps to help you achieve your goals.
And, in the coach dialogue during the "Exploring X Goals" session, you may ask the user if they have any other goals they want to explore, else if they don't, we can focus on the chosen goal!
And, in the coach dialogue during the "Wrap-Up and Commitment" session, based on today’s session, ask the user: Would you say that X is your biggest priority right now? Or are there any specific goals or areas you’d like to focus on in the coming weeks?
- **Guidance**: Offer specific and comprehensive suggestions for the coach on how to navigate the session, including actionable points and strategies. Use bullet points to clearly present each guidance item.
Note: For the "Plan Follow-up" part, it has to be next **month**
- **Additional Instructions**:
- Ensure that the **Coach Dialogue** is personalized and reflects the client's experiences and aspirations.
- The **Guidance** should include actionable suggestions, emphasizing techniques like creating safety, setting expectations, building rapport, encouraging reflection, focusing on synergies, and action planning. Be very comprehensive in this part! And use <b> </b> tag to bold the headers of each guidance points/items!
**Style Guidelines**:
- Use empathetic and supportive language.
- Encourage open-ended dialogue.
- Focus on actionable and achievable steps.
- Personalize the script to align with the client's experiences and aspirations.
- Present information in a clear, organized manner, using numbering and bullet points where appropriate.
---
**Note**:
- If the user hasn't discussed one or more of the key areas, the outputs should note this and suggest that these areas can be covered during the Growth Guide session.
---
** JSON OUTPUT FORMAT EXAMPLE **:
**IMPORTANT**: BE VERY COMPREHENSIVE IN THE "GUIDANCE" SECTION OF DETAILED SEGMENT!!!
**IMPORTANT**: NO NEED TO MENTION THE NAME OF THE COACH!!!
{
"pre_growth_guide_session_report": {
"user_overview": {
"name": "Alex Johnson",
"age_group": "25-34",
"primary_goals": "Improve mental well-being, advance career, enhance relationships",
"preferred_coaching_style": "Supportive and goal-oriented"
},
"personality_insights": {
"mbti": "ENFP",
"top_love_languages": ["Quality Time", "Words of Affirmation"],
"belief_in_astrology": "No"
},
"progress_snapshot": {
"mental_well_being": "Alex has been experiencing increased stress due to workload and is seeking ways to manage anxiety and improve overall mental health.",
"physical_health_and_wellness": "Maintains a regular exercise routine but wants to incorporate healthier eating habits.",
"relationships": "Feels disconnected from friends and family due to busy schedule; wishes to rebuild social connections.",
"career_growth": "Aiming for a promotion but feels uncertain about the necessary skills and how to stand out.",
"personal_growth": "Interested in learning new skills like photography and improving time management."
}
},
"users_growth_guide_preparation_brief": [
{
"key": "reflect",
"value": "⁠..."
},
{
"key": "recall_successes",
"value": "⁠..."
},
{
"key": "identify_challenges",
"value": "..."
},
{
"key": "set_goals",
"value": "⁠..."
},
{
"key": "additional_tips",
"value": "⁠..."
}
],
"30_minute_coaching_session_script": {
"session_overview": ["Warm Welcome and Trust Building (10 Minutes)","Exploring Holistic Life Goals and Aspirations (10 Minutes)","Identifying Interconnections and Priorities (5 Minutes)","Wrap-Up and Next Steps (5 Minutes)"],
"detailed_segments": [
{
"segment_title": "1. Warm Welcome and Trust Building (10 Minutes)",
"coach_dialogue": ["...","..."],
"guidance": ["<b>Create Safety:</b> Reassure Yew Wai by emphasizing confidentiality.","<b>Set Expectations</b>: Clearly outline the session’s structure to provide clarity and ease.\n<b>Build Rapport:</b> Show genuine curiosity about his recent experiences and emotions.\n<b>Validation: Acknowledge his efforts with empathy, e.g., “That’s a lot to manage, but it’s incredible how committed you are to each aspect of your life.”]
},
{
"segment_title": "2. Exploring Holistic Life Goals and Aspirations (10 Minutes)",
"coach_dialogue": ["...","..."],
"guidance": ["<b>Encourage Reflection:</b> Prompt Yew Wai to elaborate on his goals, covering areas like:","<b>Career:</b> Enhancing ourcoach user engagement and chat functionality.","<b>Health:</b> Preparing for the marathon and improving sleep.","<b>Relationships:</b> Nurturing his connection with Karina.","<b>Personal Growth:</b> Strengthening self-discipline.","<b>Connect Goals:</b> Highlight how goals may overlap, e.g., better sleep could enhance productivity at work.","<b>Acknowledge Motivations:</b> Reflect back his drivers for pursuing these goals, such as his desire for impact or balance."]
},
{
"segment_title": "3. Identifying Interconnections and Priorities (5 Minutes)",
"coach_dialogue": ["...","..."],
"guidance": ["<b>Focus on Synergies:</b> Show how one priority could impact other areas positively.","Example: A consistent morning routine could improve both health and work productivity.","<b>Prioritize Actionable Areas:</b> Help Yew Wai narrow his focus to one or two priorities.","<b>Use Probing Questions:</b> For example, “How could focusing on better sleep contribute to your overall energy and productivity?”"]
},
{
"segment_title": "4. Wrap-Up and Next Steps (5 Minutes)",
"coach_dialogue": ["...","..."],
"guidance": ["<b>Action Planning:</b> Collaborate with Yew Wai to define specific actions, e.g.:","Scheduling a 30-minute morning routine.","Blocking focused hours for ourcoach work.","Planning a date night with Karina.","<b>Encouragement:</b> Reinforce the value of small, consistent steps. For example, “It’s incredible how even small habits can create big changes over time.”","<b>Plan Follow-Up:</b> Suggest reconnecting in a month to reflect on progress.","<b>Close Positively:</b> End with a motivational statement, e.g., “You’re on a path to amazing things, and it’s inspiring to see your dedication.”"]
}
]
}
}
"""


@catch_error
def get_user_summary(user_id, update_rec_topics=False):
    """Generate the pre-session reports for a user with the OpenAI API.

    Loads the user, builds a prompt from their chat history, latest goal and
    profile, and asks ``gpt-4o`` for a JSON document containing the
    pre-session report, the user's preparation brief and the 30-minute
    coaching script (constrained by a strict JSON schema).

    Args:
        user_id: Identifier of the user to summarise.
        update_rec_topics: When true, the generated preparation brief is also
            stored on the user object via ``set_recommened_gg_topics``.

    Returns:
        dict: The parsed reports with the keys
        ``pre_growth_guide_session_report``,
        ``users_growth_guide_preparation_brief`` and
        ``30_minute_coaching_session_script``.

    Raises:
        OpenAIRequestError: If the completion is empty or was cut off by the
            token limit, so its JSON cannot be trusted.
        UtilsError: (via ``catch_error``) for any other unexpected failure.
    """
    function_name = get_user_summary.__name__
    log_extra = {'user_id': user_id, 'endpoint': function_name}
    logger.info(f"Generating user summary for user {user_id}", extra=log_extra)

    # Step 1: Call get_user to get user's info
    user = get_user(user_id)
    user_info = user.user_info
    user_messages = user.get_messages()
    # The most recent goal entry, or an empty string when none is recorded.
    user_goal = '' if not user.goal else user.goal[-1].content

    # Step 2: Construct the Prompt
    chat_history = "\n".join(
        f"{message['role'].capitalize()}: {message['content']}" for message in user_messages
    )

    # Build the system prompt according to the provided instructions
    system_prompt = _growth_guide_system_prompt()

    # Combine user information and chat history for context
    user_context = f"""
Based on the following user profile and chat history, generate the required reports.
**Important Note**
The **chat history** shows the most updated information. Hence, if there is a difference between the goal/challenge/other key information in the user's chat history and the user's profile, you must create the reports based on the chat history!
### CHAT HISTORY ###
{chat_history}
### USER GOAL ###
{user_goal}
### USER PROFILE ###
{user_info}
"""

    # Step 3: Call the OpenAI API using the specified function
    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": [{"type": "text", "text": system_prompt}]},
            {"role": "user", "content": [{"type": "text", "text": user_context}]},
        ],
        response_format={
            "type": "json_schema",
            "json_schema": {
                "name": "growth_guide_session",
                "strict": True,
                "schema": _growth_guide_session_schema(),
            },
        },
        temperature=0.5,
        max_tokens=3000,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0,
    )

    # A completion that hit max_tokens is truncated JSON, and a refusal has no
    # content at all; fail with a clear error instead of a JSON parsing error.
    choice = response.choices[0]
    raw_reports = choice.message.content
    if not raw_reports or choice.finish_reason == "length":
        logger.error(
            f"Incomplete user summary response for user {user_id} (finish_reason={choice.finish_reason})",
            extra=log_extra,
        )
        raise OpenAIRequestError(user_id=user_id, message="Incomplete or empty response from OpenAI", code="OpenAIError")

    # Get response and convert into dictionary
    reports = json.loads(raw_reports)

    # Store users_growth_guide_preparation_brief in the User object
    if update_rec_topics:
        user.set_recommened_gg_topics(reports['users_growth_guide_preparation_brief'])

    # Step 4: Return the JSON reports
    logger.info(f"User summary generated successfully for user {user_id}", extra=log_extra)
    return reports
@catch_error
def create_pre_gg_report(booking_id):
    """Create and upload the pre-session report for a booking.

    Looks up the booking's ``user_id`` in ``public.booking``, generates the
    user's summary reports and renders/uploads them via ``generate_html``.

    Args:
        booking_id: Primary key of the booking row.

    Returns:
        bool: ``True`` once the report has been generated and uploaded.

    Raises:
        DBError: With code ``SQLError`` when the database query fails, or
            ``NoBookingError`` when no booking with this id exists.
    """
    function_name = create_pre_gg_report.__name__
    log_extra = {'booking_id': booking_id, 'endpoint': function_name}

    # Get user_id from booking_id
    logger.info(f"Retrieving booking details for {booking_id}", extra=log_extra)
    # NOTE(review): database credentials are hard-coded in source and point at
    # a staging host - move them to environment/secret configuration.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'hvcTL3kN3pOG5KteT17T',
        'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }

    try:
        conn = psycopg2.connect(**db_params)
        try:
            # `with conn` only commits/rolls back the transaction; the
            # connection itself is closed in the `finally` below.
            with conn:
                with conn.cursor() as cursor:
                    query = sql.SQL("""
                        select user_id
                        from {table}
                        where id = %s
                        """
                    ).format(table=sql.Identifier('public', 'booking'))
                    cursor.execute(query, (booking_id,))
                    row = cursor.fetchone()
        finally:
            conn.close()
    except psycopg2.Error as e:
        logger.error(f"Database error while retrieving user info for booking {booking_id}: {e}", extra=log_extra)
        raise DBError(user_id='no-user', message="Error retrieving user info", code="SQLError", e=str(e)) from e

    if not row:
        # Without a booking row there is no user to report on; previously this
        # path crashed on an unassigned `user_id`.
        logger.warning(f"No user info found for booking {booking_id}", extra=log_extra)
        # NOTE(review): "NoBookingError" is a new code for this previously
        # crashing path - confirm callers that map error codes accept it.
        raise DBError(user_id='no-user', message="Booking not found", code="NoBookingError")

    # The query selects a single column, so the user id is the first field.
    user_id = row[0]
    logger.info(f"User info retrieved successfully for {user_id}", extra={'user_id': user_id, 'endpoint': function_name})

    # Run get_user_summary
    user_report = get_user_summary(user_id)

    # Run generate_html
    generate_html(user_report, booking_id=booking_id)
    return True
@catch_error
def get_user_life_status(user_id):
    """Assemble a user's life-status report.

    The report combines a model-generated "mantra of the week", the
    cumulative life scores stored on the user object, the points earned in
    the last 30 days (grouped by area), the current goal and recent wins.

    Args:
        user_id: Identifier passed to ``get_user``.

    Returns:
        dict with keys ``life_score``, ``cumulative_life_score``,
        ``mantra_of_the_week``, ``goal`` and ``recent_wins``.
    """
    function_name = get_user_life_status.__name__
    log_extra = {'user_id': user_id, 'endpoint': function_name}
    logger.info(f"Generating user life status for user {user_id}", extra=log_extra)
    user = get_user(user_id)
    user_info = user.user_info
    user_messages = user.get_messages()
    # Step 2: Construct the Prompt
    chat_history = "\n".join(
        f"{message['role'].capitalize()}: {message['content']}" for message in user_messages
    )
    logger.info(f"Fetched user data for: {user_id}", extra=log_extra)
    # Build the system prompt according to the provided instructions
    system_prompt = """
You are an AI assistant that generates a personalized life status report for users based on their profile and chat history. Your task is to analyze the provided user data and produce a JSON output following the specified schema.
**Instructions:**
1. **Mantra of the Week:**
- Create a very short encouragement quote that encapsulates the user's journey toward achieving their goals.
- The mantra **MUST** be a single sentence with fewer than 5 words.
- Do **NOT** call the user's name in the mantra!
**Output Format:**
Produce your response in JSON format adhering to the following schema:
```json
{
"mantra_of_the_week": str
}
```
**Guidelines:**
- The `mantra_of_the_week` should be personalized, positive, and encouraging. It **MUST** be a single sentence with fewer than 5 words.
"""
    # Combine user information and chat history for context
    user_context = f"""
Based on the following user profile and chat history, generate the life status!
### USER PROFILE ###
{user_info}
### CHAT HISTORY ###
{chat_history}
"""
    # Step 3: Call the OpenAI API using the specified function
    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": [
                    {
                        "type": "text",
                        "text": system_prompt
                    }
                ]
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": user_context
                    }
                ]
            }
        ],
        response_format={
            "type": "json_schema",
            "json_schema": {
                "name": "life_status_report",
                "strict": True,
                "schema": {
                    "type": "object",
                    "properties": {
                        "mantra_of_the_week": {
                            "type": "string",
                            "description": "A very short encouragement quote that encapsulates the user's journey to achieve their goals."
                        }
                    },
                    "required": [
                        "mantra_of_the_week"
                    ],
                    "additionalProperties": False
                }
            }
        },
        temperature=0.5,
        max_tokens=3000,
        top_p=1,
        frequency_penalty=0,
        presence_penalty=0
    )
    # Get response and convert into dictionary
    mantra = json.loads(response.choices[0].message.content)["mantra_of_the_week"]
    # The mantra is deliberately NOT stored on the user here: it is meant to be
    # refreshed weekly by the backend, not on every call to this function.
    cumulative_life_score = {
        "overall": user.personal_growth_score + user.career_growth_score + user.relationship_score + user.mental_well_being_score + user.health_and_wellness_score,
        "personal_growth": user.personal_growth_score,
        "health_and_wellness": user.health_and_wellness_score,
        "mental_well_being": user.mental_well_being_score,
        "career_growth": user.career_growth_score,
        "relationship": user.relationship_score
    }
    logger.info(f"{user.score_history}", extra=log_extra)
    # Get current life score
    if len(user.score_history) == 0:
        # No history at all: fall back to the cumulative scores.
        thirtydays_life_score = cumulative_life_score
    else:
        # Calculate previous 30 days date
        now = pd.Timestamp.now()
        thirty_days_ago = now - pd.Timedelta(days=30)
        # Filter the data
        filtered_data = [entry for entry in user.score_history if thirty_days_ago <= entry["created_at"] <= now]
        logger.info(f"Filtered Data: {filtered_data}", extra=log_extra)
        # Normalize area names to match expected keys
        area_mapping = {
            "Personal Growth": "personal_growth",
            "Health and Wellness": "health_and_wellness",
            "Mental Well-being": "mental_well_being",
            "Career Growth": "career_growth",
            "Relationship": "relationship"
        }
        # NOTE(review): this rewrites "area" on the entries held by
        # user.score_history (in place), as the original code did.
        for entry in filtered_data:
            entry["area"] = area_mapping.get(entry["area"], entry["area"])
        # Sum points_added, group by area. An empty window would produce a
        # DataFrame without an "area" column and make groupby raise KeyError,
        # so use an empty Series (every area then defaults to 0).
        if filtered_data:
            grouped_points = pd.DataFrame(filtered_data).groupby("area")["points_added"].sum()
        else:
            grouped_points = pd.Series(dtype="float64")
        # Debug: Check the grouped points result
        logger.info(f"Grouped Points: {grouped_points}", extra=log_extra)
        # Structure the output safely
        areas = ("personal_growth", "health_and_wellness", "mental_well_being", "career_growth", "relationship")
        thirtydays_life_score = {"overall": int(sum(grouped_points.get(area, 0) for area in areas))}
        for area in areas:
            thirtydays_life_score[area] = int(grouped_points.get(area, 0))
        # Debug: Check the final structured result
        logger.info(f"Final Thirty Days Life Score: {thirtydays_life_score}", extra=log_extra)
    # Get current goal
    current_goal = '' if not user.goal else user.goal[-1].content
    # Get life score achievements in list
    recent_wins = user.recent_wins
    # Combine everything
    reports = {
        "life_score": thirtydays_life_score,
        "cumulative_life_score": cumulative_life_score,
        "mantra_of_the_week": mantra.replace('.', ''),
        "goal": current_goal,
        "recent_wins": recent_wins
    }
    # Step 4: Return the JSON reports
    logger.info(f"User life status generated successfully for user {user_id}: {reports}", extra=log_extra)
    return reports
async def get_api_key(api_key_header: str = Security(api_key_header)) -> str:
    """FastAPI dependency that validates the ``X-API-Key`` header.

    Returns the presented key when it is one of the configured ``api_keys``;
    otherwise responds with HTTP 403.
    """
    if api_key_header in api_keys:  # Check against list of valid keys
        return api_key_header
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Invalid API key"
    )
@catch_error
def get_user_info(user_id):
    """Fetch a user's onboarding data and render it as a prompt-ready profile.

    The query appends the assigned coach's ``full_name`` to the stored
    onboarding JSON as ``growth_guide_name`` before it is parsed.

    Args:
        user_id: Value matched against ``public.users.id``.

    Returns:
        Tuple ``(formatted_profile, legend_persona)`` where
        ``formatted_profile`` is the multi-line profile text and
        ``legend_persona`` is the onboarding ``legendPersona`` value
        (``''`` when absent).

    Raises:
        DBError: With code ``SQLError`` on database failure, or
            ``NoOnboardingError`` when the user has no onboarding data.
    """
    function_name = get_user_info.__name__
    logger.info(f"Retrieving user info for {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
    # NOTE(review): credentials are hard-coded; move them to environment variables / a secret store.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'hvcTL3kN3pOG5KteT17T',
        'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        with conn, conn.cursor() as cursor:
            query = sql.SQL("SELECT left(onboarding,length(onboarding)-1)||',\"growth_guide_name\":\"'||coalesce(b.full_name,'')||'\"}}' onboarding FROM {table} a LEFT JOIN {coach_tbl} b ON a.assign_coach_id = b.id WHERE a.id = %s").format(table=sql.Identifier('public', 'users'), coach_tbl=sql.Identifier('public', 'coach'))
            cursor.execute(query, (user_id,))
            row = cursor.fetchone()
    except psycopg2.Error as e:
        logger.error(f"Database error while retrieving user info for {user_id}: {e}", extra={'user_id': user_id, 'endpoint': function_name})
        raise DBError(user_id=user_id, message="Error retrieving user info", code="SQLError", e=str(e))
    finally:
        # `with conn` only ends the transaction; the connection must be closed explicitly.
        if conn is not None:
            conn.close()

    # A NULL onboarding column makes the concatenated expression NULL as well,
    # so treat it the same as a missing user row.
    if not row or row[0] is None:
        logger.warning(f"No user info found for {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
        raise DBError(user_id=user_id, message="Error retrieving user info", code="NoOnboardingError", e=f"No onboarding data found for user {user_id}")

    ### MODIFY THE FORMAT OF USER DATA
    user_data_clean = json.loads(row[0])
    first_name = user_data_clean.get('firstName', '')
    legend_persona = user_data_clean.get('legendPersona', '')

    def qa_lines(key):
        # Render a list of {"question", "answer"} dicts as "- question : answer" lines.
        return "\n".join(f"- {item['question']} : {item['answer']}" for item in user_data_clean.get(key, []))

    # "What does X do for a living" is filled from mySituation (the doLiving
    # question list is no longer used).
    doLiving = user_data_clean.get('mySituation', '')
    whoImportant = qa_lines('whoImportant')
    challenges = qa_lines('challenges')
    # Pad to five entries so a short or missing ranking cannot raise IndexError.
    matters_most = list(user_data_clean.get('mattersMost') or [])
    matters_most += [''] * (5 - len(matters_most))

    user_data_formatted = f"""
### USER PROFILE ###
Name: {first_name}
Growth Guide Name: {user_data_clean.get('growth_guide_name', '')}
{first_name}'s challenges (You **must** use this information for the PLANNING STATE):
{challenges}
Persona:
{legend_persona}
Pronouns: {user_data_clean.get('pronouns', '')}
Birthday: {user_data_clean.get('birthDate', '')}
{first_name}'s MBTI: {user_data_clean.get('mbti', '')}
{first_name}'s Love Language: {user_data_clean.get('loveLanguage', '')}
Has {first_name} tried coaching before: {user_data_clean.get('triedCoaching', '')}
Belief in Astrology: {user_data_clean.get('astrology', '')}
The most important area in {first_name}'s life: {matters_most[0]}
The second most important area in {first_name}'s life: {matters_most[1]}
The third most important area in {first_name}'s life: {matters_most[2]}
The fourth most important area in {first_name}'s life: {matters_most[3]}
The fifth most important area in {first_name}'s life: {matters_most[4]} (Matters the least)
What does {first_name} do for a living:
{doLiving}
{first_name}'s current situation: {user_data_clean.get('mySituation', '')}
{first_name}'s most important person:
{whoImportant}
"""
    logger.info(f"User info retrieved successfully for {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
    return user_data_formatted, legend_persona
@catch_error
def get_growth_guide_summary(user_id, booking_id):
    """Return the ``public.user_notes`` row for a user's booking.

    Args:
        user_id: Value matched against ``user_notes.user_id``.
        booking_id: Value matched against ``user_notes.booking_id``.

    Returns:
        dict mapping column name to value for the first matching row, or
        None when no row matches.

    Raises:
        DBError: On any psycopg2 error.
    """
    function_name = get_growth_guide_summary.__name__
    logger.info(f"Retrieving growth guide summary for user {user_id} and session {booking_id}", extra={'user_id': user_id, 'endpoint': function_name})
    # NOTE(review): credentials are hard-coded; move them to environment variables / a secret store.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'hvcTL3kN3pOG5KteT17T',
        'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        with conn, conn.cursor() as cursor:
            query = sql.SQL("SELECT * FROM {table} WHERE user_id = %s AND booking_id = %s").format(table=sql.Identifier('public', 'user_notes'))
            cursor.execute(query, (user_id, booking_id))
            row = cursor.fetchone()
            if not row:
                logger.warning(f"No growth guide summary found for user {user_id} and session {booking_id}", extra={'user_id': user_id, 'endpoint': function_name})
                return None
            colnames = [desc[0] for desc in cursor.description]
            summary_data = dict(zip(colnames, row))
            logger.info(f"Growth guide summary retrieved successfully for user {user_id} and session {booking_id}: {summary_data}", extra={'user_id': user_id, 'endpoint': function_name})
            return summary_data
    except psycopg2.Error as e:
        logger.error(f"Database error while retrieving growth guide summary for user {user_id} and session {booking_id}: {e}", extra={'user_id': user_id, 'endpoint': function_name})
        raise DBError(user_id=user_id, message="Error retrieving user info", code="SQLError", e=str(e))
    finally:
        # `with conn` only ends the transaction; the connection must be closed explicitly.
        if conn is not None:
            conn.close()
@catch_error
def get_all_bookings():
    """Return every booking as ``{'booking_id': ..., 'user_id': ...}`` dicts.

    Returns:
        list of dicts built from the ``id`` and ``user_id`` columns of
        ``public.booking`` (empty when the table has no rows).

    Raises:
        DBError: On any psycopg2 error. The previous ``finally: return``
            discarded this exception and returned an empty list instead.
    """
    function_name = get_all_bookings.__name__
    logger.info(f"Retrieving all bookings", extra={'endpoint': function_name})
    # NOTE(review): credentials are hard-coded; move them to environment variables / a secret store.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'hvcTL3kN3pOG5KteT17T',
        'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        with conn, conn.cursor() as cursor:
            query = sql.SQL("SELECT id, user_id FROM {table}").format(table=sql.Identifier('public', 'booking'))
            cursor.execute(query)
            bookings = [{'booking_id': row[0], 'user_id': row[1]} for row in cursor.fetchall()]
            logger.info(f"Retrieved {len(bookings)} bookings", extra={'endpoint': function_name})
            return bookings
    except psycopg2.Error as e:
        logger.error(f"Database error while retrieving bookings: {e}", extra={'endpoint': function_name})
        raise DBError(user_id='no-user', message="Error retrieving user info", code="SQLError", e=str(e))
    finally:
        # Cleanup only: returning from `finally` would swallow the DBError above.
        # `with conn` only ends the transaction; the connection must be closed explicitly.
        if conn is not None:
            conn.close()
@catch_error
def update_growth_guide_summary(user_id, session_id, ourcoach_summary):
    """Store a new ``ourcoach_summary`` on a user's session notes.

    Args:
        user_id: Value matched against ``user_notes.user_id``.
        session_id: Value matched against ``user_notes.booking_id``.
        ourcoach_summary: JSON-serialisable object; written via ``json.dumps``.

    Returns:
        None.

    Raises:
        DBError: On any psycopg2 error.
    """
    function_name = update_growth_guide_summary.__name__
    logger.info(f"Updating growth guide summary for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
    # NOTE(review): credentials are hard-coded; move them to environment variables / a secret store.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'hvcTL3kN3pOG5KteT17T',
        'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        with conn, conn.cursor() as cursor:
            query = sql.SQL("""
                UPDATE {table}
                SET ourcoach_summary = %s
                WHERE user_id = %s AND booking_id = %s
            """).format(table=sql.Identifier('public', 'user_notes'))
            cursor.execute(query, (json.dumps(ourcoach_summary), user_id, session_id))
            conn.commit()
            logger.info(f"Growth guide summary updated successfully for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
    except psycopg2.Error as e:
        logger.error(f"Database error while updating growth guide summary: {e}", extra={'user_id': user_id, 'endpoint': function_name})
        raise DBError(user_id=user_id, message="Error updating growth guide summary", code="SQLError", e=str(e))
    finally:
        # `with conn` only ends the transaction; the connection must be closed explicitly.
        if conn is not None:
            conn.close()
@catch_error
def add_growth_guide_session(user_id, session_id, coach_id, session_started_at, zoom_ai_summary, gg_report, ourcoach_summary):
    """Insert a growth-guide session row into ``public.user_notes``.

    ``created_at`` and ``updated_at`` are both set to the current UTC time.

    Args:
        user_id: Stored in ``user_id``.
        session_id: Stored in ``booking_id``.
        coach_id: Stored in ``coach_id``.
        session_started_at: Stored in ``session_started_at``.
        zoom_ai_summary: JSON-serialisable; written via ``json.dumps``.
        gg_report: JSON-serialisable; written via ``json.dumps``.
        ourcoach_summary: JSON-serialisable; written via ``json.dumps``.

    Returns:
        None.

    Raises:
        DBError: On any psycopg2 error.
    """
    function_name = add_growth_guide_session.__name__
    logger.info(f"Adding growth guide session for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
    # NOTE(review): credentials are hard-coded; move them to environment variables / a secret store.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'hvcTL3kN3pOG5KteT17T',
        'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        with conn, conn.cursor() as cursor:
            query = sql.SQL("""
                INSERT INTO {table} (booking_id, coach_id, session_started_at, user_id, updated_at, gg_report, ourcoach_summary, created_at, zoom_ai_summary)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            """).format(table=sql.Identifier('public', 'user_notes'))
            current_time = datetime.now(timezone.utc)
            # Order must match the column list in the INSERT above.
            values = (
                session_id,
                coach_id,
                session_started_at,
                user_id,
                current_time,
                json.dumps(gg_report),
                json.dumps(ourcoach_summary),
                current_time,
                json.dumps(zoom_ai_summary),
            )
            cursor.execute(query, values)
            conn.commit()
            logger.info(f"Growth guide session added successfully for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
    except psycopg2.Error as e:
        logger.error(f"Database error while adding growth guide session: {e}", extra={'user_id': user_id, 'endpoint': function_name})
        raise DBError(user_id=user_id, message="Error adding growth guide session", code="SQLError", e=str(e))
    finally:
        # `with conn` only ends the transaction; the connection must be closed explicitly.
        if conn is not None:
            conn.close()
@catch_error
def get_growth_guide_session(user_id, session_id):
    """Return the full ``public.user_notes`` row for a user's session.

    Args:
        user_id: Value matched against ``user_notes.user_id``.
        session_id: Value matched against ``user_notes.booking_id``.

    Returns:
        dict mapping every column name to its value for the first matching
        row, or None when no row matches.

    Raises:
        DBError: On any psycopg2 error.
    """
    function_name = get_growth_guide_session.__name__
    logger.info(f"Retrieving growth guide session for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
    # NOTE(review): credentials are hard-coded; move them to environment variables / a secret store.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'hvcTL3kN3pOG5KteT17T',
        'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        with conn, conn.cursor() as cursor:
            query = sql.SQL("SELECT * FROM {table} WHERE user_id = %s AND booking_id = %s").format(table=sql.Identifier('public', 'user_notes'))
            cursor.execute(query, (user_id, session_id))
            row = cursor.fetchone()
            if not row:
                logger.warning(f"No growth guide session found for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
                return None
            colnames = [desc[0] for desc in cursor.description]
            session_data = dict(zip(colnames, row))
            logger.info(f"Growth guide session retrieved successfully for user {user_id} and session {session_id}: {session_data}", extra={'user_id': user_id, 'endpoint': function_name})
            return session_data
    except psycopg2.Error as e:
        logger.error(f"Database error while retrieving growth guide session for user {user_id} and session {session_id}: {e}", extra={'user_id': user_id, 'endpoint': function_name})
        raise DBError(user_id=user_id, message="Error retrieving user info", code="SQLError", e=str(e))
    finally:
        # `with conn` only ends the transaction; the connection must be closed explicitly.
        if conn is not None:
            conn.close()
@catch_error
def download_file_from_s3(filename, bucket):
    """Download ``dev/users/<filename>`` from S3 into ``users/data/<filename>``.

    Explicit AWS credentials are used when both ``AWS_ACCESS_KEY`` and
    ``AWS_SECRET_KEY`` are set; otherwise boto3's default session is used.
    A partially written local file is removed on failure.

    Args:
        filename: Object name; the part before the first ``.`` is used as
            the user id in log records and errors.
        bucket: Name of the S3 bucket.

    Returns:
        True when the download succeeds.

    Raises:
        DBError: With code ``S3Error`` on any failure.
    """
    function_name = download_file_from_s3.__name__
    user_id = filename.split('.')[0]
    logger.info(f"Downloading file {filename} from S3 bucket {bucket}", extra={'user_id': user_id, 'endpoint': function_name})
    file_path = os.path.join('users', 'data', filename)
    try:
        session_kwargs = {}
        if AWS_ACCESS_KEY and AWS_SECRET_KEY:
            session_kwargs = {
                'aws_access_key_id': AWS_ACCESS_KEY,
                'aws_secret_access_key': AWS_SECRET_KEY,
                'region_name': REGION,
            }
        s3_client = boto3.session.Session(**session_kwargs).client('s3')
        with open(file_path, 'wb') as local_file:
            # Objects are read from the dev/users/ prefix of the bucket.
            s3_client.download_fileobj(bucket, f"dev/users/{filename}", local_file)
        logger.info(f"File {filename} downloaded successfully from S3", extra={'user_id': user_id, 'endpoint': function_name})
        return True
    except Exception as e:
        logger.error(f"Error downloading file {filename} from S3: {e}", extra={'user_id': user_id, 'endpoint': function_name})
        # Do not leave a truncated or empty file behind.
        if os.path.exists(file_path):
            os.remove(file_path)
        raise DBError(user_id=user_id, message="Error downloading file from S3", code="S3Error", e=str(e))
@catch_error
def add_to_cache(user):
    """Store ``user`` in the module-level ``user_cache`` under ``user.user_id``.

    Returns:
        True after the entry has been written.
    """
    function_name = add_to_cache.__name__
    uid = user.user_id
    log_extra = {'user_id': uid, 'endpoint': function_name}
    logger.info(f"Adding user {uid} to the cache", extra=log_extra)
    user_cache[uid] = user
    logger.info(f"User {uid} added to the cache", extra=log_extra)
    return True
@catch_error
def pop_cache(user_id):
    """Remove a user (or every user) from ``user_cache``.

    Args:
        user_id: Cache key to remove, or the literal ``'all'`` to reset the
            whole cache.

    Returns:
        True in every case.
    """
    if user_id == 'all':
        user_cache.reset_cache()
        return True

    log_extra = {'user_id': user_id, 'endpoint': 'pop_cache'}
    if user_id not in user_cache:
        logger.warning(f"[POPPING] User {user_id} not found in the cache", extra=log_extra)
        # NOTE(review): nesting reconstructed from a de-indented source — the
        # pending-file upload is assumed to apply only when the user is not
        # cached; confirm against the original file.
        pending_file = os.path.join("users", "to_upload", f"{user_id}.pkl")
        if os.path.exists(pending_file):
            # The file uploaded is the .pkl, so the log message names it
            # (it previously said ".json").
            logger.info(f"Attempting upload file {user_id}.pkl to S3", extra=log_extra)
            upload_file_to_s3(f"{user_id}.pkl")

    user_cache.pop(user_id, None)
    logger.info(f"User {user_id} has been removed from the cache", extra=log_extra)
    return True
@catch_error
def update_user(user):
    """Evict ``user`` from the cache via ``pop_cache``.

    Per the original in-code comment, removing the user from the cache also
    uploads the user's file.

    Returns:
        True once ``pop_cache`` has returned.
    """
    function_name = update_user.__name__
    uid = user.user_id
    log_extra = {'user_id': uid, 'endpoint': function_name}
    logger.info(f"Updating user {uid}", extra=log_extra)
    # remove from cache, which will also upload the file
    pop_cache(uid)
    logger.info(f"User {uid} has been removed from the cache", extra=log_extra)
    logger.info(f"User {user.user_id} updated successfully in S3", extra=log_extra)
    return True
@catch_error
def upload_mementos_to_db(user_id):
    """Insert a user's pending memento JSON files into ``public.user_memento``.

    Every ``*.json`` file under ``mementos/to_upload/<user_id>`` is inserted
    and committed individually, then deleted. A file that cannot be parsed or
    inserted is logged and left in place so the remaining files are still
    processed. The folder is removed afterwards if it ends up empty.

    Args:
        user_id: Folder name under ``mementos/to_upload`` and the value
            written to ``user_memento.user_id``.

    Returns:
        True, including when there is nothing to upload.

    Raises:
        DBError: On a psycopg2 error outside the per-file handling
            (e.g. the connection cannot be opened).
    """
    function_name = upload_mementos_to_db.__name__
    log_extra = {'user_id': user_id, 'endpoint': function_name}
    logger.info(f"Uploading mementos to DB for user {user_id}", extra=log_extra)
    # NOTE(review): credentials are hard-coded; move them to environment variables / a secret store.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'hvcTL3kN3pOG5KteT17T',
        'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    folder_path = os.path.join("mementos", "to_upload", user_id)
    if not os.path.exists(folder_path):
        logger.warning(f"No mementos folder found for user {user_id}", extra=log_extra)
        return True  # Return True as this is not an error condition

    base_query = """
        INSERT INTO public.user_memento
        (user_id, type, title, description, tags, priority,
        mood, status, location, recurrence, context, created_at, follow_up_on)
        VALUES (%s, %s, %s, %s, %s::jsonb, %s, %s, %s, %s, %s, %s, %s, %s)
    """
    conn = None
    try:
        memento_files = [f for f in os.listdir(folder_path) if f.endswith('.json')]
        if not memento_files:
            logger.info(f"No memento files found for user {user_id}", extra=log_extra)
            return True

        conn = psycopg2.connect(**db_params)
        with conn, conn.cursor() as cursor:
            for filename in memento_files:
                file_path = os.path.join(folder_path, filename)
                try:
                    with open(file_path, 'r', encoding='utf-8') as json_file:
                        data = json.load(json_file)
                    # A missing/empty follow_up_on parses to NaT, which is not
                    # a usable timestamp parameter; send NULL instead.
                    # NOTE(review): assumes follow_up_on is nullable — confirm.
                    follow_up_on = pd.to_datetime(data.get('follow_up_on', ''))
                    if pd.isna(follow_up_on):
                        follow_up_on = None
                    # Prepare data with proper defaults and transformations
                    memento_data = [
                        user_id,  # Use the actual user_id, not the one in the JSON
                        data.get('type', ''),
                        data.get('title', ''),
                        data.get('description', ''),
                        json.dumps(data.get('tags', [])),  # tags are sent as a JSON string
                        data.get('priority', ''),
                        data.get('mood', ''),
                        data.get('status', ''),
                        data.get('location', ''),
                        data.get('recurrence', ''),
                        data.get('context', ''),
                        datetime.now(timezone.utc),
                        follow_up_on,
                    ]
                    cursor.execute(base_query, memento_data)
                    conn.commit()
                    # Remove file after successful insert
                    os.remove(file_path)
                    logger.info(f"Successfully processed memento {filename}", extra=log_extra)
                except json.JSONDecodeError as e:
                    logger.error(f"Invalid JSON in file {filename}: {str(e)}", extra=log_extra)
                    continue
                except Exception as e:
                    # A failed statement leaves the transaction aborted; without
                    # a rollback every later insert in this loop would fail too.
                    conn.rollback()
                    logger.error(f"Error processing memento {filename}: {str(e)}", extra=log_extra)
                    continue

        # Try to remove the directory after processing all files
        try:
            os.rmdir(folder_path)
        except OSError:
            pass  # Ignore if directory is not empty or already removed
        return True
    except psycopg2.Error as e:
        logger.error(f"Database error while uploading mementos: {str(e)}", extra=log_extra)
        raise DBError(user_id=user_id, message="Error uploading mementos", code="SQLError", e=str(e))
    finally:
        # `with conn` only ends the transaction; the connection must be closed explicitly.
        if conn is not None:
            conn.close()
@catch_error
def get_users_mementos(user_id, date):
    """Return a user's mementos whose follow-up date equals ``date``.

    Args:
        user_id: Value matched against ``user_memento.user_id``.
        date: Value compared with ``DATE(follow_up_on)``; passed to the
            database unchanged.

    Returns:
        list of dicts (column name -> value), empty when nothing matches.

    Raises:
        DBError: On any psycopg2 error. The previous ``finally: return``
            discarded this exception.
    """
    function_name = get_users_mementos.__name__
    log_extra = {'endpoint': function_name, 'user_id': user_id}
    # NOTE(review): credentials are hard-coded; move them to environment variables / a secret store.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'hvcTL3kN3pOG5KteT17T',
        'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    logger.info(f"Retrieving mementos for user {user_id} on date {date}", extra=log_extra)
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        with conn, conn.cursor() as cursor:
            query = sql.SQL("""
                SELECT * FROM public.user_memento
                WHERE user_id = %s AND DATE(follow_up_on) = %s
            """)
            cursor.execute(query, (user_id, date))
            rows = cursor.fetchall()
            if not rows:
                logger.info(f"No mementos found for user {user_id} on date {date}", extra=log_extra)
                return []
            colnames = [desc[0] for desc in cursor.description]
            mementos = [dict(zip(colnames, row)) for row in rows]
            logger.info(f"Retrieved {len(mementos)} mementos for user {user_id} on date {date}", extra=log_extra)
            return mementos
    except psycopg2.Error as e:
        logger.error(f"Database error while retrieving mementos: {e}", extra=log_extra)
        raise DBError(user_id=user_id, message="Error retrieving mementos", code="SQLError", e=str(e))
    finally:
        # Cleanup only: returning from `finally` would swallow the DBError above.
        # `with conn` only ends the transaction; the connection must be closed explicitly.
        if conn is not None:
            conn.close()
@catch_error
def id_to_persona(assistant_id):
    """Translate an OpenAI assistant id into its coach persona description.

    Unknown ids fall back to the Coach Steve persona (without the
    parenthesised speciality suffix).
    """
    # Inverse of the persona -> assistant-id mapping.
    personas_by_assistant = {
        "asst_mUm6MBcW544p1iVov9mwIC96": "Coach Steve, based on the persona of Steve Jobs (Innovation & Leadership)",
        "asst_4WcktKgYdDnXA1QUlWvrNfWV": "Coach Aris, based on the persona of Aristotle (Logic & Decision Making)",
        "asst_4UVkFK6r2pbz6NK6kNzG4sTW": "Coach Teresa, based on the persona of Mother Teresa (Compassion & Empathy)",
    }
    try:
        return personas_by_assistant[assistant_id]
    except KeyError:
        return "Coach Steve, based on the persona of Steve Jobs"
@catch_error
def get_growth_guide(user_id):
    """Return the coach ("growth guide") assigned to a user.

    Reads ``assign_coach_id`` from ``public.users`` and then the coach's
    ``full_name``, ``email`` and ``bio`` from ``public.coach``.

    Args:
        user_id: Value matched against ``public.users.id``.

    Returns:
        dict with keys ``full_name``, ``email`` and ``bio``, or None when the
        user does not exist, has no coach assigned, or the coach row is
        missing.

    Raises:
        DBError: On any psycopg2 error.
    """
    function_name = get_growth_guide.__name__
    log_extra = {'user_id': user_id, 'endpoint': function_name}
    logger.info(f"Retrieving growth guide for user {user_id}", extra=log_extra)
    # NOTE(review): credentials are hard-coded; move them to environment variables / a secret store.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': "hvcTL3kN3pOG5KteT17T",
        'host': "staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com",
        'port': '5432'
    }
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        with conn, conn.cursor() as cursor:
            query = sql.SQL("SELECT assign_coach_id FROM {table} WHERE id = %s").format(table=sql.Identifier('public', 'users'))
            cursor.execute(query, (user_id,))
            row = cursor.fetchone()
            gg_id = row[0] if row else None
            if gg_id is None:
                # Unknown user or no coach assigned: nothing to look up.
                logger.warning(f"No growth guide found for user {user_id}", extra=log_extra)
                return None
            logger.info(f"Growth guide retrieved successfully for user {user_id}", extra=log_extra)

            # Now query the coach table for the public coach details
            query = sql.SQL("SELECT full_name, email, bio FROM {table} WHERE id = %s").format(table=sql.Identifier('public', 'coach'))
            cursor.execute(query, (gg_id,))
            row = cursor.fetchone()
            if not row:
                logger.warning(f"No growth guide found for user {user_id}", extra=log_extra)
                return None
            coach_data = dict(zip(('full_name', 'email', 'bio'), row))
            logger.info(f"Coach data {coach_data} retrieved successfully for user {user_id}", extra=log_extra)
            return coach_data
    except psycopg2.Error as e:
        logger.error(f"Database error while retrieving growth guide for user {user_id}: {e}", extra=log_extra)
        raise DBError(user_id=user_id, message="Error retrieving growth guide", code="SQLError", e=str(e))
    finally:
        # `with conn` only ends the transaction; the connection must be closed explicitly.
        if conn is not None:
            conn.close()
def get_booked_gg_sessions(user_id):
    """Return a user's growth-guide bookings, most recently created first.

    Each booking is reduced to the keys ``status``, ``booking_id``,
    ``duration``, ``user_rating``, ``user_session_feedback``,
    ``session_date``, ``coach_name``, ``created_at`` and ``updated_at``.
    The integer status is mapped to a label (0 creating, 1 pending,
    2 completed, 3 canceled; anything else -> ``'creating'``) and timestamps
    are rendered in the user's timezone as ``YYYY-MM-DD Ddd HH:MM:SS``.

    Args:
        user_id: Value matched against ``public.booking.user_id``.

    Returns:
        list of booking dicts; empty when the user has no bookings.

    Raises:
        DBError: On any psycopg2 error. The previous ``finally: return``
            discarded this exception (and raised UnboundLocalError when the
            failure happened before ``bookings`` was assigned).
    """
    function_name = get_booked_gg_sessions.__name__
    log_extra = {'user_id': user_id, 'endpoint': function_name}
    logger.info(f"Retrieving booked growth guide sessions for user {user_id}", extra=log_extra)
    # NOTE(review): credentials are hard-coded; move them to environment variables / a secret store.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': "hvcTL3kN3pOG5KteT17T",
        'host': "staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com",
        'port': '5432'
    }
    status_labels = {0: 'creating', 1: 'pending', 2: 'completed', 3: 'canceled'}
    exposed_fields = ('status', 'booking_id', 'duration', 'user_rating', 'user_session_feedback', 'session_date', 'coach_name', 'created_at', 'updated_at')
    timestamp_format = '%Y-%m-%d %a %H:%M:%S'
    conn = None
    try:
        # The user's local timezone comes from public.users; fall back to the
        # same default get_user_local_timezone uses if the column is empty.
        user_timezone = get_user_local_timezone(user_id) or 'Asia/Singapore'
        local_tz = pytz.timezone(user_timezone)

        def to_local(value):
            # Timestamps may be NULL in the table; leave those as None.
            if value is None:
                return None
            return value.astimezone(pytz.utc).astimezone(local_tz).strftime(timestamp_format)

        conn = psycopg2.connect(**db_params)
        with conn, conn.cursor() as cursor:
            query = sql.SQL("SELECT * FROM {table} WHERE user_id = %s ORDER BY created_at DESC").format(table=sql.Identifier('public', 'booking'))
            cursor.execute(query, (user_id,))
            rows = cursor.fetchall()
            if not rows:
                logger.warning(f"No booked growth guide sessions found for user {user_id}", extra=log_extra)
                return []

            # Read the column names before the cursor is reused for coach lookups.
            colnames = [desc[0] for desc in cursor.description]
            coach_query = sql.SQL("SELECT full_name FROM {table} WHERE id = %s").format(table=sql.Identifier('public', 'coach'))
            coach_names = {}  # coach_id -> name, so each coach is queried once
            bookings = []
            for row in rows:
                booking = dict(zip(colnames, row))
                booking['status'] = status_labels.get(booking['status'], 'creating')
                booking['session_date'] = to_local(booking['session_started_at'])
                booking['created_at'] = to_local(booking['created_at'])
                booking['updated_at'] = to_local(booking['updated_at'])
                booking['booking_id'] = booking['id']
                booking['user_rating'] = booking['rate']
                booking["user_session_feedback"] = booking['comment']
                # convert the coach_id to coach_name
                coach_id = booking['coach_id']
                if coach_id not in coach_names:
                    cursor.execute(coach_query, (coach_id,))
                    coach_row = cursor.fetchone()
                    coach_names[coach_id] = coach_row[0] if coach_row else 'Unknown'
                booking['coach_name'] = coach_names[coach_id]
                bookings.append({k: v for k, v in booking.items() if k in exposed_fields})
            logger.info(f"Retrieved {len(bookings)} booked growth guide sessions for user {user_id}", extra=log_extra)
            return bookings
    except psycopg2.Error as e:
        logger.error(f"Database error while retrieving booked growth guide sessions for user {user_id}: {e}", extra=log_extra)
        raise DBError(user_id=user_id, message="Error retrieving booked growth guide sessions", code="SQLError", e=str(e))
    finally:
        # Cleanup only: returning from `finally` would swallow the DBError above.
        # `with conn` only ends the transaction; the connection must be closed explicitly.
        if conn is not None:
            conn.close()
@catch_error
def get_user_local_timezone(user_id):
    """Return the timezone name stored for a user.

    Args:
        user_id: Value matched against ``public.users.id``.

    Returns:
        The ``timezone`` column value, or ``'Asia/Singapore'`` when the user
        row is missing or the column is NULL/empty.

    Raises:
        DBError: On any psycopg2 error.
    """
    function_name = get_user_local_timezone.__name__
    log_extra = {'user_id': user_id, 'endpoint': function_name}
    logger.info(f"Retrieving local timezone for user {user_id}", extra=log_extra)
    # NOTE(review): credentials are hard-coded; move them to environment variables / a secret store.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': "hvcTL3kN3pOG5KteT17T",
        'host': "staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com",
        'port': '5432'
    }
    default_timezone = 'Asia/Singapore'
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        with conn, conn.cursor() as cursor:
            query = sql.SQL("SELECT timezone FROM {table} WHERE id = %s").format(table=sql.Identifier('public', 'users'))
            cursor.execute(query, (user_id,))
            row = cursor.fetchone()
            # A row with a NULL timezone must also fall back to the default;
            # returning None would break callers that pass it to pytz.timezone().
            user_timezone = row[0] if row else None
            if user_timezone:
                logger.info(f"User timezone {user_timezone} retrieved successfully for user {user_id}", extra=log_extra)
                return user_timezone
            logger.warning(f"No timezone found for user {user_id}. Using default timezone {default_timezone}", extra=log_extra)
            return default_timezone
    except psycopg2.Error as e:
        logger.error(f"Database error while retrieving local timezone for user {user_id}: {e}", extra=log_extra)
        raise DBError(user_id=user_id, message="Error retrieving local timezone", code="SQLError", e=str(e))
    finally:
        # `with conn` only ends the transaction; the connection must be closed explicitly.
        if conn is not None:
            conn.close()
@catch_error
def get_user_subscriptions(user_id):
    """
    Retrieve a user's subscription rows, formatted in the user's local timezone.

    Selects every row of ``public.user_subscription`` for ``user_id`` ordered by
    ``period_started`` descending. Each row is turned into a dict keyed by
    column name and then reshaped:

    - ``period_started`` / ``period_ended`` are exposed as
      ``subscription_start_date`` / ``subscription_end_date``.
    - ``paid_at`` and ``canceled_at`` are converted in place.
    - ``status`` is copied from ``stripe_status``.
    - ``period_started``, ``period_ended``, ``stripe_subscription_id``,
      ``stripe_invoice_id``, ``id`` and ``user_id`` are removed.

    Timestamps are rendered as ``'%Y-%m-%d %a %H:%M:%S'`` strings in the
    timezone returned by ``get_user_local_timezone``; NULL timestamps become
    ``None``.

    Parameters:
    user_id: Value matched against ``user_subscription.user_id``.

    Returns:
    list: One dict per subscription, or the single-element list
    ``["No subscriptions found for user"]`` when the user has none.

    Raises:
    DBError: If psycopg2 reports an error while connecting or querying.
    """
    function_name = get_user_subscriptions.__name__
    log_extra = {'user_id': user_id, 'endpoint': function_name}
    logger.info(f"Retrieving subscriptions for user {user_id}", extra=log_extra)

    # NOTE(review): database credentials are hard-coded in source. They should
    # be loaded from configuration/environment and this password rotated.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'hvcTL3kN3pOG5KteT17T',
        'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    timestamp_format = '%Y-%m-%d %a %H:%M:%S'
    # Columns removed from each row before it is returned.
    dropped_columns = (
        'period_started',
        'period_ended',
        'stripe_subscription_id',
        'stripe_invoice_id',
        'id',
        'user_id',
    )

    try:
        # Get the user's timezone first; it opens its own connection.
        user_timezone = get_user_local_timezone(user_id)
        conn = psycopg2.connect(**db_params)
        try:
            # `with conn` only commits/rolls back the transaction; it does not
            # close the connection, hence the explicit close() below.
            with conn, conn.cursor() as cursor:
                query = sql.SQL("""
                SELECT * FROM {table}
                WHERE user_id = %s
                ORDER BY period_started DESC
                """).format(table=sql.Identifier('public', 'user_subscription'))
                cursor.execute(query, (user_id,))
                fetched = cursor.fetchall()
                colnames = [desc[0] for desc in cursor.description]
        finally:
            conn.close()
    except psycopg2.Error as e:
        logger.error(f"Database error while retrieving user subscriptions: {e}", extra=log_extra)
        raise DBError(user_id=user_id, message="Error retrieving user subscriptions", code="SQLError", e=str(e)) from e

    if not fetched:
        return ["No subscriptions found for user"]

    # Build the tzinfo once instead of once per converted column.
    local_tz = pytz.timezone(user_timezone)

    def to_local_string(value):
        """Format a timestamp in the user's timezone; NULL values stay None."""
        if value is None:
            return None
        return value.astimezone(pytz.utc).astimezone(local_tz).strftime(timestamp_format)

    rows = [dict(zip(colnames, record)) for record in fetched]
    for row in rows:
        # Expose the period columns under subscription_* names and convert all
        # timestamps to the user's local timezone.
        row['subscription_start_date'] = to_local_string(row['period_started'])
        row['subscription_end_date'] = to_local_string(row['period_ended'])
        row['paid_at'] = to_local_string(row['paid_at'])
        row['canceled_at'] = to_local_string(row['canceled_at'])
        row['status'] = row['stripe_status']
        for column in dropped_columns:
            del row[column]

    logger.info(f"Retrieved {len(rows)} subscriptions for user {user_id}", extra=log_extra)
    return rows
def generate_uuid():
    """Return a newly generated random (version 4) UUID in string form."""
    fresh_id = uuid.uuid4()
    return str(fresh_id)
def print_log(level, message, **kwargs):
    """
    Print log in JSON format for better readability in CloudWatch.

    The entry always carries ``timestamp`` (current UTC time in ISO 8601 form
    with a trailing "Z"), ``level`` and ``message``; any extra keyword
    arguments are merged on top of those keys. Values that are not natively
    JSON-serializable (datetimes, UUIDs, sets, ...) are written using their
    ``str()`` form so that logging never raises on such a value.

    Parameters:
    level (str): The log level (e.g., "INFO", "ERROR", "DEBUG").
    message (str): The log message.
    **kwargs: Additional key-value pairs to include in the log.

    Returns:
    None. The JSON line is written to standard output.

    example:
    print_log("INFO", "User logged in", user_id=123, action="login")
    """
    # datetime.utcnow() is deprecated; take an aware UTC time and drop tzinfo so
    # the rendered text keeps the "...Z" shape instead of gaining "+00:00".
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    log_entry = {
        "timestamp": now_utc.isoformat() + "Z",
        "level": level,
        "message": message,
    }
    log_entry.update(kwargs)
    # default=str is deliberate: a logging helper must not crash its caller
    # because an extra field is not a JSON primitive.
    print(json.dumps(log_entry, ensure_ascii=False, default=str))