|
|
# Module setup: imports, a pandas_ta/NumPy compatibility shim, environment
# loading and OpenAI key configuration.  Statement order is significant:
# load_dotenv() must run before any os.getenv() lookup below.
import os
import requests
import json
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import pytz

import sys
import importlib.util
import inspect

# Importing pandas_ta can fail with "cannot import name 'NaN' from 'numpy'".
# In exactly that case, alias the missing name to np.nan and retry; any other
# ImportError (e.g. pandas_ta not installed) is propagated unchanged.
# NOTE(review): presumably this happens on NumPy releases that dropped the
# `NaN` alias -- confirm against the pinned NumPy / pandas_ta versions.
try:
    import pandas_ta as ta
except ImportError as e:
    if "cannot import name 'NaN' from 'numpy'" not in str(e):
        raise
    np.NaN = np.nan
    import pandas_ta as ta

import openai
from dotenv import load_dotenv

# Populate os.environ from a .env file before credentials are read.
load_dotenv()

from crewai.tools import BaseTool
from pydantic import Field
from alpaca.data.historical import CryptoHistoricalDataClient
from alpaca.data.requests import CryptoBarsRequest
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit

# Configure the OpenAI client from the environment (None when the variable is unset).
openai.api_key = os.getenv("OPENAI_API_KEY")
|
|
|
|
|
class IndicatorCalculator:
    """A modular class to calculate various technical indicators on price data.

    Each ``calculate_*`` method writes its result columns into the DataFrame
    it receives (the frame is modified in place) and returns that same frame.
    The indicator math itself is delegated to the module-level ``ta``
    (pandas_ta) import.
    """

    # Catalogue of supported indicators: short key -> human-readable description.
    AVAILABLE_INDICATORS = {
        "rsi": "Relative Strength Index - measures momentum by comparing recent gains to recent losses",
        "bollinger_bands": "Bollinger Bands - measure volatility with upper and lower bands",
        "macd": "Moving Average Convergence Divergence - momentum indicator showing relationship between two moving averages",
        "adx": "Average Directional Index - measures trend strength",
        "ema": "Exponential Moving Average - gives more weight to recent prices",
        "sma": "Simple Moving Average - arithmetic mean of prices over a period",
        "atr": "Average True Range - measures volatility",
        "stochastic": "Stochastic Oscillator - momentum indicator comparing close price to price range",
        "obv": "On-Balance Volume - relates volume to price change"
    }

    def __init__(self):
        """Initialize the indicator calculator (it keeps no state)."""

    def calculate_all_indicators(self, df: pd.DataFrame, params: Optional[Dict[str, Any]] = None) -> pd.DataFrame:
        """
        Calculate the configured indicators on the price data.

        Indicators are computed independently: if one of them fails (for
        example because there are too few rows for its look-back window) the
        error is reported and the remaining indicators are still calculated.

        Args:
            df: DataFrame with OHLCV data. Must contain the columns
                'open', 'high', 'low', 'close' and 'volume'.
            params: Optional overrides. Recognised keys (default in brackets):
                'rsi_length' (14), 'bb_length' (20), 'bb_std' (2.0),
                'adx_length' (14), 'ema_lengths' ([8, 21, 50, 200]),
                'sma_lengths' ([] - SMA is opt-in), 'macd_fast' (12),
                'macd_slow' (26), 'macd_signal' (9), 'stoch_k' (14),
                'stoch_d' (3), 'atr_length' (14).

        Returns:
            The same DataFrame with indicator columns added. If required
            columns are missing, or validation itself fails, the frame is
            returned as it was received.
        """
        import traceback

        if params is None:
            params = {}

        try:
            required_columns = ['open', 'high', 'low', 'close', 'volume']
            missing_columns = [col for col in required_columns if col not in df.columns]

            if missing_columns:
                print(f"Missing columns in dataframe: {missing_columns}")
                print(f"Available columns: {df.columns}")
                return df

            # Ordered plan of (label, method, keyword arguments). The order
            # determines the order in which columns are appended.
            steps = [
                ("rsi", self.calculate_rsi, {"length": params.get('rsi_length', 14)}),
                ("bollinger_bands", self.calculate_bollinger_bands, {
                    "length": params.get('bb_length', 20),
                    "std": params.get('bb_std', 2.0),
                }),
                ("adx", self.calculate_adx, {"length": params.get('adx_length', 14)}),
            ]
            steps.extend(
                ("ema", self.calculate_ema, {"length": length})
                for length in params.get('ema_lengths', [8, 21, 50, 200])
            )
            # SMA is only computed when the caller asks for it, so the default
            # set of output columns is unchanged.
            steps.extend(
                ("sma", self.calculate_sma, {"length": length})
                for length in params.get('sma_lengths', [])
            )
            steps.extend([
                ("macd", self.calculate_macd, {
                    "fast": params.get('macd_fast', 12),
                    "slow": params.get('macd_slow', 26),
                    "signal": params.get('macd_signal', 9),
                }),
                ("stochastic", self.calculate_stochastic, {
                    "k": params.get('stoch_k', 14),
                    "d": params.get('stoch_d', 3),
                }),
                ("atr", self.calculate_atr, {"length": params.get('atr_length', 14)}),
                ("obv", self.calculate_obv, {}),
            ])

            failed = []
            for label, compute, kwargs in steps:
                # Deliberately broad: this is a best-effort boundary, and one
                # broken indicator must not discard all the others.
                try:
                    df = compute(df, **kwargs)
                except Exception as e:
                    failed.append(label)
                    print(f"Error calculating {label}: {e}")
                    traceback.print_exc()

            if failed:
                print(f"Indicators that could not be calculated: {failed}. Final data shape: {df.shape}")
            else:
                print(f"Calculated all technical indicators. Final data shape: {df.shape}")

            return df

        except Exception as e:
            # Reached when validation or plan building fails (e.g. df is not a DataFrame).
            print(f"Error calculating indicators: {e}")
            traceback.print_exc()
            return df

    def calculate_rsi(self, df: pd.DataFrame, length: int = 14) -> pd.DataFrame:
        """Calculate RSI on 'close' and store it as ``rsi_<length>``."""
        df[f'rsi_{length}'] = ta.rsi(df['close'], length=length)
        return df

    def calculate_bollinger_bands(self, df: pd.DataFrame, length: int = 20, std: float = 2.0) -> pd.DataFrame:
        """Calculate Bollinger Bands on 'close'.

        Adds ``bb_upper_*``, ``bb_middle_*``, ``bb_lower_*`` and
        ``bb_position_*`` (suffix ``<length>_<std>``). ``bb_position`` is the
        close expressed as a fraction of the band width measured from the
        lower band.
        """
        # The result columns are looked up by a name built from `std`, so an
        # int (2) would produce 'BBU_20_2' instead of 'BBU_20_2.0'.
        # NOTE(review): assumes pandas_ta labels its columns with the float
        # form of std -- confirm against the installed pandas_ta version.
        std = float(std)
        suffix = f'{length}_{std}'

        bbands = ta.bbands(df['close'], length=length, std=std)
        df[f'bb_upper_{suffix}'] = bbands[f'BBU_{suffix}']
        df[f'bb_middle_{suffix}'] = bbands[f'BBM_{suffix}']
        df[f'bb_lower_{suffix}'] = bbands[f'BBL_{suffix}']

        band_width = df[f'bb_upper_{suffix}'] - df[f'bb_lower_{suffix}']
        df[f'bb_position_{suffix}'] = (df['close'] - df[f'bb_lower_{suffix}']) / band_width

        return df

    def calculate_adx(self, df: pd.DataFrame, length: int = 14) -> pd.DataFrame:
        """Calculate ADX; adds ``adx_<length>``, ``di_plus_<length>`` and ``di_minus_<length>``."""
        adx = ta.adx(df['high'], df['low'], df['close'], length=length)
        df[f'adx_{length}'] = adx[f'ADX_{length}']
        df[f'di_plus_{length}'] = adx[f'DMP_{length}']
        df[f'di_minus_{length}'] = adx[f'DMN_{length}']
        return df

    def calculate_ema(self, df: pd.DataFrame, length: int = 21) -> pd.DataFrame:
        """Calculate EMA on 'close' and store it as ``ema_<length>``."""
        df[f'ema_{length}'] = ta.ema(df['close'], length=length)
        return df

    def calculate_sma(self, df: pd.DataFrame, length: int = 21) -> pd.DataFrame:
        """Calculate SMA on 'close' and store it as ``sma_<length>``."""
        df[f'sma_{length}'] = ta.sma(df['close'], length=length)
        return df

    def calculate_macd(self, df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9) -> pd.DataFrame:
        """Calculate MACD on 'close'; adds the line, signal and histogram columns."""
        suffix = f'{fast}_{slow}_{signal}'
        macd = ta.macd(df['close'], fast=fast, slow=slow, signal=signal)
        df[f'macd_{suffix}'] = macd[f'MACD_{suffix}']
        df[f'macd_signal_{suffix}'] = macd[f'MACDs_{suffix}']
        df[f'macd_histogram_{suffix}'] = macd[f'MACDh_{suffix}']
        return df

    def calculate_stochastic(self, df: pd.DataFrame, k: int = 14, d: int = 3) -> pd.DataFrame:
        """Calculate Stochastic Oscillator; adds ``stoch_k_<k>`` and ``stoch_d_<k>_<d>``."""
        stoch = ta.stoch(df['high'], df['low'], df['close'], k=k, d=d)
        # The trailing '_3' in the source column names is hard-coded here.
        # NOTE(review): presumably pandas_ta's default smoothing value -- confirm.
        df[f'stoch_k_{k}'] = stoch[f'STOCHk_{k}_{d}_3']
        df[f'stoch_d_{k}_{d}'] = stoch[f'STOCHd_{k}_{d}_3']
        return df

    def calculate_atr(self, df: pd.DataFrame, length: int = 14) -> pd.DataFrame:
        """Calculate Average True Range and store it as ``atr_<length>``."""
        df[f'atr_{length}'] = ta.atr(df['high'], df['low'], df['close'], length=length)
        return df

    def calculate_obv(self, df: pd.DataFrame) -> pd.DataFrame:
        """Calculate On-Balance Volume from 'close' and 'volume'; stored as ``obv``."""
        df['obv'] = ta.obv(df['close'], df['volume'])
        return df

    @classmethod
    def get_available_indicators(cls) -> Dict[str, str]:
        """Return a copy of the available indicators with descriptions.

        A copy is returned so callers cannot alter the shared class-level
        catalogue by mutating the result.
        """
        return dict(cls.AVAILABLE_INDICATORS)
