""" 백테스트 관련 유틸리티 함수 모듈 """ import numpy as np import pandas as pd import yfinance as yf from .model_evaluation import calculate_dtw, calculate_tdi def get_risk_free_rate(start_date=None, end_date=None, ticker='^IRX'): """ 야후 파이낸스에서 국채 수익률 데이터를 가져옵니다. ^TNX: 10년물 미국 국채 수익률 ^IRX: 13주물 미국 국채 수익률 """ try: # 데이터 형식 확인 및 변환 if start_date and isinstance(start_date, str): start_date = pd.to_datetime(start_date) if end_date and isinstance(end_date, str): end_date = pd.to_datetime(end_date) # 국채 데이터 다운로드 treasury_data = yf.download(ticker, start=start_date, end=end_date) if not treasury_data.empty: # 수익률은 퍼센트로 표시되므로 100으로 나눔 avg_yield_raw = treasury_data['Close'].mean() avg_yield = float(avg_yield_raw) / 100.0 return avg_yield except Exception as e: print(f"국채 수익률 데이터 가져오기 오류: {e}") # 기본값 반환 default_rate = 0.02 # 2% print(f"국채 수익률을 가져올 수 없습니다. 기본값 {default_rate:.4f}(2%)를 사용합니다.") return default_rate def calculate_max_drawdown(equity_curve): """ 주식 그래프에서 최대 낙폭을 계산 """ if len(equity_curve) <= 1: return 0.0 equity_curve = np.asarray(equity_curve) peak = np.maximum.accumulate(equity_curve) drawdown = (equity_curve - peak) / np.maximum(peak, 1e-10) return np.min(drawdown) def calculate_performance_metrics(portfolio_values, daily_returns, risk_free_rate=0.0): """ 포트폴리오 성능 지표 계산 """ if len(portfolio_values) <= 1: return { 'total_return': 0.0, 'annualized_return': 0.0, 'sharpe_ratio': 0.0, 'max_drawdown': 0.0, 'win_rate': 0.0, 'avg_return': 0.0, 'std_dev': 0.0, 'trades': [] } # 넘파이 배열로 변환 returns_array = np.array(daily_returns) portfolio_values = np.array(portfolio_values) # 0이 아닌 수익률만 필터링 non_zero_returns = returns_array[np.abs(returns_array) > 1e-8] # 총 수익률 계산 total_return = (portfolio_values[-1] / portfolio_values[0]) - 1 # 연간화된 수익률 n_days = len(portfolio_values) - 1 if n_days > 0: n_years = n_days / 252 if n_years > 0: annualized_return = ((1 + total_return) ** (1 / n_years)) - 1 else: annualized_return = total_return * 252 else: annualized_return = 0.0 # 무위험 수익률 처리 if np.isscalar(risk_free_rate): daily_rf_rate = risk_free_rate / 252 else: daily_rf_rate = 0.0 # 샤프 비율 계산 방식 개선 if len(non_zero_returns) > 1: # 실제 거래가 있는 날의 수익률로만 계산 excess_returns = non_zero_returns - daily_rf_rate excess_mean = np.mean(excess_returns) excess_std = np.std(excess_returns, ddof=1) if excess_std > 1e-8: # 거래 빈도를 고려한 연간화 trading_frequency = len(non_zero_returns) / len(returns_array) annualized_factor = np.sqrt(252 * trading_frequency) sharpe_ratio = (excess_mean / excess_std) * annualized_factor else: sharpe_ratio = 0.0 else: # 전체 수익률로 계산 excess_returns = returns_array - daily_rf_rate if len(excess_returns) > 1: excess_mean = np.mean(excess_returns) excess_std = np.std(excess_returns, ddof=1) if excess_std > 1e-8: sharpe_ratio = (excess_mean / excess_std) * np.sqrt(252) else: sharpe_ratio = 0.0 else: sharpe_ratio = 0.0 # 최대 낙폭 계산 max_drawdown = calculate_max_drawdown(portfolio_values) # 승률 계산 (0이 아닌 수익률만 사용) if len(non_zero_returns) > 0: positive_returns = non_zero_returns[non_zero_returns > 0] win_rate = len(positive_returns) / len(non_zero_returns) else: win_rate = 0.0 # 연간화된 표준편차 std_dev = np.std(returns_array) * np.sqrt(252) return { 'total_return': float(total_return), 'annualized_return': float(annualized_return), 'sharpe_ratio': float(sharpe_ratio), 'max_drawdown': float(max_drawdown), 'win_rate': float(win_rate), 'avg_return': float(np.mean(returns_array)), 'std_dev': float(std_dev), 'active_trading_days': len(non_zero_returns), 'total_days': len(returns_array), 'trading_frequency': len(non_zero_returns) / len(returns_array) if len(returns_array) > 0 else 0 } def backtest_by_ticker(predictions, actual_returns, ticker_ids, threshold=0.05, commission=0.0025, risk_free_rate=None): """ 개별 종목별 및 전체 포트폴리오 백테스트 함수 - 매수/홀딩 전략 """ # 무위험 수익률이 전달되지 않은 경우 가져오기 if risk_free_rate is None: risk_free_rate = get_risk_free_rate() # 입력 데이터를 numpy 배열로 변환 if hasattr(predictions, 'values'): predictions = predictions.values if hasattr(actual_returns, 'values'): actual_returns = actual_returns.values if hasattr(ticker_ids, 'values'): ticker_ids = ticker_ids.values # 모든 입력이 numpy 배열임을 보장 predictions = np.asarray(predictions) actual_returns = np.asarray(actual_returns) ticker_ids = np.asarray(ticker_ids) n_samples = len(predictions) unique_tickers = np.unique(ticker_ids) n_tickers = len(unique_tickers) # 전체 포트폴리오 변수 initial_capital = 1.0 portfolio_values = [initial_capital] portfolio_trades = [] daily_returns = [] # 종목별 성과 추적 변수 ticker_results = {ticker_id: { 'cash': initial_capital / n_tickers, # 각 종목에 균등 배분된 현금 'shares': 0, # 보유 주식 수 'values': [initial_capital / n_tickers], 'returns': [], 'trades': [], 'position': 0, # 0: 현금, 1: 주식 보유 'last_price': 1.0, # 마지막 주가 (정규화된 가격) } for ticker_id in unique_tickers} # 날짜별로 모든 종목의 신호 처리 for t in range(n_samples): daily_portfolio_value = 0.0 previous_portfolio_value = portfolio_values[-1] # 각 종목별 처리 for ticker_id in unique_tickers: # 현재 종목의 데이터 찾기 ticker_mask = ticker_ids == ticker_id if not any(ticker_mask[t:t+1]): # 해당 시점에 종목 데이터가 없으면 이전 값 유지 ticker_result = ticker_results[ticker_id] current_value = ticker_result['cash'] + ticker_result['shares'] * ticker_result['last_price'] daily_portfolio_value += current_value # 수익률은 0으로 처리 ticker_result['returns'].append(0.0) ticker_result['values'].append(current_value) continue # 현재 종목의 예측값과 실제 수익률 ticker_pred = predictions[t:t+1][ticker_mask[t:t+1]][0] ticker_actual = actual_returns[t:t+1][ticker_mask[t:t+1]][0] ticker_result = ticker_results[ticker_id] # 현재 주가 업데이트 (이전 가격 * (1 + 수익률)) ticker_result['last_price'] *= (1 + ticker_actual) # 신호 결정 (매수/매도만) if ticker_pred > threshold and ticker_result['position'] == 0: new_signal = 1 elif ticker_pred < -threshold and ticker_result['position'] == 1: new_signal = 0 else: new_signal = ticker_result['position'] current_position = ticker_result['position'] # 포지션 변경 처리 if new_signal != current_position: if current_position == 0 and new_signal == 1: # 현금 → 주식 (매수) available_cash = ticker_result['cash'] * (1 - commission) shares_to_buy = available_cash / ticker_result['last_price'] ticker_result['shares'] = shares_to_buy ticker_result['cash'] = 0 ticker_result['position'] = 1 trade = { 'day': t, 'ticker': ticker_id, 'action': 'BUY', 'shares': shares_to_buy, 'price': ticker_result['last_price'], 'value': available_cash, 'pred': ticker_pred } elif current_position == 1 and new_signal == 0: # 주식 → 현금 (매도) shares_to_sell = ticker_result['shares'] sale_proceeds = shares_to_sell * ticker_result['last_price'] * (1 - commission) ticker_result['cash'] = sale_proceeds ticker_result['shares'] = 0 ticker_result['position'] = 0 trade = { 'day': t, 'ticker': ticker_id, 'action': 'SELL', 'shares': shares_to_sell, 'price': ticker_result['last_price'], 'value': sale_proceeds, 'pred': ticker_pred } portfolio_trades.append(trade) ticker_result['trades'].append(trade) # 현재 포트폴리오 가치 계산 current_value = ticker_result['cash'] + ticker_result['shares'] * ticker_result['last_price'] # 일일 수익률 계산 previous_value = ticker_result['values'][-1] if ticker_result['values'] else initial_capital / n_tickers ticker_daily_return = (current_value / previous_value) - 1 if previous_value > 0 else 0 # 종목별 수익률과 값 기록 ticker_result['returns'].append(ticker_daily_return) ticker_result['values'].append(current_value) # 포트폴리오 값에 반영 daily_portfolio_value += current_value # 일일 포트폴리오 전체 수익률 계산 및 추가 portfolio_daily_return = (daily_portfolio_value / previous_portfolio_value) - 1 if previous_portfolio_value > 0 else 0 daily_returns.append(portfolio_daily_return) portfolio_values.append(daily_portfolio_value) # 전체 포트폴리오 성과 계산 portfolio_metrics = calculate_performance_metrics(portfolio_values, daily_returns, risk_free_rate) portfolio_metrics['trades'] = portfolio_trades # 각 종목별 성과 계산 for ticker_id in unique_tickers: ticker_values = ticker_results[ticker_id]['values'] ticker_returns = ticker_results[ticker_id]['returns'] # 종목별 지표 계산 if len(ticker_returns) > 0: ticker_metrics = calculate_performance_metrics(ticker_values, ticker_returns, risk_free_rate) ticker_results[ticker_id].update(ticker_metrics) else: ticker_results[ticker_id].update({ 'total_return': 0, 'sharpe_ratio': 0, 'max_drawdown': 0, 'trade_count': 0, 'win_rate': 0, 'risk_free_rate': risk_free_rate }) # DTW와 TDI 계산 try: # 배열을 명시적으로 1차원으로 변환 flat_predictions = np.asarray(predictions).flatten() flat_actual_returns = np.asarray(actual_returns).flatten() # NaN 값 제거 mask_pred = ~np.isnan(flat_predictions) mask_act = ~np.isnan(flat_actual_returns) clean_predictions = flat_predictions[mask_pred] clean_actual_returns = flat_actual_returns[mask_act] # 길이 맞추기 min_len = min(len(clean_predictions), len(clean_actual_returns)) if min_len > 0: clean_predictions = clean_predictions[:min_len] clean_actual_returns = clean_actual_returns[:min_len] # 이제 1차원 벡터로 DTW 계산 portfolio_metrics['dtw'] = calculate_dtw(clean_predictions, clean_actual_returns) portfolio_metrics['tdi'] = calculate_tdi(clean_predictions, clean_actual_returns) else: print("DTW/TDI 계산을 위한 유효한 데이터 없음") portfolio_metrics['dtw'] = 1.0 portfolio_metrics['tdi'] = 1.0 except Exception as e: print(f"DTW/TDI 계산 중 오류: {e}") portfolio_metrics['dtw'] = 1.0 portfolio_metrics['tdi'] = 1.0 # 결과 결합 result = { 'portfolio': portfolio_metrics, 'by_ticker': ticker_results } # 종목별 샤프 비율 평균 추가 avg_sharpe = np.mean([ticker_results[ticker_id]['sharpe_ratio'] for ticker_id in unique_tickers]) result['avg_ticker_sharpe'] = avg_sharpe return result