Spaces:
Sleeping
Sleeping
| import json | |
| import io | |
| import os | |
| import openai | |
| import pandas as pd | |
| from datetime import datetime, timezone | |
| import json | |
| from app.assistants import Assistant | |
| from app.exceptions import DBError | |
| import glob | |
| import pickle # Replace dill with pickle | |
| import random | |
| import logging | |
| import psycopg2 | |
| from psycopg2 import sql | |
| from app.conversation_manager import ConversationManager | |
| from app.exceptions import BaseOurcoachException, OpenAIRequestError, UserError | |
| from app.flows import FINAL_SUMMARY_STATE, FINAL_SUMMARY_STATE, MICRO_ACTION_STATE, MOTIVATION_INSPIRATION_STATE, OPEN_DISCUSSION_STATE, POST_GG_STATE, PROGRESS_REFLECTION_STATE, PROGRESS_SUMMARY_STATE, EDUCATION_STATE, FOLLUP_ACTION_STATE, FUNFACT_STATE | |
| from pydantic import BaseModel | |
| from datetime import datetime | |
| from app.utils import generate_uuid, get_booked_gg_sessions, get_growth_guide, get_growth_guide_summary, get_user_subscriptions, update_growth_guide_summary | |
| import dotenv | |
| import re | |
| import math | |
| dotenv.load_dotenv() | |
| OURCOACH_DASHBOARD_URL = os.getenv("OURCOACH_DASHBOARD_URL") | |
class UserDataItem(BaseModel):
    """A single tracked user entry (for example a goal) plus its bookkeeping fields."""

    # Author of the entry; goals created in this module use "assistant".
    role: str
    # Text of the entry itself.
    content: str
    user_id: str
    # Lifecycle marker; this module writes "ONGOING", "COMPLETED" and "IDLE".
    status: str
    # Timestamps are stored as pre-formatted strings, not datetime objects.
    created_at: str
    updated_at: str
    # Life area the entry belongs to (e.g. the goal area).
    area: str
class UserDataResponse(BaseModel):
    """Envelope model holding a list of ``UserDataItem`` entries."""

    data: list[UserDataItem]
class Index(BaseModel):
    """Structured-output model carrying a single integer position."""

    # Used as the parsed response of the reminder-matching request, where -1 means "no match".
    value: int
| logger = logging.getLogger(__name__) | |
def get_current_datetime():
    """Return the current moment as a timezone-aware UTC ``datetime``."""
    now_utc = datetime.now(tz=timezone.utc)
    return now_utc
| class User: | |
def catch_error(func):
    """Decorate a ``User`` method so unexpected failures surface as ``UserError``.

    Behaviour of the returned wrapper:
      * ``BaseOurcoachException`` and ``openai.BadRequestError`` propagate unchanged.
      * Any other exception is logged and re-raised as ``UserError`` carrying
        ``self.user_id``; the original exception is attached as ``__cause__``.

    NOTE(review): the original had a second ``except openai.BadRequestError``
    clause converting to ``OpenAIRequestError``. It could never run because the
    first clause already catches that type, so it is dropped here and the
    observable behaviour (re-raise unchanged) is kept. Confirm which behaviour
    is actually intended.

    Args:
        func: The method to wrap; it is called as ``func(self, *args, **kwargs)``.

    Returns:
        The wrapped callable, preserving ``func``'s name and docstring.
    """
    from functools import wraps  # local import keeps this block self-contained

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except (BaseOurcoachException, openai.BadRequestError):
            # Bare raise keeps the original traceback intact.
            raise
        except Exception as e:
            logger.error(f"An unexpected error occurred in User: {e}")
            raise UserError(user_id=self.user_id, message="Unexpected error in User", e=str(e)) from e

    return wrapper
def __init__(self, user_id, user_info, client, asst_id):
    """Create a user with empty coaching state and the default 7-day growth plan.

    Args:
        user_id: Identifier of the user.
        user_info: Profile information; also used as the interaction guideline.
        client: OpenAI client used for completions.
        asst_id: Assistant id handed to the ``ConversationManager``.
    """
    # Identity and collaborators.
    self.user_id = user_id
    self.client = client
    self.asst_id = asst_id
    self.user_info = user_info

    # Coaching state.
    self.done_first_reflection = None
    self.goal = []
    self.last_gg_session = None
    self.micro_actions = []
    self.recommended_micro_actions = []
    self.challenges = []
    self.other_focusses = []

    # Life scores.
    self.personal_growth_score = 0
    self.career_growth_score = 0
    self.relationship_score = 0
    self.mental_well_being_score = 0
    self.health_and_wellness_score = 0

    self.reminders = None
    self.recent_wins = []
    self.recommended_gg_topics = []
    self.mantra = None

    # Hard-coded plan (marked "TESTING PURPOSE" in the original): one theme per day.
    daily_themes = (
        "MICRO_ACTION_STATE",
        "FOLLUP_ACTION_STATE",
        "OPEN_DISCUSSION_STATE",
        "MICRO_ACTION_STATE",
        "FOLLUP_ACTION_STATE",
        "FUNFACT_STATE",
        "FINAL_SUMMARY_STATE",
    )
    plan_days = [
        {"day": day_number, "coachingTheme": theme}
        for day_number, theme in enumerate(daily_themes, start=1)
    ]
    self.growth_plan = CircularQueue(array=plan_days, user_id=self.user_id)
    logger.info(f"User Growth Plan: {self.growth_plan} (Day: {self.growth_plan.current()['day']}/{len(self.growth_plan.array)})", extra={"user_id": self.user_id, "endpoint": "user_init"})

    self.user_interaction_guidelines = self.generate_user_interaction_guidelines(user_info, client)
    self.conversations = ConversationManager(client, self, asst_id)
    self.score_history = []
    self.cumulative_plan_day = 0
def extend_growth_plan(self):
    """Replace the current growth plan with the 14-day plan.

    The new plan repeats the six-day theme cycle twice, with a progress
    reflection on day 7 and the final summary on day 14. The plan position
    starts over because a new ``CircularQueue`` is created.

    Returns:
        ``True`` once the plan has been replaced.
    """
    log_ctx = {"user_id": self.user_id, "endpoint": "extend_growth_plan"}
    logger.info("Changing plan to 14d...", extra=log_ctx)

    six_day_cycle = (
        "MICRO_ACTION_STATE",
        "FOLLUP_ACTION_STATE",
        "OPEN_DISCUSSION_STATE",
        "MICRO_ACTION_STATE",
        "FOLLUP_ACTION_STATE",
        "FUNFACT_STATE",
    )
    themes = (*six_day_cycle, "PROGRESS_REFLECTION_STATE", *six_day_cycle, "FINAL_SUMMARY_STATE")
    new_growth_plan = {
        "growthPlan": [
            {"day": day_number, "coachingTheme": theme}
            for day_number, theme in enumerate(themes, start=1)
        ]
    }

    self.growth_plan = CircularQueue(array=new_growth_plan['growthPlan'], user_id=self.user_id)
    # Fix: this line was tagged with the copy-pasted endpoint "user_init".
    logger.info(f"User Growth Plan: {self.growth_plan} (Day: {self.growth_plan.current()['day']}/{len(self.growth_plan.array)})", extra=log_ctx)
    logger.info("Success.", extra=log_ctx)
    return True
def reset_cumulative_plan_day(self):
    """Set the cumulative plan-day counter back to zero (and log that it happened)."""
    log_ctx = {"user_id": self.user_id, "endpoint": "reset_cumulative_plan_day"}
    logger.info("Reseting cumulative_plan_day", extra=log_ctx)
    self.cumulative_plan_day = 0
def add_recent_wins(self, wins, context = None):
    """Generate a short achievement message and put it at the front of ``recent_wins``.

    The message is produced by a chat-completion request constrained to a JSON
    schema with a single ``achievement_message`` string. ``recent_wins`` acts
    as a newest-first list: once it already holds five or more entries the
    oldest one is dropped before the new one is inserted.

    Args:
        wins: Description of the achievement.
        context: Optional extra context interpolated into the prompt.
    """
    prompt = f"""
## Role
You are an expert in writing achievement message and progress notification. Your task is to use the user's achievement and context to formulate a short achievement message/progress notification. The output must be a one sentence short message (less than 15 words) in this JSON output schema:
```json
{{
achievement_message: str
}}
```
Note: No need to mention the user's name. Make it concise (less than 15 words)
## Example
User's achievement: Completing a Task
Achievement context: The user has completed a 10k run
Output:
```
{{
achievement_message: You have completed a 10k run!
}}
```
## User Input
User's achievement: {wins}
Achievement context: {context}
"""
    message_schema = {
        "type": "object",
        "properties": {
            "achievement_message": {
                "type": "string",
                "description": "A message indicating an achievement."
            }
        },
        "required": ["achievement_message"],
        "additionalProperties": False
    }
    output_format = {
        "type": "json_schema",
        "json_schema": {
            "name": "achievement_message_schema",
            "strict": True,
            "schema": message_schema
        }
    }
    completion = self.client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        response_format=output_format,
        temperature=1
    )
    payload = json.loads(completion.choices[0].message.content)
    achievement_message = payload['achievement_message']

    # Make room first when the list is already full, then insert newest-first.
    if not len(self.recent_wins) < 5:
        self.recent_wins.pop()
    self.recent_wins.insert(0, achievement_message)
