"""Backtesting engine for the scalping strategy.

Runs the strategy bar-by-bar over synthetic one-minute candles, records the
resulting trades and equity curve, and derives summary metrics from them.
"""

import json
import logging
import os
from datetime import datetime, timedelta
from itertools import product
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import yaml

from core.strategy import ScalpingStrategy
from core.data_engine import DataEngine
from core.risk import RiskManager
from services.logger import log

logger = logging.getLogger(__name__)


class BacktestingEngine:
    """Backtest, optimise and report on the scalping strategy per symbol.

    Results of successful backtests are cached in ``self.results`` keyed by
    symbol and can be persisted with ``save_results`` / ``load_results``.
    """

    # Number of candles before the current one replayed into the strategy's
    # data engine ahead of each signal request.
    LOOKBACK_CANDLES = 200
    # No signals are requested until the bar index exceeds this value.
    WARMUP_CANDLES = 50
    # A signal is only acted on when its confidence is strictly above this.
    MIN_CONFIDENCE = 0.6
    # Positions older than this many minutes are closed with reason TIMEOUT.
    MAX_POSITION_MINUTES = 15
    # Entry-price multipliers for take-profit / stop-loss levels, per side.
    # Kept as literal multipliers so the levels match the original exactly.
    TP_MULTIPLIER = {'BUY': 1.025, 'SELL': 0.975}
    SL_MULTIPLIER = {'BUY': 0.99, 'SELL': 1.01}

    def __init__(self):
        """Load YAML configuration and set the default simulation parameters."""
        self.settings = self._load_yaml("config/settings.yaml")
        self.pairs = self._load_yaml("config/pairs.yaml")["pairs"]
        self.initial_balance = 1000
        self.fee_rate = 0.001
        self.slippage = 0.0005
        self.results = {}

    @staticmethod
    def _load_yaml(path: str) -> Any:
        """Parse a YAML file, closing the file handle deterministically."""
        with open(path, encoding="utf-8") as fh:
            return yaml.safe_load(fh)

    def load_historical_data(self, symbol: str, interval: str = "1",
                             days: int = 30) -> Optional[pd.DataFrame]:
        """Generate deterministic synthetic one-minute OHLCV candles.

        No real market data is fetched: prices follow a seeded random walk
        starting from a base price chosen by the symbol prefix.

        Args:
            symbol: Trading pair; a ``BTC`` / ``ETH`` prefix selects the base price.
            interval: Accepted for interface compatibility; currently unused,
                candles are always one minute apart.
            days: Length of history to generate, ending at the current time.

        Returns:
            A DataFrame indexed by timestamp with ``open``, ``high``, ``low``,
            ``close`` and ``volume`` columns, or ``None`` on failure.
        """
        try:
            periods = days * 24 * 60
            if periods <= 0:
                logger.error(
                    f"Error loading historical data for {symbol}: "
                    f"days must be positive, got {days}"
                )
                return None

            if symbol.startswith('BTC'):
                base_price = 50000
            elif symbol.startswith('ETH'):
                base_price = 3000
            else:
                base_price = 100

            # A private generator yields the same stream as seeding the global
            # one with 42, without clobbering RNG state used by other code.
            rng = np.random.RandomState(42)

            timestamps = pd.date_range(
                start=datetime.now() - timedelta(days=days),
                periods=periods,
                freq='1min',
            )

            # Draw order (returns, high noise, low noise, volume) is kept so
            # close prices and volumes match earlier runs.
            returns = rng.normal(0, 0.001, periods)
            prices = base_price * np.exp(np.cumsum(returns))
            high_noise = np.abs(rng.normal(0, 0.002, periods))
            low_noise = np.abs(rng.normal(0, 0.002, periods))
            volumes = rng.lognormal(10, 1, periods)

            # Each candle opens at the previous close.
            opens = np.roll(prices, 1)
            opens[0] = base_price

            # High/low must bracket both open and close; deriving them from
            # the close alone could produce candles with open outside the range.
            highs = np.maximum(opens, prices) * (1 + high_noise)
            lows = np.minimum(opens, prices) * (1 - low_noise)

            df = pd.DataFrame({
                'timestamp': timestamps,
                'open': opens,
                'high': highs,
                'low': lows,
                'close': prices,
                'volume': volumes,
            })
            df.set_index('timestamp', inplace=True)
            return df
        except Exception as e:
            logger.error(f"Error loading historical data for {symbol}: {e}")
            return None

    def run_backtest(self, symbol: str,
                     strategy_params: Optional[Dict[str, Any]] = None,
                     start_date: Optional[datetime] = None,
                     end_date: Optional[datetime] = None) -> Dict[str, Any]:
        """Backtest the scalping strategy on one symbol.

        Args:
            symbol: Trading pair to test.
            strategy_params: Optional attribute overrides applied to the
                strategy instance; names the strategy does not have are skipped.
            start_date: Optional inclusive lower bound on candle timestamps.
            end_date: Optional inclusive upper bound on candle timestamps.

        Returns:
            A result dict (summary metrics, the first 50 trades and the last
            100 equity points), or ``{'error': message}`` on failure. Successful
            results are also stored in ``self.results[symbol]``.
        """
        try:
            log(f"🔄 Starting backtest for {symbol}")

            df = self.load_historical_data(symbol)
            if df is None or df.empty:
                return {'error': f'No data available for {symbol}'}

            if start_date is not None:
                df = df[df.index >= start_date]
            if end_date is not None:
                df = df[df.index <= end_date]

            if len(df) < 100:
                return {'error': f'Insufficient data for {symbol}: {len(df)} candles'}

            data_engine = DataEngine()
            strategy = ScalpingStrategy(data_engine)

            if strategy_params:
                for key, value in strategy_params.items():
                    if hasattr(strategy, key):
                        setattr(strategy, key, value)
                    else:
                        logger.warning(
                            f"Ignoring unknown strategy parameter '{key}' for {symbol}"
                        )

            mock_exchange = MockExchange(self.fee_rate, self.slippage)
            risk_manager = RiskManager(mock_exchange)
            # Disable the daily loss limit for the duration of the simulation.
            risk_manager.max_daily_loss = float('inf')

            trades, equity_curve = self._simulate_trading(
                df, strategy, risk_manager, mock_exchange, symbol
            )
            metrics = self._calculate_metrics(trades, equity_curve, df)

            result = {
                'symbol': symbol,
                'total_trades': len(trades),
                'winning_trades': sum(1 for t in trades if t['pnl'] > 0),
                'losing_trades': sum(1 for t in trades if t['pnl'] < 0),
                'total_pnl': sum(t['pnl'] for t in trades),
                'max_drawdown': metrics['max_drawdown'],
                'win_rate': metrics['win_rate'],
                'profit_factor': metrics['profit_factor'],
                'sharpe_ratio': metrics['sharpe_ratio'],
                'avg_trade_duration': metrics['avg_trade_duration'],
                'trades': trades[:50],
                'equity_curve': equity_curve[-100:],
            }

            self.results[symbol] = result
            log(f"✅ Backtest completed for {symbol}: {len(trades)} trades, PnL: {result['total_pnl']:.2f}")
            return result
        except Exception as e:
            logger.error(f"Error in backtest for {symbol}: {e}")
            return {'error': str(e)}

    def _build_window_engine(self, df: pd.DataFrame, end_pos: int,
                             symbol: str) -> DataEngine:
        """Return a fresh DataEngine fed with the candles up to ``end_pos``.

        The window covers positions ``max(0, end_pos - LOOKBACK_CANDLES)``
        through ``end_pos`` inclusive, so the strategy never sees future bars.
        """
        engine = DataEngine()
        start = max(0, end_pos - self.LOOKBACK_CANDLES)
        for ts, candle in df.iloc[start:end_pos + 1].iterrows():
            engine.update_candle(symbol, "1", {
                'timestamp': ts.timestamp() * 1000,  # epoch milliseconds
                'open': candle['open'],
                'high': candle['high'],
                'low': candle['low'],
                'close': candle['close'],
                'volume': candle['volume'],
            })
        return engine

    @classmethod
    def _exit_reason(cls, position: Dict[str, Any], price: float,
                     age_minutes: float) -> Optional[str]:
        """Return 'TP', 'SL' or 'TIMEOUT' if the position should close, else None.

        Take-profit is checked before stop-loss, and both before the timeout.
        """
        is_long = position['side'] == 'BUY'
        tp_price = position['tp_price']
        sl_price = position['sl_price']

        if (price >= tp_price) if is_long else (price <= tp_price):
            return 'TP'
        if (price <= sl_price) if is_long else (price >= sl_price):
            return 'SL'
        if age_minutes > cls.MAX_POSITION_MINUTES:
            return 'TIMEOUT'
        return None

    def _simulate_trading(self, df: pd.DataFrame, strategy: ScalpingStrategy,
                          risk_manager: RiskManager, exchange: 'MockExchange',
                          symbol: str) -> Tuple[List[Dict], List[float]]:
        """Walk the candles once, opening and closing at most one position at a time.

        Args:
            df: Candles indexed by timestamp.
            strategy: Strategy whose ``data_engine`` is replaced before each
                signal request.
            risk_manager: Validates entries and sizes positions.
            exchange: Computes the PnL of a closed position.
            symbol: Trading pair being simulated.

        Returns:
            ``(trades, equity_curve)``. The equity curve starts at
            ``self.initial_balance`` and gains one point per closed trade.
        """
        trades: List[Dict] = []
        equity_curve: List[float] = [self.initial_balance]
        open_position: Optional[Dict[str, Any]] = None

        for i, (timestamp, row) in enumerate(df.iterrows()):
            # NOTE(review): exits are evaluated on the close only, so TP/SL
            # levels touched intrabar (via high/low) are not detected.
            current_price = row['close']

            if open_position is not None:
                # total_seconds() counts whole days too; `.seconds` would wrap
                # around for positions held longer than 24 hours.
                held = timestamp - open_position['entry_time']
                position_age = held.total_seconds() / 60

                exit_reason = self._exit_reason(
                    open_position, current_price, position_age
                )
                if exit_reason:
                    pnl = exchange.close_position(open_position, current_price)
                    equity_curve.append(equity_curve[-1] + pnl)
                    trades.append({
                        'entry_time': open_position['entry_time'],
                        'exit_time': timestamp,
                        'side': open_position['side'],
                        'entry_price': open_position['entry_price'],
                        'exit_price': current_price,
                        'quantity': open_position['quantity'],
                        'pnl': pnl,
                        'reason': exit_reason,
                        'duration_minutes': position_age,
                    })
                    open_position = None
                # A bar that manages an open position never opens a new one.
                continue

            if i <= self.WARMUP_CANDLES:
                continue

            # The look-back engine is only built when the strategy is about to
            # be consulted; rebuilding it on every bar was wasted work.
            strategy.data_engine = self._build_window_engine(df, i, symbol)
            signal, confidence, price = strategy.generate_signal(symbol)

            if signal in ('BUY', 'SELL') and confidence > self.MIN_CONFIDENCE:
                if risk_manager.validate_entry_signal(symbol, signal, confidence):
                    qty = risk_manager.calculate_position_size(symbol, price, signal)
                    if qty > 0:
                        open_position = {
                            'entry_time': timestamp,
                            'side': signal,
                            'entry_price': price,
                            'quantity': qty,
                            'tp_price': price * self.TP_MULTIPLIER[signal],
                            'sl_price': price * self.SL_MULTIPLIER[signal],
                        }

        return trades, equity_curve

    @staticmethod
    def _empty_metrics() -> Dict[str, float]:
        """Return the all-zero metrics used when nothing can be computed."""
        return {
            'max_drawdown': 0.0,
            'win_rate': 0.0,
            'profit_factor': 0.0,
            'sharpe_ratio': 0.0,
            'avg_trade_duration': 0.0,
        }

    def _calculate_metrics(self, trades: List[Dict], equity_curve: List[float],
                           df: pd.DataFrame) -> Dict[str, float]:
        """Derive summary metrics from the trades and the equity curve.

        Args:
            trades: Closed trades, each with ``pnl`` and ``duration_minutes``.
            equity_curve: Account equity, one point per closed trade plus the start.
            df: Candles used for the run; currently unused.

        Returns:
            Dict with ``max_drawdown`` and ``win_rate`` as fractions,
            ``profit_factor`` (``inf`` when there are no losing trades),
            ``sharpe_ratio`` and ``avg_trade_duration`` in minutes. All values
            are zero when there are no trades or the computation fails.
        """
        if not trades:
            return self._empty_metrics()

        try:
            peak = equity_curve[0]
            max_drawdown = 0.0
            for equity in equity_curve:
                peak = max(peak, equity)
                # A non-positive peak has no meaningful percentage drawdown;
                # skipping it avoids a ZeroDivisionError that used to zero
                # out every metric.
                if peak > 0:
                    max_drawdown = max(max_drawdown, (peak - equity) / peak)

            winning_trades = [t for t in trades if t['pnl'] > 0]
            win_rate = len(winning_trades) / len(trades)

            gross_profit = sum(t['pnl'] for t in winning_trades)
            gross_loss = abs(sum(t['pnl'] for t in trades if t['pnl'] < 0))
            profit_factor = gross_profit / gross_loss if gross_loss > 0 else float('inf')

            equity = np.asarray(equity_curve, dtype=float)
            returns = np.diff(equity) / equity[:-1]
            if len(returns) > 1 and np.std(returns) > 0:
                # NOTE(review): the curve has one point per closed trade, yet
                # the factor below annualises as if returns were per-minute.
                # Left unchanged because optimisation scores depend on it.
                sharpe_ratio = np.mean(returns) / np.std(returns) * np.sqrt(365 * 24 * 60)
            else:
                sharpe_ratio = 0.0

            avg_trade_duration = np.mean([t['duration_minutes'] for t in trades])

            # Plain floats keep the result dict free of NumPy scalar types.
            return {
                'max_drawdown': float(max_drawdown),
                'win_rate': float(win_rate),
                'profit_factor': float(profit_factor),
                'sharpe_ratio': float(sharpe_ratio),
                'avg_trade_duration': float(avg_trade_duration),
            }
        except Exception as e:
            logger.error(f"Error calculating metrics: {e}")
            return self._empty_metrics()

    def optimize_parameters(self, symbol: str,
                            param_ranges: Dict[str, List[float]]) -> Dict[str, Any]:
        """Grid-search strategy parameters by running one backtest per combination.

        Each run is scored as ``sharpe_ratio - 10 * max_drawdown``; the
        highest-scoring combination wins.

        Args:
            symbol: Trading pair to optimise for.
            param_ranges: Candidate values per strategy attribute name.

        Returns:
            Dict with ``best_parameters``, ``best_result`` and
            ``optimization_score``, or ``{'error': message}`` if no run succeeded.
        """
        try:
            log(f"🎯 Starting parameter optimization for {symbol}")

            best_result = None
            best_params = None
            best_score = -float('inf')

            param_names = list(param_ranges.keys())
            param_values = list(param_ranges.values())

            # int() keeps the log readable; np.prod of an empty list is 1.0.
            total_combinations = int(np.prod([len(v) for v in param_values]))
            log(f"Testing {total_combinations} parameter combinations")

            for tested, param_combo in enumerate(product(*param_values), start=1):
                param_dict = dict(zip(param_names, param_combo))
                result = self.run_backtest(symbol, strategy_params=param_dict)

                if 'error' not in result:
                    score = result['sharpe_ratio'] - result['max_drawdown'] * 10
                    if score > best_score:
                        best_score = score
                        best_result = result
                        best_params = param_dict

                if tested % 10 == 0:
                    log(f"Progress: {tested}/{total_combinations} combinations tested")

            if best_result is None:
                return {'error': 'No valid results found during optimization'}

            log(f"✅ Optimization completed. Best params: {best_params}")
            return {
                'best_parameters': best_params,
                'best_result': best_result,
                'optimization_score': best_score,
            }
        except Exception as e:
            logger.error(f"Error in parameter optimization: {e}")
            return {'error': str(e)}

    def save_results(self, filename: str = "backtest_results.json"):
        """Write ``self.results`` as JSON under the ``backtest_results`` directory.

        Values that JSON cannot encode natively (such as timestamps) are
        written as their ``str()`` form. Failures are logged, not raised.
        """
        try:
            os.makedirs("backtest_results", exist_ok=True)
            filepath = f"backtest_results/{filename}"
            with open(filepath, 'w', encoding="utf-8") as f:
                json.dump(self.results, f, indent=2, default=str)
            log(f"💾 Results saved to {filepath}")
        except Exception as e:
            logger.error(f"Error saving results: {e}")

    def load_results(self, filename: str = "backtest_results.json") -> Dict[str, Any]:
        """Load results saved by ``save_results`` into ``self.results``.

        Returns:
            The loaded results, or an empty dict when the file does not exist
            or cannot be read (``self.results`` is left untouched in that case).
        """
        try:
            filepath = f"backtest_results/{filename}"
            if not os.path.exists(filepath):
                return {}
            with open(filepath, 'r', encoding="utf-8") as f:
                self.results = json.load(f)
            log(f"📂 Results loaded from {filepath}")
            return self.results
        except Exception as e:
            logger.error(f"Error loading results: {e}")
            return {}

    def generate_report(self, symbol: str) -> str:
        """Build a plain-text summary of the stored backtest result for ``symbol``.

        Returns:
            The report text, a "no results" message when the symbol has not
            been backtested, or an error message if formatting fails.
        """
        try:
            if symbol not in self.results:
                return f"No backtest results found for {symbol}"

            result = self.results[symbol]

            # The original header template was lost (only a dangling `f`
            # remained, which raised NameError); this header is rebuilt from
            # the fields run_backtest stores in the result.
            parts = [
                f"Backtest Report: {symbol}\n",
                "=" * 40 + "\n",
                f"Total Trades: {result['total_trades']}\n",
                f"Winning Trades: {result['winning_trades']}\n",
                f"Losing Trades: {result['losing_trades']}\n",
                f"Win Rate: {result['win_rate']:.2%}\n",
                f"Total PnL: ${result['total_pnl']:.2f}\n",
                f"Max Drawdown: {result['max_drawdown']:.2%}\n",
                f"Profit Factor: {result['profit_factor']:.2f}\n",
                f"Sharpe Ratio: {result['sharpe_ratio']:.2f}\n",
                f"Avg Trade Duration: {result['avg_trade_duration']:.1f} min\n",
                "\n",
                "Recent Trades:\n",
            ]

            for i, trade in enumerate(result['trades'][-5:]):
                parts.append(
                    f"{i+1}. {trade['side']} {trade['quantity']:.3f} @ "
                    f"{trade['entry_price']:.2f} -> {trade['exit_price']:.2f} "
                    f"(PnL: ${trade['pnl']:.2f})\n"
                )

            return "".join(parts)
        except Exception as e:
            logger.error(f"Error generating report: {e}")
            return f"Error generating report: {e}"


