Spaces:
Sleeping
Sleeping
| import requests | |
| from crewai.tools import BaseTool | |
| from pydantic import BaseModel, Field | |
| from typing import Type | |
| # ---------- Input Schema ---------- | |
| class MarketDataInput(BaseModel): | |
| symbol: str = Field(default="bitcoin", description="Crypto asset ID on CoinGecko") | |
| currency: str = Field(default="usd", description="Fiat currency (usd, eur, etc.)") | |
| # ---------- Tool ---------- | |
| class MarketDataTool(BaseTool): | |
| name: str = "get_market_data" | |
| description: str = ( | |
| "Fetches the current market price of a cryptocurrency " | |
| "in a chosen fiat currency using the CoinGecko API." | |
| ) | |
| args_schema: Type[BaseModel] = MarketDataInput | |
| def _run(self, symbol: str = "bitcoin", currency: str = "usd") -> dict: | |
| """ | |
| Returns structured JSON: | |
| { | |
| "symbol": "bitcoin", | |
| "currency": "usd", | |
| "latest_price": 90482, | |
| "volume_24h": 38500000000 | |
| } | |
| """ | |
| url = ( | |
| f"https://api.coingecko.com/api/v3/simple/price" | |
| f"?ids={symbol}" | |
| f"&vs_currencies={currency}" | |
| f"&include_24hr_vol=true" | |
| ) | |
| try: | |
| response = requests.get(url, timeout=10) | |
| response.raise_for_status() | |
| data = response.json() | |
| asset = data.get(symbol, {}) | |
| price = asset.get(currency) | |
| volume = asset.get(f"{currency}_24h_vol") | |
| if price is None: | |
| return { | |
| "symbol": symbol, | |
| "currency": currency, | |
| "error": f"MarketDataTool: No price found for {symbol.upper()} in {currency.upper()}." | |
| } | |
| return { | |
| "symbol": symbol, | |
| "currency": currency, | |
| "latest_price": price, | |
| "volume_24h": volume, | |
| } | |
| except Exception as e: | |
| return {"error": f"MarketDataTool failed: {str(e)}"} |