import os
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import pandas as pd
import yfinance as yf
from crewai.tools import BaseTool
import time
import logging
import requests

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("yahoo_tools")


class YahooBitcoinDataTool(BaseTool):
    """Fetches Bitcoin OHLCV data from Yahoo Finance with retries, fallback tickers and caching.

    Results are cached per (period, interval) pair for `cache_duration_minutes`.
    If the primary BTC-USD ticker fails, alternative Yahoo symbols are tried.
    """

    name: str = "Yahoo Finance Bitcoin Data Tool"
    description: str = "Fetches Bitcoin price data from Yahoo Finance as an alternative data source"
    max_retries: int = 3
    backoff_factor: float = 2.0
    timeout: int = 30
    cache_duration_minutes: int = 15
    # Cache of result payloads keyed by "{period}_{interval}"
    cached_data: Dict[str, Dict[str, Any]] = {}
    # Timestamp of each cache entry, same keys as cached_data
    last_cache_time: Dict[str, datetime] = {}

    def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0,
                 timeout: int = 30, cache_duration_minutes: int = 15):
        """
        Initialize the tool with retry parameters

        Args:
            max_retries: Maximum number of retry attempts (default: 3)
            backoff_factor: Exponential backoff factor between retries (default: 2.0)
            timeout: Request timeout in seconds (default: 30)
            cache_duration_minutes: How long to cache results (default: 15 minutes)
        """
        super().__init__()
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.timeout = timeout
        self.cache_duration_minutes = cache_duration_minutes
        self.cached_data = {}  # Cache by period and interval
        self.last_cache_time = {}  # Cache timestamps by period and interval

    def _check_cache_valid(self, period: str, interval: str) -> bool:
        """
        Check if the cached data is still valid

        Args:
            period: The time period key
            interval: The interval key

        Returns:
            True if cache is valid, False otherwise
        """
        cache_key = f"{period}_{interval}"
        if cache_key not in self.cached_data or cache_key not in self.last_cache_time:
            return False
        cache_age = datetime.now() - self.last_cache_time[cache_key]
        return cache_age.total_seconds() < (self.cache_duration_minutes * 60)

    @staticmethod
    def _normalize_history(df: pd.DataFrame) -> pd.DataFrame:
        """Flatten a yfinance history frame into JSON-friendly form.

        Resets the index, renames the Date/Datetime column to 'time', lowercases
        all column names and converts numpy scalars to native Python types.
        """
        df = df.reset_index()
        # yfinance uses 'Date' for daily bars and 'Datetime' for intraday bars
        if 'Date' in df.columns:
            df = df.rename(columns={'Date': 'time'})
        elif 'Datetime' in df.columns:
            df = df.rename(columns={'Datetime': 'time'})
        # Standardize column names to lowercase
        df.columns = [col.lower() for col in df.columns]
        # Convert numpy types to Python native types for JSON serialization
        for col in df.columns:
            if col != 'time':
                df[col] = df[col].apply(lambda x: x.item() if hasattr(x, 'item') else x)
        return df

    @staticmethod
    def _build_payload(ticker: str, df: pd.DataFrame, info: Dict[str, Any],
                       period: str, interval: str,
                       note: Optional[str] = None) -> Dict[str, Any]:
        """Assemble the result dictionary returned to callers.

        Args:
            ticker: The Yahoo symbol the data came from
            df: Normalized history frame (see _normalize_history)
            info: yfinance `Ticker.info` metadata mapping
            period: Requested time period
            interval: Requested bar interval
            note: Optional note appended when a fallback ticker was used
        """
        payload: Dict[str, Any] = {
            "dataframe": df.to_dict(orient='records'),
            "last_price": float(df['close'].iloc[-1]) if not df.empty else None,
            "time_period": period,
            "interval": interval,
            "ticker": ticker,
            "metadata": {
                "market_cap": info.get('marketCap', None),
                "volume_24h": info.get('volume24Hr', None),
                "circulating_supply": info.get('circulatingSupply', None),
                "last_updated": datetime.now().isoformat()
            }
        }
        if note is not None:
            payload["note"] = note
        return payload

    def _store_in_cache(self, cache_key: str, payload: Dict[str, Any]) -> None:
        """Record a payload and its timestamp under cache_key."""
        self.cached_data[cache_key] = payload
        self.last_cache_time[cache_key] = datetime.now()

    def _run(self, period: str = "1mo", interval: str = "1d") -> Dict[str, Any]:
        """
        Fetch Bitcoin price data from Yahoo Finance

        Args:
            period: Time period to fetch data for (1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max)
            interval: Time interval between data points (1m, 2m, 5m, 15m, 30m, 60m, 90m, 1h, 1d, 5d, 1wk, 1mo, 3mo)

        Returns:
            Dictionary with OHLCV data and metadata, or a dictionary with an
            "error" key if every ticker failed.
        """
        # Use cached data if available and valid
        cache_key = f"{period}_{interval}"
        if self._check_cache_valid(period, interval):
            logger.info(f"Using cached Bitcoin data for period={period}, interval={interval}")
            return self.cached_data[cache_key]

        logger.info(f"Fetching Bitcoin data for period={period}, interval={interval}")

        # BTC-USD ticker from Yahoo Finance
        ticker = "BTC-USD"
        # Alternative tickers to try if primary fails
        alternative_tickers = ["BTC-USD", "BTCUSD=X", "BTC=F"]

        data_dict: Optional[Dict[str, Any]] = None

        # Try the primary ticker with retry logic (only timeouts are retried;
        # empty data or other errors fall through to the alternative tickers).
        for attempt in range(self.max_retries):
            try:
                logger.info(f"Fetching {ticker} data (attempt {attempt + 1}/{self.max_retries})")
                data = yf.Ticker(ticker)
                df = data.history(period=period, interval=interval, timeout=self.timeout)

                if df.empty:
                    error_msg = f"{ticker}: possibly delisted; no price data found (period={period})"
                    logger.warning(error_msg)
                    # Don't retry with the same ticker, break to try alternatives
                    break

                df = self._normalize_history(df)
                # Metadata about Bitcoin from Yahoo Finance
                info = data.info
                data_dict = self._build_payload(ticker, df, info, period, interval)

                # Cache the data
                self._store_in_cache(cache_key, data_dict)
                logger.info(f"Cached Bitcoin data for period={period}, interval={interval}")
                return data_dict
            except requests.exceptions.Timeout as e:
                wait_time = self.backoff_factor ** attempt
                logger.warning(f"Timeout error for {ticker}: {str(e)}. Retrying in {wait_time:.1f} seconds...")
                if attempt < self.max_retries - 1:
                    time.sleep(wait_time)
                else:
                    logger.error(f"Max retries reached for {ticker}")
                    # Don't return error yet, try alternative tickers
                    break
            except Exception as e:
                logger.error(f"Error fetching data for {ticker}: {str(e)}")
                # Don't return error yet, try alternative tickers
                break

        # If we get here, the primary ticker failed - try alternatives
        if data_dict is None:
            for alt_ticker in alternative_tickers:
                # Skip the one we already tried
                if alt_ticker == ticker:
                    continue
                logger.info(f"Trying alternative ticker: {alt_ticker}")
                try:
                    # Get the data with the alternative ticker (single attempt each)
                    data = yf.Ticker(alt_ticker)
                    df = data.history(period=period, interval=interval, timeout=self.timeout)

                    if df.empty:
                        logger.warning(f"{alt_ticker}: possibly delisted; no price data found (period={period})")
                        continue

                    df = self._normalize_history(df)
                    # Metadata about Bitcoin from Yahoo Finance
                    info = data.info
                    data_dict = self._build_payload(
                        alt_ticker, df, info, period, interval,
                        note=f"Used alternative ticker {alt_ticker} because primary ticker failed"
                    )

                    # Cache the data
                    self._store_in_cache(cache_key, data_dict)
                    logger.info(f"Cached Bitcoin data for period={period}, interval={interval} using alternate ticker {alt_ticker}")
                    return data_dict
                except Exception as e:
                    logger.error(f"Error fetching data for alternative ticker {alt_ticker}: {str(e)}")
                    continue

        # If we get here, all tickers failed
        error_msg = f"Failed to fetch Bitcoin data for period={period}, interval={interval} with all available tickers"
        logger.error(error_msg)
        return {
            "error": error_msg,
            "time_period": period,
            "interval": interval,
            "tickers_tried": alternative_tickers
        }


