fastapi-v2 / app /utils.py
BMCVRN's picture
[FEATURE] Add Logic to Calculate and Get Life Score (#1)
e2224db verified
raw
history blame
60.4 kB
import logging
import boto3
from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError
import os
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Security, Query, status
from fastapi.security import APIKeyHeader
from openai import OpenAI
import pandas as pd
from pydantic import BaseModel
import os
import logging
import json
import psycopg2
from psycopg2 import sql
import os
from dotenv import load_dotenv
import datetime
import threading
import pickle # Replace dill with pickle
from cachetools import TTLCache
import threading
import time
# Load variables from a local .env file (if any) into the process environment
# before the settings below are read. One call is enough; the second call the
# module used to make re-read the same file without changing anything.
load_dotenv()

# API key accepted by this service; clients send it in the X-API-Key header.
api_keys = [os.getenv('FASTAPI_KEY')]
api_key_header = APIKeyHeader(name="X-API-Key")

# AWS credentials and region, read from the environment.
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY')
REGION = os.getenv('AWS_REGION')

# Module-level logger used by the helpers in this file.
logger = logging.getLogger(__name__)
class AutoSaveTTLCache(TTLCache):
    """TTLCache whose ``pop`` first tries to upload ``{key}.pkl`` to S3.

    The upload is best-effort: a failure is logged and the entry is removed
    from the cache anyway.

    NOTE(review): only removals that go through ``pop`` trigger the upload.
    Whether cachetools routes TTL expiry through ``pop`` is not visible in
    this file -- confirm before relying on this to persist expired users.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Serialises pop() so concurrent removals do not interleave uploads.
        self.lock = threading.Lock()

    def pop(self, key, *args):
        """Upload the user's pickle to S3, then remove and return the entry.

        Args:
            key: Cache key (the user id); also the stem of the uploaded file.
            *args: Optional default, forwarded to ``TTLCache.pop``.

        Returns:
            Whatever ``TTLCache.pop(key, *args)`` returns.
        """
        with self.lock:
            if key in self:
                # Save to S3 before removing.
                # NOTE(review): the call that serialised the user
                # (value.save_user()) is disabled, so this presumably relies
                # on the pickle already existing where upload_file_to_s3
                # looks for it -- confirm.
                filename = f'{key}.pkl'
                try:
                    upload_file_to_s3(filename)
                    logger.info(f"User {key} saved to S3 during cache eviction",
                                extra={'user_id': key, 'endpoint': 'cache_eviction'})
                except Exception as e:
                    logger.error(f"Failed to save user {key} to S3 during cache eviction: {e}",
                                 extra={'user_id': key, 'endpoint': 'cache_eviction'})
            return super().pop(key, *args)
# Shared in-process cache of loaded user objects, backed by the auto-saving
# cache class instead of a plain TTLCache. It holds at most 100 users and each
# entry lives for 20 minutes.
_USER_CACHE_MAX_ENTRIES = 100
_USER_CACHE_TTL_SECONDS = 20 * 60
user_cache = AutoSaveTTLCache(
    maxsize=_USER_CACHE_MAX_ENTRIES,
    ttl=_USER_CACHE_TTL_SECONDS,
)
def force_file_move(source, destination):
    """Move ``source`` to ``destination``, replacing an existing destination.

    Best-effort: every failure is logged and swallowed, nothing is raised and
    nothing is returned.

    Args:
        source: Path of the file to move.
        destination: Target path; its parent directory is created if needed.
    """
    function_name = force_file_move.__name__
    logger.info(f"Attempting to move file from {source} to {destination}", extra={'endpoint': function_name})
    try:
        # Ensure the destination directory exists. A bare filename has no
        # directory part and os.makedirs('') raises FileNotFoundError, which
        # would be misreported below as a missing source file.
        destination_dir = os.path.dirname(destination)
        if destination_dir:
            os.makedirs(destination_dir, exist_ok=True)
        # Move the file, replacing it if it already exists.
        os.replace(source, destination)
        logger.info(f"File moved successfully: {source} -> {destination}", extra={'endpoint': function_name})
    except FileNotFoundError:
        logger.error(f"Source file not found: {source}", extra={'endpoint': function_name})
    except Exception as e:
        logger.error(f"An error occurred while moving file: {e}", extra={'endpoint': function_name})
def get_user(user_id):
    """Return the user object for ``user_id`` from the cache or from S3.

    On a cache miss the user's pickle is downloaded from the
    ``core-ai-assets`` bucket, unpickled, given a fresh OpenAI client, stored
    in ``user_cache`` and returned. The downloaded file is always deleted
    afterwards, even when unpickling fails.

    Args:
        user_id: Identifier of the user; also the stem of the pickle file.

    Returns:
        The cached or freshly unpickled user object.

    Raises:
        ReferenceError: No pickle in S3, but ``get_user_info`` found the user.
        LookupError: No pickle in S3 and ``get_user_info`` returned nothing.
    """
    function_name = get_user.__name__
    logger.info(f"Fetching user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})

    # Single lookup: a missing or already-expired entry raises KeyError and we
    # fall through to S3 without logging a cache hit that did not happen.
    try:
        cached_user = user_cache[user_id]
    except KeyError:
        pass
    else:
        logger.info(f"User {user_id} found in cache", extra={'user_id': user_id, 'endpoint': function_name})
        return cached_user

    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
    # presumably the path download_file_from_s3 writes to -- verify against it
    user_file = os.path.join('users', 'data', f'{user_id}.pkl')

    logger.warning(f"User {user_id} not found locally. Attempting to download from S3", extra={'user_id': user_id, 'endpoint': function_name})
    download = download_file_from_s3(f'{user_id}.pkl', 'core-ai-assets')
    logger.info(f"Download success: {download}", extra={'user_id': user_id, 'endpoint': function_name})

    if download:
        try:
            # SECURITY(review): pickle.load runs arbitrary code from the file;
            # this is only acceptable while the bucket is fully trusted.
            with open(user_file, 'rb') as f:
                user = pickle.load(f)
        finally:
            # Never leave the downloaded copy behind, even if unpickling failed.
            if os.path.exists(user_file):
                os.remove(user_file)
        # The client is re-attached after loading rather than taken from the pickle.
        user.client = client
        user.conversations.client = client
        user_cache[user_id] = user
        logger.info(f"User {user_id} loaded successfully from S3", extra={'user_id': user_id, 'endpoint': function_name})
        return user

    logger.error(f"User {user_id} pickle does not exist in S3", extra={'user_id': user_id, 'endpoint': function_name})
    # Distinguish "onboarded, pickle not ready" from "never onboarded".
    user_info = get_user_info(user_id)
    if user_info:
        # user has done onboarding but pickle file not created
        raise ReferenceError(f"User {user_id} pickle still being created")
    raise LookupError(f"User [{user_id}] has not onboarded yet")
def get_life_score(user_id):
    """Return the five life-area scores stored on the user object.

    Args:
        user_id: Identifier passed to ``get_user``.

    Returns:
        Dict with the keys ``personal_growth_score``, ``career_growth_score``,
        ``relationship_score``, ``mental_well_being_score`` and
        ``health_and_wellness_score``, each read from the same-named attribute
        of the user.

    Raises:
        LookupError: Logged and re-raised when raised while loading the user
            or reading the scores.
    """
    function_name = get_life_score.__name__
    logger.info(f"Generating user life score for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})

    # Attribute names double as the keys of the returned dict.
    score_fields = (
        "personal_growth_score",
        "career_growth_score",
        "relationship_score",
        "mental_well_being_score",
        "health_and_wellness_score",
    )

    try:
        user = get_user(user_id)
        life_score = {field: getattr(user, field) for field in score_fields}
    except LookupError as e:
        logger.error(f"Error fetching user data: {e}", extra={'user_id': user_id, 'endpoint': function_name})
        raise e

    return life_score
def get_user_summary(user_id):
    """Generate the Growth Guide session material for a user via OpenAI.

    Loads the user, turns the stored messages into a plain-text transcript and
    asks ``gpt-4o-mini`` for a strict-JSON answer with three sections: the
    pre-session report, the user's preparation brief and the 30-minute
    coaching script.

    Args:
        user_id: Identifier passed to ``get_user``.

    Returns:
        The model's JSON answer decoded into a dict.

    Raises:
        LookupError: Logged and re-raised when loading the user fails with it.
        Exception: Any error from the OpenAI call or from decoding its answer
            is logged and re-raised.
    """
    function_name = get_user_summary.__name__
    log_extra = {'user_id': user_id, 'endpoint': function_name}
    logger.info(f"Generating user summary for user {user_id}", extra=log_extra)

    # Step 1: load the user's profile and message history.
    try:
        user = get_user(user_id)
        user_info = user.user_info
        user_messages = user.get_messages()
    except LookupError as e:
        logger.error(f"Error fetching user data: {e}", extra=log_extra)
        raise e

    # Step 2: flatten the messages into "Role: content" lines.
    transcript_lines = []
    for message in user_messages:
        transcript_lines.append(f"{message['role'].capitalize()}: {message['content']}")
    chat_history = "\n".join(transcript_lines)

    # System prompt describing the three outputs and an example answer.
    system_prompt = """
You are an AI language model designed to generate three outputs based on the user's profile and chat history:
1. **Pre-Growth Guide Session Report**: A comprehensive summary of the user's profile and life context for the Growth Guide (a human coach), covering five key areas: **mental well-being**, **physical health and wellness**, **relationships**, **career growth**, and **personal growth**.
2. **User's Growth Guide Preparation Brief**: A comprehensive brief guiding the user on what to discuss with the Growth Guide, providing actionable advice and highlighting key areas to focus on during their session, covering the same five key areas.
3. **30-Minute Coaching Session Script**: A detailed, partitioned script to help the coach prepare for the session, including dialogue, questions, and guidance tailored to the client's needs, covering the five key areas. The script should be partitioned into several sections in the JSON output, similar to the structure provided for the Pre-Growth Guide Session Report.
---
**Instructions:**
- **Comprehensive Coverage**:
Ensure that all three outputs cover the following five key areas:
1. **Mental Well-being**
2. **Physical Health and Wellness**
3. **Relationships**
4. **Career Growth**
5. **Personal Growth**
If the chat history provided by the user does not touch on one or more of these areas, the report should state: "The user hasn't discussed this area yet. Maybe you can cover this during the Growth Guide session."
- **Output Format**:
Output the result in JSON format following the specified JSON schema. The outputs for the **Pre-Growth Guide Session Report** and the **30-Minute Coaching Session Script** should be partitioned into several JSON keys, similar to the structure provided for the Pre-Growth Guide Session Report.
---
### **1. Pre-Growth Guide Session Report**
**Objective**: Provide a comprehensive summary of the user's profile and life context for the Growth Guide, covering the five key areas.
**Format**:
- **user_overview**:
- **name**: The user's full name.
- **age_group**: The user's age range (e.g., "30-39").
- **primary_goals**: The main goals the user is focusing on.
- **preferred_coaching_style**: The coaching style the user prefers.
- **personality_insights**:
- **mbti**: The user's Myers-Briggs Type Indicator personality type.
- **top_love_languages**: A list of the user's top two love languages.
- **belief_in_astrology**: Whether the user believes in horoscope/astrology.
- **progress_snapshot**:
- **mental_well_being**: Summary of the user's mental well-being.
- **physical_health_and_wellness**: Summary of the user's physical health and wellness.
- **relationships**: Summary of the user's relationships.
- **career_growth**: Summary of the user's career growth.
- **personal_growth**: Summary of the user's personal growth.
If any of the key areas are not discussed, include a note: "The user hasn't discussed this area yet. Maybe you can cover this during the Growth Guide session."
- **engagement_insights**:
- **daily_reflection_participation**: The user's engagement level with daily reflections.
- **weekly_streaks**: Any streaks or consistent patterns in the user's engagement.
- **recent_feedback**: Recent comments or feedback from the user highlighting current concerns or focus areas.
---
### **2. User's Growth Guide Preparation Brief**
**Objective**: Guide the user on what to discuss with the Growth Guide, providing actionable advice and highlighting key areas to focus on during their session, covering the five key areas.
**Format**:
Structure the brief with the following sections, and output it as a JSON object with these keys:
- **reflect**: Encourage the user to consider what each focus area means to them and which aspects they want to improve.
- **recall_successes**: Prompt the user to think of times when they effectively managed or improved in the five key areas, and what strategies worked for them then.
- **identify_challenges**: Encourage the user to be ready to discuss any obstacles they're facing in each area, and to consider possible solutions or resources that might help.
- **set_goals**: Ask the user to decide what they hope to achieve from the session and how improvements in each area will impact their life.
- **additional_tips**: Provide practical advice for the session, such as choosing a quiet space, having materials ready, and being open to sharing thoughts honestly.
---
### **3. 30-Minute Coaching Session Script**
**Objective**: Help the coach prepare for the session by providing a detailed, partitioned script tailored to the client's specific needs and goals, following a specific session order and focusing on the user's top three most important areas.
**Instructions**:
- **Session Order**:
The session should follow this specific order:
1. **Warm Welcome and Rapport Building** (5 mins)
2. **Exploring X Goals** (10 mins)
3. **Developing X Strategies** (10 mins)
4. **Wrap-Up and Commitment** (5 mins)
The "X" in "Exploring X Goals" and "Developing X Strategies" should be replaced with the user's top three most important areas from the five key areas. Focus on one area per session. If possible, prioritize the areas based on the user's expressed concerns or goals.
- **Detailed Segments**:
For each segment, include:
- **segment_title**: Title of the session segment.
- **coach_dialogue**: Scripted prompts and questions the coach can use.
- **client_engagement**: Opportunities for the client to respond and reflect.
- **guidance**: Suggestions for the coach on how to navigate responses.
- **Additional Instructions**:
- Ensure that each of the user's top three areas is addressed in separate sessions. If any area was not previously discussed by the user, include guidance for the coach to introduce or explore that area during the session.
- **Additional Tips for the Coach**:
Offer advice on tone, active listening, empathy, and techniques to encourage self-reflection and solution-focused thinking.
- **Action Plan Summary**:
Summarize the agreed-upon steps the client will take before the next session, covering the focused area.
- **Scheduling Next Session**:
Include a prompt for confirming the next meeting time.
- **End of Session**:
Conclude with a positive note, reinforcing confidence and expressing anticipation for the client's progress.
**Style Guidelines**:
- Use empathetic and supportive language.
- Encourage open-ended dialogue.
- Focus on actionable and achievable steps.
- Personalize the script to align with the client's experiences and aspirations.
---
**Note**:
- If the user hasn't discussed one or more of the key areas, the outputs should note this and suggest that these areas can be covered during the Growth Guide session.
---
** JSON OUTPUT FORMAT EXAMPLE **:
{
"pre_growth_guide_session_report": {
"user_overview": {
"name": "Alex Johnson",
"age_group": "25-34",
"primary_goals": "Improve mental well-being, advance career, enhance relationships",
"preferred_coaching_style": "Supportive and goal-oriented"
},
"personality_insights": {
"mbti": "ENFP",
"top_love_languages": ["Quality Time", "Words of Affirmation"],
"belief_in_astrology": "No"
},
"progress_snapshot": {
"mental_well_being": "Alex has been experiencing increased stress due to workload and is seeking ways to manage anxiety and improve overall mental health.",
"physical_health_and_wellness": "Maintains a regular exercise routine but wants to incorporate healthier eating habits.",
"relationships": "Feels disconnected from friends and family due to busy schedule; wishes to rebuild social connections.",
"career_growth": "Aiming for a promotion but feels uncertain about the necessary skills and how to stand out.",
"personal_growth": "Interested in learning new skills like photography and improving time management."
},
"engagement_insights": {
"daily_reflection_participation": "Consistently completes daily reflections, often focusing on stress levels and work-life balance.",
"weekly_streaks": "Maintained a 4-week streak of daily reflections and goal tracking.",
"recent_feedback": "\"I'm struggling to balance my work with my personal life and it's affecting my relationships.\""
}
},
"users_growth_guide_preparation_brief": [
{
"key": "reflect",
"value": "Consider what mental well-being means to you. Which aspects of your life—such as stress management, anxiety reduction, or emotional balance—do you want to improve?"
},
{
"key": "recall_successes",
"value": "Think of times when you effectively managed stress or maintained a positive mindset. What strategies or habits helped you during those times?"
},
{
"key": "identify_challenges",
"value": "Be ready to discuss current obstacles you're facing in managing stress and anxiety. Consider any patterns or triggers you've noticed."
},
{
"key": "set_goals",
"value": "Decide what you hope to achieve from this session. How would improving your mental well-being impact your daily life and long-term goals?"
},
{
"key": "additional_tips",
"value": "Environment: Choose a quiet, comfortable space.\nMaterials: Have a notebook and pen ready to jot down insights.\nOpenness: Be prepared to share your thoughts honestly and openly."
}
],
"30_minute_coaching_session_script": {
"session_overview": "1. Warm Welcome and Rapport Building (5 mins)\n2. Exploring Mental Well-being Goals (10 mins)\n3. Developing Mental Well-being Strategies (10 mins)\n4. Wrap-Up and Commitment (5 mins)",
"detailed_segments": [
{
"segment_title": "Warm Welcome and Rapport Building",
"coach_dialogue": "“Hello, Alex! It's great to see you today. How have you been feeling lately?”",
"client_engagement": "Allow Alex to share recent experiences and feelings.",
"guidance": "Listen attentively and acknowledge his feelings to build rapport."
},
{
"segment_title": "Exploring Mental Well-being Goals",
"coach_dialogue": "“You've mentioned wanting to improve your mental well-being. Could you tell me more about what that means to you?”",
"client_engagement": "Encourage Alex to discuss specific aspects he wants to focus on.",
"guidance": "Ask open-ended questions to help him delve deeper into his goals."
},
{
"segment_title": "Developing Mental Well-being Strategies",
"coach_dialogue": "“Let's explore some strategies to help manage your stress and improve your mental health. What ideas have you considered?”",
"client_engagement": "Allow him to suggest strategies; if needed, offer suggestions such as mindfulness techniques, time management, or setting boundaries.",
"guidance": "Collaborate to develop practical and achievable steps."
},
{
"segment_title": "Wrap-Up and Commitment",
"coach_dialogue": "“To summarize, you've decided to incorporate mindfulness meditation and set clear work-life boundaries, right?”",
"client_engagement": "Ensure accuracy and gauge his commitment level.",
"guidance": "Offer encouragement and address any concerns he might have."
}
],
"additional_tips_for_the_coach": "Maintain an empathetic tone throughout the session. Use active listening to validate his feelings. Encourage self-reflection by asking open-ended questions.",
"action_plan_summary": "Alex will practice mindfulness meditation for 10 minutes daily and set a strict end time for work each day to improve work-life balance.",
"scheduling_next_session": "“Let's plan to meet again in two weeks to discuss your progress. Does that work for you?”",
"end_of_session": "“Thank you for sharing so openly today, Alex. I believe these steps will make a positive impact. I'm looking forward to hearing about your progress in our next session.”"
}
}
"""

    # User message: the profile followed by the transcript.
    user_context = f"""
Based on the following user profile and chat history, generate the required reports.
### USER PROFILE ###
{user_info}
### CHAT HISTORY ###
{chat_history}
"""

    # Small builders for the repetitive parts of the JSON schema.
    def text(description):
        """Schema node for a described string field."""
        return {"type": "string", "description": description}

    def strict_object(properties, description=None):
        """Schema node for an object that requires every listed property and allows no others."""
        node = {"type": "object"}
        if description is not None:
            node["description"] = description
        node["properties"] = properties
        node["required"] = list(properties)
        node["additionalProperties"] = False
        return node

    user_overview = strict_object({
        "name": text("The user's full name."),
        "age_group": text("The user's age range (e.g., '30-39')."),
        "primary_goals": text("The main goals the user is focusing on."),
        "preferred_coaching_style": text("The coaching style the user prefers."),
    })
    personality_insights = strict_object({
        "mbti": text("The user's Myers-Briggs Type Indicator personality type."),
        "top_love_languages": {
            "type": "array",
            "items": {"type": "string"},
            "description": "A list of the user's top two love languages.",
        },
        "belief_in_astrology": text("Whether the user believes in horoscope/astrology."),
    })
    progress_snapshot = strict_object({
        "mental_well_being": text("Summary of the user's mental well-being."),
        "physical_health_and_wellness": text("Summary of the user's physical health and wellness."),
        "relationships": text("Summary of the user's relationships."),
        "career_growth": text("Summary of the user's career growth."),
        "personal_growth": text("Summary of the user's personal growth."),
    })
    engagement_insights = strict_object({
        "daily_reflection_participation": text("The user's engagement level with daily reflections."),
        "weekly_streaks": text("Any streaks or consistent patterns in the user's engagement."),
        "recent_feedback": text("Recent comments or feedback from the user."),
    })
    session_report = strict_object(
        {
            "user_overview": user_overview,
            "personality_insights": personality_insights,
            "progress_snapshot": progress_snapshot,
            "engagement_insights": engagement_insights,
        },
        description="A comprehensive summary of the user's profile and life context for the Growth Guide.",
    )
    preparation_brief = {
        "type": "array",
        "description": "A brief guiding the user on what to discuss with the Growth Guide, providing actionable advice and highlighting key areas to focus on.",
        "items": strict_object({
            "key": text("The section heading."),
            "value": text("Content for the section."),
        }),
    }
    session_script = strict_object(
        {
            "session_overview": text("Breakdown of the session segments with time frames."),
            "detailed_segments": {
                "type": "array",
                "items": strict_object({
                    "segment_title": text("Title of the session segment."),
                    "coach_dialogue": text("Scripted prompts and questions the coach can use."),
                    "client_engagement": text("Opportunities for the client to respond and reflect."),
                    "guidance": text("Suggestions for the coach on how to navigate responses."),
                }),
                "description": "Detailed information for each session segment.",
            },
            "additional_tips_for_the_coach": text("Advice on tone, active listening, empathy, etc."),
            "action_plan_summary": text("Summary of the agreed-upon steps the client will take."),
            "scheduling_next_session": text("Prompt for confirming the next meeting time."),
            "end_of_session": text("Conclusion with a positive note."),
        },
        description="A detailed, partitioned script to help the coach prepare for the session, following the specified session order and focusing on the user's top three most important areas.",
    )
    response_format = {
        "type": "json_schema",
        "json_schema": {
            "name": "growth_guide_session",
            "strict": True,
            "schema": strict_object({
                "pre_growth_guide_session_report": session_report,
                "users_growth_guide_preparation_brief": preparation_brief,
                "30_minute_coaching_session_script": session_script,
            }),
        },
    }
    messages = [
        {"role": "system", "content": [{"type": "text", "text": system_prompt}]},
        {"role": "user", "content": [{"type": "text", "text": user_context}]},
    ]

    # Step 3: call the OpenAI API and decode its JSON answer.
    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            response_format=response_format,
            temperature=0.5,
            max_tokens=3000,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
        )
        # Get response and convert into dictionary
        reports = json.loads(response.choices[0].message.content)
    except Exception as e:
        logger.error(f"OpenAI API call failed: {e}", extra=log_extra)
        raise e

    # Step 4: return the decoded reports.
    logger.info(f"User summary generated successfully for user {user_id}", extra=log_extra)
    return reports
def get_user_life_status(user_id):
    """Generate a life status report for a user via OpenAI.

    Loads the user, turns the stored messages into a plain-text transcript and
    asks ``gpt-4o-mini`` for a strict-JSON answer holding the life scores, a
    mantra of the week, this week's focus areas and suggested action items.

    Args:
        user_id: Identifier passed to ``get_user``.

    Returns:
        The model's JSON answer decoded into a dict.

    Raises:
        LookupError: Logged and re-raised when loading the user fails with it.
        Exception: Any error from the OpenAI call or from decoding its answer
            is logged and re-raised.
    """
    function_name = get_user_life_status.__name__
    log_extra = {'user_id': user_id, 'endpoint': function_name}
    logger.info(f"Generating user life status for user {user_id}", extra=log_extra)

    # Step 1: load the user's profile and message history.
    try:
        user = get_user(user_id)
        user_info = user.user_info
        user_messages = user.get_messages()
    except LookupError as e:
        logger.error(f"Error fetching user data: {e}", extra=log_extra)
        raise e

    # Step 2: flatten the messages into "Role: content" lines.
    transcript_lines = []
    for message in user_messages:
        transcript_lines.append(f"{message['role'].capitalize()}: {message['content']}")
    chat_history = "\n".join(transcript_lines)

    # System prompt describing the report and its JSON shape.
    system_prompt = """
You are an AI assistant that generates a personalized life status report for users based on their profile and chat history. Your task is to analyze the provided user data and produce a JSON output following the specified schema.
**Instructions:**
1. **Life Score:**
- Calculate numerical scores for each of the following areas based on the user's profile and chat history, especially their responses to quantitative questions (e.g., "on a scale of 1 to 5, how do you feel about ..."):
- `personal_growth`
- `career_growth`
- `mental_well_being`
- `health_and_wellness`
- `relationship`
- Each score should be a float between 0 and 1. If an area hasn't been covered in the chat history, assign it a score of 0.
- Compute the `overall` score as the average of the non-zero area scores.
2. **Mantra of the Week:**
- Create a very short encouragement quote that encapsulates the user's journey toward achieving their goals.
- The mantra **MUST** be a single sentence with fewer than 5 words.
3. **This Week's Focus:**
- Identify the top three most important areas for the user from the five key areas:
- `personal_growth`
- `career_growth`
- `mental_well_being`
- `health_and_wellness`
- `relationship`
- For each selected area, provide a concise statement outlining the user's focus in that area.
4. **Suggested Action Items:**
- For the same three areas identified above, suggest concrete and actionable items that the user can undertake to make progress.
**Output Format:**
Produce your response in JSON format adhering to the following schema:
```json
{
"life_score": {
"overall": float,
"personal_growth": float,
"health_and_wellness": float,
"mental_well_being": float,
"career_growth": float,
"relationship": float
},
"mantra_of_the_week": str,
"this_week_focus": [
{"area": str, "focus": str},
{"area": str, "focus": str},
{"area": str, "focus": str}
],
"action_items": [
{"area": str, "focus": str},
{"area": str, "focus": str},
{"area": str, "focus": str}
]
}
```
**Guidelines:**
- Ensure all numerical values are floats (e.g., 3.0 instead of 3).
- The `mantra_of_the_week` should be personalized, positive, and encouraging. It **MUST** be a single sentence with fewer than 5 words.
- Focus statements and action items should be clear, specific, and tailored to the user's context.
- Do not include any additional text or commentary outside of the JSON structure.
"""

    # User message: the profile followed by the transcript.
    user_context = f"""
Based on the following user profile and chat history, generate the life status!
### USER PROFILE ###
{user_info}
### CHAT HISTORY ###
{chat_history}
"""

    # Small builders for the repetitive parts of the JSON schema.
    def text(description):
        """Schema node for a described string field."""
        return {"type": "string", "description": description}

    def number(description):
        """Schema node for a described numeric field."""
        return {"type": "number", "description": description}

    def strict_object(properties, description=None):
        """Schema node for an object that requires every listed property and allows no others."""
        node = {"type": "object"}
        if description is not None:
            node["description"] = description
        node["properties"] = properties
        node["required"] = list(properties)
        node["additionalProperties"] = False
        return node

    life_score = strict_object(
        {
            "overall": number("Overall life score, average of all area scores."),
            "personal_growth": number("Life score for personal growth."),
            "health_and_wellness": number("Life score for health and wellness."),
            "mental_well_being": number("Life score for mental well-being."),
            "career_growth": number("Life score for career growth."),
            "relationship": number("Life score for relationships."),
        },
        description="Numerical life scores across different areas.",
    )
    this_week_focus = {
        "type": "array",
        "description": "List of the user's top 3 most important areas with focus.",
        "items": strict_object({
            "area": text("The area of focus (e.g., 'personal_growth')."),
            "focus": text("The focus within that area."),
        }),
    }
    action_items = {
        "type": "array",
        "description": "List of suggested concrete action items for the user.",
        "items": strict_object({
            "area": text("The area of action (e.g., 'personal_growth')."),
            "focus": text("Concrete action items in that area."),
        }),
    }
    response_format = {
        "type": "json_schema",
        "json_schema": {
            "name": "life_status_report",
            "strict": True,
            "schema": strict_object({
                "life_score": life_score,
                "mantra_of_the_week": text("A very short encouragement quote that encapsulates the user's journey to achieve their goals."),
                "this_week_focus": this_week_focus,
                "action_items": action_items,
            }),
        },
    }
    messages = [
        {"role": "system", "content": [{"type": "text", "text": system_prompt}]},
        {"role": "user", "content": [{"type": "text", "text": user_context}]},
    ]

    # Step 3: call the OpenAI API and decode its JSON answer.
    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'))
    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            response_format=response_format,
            temperature=0.5,
            max_tokens=3000,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
        )
        # Get response and convert into dictionary
        reports = json.loads(response.choices[0].message.content)
    except Exception as e:
        logger.error(f"OpenAI API call failed: {e}", extra=log_extra)
        raise e

    # Step 4: return the decoded report.
    logger.info(f"User life status generated successfully for user {user_id}", extra=log_extra)
    return reports
def get_api_key(api_key_header: str = Security(api_key_header)) -> str:
    """Validate the ``X-API-Key`` header against the ``FASTAPI_KEY`` setting.

    Args:
        api_key_header: Header value resolved through ``Security``.

    Returns:
        The supplied key when it matches the configured one.

    Raises:
        HTTPException: 403 when the key does not match, or when either the
            header or ``FASTAPI_KEY`` is missing or empty.
    """
    # Function-scope import so this block does not depend on the file header.
    import secrets

    expected_key = os.getenv("FASTAPI_KEY")
    # An unset or empty FASTAPI_KEY must never authenticate anyone. The
    # comparison is constant-time so response timing does not leak how much of
    # a guessed key was correct; bytes are used because compare_digest rejects
    # non-ASCII str input.
    if (
        expected_key
        and api_key_header
        and secrets.compare_digest(
            api_key_header.encode("utf-8"), expected_key.encode("utf-8")
        )
    ):
        return api_key_header
    raise HTTPException(
        status_code=403,
        detail="Could not validate credentials"
    )
def _format_user_profile(onboarding):
    """Render decoded onboarding answers as a plain-text profile block.

    Args:
        onboarding: Dict decoded from the user's ``onboarding`` column.

    Returns:
        Tuple ``(profile_text, matters_most)`` where ``matters_most`` is the
        stored ``mattersMost`` ranking, or five empty strings when it is
        missing or empty.
    """
    def qa_lines(section):
        # One "- question : answer" line per entry of a question/answer list.
        return "\n".join(
            f"- {item['question']} : {item['answer']}"
            for item in onboarding.get(section, [])
        )

    first_name = onboarding.get('firstName', '')
    matters_most = onboarding.get('mattersMost') or ['', '', '', '', '']
    # Pad to five entries so a short ranking cannot raise IndexError below.
    ranked = (list(matters_most) + [''] * 5)[:5]
    # NOTE(review): "do for a living" is filled from 'mySituation', the same
    # value printed as the current situation further down -- confirm intended.
    doLiving = onboarding.get('mySituation', '')
    whoImportant = qa_lines('whoImportant')
    challenges = qa_lines('challenges')

    profile_text = f"""
### USER PROFILE ###
Name: {first_name}
{first_name}'s Legendary Persona: {onboarding.get('legendPersona', '')}
Pronouns: {onboarding.get('pronouns', '')}
Birthday: {onboarding.get('birthDate', '')}
{first_name}'s MBTI: {onboarding.get('mbti', '')}
{first_name}'s Love Language: {onboarding.get('loveLanguage', '')}
Has {first_name} tried coaching before: {onboarding.get('triedCoaching', '')}
Belief in Astrology: {onboarding.get('astrology', '')}
The most important area in {first_name}'s life: {ranked[0]}
The second most important area in {first_name}'s life: {ranked[1]}
The third most important area in {first_name}'s life: {ranked[2]}
The fourth most important area in {first_name}'s life: {ranked[3]}
The fifth most important area in {first_name}'s life: {ranked[4]} (Matters the least)
What does {first_name} do for a living:
{doLiving}
{first_name}'s current situation: {onboarding.get('mySituation', '')}
{first_name}'s most important person:
{whoImportant}
{first_name}'s challenges:
{challenges}
"""
    return profile_text, matters_most


def get_user_info(user_id):
    """Fetch a user's onboarding answers from the database as a text profile.

    Args:
        user_id: Value matched against ``id`` in ``public.users``.

    Returns:
        Tuple ``(profile_text, matters_most)`` on success. ``None`` when the
        user row does not exist, the row has no onboarding data, or the
        database reports an error.
    """
    function_name = get_user_info.__name__
    log_extra = {'user_id': user_id, 'endpoint': function_name}
    logger.info(f"Retrieving user info for {user_id}", extra=log_extra)

    # SECURITY(review): the fallbacks below are production credentials
    # committed to source control. They are kept only so existing deployments
    # keep working; set the OURCOACH_DB_* variables, rotate the password and
    # then delete the literals.
    db_params = {
        'dbname': os.getenv('OURCOACH_DB_NAME', 'ourcoach'),
        'user': os.getenv('OURCOACH_DB_USER', 'ourcoach'),
        'password': os.getenv('OURCOACH_DB_PASSWORD', 'tfPMEnYFWKGSQqZWVhQo'),
        'host': os.getenv('OURCOACH_DB_HOST', 'production-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com'),
        'port': os.getenv('OURCOACH_DB_PORT', '5432'),
    }

    try:
        conn = psycopg2.connect(**db_params)
        try:
            # `with conn` only scopes the transaction (commit or rollback); it
            # does not close the connection, so close it explicitly below.
            with conn:
                with conn.cursor() as cursor:
                    query = sql.SQL("SELECT * FROM {table} WHERE id = %s").format(table=sql.Identifier('public', 'users'))
                    cursor.execute(query, (user_id,))
                    row = cursor.fetchone()
                    colnames = [desc[0] for desc in cursor.description]
        finally:
            conn.close()
    except psycopg2.Error as e:
        logger.error(f"Database error while retrieving user info for {user_id}: {e}", extra=log_extra)
        return None

    if not row:
        logger.warning(f"No user info found for {user_id}", extra=log_extra)
        return None

    user_data = dict(zip(colnames, row))
    onboarding_raw = user_data['onboarding']
    if not onboarding_raw:
        # A row without onboarding answers is reported like a missing user
        # instead of failing inside json.loads.
        logger.warning(f"No onboarding data found for {user_id}", extra=log_extra)
        return None

    user_data_clean = json.loads(onboarding_raw)
    user_data_formatted, matters_most = _format_user_profile(user_data_clean)
    logger.info(f"User info retrieved successfully for {user_id}", extra=log_extra)
    return user_data_formatted, matters_most
def add_growth_guide_session(user_id, session_id, coach_id, session_started_at, zoom_ai_summary, gg_report, ourcoach_summary):
    """Insert one growth guide session row into ``public.user_notes``.

    ``session_id`` is stored in the ``booking_id`` column. ``gg_report``,
    ``ourcoach_summary`` and ``zoom_ai_summary`` are serialized with
    ``json.dumps`` before being written. ``created_at`` and ``updated_at`` are
    both set to the same ``datetime.datetime.now()`` value.

    Raises:
        psycopg2.Error: re-raised (after logging) when the insert fails.
    """
    function_name = add_growth_guide_session.__name__
    logger.info(f"Adding growth guide session for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
    # NOTE(review): database credentials are hard-coded in source. They should
    # be moved to environment variables / a secrets manager and rotated.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'tfPMEnYFWKGSQqZWVhQo',
        'host': 'production-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        # `with conn` only scopes the transaction (commit on success, rollback
        # on error); it does NOT close the connection, hence the `finally`.
        with conn:
            with conn.cursor() as cursor:
                query = sql.SQL("""
                    INSERT INTO {table} (booking_id, coach_id, session_started_at, user_id, updated_at, gg_report, ourcoach_summary, created_at, zoom_ai_summary)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                """).format(table=sql.Identifier('public', 'user_notes'))
                current_time = datetime.datetime.now()
                cursor.execute(query, (
                    session_id,
                    coach_id,
                    session_started_at,
                    user_id,
                    current_time,
                    json.dumps(gg_report),
                    json.dumps(ourcoach_summary),
                    current_time,
                    json.dumps(zoom_ai_summary)
                ))
            conn.commit()
        logger.info(f"Growth guide session added successfully for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
    except psycopg2.Error as e:
        logger.error(f"Database error while adding growth guide session: {e}", extra={'user_id': user_id, 'endpoint': function_name})
        # Bare `raise` keeps the original traceback intact.
        raise
    finally:
        if conn is not None:
            conn.close()
def get_growth_guide_session(user_id, session_id):
    """Fetch one growth guide session row from ``public.user_notes``.

    Looks up the row whose ``user_id`` matches ``user_id`` and whose
    ``booking_id`` matches ``session_id``.

    Returns:
        A dict mapping every selected column name to its value for the first
        matching row, or ``None`` when no row matches or a database error
        occurs (the error is logged, not raised).
    """
    function_name = get_growth_guide_session.__name__
    logger.info(f"Retrieving growth guide session for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
    # NOTE(review): database credentials are hard-coded in source. They should
    # be moved to environment variables / a secrets manager and rotated.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'tfPMEnYFWKGSQqZWVhQo',
        'host': 'production-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        # `with conn` only ends the transaction; the connection itself is
        # closed in the `finally` block below.
        with conn:
            with conn.cursor() as cursor:
                query = sql.SQL("SELECT * FROM {table} WHERE user_id = %s AND booking_id = %s").format(table=sql.Identifier('public', 'user_notes'))
                cursor.execute(query, (user_id, session_id))
                row = cursor.fetchone()
                if row:
                    colnames = [desc[0] for desc in cursor.description]
                    session_data = dict(zip(colnames, row))
                    logger.info(f"Growth guide session retrieved successfully for user {user_id} and session {session_id}: {session_data}", extra={'user_id': user_id, 'endpoint': function_name})
                    return session_data
                logger.warning(f"No growth guide session found for user {user_id} and session {session_id}", extra={'user_id': user_id, 'endpoint': function_name})
                return None
    except psycopg2.Error as e:
        logger.error(f"Database error while retrieving growth guide session for user {user_id} and session {session_id}: {e}", extra={'user_id': user_id, 'endpoint': function_name})
        return None
    finally:
        if conn is not None:
            conn.close()
def upload_file_to_s3(filename):
    """Upload ``users/to_upload/<filename>`` to S3 and delete the local copy.

    The object is written to the ``core-ai-assets`` bucket under
    ``production/users/<filename>``. The text before the first ``.`` in
    ``filename`` is used as the ``user_id`` in log records.

    Returns:
        True after a successful upload and local delete; False when the local
        file is missing or AWS credentials are absent/incomplete (the error
        is logged). Any other exception propagates to the caller.
    """
    function_name = upload_file_to_s3.__name__
    user_id = filename.split('.')[0]
    log_context = {'user_id': user_id, 'endpoint': function_name}
    logger.info(f"Uploading file {filename} to S3", extra=log_context)
    bucket = 'core-ai-assets'
    local_path = os.path.join('users', 'to_upload', filename)
    try:
        # Use explicit credentials only when both keys are configured;
        # otherwise let boto3 resolve them through its default chain.
        session_kwargs = {}
        if AWS_ACCESS_KEY and AWS_SECRET_KEY:
            session_kwargs = {
                'aws_access_key_id': AWS_ACCESS_KEY,
                'aws_secret_access_key': AWS_SECRET_KEY,
                'region_name': REGION,
            }
        s3_client = boto3.session.Session(**session_kwargs).client('s3')
        with open(local_path, "rb") as source_file:
            # Target is the production folder of the bucket.
            s3_client.upload_fileobj(source_file, bucket, f'production/users/{filename}')
        logger.info(f"File {filename} uploaded successfully to S3", extra=log_context)
        os.remove(local_path)
        return True
    except (FileNotFoundError, NoCredentialsError, PartialCredentialsError) as e:
        logger.error(f"S3 upload failed for {filename}: {e}", extra=log_context)
        return False
def download_file_from_s3(filename, bucket):
    """Download ``production/users/<filename>`` from ``bucket`` to ``users/data``.

    The text before the first ``.`` in ``filename`` is used as the ``user_id``
    in log records.

    Returns:
        True when the object was written to ``users/data/<filename>``; False
        on any exception, in which case the error is logged and a leftover
        local file at that path is removed.
    """
    function_name = download_file_from_s3.__name__
    user_id = filename.split('.')[0]
    log_context = {'user_id': user_id, 'endpoint': function_name}
    logger.info(f"Downloading file {filename} from S3 bucket {bucket}", extra=log_context)
    file_path = os.path.join('users', 'data', filename)
    try:
        # Use explicit credentials only when both keys are configured;
        # otherwise let boto3 resolve them through its default chain.
        session_kwargs = {}
        if AWS_ACCESS_KEY and AWS_SECRET_KEY:
            session_kwargs = {
                'aws_access_key_id': AWS_ACCESS_KEY,
                'aws_secret_access_key': AWS_SECRET_KEY,
                'region_name': REGION,
            }
        s3_client = boto3.session.Session(**session_kwargs).client('s3')
        with open(file_path, 'wb') as target_file:
            # Source is the production folder of the bucket.
            s3_client.download_fileobj(bucket, f"production/users/{filename}", target_file)
        logger.info(f"File {filename} downloaded successfully from S3", extra=log_context)
        return True
    except Exception as e:
        logger.error(f"Error downloading file {filename} from S3: {e}", extra=log_context)
        # Opening in 'wb' creates the file before the download starts, so a
        # failed download can leave an empty/partial file behind.
        if os.path.exists(file_path):
            os.remove(file_path)
        return False
def pop_cache(user_id):
    """Remove ``user_id`` from ``user_cache``, uploading a pending pickle if any.

    When the user is not in the cache, a leftover
    ``users/to_upload/<user_id>.pkl`` file (if present) is uploaded directly
    via ``upload_file_to_s3``. ``user_cache.pop`` is then called in either
    case.

    Returns:
        True when ``user_cache.pop`` completed, False when it raised (the
        error is logged).
    """
    if user_id not in user_cache:
        logger.warning(f"[POPPING] User {user_id} not found in the cache", extra={'user_id': user_id, 'endpoint': 'pop_cache'})
        pending_file = f"{user_id}.pkl"
        if os.path.exists(os.path.join("users", "to_upload", pending_file)):
            # The message previously said ".json" although the file being
            # uploaded is the ".pkl" one.
            logger.info(f"Attempting upload file {pending_file} to S3", extra={'user_id': user_id, 'endpoint': 'pop_cache'})
            upload_file_to_s3(pending_file)
    try:
        user_cache.pop(user_id, None)
        logger.info(f"User {user_id} has been removed from the cache", extra={'user_id': user_id, 'endpoint': 'pop_cache'})
        return True
    except Exception as e:
        # `except Exception` (not a bare except) so KeyboardInterrupt/SystemExit
        # still propagate; log the cause instead of failing silently.
        logger.error(f"Failed to remove user {user_id} from the cache: {e}", extra={'user_id': user_id, 'endpoint': 'pop_cache'})
        return False
def update_user(user):
    """Evict ``user`` from the cache via ``pop_cache`` and report the outcome.

    Args:
        user: object exposing a ``user_id`` attribute.

    Returns:
        True when ``pop_cache`` reported success, False otherwise.
    """
    user_id = user.user_id
    function_name = update_user.__name__
    logger.info(f"Updating user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
    # pop_cache is the step that removes the user from the cache and triggers
    # the upload; only report success when it actually succeeded.
    if not pop_cache(user_id):
        logger.error(f"Failed to remove user {user_id} from the cache; update not completed", extra={'user_id': user_id, 'endpoint': function_name})
        return False
    logger.info(f"User {user_id} has been removed from the cache", extra={'user_id': user_id, 'endpoint': function_name})
    logger.info(f"User {user.user_id} updated successfully in S3", extra={'user_id': user_id, 'endpoint': function_name})
    return True
def upload_mementos_to_db(user_id):
    """Insert every pending memento JSON file of ``user_id`` into the database.

    Reads each ``*.json`` file in ``mementos/to_upload/<user_id>``, inserts it
    as one row of ``public.user_memento`` (committing per file) and deletes
    the file after a successful insert. Files that fail to parse or insert are
    logged and left in place. Finally the folder is removed if it is empty.

    Returns:
        True when the folder is absent, holds no JSON files, or processing
        finished (even if individual files were skipped); False on an
        unexpected non-database error.

    Raises:
        ConnectionError: when a database error escapes per-file handling
            (e.g. the connection cannot be established).
    """
    function_name = upload_mementos_to_db.__name__
    logger.info(f"Uploading mementos to DB for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
    # NOTE(review): database credentials are hard-coded in source. They should
    # be moved to environment variables / a secrets manager and rotated.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'tfPMEnYFWKGSQqZWVhQo',
        'host': 'production-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    folder_path = os.path.join("mementos", "to_upload", user_id)
    if not os.path.exists(folder_path):
        logger.warning(f"No mementos folder found for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
        return True  # Return True as this is not an error condition
    conn = None
    try:
        memento_files = [f for f in os.listdir(folder_path) if f.endswith('.json')]
        if not memento_files:
            logger.info(f"No memento files found for user {user_id}", extra={'user_id': user_id, 'endpoint': function_name})
            return True
        conn = psycopg2.connect(**db_params)
        with conn.cursor() as cursor:
            base_query = """
                INSERT INTO public.user_memento
                (user_id, type, title, description, tags, priority,
                mood, status, location, recurrence, context, created_at, follow_up_on)
                VALUES (%s, %s, %s, %s, %s::jsonb, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            for filename in memento_files:
                file_path = os.path.join(folder_path, filename)
                try:
                    with open(file_path, 'r', encoding='utf-8') as json_file:
                        data = json.load(json_file)
                    # Convert tags array to proper JSON string
                    tags_json = json.dumps(data.get('tags', []))
                    # A missing/empty follow_up_on parses to NaT, which
                    # psycopg2 cannot adapt; store it as NULL instead.
                    follow_up_on = pd.to_datetime(data.get('follow_up_on', ''))
                    if pd.isna(follow_up_on):
                        follow_up_on = None
                    # Prepare data with proper defaults and transformations
                    memento_data = [
                        user_id,  # Use the actual user_id, not the one stored in the JSON
                        data.get('type', ''),
                        data.get('title', ''),
                        data.get('description', ''),
                        tags_json,  # Send tags as JSON string
                        data.get('priority', ''),
                        data.get('mood', ''),
                        data.get('status', ''),
                        data.get('location', ''),
                        data.get('recurrence', ''),
                        data.get('context', ''),
                        datetime.datetime.now(),
                        follow_up_on,
                    ]
                    cursor.execute(base_query, memento_data)
                    conn.commit()
                    # Remove file after successful insert
                    os.remove(file_path)
                    logger.info(f"Successfully processed memento {filename}", extra={'user_id': user_id, 'endpoint': function_name})
                except json.JSONDecodeError as e:
                    logger.error(f"Invalid JSON in file {filename}: {str(e)}", extra={'user_id': user_id, 'endpoint': function_name})
                    continue
                except Exception as e:
                    # A failed statement leaves the transaction aborted; roll
                    # back so the remaining files can still be inserted.
                    conn.rollback()
                    logger.error(f"Error processing memento {filename}: {str(e)}", extra={'user_id': user_id, 'endpoint': function_name})
                    continue
        # Try to remove the directory after processing all files
        try:
            os.rmdir(folder_path)
        except OSError:
            pass  # Ignore if directory is not empty or already removed
        return True
    except psycopg2.Error as e:
        logger.error(f"Database error while uploading mementos: {str(e)}", extra={'user_id': user_id, 'endpoint': function_name})
        raise ConnectionError(f"Database error: {str(e)}") from e
    except Exception as e:
        logger.error(f"Unexpected error uploading mementos: {str(e)}", extra={'user_id': user_id, 'endpoint': function_name})
        return False
    finally:
        # The psycopg2 connection is not closed by a `with` block, so close
        # it explicitly on every exit path.
        if conn is not None:
            conn.close()
def get_users_mementos(user_id, date):
    """Return the mementos of ``user_id`` whose follow-up falls on ``date``.

    ``date`` is passed unchanged as a query parameter and compared against
    ``DATE(follow_up_on)`` in ``public.user_memento``.

    Returns:
        A list of dicts (column name -> value), one per matching row; an empty
        list when nothing matches or a database error occurs (the error is
        logged, not raised).
    """
    function_name = get_users_mementos.__name__
    # NOTE(review): database credentials are hard-coded in source. They should
    # be moved to environment variables / a secrets manager and rotated.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'tfPMEnYFWKGSQqZWVhQo',
        'host': 'production-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    logger.info(f"Retrieving mementos for user {user_id} on date {date}", extra={'endpoint': function_name, 'user_id': user_id})
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        # `with conn` only ends the transaction; the connection itself is
        # closed in the `finally` block below.
        with conn:
            with conn.cursor() as cursor:
                query = sql.SQL("""
                    SELECT * FROM public.user_memento
                    WHERE user_id = %s AND DATE(follow_up_on) = %s
                """)
                cursor.execute(query, (user_id, date))
                rows = cursor.fetchall()
                if not rows:
                    logger.info(f"No mementos found for user {user_id} on date {date}", extra={'endpoint': function_name, 'user_id': user_id})
                    return []
                colnames = [desc[0] for desc in cursor.description]
                mementos = [dict(zip(colnames, row)) for row in rows]
                logger.info(f"Retrieved {len(mementos)} mementos for user {user_id} on date {date}", extra={'endpoint': function_name, 'user_id': user_id})
                return mementos
    except psycopg2.Error as e:
        logger.error(f"Database error while retrieving mementos: {e}", extra={'endpoint': function_name, 'user_id': user_id})
        return []
    finally:
        if conn is not None:
            conn.close()
def print_log(level, message, **kwargs):
    """
    Print log in JSON format for better readability in CloudWatch.

    Parameters:
        level (str): The log level (e.g., "INFO", "ERROR", "DEBUG").
        message (str): The log message.
        **kwargs: Additional key-value pairs to include in the log. Values
            that are not JSON-serializable are written as ``str(value)``.
            A key named ``timestamp``, ``level`` or ``message`` overrides the
            corresponding built-in field.

    example:
        print_log("INFO", "User logged in", user_id=123, action="login")
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so isoformat() keeps emitting the same bare
    # timestamp, to which the "Z" suffix is appended as before.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    log_entry = {
        "timestamp": now_utc.isoformat() + "Z",
        "level": level,
        "message": message,
    }
    log_entry.update(kwargs)
    # default=str: a log call must not raise just because an extra field
    # (date, UUID, exception, ...) is not natively JSON-serializable.
    print(json.dumps(log_entry, ensure_ascii=False, default=str))