Spaces:
Paused
Paused
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, List, Optional, Any, Tuple | |
| import logging | |
| import yaml | |
| from datetime import datetime, timedelta | |
| import json | |
| import os | |
| from core.strategy import ScalpingStrategy | |
| from core.data_engine import DataEngine | |
| from core.risk import RiskManager | |
| from services.logger import log | |
| logger = logging.getLogger(__name__) | |
| class BacktestingEngine: | |
| def __init__(self): | |
| self.settings = yaml.safe_load(open("config/settings.yaml")) | |
| self.pairs = yaml.safe_load(open("config/pairs.yaml"))["pairs"] | |
| self.initial_balance = 1000 | |
| self.fee_rate = 0.001 | |
| self.slippage = 0.0005 | |
| self.results = {} | |
| def load_historical_data(self, symbol: str, interval: str = "1", | |
| days: int = 30) -> Optional[pd.DataFrame]: | |
| try: | |
| periods = days * 24 * 60 | |
| base_price = 50000 if symbol.startswith('BTC') else 3000 if symbol.startswith('ETH') else 100 | |
| np.random.seed(42) | |
| timestamps = pd.date_range( | |
| start=datetime.now() - timedelta(days=days), | |
| end=datetime.now(), | |
| freq='1min' | |
| )[:periods] | |
| returns = np.random.normal(0, 0.001, periods) | |
| prices = base_price * np.exp(np.cumsum(returns)) | |
| highs = prices * (1 + np.abs(np.random.normal(0, 0.002, periods))) | |
| lows = prices * (1 - np.abs(np.random.normal(0, 0.002, periods))) | |
| opens = np.roll(prices, 1) | |
| opens[0] = base_price | |
| volumes = np.random.lognormal(10, 1, periods) | |
| df = pd.DataFrame({ | |
| 'timestamp': timestamps, | |
| 'open': opens, | |
| 'high': highs, | |
| 'low': lows, | |
| 'close': prices, | |
| 'volume': volumes | |
| }) | |
| df.set_index('timestamp', inplace=True) | |
| return df | |
| except Exception as e: | |
| logger.error(f"Error loading historical data for {symbol}: {e}") | |
| return None | |
| def run_backtest(self, symbol: str, strategy_params: Optional[Dict[str, Any]] = None, | |
| start_date: Optional[datetime] = None, end_date: Optional[datetime] = None) -> Dict[str, Any]: | |
| try: | |
| log(f"๐ Starting backtest for {symbol}") | |
| df = self.load_historical_data(symbol) | |
| if df is None or df.empty: | |
| return {'error': f'No data available for {symbol}'} | |
| if start_date: | |
| df = df[df.index >= start_date] | |
| if end_date: | |
| df = df[df.index <= end_date] | |
| if len(df) < 100: | |
| return {'error': f'Insufficient data for {symbol}: {len(df)} candles'} | |
| data_engine = DataEngine() | |
| strategy = ScalpingStrategy(data_engine) | |
| if strategy_params: | |
| for key, value in strategy_params.items(): | |
| if hasattr(strategy, key): | |
| setattr(strategy, key, value) | |
| mock_exchange = MockExchange(self.fee_rate, self.slippage) | |
| risk_manager = RiskManager(mock_exchange) | |
| risk_manager.max_daily_loss = float('inf') | |
| trades, equity_curve = self._simulate_trading( | |
| df, strategy, risk_manager, mock_exchange, symbol | |
| ) | |
| metrics = self._calculate_metrics(trades, equity_curve, df) | |
| result = { | |
| 'symbol': symbol, | |
| 'total_trades': len(trades), | |
| 'winning_trades': sum(1 for t in trades if t['pnl'] > 0), | |
| 'losing_trades': sum(1 for t in trades if t['pnl'] < 0), | |
| 'total_pnl': sum(t['pnl'] for t in trades), | |
| 'max_drawdown': metrics['max_drawdown'], | |
| 'win_rate': metrics['win_rate'], | |
| 'profit_factor': metrics['profit_factor'], | |
| 'sharpe_ratio': metrics['sharpe_ratio'], | |
| 'avg_trade_duration': metrics['avg_trade_duration'], | |
| 'trades': trades[:50], | |
| 'equity_curve': equity_curve[-100:] | |
| } | |
| self.results[symbol] = result | |
| log(f"โ Backtest completed for {symbol}: {len(trades)} trades, PnL: {result['total_pnl']:.2f}") | |
| return result | |
| except Exception as e: | |
| logger.error(f"Error in backtest for {symbol}: {e}") | |
| return {'error': str(e)} | |
| def _simulate_trading(self, df: pd.DataFrame, strategy: ScalpingStrategy, | |
| risk_manager: RiskManager, exchange: 'MockExchange', | |
| symbol: str) -> Tuple[List[Dict], List[float]]: | |
| trades = [] | |
| equity_curve = [self.initial_balance] | |
| open_position = None | |
| for i, (timestamp, row) in enumerate(df.iterrows()): | |
| current_price = row['close'] | |
| candle_data = { | |
| 'timestamp': timestamp.timestamp() * 1000, | |
| 'open': row['open'], | |
| 'high': row['high'], | |
| 'low': row['low'], | |
| 'close': row['close'], | |
| 'volume': row['volume'] | |
| } | |
| historical_df = df.iloc[:i+1] | |
| strategy_data_engine = DataEngine() | |
| for j in range(max(0, i-200), i+1): | |
| hist_candle = df.iloc[j] | |
| hist_data = { | |
| 'timestamp': df.index[j].timestamp() * 1000, | |
| 'open': hist_candle['open'], | |
| 'high': hist_candle['high'], | |
| 'low': hist_candle['low'], | |
| 'close': hist_candle['close'], | |
| 'volume': hist_candle['volume'] | |
| } | |
| strategy_data_engine.update_candle(symbol, "1", hist_data) | |
| strategy.data_engine = strategy_data_engine | |
| if open_position: | |
| position_age = (timestamp - open_position['entry_time']).seconds / 60 | |
| exit_reason = None | |
| if open_position['side'] == 'BUY': | |
| if current_price >= open_position['tp_price']: | |
| exit_reason = 'TP' | |
| elif current_price <= open_position['sl_price']: | |
| exit_reason = 'SL' | |
| elif position_age > 15: | |
| exit_reason = 'TIMEOUT' | |
| else: | |
| if current_price <= open_position['tp_price']: | |
| exit_reason = 'TP' | |
| elif current_price >= open_position['sl_price']: | |
| exit_reason = 'SL' | |
| elif position_age > 15: | |
| exit_reason = 'TIMEOUT' | |
| if exit_reason: | |
| pnl = exchange.close_position(open_position, current_price) | |
| equity_curve.append(equity_curve[-1] + pnl) | |
| trade = { | |
| 'entry_time': open_position['entry_time'], | |
| 'exit_time': timestamp, | |
| 'side': open_position['side'], | |
| 'entry_price': open_position['entry_price'], | |
| 'exit_price': current_price, | |
| 'quantity': open_position['quantity'], | |
| 'pnl': pnl, | |
| 'reason': exit_reason, | |
| 'duration_minutes': position_age | |
| } | |
| trades.append(trade) | |
| open_position = None | |
| elif i > 50: | |
| signal, confidence, price = strategy.generate_signal(symbol) | |
| if signal in ['BUY', 'SELL'] and confidence > 0.6: | |
| if risk_manager.validate_entry_signal(symbol, signal, confidence): | |
| qty = risk_manager.calculate_position_size(symbol, price, signal) | |
| if qty > 0: | |
| open_position = { | |
| 'entry_time': timestamp, | |
| 'side': signal, | |
| 'entry_price': price, | |
| 'quantity': qty, | |
| 'tp_price': price * (1.025 if signal == 'BUY' else 0.975), | |
| 'sl_price': price * (0.99 if signal == 'BUY' else 1.01) | |
| } | |
| return trades, equity_curve | |
| def _calculate_metrics(self, trades: List[Dict], equity_curve: List[float], | |
| df: pd.DataFrame) -> Dict[str, float]: | |
| try: | |
| if not trades: | |
| return { | |
| 'max_drawdown': 0.0, | |
| 'win_rate': 0.0, | |
| 'profit_factor': 0.0, | |
| 'sharpe_ratio': 0.0, | |
| 'avg_trade_duration': 0.0 | |
| } | |
| peak = equity_curve[0] | |
| max_drawdown = 0.0 | |
| for equity in equity_curve: | |
| if equity > peak: | |
| peak = equity | |
| drawdown = (peak - equity) / peak | |
| max_drawdown = max(max_drawdown, drawdown) | |
| winning_trades = [t for t in trades if t['pnl'] > 0] | |
| win_rate = len(winning_trades) / len(trades) if trades else 0.0 | |
| gross_profit = sum(t['pnl'] for t in winning_trades) | |
| gross_loss = abs(sum(t['pnl'] for t in trades if t['pnl'] < 0)) | |
| profit_factor = gross_profit / gross_loss if gross_loss > 0 else float('inf') | |
| returns = np.diff(equity_curve) / equity_curve[:-1] | |
| if len(returns) > 1 and np.std(returns) > 0: | |
| sharpe_ratio = np.mean(returns) / np.std(returns) * np.sqrt(365 * 24 * 60) | |
| else: | |
| sharpe_ratio = 0.0 | |
| durations = [t['duration_minutes'] for t in trades] | |
| avg_trade_duration = np.mean(durations) if durations else 0.0 | |
| return { | |
| 'max_drawdown': max_drawdown, | |
| 'win_rate': win_rate, | |
| 'profit_factor': profit_factor, | |
| 'sharpe_ratio': sharpe_ratio, | |
| 'avg_trade_duration': avg_trade_duration | |
| } | |
| except Exception as e: | |
| logger.error(f"Error calculating metrics: {e}") | |
| return { | |
| 'max_drawdown': 0.0, | |
| 'win_rate': 0.0, | |
| 'profit_factor': 0.0, | |
| 'sharpe_ratio': 0.0, | |
| 'avg_trade_duration': 0.0 | |
| } | |
| def optimize_parameters(self, symbol: str, param_ranges: Dict[str, List[float]]) -> Dict[str, Any]: | |
| try: | |
| log(f"๐ฏ Starting parameter optimization for {symbol}") | |
| best_result = None | |
| best_params = None | |
| best_score = -float('inf') | |
| from itertools import product | |
| param_names = list(param_ranges.keys()) | |
| param_values = list(param_ranges.values()) | |
| total_combinations = np.prod([len(v) for v in param_values]) | |
| log(f"Testing {total_combinations} parameter combinations") | |
| for i, param_combo in enumerate(product(*param_values)): | |
| param_dict = dict(zip(param_names, param_combo)) | |
| result = self.run_backtest(symbol, strategy_params=param_dict) | |
| if 'error' not in result: | |
| score = result['sharpe_ratio'] - result['max_drawdown'] * 10 | |
| if score > best_score: | |
| best_score = score | |
| best_result = result | |
| best_params = param_dict | |
| if (i + 1) % 10 == 0: | |
| log(f"Progress: {i + 1}/{total_combinations} combinations tested") | |
| if best_result: | |
| log(f"โ Optimization completed. Best params: {best_params}") | |
| return { | |
| 'best_parameters': best_params, | |
| 'best_result': best_result, | |
| 'optimization_score': best_score | |
| } | |
| else: | |
| return {'error': 'No valid results found during optimization'} | |
| except Exception as e: | |
| logger.error(f"Error in parameter optimization: {e}") | |
| return {'error': str(e)} | |
| def save_results(self, filename: str = "backtest_results.json"): | |
| try: | |
| os.makedirs("backtest_results", exist_ok=True) | |
| filepath = f"backtest_results/{filename}" | |
| with open(filepath, 'w') as f: | |
| json.dump(self.results, f, indent=2, default=str) | |
| log(f"๐พ Results saved to {filepath}") | |
| except Exception as e: | |
| logger.error(f"Error saving results: {e}") | |
| def load_results(self, filename: str = "backtest_results.json") -> Dict[str, Any]: | |
| try: | |
| filepath = f"backtest_results/{filename}" | |
| if os.path.exists(filepath): | |
| with open(filepath, 'r') as f: | |
| self.results = json.load(f) | |
| log(f"๐ Results loaded from {filepath}") | |
| return self.results | |
| else: | |
| return {} | |
| except Exception as e: | |
| logger.error(f"Error loading results: {e}") | |
| return {} | |
| def generate_report(self, symbol: str) -> str: | |
| try: | |
| if symbol not in self.results: | |
| return f"No backtest results found for {symbol}" | |
| result = self.results[symbol] | |
| report = f | |
| for i, trade in enumerate(result['trades'][-5:]): | |
| report += f"{i+1}. {trade['side']} {trade['quantity']:.3f} @ {trade['entry_price']:.2f} -> {trade['exit_price']:.2f} (PnL: ${trade['pnl']:.2f})\n" | |
| return report | |
| except Exception as e: | |
| logger.error(f"Error generating report: {e}") | |
| return f"Error generating report: {e}" | |
| class MockExchange: | |
| def __init__(self, fee_rate: float = 0.001, slippage: float = 0.0005): | |
| self.fee_rate = fee_rate | |
| self.slippage = slippage | |
| def get_balance(self): | |
| return [{"coin": "USDT", "walletBalance": "10000"}] | |
| def get_positions(self): | |
| return [] | |
| def calculate_position_size(self, symbol, entry_price, side): | |
| return 0.01 | |
| def validate_entry_signal(self, symbol, signal, confidence): | |
| return True | |
| def close_position(self, position, exit_price): | |
| entry_price = position['entry_price'] | |
| quantity = position['quantity'] | |
| side = position['side'] | |
| if side == 'BUY': | |
| exit_price *= (1 - self.slippage) | |
| else: | |
| exit_price *= (1 + self.slippage) | |
| if side == 'BUY': | |
| pnl = (exit_price - entry_price) / entry_price * quantity | |
| else: | |
| pnl = (entry_price - exit_price) / entry_price * quantity | |
| fee = abs(pnl) * self.fee_rate | |
| pnl -= fee | |
| return pnl | |