""" 모델 평가 관련 유틸리티 함수 모듈 """ import numpy as np import tensorflow as tf from fastdtw import fastdtw def evaluate_model(model, x_test, y_test, ticker_test, y_test_dt=None, sector_test=None, industry_test=None, time_diffs_test=None, verbose=True): """ 모델을 평가하고 성능 지표를 반환합니다. """ try: # 입력 데이터 준비 inputs = [x_test, ticker_test] # 선택적 입력들 추가 if sector_test is not None: inputs.append(sector_test) if industry_test is not None: inputs.append(industry_test) if time_diffs_test is not None: inputs.append(time_diffs_test) # 예측 수행 predictions = model.predict(inputs, verbose=0 if not verbose else 1) # 예측값 처리 if isinstance(predictions, list): y_pred = predictions[0] # 값 예측 if len(predictions) > 1: y_pred_dt = predictions[1] # 도함수 예측 else: y_pred_dt = None else: y_pred = predictions y_pred_dt = None # 형태 조정 if len(y_pred.shape) == 3: y_pred = y_pred[:, -1, 0] # 마지막 시점의 첫 번째 특성 elif len(y_pred.shape) == 2 and y_pred.shape[1] > 1: y_pred = y_pred[:, -1] # 마지막 열 else: y_pred = y_pred.flatten() # 타겟 값 형태 조정 y_test_flat = y_test.flatten() if hasattr(y_test, 'flatten') else np.array(y_test).flatten() # 길이 맞추기 min_len = min(len(y_pred), len(y_test_flat)) y_pred = y_pred[:min_len] y_test_flat = y_test_flat[:min_len] # 기본 회귀 지표 계산 mse = np.mean((y_pred - y_test_flat) ** 2) mae = np.mean(np.abs(y_pred - y_test_flat)) rmse = np.sqrt(mse) # 상관계수 correlation = np.corrcoef(y_pred, y_test_flat)[0, 1] if len(y_pred) > 1 else 0.0 # R² 점수 ss_res = np.sum((y_test_flat - y_pred) ** 2) ss_tot = np.sum((y_test_flat - np.mean(y_test_flat)) ** 2) r2_score = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0.0 # 방향성 정확도 direction_accuracy = np.mean(np.sign(y_pred) == np.sign(y_test_flat)) metrics = { 'mse': float(mse), 'mae': float(mae), 'rmse': float(rmse), 'correlation': float(correlation), 'r2_score': float(r2_score), 'direction_accuracy': float(direction_accuracy) } # 도함수 평가 if y_pred_dt is not None and y_test_dt is not None: # 도함수 형태 조정 if len(y_pred_dt.shape) == 3: y_pred_dt = y_pred_dt[:, -1, 0] elif len(y_pred_dt.shape) == 2 and y_pred_dt.shape[1] > 1: y_pred_dt = y_pred_dt[:, -1] else: y_pred_dt = y_pred_dt.flatten() y_test_dt_flat = y_test_dt.flatten() if hasattr(y_test_dt, 'flatten') else np.array(y_test_dt).flatten() # 길이 맞추기 min_len_dt = min(len(y_pred_dt), len(y_test_dt_flat)) y_pred_dt = y_pred_dt[:min_len_dt] y_test_dt_flat = y_test_dt_flat[:min_len_dt] # 도함수 지표 계산 dt_mse = np.mean((y_pred_dt - y_test_dt_flat) ** 2) dt_mae = np.mean(np.abs(y_pred_dt - y_test_dt_flat)) dt_correlation = np.corrcoef(y_pred_dt, y_test_dt_flat)[0, 1] if len(y_pred_dt) > 1 else 0.0 metrics.update({ 'dt_mse': float(dt_mse), 'dt_mae': float(dt_mae), 'dt_correlation': float(dt_correlation) }) if verbose: print(f"평가 완료 - MSE: {mse:.6f}, MAE: {mae:.6f}, 상관계수: {correlation:.4f}") if 'dt_mse' in metrics: print(f"도함수 평가 - MSE: {metrics['dt_mse']:.6f}, MAE: {metrics['dt_mae']:.6f}") return metrics except Exception as e: if verbose: print(f"모델 평가 중 오류: {e}") return { 'mse': float('inf'), 'mae': float('inf'), 'rmse': float('inf'), 'correlation': 0.0, 'r2_score': 0.0, 'direction_accuracy': 0.0 } def calculate_dtw(predictions, actual_returns): """ 두 시계열 간의 Dynamic Time Warping 거리를 계산합니다. """ try: def custom_euclidean(u, v): u = float(u) v = float(v) return abs(u - v) # 데이터 1차원화 및 float로 변환 predictions = [float(x) for x in np.array(predictions).flatten()] actual_returns = [float(x) for x in np.array(actual_returns).flatten()] # 길이 맞추기 min_len = min(len(predictions), len(actual_returns)) predictions = predictions[:min_len] actual_returns = actual_returns[:min_len] # 사용자 정의 거리 함수로 fastdtw 계산 distance, _ = fastdtw(predictions, actual_returns, dist=custom_euclidean) return distance except Exception as e: print(f"DTW 계산 중 오류: {e}") import traceback print(traceback.format_exc()) return float('inf') def calculate_tdi(predictions, actual_returns): """ 두 시계열 간의 Temporal Distortion Index를 계산합니다. """ try: def custom_euclidean(u, v): u = float(u) v = float(v) return abs(u - v) # 데이터 1차원화 및 float로 변환 predictions = [float(x) for x in np.array(predictions).flatten()] actual_returns = [float(x) for x in np.array(actual_returns).flatten()] # 길이 맞추기 min_len = min(len(predictions), len(actual_returns)) predictions = predictions[:min_len] actual_returns = actual_returns[:min_len] _, path = fastdtw(predictions, actual_returns, dist=custom_euclidean) path = np.array(path) # 시퀀스 길이 P = len(predictions) # TDI 계산 squared_offsets = (path[:, 0] - path[:, 1])**2 tdi = np.sum(squared_offsets) / (P**2) return tdi except Exception as e: print(f"TDI 계산 중 오류: {e}") return float('inf') def calculate_combined_score(backtest_result, min_trades=75, max_trades=125): """ 거래 횟수와 샤프 비율을 동시에 고려하는 복합 점수 계산 """ portfolio = backtest_result.get('portfolio', {}) trades = len(portfolio.get('trades', [])) sharpe = portfolio.get('sharpe_ratio', 0) # DTW와 TDI 값 가져오기 dtw = max(portfolio.get('dtw', 1.0), 1e-6) tdi = max(portfolio.get('tdi', 1.0), 1e-6) # 거래 횟수에 따른 가중치 if trades < min_trades: trade_weight = (trades / min_trades) ** 0.5 # 부드러운 증가 elif trades > max_trades: trade_weight = np.exp(-(trades - max_trades) / max_trades) # 지수적 감소 else: trade_weight = 1.0 # 최적 구간 combined_score = ( 0.7 * sharpe * trade_weight + # 샤프 비율 (거래 횟수 보정 포함) 0.15 * (1 / dtw) + # DTW (낮을수록 좋음) 0.15 * (1 / tdi) # TDI (낮을수록 좋음) ) return combined_score