File size: 2,494 Bytes
d068566 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
import json
from phi.tools import Toolkit
from phi.utils.log import logger
from src.data_sources.coin_gecko import CoinGecko
from src.data_sources.cryptocompare import CryptoCompare
class CryptoDataTools(Toolkit):
def __init__(self):
super().__init__(name="crypto_data_tools")
self.register(self.get_coin_price)
self.coingecko = CoinGecko()
self.crypto_compare = CryptoCompare()
def get_coin_price(self, coin_id: str, vs_currency: str = "usd") -> str:
"""
Fetches the price data for a given cryptocurrency coin from CoinGecko and CryptoCompare.
Args:
coin_id (str): The unique identifier for the cryptocurrency coin.
vs_currency (str, optional): The currency to which the coin price will be compared. Defaults to "usd".
Returns:
str: A JSON string containing the price data from CoinGecko and CryptoCompare for the specified coin.
Raises:
Exception: If an error occurs while fetching the price data.
Example:
>>> get_coin_price("bitcoin", "eur")
"""
logger.info(f"Fetching price data for {coin_id} cryptocurrency coin from CoinGecko and CryptoCompare")
try:
coingecko_price = {}
crypto_compare_price = {}
coingecko_price_data = self.coingecko.get_coin_price(ids=[coin_id], vs_currencies=[vs_currency])
crypto_compare_price_data = self.crypto_compare.get_coin_price(ids=[coin_id], vs_currencies=[vs_currency])
logger.debug(f"coingecko_price_data: {coingecko_price_data}")
logger.debug(f"crypto_compare_price_data: {crypto_compare_price_data}")
if coin_id in coingecko_price_data:
coingecko_price[coin_id] = coingecko_price_data[coin_id]
else:
logger.warning(f"Warning: CoinGecko data for {coin_id} not found.")
if coin_id.upper() in crypto_compare_price_data:
crypto_compare_price[coin_id] = crypto_compare_price_data[coin_id.upper()]
else:
logger.warning(f"Warning: CryptoCompare data for {coin_id} not found.")
logger.debug(f"respons {coingecko_price}")
logger.debug(f"respons {crypto_compare_price}")
return f"{(coingecko_price, crypto_compare_price)}"
except Exception as e:
logger.warning(f"Failed to fetch price data for {coin_id}: {e}")
return f"Error: {e}"
|