Spaces:
Sleeping
Sleeping
| from typing import Dict, Any, Optional | |
| from pydantic import BaseModel, PrivateAttr | |
| from src.tools.base_tool import BaseWeb3Tool, Web3ToolInput | |
| from src.utils.config import config | |
| from src.utils.logger import get_logger | |
| from src.utils.cache_manager import cache_manager | |
| logger = get_logger(__name__) | |
| class CoinGeckoTool(BaseWeb3Tool): | |
| name: str = "coingecko_data" | |
| description: str = """Get cryptocurrency price, volume, market cap and trend data from CoinGecko.""" | |
| args_schema: type[BaseModel] = Web3ToolInput | |
| _base_url: str = PrivateAttr(default="https://api.coingecko.com/api/v3") | |
| _symbol_map: Dict[str, str] = PrivateAttr(default_factory=lambda: { | |
| "btc": "bitcoin", "eth": "ethereum", "sol": "solana", "ada": "cardano", | |
| "dot": "polkadot", "bnb": "binancecoin", "usdc": "usd-coin", | |
| "usdt": "tether", "xrp": "ripple", "avax": "avalanche-2", | |
| "link": "chainlink", "matic": "matic-network", "uni": "uniswap" | |
| }) | |
| def __init__(self): | |
| super().__init__() | |
| async def _arun(self, query: str, filters: Optional[Dict[str, Any]] = None, **kwargs) -> str: | |
| filters = filters or {} | |
| try: | |
| # Check cache first | |
| cache_key = f"coingecko_{filters.get('type', 'coin')}_{query}_{str(filters)}" | |
| cached_result = cache_manager.get(cache_key) | |
| if cached_result: | |
| logger.info(f"Cache hit for {cache_key}") | |
| return cached_result | |
| result = None | |
| t = filters.get("type") | |
| if t == "trending": | |
| result = await self._get_trending() | |
| elif t == "market_overview": | |
| result = await self._get_market_overview() | |
| elif t == "price_history": | |
| days = int(filters.get("days", 30)) | |
| result = await self._get_price_history(query, days) | |
| else: | |
| result = await self._get_coin_data(query) | |
| # Cache successful results | |
| if result and not result.startswith("β οΈ"): | |
| cache_manager.set(cache_key, result, ttl=300) | |
| return result | |
| except Exception as e: | |
| logger.error(f"CoinGecko error: {e}") | |
| return f"β οΈ CoinGecko service temporarily unavailable: {str(e)}" | |
| async def _get_trending(self) -> str: | |
| data = await self.make_request(f"{self._base_url}/search/trending") | |
| coins = data.get("coins", [])[:5] | |
| out = "π₯ **Trending Cryptocurrencies:**\n\n" | |
| for i, c in enumerate(coins, 1): | |
| item = c.get("item", {}) | |
| out += f"{i}. **{item.get('name','?')} ({item.get('symbol','?').upper()})** β Rank #{item.get('market_cap_rank','?')}\n" | |
| return out | |
| async def _get_market_overview(self) -> str: | |
| try: | |
| params = { | |
| "vs_currency": "usd", | |
| "order": "market_cap_desc", | |
| "per_page": 10, | |
| "page": 1 | |
| } | |
| data = await self.make_request(f"{self._base_url}/coins/markets", params=params) | |
| if not data or not isinstance(data, list): | |
| return "β οΈ Market overview data temporarily unavailable" | |
| if len(data) == 0: | |
| return "β No market data available" | |
| result = "π **Top Cryptocurrencies by Market Cap:**\n\n" | |
| for coin in data[:10]: # Ensure max 10 | |
| try: | |
| name = coin.get("name", "Unknown") | |
| symbol = coin.get("symbol", "?").upper() | |
| price = coin.get("current_price", 0) | |
| change_24h = coin.get("price_change_percentage_24h", 0) | |
| market_cap = coin.get("market_cap", 0) | |
| # Handle missing or invalid data | |
| if price is None or price <= 0: | |
| continue | |
| emoji = "π" if change_24h >= 0 else "π" | |
| mcap_formatted = f"${market_cap/1e9:.2f}B" if market_cap > 0 else "N/A" | |
| result += f"{emoji} **{name} ({symbol})**: ${price:,.4f} ({change_24h:+.2f}%) | MCap: {mcap_formatted}\n" | |
| except (TypeError, KeyError, ValueError) as e: | |
| logger.warning(f"Skipping invalid coin data: {e}") | |
| continue | |
| return result | |
| except Exception as e: | |
| logger.error(f"Market overview error: {e}") | |
| return "β οΈ Market overview temporarily unavailable" | |
| async def _get_coin_data(self, query: str) -> str: | |
| if not query or not query.strip(): | |
| return "β Please provide a cryptocurrency symbol or name" | |
| coin_id = self._symbol_map.get(query.lower(), query.lower()) | |
| params = { | |
| "ids": coin_id, | |
| "vs_currencies": "usd", | |
| "include_24hr_change": "true", | |
| "include_24hr_vol": "true", | |
| "include_market_cap": "true" | |
| } | |
| try: | |
| data = await self.make_request(f"{self._base_url}/simple/price", params=params) | |
| if not data or coin_id not in data: | |
| # Try alternative search if direct lookup fails | |
| search_data = await self._search_coin(query) | |
| if search_data: | |
| return search_data | |
| return f"β No data found for '{query}'. Try using full name or common symbols like BTC, ETH, SOL" | |
| coin_data = data[coin_id] | |
| # Validate required fields | |
| if "usd" not in coin_data: | |
| return f"β Price data unavailable for {query.upper()}" | |
| price = coin_data.get("usd", 0) | |
| change_24h = coin_data.get("usd_24h_change", 0) | |
| volume_24h = coin_data.get("usd_24h_vol", 0) | |
| market_cap = coin_data.get("usd_market_cap", 0) | |
| # Handle edge cases | |
| if price <= 0: | |
| return f"β οΈ {query.upper()} price data appears invalid" | |
| emoji = "π" if change_24h >= 0 else "π" | |
| result = f"π° **{query.upper()} Market Data:**\n\n" | |
| result += f"{emoji} **Price**: ${price:,.4f}\n" | |
| result += f"π **24h Change**: {change_24h:+.2f}%\n" | |
| if volume_24h > 0: | |
| result += f"π **24h Volume**: ${volume_24h:,.0f}\n" | |
| else: | |
| result += f"π **24h Volume**: Data unavailable\n" | |
| if market_cap > 0: | |
| result += f"π¦ **Market Cap**: ${market_cap:,.0f}\n" | |
| else: | |
| result += f"π¦ **Market Cap**: Data unavailable\n" | |
| return result | |
| except Exception as e: | |
| logger.error(f"Error fetching coin data for {query}: {e}") | |
| return f"β οΈ Unable to fetch data for {query.upper()}. Please try again later." | |
| async def _search_coin(self, query: str) -> Optional[str]: | |
| """Fallback search when direct ID lookup fails""" | |
| try: | |
| search_params = {"query": query} | |
| search_data = await self.make_request(f"{self._base_url}/search", params=search_params) | |
| coins = search_data.get("coins", []) | |
| if coins: | |
| coin = coins[0] # Take first match | |
| coin_id = coin.get("id") | |
| if coin_id: | |
| return await self._get_coin_data(coin_id) | |
| return None | |
| except Exception: | |
| return None | |
| async def _get_price_history(self, symbol: str, days: int) -> str: | |
| coin_id = self._symbol_map.get(symbol.lower(), symbol.lower()) | |
| params = {"vs_currency": "usd", "days": days} | |
| data = await self.make_request(f"{self._base_url}/coins/{coin_id}/market_chart", params=params) | |
| # you can format this as you like; hereβs a simple JSON dump | |
| return { | |
| "symbol": symbol.upper(), | |
| "prices": data.get("prices", []), | |
| "volumes": data.get("total_volumes", []), | |
| "market_caps": data.get("market_caps", []) | |
| } | |