# app/src/tools/crypto_swap_toolkit.py
# LONGYKING
# fixed swap and bridging
# a36d1ff
# raw
# history blame
# 5.93 kB
import asyncio
import json
import pprint
import requests
from web3 import Web3
from phi.tools import Toolkit
from phi.utils.log import logger
from src.libs.web3 import get_web3_instance
from src.libs.helper_functions import get_headers, get_private_key
from src.libs.token_approval_helper import TokenApprovalHelper
from src.libs.rpc_client import rpc_call
class CryptoSwapTools(Toolkit):
    """Toolkit exposing token-swap operations through the project's RPC client.

    Every public method builds a parameter payload, sends it with
    ``rpc_call`` under a fixed RPC method name, and returns the response
    rendered as a string. ``requests.exceptions.RequestException`` is turned
    into an ``"Error: ..."`` string instead of being raised.
    """

    # NOTE(review): the docstrings of the registered methods are presumably
    # surfaced to the agent as tool descriptions (confirm against
    # phi.tools.Toolkit), so keep them short and keep the confirmation
    # instruction on ``execute_swap`` intact when rewording.

    def __init__(self, web3: "Web3 | None" = None):
        """Create the toolkit and register its swap tools.

        Args:
            web3: Web3 instance to use. When omitted, one is obtained from
                ``get_web3_instance()`` at construction time (not at import
                time, as a call in the default argument would do).
        """
        super().__init__(name="swap_tools")

        # Resolve the default lazily so importing this module has no side
        # effects and each toolkit asks for its own instance.
        if web3 is None:
            web3 = get_web3_instance()

        # Store Web3 instance
        self.web3 = web3

        # Helper for token approval
        self.token_approval_helper = TokenApprovalHelper(web3, get_private_key())

        # Registering methods to make them accessible via the toolkit
        self.register(self.get_swap_quote)
        self.register(self.get_swap_price)
        self.register(self.get_swap_sources)
        self.register(self.execute_swap)

    @staticmethod
    def _wallet_params(chain: str, user_email: str = "none") -> dict:
        """Build the ``wallet`` section shared by every swap RPC payload."""
        # NOTE(review): these flags are hard-coded for all calls, including
        # ``execute_swap`` ("testnet": True) -- confirm this is intended.
        return {
            "userEmail": user_email,
            "chain": chain,
            "testnet": True,
            "gasless": True,
            "connected": True,
        }

    @staticmethod
    def _run_rpc(method_name: str, params: "dict | None" = None):
        """Run ``rpc_call`` to completion and return its raw response."""
        # ``params`` is only forwarded when given, so parameterless methods
        # are invoked exactly as ``rpc_call(method_name=...)``.
        if params is None:
            return asyncio.run(rpc_call(method_name=method_name))
        return asyncio.run(rpc_call(method_name=method_name, params=params))

    def get_swap_quote(self, buy_token: str, sell_token: str, sell_amount: str, chain: str = "base") -> str:
        """
        Fetches a swap quote from the 0x Swap API.

        Args:
            buy_token (str): The token to buy (e.g., 'DAI').
            sell_token (str): The token to sell (e.g., 'ETH').
            sell_amount (str): The amount of the sell token to swap, in the smallest unit (e.g., wei for ETH).
            chain (str): The EVM chain to quote on. Defaults to 'base'.

        Returns:
            str: The swap quote details as a string, or 'Error: ...' if the request failed.

        Example:
            >>> get_swap_quote('DAI', 'ETH', '1000000000000000000')
        """
        logger.info(f"Fetching swap quote: buying {buy_token} with {sell_token} amount {sell_amount}")
        params = {
            'buyToken': buy_token,
            'sellToken': sell_token,
            'sellAmount': sell_amount,
            'wallet': self._wallet_params(chain),
        }
        # NOTE(review): only ``requests`` errors are converted to a string;
        # the transport used by ``rpc_call`` is not visible here -- confirm
        # this is the exception family it actually raises.
        try:
            response = self._run_rpc("getSwapQuote", params)
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get swap quote: {e}")
            return f"Error: {e}"
        return f"{response}"

    def get_swap_price(self, buy_token: str, sell_token: str, buy_amount: str, chain: str = "base") -> str:
        """
        Fetches the price for a swap from the 0x Swap API.

        Args:
            buy_token (str): The token to buy.
            sell_token (str): The token to sell.
            buy_amount (str): The amount of the buy token, in the smallest unit.
            chain (str): The EVM chain to price on. Defaults to 'base'.

        Returns:
            str: The swap price details as a string, or 'Error: ...' if the request failed.

        Example:
            >>> get_swap_price('DAI', 'ETH', '1000000000000000000')
        """
        logger.info(f"Fetching swap price: buying {buy_token} with {sell_token} amount {buy_amount}")
        params = {
            'buyToken': buy_token,
            'sellToken': sell_token,
            'buyAmount': buy_amount,
            'wallet': self._wallet_params(chain),
        }
        try:
            response = self._run_rpc("getSwapPrice", params)
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get swap price: {e}")
            return f"Error: {e}"
        return f"{response}"

    def get_swap_sources(self) -> str:
        """
        Fetches the list of liquidity sources from the 0x Swap API.

        Returns:
            str: The list of liquidity sources as a string, or 'Error: ...' if the request failed.

        Example:
            >>> get_swap_sources()
        """
        logger.info("Fetching swap sources")
        # Same error contract as the other tools: report the failure as a
        # string instead of letting it escape to the caller.
        try:
            response = self._run_rpc("getSwapSources")
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get swap sources: {e}")
            return f"Error: {e}"
        return f"{response}"

    def execute_swap(
        self,
        buy_token: str,
        sell_token: str,
        sell_amount: str,
        user_email: str,
        chain: str = "base",
    ) -> str:
        """
        Executes a swap using the 0x Swap API. This is a sensitive function: show a preview and allow the user to give final permission before executing it.

        Args:
            buy_token (str): The token to buy (e.g., 'DAI').
            sell_token (str): The token to sell (e.g., 'ETH').
            sell_amount (str): The amount of the sell token to swap, in the smallest unit (e.g., wei for ETH).
            user_email (str): The email of the user for whom the wallet is being fetched.
            chain (str): The EVM chain for which the wallet is being fetched and used to execute this transaction. Use chain base as default.

        Returns:
            str: The transaction receipt of the swap as a string, or 'Error: ...' if the request failed.

        Example:
            >>> execute_swap('DAI', 'ETH', '1000000000000000000', 'userEmail', 'base')
        """
        logger.info("executing swap")
        params = {
            'buyToken': buy_token,
            'sellToken': sell_token,
            'sellAmount': sell_amount,
            'wallet': self._wallet_params(chain, user_email),
        }
        try:
            response = self._run_rpc("swapTokens", params)
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to execute swap: {e}")
            return f"Error: {e}"
        logger.info(f"Swap transaction receipt: {response}")
        return f"{response}"