Spaces:
Sleeping
Sleeping
| from langchain.tools import BaseTool | |
| from pydantic import BaseModel, Field | |
| from typing import Dict, Any, List, Optional | |
| import json | |
| import asyncio | |
| from src.utils.logger import get_logger | |
| logger = get_logger(__name__) | |
class ChartDataInput(BaseModel):
    """Argument schema for a chart data request.

    Only ``chart_type`` is required; every other field is optional and
    carries the default shown below.
    """

    # Selects which kind of chart payload is produced.
    chart_type: str = Field(
        description="Chart type: price_chart, market_overview, defi_tvl, portfolio_pie, gas_tracker",
    )
    # Asset to chart.
    symbol: Optional[str] = Field(
        default=None,
        description="Asset symbol (e.g., bitcoin, ethereum)",
    )
    # Look-back window expressed as "<number>d".
    timeframe: Optional[str] = Field(
        default="30d",
        description="Time range: 1d, 7d, 30d, 90d, 365d",
    )
    # Protocol names to compare.
    protocols: Optional[List[str]] = Field(
        default=None,
        description="DeFi protocol names",
    )
    # Blockchain network name.
    network: Optional[str] = Field(
        default="ethereum",
        description="Blockchain network",
    )
class ChartDataTool(BaseTool):
    """Chart data provider tool.

    Produces structured data that a front end can turn into a chart.
    Every public entry point returns a JSON *string* (never HTML) shaped as
    ``{"chart_type": ..., "data": {...}, "config": {...}}``; failures are
    reported as ``{"chart_type": "error", ...}`` instead of being raised.

    Payloads that are synthetic (fallback mock data, the randomly generated
    gas series, the static sample portfolio) carry ``"is_mock": true`` in
    ``config`` so consumers can tell them apart from live API data.
    """

    name: str = "chart_data_provider"
    # NOTE(review): the braces below are doubled, presumably so the text
    # survives str.format-style prompt templating - confirm before changing.
    description: str = """Provides structured data for creating cryptocurrency charts.
    Returns JSON data in this format:
    {{
        "chart_type": "price_chart|market_overview|defi_tvl|portfolio_pie|gas_tracker",
        "data": {{...}},
        "config": {{...}}
    }}
    Chart types:
    - price_chart: Bitcoin/crypto price and volume data
    - market_overview: Top cryptocurrencies market data
    - defi_tvl: DeFi protocol TVL comparison
    - portfolio_pie: Portfolio allocation breakdown
    - gas_tracker: Gas fees across networks
    """
    args_schema: type[ChartDataInput] = ChartDataInput

    def _run(self, chart_type: str, symbol: Optional[str] = None, timeframe: Optional[str] = "30d",
             protocols: Optional[List[str]] = None, network: Optional[str] = "ethereum") -> str:
        """Synchronous execution: run ``_arun`` to completion and return its JSON string.

        ``asyncio.run`` refuses to start when the calling thread already has a
        running event loop, so in that case the coroutine is driven by a fresh
        loop on a short-lived worker thread instead.
        """
        import concurrent.futures

        call_args = (chart_type, symbol, timeframe, protocols, network)

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: the simple path is safe.
            return asyncio.run(self._arun(*call_args))

        # A loop is already running here; blocking on a worker thread mirrors
        # what a synchronous call does anyway, without raising RuntimeError.
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            future = pool.submit(lambda: asyncio.run(self._arun(*call_args)))
            return future.result()

    async def _arun(self, chart_type: str, symbol: Optional[str] = None, timeframe: Optional[str] = "30d",
                    protocols: Optional[List[str]] = None, network: Optional[str] = "ethereum") -> str:
        """Provide chart data for the requested chart type.

        Args:
            chart_type: One of price_chart, market_overview, defi_tvl,
                portfolio_pie, gas_tracker (matched case-insensitively).
            symbol: Asset for price_chart; defaults to "bitcoin".
            timeframe: Look-back window, see ``_parse_timeframe``.
            protocols: Protocol names for defi_tvl; defaults to a fixed trio.
            network: Network for gas_tracker; defaults to "ethereum".

        Returns:
            A JSON string. Unknown chart types and unexpected exceptions
            yield a ``"chart_type": "error"`` payload rather than raising.
        """
        try:
            logger.info(f"Providing {chart_type} data for {symbol or 'general'}")

            # Convert timeframe to days
            days = self._parse_timeframe(timeframe)
            kind = (chart_type or "").strip().lower()

            if kind == "price_chart":
                return await self._get_price_chart_data(symbol or "bitcoin", days)
            if kind == "market_overview":
                return await self._get_market_overview_data()
            if kind == "defi_tvl":
                return await self._get_defi_tvl_data(protocols or ["uniswap", "aave", "compound"])
            if kind == "portfolio_pie":
                return await self._get_portfolio_data()
            if kind == "gas_tracker":
                # The schema allows an explicit None; fall back to the default network.
                return await self._get_gas_data(network or "ethereum")

            return json.dumps({
                "chart_type": "error",
                "error": f"Unknown chart type: {chart_type}",
                "available_types": ["price_chart", "market_overview", "defi_tvl", "portfolio_pie", "gas_tracker"]
            })
        except Exception as e:
            # Tool boundary: report the failure as data instead of raising.
            logger.error(f"Chart data error: {e}")
            return json.dumps({
                "chart_type": "error",
                "error": str(e),
                "message": "Failed to generate chart data"
            })

    async def _get_price_chart_data(self, symbol: str, days: int) -> str:
        """Get price and volume history for ``symbol`` from the CoinGecko API.

        Falls back to ``_get_mock_price_data`` when the history request fails
        or returns no "prices" key. A failure of the secondary coin-name
        lookup does NOT trigger the fallback; the symbol is used as the name.
        """
        try:
            # Imported lazily, as in the rest of this class.
            from src.tools.coingecko_tool import CoinGeckoTool
            coingecko = CoinGeckoTool()

            # Map common symbols to CoinGecko IDs
            symbol_map = {
                "btc": "bitcoin", "bitcoin": "bitcoin",
                "eth": "ethereum", "ethereum": "ethereum",
                "sol": "solana", "solana": "solana",
                "ada": "cardano", "cardano": "cardano",
                "bnb": "binancecoin", "binance": "binancecoin",
                "matic": "matic-network", "polygon": "matic-network",
                "avax": "avalanche-2", "avalanche": "avalanche-2",
                "dot": "polkadot", "polkadot": "polkadot",
                "link": "chainlink", "chainlink": "chainlink",
                "uni": "uniswap", "uniswap": "uniswap"
            }
            lookup_key = symbol.strip().lower()
            coin_id = symbol_map.get(lookup_key, lookup_key)

            # Get price history from CoinGecko.
            # "daily" is the only interval value the public API accepts; for
            # shorter ranges the parameter is omitted so the API picks its
            # automatic granularity (sending "hourly" makes the request fail).
            url = f"https://api.coingecko.com/api/v3/coins/{coin_id}/market_chart"
            params = {"vs_currency": "usd", "days": days}
            if days > 90:
                params["interval"] = "daily"

            data = await coingecko.make_request(url, params=params)
            if not data or "prices" not in data:
                # Fallback to mock data if API fails
                logger.warning(f"CoinGecko API failed for {symbol}, using fallback data")
                return await self._get_mock_price_data(symbol, days)

            # Format the real data
            price_data = data.get("prices", [])
            volume_data = data.get("total_volumes", [])

            # Resolve a display name. This is cosmetic, so any failure here
            # must not throw away the real price data fetched above.
            coin_name = symbol.title()
            try:
                coin_info = await coingecko.make_request(
                    f"https://api.coingecko.com/api/v3/coins/{coin_id}",
                    # Only the name is needed; skip the heavy optional sections.
                    params={
                        "localization": "false",
                        "tickers": "false",
                        "market_data": "false",
                        "community_data": "false",
                        "developer_data": "false",
                        "sparkline": "false",
                    },
                )
                if isinstance(coin_info, dict):
                    coin_name = coin_info.get("name") or coin_name
            except Exception as name_error:
                logger.warning(f"CoinGecko name lookup failed for {symbol}: {name_error}")

            return json.dumps({
                "chart_type": "price_chart",
                "data": {
                    "prices": price_data,
                    "total_volumes": volume_data,
                    "symbol": symbol.upper(),
                    "name": coin_name
                },
                "config": {
                    "title": f"{coin_name} Price Analysis ({days} days)",
                    "timeframe": f"{days}d",
                    "currency": "USD"
                }
            })
        except Exception as e:
            logger.error(f"Real price data failed: {e}")
            return await self._get_mock_price_data(symbol, days)

    async def _get_mock_price_data(self, symbol: str, days: int) -> str:
        """Fallback mock price data: one random daily point per day, ending now.

        The payload is flagged with ``"is_mock": true`` in ``config``.
        """
        import random
        import time

        day_ms = 24 * 60 * 60 * 1000
        key = symbol.strip().lower()
        # Accept the same short aliases the real-data path understands.
        if key in ("bitcoin", "btc"):
            base_price = 35000
        elif key in ("ethereum", "eth"):
            base_price = 1800
        else:
            base_price = 100
        base_timestamp = int(time.time() * 1000) - (days * day_ms)

        price_data = []
        volume_data = []
        for i in range(days):
            timestamp = base_timestamp + (i * day_ms)
            # Drift grows with i, plus up to +/-2% noise on top.
            price_change = random.uniform(-0.05, 0.05)
            price = base_price * (1 + price_change * i / days)
            price += random.uniform(-price * 0.02, price * 0.02)
            volume = random.uniform(1000000000, 5000000000)
            price_data.append([timestamp, round(price, 2)])
            volume_data.append([timestamp, int(volume)])

        return json.dumps({
            "chart_type": "price_chart",
            "data": {
                "prices": price_data,
                "total_volumes": volume_data,
                "symbol": symbol.upper(),
                "name": symbol.title()
            },
            "config": {
                "title": f"{symbol.title()} Price Analysis ({days} days)",
                "timeframe": f"{days}d",
                "currency": "USD",
                "is_mock": True
            }
        })

    async def _get_market_overview_data(self) -> str:
        """Get the top-10 coins by market cap from the CoinGecko API.

        Falls back to ``_get_mock_market_data`` on an empty or non-list
        response, or on any exception.
        """
        try:
            from src.tools.coingecko_tool import CoinGeckoTool
            coingecko = CoinGeckoTool()

            # Get top market cap coins
            url = "https://api.coingecko.com/api/v3/coins/markets"
            params = {
                "vs_currency": "usd",
                "order": "market_cap_desc",
                "per_page": 10,
                "page": 1,
                # Sent as a string: a Python bool is rejected or rendered as
                # "False" by some HTTP clients.
                "sparkline": "false"
            }
            data = await coingecko.make_request(url, params=params)
            if not data or not isinstance(data, list):
                logger.warning("CoinGecko market data failed, using fallback")
                return await self._get_mock_market_data()

            # Format real market data
            coins = [
                {
                    "name": coin.get("name", "Unknown"),
                    "symbol": (coin.get("symbol") or "").upper(),
                    "current_price": coin.get("current_price", 0),
                    "market_cap_rank": coin.get("market_cap_rank", 0),
                    "price_change_percentage_24h": coin.get("price_change_percentage_24h", 0),
                    "market_cap": coin.get("market_cap", 0),
                    "total_volume": coin.get("total_volume", 0)
                }
                for coin in data[:10]
            ]

            return json.dumps({
                "chart_type": "market_overview",
                "data": {"coins": coins},
                "config": {
                    "title": "Top Cryptocurrencies Market Overview",
                    "currency": "USD"
                }
            })
        except Exception as e:
            logger.error(f"Market overview API failed: {e}")
            return await self._get_mock_market_data()

    async def _get_mock_market_data(self) -> str:
        """Fallback mock market data (five hard-coded coins, flagged ``is_mock``)."""
        return json.dumps({
            "chart_type": "market_overview",
            "data": {
                "coins": [
                    {"name": "Bitcoin", "symbol": "BTC", "current_price": 35000, "market_cap_rank": 1, "price_change_percentage_24h": 2.5},
                    {"name": "Ethereum", "symbol": "ETH", "current_price": 1800, "market_cap_rank": 2, "price_change_percentage_24h": -1.2},
                    {"name": "Cardano", "symbol": "ADA", "current_price": 0.25, "market_cap_rank": 3, "price_change_percentage_24h": 3.1},
                    {"name": "Solana", "symbol": "SOL", "current_price": 22.5, "market_cap_rank": 4, "price_change_percentage_24h": -2.8},
                    {"name": "Polygon", "symbol": "MATIC", "current_price": 0.52, "market_cap_rank": 5, "price_change_percentage_24h": 1.9}
                ]
            },
            "config": {
                "title": "Top Cryptocurrencies Market Overview",
                "currency": "USD",
                "is_mock": True
            }
        })

    async def _get_defi_tvl_data(self, protocols: List[str]) -> str:
        """Get DeFi TVL data from the DeFiLlama API.

        With a non-empty ``protocols`` list, each requested name selects the
        first API entry whose name contains it (case-insensitive). With an
        empty list, the entries with the highest positive TVL are used. At
        most eight protocols are returned. Falls back to
        ``_get_mock_defi_data`` on an empty response, no matches, or any
        exception.
        """
        try:
            from src.tools.defillama_tool import DeFiLlamaTool
            defillama = DeFiLlamaTool()

            # Get protocols data
            data = await defillama.make_request(f"{defillama._base_url}/protocols")
            if not data:
                logger.warning("DeFiLlama API failed, using fallback")
                return await self._get_mock_defi_data(protocols)

            # Filter for requested protocols or top protocols
            if protocols:
                matched = []
                for protocol_name in protocols:
                    needle = protocol_name.lower()
                    # "or ''" guards against entries whose name is null.
                    hit = next(
                        (p for p in data if needle in (p.get("name") or "").lower()),
                        None,
                    )
                    if hit is not None:
                        matched.append(hit)
                protocols_data = matched[:8]  # Limit to 8
            else:
                # Get top protocols by TVL; "or 0" guards against null TVL
                # values, which would otherwise break the comparison.
                with_tvl = [p for p in data if (p.get("tvl") or 0) > 0]
                with_tvl.sort(key=lambda p: p.get("tvl") or 0, reverse=True)
                protocols_data = with_tvl[:8]

            if not protocols_data:
                return await self._get_mock_defi_data(protocols)

            # Format TVL data
            tvl_data = [
                {
                    "name": protocol.get("name", "Unknown"),
                    "tvl": protocol.get("tvl", 0),
                    "change_1d": protocol.get("change_1d", 0),
                    "chain": protocol.get("chain", "Multi-chain"),
                    "category": protocol.get("category", "DeFi")
                }
                for protocol in protocols_data
            ]

            return json.dumps({
                "chart_type": "defi_tvl",
                "data": {"protocols": tvl_data},
                "config": {
                    "title": "DeFi Protocols by Total Value Locked",
                    "currency": "USD"
                }
            })
        except Exception as e:
            logger.error(f"DeFi TVL API failed: {e}")
            return await self._get_mock_defi_data(protocols)

    async def _get_mock_defi_data(self, protocols: List[str]) -> str:
        """Fallback mock DeFi data: random TVL for up to five protocol names.

        Uses the requested names when given, otherwise a fixed default list.
        The payload is flagged with ``"is_mock": true`` in ``config``.
        """
        import random

        protocol_names = protocols or ["Uniswap", "Aave", "Compound", "Curve", "MakerDAO"]
        tvl_data = []
        for protocol in protocol_names[:5]:
            tvl = random.uniform(500000000, 5000000000)
            change = random.uniform(-10, 15)
            tvl_data.append({
                "name": protocol,
                "tvl": tvl,
                "change_1d": change,
                "chain": "Ethereum",
                "category": "DeFi"
            })

        return json.dumps({
            "chart_type": "defi_tvl",
            "data": {"protocols": tvl_data},
            "config": {
                "title": "DeFi Protocols by Total Value Locked",
                "currency": "USD",
                "is_mock": True
            }
        })

    async def _get_portfolio_data(self) -> str:
        """Get portfolio allocation data (a static sample, flagged ``is_mock``)."""
        return json.dumps({
            "chart_type": "portfolio_pie",
            "data": {
                "allocations": [
                    {"name": "Bitcoin", "symbol": "BTC", "value": 40, "color": "#f7931a"},
                    {"name": "Ethereum", "symbol": "ETH", "value": 30, "color": "#627eea"},
                    {"name": "Cardano", "symbol": "ADA", "value": 15, "color": "#0033ad"},
                    {"name": "Solana", "symbol": "SOL", "value": 10, "color": "#9945ff"},
                    {"name": "Other", "symbol": "OTHER", "value": 5, "color": "#666666"}
                ]
            },
            "config": {
                "title": "Sample Portfolio Allocation",
                "currency": "Percentage",
                "is_mock": True
            }
        })

    async def _get_gas_data(self, network: str) -> str:
        """Get gas fee data: 24 hourly points ending now.

        The values are randomly generated (20-100 for ethereum, 1-10 for any
        other network), not fetched, so the payload is flagged ``is_mock``.
        """
        import random
        import time

        hour_ms = 60 * 60 * 1000
        network = network or "ethereum"
        # Compare case-insensitively so "Ethereum" gets the ethereum range.
        is_ethereum = network.strip().lower() == "ethereum"

        # Generate 24 hours of gas data
        gas_data = []
        base_timestamp = int(time.time() * 1000) - (24 * hour_ms)
        for i in range(24):
            timestamp = base_timestamp + (i * hour_ms)
            gas_price = random.uniform(20, 100) if is_ethereum else random.uniform(1, 10)
            gas_data.append([timestamp, round(gas_price, 2)])

        return json.dumps({
            "chart_type": "gas_tracker",
            "data": {
                "gas_prices": gas_data,
                "network": network.title()
            },
            "config": {
                "title": f"{network.title()} Gas Fee Tracker (24h)",
                "unit": "Gwei",
                "is_mock": True
            }
        })

    def _parse_timeframe(self, timeframe: Optional[str]) -> int:
        """Convert a timeframe string to a number of days.

        Accepts ``<number><unit>`` where unit is d (days), w (7 days),
        m (30 days) or y (365 days), case-insensitively - e.g. "1d", "7d",
        "90d", "2w", "1y". Anything else, including None and non-positive
        values, yields the default of 30 days.
        """
        default_days = 30
        unit_days = {"d": 1, "w": 7, "m": 30, "y": 365}

        if not isinstance(timeframe, str):
            return default_days

        text = timeframe.strip().lower()
        number, unit = text[:-1], text[-1:]
        if unit not in unit_days or not (number.isascii() and number.isdigit()):
            return default_days

        days = int(number) * unit_days[unit]
        return days if days > 0 else default_days