import logging
from typing import Dict, List, Optional, Tuple, Any

import numpy as np
import pandas as pd
import yaml

from core.indicators import TechnicalIndicators
from core.data_engine import DataEngine

logger = logging.getLogger(__name__)


class ScalpingStrategy:
    """Combine three short-term sub-strategies into one weighted signal.

    The sub-strategies (EMA momentum, breakout, pullback) each return a
    ``(signal, confidence)`` pair. ``generate_signal`` weights the
    confidences with ``strategy_weights`` and emits ``"BUY"`` or ``"SELL"``
    only when the winning side exceeds ``min_confidence``.
    """

    def __init__(self, data_engine: DataEngine):
        """Store the data engine and load periods from config/settings.yaml.

        Args:
            data_engine: Source of prices, candles, indicators and
                order-book metrics used by every sub-strategy.

        Raises:
            OSError: If the settings file cannot be opened.
            KeyError: If a required key is missing under ``strategy``.
        """
        self.data_engine = data_engine

        # Context manager so the file handle is closed deterministically.
        with open("config/settings.yaml", encoding="utf-8") as settings_file:
            self.settings = yaml.safe_load(settings_file)

        strategy_cfg = self.settings["strategy"]
        self.ema_fast_period = strategy_cfg["ema_fast"]
        self.ema_slow_period = strategy_cfg["ema_slow"]
        self.rsi_period = strategy_cfg["rsi_period"]

        # Relative contribution of each sub-strategy to the final strength.
        self.strategy_weights = {
            'ema_momentum': 0.4,
            'breakout': 0.35,
            'pullback': 0.25
        }
        # Weighted strength a side must exceed before a signal is emitted.
        self.min_confidence = 0.6

    def generate_signal(self, symbol: str, interval: str = "1") -> Tuple[str, float, float]:
        """Return the combined ``(signal, confidence, current_price)``.

        Args:
            symbol: Instrument identifier passed through to the data engine.
            interval: Candle interval passed through to the sub-strategies.

        Returns:
            ``("BUY" | "SELL", confidence capped at 1.0, price)`` when one
            side's weighted strength exceeds ``min_confidence`` and beats the
            other side; otherwise ``("NEUTRAL", 0.0, price)``. When no price
            is available or any error occurs, ``("NEUTRAL", 0.0, 0.0)``.
        """
        # NOTE(review): with the fixed confidences (0.8 / 0.75 / 0.7) and the
        # current weights, the largest reachable strength is 0.7575 and needs
        # all three sub-strategies to agree; EMA + breakout alone give 0.5825,
        # below min_confidence. Since the pullback strategy cannot currently
        # fire (see its note), this method can only return "NEUTRAL" as
        # written. Confirm the intended thresholds.
        try:
            prices = self.data_engine.get_prices(symbol, limit=1)
            if not prices:
                return "NEUTRAL", 0.0, 0.0
            current_price = prices[-1]

            # Evaluated in a fixed order; each entry is (weight key, result).
            votes = (
                ('ema_momentum', self._ema_momentum_strategy(symbol, interval)),
                ('breakout', self._breakout_strategy(symbol, interval)),
                ('pullback', self._pullback_strategy(symbol, interval)),
            )

            buy_strength = 0.0
            sell_strength = 0.0
            for weight_key, (signal, confidence) in votes:
                if signal == "BUY":
                    buy_strength += confidence * self.strategy_weights[weight_key]
                elif signal == "SELL":
                    sell_strength += confidence * self.strategy_weights[weight_key]

            if buy_strength > self.min_confidence and buy_strength > sell_strength:
                return "BUY", min(buy_strength, 1.0), current_price
            if sell_strength > self.min_confidence and sell_strength > buy_strength:
                return "SELL", min(sell_strength, 1.0), current_price
            return "NEUTRAL", 0.0, current_price

        except Exception as e:
            # Best-effort boundary: a failing data source must not crash the
            # caller, so the error is logged and a neutral signal returned.
            logger.error(f"Error generating signal for {symbol}: {e}")
            return "NEUTRAL", 0.0, 0.0

    def _ema_momentum_strategy(self, symbol: str, interval: str) -> Tuple[str, float]:
        """Signal on an EMA crossover confirmed by RSI and order-book imbalance.

        Returns:
            ``("BUY", 0.8)`` when the fast EMA crosses above the slow EMA with
            RSI below 35 and imbalance above 0.1; ``("SELL", 0.8)`` for the
            mirrored case (RSI above 65, imbalance below -0.1); otherwise
            ``("NEUTRAL", 0.0)``, including on insufficient data or error.
        """
        try:
            df = self.data_engine.get_candles(symbol, interval, limit=50)
            if df.empty or len(df) < self.ema_slow_period + 5:
                return "NEUTRAL", 0.0

            ema_fast = self.data_engine.calculate_ema(symbol, interval, self.ema_fast_period)
            ema_slow = self.data_engine.calculate_ema(symbol, interval, self.ema_slow_period)
            rsi = self.data_engine.calculate_rsi(symbol, interval, self.rsi_period)
            if ema_fast is None or ema_slow is None or rsi is None:
                return "NEUTRAL", 0.0

            # Previous-candle EMAs are recomputed locally from the 50-candle
            # window. NOTE(review): the current EMAs come from the data
            # engine; confirm both use the same window and smoothing, or the
            # crossover test compares values from different series.
            closes = df['close']
            ema_fast_prev = closes.ewm(span=self.ema_fast_period, adjust=False).mean().iloc[-2]
            ema_slow_prev = closes.ewm(span=self.ema_slow_period, adjust=False).mean().iloc[-2]

            crossover_up = ema_fast_prev <= ema_slow_prev and ema_fast > ema_slow
            crossover_down = ema_fast_prev >= ema_slow_prev and ema_fast < ema_slow
            rsi_oversold = rsi < 35
            rsi_overbought = rsi > 65
            orderbook_imbalance = self.data_engine.get_orderbook_imbalance(symbol)

            if crossover_up and rsi_oversold and orderbook_imbalance > 0.1:
                return "BUY", 0.8
            if crossover_down and rsi_overbought and orderbook_imbalance < -0.1:
                return "SELL", 0.8
            return "NEUTRAL", 0.0

        except Exception as e:
            logger.error(f"EMA momentum strategy error for {symbol}: {e}")
            return "NEUTRAL", 0.0

    def _breakout_strategy(self, symbol: str, interval: str) -> Tuple[str, float]:
        """Signal on a price break of the recent range with confirmations.

        Returns:
            ``("BUY", 0.75)`` when the last close is more than 0.1% above the
            5-candle high, a volume spike is detected, the price change rate
            exceeds 0.002 and imbalance exceeds 0.15; ``("SELL", 0.75)`` for
            the mirrored case; otherwise ``("NEUTRAL", 0.0)``, including on
            insufficient data or error.
        """
        try:
            df = self.data_engine.get_candles(symbol, interval, limit=20)
            if df.empty or len(df) < 10:
                return "NEUTRAL", 0.0

            volume_spike = self.data_engine.detect_volume_spike(symbol, interval, threshold=1.8)
            price_change_rate = self.data_engine.get_price_change_rate(symbol, periods=3)
            # NOTE(review): the spread is fetched but never used in the
            # decision. The call is kept because, if it raises, the strategy
            # currently falls back to NEUTRAL; confirm whether a spread filter
            # was intended or the lookup can be dropped.
            spread = self.data_engine.get_spread(symbol)
            orderbook_imbalance = self.data_engine.get_orderbook_imbalance(symbol)

            # NOTE(review): the 5-candle window includes the current candle,
            # whose high is presumably >= its close, which would make
            # breakout_up unreachable. Confirm whether the window should
            # exclude the last candle.
            recent_high = df['high'].tail(5).max()
            recent_low = df['low'].tail(5).min()
            current_price = df['close'].iloc[-1]

            breakout_up = current_price > recent_high * 1.001
            breakout_down = current_price < recent_low * 0.999

            if breakout_up and volume_spike and price_change_rate > 0.002 and orderbook_imbalance > 0.15:
                return "BUY", 0.75
            if breakout_down and volume_spike and price_change_rate < -0.002 and orderbook_imbalance < -0.15:
                return "SELL", 0.75
            return "NEUTRAL", 0.0

        except Exception as e:
            logger.error(f"Breakout strategy error for {symbol}: {e}")
            return "NEUTRAL", 0.0

    def _pullback_strategy(self, symbol: str, interval: str) -> Tuple[str, float]:
        """Signal on a pullback against the 21-period EMA trend.

        Returns:
            ``("BUY", 0.7)`` or ``("SELL", 0.7)`` when the pullback and volume
            conditions below are all met; otherwise ``("NEUTRAL", 0.0)``,
            including on insufficient data or error.
        """
        # NOTE(review): as written this strategy can never signal. It needs
        # volume_trend < 0.8 and volume_trend > 1.2 at the same time, and for
        # BUY it needs price below 0.995 * EMA9 and above EMA9 at the same
        # time (mirrored for SELL). The conditions are left untouched because
        # the intended rule is ambiguous; they probably need to be evaluated
        # on different time windows. Confirm with the strategy owner.
        try:
            df = self.data_engine.get_candles(symbol, interval, limit=30)
            if df.empty or len(df) < 20:
                return "NEUTRAL", 0.0

            closes = df['close']
            volumes = df['volume']

            # Trend direction is the sign of the last step of the 21-EMA.
            ema_21 = closes.ewm(span=21, adjust=False).mean()
            ema_slope = ema_21.diff().iloc[-1]
            uptrend = ema_slope > 0
            downtrend = ema_slope < 0

            ema_9 = closes.ewm(span=9, adjust=False).mean().iloc[-1]
            current_price = closes.iloc[-1]
            price_to_ema_ratio = current_price / ema_9

            # Mean volume of the last 5 candles versus the 5 before them.
            recent_volume = volumes.tail(5).mean()
            previous_volume = volumes.tail(10).head(5).mean()
            volume_trend = recent_volume / previous_volume if previous_volume > 0 else 1.0

            pullback_up = uptrend and price_to_ema_ratio < 0.995
            pullback_down = downtrend and price_to_ema_ratio > 1.005
            volume_decrease_on_pullback = volume_trend < 0.8
            volume_increase_on_continuation = volume_trend > 1.2

            confidence = 0.0
            signal = "NEUTRAL"
            if pullback_up and volume_decrease_on_pullback:
                if volume_increase_on_continuation and current_price > ema_9:
                    confidence = 0.7
                    signal = "BUY"
            elif pullback_down and volume_decrease_on_pullback:
                if volume_increase_on_continuation and current_price < ema_9:
                    confidence = 0.7
                    signal = "SELL"
            return signal, confidence

        except Exception as e:
            logger.error(f"Pullback strategy error for {symbol}: {e}")
            return "NEUTRAL", 0.0

    def get_strategy_status(self, symbol: str, interval: str = "1") -> Dict[str, Any]:
        """Return a diagnostic snapshot of every sub-strategy and indicator.

        Args:
            symbol: Instrument identifier passed through to the data engine.
            interval: Candle interval used for the sub-strategies and the
                indicator values. Defaults to ``"1"``, the interval that was
                previously hard-coded for the indicators.

        Returns:
            A dict with keys ``symbol``, ``timestamp``, ``strategies``,
            ``current_price`` (``None`` when no price is available) and
            ``indicators``; or ``{'error': <message>}`` if anything raises.
        """
        try:
            status: Dict[str, Any] = {
                'symbol': symbol,
                'timestamp': pd.Timestamp.now(),
                'strategies': {}
            }

            # The sub-strategies require the interval; omitting it raised a
            # TypeError that the except below turned into an error dict on
            # every call.
            ema_signal, ema_conf = self._ema_momentum_strategy(symbol, interval)
            breakout_signal, breakout_conf = self._breakout_strategy(symbol, interval)
            pullback_signal, pullback_conf = self._pullback_strategy(symbol, interval)

            status['strategies'] = {
                'ema_momentum': {'signal': ema_signal, 'confidence': ema_conf},
                'breakout': {'signal': breakout_signal, 'confidence': breakout_conf},
                'pullback': {'signal': pullback_signal, 'confidence': pullback_conf}
            }

            current_price = self.data_engine.get_prices(symbol, limit=1)
            status['current_price'] = current_price[-1] if current_price else None

            ema_fast = self.data_engine.calculate_ema(symbol, interval, self.ema_fast_period)
            ema_slow = self.data_engine.calculate_ema(symbol, interval, self.ema_slow_period)
            rsi = self.data_engine.calculate_rsi(symbol, interval, self.rsi_period)
            status['indicators'] = {
                'ema_fast': ema_fast,
                'ema_slow': ema_slow,
                'rsi': rsi
            }
            return status

        except Exception as e:
            logger.error(f"Error getting strategy status for {symbol}: {e}")
            return {'error': str(e)}