johnaness's picture
Deploy OStock FastAPI backend to HF Space (Docker SDK, port 7860)
4be2d4d
"""
백테스트 관련 유틸리티 함수 모듈
"""
import numpy as np
import pandas as pd
import yfinance as yf
from .model_evaluation import calculate_dtw, calculate_tdi
def get_risk_free_rate(start_date=None, end_date=None, ticker='^IRX'):
"""
야후 파이낸스에서 국채 수익률 데이터를 가져옵니다.
^TNX: 10년물 미국 국채 수익률
^IRX: 13주물 미국 국채 수익률
"""
try:
# 데이터 형식 확인 및 변환
if start_date and isinstance(start_date, str):
start_date = pd.to_datetime(start_date)
if end_date and isinstance(end_date, str):
end_date = pd.to_datetime(end_date)
# 국채 데이터 다운로드
treasury_data = yf.download(ticker, start=start_date, end=end_date)
if not treasury_data.empty:
# 수익률은 퍼센트로 표시되므로 100으로 나눔
avg_yield_raw = treasury_data['Close'].mean()
avg_yield = float(avg_yield_raw) / 100.0
return avg_yield
except Exception as e:
print(f"국채 수익률 데이터 가져오기 오류: {e}")
# 기본값 반환
default_rate = 0.02 # 2%
print(f"국채 수익률을 가져올 수 없습니다. 기본값 {default_rate:.4f}(2%)를 사용합니다.")
return default_rate
def calculate_max_drawdown(equity_curve):
"""
주식 그래프에서 최대 낙폭을 계산
"""
if len(equity_curve) <= 1:
return 0.0
equity_curve = np.asarray(equity_curve)
peak = np.maximum.accumulate(equity_curve)
drawdown = (equity_curve - peak) / np.maximum(peak, 1e-10)
return np.min(drawdown)
def calculate_performance_metrics(portfolio_values, daily_returns, risk_free_rate=0.0):
"""
포트폴리오 성능 지표 계산
"""
if len(portfolio_values) <= 1:
return {
'total_return': 0.0,
'annualized_return': 0.0,
'sharpe_ratio': 0.0,
'max_drawdown': 0.0,
'win_rate': 0.0,
'avg_return': 0.0,
'std_dev': 0.0,
'trades': []
}
# 넘파이 배열로 변환
returns_array = np.array(daily_returns)
portfolio_values = np.array(portfolio_values)
# 0이 아닌 수익률만 필터링
non_zero_returns = returns_array[np.abs(returns_array) > 1e-8]
# 총 수익률 계산
total_return = (portfolio_values[-1] / portfolio_values[0]) - 1
# 연간화된 수익률
n_days = len(portfolio_values) - 1
if n_days > 0:
n_years = n_days / 252
if n_years > 0:
annualized_return = ((1 + total_return) ** (1 / n_years)) - 1
else:
annualized_return = total_return * 252
else:
annualized_return = 0.0
# 무위험 수익률 처리
if np.isscalar(risk_free_rate):
daily_rf_rate = risk_free_rate / 252
else:
daily_rf_rate = 0.0
# 샤프 비율 계산 방식 개선
if len(non_zero_returns) > 1:
# 실제 거래가 있는 날의 수익률로만 계산
excess_returns = non_zero_returns - daily_rf_rate
excess_mean = np.mean(excess_returns)
excess_std = np.std(excess_returns, ddof=1)
if excess_std > 1e-8:
# 거래 빈도를 고려한 연간화
trading_frequency = len(non_zero_returns) / len(returns_array)
annualized_factor = np.sqrt(252 * trading_frequency)
sharpe_ratio = (excess_mean / excess_std) * annualized_factor
else:
sharpe_ratio = 0.0
else:
# 전체 수익률로 계산
excess_returns = returns_array - daily_rf_rate
if len(excess_returns) > 1:
excess_mean = np.mean(excess_returns)
excess_std = np.std(excess_returns, ddof=1)
if excess_std > 1e-8:
sharpe_ratio = (excess_mean / excess_std) * np.sqrt(252)
else:
sharpe_ratio = 0.0
else:
sharpe_ratio = 0.0
# 최대 낙폭 계산
max_drawdown = calculate_max_drawdown(portfolio_values)
# 승률 계산 (0이 아닌 수익률만 사용)
if len(non_zero_returns) > 0:
positive_returns = non_zero_returns[non_zero_returns > 0]
win_rate = len(positive_returns) / len(non_zero_returns)
else:
win_rate = 0.0
# 연간화된 표준편차
std_dev = np.std(returns_array) * np.sqrt(252)
return {
'total_return': float(total_return),
'annualized_return': float(annualized_return),
'sharpe_ratio': float(sharpe_ratio),
'max_drawdown': float(max_drawdown),
'win_rate': float(win_rate),
'avg_return': float(np.mean(returns_array)),
'std_dev': float(std_dev),
'active_trading_days': len(non_zero_returns),
'total_days': len(returns_array),
'trading_frequency': len(non_zero_returns) / len(returns_array) if len(returns_array) > 0 else 0
}
def backtest_by_ticker(predictions, actual_returns, ticker_ids, threshold=0.05,
commission=0.0025, risk_free_rate=None):
"""
개별 종목별 및 전체 포트폴리오 백테스트 함수 - 매수/홀딩 전략
"""
# 무위험 수익률이 전달되지 않은 경우 가져오기
if risk_free_rate is None:
risk_free_rate = get_risk_free_rate()
# 입력 데이터를 numpy 배열로 변환
if hasattr(predictions, 'values'):
predictions = predictions.values
if hasattr(actual_returns, 'values'):
actual_returns = actual_returns.values
if hasattr(ticker_ids, 'values'):
ticker_ids = ticker_ids.values
# 모든 입력이 numpy 배열임을 보장
predictions = np.asarray(predictions)
actual_returns = np.asarray(actual_returns)
ticker_ids = np.asarray(ticker_ids)
n_samples = len(predictions)
unique_tickers = np.unique(ticker_ids)
n_tickers = len(unique_tickers)
# 전체 포트폴리오 변수
initial_capital = 1.0
portfolio_values = [initial_capital]
portfolio_trades = []
daily_returns = []
# 종목별 성과 추적 변수
ticker_results = {ticker_id: {
'cash': initial_capital / n_tickers, # 각 종목에 균등 배분된 현금
'shares': 0, # 보유 주식 수
'values': [initial_capital / n_tickers],
'returns': [],
'trades': [],
'position': 0, # 0: 현금, 1: 주식 보유
'last_price': 1.0, # 마지막 주가 (정규화된 가격)
} for ticker_id in unique_tickers}
# 날짜별로 모든 종목의 신호 처리
for t in range(n_samples):
daily_portfolio_value = 0.0
previous_portfolio_value = portfolio_values[-1]
# 각 종목별 처리
for ticker_id in unique_tickers:
# 현재 종목의 데이터 찾기
ticker_mask = ticker_ids == ticker_id
if not any(ticker_mask[t:t+1]):
# 해당 시점에 종목 데이터가 없으면 이전 값 유지
ticker_result = ticker_results[ticker_id]
current_value = ticker_result['cash'] + ticker_result['shares'] * ticker_result['last_price']
daily_portfolio_value += current_value
# 수익률은 0으로 처리
ticker_result['returns'].append(0.0)
ticker_result['values'].append(current_value)
continue
# 현재 종목의 예측값과 실제 수익률
ticker_pred = predictions[t:t+1][ticker_mask[t:t+1]][0]
ticker_actual = actual_returns[t:t+1][ticker_mask[t:t+1]][0]
ticker_result = ticker_results[ticker_id]
# 현재 주가 업데이트 (이전 가격 * (1 + 수익률))
ticker_result['last_price'] *= (1 + ticker_actual)
# 신호 결정 (매수/매도만)
if ticker_pred > threshold and ticker_result['position'] == 0:
new_signal = 1
elif ticker_pred < -threshold and ticker_result['position'] == 1:
new_signal = 0
else:
new_signal = ticker_result['position']
current_position = ticker_result['position']
# 포지션 변경 처리
if new_signal != current_position:
if current_position == 0 and new_signal == 1:
# 현금 → 주식 (매수)
available_cash = ticker_result['cash'] * (1 - commission)
shares_to_buy = available_cash / ticker_result['last_price']
ticker_result['shares'] = shares_to_buy
ticker_result['cash'] = 0
ticker_result['position'] = 1
trade = {
'day': t,
'ticker': ticker_id,
'action': 'BUY',
'shares': shares_to_buy,
'price': ticker_result['last_price'],
'value': available_cash,
'pred': ticker_pred
}
elif current_position == 1 and new_signal == 0:
# 주식 → 현금 (매도)
shares_to_sell = ticker_result['shares']
sale_proceeds = shares_to_sell * ticker_result['last_price'] * (1 - commission)
ticker_result['cash'] = sale_proceeds
ticker_result['shares'] = 0
ticker_result['position'] = 0
trade = {
'day': t,
'ticker': ticker_id,
'action': 'SELL',
'shares': shares_to_sell,
'price': ticker_result['last_price'],
'value': sale_proceeds,
'pred': ticker_pred
}
portfolio_trades.append(trade)
ticker_result['trades'].append(trade)
# 현재 포트폴리오 가치 계산
current_value = ticker_result['cash'] + ticker_result['shares'] * ticker_result['last_price']
# 일일 수익률 계산
previous_value = ticker_result['values'][-1] if ticker_result['values'] else initial_capital / n_tickers
ticker_daily_return = (current_value / previous_value) - 1 if previous_value > 0 else 0
# 종목별 수익률과 값 기록
ticker_result['returns'].append(ticker_daily_return)
ticker_result['values'].append(current_value)
# 포트폴리오 값에 반영
daily_portfolio_value += current_value
# 일일 포트폴리오 전체 수익률 계산 및 추가
portfolio_daily_return = (daily_portfolio_value / previous_portfolio_value) - 1 if previous_portfolio_value > 0 else 0
daily_returns.append(portfolio_daily_return)
portfolio_values.append(daily_portfolio_value)
# 전체 포트폴리오 성과 계산
portfolio_metrics = calculate_performance_metrics(portfolio_values, daily_returns, risk_free_rate)
portfolio_metrics['trades'] = portfolio_trades
# 각 종목별 성과 계산
for ticker_id in unique_tickers:
ticker_values = ticker_results[ticker_id]['values']
ticker_returns = ticker_results[ticker_id]['returns']
# 종목별 지표 계산
if len(ticker_returns) > 0:
ticker_metrics = calculate_performance_metrics(ticker_values, ticker_returns, risk_free_rate)
ticker_results[ticker_id].update(ticker_metrics)
else:
ticker_results[ticker_id].update({
'total_return': 0,
'sharpe_ratio': 0,
'max_drawdown': 0,
'trade_count': 0,
'win_rate': 0,
'risk_free_rate': risk_free_rate
})
# DTW와 TDI 계산
try:
# 배열을 명시적으로 1차원으로 변환
flat_predictions = np.asarray(predictions).flatten()
flat_actual_returns = np.asarray(actual_returns).flatten()
# NaN 값 제거
mask_pred = ~np.isnan(flat_predictions)
mask_act = ~np.isnan(flat_actual_returns)
clean_predictions = flat_predictions[mask_pred]
clean_actual_returns = flat_actual_returns[mask_act]
# 길이 맞추기
min_len = min(len(clean_predictions), len(clean_actual_returns))
if min_len > 0:
clean_predictions = clean_predictions[:min_len]
clean_actual_returns = clean_actual_returns[:min_len]
# 이제 1차원 벡터로 DTW 계산
portfolio_metrics['dtw'] = calculate_dtw(clean_predictions, clean_actual_returns)
portfolio_metrics['tdi'] = calculate_tdi(clean_predictions, clean_actual_returns)
else:
print("DTW/TDI 계산을 위한 유효한 데이터 없음")
portfolio_metrics['dtw'] = 1.0
portfolio_metrics['tdi'] = 1.0
except Exception as e:
print(f"DTW/TDI 계산 중 오류: {e}")
portfolio_metrics['dtw'] = 1.0
portfolio_metrics['tdi'] = 1.0
# 결과 결합
result = {
'portfolio': portfolio_metrics,
'by_ticker': ticker_results
}
# 종목별 샤프 비율 평균 추가
avg_sharpe = np.mean([ticker_results[ticker_id]['sharpe_ratio'] for ticker_id in unique_tickers])
result['avg_ticker_sharpe'] = avg_sharpe
return result