Spaces:
Sleeping
Sleeping
| """ | |
| Utility functions for model evaluation. | |
| """ | |
| import numpy as np | |
| import tensorflow as tf | |
| from fastdtw import fastdtw | |
def _to_sample_vector(output):
    """Reduce a model output to a 1-D array holding one value per sample."""
    output = np.asarray(output)
    if output.ndim == 3:
        # Keep the first feature of the last time step.
        return output[:, -1, 0]
    if output.ndim == 2 and output.shape[1] > 1:
        # Keep the last column.
        return output[:, -1]
    return output.flatten()


def _truncate_to_common_length(first, second):
    """Cut both 1-D arrays down to the length of the shorter one."""
    common = min(len(first), len(second))
    return first[:common], second[:common]


def _finite_correlation(first, second):
    """Return the Pearson correlation of two 1-D arrays, or 0.0 if undefined."""
    if len(first) < 2:
        return 0.0
    # A zero-variance series makes np.corrcoef divide by zero and yield NaN;
    # silence the warning and report 0.0, like the other undefined cases.
    with np.errstate(divide='ignore', invalid='ignore'):
        coefficient = np.corrcoef(first, second)[0, 1]
    return float(coefficient) if np.isfinite(coefficient) else 0.0


def evaluate_model(model, x_test, y_test, ticker_test, y_test_dt=None,
                   sector_test=None, industry_test=None, time_diffs_test=None, verbose=True):
    """Run the model on the test inputs and return regression metrics.

    Args:
        model: Object exposing ``predict(inputs, verbose=...)``.
        x_test: First entry of the input list passed to ``model.predict``.
        y_test: Target values; flattened to 1-D before comparison.
        ticker_test: Second entry of the input list.
        y_test_dt: Optional targets for the model's second output (the
            derivative prediction). Only used when the model returns more
            than one output.
        sector_test: Optional extra input, appended when not None.
        industry_test: Optional extra input, appended when not None.
        time_diffs_test: Optional extra input, appended when not None.
        verbose: When True, show prediction progress and print a summary
            (or the error message on failure).

    Returns:
        dict with float values for ``mse``, ``mae``, ``rmse``,
        ``correlation``, ``r2_score`` and ``direction_accuracy``; plus
        ``dt_mse``, ``dt_mae`` and ``dt_correlation`` when both a second
        model output and ``y_test_dt`` are available. Undefined correlations
        (fewer than two samples, or a constant series) are reported as 0.0.
        If anything fails, or there are no samples to compare, the error
        metrics are ``inf`` and the score metrics are 0.0.
    """
    try:
        # Assemble model inputs; optional ones keep their fixed order.
        inputs = [x_test, ticker_test]
        inputs.extend(
            extra for extra in (sector_test, industry_test, time_diffs_test)
            if extra is not None
        )

        predictions = model.predict(inputs, verbose=1 if verbose else 0)

        # Multi-output models return a sequence: [values, derivatives, ...].
        if isinstance(predictions, (list, tuple)):
            y_pred = predictions[0]
            y_pred_dt = predictions[1] if len(predictions) > 1 else None
        else:
            y_pred = predictions
            y_pred_dt = None

        y_pred, y_test_flat = _truncate_to_common_length(
            _to_sample_vector(y_pred), np.asarray(y_test).flatten()
        )
        if len(y_pred) == 0:
            # Means over empty arrays would yield NaN metrics; use the
            # regular failure path instead.
            raise ValueError("no samples to evaluate")

        # Basic regression metrics.
        errors = y_pred - y_test_flat
        mse = np.mean(errors ** 2)
        mae = np.mean(np.abs(errors))
        rmse = np.sqrt(mse)

        correlation = _finite_correlation(y_pred, y_test_flat)

        # Coefficient of determination; 0.0 when the target is constant.
        ss_res = np.sum((y_test_flat - y_pred) ** 2)
        ss_tot = np.sum((y_test_flat - np.mean(y_test_flat)) ** 2)
        r2_score = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0.0

        # Fraction of samples where prediction and target share a sign.
        direction_accuracy = np.mean(np.sign(y_pred) == np.sign(y_test_flat))

        metrics = {
            'mse': float(mse),
            'mae': float(mae),
            'rmse': float(rmse),
            'correlation': float(correlation),
            'r2_score': float(r2_score),
            'direction_accuracy': float(direction_accuracy),
        }

        # Derivative output, evaluated only when both sides are present.
        if y_pred_dt is not None and y_test_dt is not None:
            y_pred_dt, y_test_dt_flat = _truncate_to_common_length(
                _to_sample_vector(y_pred_dt), np.asarray(y_test_dt).flatten()
            )
            dt_errors = y_pred_dt - y_test_dt_flat
            metrics.update({
                'dt_mse': float(np.mean(dt_errors ** 2)),
                'dt_mae': float(np.mean(np.abs(dt_errors))),
                'dt_correlation': _finite_correlation(y_pred_dt, y_test_dt_flat),
            })

        if verbose:
            print(f"ํ๊ฐ ์๋ฃ - MSE: {mse:.6f}, MAE: {mae:.6f}, ์๊ด๊ณ์: {correlation:.4f}")
            if 'dt_mse' in metrics:
                print(f"๋ํจ์ ํ๊ฐ - MSE: {metrics['dt_mse']:.6f}, MAE: {metrics['dt_mae']:.6f}")

        return metrics

    except Exception as e:
        # Deliberate best-effort: callers receive worst-case metrics
        # instead of an exception.
        if verbose:
            print(f"๋ชจ๋ธ ํ๊ฐ ์ค ์ค๋ฅ: {e}")
        return {
            'mse': float('inf'),
            'mae': float('inf'),
            'rmse': float('inf'),
            'correlation': 0.0,
            'r2_score': 0.0,
            'direction_accuracy': 0.0
        }