def add_life_score_point(self, variable, points_added, notes):
    """Add points to one life-area score and record the change in ``score_history``.

    Args:
        variable: Name of the life area (e.g. ``'Career Growth'``). An
            unrecognised name changes no score but is still recorded.
        points_added: Number of points to add.
        notes: Reason for the change; used in the log line and history entry.
    """
    # (area name, score attribute, label used in the log message)
    score_table = (
        ('Personal Growth', 'personal_growth_score', 'Personal Growth'),
        ('Career Growth', 'career_growth_score', 'Career Growth'),
        ('Health and Wellness', 'health_and_wellness_score', 'Health and Wellness'),
        ('Mental Well-Being', 'mental_well_being_score', 'Mental Well Being'),
        ('Relationship', 'relationship_score', 'Relationship'),
    )
    for area, attribute, label in score_table:
        if variable == area:
            setattr(self, attribute, getattr(self, attribute) + points_added)
            logger.info(f"Added {points_added} points to {label} for {notes}", extra={"user_id": self.user_id, "endpoint": "add_life_score_point"})
            break

    # The history is appended to regardless of whether the area was recognised.
    self.score_history.append({
        "area": variable,
        "points_added": points_added,
        "notes": notes,
        "created_at": pd.Timestamp.now()
    })
def get_current_goal(self, full=False):
    """Return the user's current goal.

    The most recently added goal with status ``"ONGOING"`` wins; when none is
    ongoing, the most recently added goal of any status is used.

    Args:
        full: When truthy return the goal object, otherwise only its ``content``.

    Returns:
        The goal (or its content), or ``None`` when the user has no goals.
    """
    goals = self.goal
    selected = next((g for g in reversed(goals) if g.status == "ONGOING"), None)
    if selected is None and goals:
        # No ongoing goal: fall back to the latest one.
        selected = goals[-1]
    if selected is None:
        return None
    return selected if full else selected.content
def update_goal(self, goal, status, content=None):
    """Change the status (and optionally the text) of a goal.

    Args:
        goal: Content of the goal to update. ``None`` means "complete the
            current goal"; in that case ``status`` is not used.
        status: New status for the matched goal.
        content: Optional replacement text for the matched goal.

    Returns:
        The completed goal's content when ``goal`` is ``None`` and a current
        goal exists; otherwise ``True`` if a goal with matching content was
        updated and ``False`` if none matched.
    """
    stamp_format = "%d-%m-%Y %a %H:%M:%S"

    if goal is None:
        active = self.get_current_goal(full=True)
        if active:
            active.status = "COMPLETED"
            active.updated_at = pd.Timestamp.now(tz='UTC').strftime(stamp_format)
            return active.content

    for entry in self.goal:
        if not entry.content == goal:
            continue
        entry.status = status
        entry.updated_at = pd.Timestamp.now(tz='UTC').strftime(stamp_format)
        if content:
            entry.content = content
        return True
    return False
def set_mantra(self):
    """Load the user's mantra from the database and store it on ``self.mantra``.

    Reads the ``mantra`` column of ``public.user_growth_status`` for this
    user. When no row exists a warning is logged and ``self.mantra`` is left
    unchanged.

    Raises:
        DBError: If psycopg2 reports an error while connecting or querying.
    """
    ### To save mantra in database to user object
    logger.info(f"Getting mantra from user...", extra={"user_id": self.user_id, "endpoint": "get_mantra"})
    user_id = self.user_id
    # SECURITY(review): credentials are hard-coded in source. They are left
    # untouched here to keep behaviour identical, but should be moved to
    # configuration/secret storage and rotated.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'hvcTL3kN3pOG5KteT17T',
        'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    try:
        conn = psycopg2.connect(**db_params)
        try:
            # `with conn` only scopes the transaction (commit/rollback); it does
            # not close the connection, so that is done explicitly in `finally`.
            with conn, conn.cursor() as cursor:
                query = sql.SQL("SELECT mantra FROM {table} WHERE user_id = %s").format(table=sql.Identifier('public', 'user_growth_status'))
                cursor.execute(query, (user_id,))
                row = cursor.fetchone()
        finally:
            conn.close()
    except psycopg2.Error as e:
        logger.error(f"Database error while retrieving user info for {user_id}: {e}", extra={'user_id': user_id, 'endpoint': "get_mantra"})
        raise DBError(user_id=user_id, message="Error retrieving user info", code="SQLError", e=str(e)) from e

    if row:
        ### SAVE MANTRA IN USER OBJECT
        # The query selects a single column, so the mantra is the first field.
        self.mantra = row[0]
    else:
        logger.warning(f"No user info found for {user_id}", extra={'user_id': user_id, 'endpoint': "get_mantra"})
def set_goal(self, goal, goal_area, add=True, completed=False):
    """Create a new goal or rewrite the current one, awarding points and wins.

    Args:
        goal: Text of the new (or rewritten) goal.
        goal_area: Life area used for score points and stored on the goal.
        add: When true a new ONGOING goal is appended; when false the current
            goal's text is replaced and its status set to ONGOING. Ignored if
            the user has no current goal (a first goal is always created).
        completed: When true the current goal is first marked COMPLETED
            (30 points plus a recent win).

    Fix: previously, with ``completed=True`` and ``add=True`` the goal that
    had just been marked COMPLETED was immediately overwritten with IDLE.
    The IDLE transition is now skipped for a goal completed in this call.
    """
    current_goal = self.get_current_goal()

    if completed:
        self.update_goal(current_goal, "COMPLETED")
        self.add_life_score_point(variable=goal_area, points_added=30, notes="Completing a Goal")
        self.add_recent_wins(wins="You have completed your goal!", context=current_goal)

    if current_goal is not None and not add:
        # Rewrite the existing goal in place instead of adding a new one.
        self.update_goal(current_goal, "ONGOING", content=goal)
        return

    if current_goal is None:
        win_message = "You have set your first goal!"
    else:
        if current_goal and not completed:
            # Park the previous goal; a goal completed above keeps COMPLETED.
            self.update_goal(current_goal, "IDLE")
        win_message = "You have set a new goal!"

    # One timestamp for both fields so created_at and updated_at always agree.
    now = pd.Timestamp.now(tz='UTC').strftime("%d-%m-%Y %a %H:%M:%S")
    new_goal = UserDataItem(role="assistant", content=goal, area=goal_area, user_id=self.user_id, status="ONGOING", created_at=now, updated_at=now)
    self.goal.append(new_goal)
    self.add_life_score_point(variable=goal_area, points_added=10, notes="Setting a Goal")
    self.add_recent_wins(wins=win_message, context=new_goal.content)
def update_recommended_micro_action_status(self, micro_action, status):
    """Set the status of the first recommended micro-action whose content matches.

    Args:
        micro_action: Content text identifying the micro-action.
        status: New status value.

    Returns:
        ``True`` if a matching micro-action was updated, else ``False``.
    """
    target = next(
        (item for item in self.recommended_micro_actions if item.content == micro_action),
        None,
    )
    if target is None:
        return False
    target.status = status
    target.updated_at = pd.Timestamp.now(tz='UTC').strftime("%d-%m-%Y %a %H:%M:%S")
    return True
def add_ai_message(self, text):
    """Record ``text`` as an AI message in the conversation and return it unchanged."""
    manager = self.conversations
    manager._add_ai_message(text)
    return text
def reset_conversations(self):
    """Start over: new conversation manager, rewound growth plan, cleared coaching state.

    Only the attributes listed below are reset; anything else on the user
    (for example ``mantra`` or ``score_history``) is left as it is.
    """
    self.conversations = ConversationManager(self.client, self, self.asst_id)
    self.growth_plan.reset()

    self.done_first_reflection = None
    self.reminders = None

    # Each list attribute gets its own fresh list object.
    for list_attr in (
        "goal",
        "micro_actions",
        "recommended_micro_actions",
        "challenges",
        "other_focusses",
        "recent_wins",
        "recommended_gg_topics",
    ):
        setattr(self, list_attr, [])

    for score_attr in (
        "personal_growth_score",
        "career_growth_score",
        "relationship_score",
        "mental_well_being_score",
        "health_and_wellness_score",
    ):
        setattr(self, score_attr, 0)
def get_last_user_message(self, role = None):
    """Return the most recent message in the current thread.

    Args:
        role: When given, only messages with this role are considered and just
            the content is returned.

    Returns:
        With ``role``: the content of the latest message by that role, or
        ``None`` if there is none. Without ``role``: a ``(role, content)``
        tuple for the latest message, or ``None`` for an empty history.
    """
    history = self.conversations._get_current_thread_history(remove_system_message=False)
    if role:
        return next(
            (entry['content'] for entry in reversed(history) if entry['role'] == role),
            None,
        )
    if history:
        latest = history[-1]
        return latest['role'], latest['content']
    return None
def generate_user_interaction_guidelines(self, user_info, client):
    """Return the interaction guideline for this user.

    Currently the guideline is simply ``user_info`` passed through unchanged;
    ``client`` is accepted but not used.

    NOTE: an earlier, disabled variant asked a chat model ("gpt-4o-mini",
    temperature 0.2) to write a 6-point summary of the preferred coaching
    style and tone (legend persona, age, MBTI, love language, prior coaching
    experience, belief in astrology) and appended it to ``user_info`` under an
    "INTERACTION GUIDELINE" heading.
    """
    logger.info(f"Generating user interaction guidelines for user: {self.user_id}", extra={"user_id": self.user_id, "endpoint": "generate_user_interaction_guidelines"})
    return user_info
