# Uploaded via huggingface_hub by user vlbandara (commit eb27803, verified)
import os
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import pandas as pd
import yfinance as yf
from crewai.tools import BaseTool
import time
import logging
import requests
# Set up logging
# Module-wide configuration: INFO level, each record prefixed with timestamp, logger name and level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Shared logger used by the tool classes in this module.
logger = logging.getLogger("yahoo_tools")
class YahooBitcoinDataTool(BaseTool):
    """Fetch Bitcoin OHLCV price data from Yahoo Finance.

    The primary ``BTC-USD`` ticker is tried first with exponential-backoff
    retries on timeouts; on failure, alternative ticker symbols are tried
    once each.  Successful responses are cached per ``(period, interval)``
    pair for ``cache_duration_minutes``.
    """

    name: str = "Yahoo Finance Bitcoin Data Tool"
    description: str = "Fetches Bitcoin price data from Yahoo Finance as an alternative data source"
    max_retries: int = 3
    backoff_factor: float = 2.0
    timeout: int = 30
    cache_duration_minutes: int = 15
    # Both caches are keyed by "{period}_{interval}".
    cached_data: Dict[str, Dict[str, Any]] = {}
    last_cache_time: Dict[str, datetime] = {}

    def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0, timeout: int = 30, cache_duration_minutes: int = 15):
        """
        Initialize the tool with retry parameters.

        Args:
            max_retries: Maximum number of retry attempts (default: 3)
            backoff_factor: Exponential backoff factor between retries (default: 2.0)
            timeout: Request timeout in seconds (default: 30)
            cache_duration_minutes: How long to cache results (default: 15 minutes)
        """
        super().__init__()
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.timeout = timeout
        self.cache_duration_minutes = cache_duration_minutes
        self.cached_data = {}  # Cache by period and interval
        self.last_cache_time = {}  # Cache timestamps by period and interval

    def _check_cache_valid(self, period: str, interval: str) -> bool:
        """
        Check if the cached data for this (period, interval) pair is still valid.

        Args:
            period: The time period key
            interval: The interval key

        Returns:
            True if cache is present and younger than cache_duration_minutes.
        """
        cache_key = f"{period}_{interval}"
        if cache_key not in self.cached_data or cache_key not in self.last_cache_time:
            return False
        cache_age = datetime.now() - self.last_cache_time[cache_key]
        return cache_age.total_seconds() < (self.cache_duration_minutes * 60)

    def _fetch_and_package(self, ticker: str, period: str, interval: str) -> Optional[Dict[str, Any]]:
        """
        Fetch history for one ticker and package it for JSON serialization.

        Returns None when Yahoo returns no rows (e.g. possibly delisted
        ticker); network/parsing exceptions propagate to the caller so it
        can decide whether to retry or fall back.
        """
        data = yf.Ticker(ticker)
        df = data.history(period=period, interval=interval, timeout=self.timeout)
        if df.empty:
            logger.warning(f"{ticker}: possibly delisted; no price data found (period={period})")
            return None
        df = df.reset_index()
        # Normalize the index column name: daily data exposes 'Date',
        # intraday data exposes 'Datetime'.
        if 'Date' in df.columns:
            df = df.rename(columns={'Date': 'time'})
        elif 'Datetime' in df.columns:
            df = df.rename(columns={'Datetime': 'time'})
        # Standardize column names to lowercase.
        df.columns = [col.lower() for col in df.columns]
        # Convert numpy scalars to Python native types for JSON serialization.
        for col in df.columns:
            if col != 'time':
                df[col] = df[col].apply(lambda x: x.item() if hasattr(x, 'item') else x)
        # Metadata about Bitcoin from Yahoo Finance.
        info = data.info
        return {
            "dataframe": df.to_dict(orient='records'),
            "last_price": float(df['close'].iloc[-1]),
            "time_period": period,
            "interval": interval,
            "ticker": ticker,
            "metadata": {
                "market_cap": info.get('marketCap', None),
                "volume_24h": info.get('volume24Hr', None),
                "circulating_supply": info.get('circulatingSupply', None),
                "last_updated": datetime.now().isoformat()
            }
        }

    def _cache_result(self, cache_key: str, data_dict: Dict[str, Any]) -> None:
        """Store a successful response and stamp its cache time."""
        self.cached_data[cache_key] = data_dict
        self.last_cache_time[cache_key] = datetime.now()

    def _run(self, period: str = "1mo", interval: str = "1d") -> Dict[str, Any]:
        """
        Fetch Bitcoin price data from Yahoo Finance.

        Args:
            period: Time period to fetch data for (1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max)
            interval: Time interval between data points (1m, 2m, 5m, 15m, 30m, 60m, 90m, 1h, 1d, 5d, 1wk, 1mo, 3mo)

        Returns:
            Dictionary with OHLCV data and metadata, or an error dictionary
            if all tickers failed.
        """
        # Use cached data if available and valid.
        cache_key = f"{period}_{interval}"
        if self._check_cache_valid(period, interval):
            logger.info(f"Using cached Bitcoin data for period={period}, interval={interval}")
            return self.cached_data[cache_key]
        logger.info(f"Fetching Bitcoin data for period={period}, interval={interval}")
        # BTC-USD ticker from Yahoo Finance.
        ticker = "BTC-USD"
        # Alternative tickers to try if primary fails.
        alternative_tickers = ["BTC-USD", "BTCUSD=X", "BTC=F"]
        data_dict = None
        # Try the primary ticker with retry logic; only timeouts are retried.
        for attempt in range(self.max_retries):
            try:
                logger.info(f"Fetching {ticker} data (attempt {attempt + 1}/{self.max_retries})")
                data_dict = self._fetch_and_package(ticker, period, interval)
                if data_dict is None:
                    # Empty frame: retrying the same ticker won't help, try alternatives.
                    break
                self._cache_result(cache_key, data_dict)
                logger.info(f"Cached Bitcoin data for period={period}, interval={interval}")
                return data_dict
            except requests.exceptions.Timeout as e:
                wait_time = self.backoff_factor ** attempt
                logger.warning(f"Timeout error for {ticker}: {str(e)}. Retrying in {wait_time:.1f} seconds...")
                if attempt < self.max_retries - 1:
                    time.sleep(wait_time)
                else:
                    logger.error(f"Max retries reached for {ticker}")
                    # Don't return error yet, try alternative tickers.
                    break
            except Exception as e:
                logger.error(f"Error fetching data for {ticker}: {str(e)}")
                # Don't return error yet, try alternative tickers.
                break
        # If we get here, the primary ticker failed - try alternatives once each.
        if data_dict is None:
            for alt_ticker in alternative_tickers:
                # Skip the one we already tried.
                if alt_ticker == ticker:
                    continue
                logger.info(f"Trying alternative ticker: {alt_ticker}")
                try:
                    data_dict = self._fetch_and_package(alt_ticker, period, interval)
                    if data_dict is None:
                        continue
                    data_dict["note"] = f"Used alternative ticker {alt_ticker} because primary ticker failed"
                    self._cache_result(cache_key, data_dict)
                    logger.info(f"Cached Bitcoin data for period={period}, interval={interval} using alternate ticker {alt_ticker}")
                    return data_dict
                except Exception as e:
                    logger.error(f"Error fetching data for alternative ticker {alt_ticker}: {str(e)}")
                    continue
        # If we get here, all tickers failed.
        error_msg = f"Failed to fetch Bitcoin data for period={period}, interval={interval} with all available tickers"
        logger.error(error_msg)
        return {
            "error": error_msg,
            "time_period": period,
            "interval": interval,
            "tickers_tried": alternative_tickers
        }
