# fastapi-v2/app/conversation_manager.py
# Last commit 24c93b6 by Shageenderan Sapai: "Added Alerts and other HF feedback"
# (Repository page residue: raw / history / blame links; file size 15.1 kB)
import os
import openai
import pandas as pd
from datetime import datetime, timezone
from app.assistants import Assistant
import random
import logging
from app.exceptions import BaseOurcoachException, ConversationManagerError, OpenAIRequestError
from datetime import datetime
import dotenv
from app.utils import id_to_persona
# Load variables from a .env file into the process environment first, so the
# lookup below can see them.
dotenv.load_dotenv()

# Dashboard link interpolated into the coaching prompt; None when the
# environment variable is not set.
OURCOACH_DASHBOARD_URL = os.environ.get("OURCOACH_DASHBOARD_URL")

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def get_current_datetime():
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    return datetime.now(tz=timezone.utc)
class ConversationManager:
    """Owns the OpenAI thread(s) for one user and routes messages to assistants.

    The manager keeps a main thread (``current_thread``), an optional
    ``daily_thread`` snapshot, and two assistants: ``'intro'`` (used until
    ``intro_done`` is True) and ``'general'``.

    The OpenAI ``client`` is dropped when the instance is pickled and is
    ``None`` after unpickling; callers must re-attach one before use.
    """

    # Assistant used while the user is still in the introduction flow.
    INTRO_ASSISTANT_ID = 'asst_baczEK65KKvPWIUONSzdYH8j'

    # OPENAI LIMITATION: a thread can be created with at most 32 messages.
    # Seed new threads with this many and append the remainder one by one.
    THREAD_CREATE_BATCH = 30

    def __init__(self, client, user, asst_id, intro_done=False):
        """Create the manager and open a fresh main thread.

        Args:
            client: OpenAI client exposing ``beta.threads``.
            user: project user object; ``user_id``, ``asst_id`` and
                ``user_interaction_guidelines`` are read from it.
            asst_id: id of the 'general' assistant.
            intro_done: True when the introduction flow is already complete.
        """
        self.user = user
        self.intro_done = intro_done
        # Set the client before handing `self` to the assistants so they
        # never receive a manager without a `client` attribute.
        self.client = client
        self.assistants = {
            'general': Assistant(asst_id, self),
            'intro': Assistant(self.INTRO_ASSISTANT_ID, self),
        }
        self.state = {'date': pd.Timestamp.now(tz='UTC').strftime("%Y-%m-%d %a %H:%M:%S")}
        self.current_thread = self.create_thread()
        self.daily_thread = None
        # Last message of `daily_thread`; populated by get_daily_thread().
        self.last_daily_message = None
        logger.info("Initializing conversation state", extra=self._log_extra("conversation_init"))

    def __getstate__(self):
        """Return the picklable state: every attribute except ``client``."""
        state = self.__dict__.copy()
        # The API client is not pickled.
        state.pop('client', None)
        return state

    def __setstate__(self, state):
        """Restore pickled attributes; ``client`` must be re-attached later."""
        self.__dict__.update(state)
        self.client = None

    def catch_error(func):
        """Decorator that normalises exceptions raised by manager methods.

        * ``BaseOurcoachException`` is re-raised untouched.
        * ``openai.BadRequestError`` becomes ``OpenAIRequestError``.
        * Anything else is logged and becomes ``ConversationManagerError``.

        The original exception is chained as ``__cause__``.
        """
        # Local import: keeps this block independent of the file's import list.
        from functools import wraps

        @wraps(func)
        def wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except BaseOurcoachException:
                raise
            except openai.BadRequestError as e:
                raise OpenAIRequestError(
                    user_id=self.user.user_id,
                    message="OpenAI Request Error",
                    e=str(e),
                ) from e
            except Exception as e:
                # Handle other exceptions
                logger.error(f"An unexpected error occurred: {e}")
                raise ConversationManagerError(
                    user_id=self.user.user_id,
                    message="Unexpected error in ConversationManager",
                    e=str(e),
                ) from e
        return wrapper

    def _log_extra(self, endpoint):
        """Build the ``extra`` dict attached to every log record."""
        return {"user_id": self.user.user_id, "endpoint": endpoint}

    def _create_thread_with_messages(self, messages):
        """Create a thread pre-filled with ``messages`` (role/content dicts).

        Only the first ``THREAD_CREATE_BATCH`` messages are sent with the
        create call; the rest are appended individually to stay under the
        per-create message limit.
        """
        batch = self.THREAD_CREATE_BATCH
        thread = self.client.beta.threads.create(messages=messages[:batch])
        for msg in messages[batch:]:
            self.add_message_to_thread(thread.id, msg['role'], msg['content'])
        return thread

    @catch_error
    def create_thread(self):
        """Create a new thread seeded with the coaching prompt.

        The prompt is posted with role "assistant" and stored on
        ``self.system_message``. Returns the new thread object.
        """
        user_interaction_guidelines = self.user.user_interaction_guidelines
        thread = self.client.beta.threads.create()
        # The prompt text is kept at column 0 so the literal stays
        # byte-identical.
        self.system_message = self.add_message_to_thread(
            thread.id,
            "assistant",
            f"""
You are:
{id_to_persona(self.user.asst_id)}, always adhere to your choosen persona by incorporating it conversationally.
You represent a coach at ourcoach. You may refer to you Knowledgebase (ourcoach FAQ) for all information related to ourcoach.
** Branding ** Always stylize ourcoach as 'ourcoach' instead of 'OurCoach' or 'Ourcoach', regardless of any grammatical errors.
-------------------------------------------
You are coaching:
\n\n{user_interaction_guidelines}\n\n\
Be mindful of this information at all times in order to
be as personalised as possible when conversing. Ensure to
follow the conversation guidelines and flow templates. Use the
current state of the conversation to adhere to the flow. Do not let the user know about any transitions.\n\n
Whenever you reference the ourcoach dashboard, refer to it as 'Revelation Dashboard' and include the link: {OURCOACH_DASHBOARD_URL} .
** Today is {self.state['date']}.\n\n **
** You are now in the INTRODUCTION STATE. **
""")
        return thread

    @catch_error
    def _get_current_thread_history(self, remove_system_message=True, _msg=None, thread=None):
        """Return a thread's messages, oldest first, as role/content dicts.

        Args:
            remove_system_message: when True, the first listed message is
                dropped (the prompt, for a thread made by create_thread()).
            _msg: only honoured when ``remove_system_message`` is True;
                listing then starts after this message, and the first
                listed message is still dropped.
            thread: thread to read; defaults to ``self.current_thread``.
        """
        if thread is None:
            thread = self.current_thread
        list_kwargs = {"order": "asc"}
        if remove_system_message and _msg:
            list_kwargs["after"] = _msg.id
        history = [
            {"role": msg.role, "content": msg.content[0].text.value}
            for msg in self.client.beta.threads.messages.list(thread.id, **list_kwargs)
        ]
        return history[1:] if remove_system_message else history

    @catch_error
    def add_message_to_thread(self, thread_id, role, content):
        """Append a message to ``thread_id`` and return the created message."""
        return self.client.beta.threads.messages.create(
            thread_id=thread_id,
            role=role,
            content=content,
        )

    @catch_error
    def _run_current_thread(self, text, thread=None, hidden=False):
        """Run the active assistant on ``text`` and return its reply.

        Args:
            text: the user message handed to the assistant.
            thread: thread to run on; defaults to ``self.current_thread``.
            hidden: when True, the message object returned by the
                assistant's ``process`` call is deleted from the thread
                after the run.

        Returns:
            ``(last_message, run)`` where ``last_message`` is the final
            role/content dict of the thread that was run.
        """
        if thread is None:
            thread = self.current_thread
        log_extra = self._log_extra("run_current_thread")
        logger.warning(f"{self}", extra=log_extra)
        logger.info(f"User Message: {text}", extra=log_extra)

        # The intro assistant is used until the introduction is finished.
        assistant_key = 'general' if self.intro_done else 'intro'
        logger.info(f"Running {assistant_key} assistant", extra=log_extra)
        run, just_finished_intro, message = self.assistants[assistant_key].process(thread, text)
        logger.info(
            f"Run {run.id} {run.status} just finished intro: {just_finished_intro}",
            extra=log_extra,
        )

        # Keep the control signal in its own name. Reusing `message` replaced
        # the message object with a string and broke the hidden-message
        # deletion below.
        metadata = run.metadata or {}
        if 'message' in metadata:
            run_signal = metadata['message']
            if run_signal == 'start_now':
                self.intro_done = True
                logger.info("Start now", extra=log_extra)
            elif run_signal == 'change_goal':
                self.intro_done = False
                logger.info("Changing goal, reset to intro assistant", extra=log_extra)
            elif run_signal == 'error':
                logger.error("Run was cancelled due to error", extra=log_extra)

        if hidden:
            self.client.beta.threads.messages.delete(message_id=message.id, thread_id=thread.id)
            logger.info(f"Deleted hidden message: {message}", extra=log_extra)

        # Read the reply from the thread that was run, not always the main one.
        last_message = self._get_current_thread_history(remove_system_message=False, thread=thread)[-1]
        return last_message, run

    @catch_error
    def _send_and_replace_message(self, text, replacement_msg=None):
        """Run ``text`` as a hidden message, optionally showing a substitute.

        With ``replacement_msg``, the reply is removed from the main thread
        and re-added after ``replacement_msg``, so the thread shows the
        replacement as the user's turn.

        Returns:
            ``{'content': ..., 'role': 'assistant'}``.
        """
        log_extra = self._log_extra("send_hidden_message")
        logger.info(f"Sending hidden message: {text}", extra=log_extra)
        response, _ = self._run_current_thread(text, hidden=True)
        # NOTE(review): without `replacement_msg`, `response` is still the
        # role/content dict from the run, so the returned 'content' is a
        # nested dict rather than text. Kept as-is for callers; confirm.
        if replacement_msg:
            logger.info(f"Adding replacement message: {replacement_msg}", extra=log_extra)
            thread_id = self.current_thread.id
            last_msg = list(self.client.beta.threads.messages.list(thread_id, order="asc"))[-1]
            logger.info(f"Last message: {last_msg}", extra=log_extra)
            response = last_msg.content[0].text.value
            # Re-insert the reply after the replacement user message.
            self.client.beta.threads.messages.delete(message_id=last_msg.id, thread_id=thread_id)
            self.add_message_to_thread(thread_id, "user", replacement_msg)
            self.add_message_to_thread(thread_id, "assistant", response)
        logger.info(f"Hidden message response: {response}", extra=log_extra)
        # NOTE: this is a hack, should get the response straight from the run
        return {'content': response, 'role': 'assistant'}

    @catch_error
    def _add_ai_message(self, text):
        """Append ``text`` to the main thread as an assistant message."""
        return self.add_message_to_thread(self.current_thread.id, "assistant", text)

    @catch_error
    def get_daily_thread(self):
        """(Re)build ``daily_thread`` from the main thread and return it.

        An existing daily thread is deleted first. The new thread is
        created in batches so long histories stay under the per-create
        message limit.

        Returns:
            The daily thread's messages as ``[{"role":, "content":}, ...]``,
            without its first message.
        """
        log_extra = self._log_extra("send_morning_message")
        if self.daily_thread is None:
            messages = self._get_current_thread_history(remove_system_message=False)
        else:
            # NOTE(review): `_msg` is ignored when remove_system_message is
            # False, so this returns the full history. Kept as written.
            messages = self._get_current_thread_history(
                remove_system_message=False, _msg=self.last_daily_message
            )
            self.client.beta.threads.delete(self.daily_thread.id)
            # Avoid keeping a deleted thread if the re-create below fails.
            self.daily_thread = None

        self.daily_thread = self._create_thread_with_messages(messages)
        daily_messages = list(
            self.client.beta.threads.messages.list(self.daily_thread.id, order="asc")
        )
        self.last_daily_message = daily_messages[-1] if daily_messages else None

        daily_history = self._get_current_thread_history(thread=self.daily_thread)
        logger.info(f"Daily Thread: {daily_history}", extra=log_extra)
        logger.info(f"Last Daily Message: {self.last_daily_message}", extra=log_extra)
        return daily_history

    @catch_error
    def _send_morning_message(self, text, add_to_main=False):
        """Run ``text`` on a throw-away copy of the main thread.

        Args:
            text: message to run on the temporary thread.
            add_to_main: when True, ``text`` is also appended to the main
                thread as an assistant message.

        Returns:
            The last role/content dict of the temporary thread.
        """
        log_extra = self._log_extra("send_hidden_message")
        # OPENAI LIMITATION: Can only attach a maximum of 32 messages when creating a new thread
        messages = self._get_current_thread_history(remove_system_message=False)
        if len(messages) >= 29:
            # Long history: keep the last 29 messages behind a short reminder.
            messages = [{"content": """ Remember who you are and who you are coaching.
Be mindful of this information at all times in order to
be as personalised as possible when conversing. Ensure to
follow the conversation guidelines and flow provided.""", "role":"assistant"}] + messages[-29:]
        logger.info(f"Current Thread Messages: {messages}", extra=log_extra)
        temp_thread = self.client.beta.threads.create(messages=messages)
        logger.info(f"Created Temp Thread: {temp_thread}", extra=log_extra)
        try:
            if add_to_main:
                logger.info(f"Adding message to main thread: {text}", extra=log_extra)
                self.add_message_to_thread(self.current_thread.id, "assistant", text)
            self.add_message_to_thread(temp_thread.id, "user", text)
            self._run_current_thread(text, thread=temp_thread)
            response = self._get_current_thread_history(thread=temp_thread)[-1]
            logger.info(f"Hidden Response: {response}", extra=log_extra)
        finally:
            # Delete the temporary thread even when the run fails.
            self.client.beta.threads.delete(temp_thread.id)
            logger.info(f"Deleted Temp Thread: {temp_thread}", extra=log_extra)
        return response

    @catch_error
    def delete_hidden_messages(self, old_thread=None):
        """Replace the main thread with a copy stripped of hidden messages.

        Messages whose content starts with "[hidden]" are dropped and at
        most the last 29 remaining messages are kept. History is always
        read from ``self.current_thread``; ``old_thread`` (default: the
        current thread) is the thread that gets deleted.
        """
        if old_thread is None:
            old_thread = self.current_thread
        visible = [
            msg
            for msg in self._get_current_thread_history(remove_system_message=False)
            if not msg['content'].startswith("[hidden]")
        ]
        if len(visible) >= 29:
            visible = visible[-29:]
        logger.info(f"Current Thread Messages: {visible}", extra=self._log_extra("delete_hidden_messages"))
        new_thread = self.client.beta.threads.create(messages=visible)
        self.client.beta.threads.delete(old_thread.id)
        self.current_thread = new_thread

    @catch_error
    def cancel_run(self, run, thread=None):
        """Ask the active assistant to cancel ``run``; returns True.

        ``thread`` defaults to the *id* of ``self.current_thread``; an
        explicit value is forwarded unchanged.
        NOTE(review): confirm whether callers pass an id or a thread object.
        """
        log_extra = self._log_extra("cancel_run")
        if thread is None:
            thread = self.current_thread.id
        # Log after defaulting so the record shows the thread actually used.
        logger.info(f"(CM) Cancelling run {run} for thread {thread}", extra=log_extra)
        assistant_key = 'general' if self.intro_done else 'intro'
        self.assistants[assistant_key].cancel_run(run, thread)
        logger.info("Run cancelled", extra=log_extra)
        return True

    @catch_error
    def clone(self, client):
        """Creates a new ConversationManager with copied thread messages.

        The clone uses ``client``, the same user and general assistant id,
        and is created with ``intro_done=True``. Its ``state`` is a copy,
        so later changes do not leak between the two managers.
        """
        new_cm = ConversationManager(
            client,
            self.user,
            self.assistants['general'].id,
            intro_done=True,
        )
        messages = self._get_current_thread_history(remove_system_message=False)
        # Replace the thread the constructor created with a copy of ours.
        new_cm.client.beta.threads.delete(new_cm.current_thread.id)
        new_cm.current_thread = new_cm._create_thread_with_messages(messages)
        new_cm.state = self.state.copy()
        return new_cm

    def __str__(self):
        """Describe the manager, including its assistants, for log output."""
        return f"ConversationManager(intro_done={self.intro_done}, assistants={self.assistants}, current_thread={self.current_thread})"

    def __repr__(self):
        """Short debug representation (omits the assistants)."""
        return (f"ConversationManager("
                f"intro_done={self.intro_done}, current_thread={self.current_thread})")