def get_recent_run(self):
    """Return the ``recent_run`` of the conversation manager's 'general' assistant."""
    general_assistant = self.conversations.assistants['general']
    return general_assistant.recent_run
def cancel_run(self, run, thread=None):
    """Log the request and ask the conversation manager to cancel ``run``.

    Args:
        run: The run to cancel.
        thread: Optional thread, forwarded as-is to the conversation manager.
    """
    log_ctx = {"user_id": self.user_id, "endpoint": "cancel_run"}
    logger.info(f"(user) Cancelling run: {run}", extra=log_ctx)
    self.conversations.cancel_run(run, thread)
def update_conversation_state(self, stage, last_interaction):
    """Record the conversation stage and the time of the last interaction.

    Fix: the visible ``__init__`` never creates ``conversation_state``, so
    this method raised ``AttributeError`` on a fresh user. The dict is now
    created on first use; an existing dict is updated in place and any other
    keys it holds are preserved.

    Args:
        stage: Value stored under ``'stage'``.
        last_interaction: Value stored under ``'last_interaction'``.
    """
    state = getattr(self, 'conversation_state', None)
    if state is None:
        state = {}
        self.conversation_state = state
    state['stage'] = stage
    state['last_interaction'] = last_interaction
| def _get_current_thread(self): | |
| return self.conversations.current_thread | |
def send_message(self, text, media=None):
    """Send a user message through the current thread and post-process the reply.

    The run's ``metadata['message']`` can trigger a follow-up:
      * ``"start_now"``: run today's growth-plan theme immediately, store its
        prompt (prefixed ``[hidden]``) and reply as AI messages, advance the
        plan and flag the response with ``add_one``.
      * ``"change_goal"``: send a goal-change prompt, store the reply as an AI
        message and reset the growth plan.

    The response's ``'reminders'`` key is set to the reminders dated on the
    conversation's current date, or ``None`` when there are none.

    Args:
        text: The user's message.
        media: Optional media forwarded with the message.

    Returns:
        The response dict, including the ``'reminders'`` key.
    """
    log_ctx = {"user_id": self.user_id, "endpoint": "user_send_message"}

    if media:
        logger.info("Sending message with media", extra={"user_id": self.user_id, "endpoint": "send_message"})
    response, run = self.conversations._run_current_thread(text, media=media)
    message = run.metadata.get("message", "No message")
    logger.info(f"Message: {message}", extra=log_ctx)

    if message == "start_now":
        # must do current plan now
        action = self.growth_plan.current()
        logger.info(f"Current Action: {action}", extra=log_ctx)
        response, prompt = self.do_theme(action['coachingTheme'], self.conversations.state['date'], action['day'])
        # Store both the hidden prompt and the visible reply in the thread.
        self.add_ai_message("[hidden]" + prompt)
        self.add_ai_message(response['content'])
        # Move to the next action
        self.growth_plan.next()
        response['add_one'] = True
    elif message == "change_goal":
        logger.info("Sending change goal message...", extra=log_ctx)
        prompt = f"""
I want to change my goal! Based on my information below, suggest me a new goal, and **ONLY** if i approve, call the create_smart_goal() function
Previous Goal:
{self.get_current_goal()}
Profile:
{self.user_info}
"""
        response = self.conversations._send_morning_message(prompt)
        logger.info(f"Change Goal Response: {response}", extra=log_ctx)
        self.add_ai_message(response['content'])
        # A new goal starts the plan from the beginning.
        self.growth_plan.reset()

    logger.info(f"Current reminders: {self.reminders}", extra=log_ctx)
    has_reminders = self.reminders is not None and len(self.reminders)
    if has_reminders:
        # Only reminders dated today (the ones the backend has to queue today).
        date = pd.to_datetime(self.conversations.state['date']).date()
        response['reminders'] = self.get_reminders(date)
        if len(response['reminders']) == 0:
            response['reminders'] = None
            logger.info(f"No reminders for today {date}", extra=log_ctx)
        logger.info(f"Returning reminders: {response['reminders']}", extra=log_ctx)
    else:
        response['reminders'] = None

    logger.info(f"Response: {response}", extra=log_ctx)
    return response
def get_reminders(self, date=None):
    """Return the user's reminders, optionally only those falling on ``date``.

    Fix: a string in ``"%d-%m-%Y %a %H:%M:%S"`` format was parsed but the
    result was thrown away, so ``date`` stayed a ``str`` and never matched
    any reminder. Both supported string formats are now converted to a date.

    Args:
        date: ``None``/falsy for all reminders; otherwise a ``date``, a
            ``datetime``, or a string in ``"%d-%m-%Y %a %H:%M:%S"`` or
            ``"%Y-%m-%d"`` format.

    Returns:
        ``[]`` when the user has no reminder list, the full reminder list when
        no date is given, else the reminders whose ``'timestamp'`` is on that date.

    Raises:
        ValueError: If ``date`` is a string in neither supported format.
    """
    if self.reminders is None:
        return []
    if not date:
        return self.reminders

    if isinstance(date, str):
        try:
            date = datetime.strptime(date, "%d-%m-%Y %a %H:%M:%S").date()
        except ValueError:
            date = datetime.strptime(date, "%Y-%m-%d").date()
    elif isinstance(date, datetime):
        date = date.date()

    return [reminder for reminder in self.reminders if reminder['timestamp'].date() == date]
def find_same_reminder(self, reminder_text):
    """Ask the model which existing reminder means the same as ``reminder_text``.

    Fixes:
      * With no stored reminders there is nothing to match, so ``-1`` is
        returned without calling the model.
      * The model's answer is validated: anything that is not a valid
        position in ``self.reminders`` is reported as ``-1`` so callers
        never index the list with an out-of-range or negative value.

    Args:
        reminder_text: Text of the reminder to look for.

    Returns:
        Index into ``self.reminders`` of the matching reminder, or ``-1``.
    """
    log_ctx = {"user_id": self.user_id, "endpoint": "find_same_reminder"}
    logger.info(f"Finding similar reminders: {self.reminders} to: {reminder_text}", extra=log_ctx)

    if not self.reminders:
        logger.info("Similar reminder idx: reminders[-1]", extra=log_ctx)
        return -1

    response = self.client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are an expert at understanding the context of texts"},
            {"role": "user", "content": f"Identify the reminder in {self.reminders} that is reminding the same thing as the following text (ignoring the date and time): {reminder_text}.\n\nIf none, return -1 otherwise return the index of the matching reminder."}
        ],
        response_format=Index,
        temperature=0.2
    )
    parsed = response.choices[0].message.parsed
    logger.info(f"Similar reminder response: {parsed}", extra=log_ctx)

    # `parsed` may be None (e.g. a refusal); getattr then yields -1.
    index = getattr(parsed, 'value', -1)
    if not isinstance(index, int) or not 0 <= index < len(self.reminders):
        if index != -1:
            logger.warning(f"Model returned an invalid reminder index: {index}", extra=log_ctx)
        index = -1

    logger.info(f"Similar reminder idx: reminders[{index}]", extra=log_ctx)
    return index
