import asyncio
import json
import pprint
from collections.abc import Mapping
from typing import Optional

import requests
from phi.tools import Toolkit
from phi.utils.log import logger
from web3 import Web3

from src.libs.helper_functions import get_headers, get_private_key
from src.libs.rpc_client import rpc_call
from src.libs.token_approval_helper import TokenApprovalHelper
from src.libs.web3 import get_web3_instance


class CryptoSwapTools(Toolkit):
    """Toolkit exposing swap quote/price/source lookups and swap execution.

    Every registered tool returns a string: either the stringified RPC
    response / transaction receipt, or a message starting with ``Error:``.
    """

    def __init__(self, web3: Optional[Web3] = None):
        """
        Builds the toolkit and registers its public tools.

        Args:
            web3 (Optional[Web3]): Web3 instance to use. When omitted, one is
                obtained from ``get_web3_instance()`` at construction time
                (not at import time, so importing this module has no side effects).
        """
        super().__init__(name="swap_tools")

        # Store Web3 instance
        self.web3 = web3 if web3 is not None else get_web3_instance()

        # Helper for token approval
        self.token_approval_helper = TokenApprovalHelper(self.web3, get_private_key())

        # Registering methods to make them accessible via the toolkit
        self.register(self.get_swap_quote)
        self.register(self.get_swap_price)
        self.register(self.get_swap_sources)
        self.register(self.execute_swap)

    def _call_rpc(self, method_name: str, params: Optional[dict] = None):
        """Runs the async ``rpc_call`` to completion and returns its raw result."""
        if params is None:
            # Preserve the parameterless call shape used for getSwapSources.
            return asyncio.run(rpc_call(method_name=method_name))
        return asyncio.run(rpc_call(method_name=method_name, params=params))

    def _fetch_quote(self, buy_token: str, sell_token: str, sell_amount: str):
        """Requests a swap quote and returns the raw (unformatted) RPC response."""
        logger.info(f"Fetching swap quote: buying {buy_token} with {sell_token} amount {sell_amount}")
        params = {
            'buyToken': buy_token,
            'sellToken': sell_token,
            'sellAmount': sell_amount,
        }
        return self._call_rpc("getSwapQuote", params)

    def get_swap_quote(self, buy_token: str, sell_token: str, sell_amount: str) -> str:
        """
        Fetches a swap quote from the 0x Swap API.

        Args:
            buy_token (str): The token to buy (e.g., 'DAI').
            sell_token (str): The token to sell (e.g., 'ETH').
            sell_amount (str): The amount of the sell token to swap, in the smallest unit (e.g., wei for ETH).

        Returns:
            str: The swap quote response rendered as a string, or a message
            starting with 'Error:' if the request failed.

        Example:
            >>> get_swap_quote('DAI', 'ETH', '1000000000000000000')
        """
        try:
            response = self._fetch_quote(buy_token, sell_token, sell_amount)
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get swap quote: {e}")
            return f"Error: {e}"
        return f"{response}"

    def get_swap_price(self, buy_token: str, sell_token: str, buy_amount: str) -> str:
        """
        Fetches the price for a swap from the 0x Swap API.

        Args:
            buy_token (str): The token to buy.
            sell_token (str): The token to sell.
            buy_amount (str): The amount of the buy token, in the smallest unit.

        Returns:
            str: The swap price response rendered as a string, or a message
            starting with 'Error:' if the request failed.

        Example:
            >>> get_swap_price('DAI', 'ETH', '1000000000000000000')
        """
        logger.info(f"Fetching swap price: buying {buy_token} with {sell_token} amount {buy_amount}")
        params = {
            'buyToken': buy_token,
            'sellToken': sell_token,
            'buyAmount': buy_amount,
        }
        try:
            response = self._call_rpc("getSwapPrice", params)
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get swap price: {e}")
            return f"Error: {e}"
        return f"{response}"

    def get_swap_sources(self) -> str:
        """
        Fetches the list of liquidity sources from the 0x Swap API.

        Returns:
            str: The liquidity sources response rendered as a string, or a
            message starting with 'Error:' if the request failed.

        Example:
            >>> get_swap_sources()
        """
        logger.info("Fetching swap sources")
        try:
            response = self._call_rpc("getSwapSources")
        except requests.exceptions.RequestException as e:
            # Same error contract as the other lookup tools.
            logger.warning(f"Failed to get swap sources: {e}")
            return f"Error: {e}"
        return f"{response}"

    def execute_swap(self, buy_token: str, sell_token: str, sell_amount: str, eth_address: str) -> str:
        """
        Executes a swap using the 0x Swap API.

        Args:
            buy_token (str): The token to buy (e.g., 'DAI').
            sell_token (str): The token to sell (e.g., 'ETH').
            sell_amount (str): The amount of the sell token to swap, in the smallest unit (e.g., wei for ETH).
            eth_address (str): The Ethereum address of the user executing the swap.

        Returns:
            str: The transaction receipt of the swap transaction rendered as a
            string, or a message starting with 'Error:' if any step failed.

        Example:
            >>> execute_swap('DAI', 'ETH', '1000000000000000000', '0xYourEthereumAddress')
        """
        # Get the swap quote. Work from the raw RPC response rather than the
        # display string of get_swap_quote: str() of a dict is not valid JSON,
        # and an "Error: ..." string cannot be parsed either.
        try:
            raw_quote = self._fetch_quote(buy_token, sell_token, sell_amount)
            if isinstance(raw_quote, (str, bytes, bytearray)):
                quote = json.loads(raw_quote)
            else:
                quote = raw_quote
        except (requests.exceptions.RequestException, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            logger.warning(f"Failed to get swap quote: {e}")
            return "Error: Failed to get swap quote"

        logger.info(f"Swap quote: {quote}")
        if not isinstance(quote, Mapping) or 'error' in quote:
            return "Error: Failed to get swap quote"

        # Approve the token if needed (skip if selling ETH)
        if sell_token != 'ETH':
            try:
                approval_receipt = self.token_approval_helper.approve_token(
                    sell_token, quote['allowanceTarget'], sell_amount, eth_address
                )
            except Exception as e:
                # Tool boundary: report the failure instead of raising.
                logger.warning(f"Token approval failed: {e}")
                return f"Error: {e}"
            logger.info(f"Approval receipt: {approval_receipt}")
            if 'status' not in approval_receipt or approval_receipt['status'] != 1:
                return "Error: Token approval failed"

        # Execute the swap
        try:
            swap_tx = {
                'from': eth_address,
                'to': quote['to'],
                'data': quote['data'],
                'value': int(quote['value']),
                'gas': 200000,
                'gasPrice': self.web3.to_wei('20', 'gwei'),
                'nonce': self.web3.eth.get_transaction_count(eth_address),
            }
            signed_swap_tx = self.web3.eth.account.sign_transaction(swap_tx, private_key=get_private_key())
            # NOTE(review): the signed-transaction attribute is `raw_transaction`
            # in newer eth-account releases and `rawTransaction` in older ones;
            # accept either - confirm against the pinned version.
            raw_tx = getattr(signed_swap_tx, "raw_transaction", None)
            if raw_tx is None:
                raw_tx = signed_swap_tx.rawTransaction
            tx_hash = self.web3.eth.send_raw_transaction(raw_tx)
            receipt = self.web3.eth.wait_for_transaction_receipt(tx_hash)
            logger.info(f"Swap transaction receipt: {receipt}")
            return f"{receipt}"
        except Exception as e:
            logger.warning(f"Failed to execute swap: {e}")
            return f"Error: {e}"