"""Per-user conversation threads driven through an Assistants-style client.

``User`` owns a ``ConversationManager``; the manager owns the current thread,
a vector store for the user's personal memory, and the assistant(s) that
process messages on that thread.
"""

import glob
import io
import json
import os
from datetime import datetime

import pandas as pd

from assistants import Assistant


def get_current_datetime():
    """Return the current local date and time as a naive ``datetime``."""
    return datetime.now()


class ConversationManager:
    """Hold the active thread, assistants and conversation state for a user."""

    def __init__(self, client, user, asst_id):
        """Create the assistant wrapper, a fresh thread and the initial state.

        Args:
            client: API client exposing ``beta.threads`` and
                ``beta.vector_stores``.
            user: Owning ``User``; its ``user_interaction_guidelines`` and
                ``user_id`` are read while the thread is created.
            asst_id: Identifier passed to ``Assistant``.
        """
        self.user = user
        self.assistants = {'general': Assistant(asst_id, self)}
        self.client = client
        self.user_personal_memory = None
        self.current_thread = self.create_thread()
        self.state = {'date': pd.Timestamp.now().date()}
        print("[Init State]:", self.state)

    def create_thread(self):
        """Create a new thread seeded with the coaching system message.

        Every call creates a brand-new thread; no existing thread is looked
        up or reused. As side effects this also creates an (initially empty)
        ``UserPersonalMemory`` vector store, stored on
        ``self.user_personal_memory``, and stores the seeded message on
        ``self.system_message``.

        Returns:
            The newly created thread object.
        """
        user_interaction_guidelines = self.user.user_interaction_guidelines
        thread = self.client.beta.threads.create()

        # One timestamp so created_at and updated_at agree at creation time.
        now = str(datetime.now())
        self.user_personal_memory = self.client.beta.vector_stores.create(
            name="UserPersonalMemory",
            metadata={
                "description": "Personal events and emotional states of the user for personalized assistance and reminders of upcoming events.",
                "user_id": self.user.user_id,
                "categories": "events,tasks,emotions,goals",
                "created_at": now,
                "updated_at": now,
                "tags": "personal,emotional_state,upcoming_events,goals",
                "source": "Mementos shared by user",
            },
        )

        system_prompt = (
            f"You are coaching:\n\n{user_interaction_guidelines}\n\n"
            "Be mindful of this information at all times in order to "
            "be as personalised as possible when conversing. Ensure to "
            "follow the conversation guidelines and flow provided. Use the "
            "current state of the conversation to adhere to the flow.\n\n"
            "** You are now in the Introduction state. **"
        )
        # The "system" message is stored as the first assistant message of
        # the thread; history helpers drop it by skipping the first entry.
        self.system_message = self.add_message_to_thread(
            thread.id, "assistant", system_prompt
        )
        return thread

    def _get_current_thread_history(self, remove_system_message=True, msg=None):
        """Return the current thread's messages as ``{"role", "content"}`` dicts.

        Args:
            remove_system_message: When false, every message is returned and
                ``msg`` is ignored. When true, the first listed message is
                dropped.
            msg: Optional message; when given (and ``remove_system_message``
                is true) only messages listed after ``msg.id`` are fetched,
                and the first of those is dropped as well.

        Returns:
            A list of dicts in ascending order. Only the first content part of
            each message (``content[0].text.value``) is used.
        """
        list_kwargs = {"order": "asc"}
        if remove_system_message and msg:
            list_kwargs["after"] = msg.id

        listed = self.client.beta.threads.messages.list(
            self.current_thread.id, **list_kwargs
        )
        history = [
            {"role": item.role, "content": item.content[0].text.value}
            for item in listed
        ]
        return history[1:] if remove_system_message else history

    def add_message_to_thread(self, thread_id, role, content):
        """Create a message on ``thread_id`` and return the created message."""
        return self.client.beta.threads.messages.create(
            thread_id=thread_id,
            role=role,
            content=content,
        )

    def _get_thread_messages(self, thread):
        """Return the raw message listing of ``thread`` in ascending order."""
        return self.client.beta.threads.messages.list(thread.id, order="asc")

    def _run_current_thread(self, text):
        """Process ``text`` on the current thread with the 'general' assistant.

        Only one assistant is registered; with several, one would have to be
        selected here.
        """
        thread = self.current_thread
        return self.assistants['general'].process(thread, text)

    def _send_hidden_message(self, text):
        """Send ``text`` as a user message, run the thread, then delete it.

        The message is deleted even if the run raises, so a failed run does
        not leave the hidden message visible in the thread.
        """
        hidden = self.add_message_to_thread(self.current_thread.id, "user", text)
        try:
            self._run_current_thread(text)
        finally:
            self.client.beta.threads.messages.delete(
                message_id=hidden.id, thread_id=self.current_thread.id
            )


