Spaces:
Sleeping
Sleeping
| """ | |
| 백테스트 관련 유틸리티 함수 모듈 | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| import yfinance as yf | |
| from .model_evaluation import calculate_dtw, calculate_tdi | |
| def get_risk_free_rate(start_date=None, end_date=None, ticker='^IRX'): | |
| """ | |
| 야후 파이낸스에서 국채 수익률 데이터를 가져옵니다. | |
| ^TNX: 10년물 미국 국채 수익률 | |
| ^IRX: 13주물 미국 국채 수익률 | |
| """ | |
| try: | |
| # 데이터 형식 확인 및 변환 | |
| if start_date and isinstance(start_date, str): | |
| start_date = pd.to_datetime(start_date) | |
| if end_date and isinstance(end_date, str): | |
| end_date = pd.to_datetime(end_date) | |
| # 국채 데이터 다운로드 | |
| treasury_data = yf.download(ticker, start=start_date, end=end_date) | |
| if not treasury_data.empty: | |
| # 수익률은 퍼센트로 표시되므로 100으로 나눔 | |
| avg_yield_raw = treasury_data['Close'].mean() | |
| avg_yield = float(avg_yield_raw) / 100.0 | |
| return avg_yield | |
| except Exception as e: | |
| print(f"국채 수익률 데이터 가져오기 오류: {e}") | |
| # 기본값 반환 | |
| default_rate = 0.02 # 2% | |
| print(f"국채 수익률을 가져올 수 없습니다. 기본값 {default_rate:.4f}(2%)를 사용합니다.") | |
| return default_rate | |
| def calculate_max_drawdown(equity_curve): | |
| """ | |
| 주식 그래프에서 최대 낙폭을 계산 | |
| """ | |
| if len(equity_curve) <= 1: | |
| return 0.0 | |
| equity_curve = np.asarray(equity_curve) | |
| peak = np.maximum.accumulate(equity_curve) | |
| drawdown = (equity_curve - peak) / np.maximum(peak, 1e-10) | |
| return np.min(drawdown) | |
| def calculate_performance_metrics(portfolio_values, daily_returns, risk_free_rate=0.0): | |
| """ | |
| 포트폴리오 성능 지표 계산 | |
| """ | |
| if len(portfolio_values) <= 1: | |
| return { | |
| 'total_return': 0.0, | |
| 'annualized_return': 0.0, | |
| 'sharpe_ratio': 0.0, | |
| 'max_drawdown': 0.0, | |
| 'win_rate': 0.0, | |
| 'avg_return': 0.0, | |
| 'std_dev': 0.0, | |
| 'trades': [] | |
| } | |
| # 넘파이 배열로 변환 | |
| returns_array = np.array(daily_returns) | |
| portfolio_values = np.array(portfolio_values) | |
| # 0이 아닌 수익률만 필터링 | |
| non_zero_returns = returns_array[np.abs(returns_array) > 1e-8] | |
| # 총 수익률 계산 | |
| total_return = (portfolio_values[-1] / portfolio_values[0]) - 1 | |
| # 연간화된 수익률 | |
| n_days = len(portfolio_values) - 1 | |
| if n_days > 0: | |
| n_years = n_days / 252 | |
| if n_years > 0: | |
| annualized_return = ((1 + total_return) ** (1 / n_years)) - 1 | |
| else: | |
| annualized_return = total_return * 252 | |
| else: | |
| annualized_return = 0.0 | |
| # 무위험 수익률 처리 | |
| if np.isscalar(risk_free_rate): | |
| daily_rf_rate = risk_free_rate / 252 | |
| else: | |
| daily_rf_rate = 0.0 | |
| # 샤프 비율 계산 방식 개선 | |
| if len(non_zero_returns) > 1: | |
| # 실제 거래가 있는 날의 수익률로만 계산 | |
| excess_returns = non_zero_returns - daily_rf_rate | |
| excess_mean = np.mean(excess_returns) | |
| excess_std = np.std(excess_returns, ddof=1) | |
| if excess_std > 1e-8: | |
| # 거래 빈도를 고려한 연간화 | |
| trading_frequency = len(non_zero_returns) / len(returns_array) | |
| annualized_factor = np.sqrt(252 * trading_frequency) | |
| sharpe_ratio = (excess_mean / excess_std) * annualized_factor | |
| else: | |
| sharpe_ratio = 0.0 | |
| else: | |
| # 전체 수익률로 계산 | |
| excess_returns = returns_array - daily_rf_rate | |
| if len(excess_returns) > 1: | |
| excess_mean = np.mean(excess_returns) | |
| excess_std = np.std(excess_returns, ddof=1) | |
| if excess_std > 1e-8: | |
| sharpe_ratio = (excess_mean / excess_std) * np.sqrt(252) | |
| else: | |
| sharpe_ratio = 0.0 | |
| else: | |
| sharpe_ratio = 0.0 | |
| # 최대 낙폭 계산 | |
| max_drawdown = calculate_max_drawdown(portfolio_values) | |
| # 승률 계산 (0이 아닌 수익률만 사용) | |
| if len(non_zero_returns) > 0: | |
| positive_returns = non_zero_returns[non_zero_returns > 0] | |
| win_rate = len(positive_returns) / len(non_zero_returns) | |
| else: | |
| win_rate = 0.0 | |
| # 연간화된 표준편차 | |
| std_dev = np.std(returns_array) * np.sqrt(252) | |
| return { | |
| 'total_return': float(total_return), | |
| 'annualized_return': float(annualized_return), | |
| 'sharpe_ratio': float(sharpe_ratio), | |
| 'max_drawdown': float(max_drawdown), | |
| 'win_rate': float(win_rate), | |
| 'avg_return': float(np.mean(returns_array)), | |
| 'std_dev': float(std_dev), | |
| 'active_trading_days': len(non_zero_returns), | |
| 'total_days': len(returns_array), | |
| 'trading_frequency': len(non_zero_returns) / len(returns_array) if len(returns_array) > 0 else 0 | |
| } | |
| def backtest_by_ticker(predictions, actual_returns, ticker_ids, threshold=0.05, | |
| commission=0.0025, risk_free_rate=None): | |
| """ | |
| 개별 종목별 및 전체 포트폴리오 백테스트 함수 - 매수/홀딩 전략 | |
| """ | |
| # 무위험 수익률이 전달되지 않은 경우 가져오기 | |
| if risk_free_rate is None: | |
| risk_free_rate = get_risk_free_rate() | |
| # 입력 데이터를 numpy 배열로 변환 | |
| if hasattr(predictions, 'values'): | |
| predictions = predictions.values | |
| if hasattr(actual_returns, 'values'): | |
| actual_returns = actual_returns.values | |
| if hasattr(ticker_ids, 'values'): | |
| ticker_ids = ticker_ids.values | |
| # 모든 입력이 numpy 배열임을 보장 | |
| predictions = np.asarray(predictions) | |
| actual_returns = np.asarray(actual_returns) | |
| ticker_ids = np.asarray(ticker_ids) | |
| n_samples = len(predictions) | |
| unique_tickers = np.unique(ticker_ids) | |
| n_tickers = len(unique_tickers) | |
| # 전체 포트폴리오 변수 | |
| initial_capital = 1.0 | |
| portfolio_values = [initial_capital] | |
| portfolio_trades = [] | |
| daily_returns = [] | |
| # 종목별 성과 추적 변수 | |
| ticker_results = {ticker_id: { | |
| 'cash': initial_capital / n_tickers, # 각 종목에 균등 배분된 현금 | |
| 'shares': 0, # 보유 주식 수 | |
| 'values': [initial_capital / n_tickers], | |
| 'returns': [], | |
| 'trades': [], | |
| 'position': 0, # 0: 현금, 1: 주식 보유 | |
| 'last_price': 1.0, # 마지막 주가 (정규화된 가격) | |
| } for ticker_id in unique_tickers} | |
| # 날짜별로 모든 종목의 신호 처리 | |
| for t in range(n_samples): | |
| daily_portfolio_value = 0.0 | |
| previous_portfolio_value = portfolio_values[-1] | |
| # 각 종목별 처리 | |
| for ticker_id in unique_tickers: | |
| # 현재 종목의 데이터 찾기 | |
| ticker_mask = ticker_ids == ticker_id | |
| if not any(ticker_mask[t:t+1]): | |
| # 해당 시점에 종목 데이터가 없으면 이전 값 유지 | |
| ticker_result = ticker_results[ticker_id] | |
| current_value = ticker_result['cash'] + ticker_result['shares'] * ticker_result['last_price'] | |
| daily_portfolio_value += current_value | |
| # 수익률은 0으로 처리 | |
| ticker_result['returns'].append(0.0) | |
| ticker_result['values'].append(current_value) | |
| continue | |
| # 현재 종목의 예측값과 실제 수익률 | |
| ticker_pred = predictions[t:t+1][ticker_mask[t:t+1]][0] | |
| ticker_actual = actual_returns[t:t+1][ticker_mask[t:t+1]][0] | |
| ticker_result = ticker_results[ticker_id] | |
| # 현재 주가 업데이트 (이전 가격 * (1 + 수익률)) | |
| ticker_result['last_price'] *= (1 + ticker_actual) | |
| # 신호 결정 (매수/매도만) | |
| if ticker_pred > threshold and ticker_result['position'] == 0: | |
| new_signal = 1 | |
| elif ticker_pred < -threshold and ticker_result['position'] == 1: | |
| new_signal = 0 | |
| else: | |
| new_signal = ticker_result['position'] | |
| current_position = ticker_result['position'] | |
| # 포지션 변경 처리 | |
| if new_signal != current_position: | |
| if current_position == 0 and new_signal == 1: | |
| # 현금 → 주식 (매수) | |
| available_cash = ticker_result['cash'] * (1 - commission) | |
| shares_to_buy = available_cash / ticker_result['last_price'] | |
| ticker_result['shares'] = shares_to_buy | |
| ticker_result['cash'] = 0 | |
| ticker_result['position'] = 1 | |
| trade = { | |
| 'day': t, | |
| 'ticker': ticker_id, | |
| 'action': 'BUY', | |
| 'shares': shares_to_buy, | |
| 'price': ticker_result['last_price'], | |
| 'value': available_cash, | |
| 'pred': ticker_pred | |
| } | |
| elif current_position == 1 and new_signal == 0: | |
| # 주식 → 현금 (매도) | |
| shares_to_sell = ticker_result['shares'] | |
| sale_proceeds = shares_to_sell * ticker_result['last_price'] * (1 - commission) | |
| ticker_result['cash'] = sale_proceeds | |
| ticker_result['shares'] = 0 | |
| ticker_result['position'] = 0 | |
| trade = { | |
| 'day': t, | |
| 'ticker': ticker_id, | |
| 'action': 'SELL', | |
| 'shares': shares_to_sell, | |
| 'price': ticker_result['last_price'], | |
| 'value': sale_proceeds, | |
| 'pred': ticker_pred | |
| } | |
| portfolio_trades.append(trade) | |
| ticker_result['trades'].append(trade) | |
| # 현재 포트폴리오 가치 계산 | |
| current_value = ticker_result['cash'] + ticker_result['shares'] * ticker_result['last_price'] | |
| # 일일 수익률 계산 | |
| previous_value = ticker_result['values'][-1] if ticker_result['values'] else initial_capital / n_tickers | |
| ticker_daily_return = (current_value / previous_value) - 1 if previous_value > 0 else 0 | |
| # 종목별 수익률과 값 기록 | |
| ticker_result['returns'].append(ticker_daily_return) | |
| ticker_result['values'].append(current_value) | |
| # 포트폴리오 값에 반영 | |
| daily_portfolio_value += current_value | |
| # 일일 포트폴리오 전체 수익률 계산 및 추가 | |
| portfolio_daily_return = (daily_portfolio_value / previous_portfolio_value) - 1 if previous_portfolio_value > 0 else 0 | |
| daily_returns.append(portfolio_daily_return) | |
| portfolio_values.append(daily_portfolio_value) | |
| # 전체 포트폴리오 성과 계산 | |
| portfolio_metrics = calculate_performance_metrics(portfolio_values, daily_returns, risk_free_rate) | |
| portfolio_metrics['trades'] = portfolio_trades | |
| # 각 종목별 성과 계산 | |
| for ticker_id in unique_tickers: | |
| ticker_values = ticker_results[ticker_id]['values'] | |
| ticker_returns = ticker_results[ticker_id]['returns'] | |
| # 종목별 지표 계산 | |
| if len(ticker_returns) > 0: | |
| ticker_metrics = calculate_performance_metrics(ticker_values, ticker_returns, risk_free_rate) | |
| ticker_results[ticker_id].update(ticker_metrics) | |
| else: | |
| ticker_results[ticker_id].update({ | |
| 'total_return': 0, | |
| 'sharpe_ratio': 0, | |
| 'max_drawdown': 0, | |
| 'trade_count': 0, | |
| 'win_rate': 0, | |
| 'risk_free_rate': risk_free_rate | |
| }) | |
| # DTW와 TDI 계산 | |
| try: | |
| # 배열을 명시적으로 1차원으로 변환 | |
| flat_predictions = np.asarray(predictions).flatten() | |
| flat_actual_returns = np.asarray(actual_returns).flatten() | |
| # NaN 값 제거 | |
| mask_pred = ~np.isnan(flat_predictions) | |
| mask_act = ~np.isnan(flat_actual_returns) | |
| clean_predictions = flat_predictions[mask_pred] | |
| clean_actual_returns = flat_actual_returns[mask_act] | |
| # 길이 맞추기 | |
| min_len = min(len(clean_predictions), len(clean_actual_returns)) | |
| if min_len > 0: | |
| clean_predictions = clean_predictions[:min_len] | |
| clean_actual_returns = clean_actual_returns[:min_len] | |
| # 이제 1차원 벡터로 DTW 계산 | |
| portfolio_metrics['dtw'] = calculate_dtw(clean_predictions, clean_actual_returns) | |
| portfolio_metrics['tdi'] = calculate_tdi(clean_predictions, clean_actual_returns) | |
| else: | |
| print("DTW/TDI 계산을 위한 유효한 데이터 없음") | |
| portfolio_metrics['dtw'] = 1.0 | |
| portfolio_metrics['tdi'] = 1.0 | |
| except Exception as e: | |
| print(f"DTW/TDI 계산 중 오류: {e}") | |
| portfolio_metrics['dtw'] = 1.0 | |
| portfolio_metrics['tdi'] = 1.0 | |
| # 결과 결합 | |
| result = { | |
| 'portfolio': portfolio_metrics, | |
| 'by_ticker': ticker_results | |
| } | |
| # 종목별 샤프 비율 평균 추가 | |
| avg_sharpe = np.mean([ticker_results[ticker_id]['sharpe_ratio'] for ticker_id in unique_tickers]) | |
| result['avg_ticker_sharpe'] = avg_sharpe | |
| return result |