|
|
import asyncio |
|
|
import json |
|
|
import pprint |
|
|
import requests |
|
|
from web3 import Web3 |
|
|
from phi.tools import Toolkit |
|
|
from phi.utils.log import logger |
|
|
from src.libs.web3 import get_web3_instance |
|
|
from src.libs.helper_functions import get_headers, get_private_key |
|
|
from src.libs.token_approval_helper import TokenApprovalHelper |
|
|
from src.libs.rpc_client import rpc_call |
|
|
|
|
|
class CryptoSwapTools(Toolkit):
    """Toolkit exposing token-swap helpers: quote, price, sources and execution.

    Quote, price and source lookups are delegated to ``rpc_call``. Executing a
    swap signs the quoted transaction with the key returned by
    ``get_private_key()`` and broadcasts it through ``self.web3``.

    Every registered tool returns a string: the stringified result on
    success, or a message starting with ``"Error:"`` on failure.
    """

    # Fixed gas settings applied to every swap transaction.
    # NOTE(review): the quote may carry its own gas estimate/price; confirm
    # against the RPC response schema before switching to those values.
    _DEFAULT_GAS_LIMIT = 200000
    _DEFAULT_GAS_PRICE_GWEI = '20'

    def __init__(self, web3: "Web3 | None" = None):
        """Create the toolkit and register its tools.

        Args:
            web3: Web3 client to use. When omitted, one is obtained from
                ``get_web3_instance()`` at construction time. (A call in the
                default argument would run once at import time and be shared
                by every instance.)
        """
        super().__init__(name="swap_tools")

        self.web3 = web3 if web3 is not None else get_web3_instance()
        self.token_approval_helper = TokenApprovalHelper(self.web3, get_private_key())

        self.register(self.get_swap_quote)
        self.register(self.get_swap_price)
        self.register(self.get_swap_sources)
        self.register(self.execute_swap)

    def _request_swap_quote(self, buy_token: str, sell_token: str, sell_amount: str):
        """Call the ``getSwapQuote`` RPC method and return its raw response."""
        params = {
            'buyToken': buy_token,
            'sellToken': sell_token,
            'sellAmount': sell_amount,
        }
        return asyncio.run(rpc_call(method_name="getSwapQuote", params=params))

    @staticmethod
    def _parse_quote(raw_quote):
        """Turn a raw quote response into a dict, or ``None`` if that is impossible.

        Accepts an already-decoded dict as well as JSON text, so callers do
        not depend on which of the two ``rpc_call`` returns.
        """
        if isinstance(raw_quote, dict):
            return raw_quote
        text = raw_quote if isinstance(raw_quote, (str, bytes, bytearray)) else f"{raw_quote}"
        try:
            parsed = json.loads(text)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
            return None
        return parsed if isinstance(parsed, dict) else None

    def get_swap_quote(self, buy_token: str, sell_token: str, sell_amount: str) -> str:
        """
        Fetches a swap quote from the 0x Swap API.

        Args:
            buy_token (str): The token to buy (e.g., 'DAI').
            sell_token (str): The token to sell (e.g., 'ETH').
            sell_amount (str): The amount of the sell token to swap, in the smallest unit (e.g., wei for ETH).

        Returns:
            str: The stringified quote response, or "Error: ..." if the request failed.

        Example:
            get_swap_quote('DAI', 'ETH', '1000000000000000000')
        """
        logger.info(f"Fetching swap quote: buying {buy_token} with {sell_token} amount {sell_amount}")
        try:
            response = self._request_swap_quote(buy_token, sell_token, sell_amount)
            return f"{response}"
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get swap quote: {e}")
            return f"Error: {e}"

    def get_swap_price(self, buy_token: str, sell_token: str, buy_amount: str) -> str:
        """
        Fetches the price for a swap from the 0x Swap API.

        Args:
            buy_token (str): The token to buy.
            sell_token (str): The token to sell.
            buy_amount (str): The amount of the buy token, in the smallest unit.

        Returns:
            str: The stringified price response, or "Error: ..." if the request failed.

        Example:
            get_swap_price('DAI', 'ETH', '1000000000000000000')
        """
        logger.info(f"Fetching swap price: buying {buy_token} with {sell_token} amount {buy_amount}")
        try:
            params = {
                'buyToken': buy_token,
                'sellToken': sell_token,
                'buyAmount': buy_amount,
            }
            response = asyncio.run(rpc_call(method_name="getSwapPrice", params=params))
            return f"{response}"
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get swap price: {e}")
            return f"Error: {e}"

    def get_swap_sources(self) -> str:
        """
        Fetches the list of liquidity sources from the 0x Swap API.

        Returns:
            str: The stringified sources response, or "Error: ..." if the request failed.

        Example:
            get_swap_sources()
        """
        logger.info("Fetching swap sources")
        # Same error contract as the quote/price tools: report, don't raise.
        try:
            response = asyncio.run(rpc_call(method_name="getSwapSources"))
            return f"{response}"
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get swap sources: {e}")
            return f"Error: {e}"

    def execute_swap(self, buy_token: str, sell_token: str, sell_amount: str, eth_address: str) -> str:
        """
        Executes a swap using the 0x Swap API.

        Fetches a quote, approves the sell token for the quote's allowance
        target when the sell token is not 'ETH', then signs and broadcasts the
        quoted transaction and waits for its receipt.

        Args:
            buy_token (str): The token to buy (e.g., 'DAI').
            sell_token (str): The token to sell (e.g., 'ETH').
            sell_amount (str): The amount of the sell token to swap, in the smallest unit (e.g., wei for ETH).
            eth_address (str): The Ethereum address of the user executing the swap.
                NOTE(review): the transaction is signed with ``get_private_key()``,
                so this presumably has to be that key's address - confirm.

        Returns:
            str: The stringified transaction receipt, or "Error: ..." on failure.

        Example:
            execute_swap('DAI', 'ETH', '1000000000000000000', '0xYourEthereumAddress')
        """
        logger.info(f"Fetching swap quote: buying {buy_token} with {sell_token} amount {sell_amount}")
        try:
            raw_quote = self._request_swap_quote(buy_token, sell_token, sell_amount)
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get swap quote: {e}")
            return "Error: Failed to get swap quote"

        # Parse defensively: an unparseable or non-object response must end up
        # as an error string, not as an uncaught JSONDecodeError.
        quote = self._parse_quote(raw_quote)
        logger.info(f"Swap quote: {quote}")

        if quote is None or 'error' in quote:
            return "Error: Failed to get swap quote"

        if sell_token != 'ETH':
            allowance_target = quote.get('allowanceTarget')
            if not allowance_target:
                return "Error: Swap quote is missing 'allowanceTarget'"

            try:
                approval_receipt = self.token_approval_helper.approve_token(
                    sell_token, allowance_target, sell_amount, eth_address
                )
            except Exception as e:
                logger.warning(f"Token approval failed: {e}")
                return f"Error: Token approval failed: {e}"
            logger.info(f"Approval receipt: {approval_receipt}")

            status = approval_receipt.get('status') if hasattr(approval_receipt, 'get') else None
            if status != 1:
                return "Error: Token approval failed"

        try:
            swap_tx = {
                'from': eth_address,
                'to': quote['to'],
                'data': quote['data'],
                'value': int(quote['value']),
                'gas': self._DEFAULT_GAS_LIMIT,
                'gasPrice': self.web3.to_wei(self._DEFAULT_GAS_PRICE_GWEI, 'gwei'),
                'nonce': self.web3.eth.get_transaction_count(eth_address),
                # Replay protection (EIP-155); many nodes reject unprotected transactions.
                'chainId': self.web3.eth.chain_id,
            }
            # snake_case API, matching the to_wei / send_raw_transaction calls
            # used here; the camelCase signTransaction alias is gone from
            # current eth-account releases.
            signed_swap_tx = self.web3.eth.account.sign_transaction(swap_tx, private_key=get_private_key())
            # The raw bytes attribute was renamed between library versions.
            raw_tx = getattr(signed_swap_tx, 'raw_transaction', None)
            if raw_tx is None:
                raw_tx = signed_swap_tx.rawTransaction
            tx_hash = self.web3.eth.send_raw_transaction(raw_tx)
            receipt = self.web3.eth.wait_for_transaction_receipt(tx_hash)
            logger.info(f"Swap transaction receipt: {receipt}")

            return f"{receipt}"
        except Exception as e:
            logger.warning(f"Failed to execute swap: {e}")
            return f"Error: {e}"
|
|
|