| from phi.tools import Toolkit |
| from phi.utils.log import logger |
|
|
| from src.data_sources.coin_gecko import CoinGecko |
| from src.data_sources.cryptocompare import CryptoCompare |
| from src.data_sources.dexscreener import DexScreener |
|
|
class CryptoDataTools(Toolkit):
    """Toolkit of cryptocurrency lookups backed by CoinGecko, CryptoCompare and DexScreener.

    Each public tool method returns a plain string: either the ``str()`` of a
    tuple holding the per-provider results, or ``"Error: <message>"`` when a
    lookup raised. The tool methods catch ``Exception`` themselves and do not
    propagate it to the caller.
    """

    def __init__(self):
        super().__init__(name="crypto_data_tools")

        # Build the provider clients first so they exist before any tool
        # that depends on them is registered.
        self.coingecko = CoinGecko()
        self.crypto_compare = CryptoCompare()
        self.dex_screener = DexScreener()

        for tool in (
            self.get_coin_price,
            self.get_coin_data,
            self.get_exchange_data,
            self.get_latest_crypto_news_data,
        ):
            self.register(tool)

    def get_coin_price(self, coin_id: str, vs_currency: str = "usd") -> str:
        """
        Fetches the price data for a given cryptocurrency coin from CoinGecko and CryptoCompare.

        The CoinGecko response is looked up under ``coin_id`` and the
        CryptoCompare response under ``coin_id.upper()``. A provider whose
        response is empty/``None`` or lacks that key contributes an empty dict
        and a logged warning instead of failing the whole call.

        Args:
            coin_id (str): The unique identifier for the cryptocurrency coin.
            vs_currency (str, optional): The currency to which the coin price will be compared. Defaults to "usd".

        Returns:
            str: The string form of the tuple ``(coingecko_price, crypto_compare_price)``,
                where each element is a dict keyed by ``coin_id`` (or empty if that
                provider had no entry). If any step raises, the string
                ``"Error: <exception message>"`` is returned instead.

        Example:
            >>> tools = CryptoDataTools()  # doctest: +SKIP
            >>> tools.get_coin_price("bitcoin", "eur")  # doctest: +SKIP
        """
        logger.info(f"Fetching price data for {coin_id} cryptocurrency coin from CoinGecko and CryptoCompare")
        try:
            coingecko_price_data = self.coingecko.get_coin_price(ids=[coin_id], vs_currencies=[vs_currency])
            crypto_compare_price_data = self.crypto_compare.get_coin_price(ids=[coin_id], vs_currencies=[vs_currency])

            logger.debug(f"coingecko_price_data: {coingecko_price_data}")
            logger.debug(f"crypto_compare_price_data: {crypto_compare_price_data}")

            coingecko_price = {}
            crypto_compare_price = {}

            # The truthiness guard keeps a None/empty payload from raising on
            # the membership test, which would otherwise discard the other
            # provider's data via the broad except below.
            if coingecko_price_data and coin_id in coingecko_price_data:
                coingecko_price[coin_id] = coingecko_price_data[coin_id]
            else:
                logger.warning(f"Warning: CoinGecko data for {coin_id} not found.")

            # CryptoCompare entries are looked up under the upper-cased identifier.
            compare_key = coin_id.upper()
            if crypto_compare_price_data and compare_key in crypto_compare_price_data:
                crypto_compare_price[coin_id] = crypto_compare_price_data[compare_key]
            else:
                logger.warning(f"Warning: CryptoCompare data for {coin_id} not found.")

            logger.debug(f"response {coingecko_price}")
            logger.debug(f"response {crypto_compare_price}")

            return str((coingecko_price, crypto_compare_price))
        except Exception as e:
            logger.warning(f"Failed to fetch price data for {coin_id}: {e}")
            return f"Error: {e}"

    def get_coin_data(self, coin_symbol: str) -> str:
        """
        Fetches coin data for a given cryptocurrency coin from DexScreener and CryptoCompare.

        Args:
            coin_symbol (str): The symbol for the cryptocurrency coin.

        Returns:
            str: The string form of the tuple ``(dexscreener_result, cryptocompare_result)``,
                where the first element is the DexScreener search result for
                ``coin_symbol`` and the second is CryptoCompare's overall coin data.
                If either lookup raises, ``"Error: <exception message>"`` is returned instead.

        Example:
            >>> tools = CryptoDataTools()  # doctest: +SKIP
            >>> tools.get_coin_data("BTC")  # doctest: +SKIP
        """
        logger.info(f"Fetching coin data for {coin_symbol} cryptocurrency coin")
        try:
            dexscreener_coin_data = self.dex_screener.search(query=coin_symbol)
            cryptocompare_coin_data = self.crypto_compare.get_overall_coin_data(symbol=coin_symbol)

            logger.debug(f"dexscreener_coin_data: {dexscreener_coin_data}")
            logger.debug(f"cryptocompare_coin_data: {cryptocompare_coin_data}")

            return str((dexscreener_coin_data, cryptocompare_coin_data))
        except Exception as e:
            logger.warning(f"Failed to fetch coin data for {coin_symbol}: {e}")
            return f"Error: {e}"

    def get_exchange_data(self, exchange: str) -> str:
        """
        Fetches exchange data for a given cryptocurrency exchange from CoinGecko and DexScreener.

        Args:
            exchange (str): The unique identifier for the cryptocurrency exchange.

        Returns:
            str: The string form of the tuple ``(coingecko_result, dexscreener_result)``,
                where the first element is CoinGecko's exchange data for ``exchange``
                and the second is the DexScreener search result for the same value.
                If either lookup raises, ``"Error: <exception message>"`` is returned instead.

        Example:
            >>> tools = CryptoDataTools()  # doctest: +SKIP
            >>> tools.get_exchange_data("binance")  # doctest: +SKIP
        """
        logger.info(f"Fetching exchange data for {exchange} cryptocurrency exchange")
        try:
            coin_gecko_exchange_data = self.coingecko.get_exchange_data(id=exchange)
            dexscreener_exchange_data = self.dex_screener.search(query=exchange)

            logger.debug(f"coingecko exchange data: {coin_gecko_exchange_data}")
            logger.debug(f"dexscreener exchange data: {dexscreener_exchange_data}")

            return str((coin_gecko_exchange_data, dexscreener_exchange_data))
        except Exception as e:
            logger.warning(f"Failed to fetch exchange data for {exchange}: {e}")
            return f"Error: {e}"

    def get_latest_crypto_news_data(self) -> str:
        """
        Fetches the latest cryptocurrency news data from CoinGecko and CryptoCompare.

        Returns:
            str: The string form of the tuple
                ``(trending_coins, news_categories, latest_news_articles)``: the
                CoinGecko trending coin list followed by CryptoCompare's news
                categories and latest news articles. If any lookup raises,
                ``"Error: <exception message>"`` is returned instead.

        Example:
            >>> tools = CryptoDataTools()  # doctest: +SKIP
            >>> tools.get_latest_crypto_news_data()  # doctest: +SKIP
        """
        logger.info("Fetching latest cryptocurrency news data")
        try:
            coin_gecko_trending_coins = self.coingecko.get_trending_coin_list()
            cryptocompare_news_categories = self.crypto_compare.get_news_categories()
            cryptocompare_latest_news_articles = self.crypto_compare.get_latest_news_articles()

            logger.debug(f"coingecko trending coins: {coin_gecko_trending_coins}")
            logger.debug(f"cryptocompare news categories: {cryptocompare_news_categories}")
            logger.debug(f"cryptocompare latest news articles: {cryptocompare_latest_news_articles}")

            return str(
                (
                    coin_gecko_trending_coins,
                    cryptocompare_news_categories,
                    cryptocompare_latest_news_articles,
                )
            )
        except Exception as e:
            logger.warning(f"Failed to fetch latest cryptocurrency news data: {e}")
            return f"Error: {e}"
|
|