app / src /tools /crypto_swap_toolkit.py
LONGYKING
fixed swap and bridging
a36d1ff
import asyncio
import json
import pprint
import requests
from web3 import Web3
from phi.tools import Toolkit
from phi.utils.log import logger
from src.libs.web3 import get_web3_instance
from src.libs.helper_functions import get_headers, get_private_key
from src.libs.token_approval_helper import TokenApprovalHelper
from src.libs.rpc_client import rpc_call
class CryptoSwapTools(Toolkit):
def __init__(self, web3: Web3 = get_web3_instance()):
super().__init__(name="swap_tools")
# Store Web3 instance
self.web3 = web3
# Helper for token approval
self.token_approval_helper = TokenApprovalHelper(web3, get_private_key())
# Registering methods to make them accessible via the toolkit
self.register(self.get_swap_quote)
self.register(self.get_swap_price)
self.register(self.get_swap_sources)
self.register(self.execute_swap)
def get_swap_quote(self, buy_token: str, sell_token: str, sell_amount: str, chain: str = "base") -> str:
"""
Fetches a swap quote from the 0x Swap API.
Args:
buy_token (str): The token to buy (e.g., 'DAI').
sell_token (str): The token to sell (e.g., 'ETH').
sell_amount (str): The amount of the sell token to swap, in the smallest unit (e.g., wei for ETH).
Returns:
dict: A dictionary containing the swap quote details.
Example:
>>> get_swap_quote('DAI', 'ETH', '1000000000000000000')
"""
logger.info(f"Fetching swap quote: buying {buy_token} with {sell_token} amount {sell_amount}")
try:
params = {
'buyToken': buy_token,
'sellToken': sell_token,
'sellAmount': sell_amount,
'wallet': {
"userEmail": "none",
"chain": chain,
"testnet": True,
"gasless": True,
"connected": True
}
}
response = asyncio.run(rpc_call(method_name="getSwapQuote", params=params))
return f"{response}"
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to get swap quote: {e}")
# return {"error": str(e)}
return f"Error: {e}"
def get_swap_price(self, buy_token: str, sell_token: str, buy_amount: str, chain: str = "base") -> str:
"""
Fetches the price for a swap from the 0x Swap API.
Args:
buy_token (str): The token to buy.
sell_token (str): The token to sell.
buy_amount (str): The amount of the buy token, in the smallest unit.
Returns:
dict: A dictionary containing the swap price details.
Example:
>>> get_swap_price('DAI', 'ETH', '1000000000000000000')
"""
logger.info(f"Fetching swap price: buying {buy_token} with {sell_token} amount {buy_amount}")
try:
params = {
'buyToken': buy_token,
'sellToken': sell_token,
'buyAmount': buy_amount,
'wallet': {
"userEmail": "none",
"chain": chain,
"testnet": True,
"gasless": True,
"connected": True
}
}
response = asyncio.run(rpc_call(method_name="getSwapPrice", params=params))
return f"{response}"
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to get swap price: {e}")
# return {"error": str(e)}
return f"Error: {e}"
def get_swap_sources(self) -> str:
"""
Fetches the list of liquidity sources from the 0x Swap API.
Returns:
dict: A dictionary containing the list of liquidity sources.
Example:
>>> get_swap_sources()
"""
logger.info("Fetching swap sources")
response = asyncio.run(rpc_call(method_name="getSwapSources"))
return f"{response}"
def execute_swap(
self,
buy_token: str,
sell_token: str,
sell_amount: str,
user_email: str,
chain: str = "base",
) -> str:
"""
Executes a swap using the 0x Swap API. this is a sensitive function, show a preview and allow user to give final permission to execute function
Args:
-buy_token (str): The token to buy (e.g., 'DAI').
-sell_token (str): The token to sell (e.g., 'ETH').
-sell_amount (str): The amount of the sell token to swap, in the smallest unit (e.g., wei for ETH).
- user_email (str): The email of the user for whom the wallet is being fetched.
- chain (str): The EVM chain for which the wallet is being fetched and used to execute this transaction. use chain base as default
Returns:
dict: The transaction receipt of the swap transaction.
Example:
>>> execute_swap('DAI', 'ETH', '1000000000000000000', 'userEmail', 'base')
"""
logger.info("executing swap")
try:
params = {
'buyToken': buy_token,
'sellToken': sell_token,
'sellAmount': sell_amount,
'wallet': {
"userEmail": user_email,
"chain": chain,
"testnet": True,
"gasless": True,
"connected": True
}
}
response = asyncio.run(rpc_call(method_name="swapTokens", params=params))
logger.info(f"Swap transaction receipt: {response}")
return f"{response}"
except requests.exceptions.RequestException as e:
logger.warning(f"Failed to execute swap: {e}")
# return {"error": str(e)}
return f"Error: {e}"