def calculate_dtw(predictions, actual_returns):
    """Compute the Dynamic Time Warping distance between two series.

    Both inputs are flattened to 1-D lists of floats and truncated to the
    length of the shorter one before being handed to ``fastdtw`` with an
    absolute-difference point distance.

    Returns:
        The distance reported by ``fastdtw``, or ``float('inf')`` if any
        step raises (the error and its traceback are printed).
    """
    def _abs_gap(left, right):
        # Point-wise distance between two scalar samples.
        return abs(float(left) - float(right))

    try:
        pred_seq = list(map(float, np.array(predictions).flatten()))
        actual_seq = list(map(float, np.array(actual_returns).flatten()))

        # Compare only the overlapping prefix of the two series.
        common = min(len(pred_seq), len(actual_seq))
        dtw_distance, _warp_path = fastdtw(
            pred_seq[:common], actual_seq[:common], dist=_abs_gap
        )
    except Exception as e:
        print(f"DTW ๊ณ์ฐ ์ค ์ค๋ฅ: {e}")
        import traceback
        print(traceback.format_exc())
        return float('inf')
    return dtw_distance
def calculate_tdi(predictions, actual_returns):
    """Compute the Temporal Distortion Index between two series.

    Both inputs are flattened to 1-D lists of floats and truncated to the
    length of the shorter one. The warping path returned by ``fastdtw``
    (absolute-difference point distance) is then scored as the sum of
    squared index offsets along the path divided by the squared sequence
    length.

    Returns:
        The index value, or ``float('inf')`` if any step raises (the error
        is printed).
    """
    def _abs_gap(left, right):
        # Point-wise distance between two scalar samples.
        return abs(float(left) - float(right))

    try:
        pred_seq = list(map(float, np.array(predictions).flatten()))
        actual_seq = list(map(float, np.array(actual_returns).flatten()))

        # Compare only the overlapping prefix of the two series.
        common = min(len(pred_seq), len(actual_seq))
        pred_seq = pred_seq[:common]
        actual_seq = actual_seq[:common]

        _distance, warp_path = fastdtw(pred_seq, actual_seq, dist=_abs_gap)
        warp_path = np.array(warp_path)

        seq_len = len(pred_seq)
        # Offset between the matched indices at every step of the path.
        index_offsets = warp_path[:, 0] - warp_path[:, 1]
        return np.sum(index_offsets ** 2) / (seq_len ** 2)
    except Exception as e:
        print(f"TDI ๊ณ์ฐ ์ค ์ค๋ฅ: {e}")
        return float('inf')
def calculate_combined_score(backtest_result, min_trades=75, max_trades=125):
    """Blend Sharpe ratio, trade count, DTW and TDI into one score.

    Args:
        backtest_result: Mapping whose ``'portfolio'`` entry may provide
            ``'trades'`` (a sized collection), ``'sharpe_ratio'``,
            ``'dtw'`` and ``'tdi'``. Missing entries default to no trades,
            a Sharpe ratio of 0, and 1.0 for both DTW and TDI.
        min_trades: Below this trade count the Sharpe term is scaled by
            ``sqrt(trades / min_trades)``.
        max_trades: Above this trade count the Sharpe term is scaled by
            ``exp(-(trades - max_trades) / max_trades)``.

    Returns:
        ``0.7 * sharpe * trade_weight + 0.15 / dtw + 0.15 / tdi``, with
        DTW and TDI floored at 1e-6 (lower DTW/TDI gives a higher score).
    """
    stats = backtest_result.get('portfolio', {})
    n_trades = len(stats.get('trades', []))
    sharpe_ratio = stats.get('sharpe_ratio', 0)

    # Floor the distances so their reciprocals stay finite.
    floor = 1e-6
    dtw_value = max(stats.get('dtw', 1.0), floor)
    tdi_value = max(stats.get('tdi', 1.0), floor)

    def _weight_for(count):
        # Too few trades: ramp up smoothly towards 1.
        if count < min_trades:
            return (count / min_trades) ** 0.5
        # Too many trades: decay exponentially.
        if count > max_trades:
            return np.exp(-(count - max_trades) / max_trades)
        # Inside the preferred range.
        return 1.0

    weight = _weight_for(n_trades)

    sharpe_term = 0.7 * sharpe_ratio * weight
    dtw_term = 0.15 * (1 / dtw_value)
    tdi_term = 0.15 * (1 / tdi_value)
    return sharpe_term + dtw_term + tdi_term