|
|
import os |
|
|
from dotenv import load_dotenv |
|
|
from src.databases.redis import REDIS_CACHED |
|
|
from src.libs.constants import ONE_HOUR_IN_SECONDS, ONE_MINUTE_IN_SECONDS, ONE_MONTH_IN_SECONDS |
|
|
from src.libs.logger import logger |
|
|
from coingecko import CoinGeckoProClient, CoinGeckoDemoClient |
|
|
|
|
|
# Load variables from a local .env file into the process environment at
# import time; the CoinGecko API keys are later read with os.getenv().
load_dotenv()

# Module-level alias for the project's Redis caching helper; it is applied
# below as a decorator factory, e.g. ``@redis_cache(ttl=...)``.
redis_cache = REDIS_CACHED
|
|
|
|
|
class CoinGecko:
    """Thin wrapper around a CoinGecko API client.

    Each public method forwards to one endpoint group of the underlying
    client (``self.cgc``) and is decorated with ``redis_cache`` (with a
    per-method ``ttl``) and ``logger.instrument()``.

    NOTE(review): every public method accepts ``cache_ttl`` but never reads
    it in its body; presumably the ``redis_cache`` decorator consumes it to
    override the default TTL -- confirm against ``src.databases.redis``.
    """

    def __init__(self, pro_api: bool = False) -> None:
        """Create the underlying CoinGecko client.

        Args:
            pro_api: When truthy, use ``CoinGeckoProClient`` with the key
                from the ``COINGECKO_PRO_API_KEY`` environment variable;
                otherwise use ``CoinGeckoDemoClient`` with the key from
                ``COINGECKO_DEMO_API_KEY``. If the variable is unset,
                ``os.getenv`` yields ``None`` and that is passed through.
        """
        if pro_api:
            client_cls, key_var = CoinGeckoProClient, 'COINGECKO_PRO_API_KEY'
        else:
            client_cls, key_var = CoinGeckoDemoClient, 'COINGECKO_DEMO_API_KEY'
        self.cgc = client_cls(api_key=os.getenv(key_var))

    @redis_cache(ttl=ONE_MINUTE_IN_SECONDS)
    @logger.instrument()
    def get_coin_price(self, ids: list[str], vs_currencies: list[str], cache_ttl: int = None) -> dict:
        """Return prices for the given coins in the given quote currencies.

        Args:
            ids: Coin identifiers, forwarded as ``ids``.
            vs_currencies: Quote currencies, forwarded as ``vs_currencies``.
            cache_ttl: Not read here (see the class-level review note).

        Returns:
            The response of ``self.cgc.simple.get_price``, unchanged.
        """
        return self.cgc.simple.get_price(ids=ids, vs_currencies=vs_currencies)

    @redis_cache(ttl=ONE_HOUR_IN_SECONDS)
    @logger.instrument()
    def get_coin_data(self, id: str, cache_ttl: int = None) -> dict:
        """Return the data record for a single coin.

        The request switches off localization, market data, tickers and
        sparkline via the corresponding keyword flags.

        Args:
            id: Coin identifier, forwarded as ``id``.
            cache_ttl: Not read here (see the class-level review note).

        Returns:
            The response of ``self.cgc.coins.get_id``, unchanged.
        """
        result = self.cgc.coins.get_id(
            id=id,
            localization=False,
            market_data=False,
            tickers=False,
            sparkline=False,
        )
        logger.debug(f"Result: {result}")
        return result

    @redis_cache(ttl=ONE_MONTH_IN_SECONDS)
    @logger.instrument()
    def get_coin_category_data(self, params: dict = None, cache_ttl: int = None) -> dict:
        """Return coin categories from the ``coins/categories`` endpoint.

        Args:
            params: Query parameters for the request. Any falsy value
                (``None`` or an empty dict) is replaced with
                ``{"order": "market_cap_desc"}``.
            cache_ttl: Not read here (see the class-level review note).

        Returns:
            A dict with the single key ``"categories"`` holding the
            response of ``self.cgc.categories.get_data``.
        """
        query = params if params else {"order": "market_cap_desc"}
        result = self.cgc.categories.get_data(endpoint="coins/categories", params=query)
        logger.debug(f"Result: {result}")
        return {"categories": result}

    @redis_cache(ttl=ONE_MONTH_IN_SECONDS)
    @logger.instrument()
    def get_exchanges_list(self, cache_ttl: int = None) -> dict:
        """Return the list of exchanges.

        Args:
            cache_ttl: Not read here (see the class-level review note).

        Returns:
            A dict with the single key ``'exchanges'`` holding the response
            of ``self.cgc.exchanges.get_list``.
        """
        result = self.cgc.exchanges.get_list()
        logger.debug(f"Result: {result}")
        return {'exchanges': result}

    @redis_cache(ttl=ONE_HOUR_IN_SECONDS)
    @logger.instrument()
    def get_exchange_data(self, id: str = None, cache_ttl: int = None) -> dict:
        """Return the data record for a single exchange.

        Args:
            id: Exchange identifier, forwarded as ``id``. It defaults to
                ``None``, which is passed to the client as-is.
            cache_ttl: Not read here (see the class-level review note).

        Returns:
            The response of ``self.cgc.exchanges.get_id``, unchanged.
        """
        result = self.cgc.exchanges.get_id(id=id)
        logger.debug(f"Result: {result}")
        return result
|
|
|
|
|
|