import importlib.util
import inspect
import json
import os
import sys
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
import pytz  # Timezone handling
import requests

# Patch for the pandas_ta NaN import issue: when importing pandas_ta fails
# with "cannot import name 'NaN' from 'numpy'", expose the alias on numpy
# ourselves and retry the import. Any other ImportError is re-raised.
try:
    import pandas_ta as ta
except ImportError as e:
    if "cannot import name 'NaN' from 'numpy'" in str(e):
        # Monkeypatch numpy so NaN is available where pandas_ta expects it
        np.NaN = np.nan
        import pandas_ta as ta
    else:
        raise

# Import OpenAI for LLM strategy interpretation
import openai
from dotenv import load_dotenv

# Load .env before the remaining imports and before any os.getenv() call below
load_dotenv()

from crewai.tools import BaseTool
from pydantic import Field
from alpaca.data.historical import CryptoHistoricalDataClient
from alpaca.data.requests import CryptoBarsRequest
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit

# Set up OpenAI API key
openai.api_key = os.getenv("OPENAI_API_KEY")

# Upper bound (seconds) for the direct REST fallback; without it a stalled
# connection would block the tool indefinitely.
_REQUEST_TIMEOUT_SECONDS = 30


def _select_column(result: Optional[pd.DataFrame], prefix: str, indicator: str) -> pd.Series:
    """Return the first column of a pandas_ta result whose name starts with *prefix*.

    Looking columns up by prefix avoids rebuilding pandas_ta's exact column
    suffix from our own arguments, which breaks when the library formats a
    parameter differently than we do.
    NOTE(review): pandas_ta is believed to render ``std`` as a float and to
    swap ``fast``/``slow`` when fast > slow -- confirm against the installed
    version.

    Args:
        result: DataFrame returned by a pandas_ta indicator function, or None.
        prefix: Leading part of the wanted column name, e.g. ``"BBU_"``.
        indicator: Human-readable indicator name used in error messages.

    Returns:
        The matching column as a Series.

    Raises:
        TypeError: If *result* is None.
        KeyError: If no column name starts with *prefix*.
    """
    if result is None:
        # NOTE(review): pandas_ta appears to return None instead of raising
        # when it cannot compute (e.g. too few rows) -- confirm.
        raise TypeError(f"pandas_ta returned no data for {indicator}")
    for column in result.columns:
        if str(column).startswith(prefix):
            return result[column]
    raise KeyError(
        f"No column starting with '{prefix}' in {indicator} output: {list(result.columns)}"
    )


