| import json
|
| import io
|
| import os
|
| from datetime import datetime
|
| import json
|
|
|
| def get_current_datetime():
|
| return datetime.now()
|
|
|
| class Assistant:
|
| def __init__(self, id, cm):
|
| self.id = id
|
| self.cm = cm
|
|
|
| def process(self, thread, text):
|
| template_search = self.cm.add_message_to_thread(thread.id, "assistant", f"Pay attention to the current state you are in and the conversation guidelines to respond to the users query:")
|
| message = self.cm.add_message_to_thread(thread.id, "user", text)
|
|
|
| run = self.cm.client.beta.threads.runs.create_and_poll(
|
| thread_id=thread.id,
|
| assistant_id=self.id,
|
| model="gpt-4o-mini",
|
| )
|
|
|
| if run.status == 'requires_action':
|
|
|
| run = self.call_tool(run, thread)
|
|
|
| if run.status == 'completed':
|
|
|
|
|
|
|
|
|
| self.cm.client.beta.threads.messages.delete(
|
| message_id=template_search.id,
|
| thread_id=thread.id,
|
| )
|
| else:
|
| print(run.status)
|
| return message
|
|
|
| def call_tool(self, run, thread):
|
| tool_outputs = []
|
| print(f"[INFO]: Required actions: {list(map(lambda x: f'{x.function.name}({x.function.arguments})', run.required_action.submit_tool_outputs.tool_calls))}")
|
|
|
| for tool in run.required_action.submit_tool_outputs.tool_calls:
|
| if tool.function.name == "generate_coaching_plan":
|
| user_challenge = json.loads(tool.function.arguments)
|
| tool_outputs.append({
|
| "tool_call_id": tool.id,
|
| "output": str(self.generate_coaching_plan(user_challenge))
|
| })
|
| elif tool.function.name == "transition":
|
| transitions = json.loads(tool.function.arguments)
|
| print(f"[TRANSITION]: {transitions['from']} -> {transitions['to']}")
|
| tool_outputs.append({
|
| "tool_call_id": tool.id,
|
| "output": f"** [TRANSITION]: {transitions['from']} -> {transitions['to']} **"
|
| })
|
| elif tool.function.name == "get_date":
|
|
|
| print(f"[DATETIME]: {self.cm.state['date']}")
|
|
|
| tool_outputs.append({
|
| "tool_call_id": tool.id,
|
| "output": f"{self.cm.state['date']}"
|
| })
|
| elif tool.function.name == "create_goals" or tool.function.name == "create_memento":
|
| json_string = json.loads(tool.function.arguments)
|
| json_string['created'] = str(self.cm.state['date'])
|
| json_string['updated'] = None
|
| print(f"[NEW EVENT]: {json_string}")
|
|
|
|
|
| user_mementos_folder = f"mementos/to_upload/{self.cm.user.user_id}"
|
| if not os.path.exists(user_mementos_folder):
|
| os.makedirs(user_mementos_folder)
|
|
|
|
|
| json.dump(json_string, open(f"mementos/to_upload/{self.cm.user.user_id}/{json_string['title']}.json", "w"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| print(f"[INFO]: Added event to the user's vector store")
|
|
|
| tool_outputs.append({
|
| "tool_call_id": tool.id,
|
| "output": f"** [Success]: Added event to the user's vector store**"
|
| })
|
| elif tool.function.name == "msearch":
|
| context = json.loads(tool.function.arguments)['queries']
|
|
|
| print(f"[MSEARCH]: Searching for {context}")
|
| tool_outputs.append({
|
| "tool_call_id": tool.id,
|
| "output": f"** retrieve any files related to: {context} **"
|
| })
|
| elif tool.function.name == "get_goals":
|
| print("FETCH GOAL")
|
| context = json.loads(tool.function.arguments)['context']
|
|
|
| print(f"[GET GOALS]: {context}")
|
| tool_outputs.append({
|
| "tool_call_id": tool.id,
|
| "output": f"** File search the vector_store: {self.cm.user_personal_memory.id} for the most relevant goal related to: {context} **"
|
| })
|
| elif tool.function.name == "update_goal":
|
| goal = json.loads(tool.function.arguments)['goal']
|
|
|
| print(f"[UPDATE GOAL]: {goal}")
|
| tool_outputs.append({
|
| "tool_call_id": tool.id,
|
| "output": f"** Updated Goal: {context} **"
|
| })
|
| elif tool.function.name == "get_mementos":
|
| context = json.loads(tool.function.arguments)['context']
|
|
|
| print(f"[GET MEMENTOS]: {context}")
|
| tool_outputs.append({
|
| "tool_call_id": tool.id,
|
| "output": f"** File search the vector_store: {self.cm.user_personal_memory.id} for the most relevant mementos based on the recent conversation history and context:{context} **"
|
| })
|
|
|
|
|
| if tool_outputs:
|
| try:
|
| run = self.cm.client.beta.threads.runs.submit_tool_outputs_and_poll(
|
| thread_id=thread.id,
|
| run_id=run.id,
|
| tool_outputs=tool_outputs
|
| )
|
| print("Tool outputs submitted successfully.")
|
| except Exception as e:
|
| print("Failed to submit tool outputs:", e)
|
| else:
|
| print("No tool outputs to submit.")
|
|
|
| if run.status == 'completed':
|
| messages = self.cm.client.beta.threads.messages.list(
|
| thread_id=thread.id
|
| )
|
|
|
| elif run.status == 'requires_action':
|
| print(f'[TOOL]: Calling tool for action')
|
| run = self.call_tool(run, thread)
|
| else:
|
| print("Something bad happened", run.status)
|
| return run
|
|
|
| class GeneralAssistant(Assistant):
|
| def __init__(self, id, cm):
|
| super().__init__(id, cm)
|
|
|
|
|
| class PFAssistant(Assistant):
|
| def __init__(self, id, cm):
|
| super().__init__(id, cm) |