Spaces:
Sleeping
Sleeping
| import json | |
| import io | |
| import os | |
| from datetime import datetime, timezone | |
| import json | |
| import random | |
| from time import sleep | |
| import openai | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
| import logging | |
| import psycopg2 | |
| from psycopg2 import sql | |
| import pytz | |
| from app.exceptions import AssistantError, BaseOurcoachException, OpenAIRequestError, UtilsError | |
| from app.utils import get_booked_gg_sessions, get_growth_guide, get_growth_guide_summary, get_user_subscriptions, get_users_mementos, print_log | |
| from app.flows import FOLLOW_UP_STATE, GENERAL_COACHING_STATE, MICRO_ACTION_STATE, REFLECTION_STATE | |
| from app.web_search import SearchEngine | |
| load_dotenv() | |
| logger = logging.getLogger(__name__) | |
| OURCOACH_DASHBOARD_URL = os.getenv("OURCOACH_DASHBOARD_URL") | |
| class FeedbackContent: | |
| def __init__(self, content, role): | |
| self.types = [ | |
| {"name": "Inspirational Quotes", | |
| "description": "Short, impactful quotes from the user's chosen Legendary Persona. Don't forget to mention the name of the person who said that quote", | |
| "examples": ['"The journey of a thousand miles begins with a single step." – Lao Tzu'], | |
| "search": False}, | |
| {"name": "Tips & Advice", | |
| "description": "Practical suggestions or strategies to help users overcome challenges and enhance their personal growth.", | |
| "examples": ['Tip: Try setting a small, achievable goal each morning to build momentum'], | |
| "search": False}, | |
| {"name": "Encouragement Messages", | |
| "description": "Positive affirmations and supportive statements to boost the user's morale.", | |
| "examples": ["Remember, every challenge is an opportunity to grow stronger. You’ve got this! 💪"], | |
| "search": False}, | |
| {"name": "Resource Links", | |
| "description": "Links to external content such as videos, articles, podcasts, or tutorials that provide additional insights or learning opportunities.", | |
| "examples": ['🎥 Watch this video on building meaningful connections'], | |
| "search": True}, | |
| {"name": "Personalised Recommendations", | |
| "description": "Tailored suggestions based on the user's progress, preferences, and responses.", | |
| "examples": ['Based on your interest in improving relationships, we recommend reading "The Five Love Languages."'], | |
| "search": False}, | |
| {"name": "Affirmations", | |
| "description": "Positive statements that reinforce the user's self-belief and confidence.", | |
| "examples": ["I am capable of achieving my goals and overcoming any challenges"], | |
| "search": False}, | |
| {"name": "Progress Highlights/Milestones", | |
| "description": "Updates on the user's progress, celebrating milestones and achievements.", | |
| "examples": ["Congratulations, Maya! You’ve completed 15 days of the Growth Phase. Keep up the great work!"], | |
| "search": False}, | |
| {"name": "Success Stories/Testimonials", | |
| "description": "Real-life stories or testimonials from other users who have achieved their goals.", | |
| "examples": ["Hear how John transformed his daily habits and achieved his personal goals: Read his story"], | |
| "search": True}, | |
| {"name": "Mindfulness/Meditation Prompts", | |
| "description": "Guided prompts to help users practice mindfulness or meditation.", | |
| "examples": ["Take a deep breath and spend five minutes reflecting on what you’re grateful for today"], | |
| "search": False}, | |
| {"name": "Book/Podcast Recommendations", | |
| "description": "Suggestions for books or podcasts that align with the user's interests and goals.", | |
| "examples": ['📚 "We recommend reading \'Atomic Habits\' by James Clear to help you build effective habits"'], | |
| "search": True}, | |
| {"name": "Visual Content (Images, Infographics)", | |
| "description": "Incorporation of visual elements to enhance message delivery and engagement", | |
| "examples": ["Example: (Insert an infographic about habit formation)"], | |
| "search": True}, | |
| {"name": "Gamification Elements (Badges, Rewards)", | |
| "description": "Incorporation of game-like elements to motivate and engage users.", | |
| "examples": ["You’ve earned the ‘Consistency Champion’ badge for completing 7 consecutive days!"], | |
| "search": False}, | |
| {"name": "Habit-Forming Tips", | |
| "description": "Suggestions to help users build and maintain positive habits.", | |
| "examples": ["Habit Tip: Start your day by drinking a glass of water to kickstart your metabolism"], | |
| "search": False}, | |
| {"name": "Seasonal/Themed Content", | |
| "description": "Content that aligns with seasons, holidays, or specific themes to keep interactions timely and relevant.", | |
| "examples": ["Spring Theme: Embrace new beginnings by decluttering one area of your life today"], | |
| "search": False}, | |
| {"name": "Inspirational Stories or Case Studies", | |
| "description": "Detailed narratives or case studies showcasing significant transformations or achievements.", | |
| "examples": ["Case Study: How Sarah Overcame Procrastination and Achieved Her Goals Read More"], | |
| "search": False}, | |
| {"name": "Fun Facts Related to Personal Growth", | |
| "description": "Interesting and relevant facts that can inspire or inform users.", | |
| "examples": ["Fun Fact: People who write down their goals are 42% more likely to achieve them"], | |
| "search": False}, | |
| {"name": "Time Management Tips", | |
| "description": "Suggestions to help users manage their time more effectively.", | |
| "examples": ["Tip: Prioritize your tasks using the Eisenhower Matrix to focus on what’s important"], | |
| "search": False} | |
| ] | |
| def select_random_types(self, formatted_output=False): | |
| selected = random.sample(self.types, 3) | |
| # if formatted_output: | |
| # result = "\n\n".join( | |
| # f"Name: {t['name']}\nDescription: {t['description']}\nExample: {t['examples'][0]}" | |
| # for t in selected | |
| # ) | |
| # return result | |
| return selected | |
| def get_current_datetime(user_id): | |
| db_params = { | |
| 'dbname': 'ourcoach', | |
| 'user': 'ourcoach', | |
| 'password': 'hvcTL3kN3pOG5KteT17T', | |
| 'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com', | |
| 'port': '5432' | |
| } | |
| with psycopg2.connect(**db_params) as conn: | |
| with conn.cursor() as cursor: | |
| query = sql.SQL("SELECT * FROM {table} WHERE id = %s").format(table=sql.Identifier('public', 'users')) | |
| cursor.execute(query, (user_id,)) | |
| row = cursor.fetchone() | |
| if (row): | |
| colnames = [desc[0] for desc in cursor.description] | |
| user_data = dict(zip(colnames, row)) | |
| user_timezone = user_data['timezone'] | |
| return datetime.now().astimezone(pytz.timezone(user_timezone)) | |
| class Assistant: | |
| def catch_error(func): | |
| def wrapper(self, *args, **kwargs): | |
| try: | |
| return func(self, *args, **kwargs) | |
| except (BaseOurcoachException) as e: | |
| raise e | |
| except Exception as e: | |
| # Handle other exceptions | |
| logger.error(f"An unexpected error occurred in Assistant: {e}") | |
| raise AssistantError(user_id=self.cm.user.user_id, message="Unexpected error in Assistant", e=str(e)) | |
| return wrapper | |
| def __init__(self, id, cm): | |
| self.id = id | |
| self.cm = cm | |
| self.recent_run = None | |
| def cancel_run(self, run, thread): | |
| logger.info(f"(asst) Attempting to cancel run: {run} for thread: {thread}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"}) | |
| if isinstance(run, str): | |
| try: | |
| run = self.cm.client.beta.threads.runs.retrieve( | |
| thread_id=thread, | |
| run_id=run | |
| ) | |
| thread = self.cm.client.beta.threads.retrieve(thread_id=thread) | |
| except openai.NotFoundError: | |
| logger.warning(f"Thread {thread} already deleted: {run}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"}) | |
| return True | |
| if isinstance(run, PseudoRun): | |
| if run.id == "pseudo_run": | |
| logger.info(f"Run already completed: {run.id}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"}) | |
| return True | |
| try: | |
| logger.info(f"Attempting to cancel run: {run}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"}) | |
| if run.status != 'completed': | |
| cancel = self.cm.client.beta.threads.runs.cancel(thread_id=thread.id, run_id=run.id) | |
| while cancel.status != 'cancelled': | |
| sleep(0.05) | |
| cancel = self.cm.client.beta.threads.runs.retrieve( | |
| thread_id=thread.id, | |
| run_id=cancel.id | |
| ) | |
| logger.info(f"Succesfully cancelled run: {run.id}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"}) | |
| return True | |
| else: | |
| logger.info(f"Run already completed: {run.id}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"}) | |
| return False | |
| except openai.BadRequestError: | |
| # check if run has expired. run has a field 'expires_at' like run.expires_at = 1735008568 | |
| # if expired, return True and log run already expired | |
| if run.expires_at < get_current_datetime().timestamp(): | |
| logger.warning(f"Run already expired: {run.id}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"}) | |
| return True | |
| else: | |
| logger.warning(f"Error cancelling run: {run.id}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_cancel_run"}) | |
| return False | |
| def process(self, thread, text): | |
| try: | |
| # template_search = self.cm.add_message_to_thread(thread.id, "assistant", f"Pay attention to the current state you are in and the flow template to respond to the users query:") | |
| message = self.cm.add_message_to_thread(thread.id, "user", text) | |
| run = self.cm.client.beta.threads.runs.create_and_poll( | |
| thread_id=thread.id, | |
| assistant_id=self.id, | |
| model="gpt-4o-mini", | |
| ) | |
| just_finished_intro = False | |
| if run.status == 'completed': | |
| logger.info(f"Run Completed: {run.status}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process"}) | |
| self.recent_run = run | |
| return run, just_finished_intro, message | |
| elif run.status == 'failed': | |
| raise OpenAIRequestError(user_id=self.cm.user.user_id, message="Run failed", run_id=run.id) | |
| elif run.status == 'requires_action': | |
| reccursion = 0 | |
| logger.info(f"[Run Pending] Status: {run.status}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process"}) | |
| while run.status == 'requires_action': | |
| logger.info(f"Run Calling tool [{reccursion}]: {run.status}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process"}) | |
| run, just_finished_intro = self.call_tool(run, thread) | |
| reccursion += 1 | |
| if reccursion > 10: | |
| logger.warning(f"Run has exceeded maximum recussrion depth({10}) for function_call: {run}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process"}) | |
| raise OpenAIRequestError(user_id=self.cm.id, message="Tool Call Reccursion Depth Reached") | |
| if run.status == 'cancel': | |
| logger.warning(f"RUN NOT COMPLETED: {run}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process"}) | |
| self.cancel_run(run, thread) | |
| run.status = 'cancelled' | |
| logger.warning(f"Yeap Run Cancelled: {run.status}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process"}) | |
| return run, just_finished_intro, message | |
| elif run.status == 'completed': | |
| logger.info(f"Run Completed: {run.status}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process"}) | |
| self.recent_run = run | |
| return run, just_finished_intro, message | |
| elif run.status == 'failed': | |
| raise OpenAIRequestError(user_id=self.cm.id, message="Run failed") | |
| return run, just_finished_intro, message | |
| except Exception as e: | |
| # Cancel the run | |
| logger.error(f"Error in process: {e}", extra={"user_id": self.cm.user.user_id, "endpoint": 'assistant_process'}) | |
| logger.error(f"Cancelling run {run.id} for thread {thread.id}", extra={"user_id": self.cm.user.user_id, "endpoint": 'cancel_erroneous_run'}) | |
| self.cancel_run(run, thread) | |
| logger.error(f"Run {run.id} cancelled for thread {thread.id}", extra={"user_id": self.cm.user.user_id, "endpoint": 'cancel_erroneous_run'}) | |
| raise e | |
| def call_tool(self, run, thread): | |
| try: | |
| tool_outputs = [] | |
| logger.info(f"Required actions: {list(map(lambda x: f'{x.function.name}({x.function.arguments})', run.required_action.submit_tool_outputs.tool_calls))}", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_call_tool"}) | |
| just_finish_intro = False | |
| for tool in run.required_action.submit_tool_outputs.tool_calls: | |
| if tool.function.name == "transition": | |
| transitions = json.loads(tool.function.arguments) | |
| logger.info(f"Transition: {transitions['from']} -> {transitions['to']}", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_transition"}) | |
| if transitions['from'] == "PLANNING STATE": | |
| tool_outputs.append({ | |
| "tool_call_id": tool.id, | |
| "output": f"help the user set a new goal" | |
| }) | |
| # logger.info(f"Exiting the introduction state", | |
| # extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_transition"}) | |
| # just_finish_intro = True | |
| # # run = self.cancel_run(run, thread) | |
| # # logger.info(f"Successfully cancelled run", | |
| # # extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_transition"}) | |
| # tool_outputs.append({ | |
| # "tool_call_id": tool.id, | |
| # "output": "true" | |
| # }) | |
| else: | |
| flow_instructions = "" | |
| if transitions['to'] == "REFLECTION STATE": | |
| flow_instructions = REFLECTION_STATE | |
| next = self.cm.matters_most.next() | |
| logger.info(f"Successfully moved to bext matters most: {next}") | |
| question_format = random.choice(['[Option 1] Likert-Scale Objective Question','[Option 2] Multiple-Choice Question','[Option 3] Yes-No Question']) | |
| flow_instructions = f"""Today's opening question format is: {question_format}. Send a WARM, SUCCINCT and PERSONALIZED (according to my PROFILE) first message in this format! The most warm, succinct & personalized message will get rewarded!\n | |
| The reflection topic is: {next}. YOU MUST SEND CREATIVE, PERSONALIZED (only mention specific terms that are related to the reflection topic/area), AND SUCCINCT MESSAGES!! The most creative message will be rewarded! And make every new reflection fresh and not boring! \n | |
| """ + flow_instructions | |
| elif transitions['to'] == "FOLLOW UP STATE": | |
| flow_instructions = FOLLOW_UP_STATE | |
| elif transitions['to'] == "GENERAL COACHING STATE": | |
| flow_instructions = GENERAL_COACHING_STATE | |
| tool_outputs.append({ | |
| "tool_call_id": tool.id, | |
| "output": f"{flow_instructions}\n\n" + f"** Follow the above flow template to respond to the user **" | |
| }) | |
| elif tool.function.name == "get_date": | |
| # print(f"[DATETIME]: {get_current_datetime()}") | |
| # self.cm.state['date'] = 'date': pd.Timestamp.now().strftime("%Y-%m-%d %a %H:%M:%S") | |
| # get and update the current time to self.cm.state['date'] but keep the date component | |
| current_time = get_current_datetime(self.cm.user.user_id) | |
| # replace time component of self.cm.state['date'] with the current time | |
| self.cm.state['date'] = str(pd.to_datetime(self.cm.state['date']).replace(hour=current_time.hour, minute=current_time.minute, second=current_time.second)) | |
| logger.info(f"Current datetime: {self.cm.state['date']}", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_date"}) | |
| tool_outputs.append({ | |
| "tool_call_id": tool.id, | |
| "output": f"{self.cm.state['date']}" | |
| }) | |
| elif tool.function.name == "create_goals" or tool.function.name == "create_memento": | |
| json_string = json.loads(tool.function.arguments) | |
| json_string['created'] = str(self.cm.state['date']) | |
| json_string['updated'] = None | |
| logger.info(f"New event: {json_string}", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_create_event"}) | |
| # Create a folder for the user's mementos if it doesn't exist | |
| user_mementos_folder = os.path.join("mementos", "to_upload", self.cm.user.user_id) | |
| # Ensure the directory exists | |
| os.makedirs(user_mementos_folder, exist_ok=True) | |
| # Construct the full file path for the JSON file | |
| file_path = os.path.join(user_mementos_folder, f"{json_string['title']}.json") | |
| # Save the JSON string as a file | |
| with open(file_path, "w") as json_file: | |
| json.dump(json_string, json_file) | |
| tool_outputs.append({ | |
| "tool_call_id": tool.id, | |
| "output": f"** [Success]: Added event to the user's vector store**" | |
| }) | |
| elif tool.function.name == "msearch": | |
| queries = json.loads(tool.function.arguments)['queries'] | |
| logger.info(f"Searching for: {queries}", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_msearch"}) | |
| tool_outputs.append({ | |
| "tool_call_id": tool.id, | |
| "output": f"** retrieve any files related to: {queries} **" | |
| }) | |
| elif tool.function.name == "get_mementos": | |
| logger.info(f"Getting mementos", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_mementos"}) | |
| args = json.loads(tool.function.arguments) | |
| logger.info(f"ARGS: {args}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_mementos"}) | |
| queries = args['queries'] | |
| if 'on' not in args: | |
| on = pd.to_datetime(self.cm.state['date']).date() | |
| else: | |
| on = args['on'] | |
| if on == '': | |
| on = pd.to_datetime(self.cm.state['date']).date() | |
| else: | |
| on = pd.to_datetime(args['on']).date() | |
| logger.info(f"Query date: {on}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_mementos"}) | |
| # query the user memento db for all mementos where follow_up_on is equal to the query date | |
| mementos = get_users_mementos(self.cm.user.user_id, on) | |
| # if on == "": | |
| # instruction = f"** Fetch all files (mementos) from this thread's Memento vector_store ([id={self.cm.user_personal_memory.id}]) **" | |
| # else: | |
| # instruction = f"** Fetch files (mementos) from this thread's Memento vector_store ([id={self.cm.user_personal_memory.id}]) where the follow_up_on field matches the query date: {on} (ignore the time component, focus only on the date)**" | |
| # # f"** File search this threads' Memento vector_store ([id={self.cm.user_personal_memory.id}]) for the most relevant mementos based on the recent conversation history and context:{context} **" | |
| logger.info(f"Finish Getting mementos: {mementos}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_mementos"}) | |
| tool_outputs.append({ | |
| "tool_call_id": tool.id, | |
| "output": f"Today's mementos: {mementos}" if len(mementos) else "No mementos to follow up today." | |
| }) | |
| elif tool.function.name == "get_feedback_types": | |
| print_log("WARNING","Calling get_feedback_types", extra={"user_id": self.cm.user.user_id, "endpoint": "get_feedback_types"}) | |
| logger.warning("Calling get_feedback_types", extra={"user_id": self.cm.user.user_id, "endpoint": "get_feedback_types"}) | |
| feedbacks = [ | |
| ("Inspirational Quotes", "📜", "Short, impactful quotes from the user's chosen Legendary Persona"), | |
| ("Tips & Advice", "💡", "Practical suggestions or strategies for personal growth (no need to say \"Tips:\" in the beggining of your tips)"), | |
| ("Encouragement", "🌈", "Positive affirmations and supportive statements"), | |
| ("Personalized Recommendations", "🧩", "Tailored suggestions based on user progress"), | |
| ("Affirmations", "✨", "Positive statements for self-belief and confidence"), | |
| ("Mindfulness/Meditation", "🧘♀️", "Guided prompts for mindfulness practice"), | |
| ("Book/Podcast", "📚🎧", "Suggestions aligned with user interests"), | |
| ("Habit Tip", "🔄", "Tips for building and maintaining habits"), | |
| ("Seasonal Content", "🌸", "Time and theme-relevant interactions"), | |
| ("Fun Fact", "🎉", "Interesting and inspiring facts (no need to say \"Fun Fact:\" in the beggining of your tips)"), | |
| ("Time Management", "⏳", "Tips for effective time management") | |
| ] | |
| sample_feedbacks = random.sample(feedbacks, 3) | |
| print_log("INFO",f"Feedback types: {sample_feedbacks}", extra={"user_id": self.cm.user.user_id, "endpoint": "get_feedback_types"}) | |
| logger.info(f"Feedback types: {sample_feedbacks}", extra={"user_id": self.cm.user.user_id, "endpoint": "get_feedback_types"}) | |
| tool_outputs.append({ | |
| "tool_call_id": tool.id, | |
| "output": "Generate a final coach message (feedback message) using these 3 feedback types (together with the stated emoji at the beginning of each feedback): " + str(sample_feedbacks) | |
| }) | |
| elif tool.function.name == "search_web": | |
| type = json.loads(tool.function.arguments)['resource_type'] | |
| query = json.loads(tool.function.arguments)['query'] | |
| logger.info(f"Getting microaction theme: {type} - {query}", extra={"user_id": self.cm.user.user_id, "endpoint": "get_microaction_theme"}) | |
| relevant_context = SearchEngine.search(type, query, self.cm.user.user_id) | |
| logger.info(f"Finish Getting microaction theme: {relevant_context}", extra={"user_id": self.cm.user.user_id, "endpoint": "get_microaction_theme"}) | |
| tool_outputs.append({ | |
| "tool_call_id": tool.id, | |
| "output": f"** Relevant context: {relevant_context} **" | |
| }) | |
| elif tool.function.name == "end_conversation": | |
| day_n = json.loads(tool.function.arguments)['day_n'] | |
| completed_micro_action = json.loads(tool.function.arguments)['completed_micro_action'] | |
| area_of_deep_reflection = json.loads(tool.function.arguments)['area_of_deep_reflection'] | |
| self.cm.user.update_micro_action_status(completed_micro_action) | |
| self.cm.user.trigger_deep_reflection_point(area_of_deep_reflection) | |
| # NOTE: we will record whether the user has completed the theme for the day | |
| # NOTE: if no, we will include a short followup message to encourage the user the next day | |
| logger.info(f"Ending conversation after {day_n} days. Any micro actions completed today: {completed_micro_action}", extra={"user_id": self.cm.user.user_id, "endpoint": "end_conversation"}) | |
| tool_outputs.append({ | |
| "tool_call_id": tool.id, | |
| "output": "true" | |
| }) | |
| elif tool.function.name == "create_smart_goal": | |
| print_log("WARNING", f"Creating a SMART goal...", extra={"user_id": self.cm.user.user_id, "endpoint": "create_smart_goal"}) | |
| logger.warning(f"Creating a SMART goal...", extra={"user_id": self.cm.user.user_id, "endpoint": "create_smart_goal"}) | |
| user_goal = json.loads(tool.function.arguments)['goal'] | |
| user_goal_area = json.loads(tool.function.arguments)['area'] | |
| self.cm.user.set_goal(user_goal, user_goal_area) | |
| print_log("INFO", f"SMART goal approved: {user_goal}", extra={"user_id": self.cm.user.user_id, "endpoint": "create_smart_goal"}) | |
| logger.info(f"SMART goal approved: {user_goal}", extra={"user_id": self.cm.user.user_id, "endpoint": "create_smart_goal"}) | |
| tool_outputs.append({ | |
| "tool_call_id": tool.id, | |
| "output": "true" | |
| }) | |
| elif tool.function.name == "start_now": | |
| logger.info(f"Starting Growth Plan on Day 0", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_start_now"}) | |
| # set intro finish to true | |
| just_finish_intro = True | |
| # cancel current run | |
| run = PseudoRun(id=run.id, status="cancel", metadata={"message": "start_now"}) | |
| return run, just_finish_intro | |
| elif tool.function.name == "change_goal": | |
| logger.info(f"Changing user goal...", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_change_goal"}) | |
| # switch back to the intro assistant, so we set just_finish_intro to False again | |
| just_finish_intro = False | |
| self.cm.user.reset_cumulative_plan_day() | |
| # cancel current run | |
| run = PseudoRun(id=run.id, status="cancel", metadata={"message": "change_goal"}) | |
| return run, just_finish_intro | |
| elif tool.function.name == "complete_goal": | |
| logger.info(f"Completing user goal...", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_complete_goal"}) | |
| goal = self.cm.user.update_goal(None, 'COMPLETED') | |
| logger.info(f"Marked users' goal: {goal} as COMPLETED", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_complete_goal"}) | |
| tool_outputs.append({ | |
| "tool_call_id": tool.id, | |
| "output": f"Marked users' goal: {goal} as COMPLETED" | |
| }) | |
| elif tool.function.name == "process_reminder": | |
| reminder = json.loads(tool.function.arguments)["content"] | |
| timestamp = json.loads(tool.function.arguments)["timestamp"] | |
| recurrence = json.loads(tool.function.arguments)["recurrence"] | |
| action = json.loads(tool.function.arguments)["action"] | |
| logger.info(f"Setting reminder: {reminder} for {timestamp} with recurrence: {recurrence}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process_reminder"}) | |
| # timestamp is a string like: YYYY-mm-ddTHH:MM:SSZ (2025-01-05T11:00:00Z) | |
| # convert to datetime object | |
| timestamp = pd.to_datetime(timestamp, format="%Y-%m-%dT%H:%M:%SZ") | |
| logger.info(f"Formatted timestamp: {timestamp}", extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_process_reminder"}) | |
| output = f"({recurrence if recurrence else 'One-Time'}) Reminder ({reminder}) set for ({timestamp})" | |
| logger.info(output, | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_set_reminder"}) | |
| self.cm.user.set_reminder({"reminder": reminder, "timestamp": timestamp, 'recurrence': recurrence, 'action': action}) | |
| tool_outputs.append({ | |
| "tool_call_id": tool.id, | |
| "output": f"** {output} **" | |
| }) | |
| elif tool.function.name == "get_user_info": | |
| category = json.loads(tool.function.arguments)['category'] | |
| # one of [ | |
| # "personal", | |
| # "challenges", | |
| # "recommended_actions", | |
| # "micro_actions", | |
| # "other_focusses", | |
| # "reminders", | |
| # "goal", | |
| # "growth_guide_session", | |
| # "life_score", | |
| # "recent_wins", | |
| # "subscription_info" | |
| # ] | |
| logger.info(f"Getting user information: {category}", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_user_info"}) | |
| user_info = '** If the user asks for their progress, also include a link to their Revelation Dashboard: {OURCOACH_DASHBOARD_URL} so that they can find out more **\n\n' | |
| if category == "personal": | |
| user_info += f"** Personal Information **\n\n{self.cm.user.user_info}" | |
| user_info += f"\n\n** User's Mantra This Week:**\n\n{self.cm.user.mantra or 'Mantra not available.'}" | |
| elif category == "challenges": | |
| user_info += f"** User's Challenges (prioritise ONGOING challenges) **\n\n{self.cm.user.challenges}\n\nLet the user know that ongoing challenges from their growth guide will be integrated into their day-to-day interaction." | |
| elif category == "recommended_actions": | |
| user_info += f"** User's Recommended Actions (upcoming microactions, recommended by growth guide) **\n\n{self.cm.user.recommended_micro_actions}\n\nLet the user know that these microactions from their growth guide will be integrated into their day-to-day interaction." | |
| elif category == "micro_actions": | |
| user_info += f"** User's Micro Actions (already introduced microactions) **\n\n{self.cm.user.micro_actions}" | |
| elif category == "other_focusses": | |
| user_info += f"** User's Other Focusses (other areas of focus) **\n\n{self.cm.user.other_focusses}\n\nLet the user know that other areas of focus from their growth guide will be integrated into their day-to-day interaction." | |
| elif category == "reminders": | |
| user_info += f"** User's Reminders **\n\n{self.cm.user.reminders}" | |
| elif category == "goal": | |
| user_info += f"** User's Goal (prioritise the latest [last item in the array] goal). Do not mention the status of their goal. **\n\n{self.cm.user.goal}" | |
| elif category == "growth_guide_session": | |
| growth_guide = get_growth_guide(self.cm.user.user_id) | |
| user_info += f"** Users' Growth Guide (always refer to the Growth Guide as 'Growth Guide <Name>') **\n{growth_guide}" | |
| logger.info(f"User's growth guide: {growth_guide}", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_user_info"}) | |
| booked_sessions = get_booked_gg_sessions(self.cm.user.user_id) | |
| # for each booking, if the booking has completed, fetch the zoom_ai_summary and gg_report from | |
| for booking in booked_sessions: | |
| if booking['status'] == "completed": | |
| summary_data = get_growth_guide_summary(self.cm.user.user_id, booking['booking_id']) | |
| logger.info(f"Summary data for booking: {booking['booking_id']} - {summary_data}", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_user_info"}) | |
| if summary_data: | |
| booking['zoom_ai_summary'] = summary_data['zoom_ai_summary'] | |
| booking['gg_report'] = summary_data['gg_report'] | |
| else: | |
| booking['zoom_ai_summary'] = "Growth Guide has not uploaded the report yet" | |
| booking['gg_report'] = "Growth Guide has not uploaded the report yet" | |
| logger.info(f"User's booked sessions: {booked_sessions}", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_user_info"}) | |
| if len(booked_sessions): | |
| # booked session is an array of jsons | |
| # convers it to have i) json where i = 1...N where N is the len of booked_sessions | |
| # join the entire array into 1 string with each item seperated by a newline | |
| formatted_sessions = "\n".join([f"** Session {i+1} **\n{json.dumps(session, indent=4)}" for i, session in enumerate(booked_sessions)]) | |
| logger.info(f"Formatted booked sessions: {formatted_sessions}", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_user_info"}) | |
| user_info += f"\n** GG Session Bookings & Summaries (most recent first, time is in users' local timezone but no need to mention this) **\n{formatted_sessions}" | |
| else: | |
| user_info += f"\n** GG Session Summaries **\nNo GG yet. Let the user know they can book one now through their Revelation Dashboard: {OURCOACH_DASHBOARD_URL}! (When including links, DO **NOT** use a hyperlink format. Just show the link as plain text. Example: \"Revelation Dashboard: https://...\")" | |
| user_info += f"\n** Suggested Growth Guide Topics **\n{self.cm.user.recommended_gg_topics}\nOnly suggest 1-2 topics and let the user know they can can find more suggestions on their dashboard" | |
| elif category == "life_score": | |
| user_info += f"** User's Life scores for each area **\n\n Personal Growth: {self.cm.user.personal_growth_score} || Career: {self.cm.user.career_growth_score} || Health/Wellness: {self.cm.user.health_and_wellness_score} || Relationships: {self.cm.user.relationship_score} || Mental Health: {self.cm.user.mental_well_being_score}" | |
| elif category == "recent_wins": | |
| user_info += f"** User's Recent Wins / Achievements **\n\n {self.cm.user.recent_wins}" | |
| elif category == "subscription_info": | |
| subscription_history = get_user_subscriptions(self.cm.user.user_id) | |
| logger.info(f"User's subscription history: {subscription_history}", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_user_info"}) | |
| user_info += f"** User's Subscription Information **\n\n This is a sorted list (most recent first, which also represent the current subscription status) of the users subscription history:\n{subscription_history}\nNote that the stripe_status is one of 'trialing' = (user is on a free trial), 'cancelled' = (user has cancelled their subscription), 'active' = (user is a Premium user). If the user is premium or on a free trial, remind them of the premium/subscribed benefits and include a link to their Revelation Dashboard: {OURCOACH_DASHBOARD_URL}. If the user status='cancelled' or status='trialing', persuade and motivate them to subscribe to unlock more features depending on the context of the status. Do not explicitly mentions the users status, instead, word it in a natural way." | |
| logger.info(f"Finish Getting user information: {user_info}", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_get_user_info"}) | |
| tool_outputs.append({ | |
| "tool_call_id": tool.id, | |
| "output": f"** User Info:\n\n{user_info} **" | |
| }) | |
| elif tool.function.name == "extend_two_weeks": | |
| logger.info(f"Changing plan from 1 week to 2 weeks...", extra={"user_id": self.cm.user.user_id, "endpoint": "extend_two_weeks"}) | |
| goal = self.cm.user.extend_growth_plan() | |
| tool_outputs.append({ | |
| "tool_call_id": tool.id, | |
| "output": f"Changed plan from 1 week to 2 weeks." | |
| }) | |
| # Submit all tool outputs at once after collecting them in a list | |
| if tool_outputs: | |
| run = self.cm.client.beta.threads.runs.submit_tool_outputs_and_poll( | |
| thread_id=thread.id, | |
| run_id=run.id, | |
| tool_outputs=tool_outputs | |
| ) | |
| logger.info("Tool outputs submitted successfully", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_submit_tools"}) | |
| return run, just_finish_intro | |
| else: | |
| logger.warning("No tool outputs to submit", | |
| extra={"user_id": self.cm.user.user_id, "endpoint": "assistant_submit_tools"}) | |
| run = PseudoRun(status="completed", metadata={"message": "No tool outputs to submit"}) | |
| return run, just_finish_intro | |
| except Exception as e: | |
| # Cancel the run | |
| logger.error(f"Yeap Error in call_tool: {e}", extra={"user_id": self.cm.user.user_id, "endpoint": 'assistant_call_tool'}) | |
| logger.error(f"Cancelling run {run.id} for thread {thread.id}", extra={"user_id": self.cm.user.user_id, "endpoint": 'cancel_erroneous_run'}) | |
| self.cancel_run(run, thread) | |
| logger.error(f"Run {run.id} cancelled for thread {thread.id}", extra={"user_id": self.cm.user.user_id, "endpoint": 'cancel_erroneous_run'}) | |
| raise e | |
| class PseudoRun: | |
| def __init__(self, status, id='pseudo_run', metadata=None): | |
| self.id = id | |
| self.status = status | |
| self.metadata = metadata or {} | |
| class GeneralAssistant(Assistant): | |
| def __init__(self, id, cm): | |
| super().__init__(id, cm) | |
| class PFAssistant(Assistant): | |
| def __init__(self, id, cm): | |
| super().__init__(id, cm) |