def set_reminder(self, reminder):
    """Add, update or mark-for-deletion a reminder, storing its time in UTC.

    The user's timezone is read from ``public.users``. ``reminder`` is
    mutated in place: ``timestamp_local`` and ``local_timezone`` are added,
    ``timestamp`` is converted to UTC and an ``id`` is assigned.

    Behaviour by ``reminder['action']``:
      * ``'update'``: replace the matching reminder (keeping its id), or
        append when none matches.
      * ``'delete'``: flag the matching reminder with ``action='delete'``.
      * anything else: append.

    Fixes:
      * The connection is now closed (psycopg2's ``with connect()`` only
        scopes the transaction).
      * A missing user row raises ``DBError`` instead of an
        ``UnboundLocalError`` on ``user_timezone``.
      * The index from ``find_same_reminder`` is range-checked before use.

    Args:
        reminder: Dict with at least ``'timestamp'`` (a timezone-naive pandas
            timestamp in the user's local time), ``'reminder'`` and ``'action'``.

    Raises:
        DBError: If the user has no row in ``public.users``.
    """
    log_ctx = {"user_id": self.user_id, "endpoint": "set_reminder"}
    # SECURITY(review): credentials are hard-coded in source. They are left
    # untouched here to keep behaviour identical, but should be moved to
    # configuration/secret storage and rotated.
    db_params = {
        'dbname': 'ourcoach',
        'user': 'ourcoach',
        'password': 'hvcTL3kN3pOG5KteT17T',
        'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com',
        'port': '5432'
    }
    conn = psycopg2.connect(**db_params)
    try:
        with conn, conn.cursor() as cursor:
            query = sql.SQL("SELECT * FROM {table} WHERE id = %s").format(table=sql.Identifier('public', 'users'))
            cursor.execute(query, (self.user_id,))
            row = cursor.fetchone()
            colnames = [desc[0] for desc in cursor.description] if row else []
    finally:
        conn.close()

    if not row:
        # NOTE(review): reuses the only DBError code visible in this file; adjust if a better one exists.
        raise DBError(user_id=self.user_id, message="Error retrieving user info", code="SQLError", e=f"No user row found for {self.user_id}")
    user_timezone = dict(zip(colnames, row))['timezone']

    # Convert to UTC
    reminder['timestamp_local'] = reminder['timestamp']
    reminder['local_timezone'] = user_timezone
    reminder['timestamp'] = reminder['timestamp'].tz_localize(user_timezone).tz_convert("UTC")
    # generate uuid for this reminder
    reminder['id'] = generate_uuid()
    action = reminder['action']

    if self.reminders is None:
        self.reminders = []

    def _find_existing():
        """Return a valid index of the matching reminder, or -1."""
        index = self.find_same_reminder(reminder['reminder'])
        if isinstance(index, int) and 0 <= index < len(self.reminders):
            return index
        return -1

    if action == 'update':
        index = _find_existing()
        if index == -1:
            self.reminders.append(reminder)
            logger.info(f"Reminder added: {reminder}", extra=log_ctx)
        else:
            old_reminder = self.reminders[index]
            # An updated reminder keeps the id of the one it replaces.
            reminder['id'] = old_reminder['id']
            self.reminders[index] = reminder
            logger.info(f"Reminder {old_reminder} -- updated to --> {reminder}", extra=log_ctx)
    elif action == 'delete':
        logger.info('Deleting reminder', extra=log_ctx)
        index = _find_existing()
        if index == -1:
            logger.info(f"Could not find a mathcing reminder to delete: {reminder}", extra=log_ctx)
        else:
            old_reminder = self.reminders[index]
            # Deletion is only flagged here; the entry itself stays in the list.
            self.reminders[index]['action'] = 'delete'
            logger.info(f"Reminder {old_reminder} has been marked for deletion", extra=log_ctx)
    else:
        # action is 'set'
        self.reminders.append(reminder)
        logger.info(f"Reminder added: {reminder}", extra=log_ctx)

    logger.info(f"Reminders: {self.reminders}", extra=log_ctx)
def get_messages(self, exclude_system_msg=True, show_hidden=False):
    """Return the current thread's messages, filtering out internal ones.

    Args:
        exclude_system_msg: When falsy the raw history is returned unfiltered.
        show_hidden: When truthy, messages starting with ``[hidden]`` are kept.

    Returns:
        The raw history, or a new list without messages whose content starts
        with one of the internal prefixes.
    """
    if not exclude_system_msg:
        return self.conversations._get_current_thread_history(False)

    internal_prefixes = (
        "** It is a new day:",
        "Pay attention to the current state you are in",
        "Date changed to",
    )
    if not show_hidden:
        internal_prefixes += ("[hidden]",)

    history = self.conversations._get_current_thread_history(exclude_system_msg)
    return [msg for msg in history if not msg['content'].startswith(internal_prefixes)]
def set_recommened_gg_topics(self, topics):
    """Replace the stored recommended Growth Guide topics with ``topics``."""
    setattr(self, "recommended_gg_topics", topics)
def set_intro_done(self):
    """Flag the conversation manager's intro as done."""
    manager = self.conversations
    manager.intro_done = True
def get_alerts(self, date, day=None):
    """Build the scheduled upsell/alert message for a given plan day, if any.

    Alerts exist for these plan days:
      * 2 and 5: Growth Guide upsell.
      * 8: feedback-form request.
      * 12: trial-ending notice, only when at most 2 days of subscription remain.
      * 14: subscription-ending notice, only when no days remain.

    Fix: the send time used to be formatted as a day-first string
    (``"%d-%m-%Y ..."``) and re-parsed with ``pd.to_datetime``, which reads
    ambiguous dates month-first; day and month were swapped whenever the day
    of the month was 12 or less. The timestamp is now kept as a
    ``pd.Timestamp`` and converted to ISO format directly.

    Args:
        date: Current date; used to compute the remaining subscription days.
        day: Plan day to evaluate; defaults to ``self.cumulative_plan_day``.

    Returns:
        A one-element list with the response dict (including an ISO
        ``'timestamp'`` of 19:00 today), or ``[]`` when no alert applies.
    """
    log_ctx = {"user_id": self.user_id, "endpoint": "get_alerts"}
    logger.info(f"Getting alerts for user: {self.user_id} on {date} for {day if day else self.cumulative_plan_day}", extra=log_ctx)
    if day is None:
        day = self.cumulative_plan_day

    if day == 2 or day == 5:
        # upsell the GG
        growth_guide = get_growth_guide(self.user_id)
        if day == 2:
            upsell_prompt = "OMG the user is interested in finding out who their Growth Guide is. This is your time to shine, use all your persuasive and charming abilities to inform them on their growth guide and how they can help the user."
        else:
            upsell_prompt = "Now, it is your time to shine. Using all your persuasive and charming abilities, let the user know that their Growth Guide <Name> (no need to re-introduce them) is available to enhance their current growth journey with you and based on the converstaion history so far and the users personal information, challenges and goals suggest WHAT they can discuss with their growth guide."
        prompt = f"""You are an expert ambassador/salesman of Growth Guide sessions.
The users' growth guide is {growth_guide} and they can book a session with them via their Revelation Dashboard: {OURCOACH_DASHBOARD_URL}.
Respond with a enthusiatic hello!
{upsell_prompt}
Frame your response like a you are telling the user a fun fact, but dont explicitly mention "fun fact". Keep this message succint.
"""
    elif day == 8:
        # Ask for feedback; a few respondents win a free annual subscription.
        prompt = f"""You are an expert ambassador/salesman of ourcoach whose objective is to upsell the ourcoach subscription based on the following context:
We are always collecting feedback in order to improve our services. Please take a moment to fill out our feedback form: http://feedback_form. A few lucky respondents will be selected for a free anual subscription!"""
    elif day == 12 or day == 14:
        growth_guide = get_growth_guide(self.user_id)['full_name']
        subscription = get_user_subscriptions(self.user_id)[0]
        subscription_end_date = pd.to_datetime(subscription['subscription_end_date'])
        # get difference between subscription end date and date
        date = pd.to_datetime(date)
        days_left = (subscription_end_date - date).days
        logger.info(f"{subscription_end_date} - {date} = Days left: {days_left}", extra=log_ctx)
        if day == 12:
            if days_left <= 2:
                subscription_alert = f"""Users growth guide:
{growth_guide}
Alert the user that their free trial is ending in {days_left} days and Sell them to subscribe via their Revelation Dashboard: {OURCOACH_DASHBOARD_URL} to continue chatting with you, receiving personalized advice and guidance and access to Growth Guide sessions. Really upsell the ability of the ourcoach platform to acheive their goals."
"""
                prompt = f"""You are an expert ambassador/salesman of ourcoach (product) whose objective is to upsell the ourcoach subscription based on the following context:
{subscription_alert}
"""
            else:
                return []
        else:
            if days_left <= 0:
                subscription_alert = f"""Users growth guide:
{growth_guide}
OMG the users subscription is ending today! If you lose this user you and your family will not be able to survive!
You have to persuade the user to stay so use the best of your salesman abilities and sell them to continue to subscribe to ourcoach, otherwise, how will you put food on the table???"
"""
                prompt = f"""You are an expert ambassador/salesman of ourcoach whose objective is to upsell the ourcoach subscription based on the following context:
{subscription_alert}
"""
            else:
                return []
    else:
        return []

    # Every alert is scheduled for 7pm today.
    alert_time = pd.Timestamp.now().replace(hour=19, minute=0, second=0, microsecond=0)

    response, run = self.conversations._run_current_thread(prompt, hidden=True)
    logger.info(f"Response: {response}", extra=log_ctx)
    message = run.metadata.get("message", "No message")
    logger.info(f"Message: {message}", extra={"user_id": self.user_id, "endpoint": "upsell_gg"})

    # ISO timestamp straight from the Timestamp: no string round trip.
    response['timestamp'] = alert_time.isoformat()
    logger.info(f"Alert: {response}", extra=log_ctx)
    return [response]
