""" 기술적 지표 최적화 관련 함수 """ import numpy as np import pandas as pd import yfinance as yf from ..evaluation.backtest import get_risk_free_rate from .technical_indicators import calculate_ema_series, calculate_macd, calculate_cmf, calculate_rsi def calculate_risk_free_rate(data, risk_free_rates=None): """무위험 수익률 계산을 위한 공통 함수""" annual_rf_rate = 0.01 # 기본값 if risk_free_rates is not None: try: # 단일 값(float)인 경우 if isinstance(risk_free_rates, (float, int)): annual_rf_rate = float(risk_free_rates) # Series/DataFrame인 경우 elif hasattr(risk_free_rates, 'index'): start_date = data.index[0] end_date = data.index[-1] try: rf_subset = risk_free_rates.loc[start_date:end_date] if not rf_subset.empty: annual_rf_rate = rf_subset.mean() else: annual_rf_rate = risk_free_rates.mean() except: annual_rf_rate = risk_free_rates.mean() except Exception as e: print(f"무위험 수익률 처리 오류: {e}") annual_rf_rate = 0.01 return annual_rf_rate def minmax_scale(value, min_val, max_val, inverse=False): """정규화를 위한 공통 함수""" if max_val == min_val: return 0 normalized = (value - min_val) / (max_val - min_val) return 1 - normalized if inverse else normalized def calculate_performance_metrics(total_return, returns, portfolio_values, mdd, data_length, risk_free_rate): """성능 지표 계산을 위한 공통 함수""" # 연간수익률 계산 annual_return = ((1 + total_return) ** (252/data_length)) - 1 annual_std = np.std(returns) * np.sqrt(252) if returns else 0 # 샤프지수 계산 sharpe_ratio = (annual_return - risk_free_rate) / annual_std if annual_std != 0 else 0 return { 'annual_return': annual_return, 'sharpe_ratio': sharpe_ratio, 'mdd': mdd, 'total_return': total_return } def find_optimal_parameters(all_results, params_keys, metric_weights={'sharpe_ratio': 0.9, 'mdd': 0.1}): """최적 파라미터 찾기를 위한 공통 함수""" if not all_results: return None # 정규화를 위한 최소/최대값 찾기 sharpe_ratios = [r['metrics']['sharpe_ratio'] for r in all_results] mdds = [r['metrics']['mdd'] for r in all_results] min_sharpe, max_sharpe = min(sharpe_ratios), max(sharpe_ratios) min_mdd, max_mdd = min(mdds), max(mdds) # 최적 파라미터 찾기 best_score = float('-inf') best_params = None for result in all_results: normalized_sharpe = minmax_scale(result['metrics']['sharpe_ratio'], min_sharpe, max_sharpe) normalized_mdd = minmax_scale(result['metrics']['mdd'], min_mdd, max_mdd, inverse=True) score = (metric_weights['sharpe_ratio'] * normalized_sharpe + metric_weights['mdd'] * normalized_mdd) if score > best_score: best_score = score best_params = {key: result[key] for key in params_keys} best_params['metrics'] = result['metrics'] return best_params def backtest_ema(data, short_ema, long_ema): """EMA 크로스오버 백테스팅""" positions = [] returns = [] close_values = data['Close'].values short_values = short_ema.values long_values = long_ema.values for i in range(1, len(short_values)): if (short_values[i-1] <= long_values[i-1]) and (short_values[i] > long_values[i]): positions.append((i, 'buy')) elif (short_values[i-1] >= long_values[i-1]) and (short_values[i] < long_values[i]): positions.append((i, 'sell')) # 거래 수익률 계산 for i, (idx, action) in enumerate(positions[:-1]): if action == 'buy': buy_price = close_values[idx] sell_idx = positions[i + 1][0] sell_price = close_values[sell_idx] returns.append((sell_price - buy_price) / buy_price) # 포트폴리오 가치 시뮬레이션 portfolio_values = [1.0] position = False buy_idx = 0 for i in range(1, len(short_values)): portfolio_values.append(portfolio_values[-1]) # 매수 신호 if (short_values[i-1] <= long_values[i-1]) and (short_values[i] > long_values[i]) and not position: position = True buy_idx = i # 매도 신호 elif (short_values[i-1] >= long_values[i-1]) and (short_values[i] < long_values[i]) and position: position = False buy_price = close_values[buy_idx] sell_price = close_values[i] trade_return = (sell_price - buy_price) / buy_price portfolio_values[-1] *= (1 + trade_return) # MDD 계산 cummax = np.maximum.accumulate(portfolio_values) drawdowns = (cummax - portfolio_values) / cummax mdd = np.max(drawdowns) if len(drawdowns) > 0 else 0 total_return = np.prod([1 + r for r in returns]) - 1 if returns else 0 return total_return, returns, portfolio_values, mdd def evaluate_ema_strategy(data, short_period, long_period, risk_free_rates=None): """EMA 전략 평가""" short_ema = calculate_ema_series(data['Close'], short_period) long_ema = calculate_ema_series(data['Close'], long_period) total_return, returns, portfolio_values, mdd = backtest_ema(data, short_ema, long_ema) if not returns: return {'annual_return': 0, 'sharpe_ratio': 0, 'mdd': 1, 'total_return': 0} # 무위험 수익률 가져오기 risk_free_rate = calculate_risk_free_rate(data, risk_free_rates) # 성능 지표 계산 metrics = calculate_performance_metrics(total_return, returns, portfolio_values, mdd, len(data), risk_free_rate) # EMA 특화 정보 추가 metrics.update({ 'short_period': short_period, 'long_period': long_period }) return metrics def optimize_ema_parameters(data, risk_free_rates=None): """최적의 EMA 파라미터 찾기""" short_periods = range(5, 50, 5) long_periods = range(50, 200, 10) all_results = [] for short in short_periods: for long in long_periods: if short >= long: continue try: result = evaluate_ema_strategy(data, short, long, risk_free_rates) all_results.append({ 'short': short, 'long': long, 'metrics': result }) except Exception as e: print(f"Error with EMA params {short}-{long}: {e}") continue if not all_results: return {'short': 10, 'long': 50} # 기본값 # 최적 파라미터 찾기 best_params = find_optimal_parameters(all_results, ['short', 'long']) return best_params def backtest_macd(data, macd, signal): """MACD 백테스팅 함수""" positions = [] returns = [] macd_values = macd.values signal_values = signal.values close_values = data['Close'].values for i in range(1, len(macd_values)): if (macd_values[i - 1] <= signal_values[i - 1]) and (macd_values[i] > signal_values[i]): positions.append((i, 'buy')) elif (macd_values[i - 1] >= signal_values[i - 1]) and (macd_values[i] < signal_values[i]): positions.append((i, 'sell')) # 거래 수익률 계산 for i, (idx, action) in enumerate(positions[:-1]): if action == 'buy': buy_price = close_values[idx] sell_idx = positions[i + 1][0] sell_price = close_values[sell_idx] returns.append((sell_price - buy_price) / buy_price) # 포트폴리오 가치 시뮬레이션 portfolio_values = [1.0] position = False buy_idx = 0 for i, (idx, action) in enumerate(positions): # 이전 포지션 가치 확장 while len(portfolio_values) <= idx: portfolio_values.append(portfolio_values[-1]) # 매수 신호 if action == 'buy' and not position: position = True buy_idx = idx # 매도 신호 elif action == 'sell' and position: position = False buy_price = close_values[buy_idx] sell_price = close_values[idx] trade_return = (sell_price - buy_price) / buy_price portfolio_values[-1] *= (1 + trade_return) while len(portfolio_values) < len(close_values): portfolio_values.append(portfolio_values[-1]) # MDD 계산 cummax = np.maximum.accumulate(portfolio_values) drawdowns = (cummax - portfolio_values) / cummax mdd = np.max(drawdowns) if len(drawdowns) > 0 else 0 total_return = np.prod([1 + r for r in returns]) - 1 if returns else 0 return total_return, returns, portfolio_values, mdd def evaluate_macd_strategy(data, fast_period, slow_period, signal_period, risk_free_rates=None): """MACD 전략 평가""" macd, signal = calculate_macd(data, fast_period, slow_period, signal_period) total_return, returns, portfolio_values, mdd = backtest_macd(data, macd, signal) if not returns: return {'annual_return': 0, 'sharpe_ratio': 0, 'mdd': 1, 'total_return': 0} # 무위험 수익률 가져오기 risk_free_rate = calculate_risk_free_rate(data, risk_free_rates) # 성능 지표 계산 metrics = calculate_performance_metrics(total_return, returns, portfolio_values, mdd, len(data), risk_free_rate) # MACD 특화 정보 추가 metrics.update({ 'fast_period': fast_period, 'slow_period': slow_period, 'signal_period': signal_period }) return metrics def optimize_macd_parameters(data, risk_free_rates=None): """MACD 파라미터 최적화""" fast_periods = range(5, 20, 2) slow_periods = range(20, 60, 5) signal_periods = range(5, 20, 2) all_results = [] for fast in fast_periods: for slow in slow_periods: if fast >= slow: continue for signal in signal_periods: try: result = evaluate_macd_strategy(data, fast, slow, signal, risk_free_rates) all_results.append({ 'fast': fast, 'slow': slow, 'signal': signal, 'metrics': result }) except Exception as e: print(f"Error with MACD params {fast}-{slow}-{signal}: {e}") continue if not all_results: return {'fast': 12, 'slow': 26, 'signal': 9} # 기본값 # 최적 파라미터 찾기 best_params = find_optimal_parameters(all_results, ['fast', 'slow', 'signal']) return best_params def backtest_cmf(data, cmf, threshold=0.05): """CMF 백테스팅""" positions = [] returns = [] close_values = data['Close'].values cmf_values = cmf.values # 매매 신호 생성 for i in range(1, len(cmf_values)): if np.isnan(cmf_values[i-1]) or np.isnan(cmf_values[i]): continue if (cmf_values[i-1] <= threshold) and (cmf_values[i] > threshold): positions.append((i, 'buy')) elif (cmf_values[i-1] >= -threshold) and (cmf_values[i] < -threshold): positions.append((i, 'sell')) # 거래 수익률 계산 for i, (idx, action) in enumerate(positions[:-1]): if action == 'buy': buy_price = close_values[idx] sell_idx = positions[i + 1][0] sell_price = close_values[sell_idx] returns.append((sell_price - buy_price) / buy_price) # 포트폴리오 가치 시뮬레이션 portfolio_values = [1.0] position = False buy_idx = 0 for i, (idx, action) in enumerate(positions): # 이전 포지션 가치 확장 while len(portfolio_values) <= idx: portfolio_values.append(portfolio_values[-1]) # 매수 신호 if action == 'buy' and not position: position = True buy_idx = idx # 매도 신호 elif action == 'sell' and position: position = False buy_price = close_values[buy_idx] sell_price = close_values[idx] trade_return = (sell_price - buy_price) / buy_price portfolio_values[-1] *= (1 + trade_return) while len(portfolio_values) < len(close_values): portfolio_values.append(portfolio_values[-1]) # MDD 계산 cummax = np.maximum.accumulate(portfolio_values) drawdowns = (cummax - portfolio_values) / cummax mdd = np.max(drawdowns) if len(drawdowns) > 0 else 0 total_return = np.prod([1 + r for r in returns]) - 1 if returns else 0 return total_return, returns, portfolio_values, mdd def evaluate_cmf_strategy(data, period, threshold=0.05, risk_free_rates=None): """CMF 전략 평가""" # CMF 계산 df_temp = data.copy() df_temp = calculate_cmf(df_temp, period) cmf = df_temp[f'CMF_{period}'] total_return, returns, portfolio_values, mdd = backtest_cmf(data, cmf, threshold) if not returns: return {'annual_return': 0, 'sharpe_ratio': 0, 'mdd': 1, 'total_return': 0} # 무위험 수익률 가져오기 risk_free_rate = calculate_risk_free_rate(data, risk_free_rates) # 성능 지표 계산 metrics = calculate_performance_metrics(total_return, returns, portfolio_values, mdd, len(data), risk_free_rate) # CMF 특화 정보 추가 metrics.update({ 'period': period }) return metrics def optimize_cmf_period(data, risk_free_rates=None): """최적의 CMF 기간 찾기""" periods = range(10, 50, 5) # 10에서 45까지 5씩 증가 all_results = [] for period in periods: try: result = evaluate_cmf_strategy(data, period, risk_free_rates=risk_free_rates) all_results.append({ 'period': period, 'metrics': result }) except Exception as e: print(f"CMF period {period} optimization error: {e}") continue if not all_results: return 20 # 최적 파라미터 찾기 best_result = find_optimal_parameters(all_results, ['period']) return best_result['period'] if best_result else 20 def backtest_rsi(data, rsi, upper_threshold, lower_threshold): """RSI 백테스팅""" positions = [] returns = [] close_values = data['Close'].values rsi_values = rsi.values # RSI 임계값 기반 매매 신호 for i in range(1, len(rsi_values)): if np.isnan(rsi_values[i-1]) or np.isnan(rsi_values[i]): continue # 과매도 상태에서 반등 시 매수 if (rsi_values[i-1] <= lower_threshold) and (rsi_values[i] > lower_threshold): positions.append((i, 'buy')) # 과매수 상태에서 반락 시 매도 elif (rsi_values[i-1] >= upper_threshold) and (rsi_values[i] < upper_threshold): positions.append((i, 'sell')) # 거래 수익률 계산 for i, (idx, action) in enumerate(positions[:-1]): if action == 'buy': buy_price = close_values[idx] sell_idx = positions[i + 1][0] sell_price = close_values[sell_idx] returns.append((sell_price - buy_price) / buy_price) # 포트폴리오 가치 시뮬레이션 portfolio_values = [1.0] position = False buy_idx = 0 for i, (idx, action) in enumerate(positions): # 이전 포지션 가치 확장 while len(portfolio_values) <= idx: portfolio_values.append(portfolio_values[-1]) # 매수 신호 if action == 'buy' and not position: position = True buy_idx = idx # 매도 신호 elif action == 'sell' and position: position = False buy_price = close_values[buy_idx] sell_price = close_values[idx] trade_return = (sell_price - buy_price) / buy_price portfolio_values[-1] *= (1 + trade_return) while len(portfolio_values) < len(close_values): portfolio_values.append(portfolio_values[-1]) # MDD 계산 cummax = np.maximum.accumulate(portfolio_values) drawdowns = (cummax - portfolio_values) / cummax mdd = np.max(drawdowns) if len(drawdowns) > 0 else 0 total_return = np.prod([1 + r for r in returns]) - 1 if returns else 0 return total_return, returns, portfolio_values, mdd def evaluate_rsi_strategy(data, period, upper_threshold, lower_threshold, risk_free_rates=None): """RSI 전략 평가""" # RSI 계산 df_temp = data.copy() df_temp = calculate_rsi(df_temp, period) rsi = df_temp[f'RSI_{period}'] total_return, returns, portfolio_values, mdd = backtest_rsi(data, rsi, upper_threshold, lower_threshold) if not returns: return {'annual_return': 0, 'sharpe_ratio': 0, 'mdd': 1, 'total_return': 0} # 무위험 수익률 가져오기 risk_free_rate = calculate_risk_free_rate(data, risk_free_rates) # 성능 지표 계산 metrics = calculate_performance_metrics(total_return, returns, portfolio_values, mdd, len(data), risk_free_rate) # RSI 특화 정보 추가 metrics.update({ 'period': period, 'upper_threshold': upper_threshold, 'lower_threshold': lower_threshold }) return metrics def optimize_rsi_parameters(data, risk_free_rates=None): """최적의 RSI 파라미터 찾기""" periods = range(5, 30, 2) # 5에서 28까지 2씩 증가 upper_thresholds = range(65, 85, 5) # 65에서 80까지 5씩 증가 lower_thresholds = range(15, 35, 5) # 15에서 30까지 5씩 증가 all_results = [] for period in periods: for upper in upper_thresholds: for lower in lower_thresholds: try: result = evaluate_rsi_strategy(data, period, upper, lower, risk_free_rates) all_results.append({ 'period': period, 'upper_threshold': upper, 'lower_threshold': lower, 'metrics': result }) except Exception as e: print(f"Error with RSI params {period}-{upper}-{lower}: {e}") continue if not all_results: return {'period': 14, 'upper_threshold': 70, 'lower_threshold': 30} # 기본값 # 최적 파라미터 찾기 best_params = find_optimal_parameters(all_results, ['period', 'upper_threshold', 'lower_threshold']) return best_params # ----------------------------- # 전체 최적화 실행 함수 # ----------------------------- def run_technical_optimization(tickers, start_date, end_date): """여러 주식 종목에 대해 기술적 지표 최적화를 실행하는 함수""" print("주식 데이터 다운로드 및 기술적 지표 최적화 중...") # 무위험 수익률 데이터 가져오기 try: # 이미 임포트된 함수 직접 사용 risk_free_rate = get_risk_free_rate(start_date=start_date, end_date=end_date) print(f"무위험 수익률 로드 성공: {risk_free_rate:.4f}") except Exception as e: print(f"무위험 수익률 로드 오류: {e}") risk_free_rate = 0.01 print(f"기본 무위험 수익률 사용: {risk_free_rate:.4f}") # 결과 저장 리스트 ema_params_list = [] macd_params_list = [] cmf_period_list = [] rsi_params_list = [] # 종목별 데이터 처리 및 최적화 for ticker in tickers: print(f"\n{ticker} 데이터 처리 중...") try: # 주가 데이터 다운로드 df = yf.download(ticker, start=start_date, end=end_date) if len(df) < 100: print(f"{ticker}: 충분한 데이터가 없습니다.") continue # 멀티인덱스 처리 if isinstance(df.columns, pd.MultiIndex): df.columns = df.columns.droplevel(1) # 1. EMA 최적화 - 무위험 수익률 전달 try: print(f" {ticker} EMA 최적화 중...") ema_opt = optimize_ema_parameters(df.copy(), risk_free_rates=risk_free_rate) ema_params_list.append(ema_opt) print(f" EMA 최적화 완료: 단기={ema_opt['short']}, 장기={ema_opt['long']}") except Exception as e: print(f" EMA 최적화 오류: {e}") ema_params_list.append({'short': 10, 'long': 50}) # 2. MACD 최적화 - 무위험 수익률 전달 try: print(f" {ticker} MACD 최적화 중...") macd_opt = optimize_macd_parameters(df.copy(), risk_free_rates=risk_free_rate) macd_params_list.append(macd_opt) print(f" MACD 최적화 완료: 빠름={macd_opt['fast']}, 느림={macd_opt['slow']}, 신호={macd_opt['signal']}") except Exception as e: print(f" MACD 최적화 오류: {e}") macd_params_list.append({'fast': 12, 'slow': 26, 'signal': 9}) # 3. CMF 최적화 - 무위험 수익률 전달 try: print(f" {ticker} CMF 최적화 중...") cmf_period = optimize_cmf_period(df.copy(), risk_free_rates=risk_free_rate) cmf_period_list.append(cmf_period) print(f" CMF 최적화 완료: 기간={cmf_period}") except Exception as e: print(f" CMF 최적화 오류: {e}") cmf_period_list.append(20) # 4. RSI 최적화 - 무위험 수익률 전달 try: print(f" {ticker} RSI 최적화 중...") rsi_opt = optimize_rsi_parameters(df.copy(), risk_free_rates=risk_free_rate) rsi_params_list.append(rsi_opt) print(f" RSI 최적화 완료: 기간={rsi_opt['period']}, 상한={rsi_opt['upper_threshold']}, 하한={rsi_opt['lower_threshold']}") except Exception as e: print(f" RSI 최적화 오류: {e}") rsi_params_list.append({'period': 14, 'upper_threshold': 70, 'lower_threshold': 30}) except Exception as e: print(f"{ticker} 데이터 처리 오류: {e}") continue # 평균 파라미터 계산 - 공통 로직으로 추출 가능 if ema_params_list: avg_ema_short = int(np.mean([p['short'] for p in ema_params_list])) avg_ema_long = int(np.mean([p['long'] for p in ema_params_list])) else: avg_ema_short, avg_ema_long = 10, 50 if macd_params_list: avg_macd_fast = int(np.mean([p['fast'] for p in macd_params_list])) avg_macd_slow = int(np.mean([p['slow'] for p in macd_params_list])) avg_macd_signal = int(np.mean([p['signal'] for p in macd_params_list])) else: avg_macd_fast, avg_macd_slow, avg_macd_signal = 12, 26, 9 avg_cmf_period = int(np.mean(cmf_period_list)) if cmf_period_list else 20 if rsi_params_list: avg_rsi_period = int(np.mean([p['period'] for p in rsi_params_list])) avg_rsi_upper = int(np.mean([p['upper_threshold'] for p in rsi_params_list])) avg_rsi_lower = int(np.mean([p['lower_threshold'] for p in rsi_params_list])) else: avg_rsi_period, avg_rsi_upper, avg_rsi_lower = 14, 70, 30 # 최종 파라미터 구성 optimal_params = { 'ema': {'short': avg_ema_short, 'long': avg_ema_long}, 'macd': {'fast': avg_macd_fast, 'slow': avg_macd_slow, 'signal': avg_macd_signal}, 'cmf': avg_cmf_period, 'rsi': {'period': avg_rsi_period, 'upper_threshold': avg_rsi_upper, 'lower_threshold': avg_rsi_lower} } print("\n===== 최적화된 (평균) 파라미터 =====") print(f"EMA: 단기={optimal_params['ema']['short']}, 장기={optimal_params['ema']['long']}") print(f"MACD: 빠름={optimal_params['macd']['fast']}, 느림={optimal_params['macd']['slow']}, 신호={optimal_params['macd']['signal']}") print(f"CMF 기간: {optimal_params['cmf']}") print(f"RSI: 기간={optimal_params['rsi']['period']}, 상한={optimal_params['rsi']['upper_threshold']}, 하한={optimal_params['rsi']['lower_threshold']}") return optimal_params