Crypto_Analyst_Agent / tools /market_data.py
cicboy's picture
update tools
a0a8d76
import requests
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type
# ---------- Input Schema ----------
class MarketDataInput(BaseModel):
symbol: str = Field(default="bitcoin", description="Crypto asset ID on CoinGecko")
currency: str = Field(default="usd", description="Fiat currency (usd, eur, etc.)")
# ---------- Tool ----------
class MarketDataTool(BaseTool):
name: str = "get_market_data"
description: str = (
"Fetches the current market price of a cryptocurrency "
"in a chosen fiat currency using the CoinGecko API."
)
args_schema: Type[BaseModel] = MarketDataInput
def _run(self, symbol: str = "bitcoin", currency: str = "usd") -> dict:
"""
Returns structured JSON:
{
"symbol": "bitcoin",
"currency": "usd",
"latest_price": 90482,
"volume_24h": 38500000000
}
"""
url = (
f"https://api.coingecko.com/api/v3/simple/price"
f"?ids={symbol}"
f"&vs_currencies={currency}"
f"&include_24hr_vol=true"
)
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
data = response.json()
asset = data.get(symbol, {})
price = asset.get(currency)
volume = asset.get(f"{currency}_24h_vol")
if price is None:
return {
"symbol": symbol,
"currency": currency,
"error": f"MarketDataTool: No price found for {symbol.upper()} in {currency.upper()}."
}
return {
"symbol": symbol,
"currency": currency,
"latest_price": price,
"volume_24h": volume,
}
except Exception as e:
return {"error": f"MarketDataTool failed: {str(e)}"}