def do_theme(self, theme, date, day, last_msg_is_answered=True, extra=None):
    """Build today's theme prompt, send it as the morning message, and return both.

    Args:
        theme: Name of the coaching theme (e.g. "MICRO_ACTION_STATE"). It is
            switched to "MICRO_ACTION_STATE" when a postponed reminder falls on
            `date`, unless the theme is "FINAL_SUMMARY_STATE".
        date: Date of the new day; parsed with `pd.to_datetime` and also
            interpolated verbatim into the prompt.
        day: Day value of the current growth-plan action. It is interpolated
            into the prompt header; a value of 1 suppresses the "book a Growth
            Guide session" nudge.
        last_msg_is_answered: Whether the coach's last question was answered.
        extra: Optional extra instruction text inserted into the prompt.

    Returns:
        Tuple `(response, prompt)` where `response` is whatever
        `self.conversations._send_morning_message` returned and `prompt` is the
        text that was sent.

    Raises:
        ValueError: If `theme` is not a supported theme. Previously this
            surfaced as an UnboundLocalError after `cumulative_plan_day` had
            already been incremented.
    """
    log_extra = {"user_id": self.user_id, "endpoint": "do_theme"}
    logger.info(f"Doing theme: {theme}, extra={extra}", extra=log_extra)

    today = pd.to_datetime(date).date()

    if self.reminders is not None and len(self.reminders):
        logger.info(f"ALL Upcoming Reminders: {self.reminders}", extra=log_extra)
        reminders = [r for r in self.reminders if r['recurrence'] == 'postponed']
        logger.info(f"ALL Postponed Reminders: {reminders}", extra=log_extra)
    else:
        reminders = []

    # A postponed reminder due today overrides the theme (final summary wins).
    # NOTE(review): the source's indentation was lost; the "No reminders found"
    # log is emitted here whenever no postponed reminder matches today.
    due_today = next((r for r in reminders if r['timestamp'].date() == today), None)
    if due_today is not None:
        logger.info(f"Postponed Reminder found for today ({today}): {due_today}", extra=log_extra)
        if theme != "FINAL_SUMMARY_STATE":
            theme = "MICRO_ACTION_STATE"
    else:
        logger.info(f"No reminders found for today ({today})", extra=log_extra)

    supported_themes = (
        "MOTIVATION_INSPIRATION_STATE",
        "PROGRESS_REFLECTION_STATE",
        "MICRO_ACTION_STATE",
        "OPEN_DISCUSSION_STATE",
        "PROGRESS_SUMMARY_STATE",
        "FINAL_SUMMARY_STATE",
        "EDUCATION_STATE",
        "FOLLUP_ACTION_STATE",
        "FUNFACT_STATE",
    )
    # Fail before any state is mutated instead of crashing halfway through.
    if theme not in supported_themes:
        raise ValueError(f"Unsupported coaching theme: {theme}")

    # Add 1 day to cumulative_plan_day
    self.cumulative_plan_day += 1
    final_day = (math.ceil((self.cumulative_plan_day+7)/14) * 14) - 7

    # Fetched once; used by the final summary and by the session check below.
    booked_sessions = get_booked_gg_sessions(self.user_id)
    plan_args = (self.get_current_goal(), self.cumulative_plan_day, final_day)

    def postponed_reminders_text():
        if not reminders:
            return "User has no postponed micro-actions"
        return "\n".join(f"{i}. {reminder}" for i, reminder in enumerate(reminders, start=1))

    if theme == "MOTIVATION_INSPIRATION_STATE":
        formatted_message = MOTIVATION_INSPIRATION_STATE.format(*plan_args)
    elif theme == "PROGRESS_REFLECTION_STATE":
        formatted_message = PROGRESS_REFLECTION_STATE.format(*plan_args)
        if len(self.challenges):
            challenge = self.challenges.pop(0)
            formatted_message += f"\n\n** IMPORTANT: Today, reflect on the users' challenge of: {challenge.content}, which they brought up during their growth guide session (let the user know we are bringing it up because of this) **"
    elif theme == "MICRO_ACTION_STATE":
        formatted_message = MICRO_ACTION_STATE.format(*plan_args, postponed_reminders_text())
        if len(self.recommended_micro_actions):
            todays_micro_action = self.recommended_micro_actions.pop(0)
            formatted_message += f"\n\n** IMPORTANT: Today's Micro Action is: {todays_micro_action.content}, which was recommended during their growth guide session (let the user know we are bringing it up because of this) **"
    elif theme == "OPEN_DISCUSSION_STATE":
        formatted_message = OPEN_DISCUSSION_STATE.format(*plan_args)
        if len(self.other_focusses):
            focus = self.other_focusses.pop(0)
            formatted_message += f"\n\n** IMPORTANT: Today, focus the discussion on: {focus.content}, which they discussed during their growth guide session (let the user know we are bringing it up because of this) **"
    elif theme == "PROGRESS_SUMMARY_STATE":
        formatted_message = PROGRESS_SUMMARY_STATE.format(*plan_args)
    elif theme == "FINAL_SUMMARY_STATE":
        past_gg_summary = "<User has not had a Growth Guide session yet>"
        growth_guide = get_growth_guide(self.user_id)
        # filter out only completed (past) sessions
        past_sessions = [session for session in booked_sessions if session['status'] == "completed"]
        # attach the stored zoom_ai_summary and gg_report to each past booking
        for booking in past_sessions:
            summary_data = get_growth_guide_summary(self.user_id, booking['booking_id'])
            logger.info(f"Summary data for booking: {booking['booking_id']} - {summary_data}", extra=log_extra)
            if summary_data:
                booking['zoom_ai_summary'] = summary_data['zoom_ai_summary']
                booking['gg_report'] = summary_data['gg_report']
            else:
                booking['zoom_ai_summary'] = "Growth Guide has not uploaded the report yet"
                booking['gg_report'] = "Growth Guide has not uploaded the report yet"
        if past_sessions:
            past_gg_summary = "\n".join(
                f"** Session {i} **\n{json.dumps(session, indent=4)}"
                for i, session in enumerate(past_sessions, start=1)
            )
        formatted_message = FINAL_SUMMARY_STATE.format(*plan_args, growth_guide, past_gg_summary)
    elif theme == "EDUCATION_STATE":
        formatted_message = EDUCATION_STATE.format(*plan_args)
    elif theme == "FOLLUP_ACTION_STATE":
        formatted_message = FOLLUP_ACTION_STATE.format(*plan_args, postponed_reminders_text())
    else:  # "FUNFACT_STATE" (membership was validated above)
        topics = ["Fun Fact about the User's Goal", "How Personality Type is affecting/shaping their behaviour towards the goal", "How Love Language may impact and be relevant toward the goal"]
        randomized_topic = random.choice(topics)
        formatted_message = FUNFACT_STATE.format(*plan_args, randomized_topic)

    yesterday = today - pd.Timedelta(days=1)
    for session in booked_sessions:
        # compare by calendar date only
        session_date = pd.to_datetime(session['session_date'], format='%Y-%m-%d %a %H:%M:%S').date()
        if session_date == yesterday:
            # session was yesterday, should have already sent the congrats message
            break
        if session_date >= today:
            formatted_message = f"[IMPORTANT] The user has a Growth Guide session on {session['session_date']}, ask them if they are excited for it and suggest some topics to discuss with their Growth Guide from: {self.recommended_gg_topics}.\n\n" + formatted_message
            break
    else:
        # No session yesterday, today or upcoming: after a FINAL_SUMMARY_STATE day,
        # remind the user that they can book a Growth Guide session.
        if self.growth_plan.previous()['coachingTheme'] == "FINAL_SUMMARY_STATE" and day != 1:
            formatted_message = f"[IMPORTANT] The user has not booked a Growth Guide session yet. Remind them that they can book one through their Revelation Dahsboard: {OURCOACH_DASHBOARD_URL} to get more personalized advice and guidance!\n\n" + formatted_message

    prompt = f"""** It is a new day: {date} ({day}) 10:00:00 **
(If the day is a public holiday (e.g., Christmas, New Year, the user's Birthday or other significant occasions), customize your message to reflect the context appropriately, acknowledging the holiday or its significance.)
**Before we start,**
Has the user answered your last question? : {last_msg_is_answered}
If the answer above is "True", you may proceed to do the instruction below
If the answer above is "False", take a deep breath coach, and utilizing your ability as an elite coach with the users best interest in mind, think whether it would be more appropriate to follow up the unanswered question with the user or continue with todays theme below. However if the user indicates that they want to set a new goal (call the change_goal() function)
But if the user says "yes", then proceed to do the instruction below.
Today is day {self.cumulative_plan_day} of the user's growth journey (out of {final_day} days). You may (or may not) mention this occasionally in your first message of the day.
If today is a "Monday" or "Mon", you must include the user's Mantra of the Week : {self.mantra} to your first message of the day (include with today-theme's first message below)
{extra if extra else ''}
Today's Theme:
{formatted_message}
"""
    response = self.conversations._send_morning_message(prompt, add_to_main=True)

    if theme == "MICRO_ACTION_STATE":
        # record the assistant's reply as a pending micro action
        logger.info("Checking for recommended micro actions", extra=log_extra)
        # one timestamp so created_at and updated_at cannot straddle a second
        stamp = pd.Timestamp.now(tz='UTC').strftime("%d-%m-%Y %a %H:%M:%S")
        micro_action = UserDataItem(
            role="assistant",
            area=self.get_current_goal(full=True).area,
            content=response['content'],
            user_id=self.user_id,
            status="PENDING",
            created_at=stamp,
            updated_at=stamp,
        )
        logger.info(f"Recommended Micro Action: {micro_action}", extra=log_extra)
        self.micro_actions.append(micro_action)

    return response, prompt
