| import json | |
| from phi.tools import Toolkit | |
| from phi.utils.log import logger | |
| from src.data_sources.coin_gecko import CoinGecko | |
| from src.data_sources.cryptocompare import CryptoCompare | |
| from src.data_sources.dexscreener import DexScreener | |
| class CryptoDataTools(Toolkit): | |
| def __init__(self): | |
| super().__init__(name="crypto_data_tools") | |
| self.register(self.get_coin_price) | |
| self.register(self.get_coin_data) | |
| self.coingecko = CoinGecko() | |
| self.crypto_compare = CryptoCompare() | |
| self.dex_screener = DexScreener() | |
| def get_coin_price(self, coin_id: str, vs_currency: str = "usd") -> str: | |
| """ | |
| Fetches the price data for a given cryptocurrency coin from CoinGecko and CryptoCompare. | |
| Args: | |
| coin_id (str): The unique identifier for the cryptocurrency coin. | |
| vs_currency (str, optional): The currency to which the coin price will be compared. Defaults to "usd". | |
| Returns: | |
| str: A JSON string containing the price data from CoinGecko and CryptoCompare for the specified coin. | |
| Raises: | |
| Exception: If an error occurs while fetching the price data. | |
| Example: | |
| >>> get_coin_price("bitcoin", "eur") | |
| """ | |
| logger.info(f"Fetching price data for {coin_id} cryptocurrency coin from CoinGecko and CryptoCompare") | |
| try: | |
| coingecko_price = {} | |
| crypto_compare_price = {} | |
| coingecko_price_data = self.coingecko.get_coin_price(ids=[coin_id], vs_currencies=[vs_currency]) | |
| crypto_compare_price_data = self.crypto_compare.get_coin_price(ids=[coin_id], vs_currencies=[vs_currency]) | |
| logger.debug(f"coingecko_price_data: {coingecko_price_data}") | |
| logger.debug(f"crypto_compare_price_data: {crypto_compare_price_data}") | |
| if coin_id in coingecko_price_data: | |
| coingecko_price[coin_id] = coingecko_price_data[coin_id] | |
| else: | |
| logger.warning(f"Warning: CoinGecko data for {coin_id} not found.") | |
| if coin_id.upper() in crypto_compare_price_data: | |
| crypto_compare_price[coin_id] = crypto_compare_price_data[coin_id.upper()] | |
| else: | |
| logger.warning(f"Warning: CryptoCompare data for {coin_id} not found.") | |
| logger.debug(f"response {coingecko_price}") | |
| logger.debug(f"response {crypto_compare_price}") | |
| return f"{(coingecko_price, crypto_compare_price)}" | |
| except Exception as e: | |
| logger.warning(f"Failed to fetch price data for {coin_id}: {e}") | |
| return f"Error: {e}" | |
| def get_coin_data(self, coin_symbol: str) -> str: | |
| """ | |
| Fetches coin data for a given cryptocurrency coin from DexScreener and CryptoCompare. | |
| Args: | |
| coin_symbol (str): The symbol for the cryptocurrency coin. | |
| Returns: | |
| str: A JSON string containing the coin data from DexScreener and CryptoCompare for the specified coin. | |
| Raises: | |
| Exception: If an error occurs while fetching the coin data. | |
| Example: | |
| >>> get_coin_data("BTC") | |
| """ | |
| logger.info(f"Fetching coin data for {coin_symbol} cryptocurrency coin") | |
| try: | |
| dexscreener_coin_data = self.dex_screener.search(query=coin_symbol) | |
| cryptocompare_coin_data = self.crypto_compare.get_overall_coin_data(symbol=coin_symbol) | |
| logger.debug(f"dexscreener_coin_data: {dexscreener_coin_data}") | |
| logger.debug(f"cryptocompare_coin_data: {cryptocompare_coin_data}") | |
| return f"{(dexscreener_coin_data, cryptocompare_coin_data)}" | |
| except Exception as e: | |
| logger.warning(f"Failed to fetch coin data for {coin_symbol}: {e}") | |
| return f"Error: {e}" | |