class YahooCryptoMarketTool(BaseTool):
    """Fetches summary data for the top cryptocurrencies to give broader market context.

    Per-ticker fetches are retried on timeout; the aggregate response (prices,
    market caps, BTC dominance, overall trend) is cached for
    `cache_duration_minutes`.
    """

    name: str = "Yahoo Finance Crypto Market Tool"
    description: str = "Fetches data about the broader cryptocurrency market for contextual analysis"
    max_retries: int = 3
    backoff_factor: float = 2.0
    timeout: int = 30
    cache_duration_minutes: int = 30
    # Single cached response; None until the first successful fetch.
    # NOTE: annotations fixed to Optional — the originals declared non-optional
    # types with None defaults.
    cached_data: Optional[Dict[str, Any]] = None
    last_cache_time: Optional[datetime] = None

    def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0,
                 timeout: int = 30, cache_duration_minutes: int = 30):
        """
        Initialize the tool with retry parameters

        Args:
            max_retries: Maximum number of retry attempts (default: 3)
            backoff_factor: Exponential backoff factor between retries (default: 2.0)
            timeout: Request timeout in seconds (default: 30)
            cache_duration_minutes: How long to cache results (default: 30 minutes)
        """
        super().__init__()
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.timeout = timeout
        self.cache_duration_minutes = cache_duration_minutes
        self.cached_data = None
        self.last_cache_time = None

    def _check_cache_valid(self) -> bool:
        """
        Check if the cached data is still valid

        Returns:
            True if cache is valid, False otherwise
        """
        if self.cached_data is None or self.last_cache_time is None:
            return False
        cache_age = datetime.now() - self.last_cache_time
        return cache_age.total_seconds() < (self.cache_duration_minutes * 60)

    def _get_ticker_data_with_retries(self, ticker: str) -> Dict[str, Any]:
        """
        Get ticker data with retry logic

        Args:
            ticker: The ticker symbol to fetch

        Returns:
            Dictionary with keys "data"/"hist"/"info" on success, or
            "ticker"/"error" on failure.
        """
        for attempt in range(self.max_retries):
            try:
                logger.info(f"Fetching data for {ticker} (attempt {attempt + 1}/{self.max_retries})")
                data = yf.Ticker(ticker)
                # Set a timeout for the history call
                hist = data.history(period="5d", timeout=self.timeout)
                info = data.info

                # Verify we have data
                if hist.empty:
                    logger.warning(f"{ticker}: possibly delisted; no price data found (period=5d)")
                    return {
                        "ticker": ticker,
                        "error": f"{ticker}: possibly delisted; no price data found (period=5d)"
                    }

                # Successfully got data, return it
                return {
                    "data": data,
                    "hist": hist,
                    "info": info
                }
            except requests.exceptions.Timeout as e:
                wait_time = self.backoff_factor ** attempt
                logger.warning(f"Timeout error for {ticker}: {str(e)}. Retrying in {wait_time:.1f} seconds...")
                if attempt < self.max_retries - 1:
                    time.sleep(wait_time)
                else:
                    logger.error(f"Max retries reached for {ticker}")
                    return {
                        "ticker": ticker,
                        "error": f"Timeout error after {self.max_retries} attempts: {str(e)}"
                    }
            except Exception as e:
                logger.error(f"Error fetching data for {ticker}: {str(e)}")
                return {
                    "ticker": ticker,
                    "error": str(e)
                }
        return {
            "ticker": ticker,
            "error": "Unknown error during retry attempts"
        }

    def _run(self, top_n: int = 10) -> Dict[str, Any]:
        """
        Fetch data about the top cryptocurrencies in the market

        Args:
            top_n: Number of top cryptocurrencies to fetch data for

        Returns:
            Dictionary with market data and trends
        """
        # Check if we have valid cached data
        if self._check_cache_valid():
            logger.info("Using cached cryptocurrency market data")
            return self.cached_data

        logger.info(f"Fetching data for top {top_n} cryptocurrencies")
        try:
            # Common crypto tickers to check
            tickers = [
                "BTC-USD",   # Bitcoin
                "ETH-USD",   # Ethereum
                "XRP-USD",   # Ripple
                "SOL-USD",   # Solana
                "ADA-USD",   # Cardano
                "AVAX-USD",  # Avalanche
                "DOT-USD",   # Polkadot
                "DOGE-USD",  # Dogecoin
                "LINK-USD",  # Chainlink
                "MATIC-USD"  # Polygon
            ]
            # Limit to requested number
            tickers = tickers[:min(top_n, len(tickers))]

            results = []
            market_cap_sum = 0
            btc_dominance = 0
            btc_market_cap = 0
            success_count = 0
            error_count = 0

            # Fetch data for each ticker
            for ticker in tickers:
                ticker_data = self._get_ticker_data_with_retries(ticker)

                if "error" in ticker_data:
                    # Handle error case - add basic info with error message
                    error_count += 1
                    results.append({
                        "ticker": ticker,
                        "name": ticker.split('-')[0],
                        "error": ticker_data["error"],
                        "data_available": False
                    })
                    continue

                # Extract data
                data = ticker_data["data"]
                hist = ticker_data["hist"]
                info = ticker_data["info"]
                success_count += 1

                if not hist.empty:
                    current_price = hist['Close'].iloc[-1]
                    # Day change needs at least two bars; week change uses the
                    # oldest of the 5d window and falls back to day change.
                    day_change = ((current_price / hist['Close'].iloc[-2]) - 1) * 100 if len(hist) > 1 else 0
                    week_change = ((current_price / hist['Close'].iloc[0]) - 1) * 100 if len(hist) > 4 else day_change

                    market_cap = info.get('marketCap', 0)
                    market_cap_sum += market_cap if market_cap else 0

                    # Store BTC market cap for dominance calculation
                    if ticker == "BTC-USD":
                        btc_market_cap = market_cap if market_cap else 0

                    # Convert numpy types to Python native types for JSON serialization
                    if hasattr(current_price, 'item'):
                        current_price = current_price.item()
                    if hasattr(day_change, 'item'):
                        day_change = day_change.item()
                    if hasattr(week_change, 'item'):
                        week_change = week_change.item()

                    results.append({
                        "ticker": ticker,
                        "name": info.get('shortName', ticker.split('-')[0]),
                        "current_price": current_price,
                        "market_cap": market_cap,
                        "volume_24h": info.get('volume24Hr', None),
                        "day_change_percent": day_change,
                        "week_change_percent": week_change,
                        "data_available": True
                    })

            # Calculate BTC dominance
            if market_cap_sum > 0 and btc_market_cap > 0:
                btc_dominance = (btc_market_cap / market_cap_sum) * 100

            # Overall market trends - only count assets with data
            valid_results = [r for r in results if r.get("data_available", False)]
            market_trend = "bullish" if sum(r.get('day_change_percent', 0) for r in valid_results) > 0 else "bearish"

            # Create response
            response = {
                "cryptocurrencies": results,
                "market_summary": {
                    "total_market_cap": market_cap_sum,
                    "btc_dominance": btc_dominance,
                    "market_trend": market_trend,
                    "timestamp": datetime.now().isoformat(),
                    "success_count": success_count,
                    "error_count": error_count,
                    "total_count": len(tickers)
                }
            }

            # Cache the result if we got at least some data
            if success_count > 0:
                self.cached_data = response
                self.last_cache_time = datetime.now()
                logger.info(f"Cached cryptocurrency market data (success: {success_count}, errors: {error_count})")

            return response
        except Exception as e:
            logger.error(f"Error in YahooCryptoMarketTool: {str(e)}")
            return {
                "error": str(e),
                "market_summary": {
                    "market_trend": "unknown",
                    "timestamp": datetime.now().isoformat(),
                    "error": str(e)
                },
                "cryptocurrencies": []
            }