| | import os |
| | from typing import Dict, Any, List, Optional |
| | from datetime import datetime, timedelta |
| | import pandas as pd |
| | import yfinance as yf |
| | from crewai.tools import BaseTool |
| | import time |
| | import logging |
| | import requests |
| |
|
| | |
# Process-wide logging setup: INFO and above, each record stamped with the
# time, the emitting logger's name and the severity.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Logger used by the Yahoo Finance tool classes defined in this module.
logger = logging.getLogger("yahoo_tools")
| |
|
class YahooBitcoinDataTool(BaseTool):
    """Fetch Bitcoin OHLCV history through yfinance with retry, fallback and caching.

    The primary ticker ``BTC-USD`` is tried first. A request timeout is
    retried with exponential backoff; an empty result or any other error
    falls through to the alternative tickers. Successful results are cached
    per ``(period, interval)`` pair for ``cache_duration_minutes``.
    """

    name: str = "Yahoo Finance Bitcoin Data Tool"
    description: str = "Fetches Bitcoin price data from Yahoo Finance as an alternative data source"
    max_retries: int = 3
    backoff_factor: float = 2.0
    timeout: int = 30
    cache_duration_minutes: int = 15
    # Both caches are keyed by "<period>_<interval>" and are re-created for
    # every instance in __init__.
    cached_data: Dict[str, Dict[str, Any]] = {}
    last_cache_time: Dict[str, datetime] = {}

    def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0, timeout: int = 30, cache_duration_minutes: int = 15):
        """
        Initialize the tool with retry parameters

        Args:
            max_retries: Maximum number of retry attempts (default: 3)
            backoff_factor: Exponential backoff factor between retries (default: 2.0)
            timeout: Request timeout in seconds (default: 30)
            cache_duration_minutes: How long to cache results (default: 15 minutes)
        """
        super().__init__()
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.timeout = timeout
        self.cache_duration_minutes = cache_duration_minutes
        self.cached_data = {}
        self.last_cache_time = {}

    @staticmethod
    def _cache_key(period: str, interval: str) -> str:
        """Build the cache key shared by the cache lookup and cache store."""
        return f"{period}_{interval}"

    def _check_cache_valid(self, period: str, interval: str) -> bool:
        """
        Check if the cached data is still valid

        Args:
            period: The time period key
            interval: The interval key

        Returns:
            True if an entry exists for this period/interval and is younger
            than ``cache_duration_minutes``, False otherwise
        """
        cache_key = self._cache_key(period, interval)

        if cache_key not in self.cached_data or cache_key not in self.last_cache_time:
            return False

        cache_age = datetime.now() - self.last_cache_time[cache_key]
        return cache_age.total_seconds() < (self.cache_duration_minutes * 60)

    def _store_in_cache(self, cache_key: str, payload: Dict[str, Any]) -> None:
        """Record ``payload`` under ``cache_key`` together with the current time."""
        self.cached_data[cache_key] = payload
        self.last_cache_time[cache_key] = datetime.now()

    def _fetch_metadata(self, data: Any, ticker: str) -> Dict[str, Any]:
        """
        Read the optional market metadata for a ticker on a best-effort basis

        A failure here must not discard price history that was already
        fetched successfully, so errors are logged and an empty mapping is
        returned instead.

        Args:
            data: The ``yf.Ticker`` object the history was fetched from
            ticker: The ticker symbol (used for logging only)

        Returns:
            The ticker's ``info`` mapping, or an empty dict if it is
            unavailable
        """
        try:
            return data.info or {}
        except Exception as e:
            logger.warning(f"Could not fetch metadata for {ticker}: {str(e)}")
            return {}

    def _fetch_ticker_payload(self, ticker: str, period: str, interval: str) -> Optional[Dict[str, Any]]:
        """
        Download and package the price history for a single ticker

        Args:
            ticker: The ticker symbol to fetch
            period: Time period passed through to ``history``
            interval: Data point interval passed through to ``history``

        Returns:
            The result dictionary, or None when no rows were returned.
            Exceptions raised while fetching or reshaping the history
            propagate to the caller, which decides whether to retry.
        """
        data = yf.Ticker(ticker)
        df = data.history(period=period, interval=interval, timeout=self.timeout)

        if df.empty:
            logger.warning(f"{ticker}: possibly delisted; no price data found (period={period})")
            return None

        # Move the index into a regular column and expose it as 'time',
        # whichever of the two names it arrives under.
        df = df.reset_index()
        if 'Date' in df.columns:
            df = df.rename(columns={'Date': 'time'})
        elif 'Datetime' in df.columns:
            df = df.rename(columns={'Datetime': 'time'})

        df.columns = [col.lower() for col in df.columns]

        # Unwrap values that expose .item() (e.g. numpy scalars) into plain
        # Python values; the 'time' column is left untouched.
        for col in df.columns:
            if col != 'time':
                df[col] = df[col].apply(lambda x: x.item() if hasattr(x, 'item') else x)

        info = self._fetch_metadata(data, ticker)

        return {
            "dataframe": df.to_dict(orient='records'),
            "last_price": float(df['close'].iloc[-1]),
            "time_period": period,
            "interval": interval,
            "ticker": ticker,
            "metadata": {
                "market_cap": info.get('marketCap', None),
                "volume_24h": info.get('volume24Hr', None),
                "circulating_supply": info.get('circulatingSupply', None),
                "last_updated": datetime.now().isoformat()
            }
        }

    def _run(self, period: str = "1mo", interval: str = "1d") -> Dict[str, Any]:
        """
        Fetch Bitcoin price data from Yahoo Finance

        Args:
            period: Time period to fetch data for (1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max)
            interval: Time interval between data points (1m, 2m, 5m, 15m, 30m, 60m, 90m, 1h, 1d, 5d, 1wk, 1mo, 3mo)

        Returns:
            Dictionary with OHLCV data and metadata. When a fallback ticker
            was used the dictionary also carries a "note" key. When every
            ticker fails, a dictionary with an "error" key is returned
            instead of raising.
        """
        cache_key = self._cache_key(period, interval)
        if self._check_cache_valid(period, interval):
            logger.info(f"Using cached Bitcoin data for period={period}, interval={interval}")
            return self.cached_data[cache_key]

        logger.info(f"Fetching Bitcoin data for period={period}, interval={interval}")

        ticker = "BTC-USD"
        alternative_tickers = ["BTC-USD", "BTCUSD=X", "BTC=F"]

        data_dict = None

        # Primary ticker: only timeouts are retried. An empty result or any
        # other error goes straight to the alternative tickers.
        for attempt in range(self.max_retries):
            try:
                logger.info(f"Fetching {ticker} data (attempt {attempt + 1}/{self.max_retries})")
                data_dict = self._fetch_ticker_payload(ticker, period, interval)
                break

            except requests.exceptions.Timeout as e:
                wait_time = self.backoff_factor ** attempt
                logger.warning(f"Timeout error for {ticker}: {str(e)}. Retrying in {wait_time:.1f} seconds...")
                if attempt < self.max_retries - 1:
                    time.sleep(wait_time)
                else:
                    logger.error(f"Max retries reached for {ticker}")
                    break

            except Exception as e:
                logger.error(f"Error fetching data for {ticker}: {str(e)}")
                break

        if data_dict is not None:
            self._store_in_cache(cache_key, data_dict)
            logger.info(f"Cached Bitcoin data for period={period}, interval={interval}")
            return data_dict

        # Fallback: one attempt per alternative ticker, first success wins.
        for alt_ticker in alternative_tickers:
            if alt_ticker == ticker:
                continue

            logger.info(f"Trying alternative ticker: {alt_ticker}")

            try:
                data_dict = self._fetch_ticker_payload(alt_ticker, period, interval)
            except Exception as e:
                logger.error(f"Error fetching data for alternative ticker {alt_ticker}: {str(e)}")
                continue

            if data_dict is None:
                continue

            data_dict["note"] = f"Used alternative ticker {alt_ticker} because primary ticker failed"
            self._store_in_cache(cache_key, data_dict)
            logger.info(f"Cached Bitcoin data for period={period}, interval={interval} using alternate ticker {alt_ticker}")
            return data_dict

        error_msg = f"Failed to fetch Bitcoin data for period={period}, interval={interval} with all available tickers"
        logger.error(error_msg)
        return {
            "error": error_msg,
            "time_period": period,
            "interval": interval,
            "tickers_tried": alternative_tickers
        }
