from typing import Type

import requests
from crewai.tools import BaseTool
from pydantic import BaseModel, Field

# CoinGecko "simple price" endpoint; query parameters are supplied per request.
_COINGECKO_SIMPLE_PRICE_URL = "https://api.coingecko.com/api/v3/simple/price"


# ---------- Input Schema ----------
class MarketDataInput(BaseModel):
    """Arguments accepted by ``MarketDataTool``."""

    symbol: str = Field(default="bitcoin", description="Crypto asset ID on CoinGecko")
    currency: str = Field(default="usd", description="Fiat currency (usd, eur, etc.)")


# ---------- Tool ----------
class MarketDataTool(BaseTool):
    """CrewAI tool that looks up a crypto asset's current price on CoinGecko."""

    name: str = "get_market_data"
    description: str = (
        "Fetches the current market price of a cryptocurrency "
        "in a chosen fiat currency using the CoinGecko API."
    )
    args_schema: Type[BaseModel] = MarketDataInput

    def _run(self, symbol: str = "bitcoin", currency: str = "usd") -> dict:
        """Fetch the latest price and 24h volume for one asset.

        Args:
            symbol: CoinGecko asset ID (e.g. ``"bitcoin"``). Surrounding
                whitespace is stripped and the value is lowercased before use.
            currency: Quote currency code (e.g. ``"usd"``). Normalized the
                same way as ``symbol``.

        Returns:
            On success::

                {
                    "symbol": "bitcoin",
                    "currency": "usd",
                    "latest_price": 90482,
                    "volume_24h": 38500000000
                }

            ``volume_24h`` is ``None`` when the response carries no volume
            entry. On failure, a dict with ``symbol``, ``currency`` and an
            ``error`` message; request and decoding failures are reported this
            way rather than raised.
        """
        # The response is looked up by the same strings that were sent, so
        # normalize once up front; otherwise "Bitcoin" / "USD" would miss.
        # NOTE(review): assumes CoinGecko IDs and currency codes are lowercase.
        symbol = str(symbol).strip().lower()
        currency = str(currency).strip().lower()

        if not symbol or not currency:
            return {
                "symbol": symbol,
                "currency": currency,
                "error": "MarketDataTool: symbol and currency must be non-empty.",
            }

        # Let requests build the query string so special characters in the
        # inputs are URL-encoded instead of corrupting the query.
        params = {
            "ids": symbol,
            "vs_currencies": currency,
            "include_24hr_vol": "true",
        }

        try:
            response = requests.get(
                _COINGECKO_SIMPLE_PRICE_URL, params=params, timeout=10
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on requests versions
            # whose JSONDecodeError is not a RequestException.
            return {
                "symbol": symbol,
                "currency": currency,
                "error": f"MarketDataTool failed: {str(e)}",
            }

        # Guard against a syntactically valid but unexpected payload
        # (e.g. a list or string) so the lookups below cannot raise.
        asset = data.get(symbol) if isinstance(data, dict) else None
        if not isinstance(asset, dict):
            asset = {}

        price = asset.get(currency)
        if price is None:
            return {
                "symbol": symbol,
                "currency": currency,
                "error": f"MarketDataTool: No price found for {symbol.upper()} in {currency.upper()}.",
            }

        return {
            "symbol": symbol,
            "currency": currency,
            "latest_price": price,
            "volume_24h": asset.get(f"{currency}_24h_vol"),
        }