| | import json
|
| | import io
|
| | import os
|
| | import pandas as pd
|
| | from datetime import datetime
|
| | import json
|
| | from assistants import Assistant
|
| | import glob
|
| |
|
| | def get_current_datetime():
|
| | return datetime.now()
|
| |
|
| | class ConversationManager:
|
| | def __init__(self, client, user, asst_id):
|
| | self.user = user
|
| | self.assistants = {'general': Assistant(asst_id, self)}
|
| | self.client = client
|
| | self.user_personal_memory = None
|
| | self.current_thread = self.create_thread()
|
| |
|
| | self.state = {'date': pd.Timestamp.now().date()}
|
| |
|
| | print("[Init State]:", self.state)
|
| |
|
| | def create_thread(self):
|
| |
|
| | user_interaction_guidelines =self.user.user_interaction_guidelines
|
| | thread = self.client.beta.threads.create()
|
| |
|
| |
|
| | self.user_personal_memory = self.client.beta.vector_stores.create(name="UserPersonalMemory", \
|
| | metadata={
|
| | "description": "Personal events and emotional states of the user for personalized assistance and reminders of upcoming events.",
|
| | "user_id": self.user.user_id,
|
| | "categories": "events,tasks,emotions,goals",
|
| | "created_at": str(datetime.now()),
|
| | "updated_at": str(datetime.now()),
|
| | "tags": "personal,emotional_state,upcoming_events,goals",
|
| | "source": "Mementos shared by user",
|
| | })
|
| |
|
| |
|
| | self.system_message = self.add_message_to_thread(thread.id, "assistant", f"\
|
| | You are coaching:\n\n{user_interaction_guidelines}\n\n\
|
| | Be mindful of this information at all times in order to \
|
| | be as personalised as possible when conversing. Ensure to \
|
| | follow the conversation guidelines and flow provided. Use the \
|
| | current state of the conversation to adhere to the flow.\n\n \
|
| | ** You are now in the Introduction state. **")
|
| | return thread
|
| |
|
| | def _get_current_thread_history(self, remove_system_message=True, msg=None):
|
| | if not remove_system_message:
|
| | return [{"role": msg.role, "content": msg.content[0].text.value} for msg in self.client.beta.threads.messages.list(self.current_thread.id, order="asc")]
|
| | if msg:
|
| | return [{"role": msg.role, "content": msg.content[0].text.value} for msg in self.client.beta.threads.messages.list(self.current_thread.id, order="asc", after=msg.id)][1:]
|
| | return [{"role": msg.role, "content": msg.content[0].text.value} for msg in self.client.beta.threads.messages.list(self.current_thread.id, order="asc")][1:]
|
| |
|
| | def add_message_to_thread(self, thread_id, role, content):
|
| | message = self.client.beta.threads.messages.create(
|
| | thread_id=thread_id,
|
| | role=role,
|
| | content=content
|
| | )
|
| | return message
|
| |
|
| |
|
| |
|
| | def _get_thread_messages(self, thread):
|
| | return self.client.beta.threads.messages.list(thread.id, order="asc")
|
| |
|
| | def _run_current_thread(self, text):
|
| | thread = self.current_thread
|
| |
|
| |
|
| |
|
| | response = self.assistants['general'].process(thread, text)
|
| | return response
|
| |
|
| | def _send_hidden_message(self, text):
|
| | msg = self.add_message_to_thread(self.current_thread.id, "user", text)
|
| | self._run_current_thread(text)
|
| |
|
| | self.client.beta.threads.messages.delete(message_id=msg.id, thread_id=self.current_thread.id)
|
| |
|
| |
|
| | class User:
|
| | def __init__(self, user_id, user_info, client, asst_id):
|
| | self.user_id = user_id
|
| | self.client = client
|
| | self.asst_id = asst_id
|
| |
|
| |
|
| | self.user_interaction_guidelines = self.generate_user_interaction_guidelines(user_info, client)
|
| |
|
| |
|
| | self.conversations = ConversationManager(client, self, asst_id)
|
| |
|
| | def reset(self):
|
| | self.conversations = ConversationManager(self.client, self, self.asst_id)
|
| |
|
| | def generate_user_interaction_guidelines(self, user_info, client):
|
| | prompt = f"Using the users information:\n\n{user_info}\n\n generate a \
|
| | user summary that describes how best to interact with the user to create a personalized \
|
| | and targeted chat experience. Include all the information above and based on that information \
|
| | list several sample conversation topics and questions that will engage the user."
|
| |
|
| | response = client.chat.completions.create(
|
| | model="gpt-4o-mini",
|
| | messages=[
|
| | {"role": "system", "content": "You are an expert at building profile documents containing rich user insights."},
|
| | {"role": "user", "content": prompt}
|
| | ],
|
| | temperature=0.2
|
| | )
|
| | return response.choices[0].message.content
|
| |
|
| | def update_conversation_state(self, stage, last_interaction):
|
| | self.conversation_state['stage'] = stage
|
| | self.conversation_state['last_interaction'] = last_interaction
|
| |
|
| | def _get_current_thread(self):
|
| | return self.conversations.current_thread
|
| |
|
| | def send_message(self, text):
|
| | run = self.conversations._run_current_thread(text)
|
| | return run
|
| |
|
| |
|
| | def get_messages(self, exclude_system_msg=True):
|
| | if not exclude_system_msg:
|
| | return self.conversations._get_current_thread_history(False)
|
| | else:
|
| | return list(filter(lambda x: not x['content'].startswith("** It is a new day:") or not x['content'].startswith("Pay attention to the current state you are in"), self.conversations._get_current_thread_history(exclude_system_msg)))
|
| |
|
| | def change_date(self, date):
|
| | print(f"[Changing Date]: {self.conversations.state['date']} -> {date}")
|
| | self.conversations.state['date'] = date
|
| | self.conversations._send_hidden_message(f"** It is a new day: {date} **")
|
| | print("[Date Updated]:", self.conversations.state['date'])
|
| | return self.get_messages()
|
| |
|
| | def __hash__(self) -> int:
|
| | return hash(self.user_id)
|
| |
|
| | def _infer_follow_ups(self, created, context):
|
| | prompt = f"Infer the date of the next follow-up for the user based on the created date:{created} and the context:{context}"
|
| |
|
| | response = self.client.chat.completions.create(
|
| | model="gpt-4o-mini",
|
| | messages=[
|
| | {"role": "system", "content": "You are an expert at estimating when to follow up events with users. Only output a single string representing the follow up date i.e. 'YYYY-MM-dd'"},
|
| | {"role": "user", "content": prompt}
|
| | ],
|
| | temperature=0.2
|
| | )
|
| | return response.choices[0].message.content
|
| |
|
| | def infer_memento_follow_ups(self):
|
| | mementos_path = f"mementos/to_upload/{self.user_id}/*.json"
|
| |
|
| | for file_path in glob.glob(mementos_path):
|
| | with open(file_path, 'r+') as file:
|
| | data = json.load(file)
|
| | infered_follow_up = self._infer_follow_ups(data['created'], data['context'])
|
| | print(f"[Infered Follow Up]: {infered_follow_up}")
|
| | data['follow_up_on'] = infered_follow_up
|
| | file.seek(0)
|
| | json.dump(data, file, indent=4)
|
| | file.truncate()
|
| |
|
| | def change_assistant(self, asst_id):
|
| | self.asst_id = asst_id
|
| | self.conversations.assistants['general'] = Assistant(self.asst_id, self.conversations)
|
| | return self |