import asyncio import chainlit as cl from phi.tools import Toolkit from phi.utils.log import logger from src.libs.rpc_client import rpc_call # Replace with the actual path to your RPC client library class NotedPadToolkit(Toolkit): def __init__(self): super().__init__(name="noted_pad_toolkit") # Registering methods to make them accessible via the toolkit self.register(self.get_note) self.register(self.get_all_notes) self.register(self.add_note) self.register(self.update_note) self.register(self.delete_note) self.register(self.query_notes) def get_user_email(self) -> str: """ Fetches the user's email from the Chainlit user session. Returns: str: The user's email. Example: >>> get_user_email() """ logger.info("Fetching user email") try: user_session = cl.user_session.get("user").metadata if not user_session: raise ValueError("User session not found") email = user_session['email'] return email except Exception as e: logger.warning(f"Failed to get user email: {e}") return f"Error: {e}" @cl.on_chat_start def get_note(self, note_id: str) -> str: """ Fetches a specific note from the RPC server. Args: note_id (str): The ID of the note to fetch. Returns: str: The requested note. Example: >>> get_note('note_1') """ logger.info(f"Fetching note: {note_id}") try: email = self.get_user_email() if "Error" in email: raise ValueError(email) params = { 'email': email, 'note_id': note_id } response = asyncio.run(rpc_call(method_name="getNote", params=params)) return f"{response}" except Exception as e: logger.warning(f"Failed to get note: {e}") return f"Error: {e}" def get_all_notes(self) -> str: """ Fetches all notes from the RPC server. Returns: dict: A dictionary containing all notes. Example: >>> get_all_notes() """ logger.info("Fetching all notes") try: email = self.get_user_email() if "Error" in email: raise ValueError(email) params = { 'email': email } print(params) response = asyncio.run(rpc_call(method_name="getAllNotes", params=params)) return f"{response}" except Exception as e: logger.warning(f"Failed to get all notes: {e}") return f"Error: {e}" def add_note(self, note_id: str, note_content: str) -> str: """ Adds a new note to the RPC server. add important notes during conversationß Args: note_id (str): The ID of the note to add. note_content (str): The content of the note. Returns: str: Confirmation message. Example: >>> add_note('note_1', 'This is a new note') """ logger.info(f"Adding note: {note_id}") try: email = self.get_user_email() if "Error" in email: raise ValueError(email) params = { 'email': email, 'note_id': note_id, 'note_content': note_content } response = asyncio.run(rpc_call(method_name="addNote", params=params)) return f"{response}" except Exception as e: logger.warning(f"Failed to add note: {e}") return f"Error: {e}" def update_note(self, note_id: str, note_content: str) -> str: """ Updates an existing note on the RPC server. Args: note_id (str): The ID of the note to update. note_content (str): The new content of the note. Returns: str: Confirmation message. Example: >>> update_note('note_1', 'This is an updated note') """ logger.info(f"Updating note: {note_id}") try: email = self.get_user_email() if "Error" in email: raise ValueError(email) params = { 'email': email, 'note_id': note_id, 'note_content': note_content } response = asyncio.run(rpc_call(method_name="updateNote", params=params)) return f"{response}" except Exception as e: logger.warning(f"Failed to update note: {e}") return f"Error: {e}" def delete_note(self, note_id: str) -> str: """ Deletes a note from the RPC server. Args: note_id (str): The ID of the note to delete. Returns: str: Confirmation message. Example: >>> delete_note('note_1') """ logger.info(f"Deleting note: {note_id}") try: email = self.get_user_email() if "Error" in email: raise ValueError(email) params = { 'email': email, 'note_id': note_id } response = asyncio.run(rpc_call(method_name="deleteNote", params=params)) return f"{response}" except Exception as e: logger.warning(f"Failed to delete note: {e}") return f"Error: {e}" def query_notes(self, query: str) -> str: """ Queries notes from the RPC server based on the provided context. Args: query (str): The context or keywords to search for in notes. Returns: str: The notes that match the query. Example: >>> query_notes('spiritual growth') """ logger.info(f"Querying notes with context: {query}") try: email = self.get_user_email() if "Error" in email: raise ValueError(email) params = { 'email': email, 'query': query } response = asyncio.run(rpc_call(method_name="queryNotes", params=params)) return f"{response}" except Exception as e: logger.warning(f"Failed to query notes: {e}") return f"Error: {e}"