Spaces:
Sleeping
Sleeping
File size: 1,973 Bytes
1dece25 a0c219b 7b0b449 1dece25 a0c219b 7b0b449 a0c219b a0a8d76 b9478b5 7b0b449 a0c219b 1dece25 a0c219b 1dece25 a0c219b 1dece25 a0c219b 1dece25 a0c219b 1dece25 a0c219b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
import requests
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type
# ---------- Input Schema ----------
class MarketDataInput(BaseModel):
symbol: str = Field(default="bitcoin", description="Crypto asset ID on CoinGecko")
currency: str = Field(default="usd", description="Fiat currency (usd, eur, etc.)")
# ---------- Tool ----------
class MarketDataTool(BaseTool):
name: str = "get_market_data"
description: str = (
"Fetches the current market price of a cryptocurrency "
"in a chosen fiat currency using the CoinGecko API."
)
args_schema: Type[BaseModel] = MarketDataInput
def _run(self, symbol: str = "bitcoin", currency: str = "usd") -> dict:
"""
Returns structured JSON:
{
"symbol": "bitcoin",
"currency": "usd",
"latest_price": 90482,
"volume_24h": 38500000000
}
"""
url = (
f"https://api.coingecko.com/api/v3/simple/price"
f"?ids={symbol}"
f"&vs_currencies={currency}"
f"&include_24hr_vol=true"
)
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
data = response.json()
asset = data.get(symbol, {})
price = asset.get(currency)
volume = asset.get(f"{currency}_24h_vol")
if price is None:
return {
"symbol": symbol,
"currency": currency,
"error": f"MarketDataTool: No price found for {symbol.upper()} in {currency.upper()}."
}
return {
"symbol": symbol,
"currency": currency,
"latest_price": price,
"volume_24h": volume,
}
except Exception as e:
return {"error": f"MarketDataTool failed: {str(e)}"} |