class YahooCryptoMarketTool(BaseTool):
    """Fetch a snapshot of the broader cryptocurrency market.

    Pulls 5-day history and metadata for the top N crypto tickers,
    computes per-asset price changes, total market cap, BTC dominance,
    and an overall trend label.  One successful snapshot is cached for
    ``cache_duration_minutes``.
    """

    name: str = "Yahoo Finance Crypto Market Tool"
    description: str = "Fetches data about the broader cryptocurrency market for contextual analysis"
    max_retries: int = 3
    backoff_factor: float = 2.0
    timeout: int = 30
    cache_duration_minutes: int = 30
    # Fixed annotations: defaults are None, so these fields are Optional.
    cached_data: Optional[Dict[str, Any]] = None
    last_cache_time: Optional[datetime] = None

    def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0, timeout: int = 30, cache_duration_minutes: int = 30):
        """
        Initialize the tool with retry parameters.

        Args:
            max_retries: Maximum number of retry attempts (default: 3)
            backoff_factor: Exponential backoff factor between retries (default: 2.0)
            timeout: Request timeout in seconds (default: 30)
            cache_duration_minutes: How long to cache results (default: 30 minutes)
        """
        super().__init__()
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.timeout = timeout
        self.cache_duration_minutes = cache_duration_minutes
        self.cached_data = None
        self.last_cache_time = None

    def _check_cache_valid(self) -> bool:
        """
        Check if the cached market snapshot is still valid.

        Returns:
            True if a snapshot exists and is younger than cache_duration_minutes.
        """
        if self.cached_data is None or self.last_cache_time is None:
            return False
        cache_age = datetime.now() - self.last_cache_time
        return cache_age.total_seconds() < (self.cache_duration_minutes * 60)

    def _get_ticker_data_with_retries(self, ticker: str) -> Dict[str, Any]:
        """
        Get ticker data with retry logic (timeouts retried with backoff).

        Args:
            ticker: The ticker symbol to fetch

        Returns:
            On success: {"data": yf.Ticker, "hist": DataFrame, "info": dict}.
            On failure: {"ticker": ..., "error": ...}.
        """
        for attempt in range(self.max_retries):
            try:
                logger.info(f"Fetching data for {ticker} (attempt {attempt + 1}/{self.max_retries})")
                data = yf.Ticker(ticker)
                # Set a timeout for the history call.
                hist = data.history(period="5d", timeout=self.timeout)
                info = data.info
                # Verify we have data.
                if hist.empty:
                    logger.warning(f"{ticker}: possibly delisted; no price data found (period=5d)")
                    return {
                        "ticker": ticker,
                        "error": f"{ticker}: possibly delisted; no price data found (period=5d)"
                    }
                # Successfully got data, return it.
                return {
                    "data": data,
                    "hist": hist,
                    "info": info
                }
            except requests.exceptions.Timeout as e:
                wait_time = self.backoff_factor ** attempt
                logger.warning(f"Timeout error for {ticker}: {str(e)}. Retrying in {wait_time:.1f} seconds...")
                if attempt < self.max_retries - 1:
                    time.sleep(wait_time)
                else:
                    logger.error(f"Max retries reached for {ticker}")
                    return {
                        "ticker": ticker,
                        "error": f"Timeout error after {self.max_retries} attempts: {str(e)}"
                    }
            except Exception as e:
                logger.error(f"Error fetching data for {ticker}: {str(e)}")
                return {
                    "ticker": ticker,
                    "error": str(e)
                }
        return {
            "ticker": ticker,
            "error": "Unknown error during retry attempts"
        }

    def _build_asset_entry(self, ticker: str, hist, info) -> tuple:
        """
        Build one per-asset result entry from fetched history and metadata.

        Returns:
            (entry_dict, normalized_market_cap) where normalized_market_cap
            is 0 when Yahoo reports no market cap.
        """
        current_price = hist['Close'].iloc[-1]
        # Day change needs at least 2 rows; week change needs a full 5-day window.
        day_change = ((current_price / hist['Close'].iloc[-2]) - 1) * 100 if len(hist) > 1 else 0
        week_change = ((current_price / hist['Close'].iloc[0]) - 1) * 100 if len(hist) > 4 else day_change
        market_cap = info.get('marketCap', 0)
        # Convert numpy scalars to Python native types for JSON serialization.
        if hasattr(current_price, 'item'):
            current_price = current_price.item()
        if hasattr(day_change, 'item'):
            day_change = day_change.item()
        if hasattr(week_change, 'item'):
            week_change = week_change.item()
        entry = {
            "ticker": ticker,
            "name": info.get('shortName', ticker.split('-')[0]),
            "current_price": current_price,
            "market_cap": market_cap,
            "volume_24h": info.get('volume24Hr', None),
            "day_change_percent": day_change,
            "week_change_percent": week_change,
            "data_available": True
        }
        return entry, (market_cap if market_cap else 0)

    def _run(self, top_n: int = 10) -> Dict[str, Any]:
        """
        Fetch data about the top cryptocurrencies in the market.

        Args:
            top_n: Number of top cryptocurrencies to fetch data for

        Returns:
            Dictionary with per-asset data and a market summary, or an
            error dictionary if the whole fetch failed.
        """
        # Check if we have valid cached data.
        if self._check_cache_valid():
            logger.info("Using cached cryptocurrency market data")
            return self.cached_data
        logger.info(f"Fetching data for top {top_n} cryptocurrencies")
        try:
            # Common crypto tickers to check.
            tickers = [
                "BTC-USD",   # Bitcoin
                "ETH-USD",   # Ethereum
                "XRP-USD",   # Ripple
                "SOL-USD",   # Solana
                "ADA-USD",   # Cardano
                "AVAX-USD",  # Avalanche
                "DOT-USD",   # Polkadot
                "DOGE-USD",  # Dogecoin
                "LINK-USD",  # Chainlink
                "MATIC-USD"  # Polygon
            ]
            # Limit to requested number.
            tickers = tickers[:min(top_n, len(tickers))]
            results = []
            market_cap_sum = 0
            btc_dominance = 0
            btc_market_cap = 0
            success_count = 0
            error_count = 0
            # Fetch data for each ticker.
            for ticker in tickers:
                ticker_data = self._get_ticker_data_with_retries(ticker)
                if "error" in ticker_data:
                    # Handle error case - add basic info with error message.
                    error_count += 1
                    results.append({
                        "ticker": ticker,
                        "name": ticker.split('-')[0],
                        "error": ticker_data["error"],
                        "data_available": False
                    })
                    continue
                hist = ticker_data["hist"]
                info = ticker_data["info"]
                success_count += 1
                if hist.empty:
                    # Defensive: the retry helper already screens out empty frames.
                    continue
                entry, norm_market_cap = self._build_asset_entry(ticker, hist, info)
                market_cap_sum += norm_market_cap
                # Store BTC market cap for dominance calculation.
                if ticker == "BTC-USD":
                    btc_market_cap = norm_market_cap
                results.append(entry)
            # Calculate BTC dominance.
            if market_cap_sum > 0 and btc_market_cap > 0:
                btc_dominance = (btc_market_cap / market_cap_sum) * 100
            # Overall market trends - only count assets with data.
            valid_results = [r for r in results if r.get("data_available", False)]
            market_trend = "bullish" if sum(r.get('day_change_percent', 0) for r in valid_results) > 0 else "bearish"
            # Create response.
            response = {
                "cryptocurrencies": results,
                "market_summary": {
                    "total_market_cap": market_cap_sum,
                    "btc_dominance": btc_dominance,
                    "market_trend": market_trend,
                    "timestamp": datetime.now().isoformat(),
                    "success_count": success_count,
                    "error_count": error_count,
                    "total_count": len(tickers)
                }
            }
            # Cache the result if we got at least some data.
            if success_count > 0:
                self.cached_data = response
                self.last_cache_time = datetime.now()
                logger.info(f"Cached cryptocurrency market data (success: {success_count}, errors: {error_count})")
            return response
        except Exception as e:
            logger.error(f"Error in YahooCryptoMarketTool: {str(e)}")
            return {
                "error": str(e),
                "market_summary": {
                    "market_trend": "unknown",
                    "timestamp": datetime.now().isoformat(),
                    "error": str(e)
                },
                "cryptocurrencies": []
            }