# File: app/src/tools/user_confirmation_pin_toolkit.py
# Commit: "Add pin rpc methods" (3cdfd33) by lemdaddy
# Size: 3.54 kB
import asyncio
import chainlit as cl
from phi.tools import Toolkit
from phi.utils.log import logger
from src.libs.rpc_client import rpc_call
class UserConfirmationPinToolkit(Toolkit):
    """Toolkit exposing the user confirmation PIN RPC methods as tools.

    Every public method logs the operation, builds the RPC parameters,
    runs ``rpc_call`` to completion and returns the response as a string.
    """

    def __init__(self):
        super().__init__(name="user_confirmation_pin_toolkit")

        # Registering methods to make them accessible via the toolkit
        self.register(self.has_user_confirmation_pin)
        self.register(self.set_user_confirmation_pin)
        self.register(self.update_user_confirmation_pin)
        self.register(self.verify_user_confirmation_pin)

    def _call_pin_rpc(self, method_name: str, params: dict) -> str:
        """Run a single PIN RPC synchronously and return its response as text.

        Args:
            method_name (str): Name of the remote method to invoke.
            params (dict): Parameters forwarded to the remote method.

        Returns:
            str: The RPC response formatted as a string.
        """
        # NOTE(review): asyncio.run() raises RuntimeError when the calling
        # thread already has a running event loop -- confirm these tools are
        # only ever invoked from synchronous code.
        response = asyncio.run(rpc_call(method_name=method_name, params=params))
        return f"{response}"

    def has_user_confirmation_pin(self, user_id: str) -> str:
        """
        Checks if a user has a confirmation PIN.

        This method takes a user's ID and checks if the user has a confirmation
        PIN via a remote procedure call (RPC).

        Args:
            user_id (str): The ID of the user to check for a confirmation PIN.

        Returns:
            str: The response from the RPC call indicating if the user has a PIN.
        """
        logger.info("Checking if user has confirmation pin")
        params = {
            "userId": user_id,
        }
        return self._call_pin_rpc("hasUserPin", params)

    def set_user_confirmation_pin(self, user_id: str, pin: str) -> str:
        """
        Sets a user's confirmation PIN.

        This method takes a user's ID and a new PIN, and sets the user's PIN
        via a remote procedure call (RPC).

        Args:
            user_id (str): The ID of the user whose PIN is to be set.
            pin (str): The new confirmation PIN to be set.

        Returns:
            str: The response from the RPC call setting the PIN.
        """
        logger.info("Setting user confirmation pin")
        params = {
            "userId": user_id,
            "pin": pin,
        }
        return self._call_pin_rpc("setUserPin", params)

    def update_user_confirmation_pin(self, user_id: str, pin: str) -> str:
        """
        Updates a user's confirmation PIN.

        This method takes a user's ID and a new PIN, and updates the user's PIN
        via a remote procedure call (RPC).

        Args:
            user_id (str): The ID of the user whose PIN is to be updated.
            pin (str): The new confirmation PIN to be set.

        Returns:
            str: The response from the RPC call updating the PIN.
        """
        logger.info("Updating user confirmation pin")
        params = {
            "userId": user_id,
            "pin": pin,
        }
        return self._call_pin_rpc("updateUserPin", params)

    def verify_user_confirmation_pin(self, user_id: str, pin: str) -> str:
        """
        Verifies a user's confirmation PIN.

        This method takes a user's ID and a PIN, and verifies the PIN via a
        remote procedure call (RPC). Both values are sent exactly as given.

        Args:
            user_id (str): The ID of the user whose PIN is to be verified.
            pin (str): The confirmation PIN to be verified.

        Returns:
            str: The response from the RPC call verifying the PIN.
        """
        logger.info("Verifying user confirmation pin")
        params = {
            "userId": user_id,
            "pin": pin,
        }
        return self._call_pin_rpc("verifyUserPin", params)