# cryptic/src/crypto_analysis/tools/technical_tools.py
# Author: vlbandara — uploaded with huggingface_hub (commit eb27803, verified)
import os
import requests
import json
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import pytz # Import pytz for timezone handling
# Patch for pandas_ta NaN import issue
import sys
import importlib.util
import inspect
# Import pandas_ta, working around releases that do `from numpy import NaN`.
# NOTE(review): NumPy 2.0 removed the `np.NaN` alias, which makes that import fail;
# confirm which pandas_ta version is pinned. The normal import is tried first, and
# numpy is only patched when the failure is exactly that missing-`NaN` error.
try:
    import pandas_ta as ta
except ImportError as e:
    if "cannot import name 'NaN' from 'numpy'" in str(e):
        # Re-create the removed alias on the numpy module so pandas_ta's
        # `from numpy import NaN` succeeds on the retry below.
        np.NaN = np.nan
        # Now import pandas_ta
        import pandas_ta as ta
    else:
        # Any other import failure is unrelated to the NaN alias; propagate it.
        raise
# Import OpenAI for LLM strategy interpretation
# NOTE(review): the visible code in this file only sets `openai.api_key` below and
# never calls the OpenAI client — confirm whether this import is still needed.
import openai
from dotenv import load_dotenv
# Load variables from a .env file into os.environ before any os.getenv() call in
# this module (OPENAI_API_KEY below, ALPACA_API_KEY/SECRET in the tool's __init__).
load_dotenv()
from crewai.tools import BaseTool
from pydantic import Field
from alpaca.data.historical import CryptoHistoricalDataClient
from alpaca.data.requests import CryptoBarsRequest
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
# Set up OpenAI API key (import-time side effect; os.getenv returns None when
# OPENAI_API_KEY is unset)
openai.api_key = os.getenv("OPENAI_API_KEY")
class IndicatorCalculator:
    """A modular class to calculate various technical indicators on price data.

    Each ``calculate_*`` method writes its output column(s) into the DataFrame
    it is given (mutating it in place) and returns that same DataFrame. The
    indicator math is delegated to ``pandas_ta`` (imported as ``ta``). When
    ``pandas_ta`` returns ``None`` instead of a result (e.g. when the input is
    shorter than the indicator window), the output columns are filled with NaN
    rather than raising.
    """

    # Dictionary of available indicators with their descriptions
    AVAILABLE_INDICATORS = {
        "rsi": "Relative Strength Index - measures momentum by comparing recent gains to recent losses",
        "bollinger_bands": "Bollinger Bands - measure volatility with upper and lower bands",
        "macd": "Moving Average Convergence Divergence - momentum indicator showing relationship between two moving averages",
        "adx": "Average Directional Index - measures trend strength",
        "ema": "Exponential Moving Average - gives more weight to recent prices",
        "sma": "Simple Moving Average - arithmetic mean of prices over a period",
        "atr": "Average True Range - measures volatility",
        "stochastic": "Stochastic Oscillator - momentum indicator comparing close price to price range",
        "obv": "On-Balance Volume - relates volume to price change"
    }

    # OHLCV columns that must be present before any indicator is computed.
    REQUIRED_COLUMNS = ('open', 'high', 'low', 'close', 'volume')

    def __init__(self):
        """Initialize the indicator calculator (it holds no state)."""
        pass

    def calculate_all_indicators(self, df: pd.DataFrame, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
        """
        Calculate all available indicators on the price data.

        Each indicator is computed independently: if one fails, the error is
        logged and the remaining indicators are still calculated.

        Args:
            df: DataFrame with OHLCV data (modified in place).
            params: Optional overrides. Recognized keys (defaults in brackets):
                rsi_length [14], bb_length [20], bb_std [2.0], adx_length [14],
                ema_lengths [[8, 21, 50, 200]], sma_lengths [[] - none],
                macd_fast [12], macd_slow [26], macd_signal [9],
                stoch_k [14], stoch_d [3], atr_length [14].

        Returns:
            DataFrame with added technical indicators. If ``df`` is None or is
            missing any OHLCV column, it is returned unchanged.
        """
        if params is None:
            params = {}
        if df is None:
            print("No dataframe provided; skipping indicator calculation")
            return df

        missing_columns = [col for col in self.REQUIRED_COLUMNS if col not in df.columns]
        if missing_columns:
            print(f"Missing columns in dataframe: {missing_columns}")
            print(f"Available columns: {df.columns}")
            return df

        # (label, method, kwargs) in the same order the indicators were always added.
        steps = [
            ("RSI", self.calculate_rsi, {"length": params.get('rsi_length', 14)}),
            ("Bollinger Bands", self.calculate_bollinger_bands,
             {"length": params.get('bb_length', 20), "std": params.get('bb_std', 2.0)}),
            ("ADX", self.calculate_adx, {"length": params.get('adx_length', 14)}),
        ]
        steps += [("EMA", self.calculate_ema, {"length": n})
                  for n in params.get('ema_lengths', [8, 21, 50, 200])]
        steps += [("SMA", self.calculate_sma, {"length": n})
                  for n in params.get('sma_lengths', [])]
        steps += [
            ("MACD", self.calculate_macd, {"fast": params.get('macd_fast', 12),
                                           "slow": params.get('macd_slow', 26),
                                           "signal": params.get('macd_signal', 9)}),
            ("Stochastic", self.calculate_stochastic, {"k": params.get('stoch_k', 14),
                                                       "d": params.get('stoch_d', 3)}),
            ("ATR", self.calculate_atr, {"length": params.get('atr_length', 14)}),
            ("OBV", self.calculate_obv, {}),
        ]

        for label, method, kwargs in steps:
            df = self._safe_apply(df, label, method, **kwargs)

        print(f"Calculated all technical indicators. Final data shape: {df.shape}")
        return df

    @staticmethod
    def _safe_apply(df: pd.DataFrame, label: str, method, **kwargs) -> pd.DataFrame:
        """Run one indicator method; on failure log it and return ``df`` so later indicators still run."""
        try:
            return method(df, **kwargs)
        except Exception as e:
            import traceback
            print(f"Error calculating {label}: {e}")
            traceback.print_exc()
            return df

    @staticmethod
    def _assign(df: pd.DataFrame, column: str, values) -> None:
        """Store ``values`` as ``column``, filling with NaN when pandas_ta produced no result (None)."""
        df[column] = np.nan if values is None else values

    @staticmethod
    def _pick(frame: Optional[pd.DataFrame], prefix: str) -> Optional[pd.Series]:
        """Return the column of a pandas_ta result frame whose name starts with ``prefix``.

        Matching on the prefix instead of rebuilding the full generated name
        avoids KeyErrors when pandas_ta formats a parameter differently from the
        caller (e.g. ``std=2`` is rendered as ``2.0`` in ``BBU_20_2.0``).

        Returns None when ``frame`` is None; raises KeyError when no column matches.
        """
        if frame is None:
            return None
        for col in frame.columns:
            if str(col).startswith(prefix):
                return frame[col]
        raise KeyError(f"No column starting with {prefix!r} in {list(frame.columns)}")

    def calculate_rsi(self, df: pd.DataFrame, length: int = 14) -> pd.DataFrame:
        """Add ``rsi_{length}``: Relative Strength Index of ``close``."""
        self._assign(df, f'rsi_{length}', ta.rsi(df['close'], length=length))
        return df

    def calculate_bollinger_bands(self, df: pd.DataFrame, length: int = 20, std: float = 2.0) -> pd.DataFrame:
        """Add Bollinger upper/middle/lower bands and the normalized band position.

        ``bb_position_{length}_{std}`` is 0 at the lower band and 1 at the upper
        band; it is NaN where the band width is zero.
        """
        bbands = ta.bbands(df['close'], length=length, std=std)
        suffix = f"{length}_{std}"
        upper = f'bb_upper_{suffix}'
        middle = f'bb_middle_{suffix}'
        lower = f'bb_lower_{suffix}'
        self._assign(df, upper, self._pick(bbands, 'BBU_'))
        self._assign(df, middle, self._pick(bbands, 'BBM_'))
        self._assign(df, lower, self._pick(bbands, 'BBL_'))
        width = df[upper] - df[lower]
        # Flat prices give zero width; mask it so we emit NaN instead of inf / 0-division artifacts.
        df[f'bb_position_{suffix}'] = (df['close'] - df[lower]) / width.where(width != 0)
        return df

    def calculate_adx(self, df: pd.DataFrame, length: int = 14) -> pd.DataFrame:
        """Add ``adx_{length}``, ``di_plus_{length}`` and ``di_minus_{length}``."""
        adx = ta.adx(df['high'], df['low'], df['close'], length=length)
        self._assign(df, f'adx_{length}', self._pick(adx, 'ADX_'))
        self._assign(df, f'di_plus_{length}', self._pick(adx, 'DMP_'))
        self._assign(df, f'di_minus_{length}', self._pick(adx, 'DMN_'))
        return df

    def calculate_ema(self, df: pd.DataFrame, length: int = 21) -> pd.DataFrame:
        """Add ``ema_{length}``: exponential moving average of ``close``."""
        self._assign(df, f'ema_{length}', ta.ema(df['close'], length=length))
        return df

    def calculate_sma(self, df: pd.DataFrame, length: int = 21) -> pd.DataFrame:
        """Add ``sma_{length}``: simple moving average of ``close``."""
        self._assign(df, f'sma_{length}', ta.sma(df['close'], length=length))
        return df

    def calculate_macd(self, df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.DataFrame:
        """Add MACD line, signal line and histogram columns suffixed ``_{fast}_{slow}_{signal}``."""
        macd = ta.macd(df['close'], fast=fast, slow=slow, signal=signal)
        suffix = f"{fast}_{slow}_{signal}"
        self._assign(df, f'macd_{suffix}', self._pick(macd, 'MACD_'))
        self._assign(df, f'macd_signal_{suffix}', self._pick(macd, 'MACDs_'))
        self._assign(df, f'macd_histogram_{suffix}', self._pick(macd, 'MACDh_'))
        return df

    def calculate_stochastic(self, df: pd.DataFrame, k: int = 14, d: int = 3) -> pd.DataFrame:
        """Add Stochastic Oscillator ``stoch_k_{k}`` and ``stoch_d_{k}_{d}`` columns."""
        stoch = ta.stoch(df['high'], df['low'], df['close'], k=k, d=d)
        self._assign(df, f'stoch_k_{k}', self._pick(stoch, 'STOCHk_'))
        self._assign(df, f'stoch_d_{k}_{d}', self._pick(stoch, 'STOCHd_'))
        return df

    def calculate_atr(self, df: pd.DataFrame, length: int = 14) -> pd.DataFrame:
        """Add ``atr_{length}``: Average True Range."""
        self._assign(df, f'atr_{length}', ta.atr(df['high'], df['low'], df['close'], length=length))
        return df

    def calculate_obv(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add ``obv``: On-Balance Volume from ``close`` and ``volume``."""
        self._assign(df, 'obv', ta.obv(df['close'], df['volume']))
        return df

    @classmethod
    def get_available_indicators(cls) -> Dict[str, str]:
        """Return a copy of the indicator-name -> description mapping (safe for callers to mutate)."""
        return dict(cls.AVAILABLE_INDICATORS)
class TechnicalAnalysisStrategy(BaseTool):
    """CrewAI tool that reports the latest BTC/USD technical indicators.

    Price bars are fetched from Alpaca (alpaca-py SDK first, a direct REST call
    as fallback), run through ``IndicatorCalculator``, and the values of the
    most recent bar are returned.
    """

    name: str = "Bitcoin Technical Analysis Strategy Tool"
    description: str = "Fetches current Bitcoin technical indicators data from the market"
    # Define all fields as proper Pydantic fields
    api_key: Optional[str] = Field(default=None, description="Alpaca API key")
    api_secret: Optional[str] = Field(default=None, description="Alpaca API secret")
    client: Optional[Any] = Field(default=None, description="Alpaca client instance")
    indicator_calculator: IndicatorCalculator = Field(default_factory=IndicatorCalculator, description="Indicator calculator instance")
    # Allow non-Pydantic field types such as IndicatorCalculator
    model_config = {"arbitrary_types_allowed": True}

    def __init__(self, **kwargs):
        """Build the tool, defaulting credentials to ALPACA_API_KEY / ALPACA_API_SECRET.

        An Alpaca client is created here only when both credentials are known;
        otherwise ``_run`` creates one lazily.
        """
        # Initialize the base class first
        super().__init__(**kwargs)
        # Set the API keys from environment variables if not provided directly
        if not self.api_key:
            self.api_key = os.getenv("ALPACA_API_KEY")
        if not self.api_secret:
            self.api_secret = os.getenv("ALPACA_API_SECRET")
        print(f"Initializing Technical Analysis Strategy with API key: {'Present' if self.api_key else 'Missing'}")
        # Initialize the Alpaca client if not already set
        if not self.client and self.api_key and self.api_secret:
            try:
                self.client = CryptoHistoricalDataClient(api_key=self.api_key, secret_key=self.api_secret)
                print("Successfully initialized Alpaca client")
            except Exception as e:
                print(f"Error initializing Alpaca client: {e}")

    def _run(self) -> Dict[str, Any]:
        """
        Fetch Bitcoin technical indicator data.

        Returns:
            Dictionary with ``price`` (latest close) plus one entry per
            non-OHLCV column of the latest bar (``None`` for missing or
            non-finite values), or ``{"error": <message>}`` on failure.
        """
        try:
            print("Fetching Bitcoin technical indicator data")
            # Make sure client is initialized
            if not self.client:
                print("Client not initialized, attempting to create one now")
                self.client = CryptoHistoricalDataClient(api_key=self.api_key, secret_key=self.api_secret)

            min_required = 20  # Require fewer data points for basic analysis
            lookback_periods = 100
            # Over-fetch 3x the lookback to absorb gaps in the historical bar series.
            actual_lookback = max(min_required, lookback_periods) * 3
            timeframe_minutes = 5  # Default timeframe

            df = self._fetch_price_data(actual_lookback, timeframe_minutes)
            if df is None:
                print("No price data returned from fetch_price_data")
                return {"error": "No price data available"}

            print(f"Retrieved {len(df)} data points")
            if len(df) < min_required:
                print(f"Insufficient data points: {len(df)}, need at least {min_required}")
                return {"error": f"Insufficient price data: only received {len(df)} data points"}

            indicator_params = {
                'bb_std': 2.0,
                'rsi_length': 14,
                'bb_length': 20
            }
            df = self.indicator_calculator.calculate_all_indicators(df, indicator_params)
            return self._latest_snapshot(df)
        except Exception as e:
            print(f"Error fetching indicator data: {str(e)}")
            return {"error": str(e)}

    @staticmethod
    def _latest_snapshot(df: pd.DataFrame) -> Dict[str, Any]:
        """Convert the last row of ``df`` into a plain dict of Python floats / None."""
        latest_data = df.iloc[-1].to_dict()
        result: Dict[str, Any] = {"price": float(latest_data.get('close', 0))}
        for key, value in latest_data.items():
            if key in ('open', 'high', 'low', 'close', 'volume', 'time'):
                continue
            if pd.isna(value):
                result[key] = None
                continue
            try:
                number = float(value)
            except (TypeError, ValueError):
                # Skip non-numeric columns instead of discarding the whole result.
                continue
            # inf/-inf (e.g. from a degenerate ratio) are not valid JSON numbers; report as missing.
            result[key] = number if np.isfinite(number) else None
        return result

    def _fetch_price_data(self, lookback_periods: int, timeframe_minutes: int = 5) -> Optional[pd.DataFrame]:
        """
        Fetch Bitcoin price data from Alpaca API.

        Uses the alpaca-py SDK client; on any exception, falls back to
        ``_fetch_price_data_direct_api``.

        Args:
            lookback_periods: Number of bars to cover; the request window is
                ``lookback_periods * timeframe_minutes`` minutes ending now (UTC).
            timeframe_minutes: Bar size in minutes.

        Returns:
            DataFrame with OHLCV data (timestamp column renamed to ``time``),
            or ``None`` if no data could be retrieved.
        """
        try:
            # Timezone-aware UTC window ending now
            end = datetime.now(pytz.UTC)
            start = end - timedelta(minutes=timeframe_minutes * lookback_periods)
            print(f"Fetching price data from {start} to {end}")

            request_params = CryptoBarsRequest(
                symbol_or_symbols=["BTC/USD"],  # Use correct format with slash
                timeframe=TimeFrame(timeframe_minutes, TimeFrameUnit.Minute),
                start=start,
                end=end
            )
            print(f"Request parameters: {request_params}")

            bars = self.client.get_crypto_bars(request_params)
            if bars is None:
                print("No bars data returned from API")
                return None

            # Flatten the index so 'symbol'/'timestamp' (when present) become regular columns.
            df = bars.df.reset_index()
            print(f"Raw data columns: {df.columns}")
            print(f"Raw data shape: {df.shape}")
            print(f"First few rows: {df.head(2)}")

            if 'timestamp' in df.columns:
                df = df.rename(columns={'timestamp': 'time'})
            # Filter to only BTC/USD data
            if 'symbol' in df.columns:
                df = df[df['symbol'] == 'BTC/USD'].reset_index(drop=True)
                df = df.drop(columns=['symbol'])

            print(f"Processed data shape: {df.shape}")
            return df
        except Exception as e:
            print(f"Error fetching price data with SDK: {e}")
            print(f"API Key present: {'Yes' if self.api_key else 'No'}")
            print(f"API Secret present: {'Yes' if self.api_secret else 'No'}")
            # Try the direct REST API approach as fallback
            try:
                print("Attempting fallback to direct REST API call...")
                return self._fetch_price_data_direct_api(lookback_periods, timeframe_minutes)
            except Exception as e2:
                print(f"Error in fallback API call: {e2}")
                return None

    def _fetch_price_data_direct_api(self, lookback_periods: int, timeframe_minutes: int = 5,
                                     timeout: float = 30.0) -> Optional[pd.DataFrame]:
        """
        Fallback method to fetch Bitcoin price data using direct REST API calls
        following the curl example format.

        Args:
            lookback_periods: Number of bars to cover (also sent as ``limit``).
            timeframe_minutes: Bar size in minutes.
            timeout: Seconds to wait for the HTTP response before giving up.

        Returns:
            DataFrame with ``time``/``open``/``high``/``low``/``close``/``volume``
            columns (other fields from the response are kept as-is), or ``None``
            on a non-200 response or when no BTC/USD bars are present.

        Raises:
            requests.RequestException: on network errors or timeout.
            ValueError: if the response body is not valid JSON.
        """
        end = datetime.now(pytz.UTC)
        start = end - timedelta(minutes=timeframe_minutes * lookback_periods)
        start_str = start.strftime('%Y-%m-%dT%H:%M:%SZ')
        end_str = end.strftime('%Y-%m-%dT%H:%M:%SZ')

        url = "https://data.alpaca.markets/v1beta3/crypto/us/bars"
        params = {
            "symbols": "BTC/USD",
            "timeframe": f"{timeframe_minutes}Min",
            "start": start_str,
            "end": end_str,
            "limit": lookback_periods
        }
        headers = {
            "Apca-Api-Key-Id": self.api_key,
            "Apca-Api-Secret-Key": self.api_secret
        }
        print(f"Making direct API call to: {url}")
        print(f"With params: {params}")

        # Without a timeout, requests can block forever on a stalled connection.
        response = requests.get(url, params=params, headers=headers, timeout=timeout)
        if response.status_code != 200:
            print(f"API Error: {response.status_code} - {response.text}")
            return None

        data = response.json()
        print(f"API Response: {json.dumps(data)[:300]}...")

        # 'bars' may be absent or null; treat both as "no data".
        bars_by_symbol = data.get('bars') or {}
        if 'BTC/USD' not in bars_by_symbol:
            print("No bar data found in response")
            return None

        df = pd.DataFrame(bars_by_symbol['BTC/USD'])
        # Rename Alpaca's single-letter fields to the expected OHLCV names
        if 't' in df.columns:
            df = df.rename(columns={
                't': 'time',
                'o': 'open',
                'h': 'high',
                'l': 'low',
                'c': 'close',
                'v': 'volume'
            })
        if 'time' in df.columns:
            df['time'] = pd.to_datetime(df['time'])

        print(f"Processed API data shape: {df.shape}")
        return df