Spaces:
Sleeping
Sleeping
| # trade_analysis/indicators.py | |
| """ | |
| Technical indicators module - Enhanced for momentum trading | |
| Works with your enhanced_api.py and other modules | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
| def enrich_with_indicators(df: pd.DataFrame, interval_str: str) -> pd.DataFrame: | |
| """ | |
| Enrich dataframe with technical indicators | |
| Compatible with your data.py output format | |
| """ | |
| if df.empty or 'Close' not in df.columns: | |
| return pd.DataFrame() | |
| print(f"Enriching data for interval: {interval_str}, {len(df)} rows.") | |
| df_enriched = df.copy() | |
| # Ensure we have all OHLCV columns | |
| required_cols = ['Open', 'High', 'Low', 'Close', 'Volume'] | |
| for col in required_cols: | |
| if col not in df_enriched.columns: | |
| # Handle case variations (Close vs close) | |
| lower_col = col.lower() | |
| if lower_col in df_enriched.columns: | |
| df_enriched[col] = df_enriched[lower_col] | |
| else: | |
| df_enriched[col] = np.nan | |
| try: | |
| # Try using pandas_ta if available | |
| import pandas_ta as ta | |
| # 1. ADX (Length 9) - Momentum strength | |
| try: | |
| adx_results = ta.adx(df_enriched['High'], df_enriched['Low'], | |
| df_enriched['Close'], length=9) | |
| if adx_results is not None and not adx_results.empty: | |
| df_enriched['ADX_9'] = adx_results.iloc[:, 0] | |
| else: | |
| df_enriched['ADX_9'] = 25.0 # Default neutral | |
| except: | |
| df_enriched['ADX_9'] = 25.0 | |
| # 2. RSI (Length 14) - Momentum oscillator | |
| try: | |
| rsi = ta.rsi(df_enriched['Close'], length=14) | |
| df_enriched['RSI_14'] = rsi if rsi is not None else 50.0 | |
| except: | |
| df_enriched['RSI_14'] = calculate_rsi_manual(df_enriched['Close'], 14) | |
| # 3. MACD - Trend following | |
| try: | |
| macd = ta.macd(df_enriched['Close'], fast=12, slow=26, signal=9) | |
| if macd is not None and not macd.empty: | |
| # Find the histogram column | |
| for col in macd.columns: | |
| if 'h' in col.lower() or 'hist' in col.lower(): | |
| df_enriched['MACDh_12_26_9'] = macd[col] | |
| break | |
| else: | |
| df_enriched['MACDh_12_26_9'] = 0 | |
| else: | |
| df_enriched['MACDh_12_26_9'] = 0 | |
| except: | |
| df_enriched['MACDh_12_26_9'] = 0 | |
| # 4. EMA (Length 9) - Fast moving average | |
| try: | |
| ema = ta.ema(df_enriched['Close'], length=9) | |
| df_enriched['EMA_9'] = ema if ema is not None else df_enriched['Close'].ewm(span=9).mean() | |
| except: | |
| df_enriched['EMA_9'] = df_enriched['Close'].ewm(span=9, adjust=False).mean() | |
| # 5. ATR (Length 14) - Volatility | |
| try: | |
| atr = ta.atr(df_enriched['High'], df_enriched['Low'], | |
| df_enriched['Close'], length=14) | |
| df_enriched['ATR_14'] = atr if atr is not None else calculate_atr_manual(df_enriched, 14) | |
| except: | |
| df_enriched['ATR_14'] = calculate_atr_manual(df_enriched, 14) | |
| # 6. VWAP - Only for intraday | |
| if interval_str in ['15m', '5m', '1m', 'hourly']: | |
| try: | |
| vwap = ta.vwap(df_enriched['High'], df_enriched['Low'], | |
| df_enriched['Close'], df_enriched['Volume']) | |
| df_enriched['VWAP'] = vwap if vwap is not None else df_enriched['Close'] | |
| except: | |
| df_enriched['VWAP'] = calculate_vwap_manual(df_enriched) | |
| except ImportError: | |
| print("pandas_ta not available, using manual calculations") | |
| # Fallback to manual calculations | |
| df_enriched['RSI_14'] = calculate_rsi_manual(df_enriched['Close'], 14) | |
| df_enriched['ADX_9'] = 25.0 # Default | |
| df_enriched['MACDh_12_26_9'] = calculate_macd_histogram_manual(df_enriched['Close']) | |
| df_enriched['EMA_9'] = df_enriched['Close'].ewm(span=9, adjust=False).mean() | |
| df_enriched['ATR_14'] = calculate_atr_manual(df_enriched, 14) | |
| if interval_str in ['15m', '5m', '1m', 'hourly']: | |
| df_enriched['VWAP'] = calculate_vwap_manual(df_enriched) | |
| # Custom momentum indicators for your strategy | |
| # 7. Volume analysis | |
| df_enriched['volume_ma_20'] = df_enriched['Volume'].rolling(20).mean() | |
| df_enriched['volume_spike'] = df_enriched['Volume'] > (df_enriched['volume_ma_20'] * 2.0) | |
| df_enriched['volume_exhaustion'] = ( | |
| df_enriched['Volume'].rolling(5).mean() < (df_enriched['volume_ma_20'] * 0.8) | |
| ) | |
| # 8. Price momentum | |
| df_enriched['returns'] = df_enriched['Close'].pct_change() | |
| df_enriched['momentum_5'] = df_enriched['Close'] / df_enriched['Close'].shift(5) - 1 | |
| df_enriched['momentum_10'] = df_enriched['Close'] / df_enriched['Close'].shift(10) - 1 | |
| # 9. Volatility | |
| df_enriched['volatility'] = df_enriched['returns'].rolling(20).std() * np.sqrt(252) | |
| # 10. High-Low ratio (for gap detection) | |
| df_enriched['high_low_ratio'] = ( | |
| (df_enriched['High'] - df_enriched['Low']) / df_enriched['Close'] | |
| ) | |
| # 11. Support/Resistance levels | |
| df_enriched['resistance'] = df_enriched['High'].rolling(20).max() | |
| df_enriched['support'] = df_enriched['Low'].rolling(20).min() | |
| # 12. Trend strength | |
| sma_20 = df_enriched['Close'].rolling(20).mean() | |
| sma_50 = df_enriched['Close'].rolling(50).mean() | |
| df_enriched['trend_strength'] = (sma_20 - sma_50) / sma_50 * 100 | |
| # Clean up | |
| df_enriched.fillna(method='bfill', inplace=True) | |
| df_enriched.fillna(method='ffill', inplace=True) | |
| df_enriched.fillna(0, inplace=True) | |
| return df_enriched | |
| def identify_current_setup(df: pd.DataFrame, timeframe_str: str) -> dict: | |
| """ | |
| Identify current market setup for trading decisions | |
| Returns dict compatible with enhanced_api.py expectations | |
| """ | |
| if df.empty or len(df) < 2: | |
| return { | |
| "timeframe": timeframe_str, | |
| "direction": "neutral", | |
| "adx": 0, | |
| "rsi": 50, | |
| "gap_risk": "unknown", | |
| "volume_spike": False, | |
| "volume_exhaustion": False, | |
| "error": "Insufficient data" | |
| } | |
| # Get latest values | |
| latest = df.iloc[-1] | |
| prev = df.iloc[-2] if len(df) > 1 else latest | |
| # Determine direction | |
| if latest.get('Close', 0) > prev.get('Close', 0): | |
| direction = "up" | |
| elif latest.get('Close', 0) < prev.get('Close', 0): | |
| direction = "down" | |
| else: | |
| direction = "neutral" | |
| # Gap risk assessment | |
| atr_val = latest.get('ATR_14', 0) | |
| close_val = latest.get('Close', 0) | |
| gap_risk = "low" | |
| if close_val > 0 and atr_val > 0: | |
| atr_percentage = atr_val / close_val | |
| if atr_percentage > 0.02: | |
| gap_risk = "high" | |
| elif atr_percentage > 0.01: | |
| gap_risk = "moderate" | |
| # Momentum assessment | |
| rsi = latest.get('RSI_14', 50) | |
| momentum_5 = latest.get('momentum_5', 0) | |
| momentum_10 = latest.get('momentum_10', 0) | |
| # Trend assessment | |
| trend = "neutral" | |
| if latest.get('EMA_9', 0) > latest.get('Close', 0): | |
| trend = "bearish" | |
| elif latest.get('EMA_9', 0) < latest.get('Close', 0): | |
| trend = "bullish" | |
| # Volume analysis | |
| volume_spike = bool(latest.get('volume_spike', False)) | |
| volume_exhaustion = bool(latest.get('volume_exhaustion', False)) | |
| # Support/Resistance proximity | |
| close = latest.get('Close', 0) | |
| resistance = latest.get('resistance', close * 1.02) | |
| support = latest.get('support', close * 0.98) | |
| near_resistance = (resistance - close) / close < 0.005 # Within 0.5% | |
| near_support = (close - support) / close < 0.005 | |
| # Build setup dictionary | |
| setup = { | |
| "timeframe": timeframe_str, | |
| "direction": direction, | |
| "adx": round(latest.get('ADX_9', 0), 2), | |
| "rsi": round(rsi, 2), | |
| "gap_risk": gap_risk, | |
| "volume_spike": volume_spike, | |
| "volume_exhaustion": volume_exhaustion, | |
| "trend": trend, | |
| "momentum_5": round(momentum_5 * 100, 2), # As percentage | |
| "momentum_10": round(momentum_10 * 100, 2), | |
| "volatility": round(latest.get('volatility', 0), 4), | |
| "near_resistance": near_resistance, | |
| "near_support": near_support, | |
| "macd_histogram": round(latest.get('MACDh_12_26_9', 0), 4) | |
| } | |
| # Add timeframe-specific signals | |
| if timeframe_str == "15m": | |
| setup["scalp_ready"] = ( | |
| volume_spike and | |
| abs(momentum_5) > 0.005 and | |
| 30 < rsi < 70 | |
| ) | |
| elif timeframe_str == "hourly": | |
| setup["swing_ready"] = ( | |
| trend != "neutral" and | |
| not volume_exhaustion and | |
| 20 < rsi < 80 | |
| ) | |
| elif timeframe_str == "daily": | |
| setup["position_ready"] = ( | |
| latest.get('ADX_9', 0) > 25 and | |
| trend != "neutral" | |
| ) | |
| return setup | |
| # Manual calculation functions (fallbacks) | |
| def calculate_rsi_manual(close_prices: pd.Series, period: int = 14) -> pd.Series: | |
| """Manual RSI calculation""" | |
| delta = close_prices.diff() | |
| gain = delta.where(delta > 0, 0) | |
| loss = -delta.where(delta < 0, 0) | |
| avg_gain = gain.rolling(window=period).mean() | |
| avg_loss = loss.rolling(window=period).mean() | |
| rs = avg_gain / avg_loss | |
| rsi = 100 - (100 / (1 + rs)) | |
| return rsi.fillna(50) | |
| def calculate_atr_manual(df: pd.DataFrame, period: int = 14) -> pd.Series: | |
| """Manual ATR calculation""" | |
| high = df['High'] | |
| low = df['Low'] | |
| close = df['Close'] | |
| tr1 = high - low | |
| tr2 = abs(high - close.shift()) | |
| tr3 = abs(low - close.shift()) | |
| tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) | |
| atr = tr.rolling(window=period).mean() | |
| return atr.fillna(0) | |
| def calculate_macd_histogram_manual(close_prices: pd.Series) -> pd.Series: | |
| """Manual MACD histogram calculation""" | |
| ema_12 = close_prices.ewm(span=12, adjust=False).mean() | |
| ema_26 = close_prices.ewm(span=26, adjust=False).mean() | |
| macd_line = ema_12 - ema_26 | |
| signal_line = macd_line.ewm(span=9, adjust=False).mean() | |
| histogram = macd_line - signal_line | |
| return histogram.fillna(0) | |
| def calculate_vwap_manual(df: pd.DataFrame) -> pd.Series: | |
| """Manual VWAP calculation""" | |
| typical_price = (df['High'] + df['Low'] + df['Close']) / 3 | |
| cumulative_tpv = (typical_price * df['Volume']).cumsum() | |
| cumulative_volume = df['Volume'].cumsum() | |
| vwap = cumulative_tpv / cumulative_volume | |
| return vwap.fillna(df['Close']) | |
| # Additional helper functions for agent.py and other modules | |
| def get_momentum_signals(df: pd.DataFrame) -> dict: | |
| """ | |
| Get momentum signals for the agent | |
| Used by agent.py for quick decisions | |
| """ | |
| if df.empty or len(df) < 20: | |
| return {"signal": "NEUTRAL", "strength": 0} | |
| latest = df.iloc[-1] | |
| # Check multiple momentum conditions | |
| rsi = latest.get('RSI_14', 50) | |
| momentum_5 = latest.get('momentum_5', 0) | |
| volume_spike = latest.get('volume_spike', False) | |
| macd_hist = latest.get('MACDh_12_26_9', 0) | |
| # Bullish signals | |
| if (rsi > 55 and momentum_5 > 0.01 and volume_spike and macd_hist > 0): | |
| return {"signal": "BULLISH", "strength": 0.8} | |
| elif (rsi > 50 and momentum_5 > 0.005): | |
| return {"signal": "BULLISH", "strength": 0.6} | |
| # Bearish signals | |
| elif (rsi < 45 and momentum_5 < -0.01 and volume_spike and macd_hist < 0): | |
| return {"signal": "BEARISH", "strength": 0.8} | |
| elif (rsi < 50 and momentum_5 < -0.005): | |
| return {"signal": "BEARISH", "strength": 0.6} | |
| # Neutral | |
| else: | |
| return {"signal": "NEUTRAL", "strength": 0.3} | |
| def calculate_entry_signals(df: pd.DataFrame, timeframe: str) -> dict: | |
| """ | |
| Calculate specific entry signals for different timeframes | |
| Used by enhanced_api.py for options strategies | |
| """ | |
| if df.empty: | |
| return {"entry": False, "confidence": 0} | |
| setup = identify_current_setup(df, timeframe) | |
| # Timeframe-specific entry logic | |
| if timeframe in ["1m", "5m"]: | |
| # Scalping entries | |
| entry = ( | |
| setup.get('volume_spike', False) and | |
| abs(setup.get('momentum_5', 0)) > 0.5 and | |
| 30 < setup.get('rsi', 50) < 70 | |
| ) | |
| confidence = 70 if entry else 30 | |
| elif timeframe == "15m": | |
| # Momentum entries | |
| entry = ( | |
| setup.get('direction') != 'neutral' and | |
| setup.get('adx', 0) > 25 and | |
| not setup.get('volume_exhaustion', False) | |
| ) | |
| confidence = 75 if entry else 40 | |
| else: # Daily/Hourly | |
| # Swing entries | |
| entry = ( | |
| setup.get('trend') != 'neutral' and | |
| 20 < setup.get('rsi', 50) < 80 and | |
| setup.get('adx', 0) > 20 | |
| ) | |
| confidence = 80 if entry else 35 | |
| return { | |
| "entry": entry, | |
| "confidence": confidence, | |
| "setup_details": setup | |
| } |