# File size: 17,796 bytes (revision eb27803).
import os
import requests
import json
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import pytz # Import pytz for timezone handling
# Patch for pandas_ta NaN import issue
import sys
import importlib.util
import inspect
# pandas_ta imports `NaN` from numpy (the ImportError this previously matched was
# "cannot import name 'NaN' from 'numpy'"), but newer NumPy releases no longer
# provide that alias. Restore the alias on numpy *before* importing, so pandas_ta
# is imported exactly once and we do not depend on the wording of an error message.
if not hasattr(np, "NaN"):
    np.NaN = np.nan
import pandas_ta as ta
# Import OpenAI for LLM strategy interpretation
import openai
from dotenv import load_dotenv
load_dotenv()
from crewai.tools import BaseTool
from pydantic import Field
from alpaca.data.historical import CryptoHistoricalDataClient
from alpaca.data.requests import CryptoBarsRequest
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
# Configure the OpenAI SDK's module-level API key from the process environment
# (which may include values loaded from a .env file); None when the variable is unset.
openai.api_key = os.environ.get("OPENAI_API_KEY")
class IndicatorCalculator:
    """Calculate technical indicators on OHLCV price data using pandas_ta.

    Each ``calculate_*`` method writes its result column(s) into the DataFrame it
    receives (mutating it in place) and returns that same DataFrame.
    """

    # Dictionary of available indicators with their descriptions
    AVAILABLE_INDICATORS = {
        "rsi": "Relative Strength Index - measures momentum by comparing recent gains to recent losses",
        "bollinger_bands": "Bollinger Bands - measure volatility with upper and lower bands",
        "macd": "Moving Average Convergence Divergence - momentum indicator showing relationship between two moving averages",
        "adx": "Average Directional Index - measures trend strength",
        "ema": "Exponential Moving Average - gives more weight to recent prices",
        "sma": "Simple Moving Average - arithmetic mean of prices over a period",
        "atr": "Average True Range - measures volatility",
        "stochastic": "Stochastic Oscillator - momentum indicator comparing close price to price range",
        "obv": "On-Balance Volume - relates volume to price change"
    }

    def __init__(self):
        """Initialize the indicator calculator; it keeps no per-instance state."""

    @staticmethod
    def _column(result: Optional[pd.DataFrame], prefix: str, index: pd.Index) -> pd.Series:
        """Return the column of a multi-column pandas_ta result whose name starts with ``prefix``.

        Matching on the prefix (e.g. ``"BBU_"``) instead of the full generated name
        keeps the lookup working however pandas_ta formats the parameters in that
        name (e.g. a float ``std`` or an extra ``smooth_k`` suffix).

        Args:
            result: DataFrame returned by a pandas_ta function, or None.
            prefix: Column-name prefix to look for.
            index: Index to use for the all-NaN fallback series.

        Returns:
            The matching column. If ``result`` is None (pandas_ta may return None,
            e.g. when the history is shorter than the indicator window), returns an
            all-NaN float series on ``index``, so later indicators still get computed.

        Raises:
            KeyError: If no column of ``result`` starts with ``prefix``.
        """
        if result is None:
            return pd.Series(np.nan, index=index, dtype=float)
        for column in result.columns:
            if str(column).startswith(prefix):
                return result[column]
        raise KeyError(f"No column starting with {prefix!r} in {list(result.columns)}")

    def calculate_all_indicators(self, df: pd.DataFrame, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
        """
        Calculate all available indicators on the price data.

        Args:
            df: DataFrame with 'open', 'high', 'low', 'close' and 'volume' columns.
                It is modified in place.
            params: Optional overrides. Recognized keys (defaults in parentheses):
                rsi_length (14), bb_length (20), bb_std (2.0), adx_length (14),
                ema_lengths ([8, 21, 50, 200]), sma_lengths ([] - SMA is opt-in),
                macd_fast (12), macd_slow (26), macd_signal (9), stoch_k (14),
                stoch_d (3), stoch_smooth_k (3), atr_length (14).

        Returns:
            The DataFrame with indicator columns added. If a required column is
            missing, the input is returned unchanged. If an error occurs, the error
            is printed and the frame is returned with whatever columns were added
            before the failure.
        """
        if params is None:
            params = {}
        try:
            # Ensure we're working with a DataFrame with expected columns
            required_columns = ['open', 'high', 'low', 'close', 'volume']
            missing_columns = [col for col in required_columns if col not in df.columns]
            if missing_columns:
                print(f"Missing columns in dataframe: {missing_columns}")
                print(f"Available columns: {df.columns}")
                return df

            df = self.calculate_rsi(df, length=params.get('rsi_length', 14))
            df = self.calculate_bollinger_bands(
                df, length=params.get('bb_length', 20), std=params.get('bb_std', 2.0)
            )
            df = self.calculate_adx(df, length=params.get('adx_length', 14))
            for length in params.get('ema_lengths', [8, 21, 50, 200]):
                df = self.calculate_ema(df, length=length)
            # SMA is listed in AVAILABLE_INDICATORS; it is opt-in so default output is unchanged
            for length in params.get('sma_lengths', []):
                df = self.calculate_sma(df, length=length)
            df = self.calculate_macd(
                df,
                fast=params.get('macd_fast', 12),
                slow=params.get('macd_slow', 26),
                signal=params.get('macd_signal', 9),
            )
            df = self.calculate_stochastic(
                df,
                k=params.get('stoch_k', 14),
                d=params.get('stoch_d', 3),
                smooth_k=params.get('stoch_smooth_k', 3),
            )
            df = self.calculate_atr(df, length=params.get('atr_length', 14))
            df = self.calculate_obv(df)
            print(f"Calculated all technical indicators. Final data shape: {df.shape}")
            return df
        except Exception as e:
            print(f"Error calculating indicators: {e}")
            import traceback
            traceback.print_exc()
            return df

    def calculate_rsi(self, df: pd.DataFrame, length: int = 14) -> pd.DataFrame:
        """Add ``rsi_{length}`` computed from the close price."""
        df[f'rsi_{length}'] = ta.rsi(df['close'], length=length)
        return df

    def calculate_bollinger_bands(self, df: pd.DataFrame, length: int = 20, std: float = 2.0) -> pd.DataFrame:
        """Add Bollinger upper/middle/lower bands and the close's position within them.

        Columns are suffixed ``_{length}_{std}`` with ``std`` rendered as a float,
        so ``std=2`` and ``std=2.0`` produce the same column names.
        """
        std = float(std)
        suffix = f"{length}_{std}"
        bbands = ta.bbands(df['close'], length=length, std=std)
        upper = self._column(bbands, 'BBU_', df.index)
        middle = self._column(bbands, 'BBM_', df.index)
        lower = self._column(bbands, 'BBL_', df.index)
        df[f'bb_upper_{suffix}'] = upper
        df[f'bb_middle_{suffix}'] = middle
        df[f'bb_lower_{suffix}'] = lower
        # Normalize Bollinger Bands position (0 = lower band, 1 = upper band)
        df[f'bb_position_{suffix}'] = (df['close'] - lower) / (upper - lower)
        return df

    def calculate_adx(self, df: pd.DataFrame, length: int = 14) -> pd.DataFrame:
        """Add ``adx_{length}`` plus the +DI / -DI lines (``di_plus_*`` / ``di_minus_*``)."""
        adx = ta.adx(df['high'], df['low'], df['close'], length=length)
        df[f'adx_{length}'] = self._column(adx, 'ADX_', df.index)
        df[f'di_plus_{length}'] = self._column(adx, 'DMP_', df.index)
        df[f'di_minus_{length}'] = self._column(adx, 'DMN_', df.index)
        return df

    def calculate_ema(self, df: pd.DataFrame, length: int = 21) -> pd.DataFrame:
        """Add ``ema_{length}`` computed from the close price."""
        df[f'ema_{length}'] = ta.ema(df['close'], length=length)
        return df

    def calculate_sma(self, df: pd.DataFrame, length: int = 21) -> pd.DataFrame:
        """Add ``sma_{length}`` computed from the close price."""
        df[f'sma_{length}'] = ta.sma(df['close'], length=length)
        return df

    def calculate_macd(self, df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.DataFrame:
        """Add MACD line, signal line and histogram, suffixed ``_{fast}_{slow}_{signal}``."""
        macd = ta.macd(df['close'], fast=fast, slow=slow, signal=signal)
        suffix = f"{fast}_{slow}_{signal}"
        df[f'macd_{suffix}'] = self._column(macd, 'MACD_', df.index)
        df[f'macd_signal_{suffix}'] = self._column(macd, 'MACDs_', df.index)
        df[f'macd_histogram_{suffix}'] = self._column(macd, 'MACDh_', df.index)
        return df

    def calculate_stochastic(self, df: pd.DataFrame, k: int = 14, d: int = 3, smooth_k: int = 3) -> pd.DataFrame:
        """Add Stochastic %K (``stoch_k_{k}``) and %D (``stoch_d_{k}_{d}``).

        ``k``, ``d`` and ``smooth_k`` are passed straight to ``ta.stoch``.
        ``smooth_k`` defaults to 3, the value the original column names assumed.
        """
        stoch = ta.stoch(df['high'], df['low'], df['close'], k=k, d=d, smooth_k=smooth_k)
        df[f'stoch_k_{k}'] = self._column(stoch, 'STOCHk_', df.index)
        df[f'stoch_d_{k}_{d}'] = self._column(stoch, 'STOCHd_', df.index)
        return df

    def calculate_atr(self, df: pd.DataFrame, length: int = 14) -> pd.DataFrame:
        """Add ``atr_{length}`` (Average True Range)."""
        df[f'atr_{length}'] = ta.atr(df['high'], df['low'], df['close'], length=length)
        return df

    def calculate_obv(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add ``obv`` (On-Balance Volume) from close and volume."""
        df['obv'] = ta.obv(df['close'], df['volume'])
        return df

    @classmethod
    def get_available_indicators(cls) -> Dict[str, str]:
        """Return the dictionary of available indicators with descriptions."""
        return cls.AVAILABLE_INDICATORS
class TechnicalAnalysisStrategy(BaseTool):
    """CrewAI tool that returns the latest BTC/USD technical-indicator snapshot.

    Price bars are fetched from Alpaca crypto market data (alpaca-py SDK first,
    a direct REST call as fallback). They are run through ``IndicatorCalculator``,
    and the values of the most recent bar are returned as a flat dict.
    """

    name: str = "Bitcoin Technical Analysis Strategy Tool"
    description: str = "Fetches current Bitcoin technical indicators data from the market"
    # Define all fields as proper Pydantic fields
    api_key: Optional[str] = Field(default=None, description="Alpaca API key")
    api_secret: Optional[str] = Field(default=None, description="Alpaca API secret")
    client: Optional[Any] = Field(default=None, description="Alpaca client instance")
    indicator_calculator: IndicatorCalculator = Field(default_factory=IndicatorCalculator, description="Indicator calculator instance")
    # Allow non-pydantic types (the Alpaca client, IndicatorCalculator) as field values
    model_config = {"arbitrary_types_allowed": True}

    def __init__(self, **kwargs):
        """Build the tool, filling missing Alpaca credentials from the environment.

        ``ALPACA_API_KEY`` / ``ALPACA_API_SECRET`` are read when ``api_key`` /
        ``api_secret`` were not supplied. A ``CryptoHistoricalDataClient`` is
        created only when both credentials are available. A creation error is
        printed rather than raised, and ``_run`` tries to create the client again.
        """
        # Initialize the base class first so pydantic fields can be assigned
        super().__init__(**kwargs)
        if not self.api_key:
            self.api_key = os.getenv("ALPACA_API_KEY")
        if not self.api_secret:
            self.api_secret = os.getenv("ALPACA_API_SECRET")
        print(f"Initializing Technical Analysis Strategy with API key: {'Present' if self.api_key else 'Missing'}")
        if not self.client and self.api_key and self.api_secret:
            try:
                self.client = CryptoHistoricalDataClient(api_key=self.api_key, secret_key=self.api_secret)
                print("Successfully initialized Alpaca client")
            except Exception as e:
                print(f"Error initializing Alpaca client: {e}")

    def _run(self) -> Dict[str, Any]:
        """
        Fetch 5-minute BTC/USD bars and return the latest technical indicator values.

        Returns:
            On success, a dict with "price" (latest close) and, for every other
            column of the latest bar except open/high/low/close/volume/time, its
            value as a float (None when the value is NaN/missing).
            On failure, {"error": <message>}.
        """
        try:
            print("Fetching Bitcoin technical indicator data")
            # Make sure client is initialized
            if not self.client:
                print("Client not initialized, attempting to create one now")
                self.client = CryptoHistoricalDataClient(api_key=self.api_key, secret_key=self.api_secret)

            min_required = 20  # fewest bars accepted for a basic analysis
            lookback_periods = 100
            # Request 3x the lookback to absorb gaps in the historical bar data
            actual_lookback = max(min_required, lookback_periods) * 3
            timeframe_minutes = 5  # Default timeframe

            df = self._fetch_price_data(actual_lookback, timeframe_minutes)
            if df is None:
                print("No price data returned from fetch_price_data")
                return {"error": "No price data available"}
            print(f"Retrieved {len(df)} data points")
            if len(df) < min_required:
                print(f"Insufficient data points: {len(df)}, need at least {min_required}")
                return {"error": f"Insufficient price data: only received {len(df)} data points"}

            indicator_params = {
                'bb_std': 2.0,
                'rsi_length': 14,
                'bb_length': 20
            }
            df = self.indicator_calculator.calculate_all_indicators(df, indicator_params)

            # The last row is the most recent bar
            latest_data = df.iloc[-1].to_dict()
            result = {"price": float(latest_data.get('close', 0))}
            excluded = {'open', 'high', 'low', 'close', 'volume', 'time'}
            for key, value in latest_data.items():
                if key in excluded:
                    continue
                # Convert numpy values to Python native types (NaN -> None)
                result[key] = None if pd.isna(value) else float(value)
            return result
        except Exception as e:
            print(f"Error fetching indicator data: {str(e)}")
            return {"error": str(e)}

    def _fetch_price_data(self, lookback_periods: int, timeframe_minutes: int = 5) -> Optional[pd.DataFrame]:
        """
        Fetch BTC/USD bars through the alpaca-py SDK.

        Args:
            lookback_periods: Number of bars' worth of time to look back from now.
            timeframe_minutes: Bar size in minutes.

        Returns:
            DataFrame of bars with a 'time' column (renamed from 'timestamp') and
            the 'symbol' column removed, or None. If the SDK call raises, the
            direct REST fallback is tried; None is returned if that fails as well.
        """
        try:
            # Timezone-aware UTC window ending now
            end = datetime.now(pytz.UTC)
            start = end - timedelta(minutes=timeframe_minutes * lookback_periods)
            print(f"Fetching price data from {start} to {end}")
            request_params = CryptoBarsRequest(
                symbol_or_symbols=["BTC/USD"],  # Use correct format with slash
                timeframe=TimeFrame(timeframe_minutes, TimeFrameUnit.Minute),
                start=start,
                end=end
            )
            print(f"Request parameters: {request_params}")
            bars = self.client.get_crypto_bars(request_params)
            if bars is None:
                print("No bars data returned from API")
                return None

            # Flatten the index into regular columns
            df = bars.df.reset_index()
            print(f"Raw data columns: {df.columns}")
            print(f"Raw data shape: {df.shape}")
            print(f"First few rows: {df.head(2)}")
            if 'timestamp' in df.columns:
                df = df.rename(columns={'timestamp': 'time'})
            # Filter to only BTC/USD data
            if 'symbol' in df.columns:
                df = df[df['symbol'] == 'BTC/USD'].reset_index(drop=True)
                df = df.drop(columns=['symbol'])
            print(f"Processed data shape: {df.shape}")
            return df
        except Exception as e:
            print(f"Error fetching price data with SDK: {e}")
            print(f"API Key present: {'Yes' if self.api_key else 'No'}")
            print(f"API Secret present: {'Yes' if self.api_secret else 'No'}")
            try:
                print("Attempting fallback to direct REST API call...")
                return self._fetch_price_data_direct_api(lookback_periods, timeframe_minutes)
            except Exception as e2:
                print(f"Error in fallback API call: {e2}")
                return None

    def _fetch_price_data_direct_api(self, lookback_periods: int, timeframe_minutes: int = 5,
                                     timeout: float = 30.0) -> Optional[pd.DataFrame]:
        """
        Fallback: fetch BTC/USD bars with a direct call to Alpaca's REST bars endpoint.

        Args:
            lookback_periods: Number of bars to request (also used as the 'limit').
            timeframe_minutes: Bar size in minutes.
            timeout: Seconds to wait for the HTTP response before giving up.

        Returns:
            DataFrame with time/open/high/low/close/volume columns, plus
            trade_count/vwap when the response includes them. Returns None on a
            non-200 status or when the response has no BTC/USD bars.

        Raises:
            requests.RequestException (including Timeout) on transport failures.
        """
        end = datetime.now(pytz.UTC)
        start = end - timedelta(minutes=timeframe_minutes * lookback_periods)
        start_str = start.strftime('%Y-%m-%dT%H:%M:%SZ')
        end_str = end.strftime('%Y-%m-%dT%H:%M:%SZ')

        url = "https://data.alpaca.markets/v1beta3/crypto/us/bars"
        params = {
            "symbols": "BTC/USD",
            "timeframe": f"{timeframe_minutes}Min",
            "start": start_str,
            "end": end_str,
            "limit": lookback_periods
        }
        headers = {
            "Apca-Api-Key-Id": self.api_key,
            "Apca-Api-Secret-Key": self.api_secret
        }
        print(f"Making direct API call to: {url}")
        print(f"With params: {params}")
        # Bound the wait so a stalled connection cannot hang the tool indefinitely
        response = requests.get(url, params=params, headers=headers, timeout=timeout)
        if response.status_code != 200:
            print(f"API Error: {response.status_code} - {response.text}")
            return None

        data = response.json()
        print(f"API Response: {json.dumps(data)[:300]}...")
        bars_by_symbol = data.get('bars') or {}
        if 'BTC/USD' not in bars_by_symbol:
            print("No bar data found in response")
            return None

        df = pd.DataFrame(bars_by_symbol['BTC/USD'])
        # Map the REST API's short field names onto the column names used by the
        # SDK path, so callers see the same keys whichever path produced the data
        df = df.rename(columns={
            't': 'time',
            'o': 'open',
            'h': 'high',
            'l': 'low',
            'c': 'close',
            'v': 'volume',
            'n': 'trade_count',
            'vw': 'vwap'
        })
        if 'time' in df.columns:
            df['time'] = pd.to_datetime(df['time'])
        print(f"Processed API data shape: {df.shape}")
        return df