class User:
    """A coached user together with their conversation manager."""

    # Prefixes of control messages that must not be shown to the user.
    _HIDDEN_MESSAGE_PREFIXES = (
        "** It is a new day:",
        "Pay attention to the current state you are in",
    )

    def __init__(self, user_id, user_info, client, asst_id):
        """Generate interaction guidelines and start a conversation.

        Args:
            user_id: Identifier of the user; also used for hashing.
            user_info: Information about the user, interpolated into the
                guideline-generation prompt.
            client: API client used for chat completions and threads.
            asst_id: Identifier of the assistant to converse with.
        """
        self.user_id = user_id
        self.client = client
        self.asst_id = asst_id
        # Written by update_conversation_state(); initialised here so the
        # first update does not fail on a missing attribute.
        self.conversation_state = {}
        # Must be set before the ConversationManager is built: creating the
        # thread reads self.user_interaction_guidelines.
        self.user_interaction_guidelines = self.generate_user_interaction_guidelines(
            user_info, client
        )
        self.conversations = ConversationManager(client, self, asst_id)

    def reset(self):
        """Replace the conversation manager with a freshly created one."""
        self.conversations = ConversationManager(self.client, self, self.asst_id)

    def generate_user_interaction_guidelines(self, user_info, client):
        """Ask the chat model for a summary of how to interact with the user.

        Args:
            user_info: Information about the user, interpolated into the prompt.
            client: API client exposing ``chat.completions.create``.

        Returns:
            The text content of the first completion choice.
        """
        prompt = (
            f"Using the users information:\n\n{user_info}\n\n generate a "
            "user summary that describes how best to interact with the user to create a personalized "
            "and targeted chat experience. Include all the information above and based on that information "
            "list several sample conversation topics and questions that will engage the user."
        )
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are an expert at building profile documents containing rich user insights."},
                {"role": "user", "content": prompt},
            ],
            temperature=0.2,
        )
        return response.choices[0].message.content

    def update_conversation_state(self, stage, last_interaction):
        """Record the current ``stage`` and ``last_interaction``."""
        self.conversation_state['stage'] = stage
        self.conversation_state['last_interaction'] = last_interaction

    def _get_current_thread(self):
        """Return the conversation manager's current thread."""
        return self.conversations.current_thread

    def send_message(self, text):
        """Process ``text`` on the current thread and return the result."""
        return self.conversations._run_current_thread(text)

    def get_messages(self, exclude_system_msg=True):
        """Return the current thread's history as role/content dicts.

        Args:
            exclude_system_msg: When false, the full history is returned.
                Otherwise the first (system) message is dropped and so is any
                message whose content starts with one of the hidden control
                prefixes.
        """
        if not exclude_system_msg:
            return self.conversations._get_current_thread_history(False)

        history = self.conversations._get_current_thread_history(exclude_system_msg)
        # str.startswith accepts a tuple: a message is hidden if it matches
        # ANY prefix. (Testing "not A or not B" would keep every message.)
        return [
            entry
            for entry in history
            if not entry['content'].startswith(self._HIDDEN_MESSAGE_PREFIXES)
        ]

    def change_date(self, date):
        """Set the conversation date, notify the assistant, return messages.

        The notification is sent as a hidden message on the current thread.
        """
        print(f"[Changing Date]: {self.conversations.state['date']} -> {date}")
        self.conversations.state['date'] = date
        self.conversations._send_hidden_message(f"** It is a new day: {date} **")
        print("[Date Updated]:", self.conversations.state['date'])
        return self.get_messages()

    def __hash__(self) -> int:
        return hash(self.user_id)

    def _infer_follow_ups(self, created, context):
        """Ask the chat model for a follow-up date and return its raw reply.

        The model is instructed to answer with a single 'YYYY-MM-dd' string;
        the reply is returned as-is without validation.
        """
        prompt = f"Infer the date of the next follow-up for the user based on the created date:{created} and the context:{context}"
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are an expert at estimating when to follow up events with users. Only output a single string representing the follow up date i.e. 'YYYY-MM-dd'"},
                {"role": "user", "content": prompt},
            ],
            temperature=0.2,
        )
        return response.choices[0].message.content

    def infer_memento_follow_ups(self):
        """Add an inferred ``follow_up_on`` date to each pending memento file.

        Every ``mementos/to_upload/<user_id>/*.json`` file is read, updated
        and rewritten in place.
        """
        mementos_path = f"mementos/to_upload/{self.user_id}/*.json"
        for file_path in glob.glob(mementos_path):
            with open(file_path, 'r+') as file:
                data = json.load(file)
                infered_follow_up = self._infer_follow_ups(data['created'], data['context'])
                print(f"[Infered Follow Up]: {infered_follow_up}")
                data['follow_up_on'] = infered_follow_up
                # Rewrite from the start and cut off any leftover tail in
                # case the new JSON is shorter than the old content.
                file.seek(0)
                json.dump(data, file, indent=4)
                file.truncate()

    def change_assistant(self, asst_id):
        """Switch the 'general' assistant to ``asst_id`` and return ``self``."""
        self.asst_id = asst_id
        self.conversations.assistants['general'] = Assistant(self.asst_id, self.conversations)
        return self