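"""Cryptocurrency chart analysis.

Fetches OHLCV candles through CCXT, computes technical indicators, builds a
textual/JSON summary with a simple trading signal, and saves a price chart.
"""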
import json
import logging
import os
import time
import traceback
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import ccxt
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import ta
# Root logging setup: INFO and above, prefixed with timestamp and severity.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Logger used by every class and function in this module.
logger = logging.getLogger(__name__)
@dataclass
class AnalysisResult:
    """Structured result of one analysis run.

    The ``@dataclass`` decorator is required: it generates the keyword
    constructor used by the analyzer and makes ``asdict`` work in
    ``to_dict``.
    """

    ticker: str  # trading pair the analysis was run for
    timeframe: str  # candle timeframe passed to the analysis
    summary: Dict[str, Any]  # per-indicator findings keyed by indicator name
    chart_path: str  # location of the saved chart image
    indicators_used: List[str]  # indicator names requested by the caller
    data_points: int  # number of candles analyzed
    period: str  # human-readable description of the analyzed period
    timestamp: str  # when the analysis was produced

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain (deep-copied) dictionary."""
        return asdict(self)

    @staticmethod
    def _json_default(value: Any) -> Any:
        """Serialize values ``json`` rejects, such as NumPy scalars and arrays.

        Raises:
            TypeError: If ``value`` offers no ``tolist`` conversion.
        """
        if hasattr(value, "tolist"):
            # NumPy scalars/arrays and pandas Series expose tolist(), which
            # yields built-in Python objects.
            return value.tolist()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    def to_json(self) -> str:
        """Return the result as an indented JSON string."""
        return json.dumps(self.to_dict(), indent=2, default=self._json_default)

    def get_trading_signal(self) -> Tuple[str, float]:
        """Derive a coarse trading signal from the summary.

        The trend sets the initial signal; an extreme RSI reading may override
        it, and a MACD crossover may set it unless it contradicts the signal
        chosen so far.

        Returns:
            Tuple[str, float]: ``("BUY" | "SELL" | "NEUTRAL", confidence)``.
        """
        signal = "NEUTRAL"
        confidence = 0.5

        # Trend provides the baseline.
        trend = self.summary.get('trend')
        if trend == "Bullish":
            signal, confidence = "BUY", 0.7
        elif trend == "Bearish":
            signal, confidence = "SELL", 0.7

        # Oversold/overbought RSI overrides a contrary baseline.
        if 'RSI' in self.summary:
            rsi_value = self.summary['RSI']['value']
            if rsi_value < 30 and signal != "BUY":
                signal = "BUY"
                confidence = max(confidence, 0.8)
            elif rsi_value > 70 and signal != "SELL":
                signal = "SELL"
                confidence = max(confidence, 0.8)

        # A MACD crossover only applies when it does not contradict the signal.
        if 'MACD' in self.summary:
            macd_reading = self.summary['MACD']['interpretation']
            if macd_reading == "Bullish crossover" and signal != "SELL":
                signal = "BUY"
                confidence = max(confidence, 0.75)
            elif macd_reading == "Bearish crossover" and signal != "BUY":
                signal = "SELL"
                confidence = max(confidence, 0.75)

        return signal, confidence

    def get_summary_text(self) -> str:
        """Render the analysis as human-readable, line-oriented text."""
        summary_lines = [f"**Analysis for {self.ticker} ({self.timeframe}):**\n"]

        if 'trend' in self.summary:
            summary_lines.append(f"Overall Trend: {self.summary['trend']}")

        if 'price' in self.summary:
            price_info = self.summary['price']
            # The 24h change is optional; tolerate it being absent or None.
            change_24h = price_info.get('change_24h')
            change_text = f" (24h change: {change_24h}%)" if change_24h is not None else ""
            summary_lines.append(f"Current Price: {price_info['current']}{change_text}")

        if 'RSI' in self.summary:
            rsi_info = self.summary['RSI']
            summary_lines.append(f"RSI: {rsi_info['value']} - {rsi_info['interpretation']}")

        if 'MACD' in self.summary:
            macd_info = self.summary['MACD']
            summary_lines.append(f"MACD: {macd_info['value']}, Signal: {macd_info['signal']}")
            summary_lines.append(f"MACD Interpretation: {macd_info['interpretation']}")

        if 'Bollinger_Bands' in self.summary:
            bb_info = self.summary['Bollinger_Bands']
            summary_lines.append(
                f"Bollinger Bands: Upper: {bb_info['upper']}, Middle: {bb_info['middle']}, Lower: {bb_info['lower']}"
            )
            summary_lines.append(
                f"Bandwidth: {bb_info['bandwidth']}%, Position: {bb_info['position']}, Squeeze: {bb_info['squeeze']}"
            )

        # Levels are only printed when both lists are present.
        if 'Support' in self.summary and 'Resistance' in self.summary:
            summary_lines.append(f"Support Levels: {self.summary['Support']}")
            summary_lines.append(f"Resistance Levels: {self.summary['Resistance']}")

        signal, confidence = self.get_trading_signal()
        summary_lines.append(f"\nTrading Signal: {signal} (Confidence: {confidence:.2f})")
        summary_lines.append(f"\nChart saved to: {self.chart_path}")
        summary_lines.append(f"Analysis period: {self.period}")
        return "\n".join(summary_lines)
| class CryptoAnalyzer: | |
| """A class to analyze cryptocurrency charts with technical indicators.""" | |
| def __init__(self, | |
| exchange_name: str = "binance", | |
| output_dir: str = "./charts", | |
| rate_limit_pause: float = 1.0, | |
| max_retries: int = 3): | |
| """ | |
| Initialize the crypto analyzer with an exchange. | |
| Args: | |
| exchange_name (str): Name of CCXT-supported exchange (default: "binance") | |
| output_dir (str): Directory to save chart images | |
| rate_limit_pause (float): Pause between API calls to avoid rate limits | |
| max_retries (int): Maximum number of API call retries | |
| """ | |
| try: | |
| self.exchange = getattr(ccxt, exchange_name)() | |
| self.output_dir = output_dir | |
| self.rate_limit_pause = rate_limit_pause | |
| self.max_retries = max_retries | |
| self.supports_advanced_patterns = True # Flag for pattern recognition capabilities | |
| # Create output directory if it doesn't exist | |
| if not os.path.exists(output_dir): | |
| os.makedirs(output_dir) | |
| # Cache for market data to reduce API calls | |
| self._market_cache = {} | |
| self._last_api_call = 0 | |
| except AttributeError: | |
| supported = ", ".join(ccxt.exchanges) | |
| raise ValueError(f"Exchange '{exchange_name}' not supported. Choose from: {supported}") | |
| except Exception as e: | |
| raise Exception(f"Failed to initialize analyzer: {str(e)}") | |
| def get_supported_exchanges(self) -> List[str]: | |
| """Return list of supported exchanges""" | |
| return ccxt.exchanges | |
| def get_supported_timeframes(self) -> List[str]: | |
| """Return list of supported timeframes for current exchange""" | |
| return list(self.exchange.timeframes.keys()) if hasattr(self.exchange, 'timeframes') else ["1m", "5m", "15m", "30m", "1h", "4h", "1d", "1w"] | |
| def get_supported_indicators(self) -> Dict[str, str]: | |
| """Return dictionary of supported indicators with descriptions""" | |
| return { | |
| "RSI": "Relative Strength Index - Momentum oscillator measuring speed and change of price movements", | |
| "MACD": "Moving Average Convergence Divergence - Trend-following momentum indicator", | |
| "SMA": "Simple Moving Average - Average price over specified period", | |
| "EMA": "Exponential Moving Average - Weighted moving average giving more importance to recent prices", | |
| "BB": "Bollinger Bands - Volatility indicator showing price channels around moving average", | |
| "ATR": "Average True Range - Volatility indicator measuring market volatility", | |
| "OBV": "On-Balance Volume - Volume indicator using volume flow to predict changes in price", | |
| "VWAP": "Volume Weighted Average Price - Average price weighted by volume", | |
| "Ichimoku": "Ichimoku Cloud - Trend indicator showing support/resistance levels and momentum", | |
| "Stochastic": "Stochastic Oscillator - Momentum indicator comparing close price to price range", | |
| "Patterns": "Candlestick pattern recognition for common bullish and bearish patterns" | |
| } | |
| def _respect_rate_limit(self): | |
| """Implement rate limiting to avoid API restrictions""" | |
| elapsed = time.time() - self._last_api_call | |
| if elapsed < self.rate_limit_pause: | |
| time.sleep(self.rate_limit_pause - elapsed) | |
| self._last_api_call = time.time() | |
| def get_available_pairs(self, quote_currency: Optional[str] = None) -> List[str]: | |
| """ | |
| Get available trading pairs from exchange | |
| Args: | |
| quote_currency (Optional[str]): Filter by quote currency (e.g., "USD", "BTC") | |
| Returns: | |
| List[str]: List of available trading pairs | |
| """ | |
| try: | |
| if 'markets' not in self._market_cache: | |
| self._respect_rate_limit() | |
| self._market_cache['markets'] = self.exchange.load_markets() | |
| pairs = list(self._market_cache['markets'].keys()) | |
| if quote_currency: | |
| pairs = [p for p in pairs if p.endswith(f"/{quote_currency}")] | |
| return pairs | |
| except Exception as e: | |
| logger.error(f"Error fetching available pairs: {str(e)}") | |
| return [] | |
| def fetch_data(self, | |
| ticker: str, | |
| timeframe: str, | |
| days: int = 30, | |
| retry_on_error: bool = True) -> pd.DataFrame: | |
| """ | |
| Fetch OHLCV data for a specified ticker and timeframe. | |
| Args: | |
| ticker (str): Trading pair (e.g., "BTC/USD") | |
| timeframe (str): Timeframe (e.g., "1h", "4h", "1d") | |
| days (int): Number of days of historical data to fetch | |
| retry_on_error (bool): Whether to retry on network errors | |
| Returns: | |
| pd.DataFrame: DataFrame with OHLCV data | |
| """ | |
| retries = 0 | |
| last_error = None | |
| while retries <= self.max_retries: | |
| try: | |
| # Format symbol according to exchange requirements | |
| symbol = ticker | |
| # Calculate timestamp for the specified number of days ago | |
| since = int((datetime.now() - timedelta(days=days)).timestamp() * 1000) | |
| # Fetch OHLCV data with rate limiting | |
| self._respect_rate_limit() | |
| logger.info(f"Fetching {days} days of {timeframe} data for {ticker}") | |
| ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe=timeframe, since=since, limit=1000) | |
| # Check if we got data | |
| if not ohlcv or len(ohlcv) < 2: | |
| raise ValueError(f"Insufficient data returned for {ticker} ({timeframe})") | |
| # Convert to DataFrame | |
| df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) | |
| df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') | |
| df.set_index('timestamp', inplace=True) | |
| return df | |
| except (ccxt.NetworkError, ccxt.ExchangeNotAvailable) as e: | |
| retries += 1 | |
| last_error = e | |
| wait_time = retries * 2 # Exponential backoff | |
| if retry_on_error and retries <= self.max_retries: | |
| logger.warning(f"Network error: {str(e)}. Retrying in {wait_time}s... (Attempt {retries}/{self.max_retries})") | |
| time.sleep(wait_time) | |
| else: | |
| raise ConnectionError(f"Failed to fetch data after {retries} attempts: {str(e)}") | |
| except ccxt.ExchangeError as e: | |
| logger.error(f"Exchange error: {str(e)}") | |
| raise ValueError(f"Failed to fetch data: Exchange error - {str(e)}") | |
| except Exception as e: | |
| logger.error(f"Unexpected error: {str(e)}") | |
| raise Exception(f"Failed to fetch data: {str(e)}") | |
| # If we got here, we've exhausted retries | |
| raise ConnectionError(f"Failed to fetch data after {self.max_retries} attempts: {str(last_error)}") | |
| def calculate_indicators(self, df: pd.DataFrame, indicators: List[str]) -> pd.DataFrame: | |
| """ | |
| Calculate technical indicators based on price data. | |
| Args: | |
| df (pd.DataFrame): OHLCV DataFrame | |
| indicators (List[str]): List of indicators to calculate | |
| Returns: | |
| pd.DataFrame: DataFrame with added indicator columns | |
| """ | |
| analysis = pd.DataFrame() | |
| analysis['close'] = df['close'] | |
| analysis['open'] = df['open'] | |
| analysis['high'] = df['high'] | |
| analysis['low'] = df['low'] | |
| analysis['volume'] = df['volume'] | |
| indicator_map = { | |
| "RSI": lambda: self._add_rsi(analysis, window=14), | |
| "MACD": lambda: self._add_macd(analysis), | |
| "SMA": lambda: self._add_sma(analysis, window=20), | |
| "EMA": lambda: self._add_ema(analysis, window=20), | |
| "BB": lambda: self._add_bollinger_bands(analysis, window=20, std=2), | |
| "ATR": lambda: self._add_atr(analysis, df, window=14), | |
| "OBV": lambda: self._add_obv(analysis, df), | |
| "VWAP": lambda: self._add_vwap(analysis, df), | |
| "Ichimoku": lambda: self._add_ichimoku(analysis), | |
| "Stochastic": lambda: self._add_stochastic(analysis), | |
| "Patterns": lambda: self._add_candlestick_patterns(analysis) | |
| } | |
| # Calculate requested indicators | |
| for indicator in indicators: | |
| if indicator in indicator_map: | |
| try: | |
| indicator_map[indicator]() | |
| except Exception as e: | |
| logger.warning(f"Failed to calculate {indicator}: {str(e)}") | |
| else: | |
| logger.warning(f"Indicator '{indicator}' not supported.") | |
| return analysis | |
| def _add_rsi(self, df: pd.DataFrame, window: int = 14) -> None: | |
| """Add Relative Strength Index to DataFrame.""" | |
| df['RSI'] = ta.momentum.RSIIndicator(df['close'], window=window).rsi() | |
| def _add_macd(self, df: pd.DataFrame, fast: int = 12, slow: int = 26, signal: int = 9) -> None: | |
| """Add MACD indicator to DataFrame.""" | |
| macd_indicator = ta.trend.MACD( | |
| df['close'], | |
| window_fast=fast, | |
| window_slow=slow, | |
| window_sign=signal | |
| ) | |
| df['MACD'] = macd_indicator.macd() | |
| df['MACD_signal'] = macd_indicator.macd_signal() | |
| df['MACD_histogram'] = macd_indicator.macd_diff() | |
| def _add_sma(self, df: pd.DataFrame, window: int = 20) -> None: | |
| """Add Simple Moving Average to DataFrame.""" | |
| df[f'SMA_{window}'] = ta.trend.SMAIndicator(df['close'], window=window).sma_indicator() | |
| def _add_ema(self, df: pd.DataFrame, window: int = 20) -> None: | |
| """Add Exponential Moving Average to DataFrame.""" | |
| df[f'EMA_{window}'] = ta.trend.EMAIndicator(df['close'], window=window).ema_indicator() | |
| def _add_bollinger_bands(self, df: pd.DataFrame, window: int = 20, std: float = 2) -> None: | |
| """Add Bollinger Bands to DataFrame.""" | |
| bollinger = ta.volatility.BollingerBands(df['close'], window=window, window_dev=std) | |
| df['BB_upper'] = bollinger.bollinger_hband() | |
| df['BB_middle'] = bollinger.bollinger_mavg() | |
| df['BB_lower'] = bollinger.bollinger_lband() | |
| def _add_atr(self, df: pd.DataFrame, ohlc: pd.DataFrame, window: int = 14) -> None: | |
| """Add Average True Range to DataFrame.""" | |
| df['ATR'] = ta.volatility.AverageTrueRange( | |
| high=ohlc['high'], | |
| low=ohlc['low'], | |
| close=ohlc['close'], | |
| window=window | |
| ).average_true_range() | |
| def _add_obv(self, df: pd.DataFrame, ohlc: pd.DataFrame) -> None: | |
| """Add On-Balance Volume to DataFrame.""" | |
| df['OBV'] = ta.volume.OnBalanceVolumeIndicator( | |
| close=ohlc['close'], | |
| volume=ohlc['volume'] | |
| ).on_balance_volume() | |
| def _add_vwap(self, df: pd.DataFrame, ohlc: pd.DataFrame) -> None: | |
| """Add Volume Weighted Average Price to DataFrame.""" | |
| try: | |
| # Reset index to access timestamp for VWAP calculation | |
| temp_df = ohlc.reset_index() | |
| # Group by date for daily VWAP | |
| temp_df['date'] = temp_df['timestamp'].dt.date | |
| typical_price = (temp_df['high'] + temp_df['low'] + temp_df['close']) / 3 | |
| temp_df['VWAP'] = (typical_price * temp_df['volume']).cumsum() / temp_df['volume'].cumsum() | |
| # Set back to original index | |
| df['VWAP'] = temp_df.set_index('timestamp')['VWAP'] | |
| except Exception as e: | |
| logger.error(f"VWAP calculation error: {str(e)}") | |
| def _add_ichimoku(self, df: pd.DataFrame) -> None: | |
| """Add Ichimoku Cloud indicator to DataFrame.""" | |
| try: | |
| # Tenkan-sen (Conversion Line): (9-period high + 9-period low)/2 | |
| period9_high = df['high'].rolling(window=9).max() | |
| period9_low = df['low'].rolling(window=9).min() | |
| df['tenkan_sen'] = (period9_high + period9_low) / 2 | |
| # Kijun-sen (Base Line): (26-period high + 26-period low)/2 | |
| period26_high = df['high'].rolling(window=26).max() | |
| period26_low = df['low'].rolling(window=26).min() | |
| df['kijun_sen'] = (period26_high + period26_low) / 2 | |
| # Senkou Span A (Leading Span A): (Conversion Line + Base Line)/2 | |
| df['senkou_span_a'] = ((df['tenkan_sen'] + df['kijun_sen']) / 2).shift(26) | |
| # Senkou Span B (Leading Span B): (52-period high + 52-period low)/2 | |
| period52_high = df['high'].rolling(window=52).max() | |
| period52_low = df['low'].rolling(window=52).min() | |
| df['senkou_span_b'] = ((period52_high + period52_low) / 2).shift(26) | |
| # Chikou Span (Lagging Span): Close price shifted back 26 periods | |
| df['chikou_span'] = df['close'].shift(-26) | |
| except Exception as e: | |
| logger.error(f"Ichimoku calculation error: {str(e)}") | |
| def _add_stochastic(self, df: pd.DataFrame, k_window: int = 14, d_window: int = 3) -> None: | |
| """Add Stochastic Oscillator to DataFrame.""" | |
| try: | |
| stoch = ta.momentum.StochasticOscillator( | |
| high=df['high'], | |
| low=df['low'], | |
| close=df['close'], | |
| window=k_window, | |
| smooth_window=d_window | |
| ) | |
| df['stoch_k'] = stoch.stoch() | |
| df['stoch_d'] = stoch.stoch_signal() | |
| except Exception as e: | |
| logger.error(f"Stochastic calculation error: {str(e)}") | |
| def _add_candlestick_patterns(self, df: pd.DataFrame) -> None: | |
| """Add candlestick pattern recognition to DataFrame.""" | |
| try: | |
| # Detect common candlestick patterns | |
| # Bullish patterns | |
| df['doji'] = ta.candlestick.doji(df['open'], df['high'], df['low'], df['close']) | |
| df['hammer'] = ta.candlestick.hammer(df['open'], df['high'], df['low'], df['close']) | |
| df['morning_star'] = ta.candlestick.morning_star(df['open'], df['high'], df['low'], df['close']) | |
| # Bearish patterns | |
| df['shooting_star'] = ta.candlestick.shooting_star(df['open'], df['high'], df['low'], df['close']) | |
| df['evening_star'] = ta.candlestick.evening_star(df['open'], df['high'], df['low'], df['close']) | |
| df['bearish_harami'] = ta.candlestick.bearish_harami(df['open'], df['high'], df['low'], df['close']) | |
| # Consolidate patterns into single column for easy identification | |
| df['bullish_pattern'] = (df['doji'] | df['hammer'] | df['morning_star']) | |
| df['bearish_pattern'] = (df['shooting_star'] | df['evening_star'] | df['bearish_harami']) | |
| except Exception as e: | |
| logger.error(f"Pattern recognition error: {str(e)}") | |
| def identify_support_resistance(self, df: pd.DataFrame, window: int = 20, threshold: float = 0.03) -> Tuple[List[float], List[float]]: | |
| """ | |
| Identify support and resistance levels using pivot points | |
| Args: | |
| df (pd.DataFrame): OHLCV data | |
| window (int): Lookback window for pivot identification | |
| threshold (float): Minimum price change to consider a pivot | |
| Returns: | |
| Tuple[List[float], List[float]]: Support and resistance levels | |
| """ | |
| try: | |
| # Identify pivot highs (resistance) | |
| pivot_highs = [] | |
| for i in range(window, len(df) - window): | |
| if all(df['high'].iloc[i] > df['high'].iloc[i-j] for j in range(1, window+1)) and \ | |
| all(df['high'].iloc[i] > df['high'].iloc[i+j] for j in range(1, window+1)): | |
| pivot_highs.append(df['high'].iloc[i]) | |
| # Identify pivot lows (support) | |
| pivot_lows = [] | |
| for i in range(window, len(df) - window): | |
| if all(df['low'].iloc[i] < df['low'].iloc[i-j] for j in range(1, window+1)) and \ | |
| all(df['low'].iloc[i] < df['low'].iloc[i+j] for j in range(1, window+1)): | |
| pivot_lows.append(df['low'].iloc[i]) | |
| # Group close levels | |
| def group_levels(levels, threshold): | |
| if not levels: | |
| return [] | |
| levels = sorted(levels) | |
| grouped = [] | |
| current_group = [levels[0]] | |
| for level in levels[1:]: | |
| if (level - current_group[0]) / current_group[0] <= threshold: | |
| current_group.append(level) | |
| else: | |
| grouped.append(sum(current_group) / len(current_group)) | |
| current_group = [level] | |
| if current_group: | |
| grouped.append(sum(current_group) / len(current_group)) | |
| return grouped | |
| return group_levels(pivot_lows, threshold), group_levels(pivot_highs, threshold) | |
| except Exception as e: | |
| logger.error(f"Support/resistance calculation error: {str(e)}") | |
| return [], [] | |
| def generate_analysis_summary(self, analysis: pd.DataFrame, indicators: List[str]) -> Dict[str, Any]: | |
| """ | |
| Generate a summary of the technical analysis. | |
| Args: | |
| analysis (pd.DataFrame): DataFrame with indicator data | |
| indicators (List[str]): List of indicators used | |
| Returns: | |
| Dict[str, Any]: Dictionary with analysis results | |
| """ | |
| summary = {} | |
| try: | |
| latest = analysis.iloc[-1] | |
| # Overall trend determination | |
| trend = "Neutral" | |
| trend_strength = 0 | |
| # Using moving averages for trend | |
| if "SMA" in indicators and "EMA" in indicators: | |
| sma_cols = [col for col in analysis.columns if 'SMA' in col] | |
| ema_cols = [col for col in analysis.columns if 'EMA' in col] | |
| if sma_cols and ema_cols: | |
| sma_col = sma_cols[0] | |
| ema_col = ema_cols[0] | |
| if latest['close'] > latest[sma_col] and latest['close'] > latest[ema_col]: | |
| trend = "Bullish" | |
| trend_strength += 1 | |
| elif latest['close'] < latest[sma_col] and latest['close'] < latest[ema_col]: | |
| trend = "Bearish" | |
| trend_strength += 1 | |
| # Using RSI for trend confirmation | |
| if "RSI" in indicators and not pd.isna(latest.get('RSI', np.nan)): | |
| rsi_value = latest['RSI'] | |
| if rsi_value > 60: | |
| if trend == "Bullish": | |
| trend_strength += 1 | |
| else: | |
| trend = "Bullish" | |
| elif rsi_value < 40: | |
| if trend == "Bearish": | |
| trend_strength += 1 | |
| else: | |
| trend = "Bearish" | |
| # Using MACD for trend confirmation | |
| if "MACD" in indicators and not pd.isna(latest.get('MACD', np.nan)): | |
| if latest['MACD'] > latest['MACD_signal']: | |
| if trend == "Bullish": | |
| trend_strength += 1 | |
| else: | |
| trend = "Bullish" | |
| elif latest['MACD'] < latest['MACD_signal']: | |
| if trend == "Bearish": | |
| trend_strength += 1 | |
| else: | |
| trend = "Bearish" | |
| # Store trend information | |
| summary['trend'] = trend | |
| summary['trend_strength'] = f"{trend_strength}/3" if trend_strength > 0 else "Weak" | |
| # RSI analysis | |
| if "RSI" in indicators and not pd.isna(latest.get('RSI', np.nan)): | |
| rsi_value = latest['RSI'] | |
| # Check RSI divergence | |
| has_divergence = False | |
| divergence_type = None | |
| if len(analysis) > 20: # Need enough data for divergence | |
| # Find recent price high/low | |
| price_section = analysis['close'].iloc[-20:] | |
| rsi_section = analysis['RSI'].iloc[-20:] | |
| price_high_idx = price_section.idxmax() | |
| price_low_idx = price_section.idxmin() | |
| rsi_high_idx = rsi_section.idxmax() | |
| rsi_low_idx = rsi_section.idxmin() | |
| # Bearish divergence: price makes higher high, RSI makes lower high | |
| if price_high_idx != rsi_high_idx and price_section.max() > price_section.iloc[0]: | |
| has_divergence = True | |
| divergence_type = "Bearish" | |
| # Bullish divergence: price makes lower low, RSI makes higher low | |
| if price_low_idx != rsi_low_idx and price_section.min() < price_section.iloc[0]: | |
| has_divergence = True | |
| divergence_type = "Bullish" | |
| summary['RSI'] = { | |
| 'value': round(rsi_value, 2), | |
| 'interpretation': "Overbought" if rsi_value > 70 else "Oversold" if rsi_value < 30 else "Neutral", | |
| 'has_divergence': has_divergence, | |
| 'divergence_type': divergence_type | |
| } | |
| # MACD analysis | |
| if "MACD" in indicators and not pd.isna(latest.get('MACD', np.nan)): | |
| macd_value = latest['MACD'] | |
| signal_value = latest['MACD_signal'] | |
| cross_direction = None | |
| # Check for recent crossover (past 5 periods) | |
| for i in range(min(5, len(analysis) - 1)): | |
| prev_idx = -2 - i | |
| if (analysis['MACD'].iloc[prev_idx] <= analysis['MACD_signal'].iloc[prev_idx] and | |
| macd_value > signal_value): | |
| cross_direction = "Bullish crossover" | |
| break | |
| elif (analysis['MACD'].iloc[prev_idx] >= analysis['MACD_signal'].iloc[prev_idx] and | |
| macd_value < signal_value): | |
| cross_direction = "Bearish crossover" | |
| break | |
| # Check for histogram momentum | |
| histogram_momentum = "Increasing" if latest['MACD_histogram'] > analysis['MACD_histogram'].iloc[-2] else "Decreasing" | |
| summary['MACD'] = { | |
| 'value': round(macd_value, 2), | |
| 'signal': round(signal_value, 2), | |
| 'histogram': round(latest['MACD_histogram'], 2), | |
| 'histogram_momentum': histogram_momentum, | |
| 'interpretation': cross_direction if cross_direction else "Neutral" | |
| } | |
| # Bollinger Bands analysis | |
| if "BB" in indicators and "BB_upper" in analysis.columns: | |
| # Calculate bandwidth | |
| bandwidth = (latest['BB_upper'] - latest['BB_lower']) / latest['BB_middle'] * 100 | |
| if latest['close'] > latest['BB_upper']: | |
| bb_position = "Above upper band (potentially overbought)" | |
| elif latest['close'] < latest['BB_lower']: | |
| bb_position = "Below lower band (potentially oversold)" | |
| else: | |
| # Calculate position within bands as percentage | |
| band_width = latest['BB_upper'] - latest['BB_lower'] | |
| if band_width > 0: | |
| position = (latest['close'] - latest['BB_lower']) / band_width * 100 | |
| bb_position = f"Within bands ({round(position, 1)}% from lower band)" | |
| else: | |
| bb_position = "Within bands" | |
| # Check for BB squeeze (narrowing bands) | |
| is_squeeze = False | |
| if len(analysis) > 20: | |
| prev_bandwidth = (analysis['BB_upper'].iloc[-20] - analysis['BB_lower'].iloc[-20]) / analysis['BB_middle'].iloc[-20] * 100 | |
| is_squeeze = bandwidth < prev_bandwidth * 0.8 # 20% narrower bands | |
| summary['Bollinger_Bands'] = { | |
| 'upper': round(latest['BB_upper'], 2), | |
| 'middle': round(latest['BB_middle'], 2), | |
| 'lower': round(latest['BB_lower'], 2), | |
| 'bandwidth': round(bandwidth, 2), | |
| 'position': bb_position, | |
| 'squeeze': is_squeeze | |
| } | |
| # Support and Resistance levels | |
| support, resistance = self.identify_support_resistance(analysis) | |
| summary['Support'] = support | |
| summary['Resistance'] = resistance | |
| return summary | |
| except Exception as e: | |
| logger.error(f"Error generating summary: {str(e)}") | |
| return summary | |
| def plot_chart(self, df: pd.DataFrame, ticker: str, timeframe: str, analysis: pd.DataFrame, indicators: List[str]) -> str: | |
| """ | |
| Plot candlestick chart with indicators and save chart image. | |
| Returns: | |
| str: File path of saved chart image. | |
| """ | |
| try: | |
| plt.figure(figsize=(12,8)) | |
| # Plot close price | |
| plt.plot(df.index, df['close'], label='Close Price', color='blue') | |
| # Plot SMA and EMA if available | |
| for col in df.columns: | |
| if 'SMA' in col or 'EMA' in col: | |
| plt.plot(df.index, df[col], label=col) | |
| # Format x-axis with date labels | |
| plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d')) | |
| plt.gca().xaxis.set_major_locator(mdates.AutoDateLocator()) | |
| plt.gcf().autofmt_xdate() | |
| plt.title(f"{ticker} Price Chart ({timeframe})") | |
| plt.xlabel("Date") | |
| plt.ylabel("Price") | |
| plt.legend() | |
| plt.grid(True) | |
| # Save chart | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| chart_filename = f"{ticker.replace('/', '_')}_{timeframe}_{timestamp}.png" | |
| chart_path = os.path.join(self.output_dir, chart_filename) | |
| plt.savefig(chart_path) | |
| plt.close() | |
| return chart_path | |
| except Exception as e: | |
| logger.error(f"Chart plotting error: {str(e)}") | |
| return "" | |
| def analyze(self, ticker: str, timeframe: str, indicators: List[str], days: int = 30) -> AnalysisResult: | |
| """ | |
| Perform full analysis: fetch data, calculate indicators, generate summary and chart. | |
| Returns: | |
| AnalysisResult: Structured analysis result. | |
| """ | |
| try: | |
| df = self.fetch_data(ticker, timeframe, days=days) | |
| analysis_df = self.calculate_indicators(df, indicators) | |
| summary = self.generate_analysis_summary(analysis_df, indicators) | |
| chart_path = self.plot_chart(df, ticker, timeframe, analysis_df, indicators) | |
| period = f"Last {days} days" | |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| result = AnalysisResult( | |
| ticker=ticker, | |
| timeframe=timeframe, | |
| summary=summary, | |
| chart_path=chart_path, | |
| indicators_used=indicators, | |
| data_points=len(df), | |
| period=period, | |
| timestamp=timestamp | |
| ) | |
| return result | |
| except Exception as e: | |
| logger.error(f"Analysis failed: {traceback.format_exc()}") | |
| raise e | |
if __name__ == "__main__":
    # Demo: analyze 90 days of daily BTC/USDT candles on Binance with every
    # supported indicator, then print the text summary and the JSON form.
    analyzer = CryptoAnalyzer("binance", "./charts", 1.0, 3)
    ticker, timeframe = "BTC/USDT", "1d"
    indicators = [
        "RSI", "MACD", "SMA", "EMA", "BB", "ATR",
        "OBV", "VWAP", "Ichimoku", "Stochastic", "Patterns",
    ]
    try:
        result = analyzer.analyze(ticker, timeframe, indicators, days=90)
        print(result.get_summary_text())
        print(result.to_json())
    except Exception as e:
        # Report the failure instead of dumping a traceback.
        print(f"Error during analysis: {e}")