| |
|
| |
|
class YahooCryptoMarketTool(BaseTool):
    """Summarise a fixed list of major cryptocurrencies using yfinance data.

    For each requested ticker the tool reads five days of history plus the
    ticker's ``info`` mapping, derives day/week percentage changes, and
    aggregates market cap, BTC dominance and an overall trend. The most
    recent response is cached for ``cache_duration_minutes``.
    """

    name: str = "Yahoo Finance Crypto Market Tool"
    description: str = "Fetches data about the broader cryptocurrency market for contextual analysis"
    max_retries: int = 3
    backoff_factor: float = 2.0
    timeout: int = 30
    cache_duration_minutes: int = 30
    # None until the first successful fetch populates the cache.
    cached_data: Optional[Dict[str, Any]] = None
    last_cache_time: Optional[datetime] = None

    def __init__(self, max_retries: int = 3, backoff_factor: float = 2.0, timeout: int = 30, cache_duration_minutes: int = 30):
        """
        Initialize the tool with retry parameters

        Args:
            max_retries: Maximum number of retry attempts (default: 3)
            backoff_factor: Exponential backoff factor between retries (default: 2.0)
            timeout: Request timeout in seconds (default: 30)
            cache_duration_minutes: How long to cache results (default: 30 minutes)
        """
        super().__init__()
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.timeout = timeout
        self.cache_duration_minutes = cache_duration_minutes
        self.cached_data = None
        self.last_cache_time = None

    def _check_cache_valid(self) -> bool:
        """
        Check if the cached data is still valid

        Returns:
            True if a response is cached and is younger than
            ``cache_duration_minutes``, False otherwise
        """
        if self.cached_data is None or self.last_cache_time is None:
            return False

        cache_age = datetime.now() - self.last_cache_time
        return cache_age.total_seconds() < (self.cache_duration_minutes * 60)

    def _cached_ticker_count(self) -> Optional[int]:
        """Return how many tickers the cached response covers, or None if nothing is cached."""
        summary = (self.cached_data or {}).get("market_summary", {})
        return summary.get("total_count")

    def _get_ticker_data_with_retries(self, ticker: str) -> Dict[str, Any]:
        """
        Get ticker data with retry logic

        Only request timeouts are retried (with exponential backoff); any
        other exception is reported immediately.

        Args:
            ticker: The ticker symbol to fetch

        Returns:
            On success a dictionary with "data" (the ticker object), "hist"
            and "info"; otherwise a dictionary with "ticker" and "error"
        """
        for attempt in range(self.max_retries):
            try:
                logger.info(f"Fetching data for {ticker} (attempt {attempt + 1}/{self.max_retries})")
                data = yf.Ticker(ticker)

                hist = data.history(period="5d", timeout=self.timeout)

                # Check for missing prices before asking for info, so a
                # ticker without data does not cost a second request.
                if hist.empty:
                    logger.warning(f"{ticker}: possibly delisted; no price data found (period=5d)")
                    return {
                        "ticker": ticker,
                        "error": f"{ticker}: possibly delisted; no price data found (period=5d)"
                    }

                info = data.info

                return {
                    "data": data,
                    "hist": hist,
                    "info": info
                }

            except requests.exceptions.Timeout as e:
                wait_time = self.backoff_factor ** attempt
                logger.warning(f"Timeout error for {ticker}: {str(e)}. Retrying in {wait_time:.1f} seconds...")
                if attempt < self.max_retries - 1:
                    time.sleep(wait_time)
                else:
                    logger.error(f"Max retries reached for {ticker}")
                    return {
                        "ticker": ticker,
                        "error": f"Timeout error after {self.max_retries} attempts: {str(e)}"
                    }

            except Exception as e:
                logger.error(f"Error fetching data for {ticker}: {str(e)}")
                return {
                    "ticker": ticker,
                    "error": str(e)
                }

        # Reached only when the loop body never ran (max_retries <= 0).
        return {
            "ticker": ticker,
            "error": "Unknown error during retry attempts"
        }

    @staticmethod
    def _to_native(value: Any) -> Any:
        """Unwrap values exposing ``.item()`` (e.g. numpy scalars) into plain Python values."""
        return value.item() if hasattr(value, 'item') else value

    def _build_entry(self, ticker: str, hist: Any, info: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Build the per-coin result row from a non-empty history and info mapping

        Args:
            ticker: The ticker symbol
            hist: Price history with a 'Close' column; must contain at least one row
            info: The ticker's info mapping (None is treated as empty)

        Returns:
            Dictionary describing the coin, with "data_available" set to True
        """
        info = info or {}
        closes = hist['Close']

        current_price = closes.iloc[-1]
        # Day change compares the last two closes; week change compares the
        # last close with the first row and needs the full five rows,
        # otherwise it falls back to the day change.
        day_change = ((current_price / closes.iloc[-2]) - 1) * 100 if len(hist) > 1 else 0
        week_change = ((current_price / closes.iloc[0]) - 1) * 100 if len(hist) > 4 else day_change

        return {
            "ticker": ticker,
            "name": info.get('shortName', ticker.split('-')[0]),
            "current_price": self._to_native(current_price),
            "market_cap": info.get('marketCap', 0),
            "volume_24h": info.get('volume24Hr', None),
            "day_change_percent": self._to_native(day_change),
            "week_change_percent": self._to_native(week_change),
            "data_available": True
        }

    def _run(self, top_n: int = 10) -> Dict[str, Any]:
        """
        Fetch data about the top cryptocurrencies in the market

        Args:
            top_n: Number of top cryptocurrencies to fetch data for. Values
                above the size of the built-in list are capped; negative
                values are treated as 0.

        Returns:
            Dictionary with a "cryptocurrencies" list (one row per ticker,
            including failed ones) and a "market_summary". On an unexpected
            error a dictionary with an "error" key is returned instead of
            raising.
        """
        try:
            supported_tickers = [
                "BTC-USD",
                "ETH-USD",
                "XRP-USD",
                "SOL-USD",
                "ADA-USD",
                "AVAX-USD",
                "DOT-USD",
                "DOGE-USD",
                "LINK-USD",
                "MATIC-USD"
            ]

            # Clamp to [0, len]: a negative top_n would otherwise slice from
            # the end of the list and silently return the wrong set.
            tickers = supported_tickers[:max(0, min(top_n, len(supported_tickers)))]

            # The selection is always a prefix of the same list, so an equal
            # count means the cached response covers exactly these tickers.
            # Without this check a response cached for a different top_n
            # would be returned.
            if self._check_cache_valid() and self._cached_ticker_count() == len(tickers):
                logger.info("Using cached cryptocurrency market data")
                return self.cached_data

            logger.info(f"Fetching data for top {top_n} cryptocurrencies")

            results = []
            market_cap_sum = 0
            btc_dominance = 0
            btc_market_cap = 0
            success_count = 0
            error_count = 0

            for ticker in tickers:
                ticker_data = self._get_ticker_data_with_retries(ticker)

                if "error" in ticker_data:
                    error_count += 1
                    results.append({
                        "ticker": ticker,
                        "name": ticker.split('-')[0],
                        "error": ticker_data["error"],
                        "data_available": False
                    })
                    continue

                success_count += 1

                # A successful fetch always carries a non-empty history, so
                # the row can be built without re-checking for emptiness.
                entry = self._build_entry(ticker, ticker_data["hist"], ticker_data["info"])
                results.append(entry)

                # A missing or None market cap counts as zero in the totals.
                market_cap = entry["market_cap"] or 0
                market_cap_sum += market_cap
                if ticker == "BTC-USD":
                    btc_market_cap = market_cap

            if market_cap_sum > 0 and btc_market_cap > 0:
                btc_dominance = (btc_market_cap / market_cap_sum) * 100

            # Trend is the sign of the summed day changes. With no usable
            # rows there is nothing to judge, so report "unknown" rather
            # than defaulting to "bearish".
            valid_results = [r for r in results if r.get("data_available", False)]
            if not valid_results:
                market_trend = "unknown"
            elif sum(r.get('day_change_percent', 0) for r in valid_results) > 0:
                market_trend = "bullish"
            else:
                market_trend = "bearish"

            response = {
                "cryptocurrencies": results,
                "market_summary": {
                    "total_market_cap": market_cap_sum,
                    "btc_dominance": btc_dominance,
                    "market_trend": market_trend,
                    "timestamp": datetime.now().isoformat(),
                    "success_count": success_count,
                    "error_count": error_count,
                    "total_count": len(tickers)
                }
            }

            # Cache only responses that contain at least one usable row, so
            # a total outage is retried on the next call.
            if success_count > 0:
                self.cached_data = response
                self.last_cache_time = datetime.now()
                logger.info(f"Cached cryptocurrency market data (success: {success_count}, errors: {error_count})")

            return response

        except Exception as e:
            logger.error(f"Error in YahooCryptoMarketTool: {str(e)}")
            return {
                "error": str(e),
                "market_summary": {
                    "market_trend": "unknown",
                    "timestamp": datetime.now().isoformat(),
                    "error": str(e)
                },
                "cryptocurrencies": []
            }