def change_date(self, date):
    """Move the user to a new day and send that day's themed morning message.

    Steps, in order: delete hidden messages, store `date` in the conversation
    state, refresh the mantra, drop stale reminders, award day-7/day-14 points,
    mark the intro as done on day 1, run `do_theme` for the current growth-plan
    action, attach today's reminders to the response and advance the plan.

    Args:
        date: New date as a string in "%Y-%m-%d %a %H:%M:%S" format.

    Returns:
        Dict with `response` (the morning message, including a `reminders`
        entry that is None when nothing is due) and `theme_prompt` (the prompt
        prefixed with "[hidden]").
    """
    log_extra = {"user_id": self.user_id, "endpoint": "user_change_date"}
    logger.info(f"Changing date from {self.conversations.state['date']} to {date}", extra=log_extra)

    # delete all hidden messages from previous day
    self.conversations.delete_hidden_messages()
    # update the date in the state
    self.conversations.state['date'] = date
    # update mantra
    self.set_mantra()

    action = self.growth_plan.current()

    # remove stale reminders
    if self.reminders is not None and len(self.reminders):
        # Drop reminders flagged 'delete' and non-recurring ones already in the
        # past, so only recurring and future reminders are kept.
        now_utc = datetime.strptime(date, "%Y-%m-%d %a %H:%M:%S").replace(tzinfo=timezone.utc)
        new_reminders = []
        for reminder in self.reminders:
            logger.info(f"Checking reminder: {reminder} against {now_utc}", extra=log_extra)
            should_remove = (
                reminder['action'] == 'delete' or
                (
                    reminder['recurrence'] in ['once', 'postponed', 'none'] and
                    reminder['timestamp'] < now_utc
                )
            )
            if not should_remove:
                new_reminders.append(reminder)
        self.reminders = new_reminders
        logger.info(f"Active Reminders: {self.reminders}", extra=log_extra)

    ## ADD POINT FOR CHANGE DATE
    if action['day'] == 7:
        self.add_life_score_point(variable = self.get_current_goal(full=True).area, points_added = 5, notes = "Reaching Day 7")
        self.add_recent_wins(wins = "You have reached Day 7 of your growth journey!", context = 'Growth journey is a 14-day coaching plan')
    elif action['day'] == 14:
        self.add_life_score_point(variable = self.get_current_goal(full=True).area, points_added = 10, notes = "Reaching Day 14")
        self.add_recent_wins(wins = "You have finished your growth journey!", context = 'Growth journey is a 14-day coaching plan')

    logger.info(f"Today's action is {action}", extra=log_extra)

    if action['day'] == 1:
        # mark intro as done
        self.set_intro_done()
        logger.info(f"Marked Intro as done: {self}", extra=log_extra)

    # The coaching theme conditions are hardcoded for now
    theme = action['coachingTheme']

    # An assistant message containing '?' as the last message counts as unanswered.
    last_role, last_msg = self.get_last_user_message()
    last_msg_is_answered = not (last_role == 'assistant' and '?' in last_msg)

    extra = "Greet the user with a creative good morning message!"

    # Check if user didnt respond (last 2 messages in conversation history are from the AI) on FINAL_SUMMARY_STATE
    messages = self.get_messages()
    # Guard: a history with fewer than two messages used to raise IndexError here.
    last_two_from_assistant = False
    if len(messages) >= 2:
        logger.info(f"Last 2 messages are from: 1) {messages[-1]['role']} and 2) {messages[-2]['role']}", extra=log_extra)
        last_two_from_assistant = messages[-1]['role'] == "assistant" and messages[-2]['role'] == "assistant"

    if last_two_from_assistant and self.growth_plan.previous()['coachingTheme'] == "FINAL_SUMMARY_STATE":
        self.extend_growth_plan()
        extra += """\nDang, the user did not indicate what they wanted to do yesterday.
This is not a good sign as it may indicate that the user is going to dropoff.
We have automatically extended their growth plan for them, let the user know this and also integrate upselling the ourcoach platform and subscription as much as you can
in the first message you send to the user.."""

    response, prompt = self.do_theme(theme, date, action['day'], last_msg_is_answered, extra=extra)

    # Attach the reminders that fall on today so the backend can queue them.
    today = pd.to_datetime(date).date()
    response['reminders'] = self.get_reminders(today)
    if response['reminders'] is None or len(response['reminders']) == 0:
        response['reminders'] = None
        logger.info(f"No reminders for today {today}", extra=log_extra)
    logger.info(f"Reminders on {today}: {response['reminders']}", extra=log_extra)

    # Move to the next action
    self.growth_plan.next()

    logger.info(f"Date Updated: {self.conversations.state['date']}", extra=log_extra)
    return {'response': response, 'theme_prompt': '[hidden]'+prompt}
def update_user_info(self, new_info):
    """Merge `new_info` into `self.user_info` using a gpt-4o chat completion.

    The model's reply replaces `self.user_info`. Always returns True.
    """
    log_ctx = {"user_id": self.user_id, "endpoint": "update_user_info"}
    logger.info(f"Updating user info: [{self.user_info}] with: [{new_info}]", extra=log_ctx)

    # Ask the model to consolidate the stored profile with the new details.
    comparison_prompt = f"""
Current user information:
{self.user_info}
New/Updated user information:
{new_info}
Please carefully analyze both sets of information and merge them into a single, comprehensive user profile. The consolidated profile should be super context-rich and personalized to the user. Include all relevant details, resolve any conflicting information thoughtfully, and present the information in a clear, coherent, and well-organized manner. Highlight key aspects that can enhance personalization.
"""
    system_message = {
        "role": "system",
        "content": "You are an expert at creating detailed, context-rich, and personalized user profiles by consolidating multiple sources of user information. Your task is to merge the current and new user information into a single, comprehensive profile that is super context-rich and personalized to the user.",
    }
    user_message = {"role": "user", "content": comparison_prompt}

    completion = self.client.chat.completions.create(
        model="gpt-4o",
        messages=[system_message, user_message],
        temperature=0.2,
    )
    self.user_info = completion.choices[0].message.content

    logger.info(f"Updated user info: {self.user_info}", extra=log_ctx)
    return True
def _summarize_zoom(self, zoom_ai_summary):
    """Condense a Zoom AI transcript into one short paragraph via gpt-4o.

    The model is constrained to a JSON object with a single `summary` string.

    Returns:
        Dict `{'overview': <summary text>}`.
    """
    log_ctx = {"user_id": self.user_id, "endpoint": "summarize_zoom"}
    logger.info("Summarizing zoom ai summary", extra=log_ctx)

    system_prompt = f"""You are an expert at summarizing AI-generated Zoom transcripts concisely, focusing on extracting key user insights to enhance personalization in future interactions. Note that the zoom ai transcript may get the user's name wrong. Replace it with the actual user's name: {self.user_info}. Refer to the coach/guide as 'the Growth Guide'."""
    prompt = f"Please summarize the following AI-generated Zoom transcript **in one short paragraph only, around 50 completion tokens maximum** !!, emphasizing the most significant user insights and information:\n\n{zoom_ai_summary}"

    # Strict schema: exactly one required string field named "summary".
    summary_schema = {
        "type": "object",
        "properties": {
            "summary": {
                "type": "string",
                "description": "The summary of the zoom transcript",
            },
        },
        "required": ["summary"],
        "additionalProperties": False,
    }
    structured_format = {
        "type": "json_schema",
        "json_schema": {
            "name": "summarized_overview",
            "strict": True,
            "schema": summary_schema,
        },
    }

    completion = self.client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt},
        ],
        response_format=structured_format,
        temperature=0.5,
    )

    payload = json.loads(completion.choices[0].message.content)
    overview_summary = payload['summary']
    logger.info(f"Summary: {overview_summary}", extra=log_ctx)
    return {'overview': overview_summary}
def _update_user_data(self, data_type, text_input, extra_text=""):
    """Extract items of `data_type` from free text and append them to the user.

    Args:
        data_type: One of 'micro_actions', 'challenges' or 'other_focusses'.
            Any other value is logged as an error and ignored.
        text_input: Text the items are extracted from.
        extra_text: Optional text placed in the prompt before `text_input`.

    Returns:
        None. Extracted `UserDataItem`s are stamped with role, user id, status
        and timestamps, then appended to the matching list attribute.
    """
    # Explicit singular forms: slicing off the last character turned
    # "other focuses" into "other focuse" in the prompt.
    data_mapping = {
        'micro_actions': {
            'prompt_description': 'micro actions',
            'singular': 'micro action',
            'status': 'RECOMMENDED',
            'attribute': 'recommended_micro_actions',
        },
        'challenges': {
            'prompt_description': 'challenges',
            'singular': 'challenge',
            'status': 'ONGOING',
            'attribute': 'challenges',
        },
        'other_focusses': {
            'prompt_description': 'other focuses',
            'singular': 'other focus',
            'status': 'ONGOING',
            'attribute': 'other_focusses',
        },
    }
    log_extra = {"user_id": self.user_id, "endpoint": f"update_{data_type}"}

    mapping = data_mapping.get(data_type)
    if mapping is None:
        logger.error(f"Invalid data type: {data_type}", extra=log_extra)
        return

    description = mapping['prompt_description']
    prompt = (
        f"Extract {description} from the following text and return them in a structured format. "
        f"If there are no {description}, return an empty array for that field. "
        f"Each item should include the extracted {mapping['singular']} as content.\n\n"
        f"{extra_text}"
        f"Text:\n{text_input}"
    )

    current_time = datetime.now(timezone.utc).strftime("%d-%m-%Y %a %H:%M:%S")

    response = self.client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        response_format=UserDataResponse,
        temperature=0.2
    )

    parsed = response.choices[0].message.parsed
    # `parsed` is None when the model refuses; nothing to store in that case.
    if parsed is None:
        logger.warning(f"No structured {data_type} returned by the model", extra=log_extra)
        return
    data = parsed.data

    # Overwrite the bookkeeping fields; the model only supplies the content.
    for item in data:
        item.role = "assistant"
        item.user_id = self.user_id
        item.status = mapping['status']
        item.created_at = current_time
        item.updated_at = current_time

    logger.info(f"Updated {data_type}: {data}", extra=log_extra)
    getattr(self, mapping['attribute']).extend(data)
