Spaces:
Build error
Build error
| """ | |
| Technical indicators module | |
| Uses pandas_ta for efficient calculation | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import pandas_ta as ta | |
| from typing import Dict, List | |
| class TechnicalIndicators: | |
| """Calculate various technical indicators""" | |
| def add_all_indicators(df: pd.DataFrame, include_advanced: bool = False) -> pd.DataFrame: | |
| """ | |
| Add all common technical indicators to dataframe | |
| Args: | |
| df: OHLCV dataframe | |
| include_advanced: Include advanced indicators | |
| Returns: | |
| DataFrame with indicators added | |
| """ | |
| df = df.copy() | |
| # Moving Averages | |
| df = TechnicalIndicators.add_moving_averages(df) | |
| # Momentum Indicators | |
| df = TechnicalIndicators.add_momentum_indicators(df) | |
| # Volatility Indicators | |
| df = TechnicalIndicators.add_volatility_indicators(df) | |
| # Volume Indicators | |
| df = TechnicalIndicators.add_volume_indicators(df) | |
| if include_advanced: | |
| df = TechnicalIndicators.add_advanced_indicators(df) | |
| return df | |
| def add_moving_averages(df: pd.DataFrame) -> pd.DataFrame: | |
| """Add various moving averages""" | |
| df['sma_20'] = ta.sma(df['close'], length=20) | |
| df['sma_50'] = ta.sma(df['close'], length=50) | |
| df['sma_200'] = ta.sma(df['close'], length=200) | |
| df['ema_9'] = ta.ema(df['close'], length=9) | |
| df['ema_21'] = ta.ema(df['close'], length=21) | |
| df['ema_55'] = ta.ema(df['close'], length=55) | |
| # VWAP | |
| df['vwap'] = ta.vwap(df['high'], df['low'], df['close'], df['volume']) | |
| return df | |
| def add_momentum_indicators(df: pd.DataFrame) -> pd.DataFrame: | |
| """Add momentum indicators""" | |
| # RSI | |
| df['rsi'] = ta.rsi(df['close'], length=14) | |
| # MACD | |
| macd = ta.macd(df['close'], fast=12, slow=26, signal=9) | |
| df['macd'] = macd['MACD_12_26_9'] | |
| df['macd_signal'] = macd['MACDs_12_26_9'] | |
| df['macd_hist'] = macd['MACDh_12_26_9'] | |
| # Stochastic | |
| stoch = ta.stoch(df['high'], df['low'], df['close']) | |
| df['stoch_k'] = stoch['STOCHk_14_3_3'] | |
| df['stoch_d'] = stoch['STOCHd_14_3_3'] | |
| # CCI | |
| df['cci'] = ta.cci(df['high'], df['low'], df['close'], length=20) | |
| # Williams %R | |
| df['willr'] = ta.willr(df['high'], df['low'], df['close'], length=14) | |
| return df | |
| def add_volatility_indicators(df: pd.DataFrame) -> pd.DataFrame: | |
| """Add volatility indicators""" | |
| # Bollinger Bands | |
| bbands = ta.bbands(df['close'], length=20, std=2) | |
| df['bb_upper'] = bbands['BBU_20_2.0'] | |
| df['bb_middle'] = bbands['BBM_20_2.0'] | |
| df['bb_lower'] = bbands['BBL_20_2.0'] | |
| df['bb_width'] = (df['bb_upper'] - df['bb_lower']) / df['bb_middle'] | |
| # ATR | |
| df['atr'] = ta.atr(df['high'], df['low'], df['close'], length=14) | |
| # Keltner Channels | |
| kc = ta.kc(df['high'], df['low'], df['close'], length=20) | |
| df['kc_upper'] = kc['KCUe_20_2'] | |
| df['kc_middle'] = kc['KCBe_20_2'] | |
| df['kc_lower'] = kc['KCLe_20_2'] | |
| return df | |
| def add_volume_indicators(df: pd.DataFrame) -> pd.DataFrame: | |
| """Add volume indicators""" | |
| # OBV (On Balance Volume) | |
| df['obv'] = ta.obv(df['close'], df['volume']) | |
| # Volume SMA | |
| df['volume_sma'] = ta.sma(df['volume'], length=20) | |
| # MFI (Money Flow Index) | |
| df['mfi'] = ta.mfi(df['high'], df['low'], df['close'], df['volume'], length=14) | |
| # AD (Accumulation/Distribution) | |
| df['ad'] = ta.ad(df['high'], df['low'], df['close'], df['volume']) | |
| return df | |
| def add_advanced_indicators(df: pd.DataFrame) -> pd.DataFrame: | |
| """Add advanced indicators""" | |
| # Ichimoku Cloud | |
| ichimoku = ta.ichimoku(df['high'], df['low'], df['close']) | |
| if ichimoku is not None and len(ichimoku[0].columns) > 0: | |
| df['ichimoku_a'] = ichimoku[0]['ISA_9'] | |
| df['ichimoku_b'] = ichimoku[0]['ISB_26'] | |
| # Supertrend | |
| supertrend = ta.supertrend(df['high'], df['low'], df['close']) | |
| if supertrend is not None: | |
| df['supertrend'] = supertrend['SUPERT_7_3.0'] | |
| df['supertrend_direction'] = supertrend['SUPERTd_7_3.0'] | |
| return df | |
| def get_signal_summary(df: pd.DataFrame) -> Dict: | |
| """ | |
| Get trading signal summary from indicators | |
| Returns: | |
| Dict with buy/sell/neutral signal counts | |
| """ | |
| if len(df) < 2: | |
| return {'buy': 0, 'sell': 0, 'neutral': 0, 'score': 0} | |
| latest = df.iloc[-1] | |
| prev = df.iloc[-2] | |
| signals = { | |
| 'buy': 0, | |
| 'sell': 0, | |
| 'neutral': 0 | |
| } | |
| # RSI signals | |
| if not pd.isna(latest['rsi']): | |
| if latest['rsi'] < 30: | |
| signals['buy'] += 1 | |
| elif latest['rsi'] > 70: | |
| signals['sell'] += 1 | |
| else: | |
| signals['neutral'] += 1 | |
| # MACD signals | |
| if not pd.isna(latest['macd']) and not pd.isna(latest['macd_signal']): | |
| if latest['macd'] > latest['macd_signal'] and prev['macd'] <= prev['macd_signal']: | |
| signals['buy'] += 1 | |
| elif latest['macd'] < latest['macd_signal'] and prev['macd'] >= prev['macd_signal']: | |
| signals['sell'] += 1 | |
| else: | |
| signals['neutral'] += 1 | |
| # MA crossover | |
| if not pd.isna(latest['sma_20']) and not pd.isna(latest['sma_50']): | |
| if latest['sma_20'] > latest['sma_50'] and prev['sma_20'] <= prev['sma_50']: | |
| signals['buy'] += 1 | |
| elif latest['sma_20'] < latest['sma_50'] and prev['sma_20'] >= prev['sma_50']: | |
| signals['sell'] += 1 | |
| else: | |
| signals['neutral'] += 1 | |
| # Bollinger Bands | |
| if not pd.isna(latest['bb_lower']) and not pd.isna(latest['bb_upper']): | |
| if latest['close'] < latest['bb_lower']: | |
| signals['buy'] += 1 | |
| elif latest['close'] > latest['bb_upper']: | |
| signals['sell'] += 1 | |
| else: | |
| signals['neutral'] += 1 | |
| # Stochastic | |
| if not pd.isna(latest['stoch_k']): | |
| if latest['stoch_k'] < 20: | |
| signals['buy'] += 1 | |
| elif latest['stoch_k'] > 80: | |
| signals['sell'] += 1 | |
| else: | |
| signals['neutral'] += 1 | |
| # Calculate score (-100 to +100) | |
| total = signals['buy'] + signals['sell'] + signals['neutral'] | |
| if total > 0: | |
| signals['score'] = int(((signals['buy'] - signals['sell']) / total) * 100) | |
| else: | |
| signals['score'] = 0 | |
| return signals | |
| def detect_patterns(df: pd.DataFrame) -> List[str]: | |
| """Detect chart patterns""" | |
| patterns = [] | |
| if len(df) < 20: | |
| return patterns | |
| latest = df.iloc[-1] | |
| prev = df.iloc[-2] | |
| # Golden Cross | |
| if not pd.isna(latest['sma_50']) and not pd.isna(latest['sma_200']): | |
| if latest['sma_50'] > latest['sma_200'] and prev['sma_50'] <= prev['sma_200']: | |
| patterns.append('Golden Cross (Bullish)') | |
| # Death Cross | |
| if not pd.isna(latest['sma_50']) and not pd.isna(latest['sma_200']): | |
| if latest['sma_50'] < latest['sma_200'] and prev['sma_50'] >= prev['sma_200']: | |
| patterns.append('Death Cross (Bearish)') | |
| # Bollinger Squeeze | |
| if not pd.isna(latest['bb_width']): | |
| if latest['bb_width'] < df['bb_width'].quantile(0.2): | |
| patterns.append('Bollinger Squeeze (Breakout Pending)') | |
| # RSI Divergence (simplified) | |
| if len(df) >= 10 and not pd.isna(latest['rsi']): | |
| # Bullish divergence: price lower low, RSI higher low | |
| recent_df = df.tail(10) | |
| price_low = recent_df['close'].min() | |
| rsi_low = recent_df['rsi'].min() | |
| if latest['close'] > price_low and latest['rsi'] > rsi_low: | |
| if price_low == recent_df.iloc[-5:]['close'].min(): | |
| patterns.append('Bullish RSI Divergence') | |
| # MACD Histogram increasing | |
| if not pd.isna(latest['macd_hist']) and not pd.isna(prev['macd_hist']): | |
| if latest['macd_hist'] > prev['macd_hist'] > 0: | |
| patterns.append('MACD Momentum Increasing') | |
| elif latest['macd_hist'] < prev['macd_hist'] < 0: | |
| patterns.append('MACD Momentum Decreasing') | |
| return patterns | |