class MockExchange:
    """Minimal exchange stand-in used by the backtester.

    Holds no state beyond the fee rate and slippage used to price closes.
    """

    def __init__(self, fee_rate: float = 0.001, slippage: float = 0.0005):
        self.fee_rate = fee_rate
        self.slippage = slippage

    def get_balance(self):
        """Return a fixed USDT wallet balance."""
        # NOTE(review): fixed at 10000 while BacktestingEngine starts its
        # equity curve at 1000 - confirm which value is intended.
        return [{"coin": "USDT", "walletBalance": "10000"}]

    def get_positions(self):
        """Return an empty list: the mock never reports open positions."""
        return []

    def calculate_position_size(self, symbol, entry_price, side):
        """Return a fixed position size of 0.01 regardless of the inputs."""
        return 0.01

    def validate_entry_signal(self, symbol, signal, confidence):
        """Accept every entry signal."""
        return True

    def close_position(self, position, exit_price):
        """Return the net PnL of closing ``position`` at ``exit_price``.

        Slippage moves the exit price against the position; the fee is then
        deducted from the result.

        Args:
            position: Dict with ``entry_price``, ``quantity`` and ``side``.
            exit_price: Market price at which the position is closed.

        Returns:
            PnL after slippage and fee.
        """
        entry_price = position['entry_price']
        quantity = position['quantity']

        # NOTE(review): PnL is the fractional price move times quantity, which
        # is only a currency amount if quantity is a notional value; and the
        # fee is charged on |PnL| rather than on traded notional, so a flat
        # trade pays nothing. Both kept as-is pending confirmation of what
        # RiskManager.calculate_position_size returns.
        if position['side'] == 'BUY':
            exit_price *= (1 - self.slippage)
            pnl = (exit_price - entry_price) / entry_price * quantity
        else:
            exit_price *= (1 + self.slippage)
            pnl = (entry_price - exit_price) / entry_price * quantity

        fee = abs(pnl) * self.fee_rate
        pnl -= fee
        return pnl