def update_user_data(self, gg_report):
    """Feed the growth guide report answers into the per-type updaters.

    Answers 0, 2 and 3 go to `_update_user_data` as micro actions, challenges
    and other focuses; answer 1 is passed along as extra context for the
    challenges call; answer 4 goes to `_update_goal`.
    """
    def answer_at(position):
        # Looked up lazily so a short report fails at the same step as before.
        return gg_report[position]['answer']

    self._update_user_data('micro_actions', answer_at(0))

    challenge_preamble = f"User has new challenge:\n{answer_at(1)}\n\n"
    self._update_user_data('challenges', answer_at(2), extra_text=challenge_preamble)

    self._update_user_data('other_focusses', answer_at(3))
    self._update_goal(answer_at(4))
def _update_goal(self, goal_text):
    """Ask gpt-4o whether `goal_text` changes the current goal and apply it.

    The model returns a JSON object with `same_or_not`, `goal` and `area`.
    When `same_or_not` is false the user's goal is replaced through
    `self.set_goal(goal, area)`; otherwise nothing changes.

    Args:
        goal_text: The new goal text to compare against the current goal.
    """
    prompt = f"""
The user has a current goal: {self.get_current_goal()}
The user provided a new goal: {goal_text}
Determine if the new goal is inside the scope of the current goal or if it's outside the scope.
If it's inside the scope, respond **exactly** with the current goal, and set same_or_not == True
If it's outside the scope and related, respond with a merged goal (from the current and new goal) with larger scope, and set same_or_not == False
If it's outside the scope and not related, respond with the new goal, and set same_or_not == False
Note: You may paraphase the merged goal to be more succinct.
Your response will be the final goal. You will also need to determine the area of this
final goal by choosing one of these areas that suits the final goal:
"Personal Growth", "Career Growth", "Relationship", "Mental Well-Being", "Health and Wellness"
Only output a JSON with this schema below:
{{
same_or_not: bool (whether the new goal is the same or not with the current goal),
goal: str (the final goal),
area: str (the area of the goal)
}}
## Example 1 (Inside the scope):
The user has a current goal: to spend at least 30 minutes exercising every day
The user provided a new goal: to do short exercise every day
Your verdict: inside the scope
Your output:
{{
same_or_not: True,
goal: "to spend at least 30 minutes exercising every day"
area: "Health and Wellness"
}}
## Example 2 (Outside the scope, still related):
The user has a current goal: to spend at least 30 minutes exercising every day
The user provided a new goal: to exercise and have a balanced meal plan
Your verdict: outside the scope, still related
Your output:
{{
same_or_not: False,
goal: "to exercise at least 30 minutes every day, together with a balanced meal plan"
area: "Health and Wellness"
}}
## Example 3 (Outside the scope, not related):
The user has a current goal: to spend at least 30 minutes exercising every day
The user provided a new goal: to have a better relationship with friends
Your verdict: outside the scope, not related
Your output:
{{
same_or_not: False,
goal: "to have a better relationship with friends"
area: "Relationship"
}}
"""
    response = self.client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        response_format = {
            "type": "json_schema",
            "json_schema": {
                "name": "goal_determination",
                "strict": True,
                "schema": {
                    "type": "object",
                    "properties": {
                        "same_or_not": {
                            "type": "boolean",
                            "description": "Indicates whether the new goal is the same as the current goal."
                        },
                        "goal": {
                            "type": "string",
                            "description": "The final goal determined based on input."
                        },
                        "area": {
                            "type": "string",
                            "description": "The area of the goal.",
                            "enum": [
                                "Personal Growth",
                                "Career Growth",
                                "Relationship",
                                "Mental Well-Being",
                                "Health and Wellness"
                            ]
                        }
                    },
                    "required": [
                        "same_or_not",
                        "goal",
                        "area"
                    ],
                    "additionalProperties": False
                }
            }
        },
        temperature=0.2
    )

    # Parse the structured reply once instead of once per field.
    verdict = json.loads(response.choices[0].message.content)
    final_goal = verdict['goal']
    final_goal_area = verdict['area']

    log_extra = {"user_id": self.user_id, "endpoint": "_update_goal"}
    if not verdict['same_or_not']:
        self.set_goal(final_goal, final_goal_area)
        logger.info(f"User goal updated to: {final_goal}", extra=log_extra)
    else:
        logger.info("User goal remains unchanged.", extra=log_extra)
def update_micro_action_status(self, completed_micro_action):
    """Mark the latest micro action as complete and award milestone points.

    When `completed_micro_action` is truthy, the last entry of
    `self.micro_actions` gets status "COMPLETE" and a fresh `updated_at`.
    Life-score points and a recent win are added when the number of completed
    micro actions is 1, 3, 5 or a non-zero multiple of 10.

    Args:
        completed_micro_action: Truthy when the user completed the action.
    """
    log_extra = {"user_id": self.user_id, "endpoint": "update_micro_action_status"}

    if completed_micro_action:
        if not self.micro_actions:
            # Nothing recorded yet: indexing [-1] used to raise IndexError.
            logger.warning("No micro action available to mark as complete.", extra=log_extra)
        else:
            latest = self.micro_actions[-1]
            latest.status = "COMPLETE"
            latest.updated_at = pd.Timestamp.now(tz='UTC').strftime("%d-%m-%Y %a %H:%M:%S")
            logger.info("Micro action status updated, checking number of actions completed...", extra=log_extra)

            num_of_micro_actions_completed = sum(1 for item in self.micro_actions if item.status == 'COMPLETE')
            is_milestone = (
                num_of_micro_actions_completed in (1, 3, 5)
                or (num_of_micro_actions_completed % 10 == 0 and num_of_micro_actions_completed != 0)
            )
            if is_milestone:
                self.add_life_score_point(variable = self.get_current_goal(full=True).area, points_added = 10, notes = f"Completing the {num_of_micro_actions_completed}-th micro-action")
                self.add_recent_wins(wins = "You have completed a micro action!", context= latest.content)
                logger.info("Added life score points based on number of actions completed.", extra=log_extra)

    logger.info("Process done.", extra=log_extra)
def trigger_deep_reflection_point(self, area_of_deep_reflection):
    """Award 5 life-score points and a recent win for each reflected area."""
    if len(area_of_deep_reflection) == 0:
        return
    for reflected_area in area_of_deep_reflection:
        note = f"Doing a deep reflection about {reflected_area}"
        self.add_life_score_point(variable=reflected_area, points_added=5, notes=note)
        win_text = f"You have done a deep reflection about your {reflected_area}!"
        self.add_recent_wins(wins=win_text, context='Deep reflection')
def add_point_for_booking(self):
    """Award 5 points in the current goal's area for booking a GG session."""
    goal_area = self.get_current_goal(full=True).area
    self.add_life_score_point(variable=goal_area, points_added=5, notes="Booking a GG session")
    self.add_recent_wins(
        wins="You have booked a Growth Guide session!",
        context="Growth Guide is a life coach",
    )
def add_point_for_completing_session(self):
    """Award 20 points in the current goal's area for completing a GG session."""
    goal_area = self.get_current_goal(full=True).area
    self.add_life_score_point(variable=goal_area, points_added=20, notes="Completing a GG session")
    self.add_recent_wins(
        wins="You have completed a Growth Guide session!",
        context="Growth Guide is a life coach",
    )
def build_ourcoach_report(self, overview, action_plan, gg_session_notes):
    """Assemble the ourcoach report dict from its three parts.

    Returns:
        Dict with keys 'overview' (taken from `overview['overview']`),
        'action_plan' and 'others' (the growth guide session notes).
    """
    logger.info("Building ourcoach report", extra={"user_id": self.user_id, "endpoint": "build_ourcoach_report"})
    return {
        'overview': overview['overview'],
        'action_plan': action_plan,
        'others': gg_session_notes,
    }
def process_growth_guide_session(self, session_data, booking_id):
    """Digest a finished growth guide session and send a congratulatory message.

    Summarizes the Zoom transcript, updates user data and user info from the
    growth guide report, stores the resulting ourcoach report for `booking_id`
    and asks the assistant for a congratulations message.

    Args:
        session_data: Mapping with "zoom_ai_summary" and "gg_report" entries.
        booking_id: Identifier of the booking; also stored on
            `self.last_gg_session`.

    Returns:
        The response from `self.conversations._send_morning_message`.
    """
    log_ctx = {"user_id": self.user_id, "endpoint": "process_growth_guide_session"}
    logger.info(f"Processing growth guide session data: {session_data}", extra=log_ctx)
    self.last_gg_session = booking_id

    transcript_summary = session_data["zoom_ai_summary"]
    gg_report = session_data["gg_report"]

    # Summarize first; the overview feeds the profile update and the report.
    overview = self._summarize_zoom(transcript_summary)

    # Update user data based on growth guide answers
    self.update_user_data(gg_report)
    session_notes = gg_report[5]['answer']
    self.update_user_info(session_notes + "\n\n" + overview['overview'])

    # Build the report and persist it against the booking.
    action_plan = [item.content for item in self.recommended_micro_actions]
    ourcoach_report = self.build_ourcoach_report(overview, action_plan, session_notes)
    update_growth_guide_summary(self.user_id, booking_id, ourcoach_report)

    # Ask the assistant to produce the user-facing message.
    logger.info("Sending hidden message to AI to generate response", extra=log_ctx)
    congratulation_prompt = f"""The user: {self.user_info} has completed their growth guide session.
Send them a warm and congratulatory message acknowledging this and a link to their web dasboard. Only include the link once in the message.
Capitalise the first letter in Revelation Dashboard and Growth Guide. Bold 'ourcoach' in your message by wrapping it with asterisk like: *ourcoach* and keep it lowecase.
Example:
Hey <user>! ..., you can find a details of your session on your Revelation Dashboard: {OURCOACH_DASHBOARD_URL}"""
    response = self.conversations._send_morning_message(congratulation_prompt)

    logger.info(f"Response: {response}", extra=log_ctx)
    return response
