Spaces:
Sleeping
Sleeping
| import json | |
| import io | |
| import os | |
| import openai | |
| import pandas as pd | |
| from datetime import datetime, timezone | |
| import json | |
| from app.assistants import Assistant | |
| import glob | |
| import pickle # Replace dill with pickle | |
| import random | |
| import logging | |
| import psycopg2 | |
| from psycopg2 import sql | |
| from app.conversation_manager import ConversationManager | |
| from app.exceptions import BaseOurcoachException, OpenAIRequestError, UserError | |
| from app.flows import FINAL_SUMMARY_STATE, FINAL_SUMMARY_STATE, MICRO_ACTION_STATE, MOTIVATION_INSPIRATION_STATE, OPEN_DISCUSSION_STATE, POST_GG_STATE, PROGRESS_REFLECTION_STATE, PROGRESS_SUMMARY_STATE, EDUCATION_STATE, FOLLUP_ACTION_STATE, FUNFACT_STATE | |
| from pydantic import BaseModel | |
| from datetime import datetime | |
| from app.utils import generate_uuid, update_growth_guide_summary | |
| import dotenv | |
| import re | |
| dotenv.load_dotenv() | |
| OURCOACH_DASHBOARD_URL = os.getenv("OURCOACH_DASHBOARD_URL") | |
| class UserDataItem(BaseModel): | |
| role: str | |
| content: str | |
| user_id: str | |
| status: str | |
| created_at: str | |
| updated_at: str | |
| area: str | |
| class UserDataResponse(BaseModel): | |
| data: list[UserDataItem] | |
| class Index(BaseModel): | |
| value: int | |
| logger = logging.getLogger(__name__) | |
| def get_current_datetime(): | |
| return datetime.now(timezone.utc) | |
| class User: | |
| def catch_error(func): | |
| def wrapper(self, *args, **kwargs): | |
| try: | |
| return func(self, *args, **kwargs) | |
| except (BaseOurcoachException, openai.BadRequestError) as e: | |
| raise e | |
| except openai.BadRequestError as e: | |
| raise OpenAIRequestError(user_id=self.user_id, message="OpenAI Request Error", e=str(e)) | |
| except Exception as e: | |
| # Handle other exceptions | |
| logger.error(f"An unexpected error occurred in User: {e}") | |
| raise UserError(user_id=self.user_id, message="Unexpected error in User", e=str(e)) | |
| return wrapper | |
| def __init__(self, user_id, user_info, client, asst_id): | |
| self.user_id = user_id | |
| self.client = client | |
| self.asst_id = asst_id | |
| self.user_info = user_info | |
| self.done_first_reflection = None | |
| self.goal = [] | |
| self.last_gg_session = None | |
| self.micro_actions = [] | |
| self.recommended_micro_actions = [] | |
| self.challenges = [] | |
| self.other_focusses = [] | |
| self.personal_growth_score = 0 | |
| self.career_growth_score = 0 | |
| self.relationship_score = 0 | |
| self.mental_well_being_score = 0 | |
| self.health_and_wellness_score = 0 | |
| self.reminders = None | |
| self.recent_wins = [] | |
| # Read growth_plan.json and store it | |
| # TESTING PURPOSE | |
| growth_plan = {"growthPlan": [ | |
| { | |
| "day": 1, | |
| "coachingTheme": "MICRO_ACTION_STATE" | |
| }, | |
| { | |
| "day": 2, | |
| "coachingTheme": "FOLLUP_ACTION_STATE" | |
| }, | |
| { | |
| "day": 3, | |
| "coachingTheme": "OPEN_DISCUSSION_STATE" | |
| }, | |
| { | |
| "day": 4, | |
| "coachingTheme": "FUNFACT_STATE" | |
| }, | |
| { | |
| "day": 5, | |
| "coachingTheme": "FINAL_SUMMARY_STATE" | |
| } | |
| ] | |
| } | |
| self.growth_plan = CircularQueue(array=growth_plan['growthPlan'], user_id=self.user_id) | |
| logger.info(f"User Growth Plan: {self.growth_plan} (Day: {self.growth_plan.current()['day']}/{len(self.growth_plan.array)})", extra={"user_id": self.user_id, "endpoint": "user_init"}) | |
| self.user_interaction_guidelines = self.generate_user_interaction_guidelines(user_info, client) | |
| self.conversations = ConversationManager(client, self, asst_id) | |
| def extend_growth_plan(self): | |
| # Change current growth plan to 14d growth plan | |
| logger.info(f"Changing plan to 14d...", extra={"user_id": self.user_id, "endpoint": "extend_growth_plan"}) | |
| new_growth_plan = {"growthPlan": [ | |
| { | |
| "day": 1, | |
| "coachingTheme": "MICRO_ACTION_STATE" | |
| }, | |
| { | |
| "day": 2, | |
| "coachingTheme": "FOLLUP_ACTION_STATE" | |
| }, | |
| { | |
| "day": 3, | |
| "coachingTheme": "OPEN_DISCUSSION_STATE" | |
| }, | |
| { | |
| "day": 4, | |
| "coachingTheme": "MICRO_ACTION_STATE" | |
| }, | |
| { | |
| "day": 5, | |
| "coachingTheme": "FOLLUP_ACTION_STATE" | |
| }, | |
| { | |
| "day": 6, | |
| "coachingTheme": "FUNFACT_STATE" | |
| }, | |
| { | |
| "day": 7, | |
| "coachingTheme": "PROGRESS_REFLECTION_STATE" | |
| }, | |
| { | |
| "day": 8, | |
| "coachingTheme": "MICRO_ACTION_STATE" | |
| }, | |
| { | |
| "day": 9, | |
| "coachingTheme": "FOLLUP_ACTION_STATE" | |
| }, | |
| { | |
| "day": 10, | |
| "coachingTheme": "OPEN_DISCUSSION_STATE" | |
| }, | |
| { | |
| "day": 11, | |
| "coachingTheme": "MICRO_ACTION_STATE" | |
| }, | |
| { | |
| "day": 12, | |
| "coachingTheme": "FOLLUP_ACTION_STATE" | |
| }, | |
| { | |
| "day": 13, | |
| "coachingTheme": "FUNFACT_STATE" | |
| }, | |
| { | |
| "day": 14, | |
| "coachingTheme": "FINAL_SUMMARY_STATE" | |
| } | |
| ] | |
| } | |
| self.growth_plan = CircularQueue(array=new_growth_plan['growthPlan'], user_id=self.user_id) | |
| logger.info(f"User Growth Plan: {self.growth_plan} (Day: {self.growth_plan.current()['day']}/{len(self.growth_plan.array)})", extra={"user_id": self.user_id, "endpoint": "user_init"}) | |
| logger.info(f"Success.", extra={"user_id": self.user_id, "endpoint": "extend_growth_plan"}) | |
| return True | |
| def add_recent_wins(self, wins, context = None): | |
| prompt = f""" | |
| ## Role | |
| You are an expert in writing achievement message and progress notification. Your task is to use the user's achievement and context to formulate a short and creative achievement message/progress notification. The output must be a one sentence short message (less than 15 words) in this JSON output schema: | |
| ```json | |
| {{ | |
| achievement_message: str | |
| }} | |
| ``` | |
| Note: No need to mention the user's name. Make it concise (less than 15 words) | |
| ## Example | |
| User's achievement: Completing a Task | |
| Achievement context: The user has completed a 10k run | |
| Output: | |
| ``` | |
| {{ | |
| achievement_message: You crushed it! Completing that 10k run is a huge milestone—way to go! | |
| }} | |
| ``` | |
| ## User Input | |
| User's achievement: {wins} | |
| Achievement context: {context} | |
| """ | |
| response = self.client.chat.completions.create( | |
| model="gpt-4o", | |
| messages=[{"role": "user", "content": prompt}], | |
| response_format = { | |
| "type": "json_schema", | |
| "json_schema": { | |
| "name": "achievement_message_schema", | |
| "strict": True, | |
| "schema": { | |
| "type": "object", | |
| "properties": { | |
| "achievement_message": { | |
| "type": "string", | |
| "description": "A message indicating an achievement." | |
| } | |
| }, | |
| "required": [ | |
| "achievement_message" | |
| ], | |
| "additionalProperties": False | |
| } | |
| } | |
| }, | |
| temperature=1 | |
| ) | |
| achievement_message = json.loads(response.choices[0].message.content)['achievement_message'] | |
| if len(self.recent_wins)<5: | |
| self.recent_wins.insert(0,achievement_message) | |
| else: | |
| self.recent_wins.pop() | |
| self.recent_wins.insert(0,achievement_message) | |
| def add_life_score_point(self, variable, points_added, notes): | |
| if variable == 'Personal Growth': | |
| self.personal_growth_score += points_added | |
| logger.info(f"Added {points_added} points to Personal Growth for {notes}", extra={"user_id": self.user_id, "endpoint": "add_life_score_point"}) | |
| elif variable == 'Career Growth': | |
| self.career_growth_score += points_added | |
| logger.info(f"Added {points_added} points to Career Growth for {notes}", extra={"user_id": self.user_id, "endpoint": "add_life_score_point"}) | |
| elif variable == 'Health and Wellness': | |
| self.health_and_wellness_score += points_added | |
| logger.info(f"Added {points_added} points to Health and Wellness for {notes}", extra={"user_id": self.user_id, "endpoint": "add_life_score_point"}) | |
| elif variable == 'Mental Well-Being': | |
| self.mental_well_being_score += points_added | |
| logger.info(f"Added {points_added} points to Mental Well Being for {notes}", extra={"user_id": self.user_id, "endpoint": "add_life_score_point"}) | |
| elif variable == 'Relationship': | |
| self.relationship_score += points_added | |
| logger.info(f"Added {points_added} points to Relationship for {notes}", extra={"user_id": self.user_id, "endpoint": "add_life_score_point"}) | |
| def get_current_goal(self, full=False): | |
| # look for most recent goal with status = ONGOING | |
| for goal in self.goal[::-1]: | |
| if goal.status == "ONGOING": | |
| if full: | |
| return goal | |
| return goal.content | |
| else: | |
| if self.goal: | |
| if full: | |
| return self.goal[-1] | |
| return self.goal[-1].content | |
| return None | |
| def update_goal(self, goal, status, content=None): | |
| if goal is None: | |
| # complete the current goal | |
| current_goal = self.get_current_goal(full=True) | |
| if current_goal: | |
| current_goal.status = "COMPLETED" | |
| current_goal.updated_at = pd.Timestamp.now(tz='UTC').strftime("%d-%m-%Y %a %H:%M:%S") | |
| return current_goal.content | |
| for g in self.goal: | |
| if g.content == goal: | |
| g.status = status | |
| g.updated_at = pd.Timestamp.now(tz='UTC').strftime("%d-%m-%Y %a %H:%M:%S") | |
| if content: | |
| g.content = content | |
| return True | |
| return False | |
| def set_goal(self, goal, goal_area, add=True, completed=False): | |
| current_goal = self.get_current_goal() | |
| if completed: | |
| self.update_goal(current_goal, "COMPLETED") | |
| self.add_life_score_point(variable = goal_area, points_added = 30, notes = "Completing a Goal") | |
| self.add_recent_wins(wins = "You have completed your goal!", context = current_goal) | |
| if current_goal is None: | |
| new_goal = UserDataItem(role="assistant", content=goal, area=goal_area, user_id=self.user_id, status="ONGOING", created_at=pd.Timestamp.now(tz='UTC').strftime("%d-%m-%Y %a %H:%M:%S"), updated_at=pd.Timestamp.now(tz='UTC').strftime("%d-%m-%Y %a %H:%M:%S")) | |
| self.goal.append(new_goal) | |
| self.add_life_score_point(variable = goal_area, points_added = 10, notes = "Setting a Goal") | |
| self.add_recent_wins(wins = "You have set your first goal!", context = new_goal.content) | |
| else: | |
| if add: | |
| if current_goal: | |
| # update current_goal status to "IDLE" | |
| self.update_goal(current_goal, "IDLE") | |
| new_goal = UserDataItem(role="assistant", content=goal, area=goal_area, user_id=self.user_id, status="ONGOING", created_at=pd.Timestamp.now(tz='UTC').strftime("%d-%m-%Y %a %H:%M:%S"), updated_at=pd.Timestamp.now(tz='UTC').strftime("%d-%m-%Y %a %H:%M:%S")) | |
| self.goal.append(new_goal) | |
| self.add_life_score_point(variable = goal_area, points_added = 10, notes = "Setting a Goal") | |
| self.add_recent_wins(wins = "You have set a new goal!", context = new_goal.content) | |
| else: | |
| self.update_goal(current_goal, "ONGOING", content=goal) | |
| def update_recommended_micro_action_status(self, micro_action, status): | |
| for ma in self.recommended_micro_actions: | |
| if ma.content == micro_action: | |
| ma.status = status | |
| ma.updated_at = pd.Timestamp.now(tz='UTC').strftime("%d-%m-%Y %a %H:%M:%S") | |
| return True | |
| return False | |
| def add_ai_message(self, text): | |
| self.conversations._add_ai_message(text) | |
| return text | |
| def reset_conversations(self): | |
| self.conversations = ConversationManager(self.client, self, self.asst_id) | |
| self.growth_plan.reset() | |
| self.done_first_reflection = None | |
| self.goal = [] | |
| self.micro_actions = [] | |
| self.recommended_micro_actions = [] | |
| self.challenges = [] | |
| self.other_focusses = [] | |
| self.personal_growth_score = 0 | |
| self.career_growth_score = 0 | |
| self.relationship_score = 0 | |
| self.mental_well_being_score = 0 | |
| self.health_and_wellness_score = 0 | |
| self.reminders = None | |
| self.recent_wins = [] | |
| def get_last_user_message(self): | |
| # find the last message from 'role': 'user' in the conversation history | |
| messages = self.conversations._get_current_thread_history(remove_system_message=False) | |
| for msg in messages[::-1]: | |
| if msg['role'] == 'user': | |
| return msg['content'] | |
| def generate_user_interaction_guidelines(self, user_info, client): | |
| logger.info(f"Generating user interaction guidelines for user: {self.user_id}", extra={"user_id": self.user_id, "endpoint": "generate_user_interaction_guidelines"}) | |
| # prompt = f"A 'profile' is a document containing rich insights on users for the purpose of \ | |
| # providing contexts to LLMs. Based on the user's information, generate a \ | |
| # user summary that describes how best to interact with this user to create a personalized \ | |
| # and targeted chat experience. The user summary MUST strictly contain these parts:\ | |
| # What kind of coaching style & tone that the user possibly prefers based on their...\ | |
| # 1. Based on the user's chosen 'Legend Persona'\ | |
| # 2. Based on the user's age\ | |
| # 3. Based on the user's MBTI\ | |
| # 4. Based on the user's Love Language\ | |
| # 5. Based on the user's experience of trying coaching previously\ | |
| # 6. Based on the user's belief in Astrology\ | |
| # Generate a 6-point user summary based on the following \ | |
| # user information:\n\n{user_info}" | |
| # response = client.chat.completions.create( | |
| # model="gpt-4o-mini", | |
| # messages=[ | |
| # {"role": "system", "content": "You are an expert at building profile documents containing rich user insights."}, | |
| # {"role": "user", "content": prompt} | |
| # ], | |
| # temperature=0.2 | |
| # ) | |
| # user_guideline = f""" | |
| # {user_info}\n\n | |
| # ### INTERACTION GUIDELINE ### \n | |
| # {response.choices[0].message.content} | |
| # """ | |
| user_guideline = user_info | |
| return user_guideline | |
| def get_recent_run(self): | |
| return self.conversations.assistants['general'].recent_run | |
| def cancel_run(self, run, thread=None): | |
| logger.info(f"(user) Cancelling run: {run}", extra={"user_id": self.user_id, "endpoint": "cancel_run"}) | |
| self.conversations.cancel_run(run, thread) | |
| def update_conversation_state(self, stage, last_interaction): | |
| self.conversation_state['stage'] = stage | |
| self.conversation_state['last_interaction'] = last_interaction | |
| def _get_current_thread(self): | |
| return self.conversations.current_thread | |
| def send_message(self, text): | |
| response, run = self.conversations._run_current_thread(text) | |
| message = run.metadata.get("message", "No message") | |
| logger.info(f"Message: {message}", extra={"user_id": self.user_id, "endpoint": "user_send_message"}) | |
| if message == "start_now": | |
| # must do current plan now | |
| action = self.growth_plan.current() | |
| logger.info(f"Current Action: {action}", extra={"user_id": self.user_id, "endpoint": "user_send_message"}) | |
| response, prompt = self.do_theme(action['coachingTheme'], self.conversations.state['date'], action['day']) | |
| # add response to ai message | |
| self.add_ai_message("[hidden]" + prompt) | |
| self.add_ai_message(response['content']) | |
| # Move to the next action | |
| self.growth_plan.next() | |
| elif message == "change_goal": | |
| # send the change goal prompt | |
| logger.info("Sending change goal message...", extra={"user_id": self.user_id, "endpoint": "user_send_message"}) | |
| prompt = f""" | |
| I want to change my goal! Based on my information below, suggest me a new goal, and **ONLY** if i approve, call the create_smart_goal() function | |
| Previous Goal: | |
| {self.get_current_goal()} | |
| Profile: | |
| {self.user_info} | |
| """ | |
| response = self.conversations._send_morning_message(prompt) | |
| logger.info(f"Change Goal Response: {response}", extra={"user_id": self.user_id, "endpoint": "user_send_message"}) | |
| # add response to ai message | |
| self.add_ai_message(response['content']) | |
| # reset the growth_plan | |
| self.growth_plan.reset() | |
| logger.info(f"Current reminders: {self.reminders}", extra={"user_id": self.user_id, "endpoint": "user_send_message"}) | |
| if self.reminders is not None and len(self.reminders): | |
| # response['reminders'] = all reminders which date is today (so all the reminders that BE has to queue today) | |
| date = pd.to_datetime(self.conversations.state['date']).date() | |
| response['reminders'] = self.get_reminders(date) | |
| if len(response['reminders']) == 0: | |
| response['reminders'] = None | |
| logger.info(f"No reminders for today {date}", extra={"user_id": self.user_id, "endpoint": "user_send_message"}) | |
| logger.info(f"Returning reminders: {response['reminders']}", extra={"user_id": self.user_id, "endpoint": "user_send_message"}) | |
| else: | |
| response['reminders'] = None | |
| logger.info(f"Response: {response}", extra={"user_id": self.user_id, "endpoint": "user_send_message"}) | |
| return response | |
| def get_reminders(self, date=None): | |
| if self.reminders is None: | |
| return [] | |
| if date: | |
| if isinstance(date, str): | |
| # date might be in the format "%d-%m-%Y %a %H:%M:%S" or "%Y-%m-%d" | |
| try: | |
| datetime.strptime(date, "%d-%m-%Y %a %H:%M:%S") | |
| except ValueError: | |
| date = datetime.strptime(date, "%Y-%m-%d").date() | |
| elif isinstance(date, datetime): | |
| date = date.date() | |
| return [reminder for reminder in self.reminders if reminder['timestamp'].date() == date] | |
| return self.reminders | |
| def find_same_reminder(self, reminder_text): | |
| logger.info(f"Finding similar reminders: {self.reminders} to: {reminder_text}", extra={"user_id": self.user_id, "endpoint": "find_same_reminder"}) | |
| response = self.client.beta.chat.completions.parse( | |
| model="gpt-4o", | |
| messages=[ | |
| {"role": "system", "content": "You are an expert at understanding the context of texts"}, | |
| {"role": "user", "content": f"Identify the reminder in {self.reminders if self.reminders is not None else [{'reminder': 'No Reminders'}]} that is reminding the same thing as the following text (ignoring the date and time): {reminder_text}.\n\nIf none, return -1 otherwise return the index of the matching reminder."} | |
| ], | |
| response_format=Index, | |
| temperature=0.2 | |
| ) | |
| logger.info(f"Similar reminder response: {response.choices[0].message.parsed}", extra={"user_id": self.user_id, "endpoint": "find_same_reminder"}) | |
| index = getattr(response.choices[0].message.parsed, 'value', -1) | |
| logger.info(f"Similar reminder idx: reminders[{index}]", extra={"user_id": self.user_id, "endpoint": "find_same_reminder"}) | |
| return index | |
| def set_reminder(self, reminder): | |
| db_params = { | |
| 'dbname': 'ourcoach', | |
| 'user': 'ourcoach', | |
| 'password': 'hvcTL3kN3pOG5KteT17T', | |
| 'host': 'staging-ourcoach.cx8se8o0iaiy.ap-southeast-1.rds.amazonaws.com', | |
| 'port': '5432' | |
| } | |
| with psycopg2.connect(**db_params) as conn: | |
| with conn.cursor() as cursor: | |
| query = sql.SQL("SELECT * FROM {table} WHERE id = %s").format(table=sql.Identifier('public', 'users')) | |
| cursor.execute(query, (self.user_id,)) | |
| row = cursor.fetchone() | |
| if (row): | |
| colnames = [desc[0] for desc in cursor.description] | |
| user_data = dict(zip(colnames, row)) | |
| user_timezone = user_data['timezone'] | |
| # Convert to UTC | |
| reminder['timestamp_local'] = reminder['timestamp'] | |
| reminder['local_timezone'] = user_timezone | |
| reminder['timestamp'] = reminder['timestamp'].tz_localize(user_timezone).tz_convert("UTC") | |
| # generate uuid for this reminder | |
| reminder['id'] = generate_uuid() | |
| action = reminder['action'] | |
| if self.reminders is None: | |
| self.reminders = [] | |
| if action == 'update': | |
| index = self.find_same_reminder(reminder['reminder']) | |
| if index == -1: | |
| self.reminders.append(reminder) | |
| logger.info(f"Reminder added: {reminder}", extra={"user_id": self.user_id, "endpoint": "set_reminder"}) | |
| else: | |
| old_reminder = self.reminders[index] | |
| reminder['id'] = old_reminder['id'] | |
| self.reminders[index] = reminder | |
| logger.info(f"Reminder {old_reminder} -- updated to --> {reminder}", extra={"user_id": self.user_id, "endpoint": "set_reminder"}) | |
| elif action == 'delete': | |
| logger.info('Deleting reminder', extra={"user_id": self.user_id, "endpoint": "set_reminder"}) | |
| index = self.find_same_reminder(reminder['reminder']) | |
| if index == -1: | |
| logger.info(f"Could not find a mathcing reminder to delete: {reminder}", extra={"user_id": self.user_id, "endpoint": "set_reminder"}) | |
| else: | |
| old_reminder = self.reminders[index] | |
| self.reminders[index]['action'] = 'delete' | |
| logger.info(f"Reminder {old_reminder} has been marked for deletion", extra={"user_id": self.user_id, "endpoint": "set_reminder"}) | |
| else: | |
| # action is 'set' | |
| self.reminders.append(reminder) | |
| logger.info(f"Reminder added: {reminder}", extra={"user_id": self.user_id, "endpoint": "set_reminder"}) | |
| logger.info(f"Reminders: {self.reminders}", extra={"user_id": self.user_id, "endpoint": "set_reminder"}) | |
| def get_messages(self, exclude_system_msg=True, show_hidden=False): | |
| if not exclude_system_msg: | |
| return self.conversations._get_current_thread_history(False) | |
| else: | |
| if show_hidden: | |
| return list(filter(lambda x: not (x['content'].startswith("** It is a new day:") or x['content'].startswith("Pay attention to the current state you are in") or x['content'].startswith("Date changed to")), self.conversations._get_current_thread_history(exclude_system_msg))) | |
| return list(filter(lambda x: not (x['content'].startswith("** It is a new day:") or x['content'].startswith("Pay attention to the current state you are in") or x['content'].startswith("Date changed to") or x['content'].startswith("[hidden]")), self.conversations._get_current_thread_history(exclude_system_msg))) | |
| def set_intro_done(self): | |
| self.conversations.intro_done = True | |
| def do_theme(self, theme, date, day): | |
| logger.info(f"Doing theme: {theme}", extra={"user_id": self.user_id, "endpoint": "do_theme"}) | |
| if self.reminders is not None and len(self.reminders): | |
| logger.info(f"ALL Upcoming Reminders: {self.reminders}", extra={"user_id": self.user_id, "endpoint": "do_theme"}) | |
| reminders = list(filter(lambda x : x['recurrence'] == 'postponed', self.reminders)) | |
| logger.info(f"ALL Postponed Reminders: {reminders}", extra={"user_id": self.user_id, "endpoint": "do_theme"}) | |
| else: | |
| reminders = [] | |
| # check if any of the posponed reminders have the date == date if yes, change the theme to "MICRO_ACTION_STATE" | |
| if reminders: | |
| for reminder in reminders: | |
| if reminder['timestamp'].date() == pd.to_datetime(date).date(): | |
| logger.info(f"Reminder found for today ({pd.to_datetime(date).date()}): {reminder}", extra={"user_id": self.user_id, "endpoint": "do_theme"}) | |
| if theme != "FINAL_SUMMARY_STATE": | |
| theme = "MICRO_ACTION_STATE" | |
| break | |
| else: | |
| logger.info(f"No reminders found for today ({pd.to_datetime(date).date()})", extra={"user_id": self.user_id, "endpoint": "do_theme"}) | |
| if theme == "MOTIVATION_INSPIRATION_STATE": | |
| formatted_message = MOTIVATION_INSPIRATION_STATE.format(self.get_current_goal(), day, len(self.growth_plan.array)) | |
| elif theme == "PROGRESS_REFLECTION_STATE": | |
| formatted_message = PROGRESS_REFLECTION_STATE.format(self.get_current_goal(), day, len(self.growth_plan.array)) | |
| if len(self.challenges): | |
| challenge = self.challenges.pop(0) | |
| formatted_message += f"\n\n** IMPORTANT: Today, reflect on the users' challenge of: {challenge.content}, which they brought up during their growth guide session (let the user know we are bringing it up because of this) **" | |
| elif theme == "MICRO_ACTION_STATE": | |
| reminder_message = "\n".join([f"{i+1}. {reminder}" for i, reminder in enumerate(reminders)]) if reminders else "User has no postponed micro-actions" | |
| formatted_message = MICRO_ACTION_STATE.format(self.get_current_goal(), day, len(self.growth_plan.array), reminder_message) | |
| if len(self.recommended_micro_actions): | |
| todays_micro_action = self.recommended_micro_actions.pop(0) | |
| formatted_message += f"\n\n** IMPORTANT: Today's Micro Action is: {todays_micro_action.content}, which was recommended during their growth guide session (let the user know we are bringing it up because of this) **" | |
| elif theme == "OPEN_DISCUSSION_STATE": | |
| formatted_message = OPEN_DISCUSSION_STATE.format(self.get_current_goal(), day, len(self.growth_plan.array)) | |
| if len(self.other_focusses): | |
| focus = self.other_focusses.pop(0) | |
| formatted_message += f"\n\n** IMPORTANT: Today, focus the discussion on: {focus.content}, which they discussed during their growth guide session (let the user know we are bringing it up because of this) **" | |
| elif theme == "PROGRESS_SUMMARY_STATE": | |
| formatted_message = PROGRESS_SUMMARY_STATE.format(self.get_current_goal(), day, len(self.growth_plan.array)) | |
| elif theme == "FINAL_SUMMARY_STATE": | |
| formatted_message = FINAL_SUMMARY_STATE.format(self.get_current_goal(), day, len(self.growth_plan.array)) | |
| elif theme == "EDUCATION_STATE": | |
| formatted_message = EDUCATION_STATE.format(self.get_current_goal(), day, len(self.growth_plan.array)) | |
| elif theme == "FOLLUP_ACTION_STATE": | |
| reminder_message = "\n".join([f"{i+1}. {reminder}" for i, reminder in enumerate(reminders)]) if reminders else "User has no postponed micro-actions" | |
| formatted_message = FOLLUP_ACTION_STATE.format(self.get_current_goal(), day, len(self.growth_plan.array), reminder_message) | |
| elif theme == "FUNFACT_STATE": | |
| topics = ["Fun Fact about the User's Goal", "How Personality Type is affecting/shaping their behaviour towards the goal", "How Love Language may impact and be relevant toward the goal"] | |
| randomized_topic = random.choice(topics) | |
| formatted_message = FUNFACT_STATE.format(self.get_current_goal(), day, len(self.growth_plan.array), randomized_topic) | |
| # prompt = f"""** It is a new day: {date} ** | |
| # Additional System Instruction: | |
| # - Remember all of the user's personal information. Use this information to be as personalised as possible when conversing by including it in your responses where relevant. | |
| # - You are a good life coach because you don't overwhelm the user by sending too many questions or lengthy messages. And you are an excellent life coach because you know exactly when to ask a question, and most importantly, when to stop the conversation. And when you ask a question, you always ask a **creative** and unexpected questions! | |
| # - Keep all of your response short, only 40 tokens or words maximum! If your response contains a question, use a single break line before the question and encapsulate the question with one asterisk like this: *question* | |
| # - Be **creative** ! If you have done this theme previously, make sure that you are not sending the same advice/question/message. Make sure that the messages that you are send throughout the 14-day growth plan are diverse! | |
| # - When you give your wisdom and enlightment, **YOU** as a coach, must channel the energy, wisdom, and mindset of {user_legendary_persona} | |
| # Today's Theme: | |
| # {formatted_message} | |
| # """ | |
| prompt = f"""** It is a new day: {date} ({day}) 10:00:00 ** | |
| (If the day is a public holiday (e.g., Christmas, New Year, or other significant occasions), customize your message to reflect the context appropriately, acknowledging the holiday or its significance.) | |
| Note: If the user has not answered your question yesterday, you must say something like (but warmer) "Hey, you didn't answer my question. Do you still want to continue?" | |
| If the user says "no", then ask if they want to set a new goal (therefore later, call the change_goal() function) | |
| But if the user says "yes", then proceed to the theme below. | |
| Otherwise, you may directly proceed to today's theme below without asking the user. | |
| Additional System Instruction: | |
| ** Always remember to incorporate your personality (based on your persona) into all of your responses. ** | |
| 1. Focus on giving more wisdom than questions: | |
| - Provide assertive guidance, encouragement, validation, and different viewpoints. | |
| - Be critical and challenge the user's arguments as needed. | |
| - Avoid asking too many questions. | |
| - Offer strategies instead of asking for the user's strategies. | |
| 2. Keep messages concise & use Whatsapp texting length: | |
| - Use natural, casual language that's easy to understand. | |
| 3. Limit interaction to three messages per day (until the user says "it is a new day"): | |
| - Send each message one at a time. | |
| - Warmly end the daily interaction in the third message. | |
| 4. Include zero or one question per message (optional): | |
| - If including a question, place it after a line break and encapsulate with one asterisk: *question* | |
| 5. Ensure messages are personalized and creative: | |
| - Enrich content with user information. | |
| - Avoid being a "boring" coach with generic questions or advice. | |
| 6. Channel the energy, wisdom, and mindset of your persona: | |
| - Reiterate your famous sayings. | |
| - Explain your thinking philosophy in a natural, casual conversational tone (not like a lecture). | |
| 7. You may greet the user by name on a new day, but no need to use their name in every message. | |
| 8. You **MUST** call the process_reminder() function if the user wants to postpone their micro-action | |
| Today's Theme: | |
| {formatted_message} | |
| """ | |
| response = self.conversations._send_morning_message(prompt, add_to_main=True) | |
| if theme == "MICRO_ACTION_STATE": | |
| # check for any recommended microactions | |
| logger.info(f"Checking for recommended micro actions", extra={"user_id": self.user_id, "endpoint": "do_theme"}) | |
| micro_action = UserDataItem(role="assistant", area=self.get_current_goal(full=True).area, content=response['content'], user_id=self.user_id, status="PENDING", created_at=pd.Timestamp.now(tz='UTC').strftime("%d-%m-%Y %a %H:%M:%S"), updated_at=pd.Timestamp.now(tz='UTC').strftime("%d-%m-%Y %a %H:%M:%S")) | |
| logger.info(f"Recommended Micro Action: {micro_action}", extra={"user_id": self.user_id, "endpoint": "do_theme"}) | |
| self.micro_actions.append(micro_action) | |
| return response, prompt | |
| def change_date(self, date): | |
| logger.info(f"Changing date from {self.conversations.state['date']} to {date}", | |
| extra={"user_id": self.user_id, "endpoint": "user_change_date"}) | |
| # delete all hidden messages prom previous day | |
| self.conversations.delete_hidden_messages() | |
| # update the date in the state | |
| self.conversations.state['date'] = date | |
| action = self.growth_plan.current() | |
| # remove stale reminders | |
| if self.reminders is not None and len(self.reminders): | |
| # remove all reminders which 'recurrence' is 'once' or 'action' is 'delete' or the date is < today | |
| # this is to ensure that only reminders with recurrence and future reminder are kept | |
| now_utc = pd.Timestamp.now(tz='UTC') | |
| self.reminders = [reminder for reminder in self.reminders if not (reminder['action'] == 'delete' or (reminder['recurrence'] in ['once', 'postponed'] and reminder['timestamp'].tz_convert('UTC') < now_utc))] | |
| logger.info(f"Active Reminders: {self.reminders}", extra={"user_id": self.user_id, "endpoint": "user_change_date"}) | |
| ## ADD POINT FOR CHANGE DATE | |
| if self.growth_plan.current()['day'] == 7: | |
| self.add_life_score_point(variable = self.get_current_goal(full=True).area, points_added = 5, notes = "Reaching Day 7") | |
| self.add_recent_wins(wins = "You have reached Day 7 of your growth journey!", context = 'Growth journey is a 14-day coaching plan') | |
| elif self.growth_plan.current()['day'] == 14: | |
| self.add_life_score_point(variable = self.get_current_goal(full=True).area, points_added = 10, notes = "Reaching Day 14") | |
| self.add_recent_wins(wins = "You have finished your growth journey!", context = 'Growth journey is a 14-day coaching plan') | |
| logger.info(f"Today's action is {action}", extra={"user_id": self.user_id, "endpoint": "user_change_date"}) | |
| if action['day'] == 1: | |
| # mark intro as done | |
| self.set_intro_done() | |
| logger.info(f"Marked Intro as done: {self}", extra={"user_id": self.user_id, "endpoint": "user_change_date"}) | |
| # The coaching theme conditions are hardcoded for now | |
| theme = action['coachingTheme'] | |
| response, prompt = self.do_theme(theme, date, action['day']) | |
| # add today's reminders to response to schedule | |
| # response['reminders'] = all reminders which date is today (so all the reminders that BE has to queue today) | |
| # convert date to YYYY-MM-DD format | |
| date = pd.to_datetime(date).date() | |
| response['reminders'] = self.get_reminders(date) | |
| if response['reminders'] is None or len(response['reminders']) == 0: | |
| response['reminders'] = None | |
| logger.info(f"No reminders for today {date}", extra={"user_id": self.user_id, "endpoint": "user_change_date"}) | |
| logger.info(f"Reminders on {date}: {response['reminders']}", extra={"user_id": self.user_id, "endpoint": "user_change_date"}) | |
| # Move to the next action | |
| self.growth_plan.next() | |
| logger.info(f"Date Updated: {self.conversations.state['date']}", extra={"user_id": self.user_id, "endpoint": "user_change_date"}) | |
| return {'response': response, 'theme_prompt': '[hidden]'+prompt} | |
| def update_user_info(self, new_info): | |
| logger.info(f"Updating user info: [{self.user_info}] with: [{new_info}]", extra={"user_id": self.user_id, "endpoint": "update_user_info"}) | |
| # make an api call to gpt4o to compare the current user_info and the new info and create a new consolidated user_info | |
| comparison_prompt = f""" | |
| Current user information: | |
| {self.user_info} | |
| New/Updated user information: | |
| {new_info} | |
| Please carefully analyze both sets of information and merge them into a single, comprehensive user profile. The consolidated profile should be super context-rich and personalized to the user. Include all relevant details, resolve any conflicting information thoughtfully, and present the information in a clear, coherent, and well-organized manner. Highlight key aspects that can enhance personalization. | |
| """ | |
| response = self.client.chat.completions.create( | |
| model="gpt-4o", | |
| messages=[ | |
| {"role": "system", "content": "You are an expert at creating detailed, context-rich, and personalized user profiles by consolidating multiple sources of user information. Your task is to merge the current and new user information into a single, comprehensive profile that is super context-rich and personalized to the user."}, | |
| {"role": "user", "content": comparison_prompt} | |
| ], | |
| temperature=0.2 | |
| ) | |
| self.user_info = response.choices[0].message.content | |
| logger.info(f"Updated user info: {self.user_info}", extra={"user_id": self.user_id, "endpoint": "update_user_info"}) | |
| return True | |
| def _summarize_zoom(self, zoom_ai_summary): | |
| logger.info(f"Summarizing zoom ai summary", extra={"user_id": self.user_id, "endpoint": "summarize_zoom"}) | |
| # make an api call to gpt4o to summarize the zoom_ai_summary and produce a text with a focus on the most amount of user insight and info extracted | |
| system_prompt = f"""You are an expert at summarizing AI-generated Zoom transcripts concisely, focusing on extracting key user insights to enhance personalization in future interactions. Note that the zoom ai transcript may get the user's name wrong. Replace it with the actual user's name: {self.user_info}. Refer to the coach/guide as 'the Growth Guide'.""" | |
| prompt = f"Please summarize the following AI-generated Zoom transcript **in one short paragraph only** !!, emphasizing the most significant user insights and information:\n\n{zoom_ai_summary}" | |
| response = self.client.chat.completions.create( | |
| model="gpt-4o", | |
| messages=[ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| temperature=0.2 | |
| ) | |
| logger.info(f"Summary: {response.choices[0].message.content}", extra={"user_id": self.user_id, "endpoint": "summarize_zoom"}) | |
| return {'overview': response.choices[0].message.content} | |
| def _update_user_data(self, data_type, text_input, extra_text=""): | |
| data_mapping = { | |
| 'micro_actions': { | |
| 'prompt_description': 'micro actions', | |
| 'status': 'RECOMMENDED', | |
| 'attribute': 'recommended_micro_actions', | |
| 'endpoint': f'update_{data_type}', | |
| }, | |
| 'challenges': { | |
| 'prompt_description': 'challenges', | |
| 'status': 'ONGOING', | |
| 'attribute': 'challenges', | |
| 'endpoint': f'update_{data_type}', | |
| }, | |
| 'other_focusses': { | |
| 'prompt_description': 'other focuses', | |
| 'status': 'ONGOING', | |
| 'attribute': 'other_focusses', | |
| 'endpoint': f'update_{data_type}', | |
| }, | |
| } | |
| if data_type not in data_mapping: | |
| logger.error(f"Invalid data type: {data_type}", extra={"user_id": self.user_id}) | |
| return | |
| mapping = data_mapping[data_type] | |
| prompt = ( | |
| f"Extract {mapping['prompt_description']} from the following text and return them in a structured format. " | |
| f"If there are no {mapping['prompt_description']}, return an empty array for that field. " | |
| f"Each item should include the extracted {mapping['prompt_description'][:-1]} as content.\n\n" | |
| f"{extra_text}" | |
| f"Text:\n{text_input}" | |
| ) | |
| current_time = datetime.now(timezone.utc).strftime("%d-%m-%Y %a %H:%M:%S") | |
| response = self.client.beta.chat.completions.parse( | |
| model="gpt-4o", | |
| messages=[{"role": "user", "content": prompt}], | |
| response_format=UserDataResponse, | |
| temperature=0.2 | |
| ) | |
| data = getattr(response.choices[0].message.parsed, 'data') | |
| # Update the common fields for each item | |
| for item in data: | |
| item.role = "assistant" | |
| item.user_id = self.user_id | |
| item.status = mapping['status'] | |
| item.created_at = current_time | |
| item.updated_at = current_time | |
| logger.info(f"Updated {data_type}: {data}", extra={"user_id": self.user_id, "endpoint": mapping['endpoint']}) | |
| getattr(self, mapping['attribute']).extend(data) | |
| def update_user_data(self, gg_report): | |
| self._update_user_data('micro_actions', gg_report[0]['answer']) | |
| extra_text = f"User has new challenge:\n{gg_report[1]['answer']}\n\n" | |
| self._update_user_data('challenges', gg_report[2]['answer'], extra_text=extra_text) | |
| self._update_user_data('other_focusses', gg_report[3]['answer']) | |
| self._update_goal(gg_report[4]['answer']) | |
| def _update_goal(self, goal_text): | |
| prompt = f""" | |
| The user has a current goal: {self.get_current_goal()} | |
| The user provided a new goal: {goal_text} | |
| Determine if the new goal is inside the scope of the current goal or if it's outside the scope. | |
| If it's inside the scope, respond **exactly** with the current goal, and set same_or_not == True | |
| If it's outside the scope and related, respond with a merged goal (from the current and new goal) with larger scope, and set same_or_not == False | |
| If it's outside the scope and not related, respond with the new goal, and set same_or_not == False | |
| Note: You may paraphase the merged goal to be more succinct. | |
| Your response will be the final goal. You will also need to determine the area of this | |
| final goal by choosing one of these areas that suits the final goal: | |
| "Personal Growth", "Career Growth", "Relationship", "Mental Well-Being", "Health and Wellness" | |
| Only output a JSON with this schema below: | |
| {{ | |
| same_or_not: bool (whether the new goal is the same or not with the current goal), | |
| goal: str (the final goal), | |
| area: str (the area of the goal) | |
| }} | |
| ## Example 1 (Inside the scope): | |
| The user has a current goal: to spend at least 30 minutes exercising every day | |
| The user provided a new goal: to do short exercise every day | |
| Your verdict: inside the scope | |
| Your output: | |
| {{ | |
| same_or_not: True, | |
| goal: "to spend at least 30 minutes exercising every day" | |
| area: "Health and Wellness" | |
| }} | |
| ## Example 2 (Outside the scope, still related): | |
| The user has a current goal: to spend at least 30 minutes exercising every day | |
| The user provided a new goal: to exercise and have a balanced meal plan | |
| Your verdict: outside the scope, still related | |
| Your output: | |
| {{ | |
| same_or_not: False, | |
| goal: "to exercise at least 30 minutes every day, together with a balanced meal plan" | |
| area: "Health and Wellness" | |
| }} | |
| ## Example 3 (Outside the scope, not related): | |
| The user has a current goal: to spend at least 30 minutes exercising every day | |
| The user provided a new goal: to have a better relationship with friends | |
| Your verdict: outside the scope, not related | |
| Your output: | |
| {{ | |
| same_or_not: False, | |
| goal: "to have a better relationship with friends" | |
| area: "Relationship" | |
| }} | |
| """ | |
| response = self.client.chat.completions.create( | |
| model="gpt-4o", | |
| messages=[{"role": "user", "content": prompt}], | |
| response_format = { | |
| "type": "json_schema", | |
| "json_schema": { | |
| "name": "goal_determination", | |
| "strict": True, | |
| "schema": { | |
| "type": "object", | |
| "properties": { | |
| "same_or_not": { | |
| "type": "boolean", | |
| "description": "Indicates whether the new goal is the same as the current goal." | |
| }, | |
| "goal": { | |
| "type": "string", | |
| "description": "The final goal determined based on input." | |
| }, | |
| "area": { | |
| "type": "string", | |
| "description": "The area of the goal.", | |
| "enum": [ | |
| "Personal Growth", | |
| "Career Growth", | |
| "Relationship", | |
| "Mental Well-Being", | |
| "Health and Wellness" | |
| ] | |
| } | |
| }, | |
| "required": [ | |
| "same_or_not", | |
| "goal", | |
| "area" | |
| ], | |
| "additionalProperties": False | |
| } | |
| } | |
| }, | |
| temperature=0.2 | |
| ) | |
| final_goal = json.loads(response.choices[0].message.content)['goal'] | |
| final_goal_area = json.loads(response.choices[0].message.content)['area'] | |
| # if json.loads(response.choices[0].message.content)['same_or_not']: | |
| # final_goal_status = self.get_current_goal()['status'] | |
| # else: | |
| # final_goal_status = 'PENDING' | |
| if json.loads(response.choices[0].message.content)['same_or_not'] == False: | |
| self.set_goal(final_goal, final_goal_area) | |
| logger.info(f"User goal updated to: {final_goal}", extra={"user_id": self.user_id, "endpoint": "_update_goal"}) | |
| else: | |
| logger.info(f"User goal remains unchanged.", extra={"user_id": self.user_id, "endpoint": "_update_goal"}) | |
| def update_micro_action_status(self, completed_micro_action): | |
| if completed_micro_action: | |
| self.micro_actions[-1].status = "COMPLETE" | |
| self.micro_actions[-1].updated_at = pd.Timestamp.now(tz='UTC').strftime("%d-%m-%Y %a %H:%M:%S") | |
| num_of_micro_actions_completed = sum(1 for item in self.micro_actions if item.status == 'COMPLETE') | |
| if (num_of_micro_actions_completed in (1,3,5)) or (num_of_micro_actions_completed % 10 == 0 and num_of_micro_actions_completed != 0): | |
| self.add_life_score_point(variable = self.get_current_goal(full=True).area, points_added = 10, notes = f"Completing the {num_of_micro_actions_completed}-th micro-action") | |
| self.add_recent_wins(wins = "You have completed a micro action!", context= self.micro_actions[-1]['content']) | |
| def trigger_deep_reflection_point(self, area_of_deep_reflection): | |
| if len(area_of_deep_reflection)>0: | |
| for area in area_of_deep_reflection: | |
| self.add_life_score_point(variable = area, points_added = 5, notes = f"Doing a deep reflection about {area}") | |
| self.add_recent_wins(wins = f"You have done a deep reflection about your {area}!", context = 'Deep reflection') | |
| def add_point_for_booking(self): | |
| self.add_life_score_point(variable = self.get_current_goal(full=True).area, points_added = 5, notes = "Booking a GG session") | |
| self.add_recent_wins(wins = "You have booked a Growth Guide session!", context = "Growth Guide is a life coach") | |
| def add_point_for_completing_session(self): | |
| self.add_life_score_point(variable = self.get_current_goal(full=True).area, points_added = 20, notes = "Completing a GG session") | |
| self.add_recent_wins(wins = "You have completed a Growth Guide session!", context = "Growth Guide is a life coach") | |
| def build_ourcoach_report(self, overview, action_plan, gg_session_notes): | |
| logger.info(f"Building ourcoach report", extra={"user_id": self.user_id, "endpoint": "build_ourcoach_report"}) | |
| ourcoach_report = {'overview': overview['overview'], 'action_plan': action_plan, 'others': gg_session_notes} | |
| return ourcoach_report | |
| def process_growth_guide_session(self, session_data, booking_id): | |
| logger.info(f"Processing growth guide session data: {session_data}", extra={"user_id": self.user_id, "endpoint": "process_growth_guide_session"}) | |
| self.last_gg_session = booking_id | |
| # Generate the ourcoach_report (summary) | |
| zoom_ai_summary = session_data["zoom_ai_summary"] | |
| gg_report = session_data["gg_report"] | |
| overview = self._summarize_zoom(zoom_ai_summary) | |
| # Update user data based on growth guide answers | |
| self.update_user_data(gg_report) | |
| self.update_user_info(gg_report[5]['answer'] + "\n\n" + overview['overview']) | |
| # build ourcoach_report | |
| ourcoach_report = self.build_ourcoach_report(overview, list(map(lambda x : x.content, self.recommended_micro_actions)), gg_report[5]['answer']) | |
| # add this report to the db | |
| update_growth_guide_summary(self.user_id, booking_id, ourcoach_report) | |
| # Send hidden message to AI to generate the response | |
| logger.info(f"Sending hidden message to AI to generate response", extra={"user_id": self.user_id, "endpoint": "process_growth_guide_session"}) | |
| # post_gg_prompt = POST_GG_STATE.format(self.user_info, ourcoach_report, gg_report) | |
| post_gg_promt = f"""The user: {self.user_info} has completed their growth guide session. | |
| Send them a warm and congratulatory message acknowledging this and a link to their web dasboard. Only include the link once in the message. | |
| Capitalise the first letter in Revelation Dashboard and Growth Guide. Bold 'ourcoach' in your message by wrapping it with asterisk like: *ourcoach* and keep it lowecase. | |
| Example: | |
| Hey <user>! ..., you can find a details of your session on your Revelation Dashboard: {OURCOACH_DASHBOARD_URL}""" | |
| response = self.conversations._send_morning_message(post_gg_promt) | |
| logger.info(f"Response: {response}", extra={"user_id": self.user_id, "endpoint": "process_growth_guide_session"}) | |
| return response | |
| def ask_to_schedule_growth_guide_reminder(self, date): | |
| prompt = f""" ** The user has scheduled a Growth Guide session for {date} (current date: {self.conversations.state['date']}) **\n\nFirstly, greet the user warmly and excitedly and let them know that they have succesfully booked their Growth Guide session. | |
| Then, ask the user if they would like a reminder for the Growth Guide session. If they would like a reminder, create a new reminder 1 hour before their scheduled session.""" | |
| response = self.conversations._send_morning_message(prompt) | |
| self.add_ai_message("[hidden]" + prompt) | |
| self.add_ai_message(response['content']) | |
| logger.info(f"Response: {response}", extra={"user_id": self.user_id, "endpoint": "process_growth_guide_session"}) | |
| return response | |
| def infer_memento_follow_ups(self): | |
| mementos_path = os.path.join("mementos", "to_upload", f"{self.user_id}", "*.json") | |
| # mementos_path = f"mementos/to_upload/{self.user_id}/*.json" | |
| for file_path in glob.glob(mementos_path): | |
| with open(file_path, 'r+') as file: | |
| data = json.load(file) | |
| infered_follow_up = self._infer_follow_ups(data['created'], data['context']) | |
| logger.info(f"[Infered Follow Up]: {infered_follow_up}", extra={"user_id": self.user_id, "endpoint": "infer_memento_follow_ups"}) | |
| data['follow_up_on'] = infered_follow_up | |
| file.seek(0) | |
| json.dump(data, file, indent=4) | |
| file.truncate() | |
| return True | |
| def get_daily_messages(self): | |
| return self.conversations.get_daily_thread() | |
| def change_assistant(self, asst_id): | |
| self.asst_id = asst_id | |
| self.conversations.assistants['general'] = Assistant(self.asst_id, self.conversations) | |
| return self | |
| def __getstate__(self): | |
| state = self.__dict__.copy() | |
| if 'client' in state: | |
| del state['client'] | |
| return state | |
| def __setstate__(self, state): | |
| self.__dict__.update(state) | |
| self.client = None | |
| def __str__(self): | |
| return f"""User(user_id={self.user_id} | |
| micro_actions={self.micro_actions} | |
| recommended_actions={self.recommended_micro_actions} | |
| challenge={self.challenges} | |
| other_focusses={self.other_focusses} | |
| goals={self.goal} | |
| done_first_reflection={self.done_first_reflection} | |
| conversations={self.conversations}) | |
| user_info={self.user_info}""" | |
| def __repr__(self): | |
| return f"""User(user_id={self.user_id} | |
| micro_actions={self.micro_actions} | |
| recommended_actions={self.recommended_micro_actions} | |
| challenge={self.challenges} | |
| other_focusses={self.other_focusses} | |
| goals={self.goal} | |
| done_first_reflection={self.done_first_reflection} | |
| conversations={self.conversations}) | |
| user_info={self.user_info}""" | |
| def refresh(self, client): | |
| # copy user by creating new user object | |
| user = User(self.user_id, self.user_info, client, "asst_07u7sucvSXGJOjcjXr5EL6nD", self.user_interaction_guidelines) | |
| user.conversations = self.conversations.clone(client) | |
| if len(user.get_messages()) >= 1: | |
| user.done_first_reflection = True | |
| else: | |
| user.done_first_reflection = None | |
| # user.add_ai_message("** You are now in the Idle State **") | |
| return user | |
| def save_user(self): | |
| # Construct the file path dynamically for cross-platform compatibility | |
| file_path = os.path.join("users", "to_upload", f"{self.user_id}.pkl") | |
| # Ensure the directory exists | |
| os.makedirs(os.path.dirname(file_path), exist_ok=True) | |
| # Save the user object as a pickle file | |
| with open(file_path, 'wb') as file: | |
| pickle.dump(self, file) | |
| return file_path | |
| def load_user(user_id, client): | |
| # Construct the file path dynamically for cross-platform compatibility | |
| file_path = os.path.join("users", "data", f"{user_id}.pkl") | |
| # Load the user object from the pickle file | |
| with open(file_path, 'rb') as file: | |
| user = pickle.load(file) | |
| user.client = client # Re-attach the client object | |
| user.conversations.client = client # Re-attach the client to conversations | |
| return user | |
| class CircularQueue: | |
| def __init__(self, array, user_id): | |
| if not array: | |
| raise ValueError("Array cannot be empty") | |
| self.user_id = user_id | |
| self.array = array | |
| self.size = len(array) | |
| self.index = 0 # Tracks the current position in the array | |
| def __getstate__(self): | |
| state = self.__dict__.copy() | |
| return state | |
| def __setstate__(self, state): | |
| self.__dict__.update(state) | |
| def next(self): | |
| # Get the next element and advance the index | |
| element = self.array[self.index] | |
| logger.info(f"[Reflection Topic] [Previous]: {self.array[self.index-1] if self.index else 'None'} -> [Current]: {element}", extra={"user_id": self.user_id, "endpoint": "circular_queue_next"}) | |
| self.index = (self.index + 1) % self.size # Wrap around to 0 when the end is reached | |
| return element | |
| def current(self): | |
| return self.array[self.index] | |
| def reset(self): | |
| self.index = 0 | |
| def __str__(self): | |
| elements = [ | |
| f"[{item}]" if i == self.index else str(item) | |
| for i, item in enumerate(self.array) | |
| ] | |
| return f"CircularQueue: {' -> '.join(elements)}" | |
| def __repr__(self): | |
| elements = [ | |
| f"[{item}]" if i == self.index else str(item) | |
| for i, item in enumerate(self.array) | |
| ] | |
| return f"CircularQueue: {' -> '.join(elements)}" |