|
|
|
|
|
class TechnicalAnalysisStrategy(BaseTool):
    """CrewAI tool that reports the latest technical indicators for BTC/USD.

    Price bars are requested from Alpaca through the SDK client; if that
    fails, a direct REST call is attempted. The bars are then passed through
    ``IndicatorCalculator`` and the most recent row is returned as a dict.
    """

    name: str = "Bitcoin Technical Analysis Strategy Tool"
    description: str = "Fetches current Bitcoin technical indicators data from the market"

    # Credentials and client; values not supplied by the caller are filled in by __init__.
    api_key: Optional[str] = Field(default=None, description="Alpaca API key")
    api_secret: Optional[str] = Field(default=None, description="Alpaca API secret")
    client: Optional[Any] = Field(default=None, description="Alpaca client instance")
    indicator_calculator: IndicatorCalculator = Field(default_factory=IndicatorCalculator, description="Indicator calculator instance")

    # IndicatorCalculator is a plain class, so arbitrary types must be allowed.
    model_config = {"arbitrary_types_allowed": True}

    def __init__(self, **kwargs):
        """Build the tool, reading missing credentials from the environment.

        ``ALPACA_API_KEY`` / ``ALPACA_API_SECRET`` are used when ``api_key`` /
        ``api_secret`` are not passed. A client is created only when both
        credentials are present and no client was supplied; a failure to
        create it is reported but not raised.
        """
        super().__init__(**kwargs)

        if not self.api_key:
            self.api_key = os.getenv("ALPACA_API_KEY")
        if not self.api_secret:
            self.api_secret = os.getenv("ALPACA_API_SECRET")

        print(f"Initializing Technical Analysis Strategy with API key: {'Present' if self.api_key else 'Missing'}")

        if not self.client and self.api_key and self.api_secret:
            try:
                self.client = CryptoHistoricalDataClient(api_key=self.api_key, secret_key=self.api_secret)
                print("Successfully initialized Alpaca client")
            except Exception as e:
                print(f"Error initializing Alpaca client: {e}")

    def _run(self) -> Dict[str, Any]:
        """
        Fetch Bitcoin technical indicator data.

        Returns:
            Dictionary with ``"price"`` (latest close) plus one entry per
            non-OHLCV column of the latest row (``None`` where the value is
            missing), or ``{"error": <message>}`` when no usable data could
            be obtained or any step raised.
        """
        try:
            print("Fetching Bitcoin technical indicator data")

            if not self.client:
                print("Client not initialized, attempting to create one now")
                self.client = CryptoHistoricalDataClient(api_key=self.api_key, secret_key=self.api_secret)

            min_required = 20
            lookback_periods = 100
            # Request three times the nominal look-back window.
            actual_lookback = max(min_required, lookback_periods) * 3
            timeframe_minutes = 5

            df = self._fetch_price_data(actual_lookback, timeframe_minutes)

            if df is None:
                print("No price data returned from fetch_price_data")
                return {"error": "No price data available"}

            print(f"Retrieved {len(df)} data points")

            if len(df) < min_required:
                print(f"Insufficient data points: {len(df)}, need at least {min_required}")
                return {"error": f"Insufficient price data: only received {len(df)} data points"}

            indicator_params = {
                'bb_std': 2.0,
                'rsi_length': 14,
                'bb_length': 20
            }
            df = self.indicator_calculator.calculate_all_indicators(df, indicator_params)

            # Only the most recent bar is reported.
            latest_data = df.iloc[-1].to_dict()

            result = {"price": float(latest_data.get('close', 0))}

            raw_columns = ('open', 'high', 'low', 'close', 'volume', 'time')
            for key, value in latest_data.items():
                if key in raw_columns:
                    continue
                # NaN (indicator still warming up) is reported as None.
                result[key] = None if pd.isna(value) else float(value)

            return result

        except Exception as e:
            print(f"Error fetching indicator data: {str(e)}")
            return {"error": str(e)}

    def _fetch_price_data(self, lookback_periods: int, timeframe_minutes: int = 5) -> Optional[pd.DataFrame]:
        """
        Fetch Bitcoin price data from Alpaca API.

        The SDK client is tried first; on any exception the direct REST
        fallback is attempted.

        Args:
            lookback_periods: Number of periods to fetch
            timeframe_minutes: Timeframe in minutes

        Returns:
            DataFrame with OHLCV data (timestamp column renamed to 'time',
            'symbol' column removed), or None when both the SDK call and the
            fallback fail or return nothing.
        """
        try:
            end = datetime.now(pytz.UTC)
            start = end - timedelta(minutes=timeframe_minutes * lookback_periods)

            print(f"Fetching price data from {start} to {end}")

            request_params = CryptoBarsRequest(
                symbol_or_symbols=["BTC/USD"],
                timeframe=TimeFrame(timeframe_minutes, TimeFrameUnit.Minute),
                start=start,
                end=end
            )

            print(f"Request parameters: {request_params}")

            bars = self.client.get_crypto_bars(request_params)

            if bars is None:
                print("No bars data returned from API")
                return None

            # Move index levels into ordinary columns.
            df = bars.df.reset_index()

            print(f"Raw data columns: {df.columns}")
            print(f"Raw data shape: {df.shape}")
            print(f"First few rows: {df.head(2)}")

            if 'timestamp' in df.columns:
                df = df.rename(columns={'timestamp': 'time'})

            if 'symbol' in df.columns:
                df = df[df['symbol'] == 'BTC/USD'].reset_index(drop=True)
                df = df.drop(columns=['symbol'])

            print(f"Processed data shape: {df.shape}")
            return df

        except Exception as e:
            print(f"Error fetching price data with SDK: {e}")
            print(f"API Key present: {'Yes' if self.api_key else 'No'}")
            print(f"API Secret present: {'Yes' if self.api_secret else 'No'}")

            try:
                print("Attempting fallback to direct REST API call...")
                return self._fetch_price_data_direct_api(lookback_periods, timeframe_minutes)
            except Exception as e2:
                print(f"Error in fallback API call: {e2}")
                return None

    def _fetch_price_data_direct_api(self, lookback_periods: int, timeframe_minutes: int = 5,
                                     timeout: float = 30.0) -> Optional[pd.DataFrame]:
        """
        Fallback method to fetch Bitcoin price data using direct REST API calls
        following the curl example format.

        Args:
            lookback_periods: Number of periods to fetch (also sent as 'limit')
            timeframe_minutes: Timeframe in minutes
            timeout: Seconds to wait for the HTTP request before giving up,
                so a stalled connection cannot block the tool indefinitely.

        Returns:
            DataFrame with columns renamed to time/open/high/low/close/volume,
            or None on a non-200 response or when the response carries no
            BTC/USD bars.

        Raises:
            requests.RequestException: on network failure or timeout
                (the caller, ``_fetch_price_data``, catches it).
        """
        end = datetime.now(pytz.UTC)
        start = end - timedelta(minutes=timeframe_minutes * lookback_periods)

        start_str = start.strftime('%Y-%m-%dT%H:%M:%SZ')
        end_str = end.strftime('%Y-%m-%dT%H:%M:%SZ')

        url = "https://data.alpaca.markets/v1beta3/crypto/us/bars"

        params = {
            "symbols": "BTC/USD",
            "timeframe": f"{timeframe_minutes}Min",
            "start": start_str,
            "end": end_str,
            "limit": lookback_periods
        }

        headers = {
            "Apca-Api-Key-Id": self.api_key,
            "Apca-Api-Secret-Key": self.api_secret
        }

        print(f"Making direct API call to: {url}")
        print(f"With params: {params}")

        response = requests.get(url, params=params, headers=headers, timeout=timeout)

        if response.status_code != 200:
            print(f"API Error: {response.status_code} - {response.text}")
            return None

        data = response.json()
        # Log only the head of the payload to keep output readable.
        print(f"API Response: {json.dumps(data)[:300]}...")

        if 'bars' not in data or 'BTC/USD' not in data['bars']:
            print("No bar data found in response")
            return None

        bars_data = data['bars']['BTC/USD']

        df = pd.DataFrame(bars_data)

        # The REST payload uses single-letter keys; map them to the column
        # names the indicator code expects.
        if 't' in df.columns:
            df = df.rename(columns={
                't': 'time',
                'o': 'open',
                'h': 'high',
                'l': 'low',
                'c': 'close',
                'v': 'volume'
            })

        if 'time' in df.columns:
            df['time'] = pd.to_datetime(df['time'])

        print(f"Processed API data shape: {df.shape}")
        return df