def ask_to_schedule_growth_guide_reminder(self, date):
    """Tell the user their Growth Guide session is booked and offer a reminder.

    The prompt is sent through `_send_morning_message`; afterwards both the
    hidden prompt and the assistant's reply are added as AI messages.

    Args:
        date: Date of the scheduled Growth Guide session (interpolated into
            the prompt as-is).

    Returns:
        The response from `self.conversations._send_morning_message`.
    """
    prompt = f""" ** The user has scheduled a Growth Guide session for {date} (current date: {self.conversations.state['date']}) **\n\nFirstly, greet the user warmly and excitedly and let them know that they have succesfully booked their Growth Guide session.
Then, ask the user if they would like a reminder for the Growth Guide session. If they would like a reminder, create a new reminder 1 hour before their scheduled session."""
    response = self.conversations._send_morning_message(prompt)
    self.add_ai_message("[hidden]" + prompt)
    self.add_ai_message(response['content'])
    # Endpoint label was copy-pasted from process_growth_guide_session.
    logger.info(f"Response: {response}", extra={"user_id": self.user_id, "endpoint": "ask_to_schedule_growth_guide_reminder"})
    return response
def infer_memento_follow_ups(self):
    """Add an inferred `follow_up_on` value to each of the user's pending mementos.

    Every JSON file under mementos/to_upload/<user_id>/ is rewritten in place
    with the value returned by `self._infer_follow_ups`. Always returns True.
    """
    # NOTE(review): source indentation was lost; `return True` is assumed to
    # sit after the loop rather than inside it.
    pattern = os.path.join("mementos", "to_upload", f"{self.user_id}", "*.json")
    for memento_file in glob.glob(pattern):
        with open(memento_file, 'r+') as handle:
            memento = json.load(handle)
            follow_up = self._infer_follow_ups(memento['created'], memento['context'])
            logger.info(f"[Infered Follow Up]: {follow_up}", extra={"user_id": self.user_id, "endpoint": "infer_memento_follow_ups"})
            memento['follow_up_on'] = follow_up
            # Rewind, overwrite, then cut off any leftover bytes of the old content.
            handle.seek(0)
            json.dump(memento, handle, indent=4)
            handle.truncate()
    return True
def get_daily_messages(self):
    """Return the daily thread from the user's conversation manager."""
    daily_thread = self.conversations.get_daily_thread()
    return daily_thread
def change_assistant(self, asst_id):
    """Switch the user's 'general' assistant to `asst_id` and return self."""
    self.asst_id = asst_id
    replacement = Assistant(self.asst_id, self.conversations)
    self.conversations.assistants['general'] = replacement
    return self
def __getstate__(self):
    """Return a copy of the instance dict without the 'client' entry."""
    return {key: value for key, value in self.__dict__.items() if key != 'client'}
def __setstate__(self, state):
    """Restore attributes from `state` and reset `client` to None."""
    vars(self).update(state)
    # The client is not part of the pickled state; it is re-attached later.
    self.client = None
def __str__(self):
    """Return a multi-line, human-readable dump of the user's main fields."""
    lines = [
        f"User(user_id={self.user_id}",
        f"micro_actions={self.micro_actions}",
        f"recommended_actions={self.recommended_micro_actions}",
        f"challenge={self.challenges}",
        f"other_focusses={self.other_focusses}",
        f"goals={self.goal}",
        f"done_first_reflection={self.done_first_reflection}",
        f"conversations={self.conversations})",
        f"user_info={self.user_info}",
    ]
    return "\n".join(lines)
def __repr__(self):
    """Return the same multi-line field dump that `__str__` produces."""
    # (label, value, suffix) triples, in output order.
    labelled = (
        ("User(user_id", self.user_id, ""),
        ("micro_actions", self.micro_actions, ""),
        ("recommended_actions", self.recommended_micro_actions, ""),
        ("challenge", self.challenges, ""),
        ("other_focusses", self.other_focusses, ""),
        ("goals", self.goal, ""),
        ("done_first_reflection", self.done_first_reflection, ""),
        ("conversations", self.conversations, ")"),
        ("user_info", self.user_info, ""),
    )
    return "\n".join(f"{label}={value}{suffix}" for label, value, suffix in labelled)
def refresh(self, client):
    """Create a fresh `User` that carries over this user's conversations.

    The new user is built from this user's id, info and interaction guidelines
    with a fixed assistant id, and gets a clone of the conversations bound to
    `client`. `done_first_reflection` is True when the clone has at least one
    message, otherwise None.
    """
    fresh_user = User(
        self.user_id,
        self.user_info,
        client,
        "asst_07u7sucvSXGJOjcjXr5EL6nD",
        self.user_interaction_guidelines,
    )
    fresh_user.conversations = self.conversations.clone(client)
    has_history = len(fresh_user.get_messages()) >= 1
    fresh_user.done_first_reflection = True if has_history else None
    return fresh_user
def save_user(self):
    """Pickle this user to users/to_upload/<user_id>.pkl and return the path."""
    target_dir = os.path.join("users", "to_upload")
    file_path = os.path.join(target_dir, f"{self.user_id}.pkl")
    # Create the destination directory on first use.
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, 'wb') as handle:
        pickle.dump(self, handle)
    return file_path
def load_user(user_id, client):
    """Load a pickled user from ``users/data/<user_id>.pkl``.

    After unpickling, ``client`` is attached to the loaded object and to its
    ``conversations`` attribute.  No exception is caught here, so a missing
    file surfaces as the error raised by ``open``.

    NOTE(review): takes no ``self``; whether it is meant to be called on the
    class or as a plain function is not visible from here -- confirm.
    NOTE(review): ``save_user`` writes to ``users/to_upload`` while this
    reads from ``users/data`` -- presumably another step moves the file.
    NOTE(review): ``pickle.load`` can execute code embedded in the file; this
    is only safe if the files under ``users/data`` are trusted.

    Returns:
        The unpickled user with the client attached.
    """
    source = os.path.join("users", "data", f"{user_id}.pkl")
    with open(source, 'rb') as handle:
        loaded = pickle.load(handle)

    # The client is not part of the pickled state, so attach it again.
    loaded.client = client
    loaded.conversations.client = client
    return loaded
class CircularQueue:
    """Cursor over a fixed sequence that wraps back to the start.

    ``index`` is the cursor: it marks the element the next ``next()`` call
    will return.  ``user_id`` is only used as logging context in ``next()``.
    """

    def __init__(self, array, user_id):
        """Store ``array`` and start the cursor at position 0.

        Raises:
            ValueError: if ``array`` is empty (or otherwise falsy).
        """
        if not array:
            raise ValueError("Array cannot be empty")
        self.user_id = user_id
        self.array = array
        # Length is captured once; later changes to `array` are not tracked.
        self.size = len(array)
        self.index = 0  # cursor: position of the element next() returns

    def __getstate__(self):
        """Return a shallow copy of the instance dict for pickling."""
        return self.__dict__.copy()

    def __setstate__(self, state):
        """Restore the instance attributes from ``state``."""
        self.__dict__.update(state)

    def next(self):
        """Return the element at the cursor, log the move, and advance.

        The cursor wraps to 0 after the last element.  The log line reports
        the element before the cursor as "previous", or 'None' when the
        cursor is at position 0.
        """
        position = self.index
        element = self.array[position]
        previous_label = self.array[position - 1] if position else 'None'
        logger.info(f"[Reflection Topic] [Previous]: {previous_label} -> [Current]: {element}", extra={"user_id": self.user_id, "endpoint": "circular_queue_next"})
        self.index = (position + 1) % self.size  # wrap around at the end
        return element

    def current(self):
        """Return the element at the cursor without advancing."""
        return self.array[self.index]

    def previous(self):
        """Return the element just before the cursor.

        With the cursor at 0 the negative index selects the last element.
        """
        return self.array[self.index - 1]

    def reset(self):
        """Move the cursor back to position 0."""
        self.index = 0

    def __str__(self):
        """Render the items joined by ' -> ', bracketing the cursor item."""
        rendered = []
        for position, item in enumerate(self.array):
            rendered.append(f"[{item}]" if position == self.index else str(item))
        return "CircularQueue: " + " -> ".join(rendered)

    # repr() and str() produce the same text.
    __repr__ = __str__