class IndicatorCalculator:
    """A modular class to calculate various technical indicators on price data.

    Every ``calculate_*`` method adds its columns to the DataFrame it is given
    (in place) and returns that same DataFrame.
    """

    # Dictionary of available indicators with their descriptions
    AVAILABLE_INDICATORS = {
        "rsi": "Relative Strength Index - measures momentum by comparing recent gains to recent losses",
        "bollinger_bands": "Bollinger Bands - measure volatility with upper and lower bands",
        "macd": "Moving Average Convergence Divergence - momentum indicator showing relationship between two moving averages",
        "adx": "Average Directional Index - measures trend strength",
        "ema": "Exponential Moving Average - gives more weight to recent prices",
        "sma": "Simple Moving Average - arithmetic mean of prices over a period",
        "atr": "Average True Range - measures volatility",
        "stochastic": "Stochastic Oscillator - momentum indicator comparing close price to price range",
        "obv": "On-Balance Volume - relates volume to price change"
    }

    def __init__(self):
        """Initialize the indicator calculator (no state is kept)."""
        pass

    def calculate_all_indicators(self, df: pd.DataFrame, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
        """
        Calculate all available indicators on the price data

        Each indicator is computed independently: if one fails, the error is
        printed and the remaining indicators are still calculated.

        Args:
            df: DataFrame with OHLCV data (columns open, high, low, close, volume)
            params: Dictionary of parameters for indicators. Recognised keys
                (defaults in parentheses): rsi_length (14), bb_length (20),
                bb_std (2.0), adx_length (14), ema_lengths ([8, 21, 50, 200]),
                sma_lengths ([] - no SMA unless requested), macd_fast (12),
                macd_slow (26), macd_signal (9), stoch_k (14), stoch_d (3),
                atr_length (14).

        Returns:
            DataFrame with added technical indicators. If required columns are
            missing the input is returned unchanged.
        """
        if params is None:
            params = {}

        try:
            # Ensure we're working with a DataFrame with expected columns
            required_columns = ['open', 'high', 'low', 'close', 'volume']
            missing_columns = [col for col in required_columns if col not in df.columns]

            if missing_columns:
                print(f"Missing columns in dataframe: {missing_columns}")
                print(f"Available columns: {df.columns}")
                return df

            # (label, method, keyword arguments), in the order columns are added
            steps: List[tuple] = [
                ("rsi", self.calculate_rsi, {"length": params.get('rsi_length', 14)}),
                ("bollinger_bands", self.calculate_bollinger_bands, {
                    "length": params.get('bb_length', 20),
                    "std": params.get('bb_std', 2.0),
                }),
                ("adx", self.calculate_adx, {"length": params.get('adx_length', 14)}),
            ]
            steps += [
                (f"ema_{length}", self.calculate_ema, {"length": length})
                for length in params.get('ema_lengths', [8, 21, 50, 200])
            ]
            # SMA is advertised in AVAILABLE_INDICATORS; it is opt-in here so
            # the default set of output columns stays unchanged.
            steps += [
                (f"sma_{length}", self.calculate_sma, {"length": length})
                for length in params.get('sma_lengths', [])
            ]
            steps += [
                ("macd", self.calculate_macd, {
                    "fast": params.get('macd_fast', 12),
                    "slow": params.get('macd_slow', 26),
                    "signal": params.get('macd_signal', 9),
                }),
                ("stochastic", self.calculate_stochastic, {
                    "k": params.get('stoch_k', 14),
                    "d": params.get('stoch_d', 3),
                }),
                ("atr", self.calculate_atr, {"length": params.get('atr_length', 14)}),
                ("obv", self.calculate_obv, {}),
            ]

            failed: List[str] = []
            for label, method, kwargs in steps:
                try:
                    df = method(df, **kwargs)
                except Exception as e:
                    # Deliberately broad: one indicator failing must not
                    # prevent the others from being computed.
                    print(f"Error calculating {label}: {e}")
                    failed.append(label)

            if failed:
                print(f"Calculated technical indicators with failures in: {failed}. Final data shape: {df.shape}")
            else:
                print(f"Calculated all technical indicators. Final data shape: {df.shape}")
            return df

        except Exception as e:
            print(f"Error calculating indicators: {e}")
            import traceback
            traceback.print_exc()
            return df

    def calculate_rsi(self, df: pd.DataFrame, length: int = 14) -> pd.DataFrame:
        """Calculate RSI on ``close`` and store it in column ``rsi_{length}``."""
        df[f'rsi_{length}'] = ta.rsi(df['close'], length=length)
        return df

    def calculate_bollinger_bands(self, df: pd.DataFrame, length: int = 20, std: float = 2.0) -> pd.DataFrame:
        """Calculate Bollinger Bands on ``close``.

        Adds ``bb_upper_*``, ``bb_middle_*``, ``bb_lower_*`` and
        ``bb_position_*`` columns, each suffixed with ``{length}_{std}``
        exactly as the arguments were passed.

        Raises:
            TypeError: If pandas_ta returns no result.
            KeyError: If an expected band column is absent from the result.
        """
        bbands = ta.bbands(df['close'], length=length, std=std)
        suffix = f'{length}_{std}'
        upper_col = f'bb_upper_{suffix}'
        lower_col = f'bb_lower_{suffix}'

        # Columns are located by prefix so an integer `std` (e.g. 2) works
        # even if pandas_ta names its columns with a float suffix.
        df[upper_col] = _select_column(bbands, 'BBU_', 'Bollinger Bands')
        df[f'bb_middle_{suffix}'] = _select_column(bbands, 'BBM_', 'Bollinger Bands')
        df[lower_col] = _select_column(bbands, 'BBL_', 'Bollinger Bands')

        # Normalize Bollinger Bands position (0 = lower band, 1 = upper band)
        band_width = df[upper_col] - df[lower_col]
        df[f'bb_position_{suffix}'] = (df['close'] - df[lower_col]) / band_width
        return df

    def calculate_adx(self, df: pd.DataFrame, length: int = 14) -> pd.DataFrame:
        """Calculate ADX and the +DI / -DI lines from high, low and close.

        Adds ``adx_{length}``, ``di_plus_{length}`` and ``di_minus_{length}``.

        Raises:
            TypeError: If pandas_ta returns no result.
            KeyError: If an expected column is absent from the result.
        """
        adx = ta.adx(df['high'], df['low'], df['close'], length=length)
        df[f'adx_{length}'] = _select_column(adx, 'ADX_', 'ADX')
        df[f'di_plus_{length}'] = _select_column(adx, 'DMP_', 'ADX')
        df[f'di_minus_{length}'] = _select_column(adx, 'DMN_', 'ADX')
        return df

    def calculate_ema(self, df: pd.DataFrame, length: int = 21) -> pd.DataFrame:
        """Calculate the EMA of ``close`` and store it in ``ema_{length}``."""
        df[f'ema_{length}'] = ta.ema(df['close'], length=length)
        return df

    def calculate_sma(self, df: pd.DataFrame, length: int = 21) -> pd.DataFrame:
        """Calculate the SMA of ``close`` and store it in ``sma_{length}``."""
        df[f'sma_{length}'] = ta.sma(df['close'], length=length)
        return df

    def calculate_macd(self, df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.DataFrame:
        """Calculate MACD on ``close``.

        Adds ``macd_*``, ``macd_signal_*`` and ``macd_histogram_*`` columns,
        each suffixed with ``{fast}_{slow}_{signal}`` as passed.

        Raises:
            TypeError: If pandas_ta returns no result.
            KeyError: If an expected column is absent from the result.
        """
        macd = ta.macd(df['close'], fast=fast, slow=slow, signal=signal)
        suffix = f'{fast}_{slow}_{signal}'
        # 'MACD_' (with underscore) does not match 'MACDs_' or 'MACDh_'
        df[f'macd_{suffix}'] = _select_column(macd, 'MACD_', 'MACD')
        df[f'macd_signal_{suffix}'] = _select_column(macd, 'MACDs_', 'MACD')
        df[f'macd_histogram_{suffix}'] = _select_column(macd, 'MACDh_', 'MACD')
        return df

    def calculate_stochastic(self, df: pd.DataFrame, k: int = 14, d: int = 3) -> pd.DataFrame:
        """Calculate the Stochastic Oscillator from high, low and close.

        Adds ``stoch_k_{k}`` and ``stoch_d_{k}_{d}``.

        Raises:
            TypeError: If pandas_ta returns no result.
            KeyError: If an expected column is absent from the result.
        """
        stoch = ta.stoch(df['high'], df['low'], df['close'], k=k, d=d)
        df[f'stoch_k_{k}'] = _select_column(stoch, 'STOCHk_', 'Stochastic Oscillator')
        df[f'stoch_d_{k}_{d}'] = _select_column(stoch, 'STOCHd_', 'Stochastic Oscillator')
        return df

    def calculate_atr(self, df: pd.DataFrame, length: int = 14) -> pd.DataFrame:
        """Calculate Average True Range and store it in ``atr_{length}``."""
        df[f'atr_{length}'] = ta.atr(df['high'], df['low'], df['close'], length=length)
        return df

    def calculate_obv(self, df: pd.DataFrame) -> pd.DataFrame:
        """Calculate On-Balance Volume from close and volume into ``obv``."""
        df['obv'] = ta.obv(df['close'], df['volume'])
        return df

    @classmethod
    def get_available_indicators(cls) -> Dict[str, str]:
        """Return the available indicators with descriptions.

        A copy is returned so callers cannot mutate the shared class-level
        dictionary.
        """
        return dict(cls.AVAILABLE_INDICATORS)


class TechnicalAnalysisStrategy(BaseTool):
    """Tool that fetches recent BTC/USD bars from Alpaca and returns the latest indicator values."""

    name: str = "Bitcoin Technical Analysis Strategy Tool"
    description: str = "Fetches current Bitcoin technical indicators data from the market"

    # Define all fields as proper Pydantic fields
    api_key: Optional[str] = Field(default=None, description="Alpaca API key")
    api_secret: Optional[str] = Field(default=None, description="Alpaca API secret")
    client: Optional[Any] = Field(default=None, description="Alpaca client instance")
    indicator_calculator: IndicatorCalculator = Field(default_factory=IndicatorCalculator, description="Indicator calculator instance")

    # Add model_config to allow arbitrary types
    model_config = {"arbitrary_types_allowed": True}

    def __init__(self, **kwargs):
        """Create the tool, falling back to ALPACA_API_KEY / ALPACA_API_SECRET
        environment variables for credentials that were not passed in, and
        build the Alpaca client when both credentials are available."""
        # Initialize the base class first
        super().__init__(**kwargs)

        # Set the API keys from environment variables if not provided directly
        if not self.api_key:
            self.api_key = os.getenv("ALPACA_API_KEY")
        if not self.api_secret:
            self.api_secret = os.getenv("ALPACA_API_SECRET")

        print(f"Initializing Technical Analysis Strategy with API key: {'Present' if self.api_key else 'Missing'}")

        # Initialize the Alpaca client if not already set
        if not self.client and self.api_key and self.api_secret:
            try:
                self.client = CryptoHistoricalDataClient(api_key=self.api_key, secret_key=self.api_secret)
                print("Successfully initialized Alpaca client")
            except Exception as e:
                # Best effort: _run() retries client creation later
                print(f"Error initializing Alpaca client: {e}")

    def _run(self) -> Dict[str, Any]:
        """
        Fetch Bitcoin technical indicator data

        Returns:
            Dictionary with the latest close under "price" plus one entry per
            non-OHLCV column of the last row (None where the value is NaN).
            On any failure a dictionary with a single "error" key is returned.
        """
        try:
            print("Fetching Bitcoin technical indicator data")

            # Make sure client is initialized
            if not self.client:
                print("Client not initialized, attempting to create one now")
                self.client = CryptoHistoricalDataClient(api_key=self.api_key, secret_key=self.api_secret)

            # Request extra periods to account for gaps in historical data:
            # 3x the larger of the minimum and the nominal lookback.
            min_required = 20  # Require fewer data points for basic analysis
            lookback_periods = 100
            actual_lookback = max(min_required, lookback_periods) * 3
            timeframe_minutes = 5  # Default timeframe

            # Fetch the Bitcoin price data for the specified timeframe
            df = self._fetch_price_data(actual_lookback, timeframe_minutes)

            if df is None:
                print("No price data returned from fetch_price_data")
                return {"error": "No price data available"}

            print(f"Retrieved {len(df)} data points")

            if len(df) < min_required:
                print(f"Insufficient data points: {len(df)}, need at least {min_required}")
                return {"error": f"Insufficient price data: only received {len(df)} data points"}

            # Calculate all indicators
            indicator_params = {
                'bb_std': 2.0,
                'rsi_length': 14,
                'bb_length': 20
            }
            df = self.indicator_calculator.calculate_all_indicators(df, indicator_params)

            # Get the latest data point with all indicators
            latest_data = df.iloc[-1].to_dict()

            # Prepare result dictionary
            result = {}

            # Add price data
            result["price"] = float(latest_data.get('close', 0))

            # Add all indicators
            for key, value in latest_data.items():
                if key not in ['open', 'high', 'low', 'close', 'volume', 'time']:
                    # Convert numpy values to Python native types
                    if pd.isna(value):
                        result[key] = None
                    else:
                        result[key] = float(value)

            return result

        except Exception as e:
            print(f"Error fetching indicator data: {str(e)}")
            return {"error": str(e)}

    def _fetch_price_data(self, lookback_periods: int, timeframe_minutes: int = 5) -> Optional[pd.DataFrame]:
        """
        Fetch Bitcoin price data from Alpaca API

        Uses the SDK client first; if that raises, falls back to
        _fetch_price_data_direct_api.

        Args:
            lookback_periods: Number of periods to fetch
            timeframe_minutes: Timeframe in minutes

        Returns:
            DataFrame with OHLCV data, or None when no data could be fetched
        """
        try:
            # Calculate the start and end dates with timezone information
            end = datetime.now(pytz.UTC)  # Make end timezone-aware with UTC
            start = end - timedelta(minutes=timeframe_minutes * lookback_periods)

            print(f"Fetching price data from {start} to {end}")

            # Create the request parameters
            request_params = CryptoBarsRequest(
                symbol_or_symbols=["BTC/USD"],  # Use correct format with slash
                timeframe=TimeFrame(timeframe_minutes, TimeFrameUnit.Minute),  # Use specified timeframe
                start=start,
                end=end
            )

            print(f"Request parameters: {request_params}")

            # Get the bars data
            bars = self.client.get_crypto_bars(request_params)

            if bars is None:
                print("No bars data returned from API")
                return None

            # Convert to dataframe
            df = bars.df.reset_index()

            print(f"Raw data columns: {df.columns}")
            print(f"Raw data shape: {df.shape}")

            # Print first few rows for debugging
            print(f"First few rows: {df.head(2)}")

            # Ensure proper column names
            if 'timestamp' in df.columns:
                df = df.rename(columns={'timestamp': 'time'})

            # Filter to only BTC/USD data
            if 'symbol' in df.columns:
                df = df[df['symbol'] == 'BTC/USD'].reset_index(drop=True)
                df = df.drop(columns=['symbol'])

            print(f"Processed data shape: {df.shape}")
            return df

        except Exception as e:
            print(f"Error fetching price data with SDK: {e}")
            # Only report whether credentials exist, never their values
            print(f"API Key present: {'Yes' if self.api_key else 'No'}")
            print(f"API Secret present: {'Yes' if self.api_secret else 'No'}")

            # Try the direct REST API approach as fallback
            try:
                print("Attempting fallback to direct REST API call...")
                return self._fetch_price_data_direct_api(lookback_periods, timeframe_minutes)
            except Exception as e2:
                print(f"Error in fallback API call: {e2}")
                return None

    def _fetch_price_data_direct_api(self, lookback_periods: int, timeframe_minutes: int = 5) -> Optional[pd.DataFrame]:
        """
        Fallback method to fetch Bitcoin price data using direct REST API calls
        following the curl example format.

        Args:
            lookback_periods: Number of periods to fetch (also sent as "limit")
            timeframe_minutes: Timeframe in minutes

        Returns:
            DataFrame with time/open/high/low/close/volume columns, or None on
            a non-200 response or when the response holds no BTC/USD bars

        Raises:
            requests.RequestException: On connection errors or when the request
                exceeds the timeout (handled by _fetch_price_data).
        """
        # Calculate the start and end dates
        end = datetime.now(pytz.UTC)
        start = end - timedelta(minutes=timeframe_minutes * lookback_periods)

        # Format dates for the API
        start_str = start.strftime('%Y-%m-%dT%H:%M:%SZ')
        end_str = end.strftime('%Y-%m-%dT%H:%M:%SZ')

        # Endpoint for historical bars
        url = "https://data.alpaca.markets/v1beta3/crypto/us/bars"

        # Query parameters
        params = {
            "symbols": "BTC/USD",
            "timeframe": f"{timeframe_minutes}Min",
            "start": start_str,
            "end": end_str,
            "limit": lookback_periods
        }

        # Headers with authentication
        headers = {
            "Apca-Api-Key-Id": self.api_key,
            "Apca-Api-Secret-Key": self.api_secret
        }

        print(f"Making direct API call to: {url}")
        print(f"With params: {params}")

        # Make the request; the timeout keeps a stalled connection from
        # blocking the tool forever.
        response = requests.get(url, params=params, headers=headers, timeout=_REQUEST_TIMEOUT_SECONDS)

        if response.status_code != 200:
            print(f"API Error: {response.status_code} - {response.text}")
            return None

        # Parse response
        data = response.json()
        print(f"API Response: {json.dumps(data)[:300]}...")

        # Extract the bars
        if 'bars' not in data or 'BTC/USD' not in data['bars']:
            print("No bar data found in response")
            return None

        bars_data = data['bars']['BTC/USD']

        # Create DataFrame
        df = pd.DataFrame(bars_data)

        # Rename columns to match expected format
        if 't' in df.columns:
            df = df.rename(columns={
                't': 'time',
                'o': 'open',
                'h': 'high',
                'l': 'low',
                'c': 'close',
                'v': 'volume'
            })

        # Convert timestamp to datetime
        if 'time' in df.columns:
            df['time'] = pd.to_datetime(df['time'])

        print(f"Processed API data shape: {df.shape}")
        return df