File size: 6,445 Bytes
ed3fa91 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
import asyncio
import chainlit as cl
from phi.tools import Toolkit
from phi.utils.log import logger
from src.libs.rpc_client import rpc_call # Replace with the actual path to your RPC client library
class NotedPadToolkit(Toolkit):
def __init__(self):
super().__init__(name="noted_pad_toolkit")
# Registering methods to make them accessible via the toolkit
self.register(self.get_note)
self.register(self.get_all_notes)
self.register(self.add_note)
self.register(self.update_note)
self.register(self.delete_note)
self.register(self.query_notes)
def get_user_email(self) -> str:
"""
Fetches the user's email from the Chainlit user session.
Returns:
str: The user's email.
Example:
>>> get_user_email()
"""
logger.info("Fetching user email")
try:
user_session = cl.user_session.get("user").metadata
if not user_session:
raise ValueError("User session not found")
email = user_session['email']
return email
except Exception as e:
logger.warning(f"Failed to get user email: {e}")
return f"Error: {e}"
@cl.on_chat_start
def get_note(self, note_id: str) -> str:
"""
Fetches a specific note from the RPC server.
Args:
note_id (str): The ID of the note to fetch.
Returns:
str: The requested note.
Example:
>>> get_note('note_1')
"""
logger.info(f"Fetching note: {note_id}")
try:
email = self.get_user_email()
if "Error" in email:
raise ValueError(email)
params = {
'email': email,
'note_id': note_id
}
response = asyncio.run(rpc_call(method_name="getNote", params=params))
return f"{response}"
except Exception as e:
logger.warning(f"Failed to get note: {e}")
return f"Error: {e}"
def get_all_notes(self) -> str:
"""
Fetches all notes from the RPC server.
Returns:
dict: A dictionary containing all notes.
Example:
>>> get_all_notes()
"""
logger.info("Fetching all notes")
try:
email = self.get_user_email()
if "Error" in email:
raise ValueError(email)
params = {
'email': email
}
print(params)
response = asyncio.run(rpc_call(method_name="getAllNotes", params=params))
return f"{response}"
except Exception as e:
logger.warning(f"Failed to get all notes: {e}")
return f"Error: {e}"
def add_note(self, note_id: str, note_content: str) -> str:
"""
Adds a new note to the RPC server. add important notes during conversationß
Args:
note_id (str): The ID of the note to add.
note_content (str): The content of the note.
Returns:
str: Confirmation message.
Example:
>>> add_note('note_1', 'This is a new note')
"""
logger.info(f"Adding note: {note_id}")
try:
email = self.get_user_email()
if "Error" in email:
raise ValueError(email)
params = {
'email': email,
'note_id': note_id,
'note_content': note_content
}
response = asyncio.run(rpc_call(method_name="addNote", params=params))
return f"{response}"
except Exception as e:
logger.warning(f"Failed to add note: {e}")
return f"Error: {e}"
def update_note(self, note_id: str, note_content: str) -> str:
"""
Updates an existing note on the RPC server.
Args:
note_id (str): The ID of the note to update.
note_content (str): The new content of the note.
Returns:
str: Confirmation message.
Example:
>>> update_note('note_1', 'This is an updated note')
"""
logger.info(f"Updating note: {note_id}")
try:
email = self.get_user_email()
if "Error" in email:
raise ValueError(email)
params = {
'email': email,
'note_id': note_id,
'note_content': note_content
}
response = asyncio.run(rpc_call(method_name="updateNote", params=params))
return f"{response}"
except Exception as e:
logger.warning(f"Failed to update note: {e}")
return f"Error: {e}"
def delete_note(self, note_id: str) -> str:
"""
Deletes a note from the RPC server.
Args:
note_id (str): The ID of the note to delete.
Returns:
str: Confirmation message.
Example:
>>> delete_note('note_1')
"""
logger.info(f"Deleting note: {note_id}")
try:
email = self.get_user_email()
if "Error" in email:
raise ValueError(email)
params = {
'email': email,
'note_id': note_id
}
response = asyncio.run(rpc_call(method_name="deleteNote", params=params))
return f"{response}"
except Exception as e:
logger.warning(f"Failed to delete note: {e}")
return f"Error: {e}"
def query_notes(self, query: str) -> str:
"""
Queries notes from the RPC server based on the provided context.
Args:
query (str): The context or keywords to search for in notes.
Returns:
str: The notes that match the query.
Example:
>>> query_notes('spiritual growth')
"""
logger.info(f"Querying notes with context: {query}")
try:
email = self.get_user_email()
if "Error" in email:
raise ValueError(email)
params = {
'email': email,
'query': query
}
response = asyncio.run(rpc_call(method_name="queryNotes", params=params))
return f"{response}"
except Exception as e:
logger.warning(f"Failed to query notes: {e}")
return f"Error: {e}"
|