fastapi-v2 / app /assistants.py
LittleKnife's picture
Update app/assistants.py
1b18eff verified
import json
import io
import os
from datetime import datetime, timezone
import json
import random
from time import sleep
import openai
import pandas as pd
from dotenv import load_dotenv
import logging
import psycopg2
from psycopg2 import sql
import pytz
from app.exceptions import AssistantError, BaseOurcoachException, OpenAIRequestError, UtilsError
from app.utils import get_booked_gg_sessions, get_growth_guide, get_growth_guide_summary, get_user_subscriptions, get_users_mementos, print_log
from app.flows import FOLLOW_UP_STATE, GENERAL_COACHING_STATE, MICRO_ACTION_STATE, REFLECTION_STATE
from app.web_search import SearchEngine
load_dotenv()
logger = logging.getLogger(__name__)
OURCOACH_DASHBOARD_URL = os.getenv("OURCOACH_DASHBOARD_URL")
class FeedbackContent:
def __init__(self, content, role):
self.types = [
{"name": "Inspirational Quotes",
"description": "Short, impactful quotes from the user's chosen Legendary Persona. Don't forget to mention the name of the person who said that quote",
"examples": ['"The journey of a thousand miles begins with a single step." – Lao Tzu'],
"search": False},
{"name": "Tips & Advice",
"description": "Practical suggestions or strategies to help users overcome challenges and enhance their personal growth.",
"examples": ['Tip: Try setting a small, achievable goal each morning to build momentum'],
"search": False},
{"name": "Encouragement Messages",
"description": "Positive affirmations and supportive statements to boost the user's morale.",
"examples": ["Remember, every challenge is an opportunity to grow stronger. You’ve got this! 💪"],
"search": False},
{"name": "Resource Links",
"description": "Links to external content such as videos, articles, podcasts, or tutorials that provide additional insights or learning opportunities.",
"examples": ['🎥 Watch this video on building meaningful connections'],
"search": True},
{"name": "Personalised Recommendations",
"description": "Tailored suggestions based on the user's progress, preferences, and responses.",
"examples": ['Based on your interest in improving relationships, we recommend reading "The Five Love Languages."'],
"search": False},
{"name": "Affirmations",
"description": "Positive statements that reinforce the user's self-belief and confidence.",
"examples": ["I am capable of achieving my goals and overcoming any challenges"],
"search": False},
{"name": "Progress Highlights/Milestones",
"description": "Updates on the user's progress, celebrating milestones and achievements.",
"examples": ["Congratulations, Maya! You’ve completed 15 days of the Growth Phase. Keep up the great work!"],
"search": False},
{"name": "Success Stories/Testimonials",
"description": "Real-life stories or testimonials from other users who have achieved their goals.",
"examples": ["Hear how John transformed his daily habits and achieved his personal goals: Read his story"],
"search": True},
{"name": "Mindfulness/Meditation Prompts",
"description": "Guided prompts to help users practice mindfulness or meditation.",
"examples": ["Take a deep breath and spend five minutes reflecting on what you’re grateful for today"],
"search": False},
{"name": "Book/Podcast Recommendations",
"description": "Suggestions for books or podcasts that align with the user's interests and goals.",
"examples": ['📚 "We recommend reading \'Atomic Habits\' by James Clear to help you build effective habits"'],
"search": True},
{"name": "Visual Content (Images, Infographics)",
"description": "Incorporation of visual elements to enhance message delivery and engagement",
"examples": ["Example: (Insert an infographic about habit formation)"],
"search": True},
{"name": "Gamification Elements (Badges, Rewards)",
"description": "Incorporation of game-like elements to motivate and engage users.",
"examples": ["You’ve earned the ‘Consistency Champion’ badge for completing 7 consecutive days!"],
"search": False},
{"name": "Habit-Forming Tips",
"description": "Suggestions to help users build and maintain positive habits.",
"examples": ["Habit Tip: Start your day by drinking a glass of water to kickstart your metabolism"],
"search": False},
{"name": "Seasonal/Themed Content",
"description": "Content that aligns with seasons, holidays, or specific themes to keep interactions timely and relevant.",
"examples": ["Spring Theme: Embrace new beginnings by decluttering one area of your life today"],
"search": False},
{"name": "Inspirational Stories or Case Studies",
"description": "Detailed narratives or case studies showcasing significant transformations or achievements.",
"examples": ["Case Study: How Sarah Overcame Procrastination and Achieved Her Goals Read More"],
"search": False},
{"name": "Fun Facts Related to Personal Growth",
"description": "Interesting and relevant facts that can inspire or inform users.",
"examples": ["Fun Fact: People who write down their goals are 42% more likely to achieve them"],
"search": False},
{"name": "Time Management Tips",
"description": "Suggestions to help users manage their time more effectively.",
"examples": ["Tip: Prioritize your tasks using the Eisenhower Matrix to focus on what’s important"],
"search": False}
]
@staticmethod
def select_random_types(self, formatted_output=False):
selected = random.sample(self.types, 3)
# if formatted_output:
# result = "\n\n".join(
# f"Name: {t['name']}\nDescription: {t['description']}\nExample: {t['examples'][0]}"
# for t in selected
# )
# return result
return selected
def get_current_datetime(user_id):
db_params = {
'dbname': 'ourcoach',
'user': 'ourcoach',
'password': 'hvcTL3kN3pOG5KteT17T',
'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
'port': '5432'
}
with psycopg2.connect(**db_params) as conn:
with conn.cursor() as cursor:
query = sql.SQL("SELECT * FROM {table} WHERE id = %s").format(table=sql.Identifier('public', 'users'))
cursor.execute(query, (user_id,))
row = cursor.fetchone()
if (row):
colnames = [desc[0] for desc in cursor.description]
user_data = dict(zip(colnames, row))
user_timezone = user_data['timezone']
return datetime.now().astimezone(pytz.timezone(user_timezone))
class Assistant:
def catch_error(func):
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except (BaseOurcoachException) as e:
raise e
except Exception as e:
# Handle other exceptions
logger.error(f"An unexpected error occurred in Assistant: {e}")
raise AssistantError(user_id=self.cm.user.user_id, message="Unexpected error in Assistant", e=str(e))
return wrapper
def __init__(self, id, cm):
self.id = id
self.cm = cm
self.recent_run = None
def cancel_run(self, run, thread):
logger.info(f"(asst) Attempting to cancel run: {run} for thread: {thread}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"})
if isinstance(run, str):
try:
run = self.cm.client.beta.threads.runs.retrieve(
thread_id=thread,
run_id=run
)
thread = self.cm.client.beta.threads.retrieve(thread_id=thread)
except openai.NotFoundError:
logger.warning(f"Thread {thread} already deleted: {run}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"})
return True
if isinstance(run, PseudoRun):
if run.id == "pseudo_run":
logger.info(f"Run already completed: {run.id}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"})
return True
try:
logger.info(f"Attempting to cancel run: {run}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"})
if run.status != 'completed':
cancel = self.cm.client.beta.threads.runs.cancel(thread_id=thread.id, run_id=run.id)
while cancel.status != 'cancelled':
sleep(0.05)
cancel = self.cm.client.beta.threads.runs.retrieve(
thread_id=thread.id,
run_id=cancel.id
)
logger.info(f"Succesfully cancelled run: {run.id}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"})
return True
else:
logger.info(f"Run already completed: {run.id}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"})
return False
except openai.BadRequestError:
# check if run has expired. run has a field 'expires_at' like run.expires_at = 1735008568
# if expired, return True and log run already expired
if run.expires_at < get_current_datetime().timestamp():
logger.warning(f"Run already expired: {run.id}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"})
return True
else:
logger.warning(f"Error cancelling run: {run.id}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"})
return False
@catch_error
def process(self, thread, text):
try:
# template_search = self.cm.add_message_to_thread(thread.id, "assistant", f"Pay attention to the current state you are in and the flow template to respond to the users query:")
message = self.cm.add_message_to_thread(thread.id, "user", text)
run = self.cm.client.beta.threads.runs.create_and_poll(
thread_id=thread.id,
assistant_id=self.id,
model="gpt-4o-mini",
)
just_finished_intro = False
if run.status == 'completed':
logger.info(f"Run Completed: {run.status}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process"})
self.recent_run = run
return run, just_finished_intro, message
elif run.status == 'failed':
raise OpenAIRequestError(user_id=self.cm.user.user_id, message="Run failed", run_id=run.id)
elif run.status == 'requires_action':
reccursion = 0
logger.info(f"[Run Pending] Status: {run.status}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process"})
while run.status == 'requires_action':
logger.info(f"Run Calling tool [{reccursion}]: {run.status}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process"})
run, just_finished_intro = self.call_tool(run, thread)
reccursion += 1
if reccursion > 10:
logger.warning(f"Run has exceeded maximum recussrion depth({10}) for function_call: {run}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process"})
raise OpenAIRequestError(user_id=self.cm.id, message="Tool Call Reccursion Depth Reached")
if run.status == 'cancel':
logger.warning(f"RUN NOT COMPLETED: {run}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process"})
self.cancel_run(run, thread)
run.status = 'cancelled'
logger.warning(f"Yeap Run Cancelled: {run.status}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process"})
return run, just_finished_intro, message
elif run.status == 'completed':
logger.info(f"Run Completed: {run.status}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process"})
self.recent_run = run
return run, just_finished_intro, message
elif run.status == 'failed':
raise OpenAIRequestError(user_id=self.cm.id, message="Run failed")
return run, just_finished_intro, message
except Exception as e:
# Cancel the run
logger.error(f"Error in process: {e}", extra={"user_id": self.cm.user.user_id, "endpoint": 'assistant_process'})
logger.error(f"Cancelling run {run.id} for thread {thread.id}", extra={"user_id": self.cm.user.user_id, "endpoint": 'cancel_erroneous_run'})
self.cancel_run(run, thread)
logger.error(f"Run {run.id} cancelled for thread {thread.id}", extra={"user_id": self.cm.user.user_id, "endpoint": 'cancel_erroneous_run'})
raise e
def call_tool(self, run, thread):
try:
tool_outputs = []
logger.info(f"Required actions: {list(map(lambda x: f'{x.function.name}({x.function.arguments})', run.required_action.submit_tool_outputs.tool_calls))}",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_call_tool"})
just_finish_intro = False
for tool in run.required_action.submit_tool_outputs.tool_calls:
if tool.function.name == "transition":
transitions = json.loads(tool.function.arguments)
logger.info(f"Transition: {transitions['from']} -> {transitions['to']}",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_transition"})
if transitions['from'] == "PLANNING STATE":
tool_outputs.append({
"tool_call_id": tool.id,
"output": f"help the user set a new goal"
})
# logger.info(f"Exiting the introduction state",
# extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_transition"})
# just_finish_intro = True
# # run = self.cancel_run(run, thread)
# # logger.info(f"Successfully cancelled run",
# # extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_transition"})
# tool_outputs.append({
# "tool_call_id": tool.id,
# "output": "true"
# })
else:
flow_instructions = ""
if transitions['to'] == "REFLECTION STATE":
flow_instructions = REFLECTION_STATE
next = self.cm.matters_most.next()
logger.info(f"Successfully moved to bext matters most: {next}")
question_format = random.choice(['[Option 1] Likert-Scale Objective Question','[Option 2] Multiple-Choice Question','[Option 3] Yes-No Question'])
flow_instructions = f"""Today's opening question format is: {question_format}. Send a WARM, SUCCINCT and PERSONALIZED (according to my PROFILE) first message in this format! The most warm, succinct & personalized message will get rewarded!\n
The reflection topic is: {next}. YOU MUST SEND CREATIVE, PERSONALIZED (only mention specific terms that are related to the reflection topic/area), AND SUCCINCT MESSAGES!! The most creative message will be rewarded! And make every new reflection fresh and not boring! \n
""" + flow_instructions
elif transitions['to'] == "FOLLOW UP STATE":
flow_instructions = FOLLOW_UP_STATE
elif transitions['to'] == "GENERAL COACHING STATE":
flow_instructions = GENERAL_COACHING_STATE
tool_outputs.append({
"tool_call_id": tool.id,
"output": f"{flow_instructions}\n\n" + f"** Follow the above flow template to respond to the user **"
})
elif tool.function.name == "get_date":
# print(f"[DATETIME]: {get_current_datetime()}")
# self.cm.state['date'] = 'date': pd.Timestamp.now().strftime("%Y-%m-%d %a %H:%M:%S")
# get and update the current time to self.cm.state['date'] but keep the date component
current_time = get_current_datetime(self.cm.user.user_id)
# replace time component of self.cm.state['date'] with the current time
self.cm.state['date'] = str(pd.to_datetime(self.cm.state['date']).replace(hour=current_time.hour, minute=current_time.minute, second=current_time.second))
logger.info(f"Current datetime: {self.cm.state['date']}",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_date"})
tool_outputs.append({
"tool_call_id": tool.id,
"output": f"{self.cm.state['date']}"
})
elif tool.function.name == "create_goals" or tool.function.name == "create_memento":
json_string = json.loads(tool.function.arguments)
json_string['created'] = str(self.cm.state['date'])
json_string['updated'] = None
logger.info(f"New event: {json_string}",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_create_event"})
# Create a folder for the user's mementos if it doesn't exist
user_mementos_folder = os.path.join("mementos", "to_upload", self.cm.user.user_id)
# Ensure the directory exists
os.makedirs(user_mementos_folder, exist_ok=True)
# Construct the full file path for the JSON file
file_path = os.path.join(user_mementos_folder, f"{json_string['title']}.json")
# Save the JSON string as a file
with open(file_path, "w") as json_file:
json.dump(json_string, json_file)
tool_outputs.append({
"tool_call_id": tool.id,
"output": f"** [Success]: Added event to the user's vector store**"
})
elif tool.function.name == "msearch":
queries = json.loads(tool.function.arguments)['queries']
logger.info(f"Searching for: {queries}",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_msearch"})
tool_outputs.append({
"tool_call_id": tool.id,
"output": f"** retrieve any files related to: {queries} **"
})
elif tool.function.name == "get_mementos":
logger.info(f"Getting mementos", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_mementos"})
args = json.loads(tool.function.arguments)
logger.info(f"ARGS: {args}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_mementos"})
queries = args['queries']
if 'on' not in args:
on = pd.to_datetime(self.cm.state['date']).date()
else:
on = args['on']
if on == '':
on = pd.to_datetime(self.cm.state['date']).date()
else:
on = pd.to_datetime(args['on']).date()
logger.info(f"Query date: {on}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_mementos"})
# query the user memento db for all mementos where follow_up_on is equal to the query date
mementos = get_users_mementos(self.cm.user.user_id, on)
# if on == "":
# instruction = f"** Fetch all files (mementos) from this thread's Memento vector_store ([id={self.cm.user_personal_memory.id}]) **"
# else:
# instruction = f"** Fetch files (mementos) from this thread's Memento vector_store ([id={self.cm.user_personal_memory.id}]) where the follow_up_on field matches the query date: {on} (ignore the time component, focus only on the date)**"
# # f"** File search this threads' Memento vector_store ([id={self.cm.user_personal_memory.id}]) for the most relevant mementos based on the recent conversation history and context:{context} **"
logger.info(f"Finish Getting mementos: {mementos}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_mementos"})
tool_outputs.append({
"tool_call_id": tool.id,
"output": f"Today's mementos: {mementos}" if len(mementos) else "No mementos to follow up today."
})
elif tool.function.name == "get_feedback_types":
print_log("WARNING","Calling get_feedback_types", extra={"user_id": self.cm.user.user_id, "endpoint": "get_feedback_types"})
logger.warning("Calling get_feedback_types", extra={"user_id": self.cm.user.user_id, "endpoint": "get_feedback_types"})
feedbacks = [
("Inspirational Quotes", "📜", "Short, impactful quotes from the user's chosen Legendary Persona"),
("Tips & Advice", "💡", "Practical suggestions or strategies for personal growth (no need to say \"Tips:\" in the beggining of your tips)"),
("Encouragement", "🌈", "Positive affirmations and supportive statements"),
("Personalized Recommendations", "🧩", "Tailored suggestions based on user progress"),
("Affirmations", "✨", "Positive statements for self-belief and confidence"),
("Mindfulness/Meditation", "🧘‍♀️", "Guided prompts for mindfulness practice"),
("Book/Podcast", "📚🎧", "Suggestions aligned with user interests"),
("Habit Tip", "🔄", "Tips for building and maintaining habits"),
("Seasonal Content", "🌸", "Time and theme-relevant interactions"),
("Fun Fact", "🎉", "Interesting and inspiring facts (no need to say \"Fun Fact:\" in the beggining of your tips)"),
("Time Management", "⏳", "Tips for effective time management")
]
sample_feedbacks = random.sample(feedbacks, 3)
print_log("INFO",f"Feedback types: {sample_feedbacks}", extra={"user_id": self.cm.user.user_id, "endpoint": "get_feedback_types"})
logger.info(f"Feedback types: {sample_feedbacks}", extra={"user_id": self.cm.user.user_id, "endpoint": "get_feedback_types"})
tool_outputs.append({
"tool_call_id": tool.id,
"output": "Generate a final coach message (feedback message) using these 3 feedback types (together with the stated emoji at the beginning of each feedback): " + str(sample_feedbacks)
})
elif tool.function.name == "search_web":
type = json.loads(tool.function.arguments)['resource_type']
query = json.loads(tool.function.arguments)['query']
logger.info(f"Getting microaction theme: {type} - {query}", extra={"user_id": self.cm.user.user_id, "endpoint": "get_microaction_theme"})
relevant_context = SearchEngine.search(type, query, self.cm.user.user_id)
logger.info(f"Finish Getting microaction theme: {relevant_context}", extra={"user_id": self.cm.user.user_id, "endpoint": "get_microaction_theme"})
tool_outputs.append({
"tool_call_id": tool.id,
"output": f"** Relevant context: {relevant_context} **"
})
elif tool.function.name == "end_conversation":
day_n = json.loads(tool.function.arguments)['day_n']
completed_micro_action = json.loads(tool.function.arguments)['completed_micro_action']
area_of_deep_reflection = json.loads(tool.function.arguments)['area_of_deep_reflection']
self.cm.user.update_micro_action_status(completed_micro_action)
self.cm.user.trigger_deep_reflection_point(area_of_deep_reflection)
# NOTE: we will record whether the user has completed the theme for the day
# NOTE: if no, we will include a short followup message to encourage the user the next day
logger.info(f"Ending conversation after {day_n} days. Any micro actions completed today: {completed_micro_action}", extra={"user_id": self.cm.user.user_id, "endpoint": "end_conversation"})
tool_outputs.append({
"tool_call_id": tool.id,
"output": "true"
})
elif tool.function.name == "create_smart_goal":
print_log("WARNING", f"Creating a SMART goal...", extra={"user_id": self.cm.user.user_id, "endpoint": "create_smart_goal"})
logger.warning(f"Creating a SMART goal...", extra={"user_id": self.cm.user.user_id, "endpoint": "create_smart_goal"})
user_goal = json.loads(tool.function.arguments)['goal']
user_goal_area = json.loads(tool.function.arguments)['area']
self.cm.user.set_goal(user_goal, user_goal_area)
print_log("INFO", f"SMART goal approved: {user_goal}", extra={"user_id": self.cm.user.user_id, "endpoint": "create_smart_goal"})
logger.info(f"SMART goal approved: {user_goal}", extra={"user_id": self.cm.user.user_id, "endpoint": "create_smart_goal"})
tool_outputs.append({
"tool_call_id": tool.id,
"output": "true"
})
elif tool.function.name == "start_now":
logger.info(f"Starting Growth Plan on Day 0",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_start_now"})
# set intro finish to true
just_finish_intro = True
# cancel current run
run = PseudoRun(id=run.id, status="cancel", metadata={"message": "start_now"})
return run, just_finish_intro
elif tool.function.name == "change_goal":
logger.info(f"Changing user goal...", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_change_goal"})
# switch back to the intro assistant, so we set just_finish_intro to False again
just_finish_intro = False
self.cm.user.reset_cumulative_plan_day()
# cancel current run
run = PseudoRun(id=run.id, status="cancel", metadata={"message": "change_goal"})
return run, just_finish_intro
elif tool.function.name == "complete_goal":
logger.info(f"Completing user goal...", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_complete_goal"})
goal = self.cm.user.update_goal(None, 'COMPLETED')
logger.info(f"Marked users' goal: {goal} as COMPLETED", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_complete_goal"})
tool_outputs.append({
"tool_call_id": tool.id,
"output": f"Marked users' goal: {goal} as COMPLETED"
})
elif tool.function.name == "process_reminder":
reminder = json.loads(tool.function.arguments)["content"]
timestamp = json.loads(tool.function.arguments)["timestamp"]
recurrence = json.loads(tool.function.arguments)["recurrence"]
action = json.loads(tool.function.arguments)["action"]
logger.info(f"Setting reminder: {reminder} for {timestamp} with recurrence: {recurrence}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process_reminder"})
# timestamp is a string like: YYYY-mm-ddTHH:MM:SSZ (2025-01-05T11:00:00Z)
# convert to datetime object
timestamp = pd.to_datetime(timestamp, format="%Y-%m-%dT%H:%M:%SZ")
logger.info(f"Formatted timestamp: {timestamp}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process_reminder"})
output = f"({recurrence if recurrence else 'One-Time'}) Reminder ({reminder}) set for ({timestamp})"
logger.info(output,
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_set_reminder"})
self.cm.user.set_reminder({"reminder": reminder, "timestamp": timestamp, 'recurrence': recurrence, 'action': action})
tool_outputs.append({
"tool_call_id": tool.id,
"output": f"** {output} **"
})
elif tool.function.name == "get_user_info":
category = json.loads(tool.function.arguments)['category']
# one of [
# "personal",
# "challenges",
# "recommended_actions",
# "micro_actions",
# "other_focusses",
# "reminders",
# "goal",
# "growth_guide_session",
# "life_score",
# "recent_wins",
# "subscription_info"
# ]
logger.info(f"Getting user information: {category}",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_user_info"})
user_info = '** If the user asks for their progress, also include a link to their Revelation Dashboard: {OURCOACH_DASHBOARD_URL} so that they can find out more **\n\n'
if category == "personal":
user_info += f"** Personal Information **\n\n{self.cm.user.user_info}"
user_info += f"\n\n** User's Mantra This Week:**\n\n{self.cm.user.mantra or 'Mantra not available.'}"
elif category == "challenges":
user_info += f"** User's Challenges (prioritise ONGOING challenges) **\n\n{self.cm.user.challenges}\n\nLet the user know that ongoing challenges from their growth guide will be integrated into their day-to-day interaction."
elif category == "recommended_actions":
user_info += f"** User's Recommended Actions (upcoming microactions, recommended by growth guide) **\n\n{self.cm.user.recommended_micro_actions}\n\nLet the user know that these microactions from their growth guide will be integrated into their day-to-day interaction."
elif category == "micro_actions":
user_info += f"** User's Micro Actions (already introduced microactions) **\n\n{self.cm.user.micro_actions}"
elif category == "other_focusses":
user_info += f"** User's Other Focusses (other areas of focus) **\n\n{self.cm.user.other_focusses}\n\nLet the user know that other areas of focus from their growth guide will be integrated into their day-to-day interaction."
elif category == "reminders":
user_info += f"** User's Reminders **\n\n{self.cm.user.reminders}"
elif category == "goal":
user_info += f"** User's Goal (prioritise the latest [last item in the array] goal). Do not mention the status of their goal. **\n\n{self.cm.user.goal}"
elif category == "growth_guide_session":
growth_guide = get_growth_guide(self.cm.user.user_id)
user_info += f"** Users' Growth Guide (always refer to the Growth Guide as 'Growth Guide <Name>') **\n{growth_guide}"
logger.info(f"User's growth guide: {growth_guide}",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_user_info"})
booked_sessions = get_booked_gg_sessions(self.cm.user.user_id)
# for each booking, if the booking has completed, fetch the zoom_ai_summary and gg_report from
for booking in booked_sessions:
if booking['status'] == "completed":
summary_data = get_growth_guide_summary(self.cm.user.user_id, booking['booking_id'])
logger.info(f"Summary data for booking: {booking['booking_id']} - {summary_data}",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_user_info"})
if summary_data:
booking['zoom_ai_summary'] = summary_data['zoom_ai_summary']
booking['gg_report'] = summary_data['gg_report']
else:
booking['zoom_ai_summary'] = "Growth Guide has not uploaded the report yet"
booking['gg_report'] = "Growth Guide has not uploaded the report yet"
logger.info(f"User's booked sessions: {booked_sessions}",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_user_info"})
if len(booked_sessions):
# booked session is an array of jsons
# convers it to have i) json where i = 1...N where N is the len of booked_sessions
# join the entire array into 1 string with each item seperated by a newline
formatted_sessions = "\n".join([f"** Session {i+1} **\n{json.dumps(session, indent=4)}" for i, session in enumerate(booked_sessions)])
logger.info(f"Formatted booked sessions: {formatted_sessions}",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_user_info"})
user_info += f"\n** GG Session Bookings & Summaries (most recent first, time is in users' local timezone but no need to mention this) **\n{formatted_sessions}"
else:
user_info += f"\n** GG Session Summaries **\nNo GG yet. Let the user know they can book one now through their Revelation Dashboard: {OURCOACH_DASHBOARD_URL}! (When including links, DO **NOT** use a hyperlink format. Just show the link as plain text. Example: \"Revelation Dashboard: https://...\")"
user_info += f"\n** Suggested Growth Guide Topics **\n{self.cm.user.recommended_gg_topics}\nOnly suggest 1-2 topics and let the user know they can can find more suggestions on their dashboard"
elif category == "life_score":
user_info += f"** User's Life scores for each area **\n\n Personal Growth: {self.cm.user.personal_growth_score} || Career: {self.cm.user.career_growth_score} || Health/Wellness: {self.cm.user.health_and_wellness_score} || Relationships: {self.cm.user.relationship_score} || Mental Health: {self.cm.user.mental_well_being_score}"
elif category == "recent_wins":
user_info += f"** User's Recent Wins / Achievements **\n\n {self.cm.user.recent_wins}"
elif category == "subscription_info":
subscription_history = get_user_subscriptions(self.cm.user.user_id)
logger.info(f"User's subscription history: {subscription_history}",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_user_info"})
user_info += f"** User's Subscription Information **\n\n This is a sorted list (most recent first, which also represent the current subscription status) of the users subscription history:\n{subscription_history}\nNote that the stripe_status is one of 'trialing' = (user is on a free trial), 'cancelled' = (user has cancelled their subscription), 'active' = (user is a Premium user). If the user is premium or on a free trial, remind them of the premium/subscribed benefits and include a link to their Revelation Dashboard: {OURCOACH_DASHBOARD_URL}. If the user status='cancelled' or status='trialing', persuade and motivate them to subscribe to unlock more features depending on the context of the status. Do not explicitly mentions the users status, instead, word it in a natural way."
logger.info(f"Finish Getting user information: {user_info}",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_user_info"})
tool_outputs.append({
"tool_call_id": tool.id,
"output": f"** User Info:\n\n{user_info} **"
})
elif tool.function.name == "extend_two_weeks":
logger.info(f"Changing plan from 1 week to 2 weeks...", extra={"user_id": self.cm.user.user_id, "endpoint": "extend_two_weeks"})
goal = self.cm.user.extend_growth_plan()
tool_outputs.append({
"tool_call_id": tool.id,
"output": f"Changed plan from 1 week to 2 weeks."
})
# Submit all tool outputs at once after collecting them in a list
if tool_outputs:
run = self.cm.client.beta.threads.runs.submit_tool_outputs_and_poll(
thread_id=thread.id,
run_id=run.id,
tool_outputs=tool_outputs
)
logger.info("Tool outputs submitted successfully",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_submit_tools"})
return run, just_finish_intro
else:
logger.warning("No tool outputs to submit",
extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_submit_tools"})
run = PseudoRun(status="completed", metadata={"message": "No tool outputs to submit"})
return run, just_finish_intro
except Exception as e:
# Cancel the run
logger.error(f"Yeap Error in call_tool: {e}", extra={"user_id": self.cm.user.user_id, "endpoint": 'assistant_call_tool'})
logger.error(f"Cancelling run {run.id} for thread {thread.id}", extra={"user_id": self.cm.user.user_id, "endpoint": 'cancel_erroneous_run'})
self.cancel_run(run, thread)
logger.error(f"Run {run.id} cancelled for thread {thread.id}", extra={"user_id": self.cm.user.user_id, "endpoint": 'cancel_erroneous_run'})
raise e
class PseudoRun:
def __init__(self, status, id='pseudo_run', metadata=None):
self.id = id
self.status = status
self.metadata = metadata or {}
class GeneralAssistant(Assistant):
def __init__(self, id, cm):
super().__init__(id, cm)
class PFAssistant(Assistant):
def __init__(self, id, cm):
super().__init__(id, cm)