Spaces:
Sleeping
Sleeping
| import os | |
| import openai | |
| import pandas as pd | |
| from datetime import datetime, timezone | |
| from app.assistants import Assistant | |
| import random | |
| import logging | |
| from app.exceptions import BaseOurcoachException, ConversationManagerError, OpenAIRequestError | |
| from datetime import datetime | |
| import dotenv | |
| from app.utils import id_to_persona | |
| dotenv.load_dotenv() | |
| OURCOACH_DASHBOARD_URL = os.getenv("OURCOACH_DASHBOARD_URL") | |
| logger = logging.getLogger(__name__) | |
| def get_current_datetime(): | |
| return datetime.now(timezone.utc) | |
| class ConversationManager: | |
| def __init__(self, client, user, asst_id, intro_done=False): | |
| self.user = user | |
| self.intro_done = intro_done | |
| self.assistants = {'general': Assistant(asst_id, self), 'intro': Assistant('asst_baczEK65KKvPWIUONSzdYH8j', self)} | |
| self.client = client | |
| self.state = {'date': pd.Timestamp.now(tz='UTC').strftime("%Y-%m-%d %a %H:%M:%S")} | |
| self.current_thread = self.create_thread() | |
| self.daily_thread = None | |
| logger.info("Initializing conversation state", extra={"user_id": self.user.user_id, "endpoint": "conversation_init"}) | |
| def __getstate__(self): | |
| state = self.__dict__.copy() | |
| # Remove unpicklable or unnecessary attributes | |
| if 'client' in state: | |
| del state['client'] | |
| return state | |
| def __setstate__(self, state): | |
| self.__dict__.update(state) | |
| # Re-initialize attributes that were not pickled | |
| self.client = None | |
| def catch_error(func): | |
| def wrapper(self, *args, **kwargs): | |
| try: | |
| return func(self, *args, **kwargs) | |
| except BaseOurcoachException as e: | |
| raise e | |
| except openai.BadRequestError as e: | |
| raise OpenAIRequestError(user_id=self.user.user_id, message="OpenAI Request Error", e=str(e)) | |
| except Exception as e: | |
| # Handle other exceptions | |
| logger.error(f"An unexpected error occurred: {e}") | |
| raise ConversationManagerError(user_id=self.user.user_id, message="Unexpected error in ConversationManager", e=str(e)) | |
| return wrapper | |
| def create_thread(self): | |
| user_interaction_guidelines =self.user.user_interaction_guidelines | |
| thread = self.client.beta.threads.create() | |
| self.system_message = self.add_message_to_thread(thread.id, "assistant", | |
| f""" | |
| You are: | |
| {id_to_persona(self.user.asst_id)}, always adhere to your choosen persona by incorporating it conversationally. | |
| You represent a coach at ourcoach. You may refer to you Knowledgebase (ourcoach FAQ) for all information related to ourcoach. | |
| ** Branding ** Always stylize ourcoach as 'ourcoach' instead of 'OurCoach' or 'Ourcoach', regardless of any grammatical errors. | |
| ------------------------------------------- | |
| You are coaching: | |
| \n\n{user_interaction_guidelines}\n\n\ | |
| Be mindful of this information at all times in order to | |
| be as personalised as possible when conversing. Ensure to | |
| follow the conversation guidelines and flow templates. Use the | |
| current state of the conversation to adhere to the flow. Do not let the user know about any transitions.\n\n | |
| Whenever you reference the ourcoach dashboard, refer to it as 'Revelation Dashboard' and include the link: {OURCOACH_DASHBOARD_URL} . | |
| ** Today is {self.state['date']}.\n\n ** | |
| ** You are now in the INTRODUCTION STATE. ** | |
| """) | |
| return thread | |
| def _get_current_thread_history(self, remove_system_message=True, _msg=None, thread=None): | |
| if thread is None: | |
| thread = self.current_thread | |
| if not remove_system_message: | |
| return [{"role": msg.role, "content": msg.content[0].text.value} for msg in self.client.beta.threads.messages.list(thread.id, order="asc")] | |
| if _msg: | |
| return [{"role": msg.role, "content": msg.content[0].text.value} for msg in self.client.beta.threads.messages.list(thread.id, order="asc", after=_msg.id)][1:] | |
| return [{"role": msg.role, "content": msg.content[0].text.value} for msg in self.client.beta.threads.messages.list(thread.id, order="asc")][1:] # remove the system message | |
| def add_message_to_thread(self, thread_id, role, content): | |
| message = self.client.beta.threads.messages.create( | |
| thread_id=thread_id, | |
| role=role, | |
| content=content | |
| ) | |
| return message | |
| def _run_current_thread(self, text, thread=None, hidden=False): | |
| if thread is None: | |
| thread = self.current_thread | |
| logger.warning(f"{self}", extra={"user_id": self.user.user_id, "endpoint": "run_current_thread"}) | |
| logger.info(f"User Message: {text}", extra={"user_id": self.user.user_id, "endpoint": "run_current_thread"}) | |
| # need to select assistant | |
| if self.intro_done: | |
| logger.info(f"Running general assistant", extra={"user_id": self.user.user_id, "endpoint": "run_current_thread"}) | |
| run, just_finished_intro, message = self.assistants['general'].process(thread, text) | |
| else: | |
| logger.info(f"Running intro assistant", extra={"user_id": self.user.user_id, "endpoint": "run_current_thread"}) | |
| run, just_finished_intro, message = self.assistants['intro'].process(thread, text) | |
| logger.info(f"Run {run.id} {run.status} just finished intro: {just_finished_intro}", extra={"user_id": self.user.user_id, "endpoint": "run_current_thread"}) | |
| if 'message' in run.metadata: | |
| message = run.metadata['message'] | |
| if message == 'start_now': | |
| self.intro_done = True | |
| logger.info(f"Start now", extra={"user_id": self.user.user_id, "endpoint": "run_current_thread"}) | |
| elif message == 'change_goal': | |
| self.intro_done = False | |
| logger.info(f"Changing goal, reset to intro assistant", extra={"user_id": self.user.user_id, "endpoint": "run_current_thread"}) | |
| # Actually dont need this | |
| elif message == 'error': | |
| logger.error(f"Run was cancelled due to error", extra={"user_id": self.user.user_id, "endpoint": "run_current_thread"}) | |
| # self.add_message_to_thread(thread.id, "assistant", run.metadata['content']) | |
| # return self._get_current_thread_history(remove_system_message=False)[-1], run | |
| if hidden: | |
| self.client.beta.threads.messages.delete(message_id=message.id, thread_id=thread.id) | |
| logger.info(f"Deleted hidden message: {message}", extra={"user_id": self.user.user_id, "endpoint": "run_current_thread"}) | |
| return self._get_current_thread_history(remove_system_message=False)[-1], run | |
| def _send_and_replace_message(self, text, replacement_msg=None): | |
| logger.info(f"Sending hidden message: {text}", extra={"user_id": self.user.user_id, "endpoint": "send_hidden_message"}) | |
| response, _ = self._run_current_thread(text, hidden=True) | |
| # check if there is a replacement message | |
| if replacement_msg: | |
| logger.info(f"Adding replacement message: {replacement_msg}", extra={"user_id": self.user.user_id, "endpoint": "send_hidden_message"}) | |
| # get the last message | |
| last_msg = list(self.client.beta.threads.messages.list(self.current_thread.id, order="asc"))[-1] | |
| logger.info(f"Last message: {last_msg}", extra={"user_id": self.user.user_id, "endpoint": "send_hidden_message"}) | |
| response = last_msg.content[0].text.value | |
| # delete the last message | |
| self.client.beta.threads.messages.delete(message_id=last_msg.id, thread_id=self.current_thread.id) | |
| self.add_message_to_thread(self.current_thread.id, "user", replacement_msg) | |
| self.add_message_to_thread(self.current_thread.id, "assistant", response) | |
| logger.info(f"Hidden message response: {response}", extra={"user_id": self.user.user_id, "endpoint": "send_hidden_message"}) | |
| # NOTE: this is a hack, should get the response straight from the run | |
| return {'content': response, 'role': 'assistant'} | |
| def _add_ai_message(self, text): | |
| return self.add_message_to_thread(self.current_thread.id, "assistant", text) | |
| def get_daily_thread(self): | |
| if self.daily_thread is None: | |
| messages = self._get_current_thread_history(remove_system_message=False) | |
| self.daily_thread = self.client.beta.threads.create( | |
| messages=messages[:30] | |
| ) | |
| # Add remaining messages one by one if there are more than 30 | |
| for msg in messages[30:]: | |
| self.add_message_to_thread( | |
| self.daily_thread.id, | |
| msg['role'], | |
| msg['content'] | |
| ) | |
| self.last_daily_message = list(self.client.beta.threads.messages.list(self.daily_thread.id, order="asc"))[-1] | |
| else: | |
| messages = self._get_current_thread_history(remove_system_message=False, _msg=self.last_daily_message) | |
| self.client.beta.threads.delete(self.daily_thread.id) | |
| self.daily_thread = self.client.beta.threads.create(messages=messages) | |
| self.last_daily_message = list(self.client.beta.threads.messages.list(self.daily_thread.id, order="asc"))[-1] | |
| logger.info(f"Daily Thread: {self._get_current_thread_history(thread=self.daily_thread)}", extra={"user_id": self.user.user_id, "endpoint": "send_morning_message"}) | |
| logger.info(f"Last Daily Message: {self.last_daily_message}", extra={"user_id": self.user.user_id, "endpoint": "send_morning_message"}) | |
| return self._get_current_thread_history(thread=self.daily_thread) | |
| # [{"role":, "content":}, ....] | |
| def _send_morning_message(self, text, add_to_main=False): | |
| # create a new thread | |
| # OPENAI LIMITATION: Can only attach a maximum of 32 messages when creating a new thread | |
| messages = self._get_current_thread_history(remove_system_message=False) | |
| if len(messages) >= 29: | |
| messages = [{"content": """ Remember who you are and who you are coaching. | |
| Be mindful of this information at all times in order to | |
| be as personalised as possible when conversing. Ensure to | |
| follow the conversation guidelines and flow provided.""", "role":"assistant"}] + messages[-29:] | |
| logger.info(f"Current Thread Messages: {messages}", extra={"user_id": self.user.user_id, "endpoint": "send_hidden_message"}) | |
| temp_thread = self.client.beta.threads.create(messages=messages) | |
| logger.info(f"Created Temp Thread: {temp_thread}", extra={"user_id": self.user.user_id, "endpoint": "send_hidden_message"}) | |
| if add_to_main: | |
| logger.info(f"Adding message to main thread: {text}", extra={"user_id": self.user.user_id, "endpoint": "send_hidden_message"}) | |
| self.add_message_to_thread(self.current_thread.id, "assistant", text) | |
| self.add_message_to_thread(temp_thread.id, "user", text) | |
| self._run_current_thread(text, thread=temp_thread) | |
| response = self._get_current_thread_history(thread=temp_thread)[-1] | |
| logger.info(f"Hidden Response: {response}", extra={"user_id": self.user.user_id, "endpoint": "send_hidden_message"}) | |
| # delete temp thread | |
| self.client.beta.threads.delete(temp_thread.id) | |
| logger.info(f"Deleted Temp Thread: {temp_thread}", extra={"user_id": self.user.user_id, "endpoint": "send_hidden_message"}) | |
| return response | |
| def delete_hidden_messages(self, old_thread=None): | |
| if old_thread is None: | |
| old_thread = self.current_thread | |
| # create a new thread | |
| messages = [msg for msg in self._get_current_thread_history(remove_system_message=False) if not msg['content'].startswith("[hidden]")] | |
| if len(messages) >= 29: | |
| messages = messages[-29:] | |
| logger.info(f"Current Thread Messages: {messages}", extra={"user_id": self.user.user_id, "endpoint": "delete_hidden_messages"}) | |
| new_thread = self.client.beta.threads.create(messages=messages) | |
| # delete old thread | |
| self.client.beta.threads.delete(old_thread.id) | |
| # set current thread | |
| self.current_thread = new_thread | |
| def cancel_run(self, run, thread = None): | |
| # Cancels and recreates a thread | |
| logger.info(f"(CM) Cancelling run {run} for thread {thread}", extra={"user_id": self.user.user_id, "endpoint": "cancel_run"}) | |
| if thread is None: | |
| thread = self.current_thread.id | |
| if self.intro_done: | |
| self.assistants['general'].cancel_run(run, thread) | |
| else: | |
| self.assistants['intro'].cancel_run(run, thread) | |
| logger.info(f"Run cancelled", extra={"user_id": self.user.user_id, "endpoint": "cancel_run"}) | |
| return True | |
| def clone(self, client): | |
| """Creates a new ConversationManager with copied thread messages.""" | |
| # Create new instance with same init parameters | |
| new_cm = ConversationManager( | |
| client, | |
| self.user, | |
| self.assistants['general'].id, | |
| intro_done=True | |
| ) | |
| # Get all messages from current thread | |
| messages = self._get_current_thread_history(remove_system_message=False) | |
| # Delete the automatically created thread from constructor | |
| new_cm.client.beta.threads.delete(new_cm.current_thread.id) | |
| # Create new thread with first 30 messages | |
| new_cm.current_thread = new_cm.client.beta.threads.create( | |
| messages=messages[:30] | |
| ) | |
| # Add remaining messages one by one if there are more than 30 | |
| for msg in messages[30:]: | |
| new_cm.add_message_to_thread( | |
| new_cm.current_thread.id, | |
| msg['role'], | |
| msg['content'] | |
| ) | |
| # Copy other relevant state | |
| new_cm.state = self.state | |
| return new_cm | |
| def __str__(self): | |
| return f"ConversationManager(intro_done={self.intro_done}, assistants={self.assistants}, current_thread={self.current_thread})" | |
| def __repr__(self): | |
| return (f"ConversationManager(" | |
| f"intro_done={self.intro_done}, current_thread={self.current_thread})") | |