|
|
import asyncio |
|
|
import chainlit as cl |
|
|
from phi.tools import Toolkit |
|
|
from phi.utils.log import logger |
|
|
from src.libs.rpc_client import rpc_call |
|
|
|
|
|
class NotedPadToolkit(Toolkit):
    """Toolkit exposing per-user note operations backed by an RPC server.

    Every registered tool reads the current user's email from the Chainlit
    user session, sends it together with the tool arguments to the matching
    RPC method through ``rpc_call``, and returns the response formatted as a
    string. Tools never raise: on failure they log a warning and return a
    string of the form ``"Error: <reason>"``.
    """

    # Prefix that get_user_email() puts in front of a failure message; the
    # tools use it to tell a failure apart from a real email address.
    _ERROR_PREFIX = "Error: "

    def __init__(self):
        super().__init__(name="noted_pad_toolkit")

        self.register(self.get_note)
        self.register(self.get_all_notes)
        self.register(self.add_note)
        self.register(self.update_note)
        self.register(self.delete_note)
        self.register(self.query_notes)

    def get_user_email(self) -> str:
        """
        Fetches the user's email from the Chainlit user session.

        Returns:
            str: The user's email, or "Error: <reason>" if it cannot be read.

        Example:
            >>> get_user_email()
        """
        logger.info("Fetching user email")
        try:
            user = cl.user_session.get("user")
            # Guard the attribute access so a missing user is reported as a
            # missing session instead of an AttributeError message.
            metadata = user.metadata if user is not None else None
            if not metadata:
                raise ValueError("User session not found")
            email = metadata['email']
            if not isinstance(email, str) or not email:
                raise ValueError("User email not found")
            return email
        except Exception as e:
            logger.warning(f"Failed to get user email: {e}")
            return f"{self._ERROR_PREFIX}{e}"

    def _call_rpc(self, method_name: str, action: str, **fields) -> str:
        """
        Runs one RPC method on behalf of the current user.

        Args:
            method_name (str): Name of the RPC method to invoke.
            action (str): Short description used in the failure log message
                ("Failed to <action>: ...").
            **fields: Extra request parameters, sent after 'email' in the
                order given.

        Returns:
            str: The RPC response formatted as a string, or
            "Error: <reason>" if the email lookup or the call failed.
        """
        try:
            email = self.get_user_email()
            # Match the exact failure prefix rather than any occurrence of
            # "Error", so an address containing that word is still accepted.
            if email.startswith(self._ERROR_PREFIX):
                raise ValueError(email)

            params = {'email': email, **fields}
            # asyncio.run() starts a fresh event loop and raises RuntimeError
            # if one is already running in this thread; that case is caught
            # below and reported to the caller as an error string.
            response = asyncio.run(rpc_call(method_name=method_name, params=params))
            return f"{response}"
        except Exception as e:
            # Tool boundary: always hand a string back instead of raising.
            logger.warning(f"Failed to {action}: {e}")
            return f"Error: {e}"

    def get_note(self, note_id: str) -> str:
        """
        Fetches a specific note from the RPC server.

        Args:
            note_id (str): The ID of the note to fetch.

        Returns:
            str: The requested note.

        Example:
            >>> get_note('note_1')
        """
        logger.info(f"Fetching note: {note_id}")
        return self._call_rpc("getNote", "get note", note_id=note_id)

    def get_all_notes(self) -> str:
        """
        Fetches all notes from the RPC server.

        Returns:
            str: All notes of the current user.

        Example:
            >>> get_all_notes()
        """
        logger.info("Fetching all notes")
        return self._call_rpc("getAllNotes", "get all notes")

    def add_note(self, note_id: str, note_content: str) -> str:
        """
        Adds a new note to the RPC server. Add important notes during the conversation.

        Args:
            note_id (str): The ID of the note to add.
            note_content (str): The content of the note.

        Returns:
            str: Confirmation message.

        Example:
            >>> add_note('note_1', 'This is a new note')
        """
        logger.info(f"Adding note: {note_id}")
        return self._call_rpc(
            "addNote", "add note", note_id=note_id, note_content=note_content
        )

    def update_note(self, note_id: str, note_content: str) -> str:
        """
        Updates an existing note on the RPC server.

        Args:
            note_id (str): The ID of the note to update.
            note_content (str): The new content of the note.

        Returns:
            str: Confirmation message.

        Example:
            >>> update_note('note_1', 'This is an updated note')
        """
        logger.info(f"Updating note: {note_id}")
        return self._call_rpc(
            "updateNote", "update note", note_id=note_id, note_content=note_content
        )

    def delete_note(self, note_id: str) -> str:
        """
        Deletes a note from the RPC server.

        Args:
            note_id (str): The ID of the note to delete.

        Returns:
            str: Confirmation message.

        Example:
            >>> delete_note('note_1')
        """
        logger.info(f"Deleting note: {note_id}")
        return self._call_rpc("deleteNote", "delete note", note_id=note_id)

    def query_notes(self, query: str) -> str:
        """
        Queries notes from the RPC server based on the provided context.

        Args:
            query (str): The context or keywords to search for in notes.

        Returns:
            str: The notes that match the query.

        Example:
            >>> query_notes('spiritual growth')
        """
        logger.info(f"Querying notes with context: {query}")
        return self._call_rpc("queryNotes", "query notes", query=query)
|
|
|