import asyncio import json import pprint import requests from web3 import Web3 from phi.tools import Toolkit from phi.utils.log import logger from src.libs.web3 import get_web3_instance from src.libs.helper_functions import get_headers, get_private_key from src.libs.token_approval_helper import TokenApprovalHelper from src.libs.rpc_client import rpc_call class CryptoSwapTools(Toolkit): def __init__(self, web3: Web3 = get_web3_instance()): super().__init__(name="swap_tools") # Store Web3 instance self.web3 = web3 # Helper for token approval self.token_approval_helper = TokenApprovalHelper(web3, get_private_key()) # Registering methods to make them accessible via the toolkit self.register(self.get_swap_quote) self.register(self.get_swap_price) self.register(self.get_swap_sources) self.register(self.execute_swap) def get_swap_quote(self, buy_token: str, sell_token: str, sell_amount: str, chain: str = "base") -> str: """ Fetches a swap quote from the 0x Swap API. Args: buy_token (str): The token to buy (e.g., 'DAI'). sell_token (str): The token to sell (e.g., 'ETH'). sell_amount (str): The amount of the sell token to swap, in the smallest unit (e.g., wei for ETH). Returns: dict: A dictionary containing the swap quote details. Example: >>> get_swap_quote('DAI', 'ETH', '1000000000000000000') """ logger.info(f"Fetching swap quote: buying {buy_token} with {sell_token} amount {sell_amount}") try: params = { 'buyToken': buy_token, 'sellToken': sell_token, 'sellAmount': sell_amount, 'wallet': { "userEmail": "none", "chain": chain, "testnet": True, "gasless": True, "connected": True } } response = asyncio.run(rpc_call(method_name="getSwapQuote", params=params)) return f"{response}" except requests.exceptions.RequestException as e: logger.warning(f"Failed to get swap quote: {e}") # return {"error": str(e)} return f"Error: {e}" def get_swap_price(self, buy_token: str, sell_token: str, buy_amount: str, chain: str = "base") -> str: """ Fetches the price for a swap from the 0x Swap API. Args: buy_token (str): The token to buy. sell_token (str): The token to sell. buy_amount (str): The amount of the buy token, in the smallest unit. Returns: dict: A dictionary containing the swap price details. Example: >>> get_swap_price('DAI', 'ETH', '1000000000000000000') """ logger.info(f"Fetching swap price: buying {buy_token} with {sell_token} amount {buy_amount}") try: params = { 'buyToken': buy_token, 'sellToken': sell_token, 'buyAmount': buy_amount, 'wallet': { "userEmail": "none", "chain": chain, "testnet": True, "gasless": True, "connected": True } } response = asyncio.run(rpc_call(method_name="getSwapPrice", params=params)) return f"{response}" except requests.exceptions.RequestException as e: logger.warning(f"Failed to get swap price: {e}") # return {"error": str(e)} return f"Error: {e}" def get_swap_sources(self) -> str: """ Fetches the list of liquidity sources from the 0x Swap API. Returns: dict: A dictionary containing the list of liquidity sources. Example: >>> get_swap_sources() """ logger.info("Fetching swap sources") response = asyncio.run(rpc_call(method_name="getSwapSources")) return f"{response}" def execute_swap( self, buy_token: str, sell_token: str, sell_amount: str, user_email: str, chain: str = "base", ) -> str: """ Executes a swap using the 0x Swap API. this is a sensitive function, show a preview and allow user to give final permission to execute function Args: -buy_token (str): The token to buy (e.g., 'DAI'). -sell_token (str): The token to sell (e.g., 'ETH'). -sell_amount (str): The amount of the sell token to swap, in the smallest unit (e.g., wei for ETH). - user_email (str): The email of the user for whom the wallet is being fetched. - chain (str): The EVM chain for which the wallet is being fetched and used to execute this transaction. use chain base as default Returns: dict: The transaction receipt of the swap transaction. Example: >>> execute_swap('DAI', 'ETH', '1000000000000000000', 'userEmail', 'base') """ logger.info("executing swap") try: params = { 'buyToken': buy_token, 'sellToken': sell_token, 'sellAmount': sell_amount, 'wallet': { "userEmail": user_email, "chain": chain, "testnet": True, "gasless": True, "connected": True } } response = asyncio.run(rpc_call(method_name="swapTokens", params=params)) logger.info(f"Swap transaction receipt: {response}") return f"{response}" except requests.exceptions.RequestException as e: logger.warning(f"Failed to execute swap: {e}") # return {"error": str(e)} return f"Error: {e}"