ostock-backend / model /src /evaluation /model_evaluation.py
johnaness's picture
Deploy OStock FastAPI backend to HF Space (Docker SDK, port 7860)
4be2d4d
"""
๋ชจ๋ธ ํ‰๊ฐ€ ๊ด€๋ จ ์œ ํ‹ธ๋ฆฌํ‹ฐ ํ•จ์ˆ˜ ๋ชจ๋“ˆ
"""
import numpy as np
import tensorflow as tf
from fastdtw import fastdtw
def evaluate_model(model, x_test, y_test, ticker_test, y_test_dt=None,
sector_test=None, industry_test=None, time_diffs_test=None, verbose=True):
"""
๋ชจ๋ธ์„ ํ‰๊ฐ€ํ•˜๊ณ  ์„ฑ๋Šฅ ์ง€ํ‘œ๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.
"""
try:
# ์ž…๋ ฅ ๋ฐ์ดํ„ฐ ์ค€๋น„
inputs = [x_test, ticker_test]
# ์„ ํƒ์  ์ž…๋ ฅ๋“ค ์ถ”๊ฐ€
if sector_test is not None:
inputs.append(sector_test)
if industry_test is not None:
inputs.append(industry_test)
if time_diffs_test is not None:
inputs.append(time_diffs_test)
# ์˜ˆ์ธก ์ˆ˜ํ–‰
predictions = model.predict(inputs, verbose=0 if not verbose else 1)
# ์˜ˆ์ธก๊ฐ’ ์ฒ˜๋ฆฌ
if isinstance(predictions, list):
y_pred = predictions[0] # ๊ฐ’ ์˜ˆ์ธก
if len(predictions) > 1:
y_pred_dt = predictions[1] # ๋„ํ•จ์ˆ˜ ์˜ˆ์ธก
else:
y_pred_dt = None
else:
y_pred = predictions
y_pred_dt = None
# ํ˜•ํƒœ ์กฐ์ •
if len(y_pred.shape) == 3:
y_pred = y_pred[:, -1, 0] # ๋งˆ์ง€๋ง‰ ์‹œ์ ์˜ ์ฒซ ๋ฒˆ์งธ ํŠน์„ฑ
elif len(y_pred.shape) == 2 and y_pred.shape[1] > 1:
y_pred = y_pred[:, -1] # ๋งˆ์ง€๋ง‰ ์—ด
else:
y_pred = y_pred.flatten()
# ํƒ€๊ฒŸ ๊ฐ’ ํ˜•ํƒœ ์กฐ์ •
y_test_flat = y_test.flatten() if hasattr(y_test, 'flatten') else np.array(y_test).flatten()
# ๊ธธ์ด ๋งž์ถ”๊ธฐ
min_len = min(len(y_pred), len(y_test_flat))
y_pred = y_pred[:min_len]
y_test_flat = y_test_flat[:min_len]
# ๊ธฐ๋ณธ ํšŒ๊ท€ ์ง€ํ‘œ ๊ณ„์‚ฐ
mse = np.mean((y_pred - y_test_flat) ** 2)
mae = np.mean(np.abs(y_pred - y_test_flat))
rmse = np.sqrt(mse)
# ์ƒ๊ด€๊ณ„์ˆ˜
correlation = np.corrcoef(y_pred, y_test_flat)[0, 1] if len(y_pred) > 1 else 0.0
# Rยฒ ์ ์ˆ˜
ss_res = np.sum((y_test_flat - y_pred) ** 2)
ss_tot = np.sum((y_test_flat - np.mean(y_test_flat)) ** 2)
r2_score = 1 - (ss_res / ss_tot) if ss_tot != 0 else 0.0
# ๋ฐฉํ–ฅ์„ฑ ์ •ํ™•๋„
direction_accuracy = np.mean(np.sign(y_pred) == np.sign(y_test_flat))
metrics = {
'mse': float(mse),
'mae': float(mae),
'rmse': float(rmse),
'correlation': float(correlation),
'r2_score': float(r2_score),
'direction_accuracy': float(direction_accuracy)
}
# ๋„ํ•จ์ˆ˜ ํ‰๊ฐ€
if y_pred_dt is not None and y_test_dt is not None:
# ๋„ํ•จ์ˆ˜ ํ˜•ํƒœ ์กฐ์ •
if len(y_pred_dt.shape) == 3:
y_pred_dt = y_pred_dt[:, -1, 0]
elif len(y_pred_dt.shape) == 2 and y_pred_dt.shape[1] > 1:
y_pred_dt = y_pred_dt[:, -1]
else:
y_pred_dt = y_pred_dt.flatten()
y_test_dt_flat = y_test_dt.flatten() if hasattr(y_test_dt, 'flatten') else np.array(y_test_dt).flatten()
# ๊ธธ์ด ๋งž์ถ”๊ธฐ
min_len_dt = min(len(y_pred_dt), len(y_test_dt_flat))
y_pred_dt = y_pred_dt[:min_len_dt]
y_test_dt_flat = y_test_dt_flat[:min_len_dt]
# ๋„ํ•จ์ˆ˜ ์ง€ํ‘œ ๊ณ„์‚ฐ
dt_mse = np.mean((y_pred_dt - y_test_dt_flat) ** 2)
dt_mae = np.mean(np.abs(y_pred_dt - y_test_dt_flat))
dt_correlation = np.corrcoef(y_pred_dt, y_test_dt_flat)[0, 1] if len(y_pred_dt) > 1 else 0.0
metrics.update({
'dt_mse': float(dt_mse),
'dt_mae': float(dt_mae),
'dt_correlation': float(dt_correlation)
})
if verbose:
print(f"ํ‰๊ฐ€ ์™„๋ฃŒ - MSE: {mse:.6f}, MAE: {mae:.6f}, ์ƒ๊ด€๊ณ„์ˆ˜: {correlation:.4f}")
if 'dt_mse' in metrics:
print(f"๋„ํ•จ์ˆ˜ ํ‰๊ฐ€ - MSE: {metrics['dt_mse']:.6f}, MAE: {metrics['dt_mae']:.6f}")
return metrics
except Exception as e:
if verbose:
print(f"๋ชจ๋ธ ํ‰๊ฐ€ ์ค‘ ์˜ค๋ฅ˜: {e}")
return {
'mse': float('inf'),
'mae': float('inf'),
'rmse': float('inf'),
'correlation': 0.0,
'r2_score': 0.0,
'direction_accuracy': 0.0
}
def calculate_dtw(predictions, actual_returns):
"""
๋‘ ์‹œ๊ณ„์—ด ๊ฐ„์˜ Dynamic Time Warping ๊ฑฐ๋ฆฌ๋ฅผ ๊ณ„์‚ฐํ•ฉ๋‹ˆ๋‹ค.
"""
try:
def custom_euclidean(u, v):
u = float(u)
v = float(v)
return abs(u - v)
# ๋ฐ์ดํ„ฐ 1์ฐจ์›ํ™” ๋ฐ float๋กœ ๋ณ€ํ™˜
predictions = [float(x) for x in np.array(predictions).flatten()]
actual_returns = [float(x) for x in np.array(actual_returns).flatten()]
# ๊ธธ์ด ๋งž์ถ”๊ธฐ
min_len = min(len(predictions), len(actual_returns))
predictions = predictions[:min_len]
actual_returns = actual_returns[:min_len]
# ์‚ฌ์šฉ์ž ์ •์˜ ๊ฑฐ๋ฆฌ ํ•จ์ˆ˜๋กœ fastdtw ๊ณ„์‚ฐ
distance, _ = fastdtw(predictions, actual_returns, dist=custom_euclidean)
return distance
except Exception as e:
print(f"DTW ๊ณ„์‚ฐ ์ค‘ ์˜ค๋ฅ˜: {e}")
import traceback
print(traceback.format_exc())
return float('inf')
def calculate_tdi(predictions, actual_returns):
"""
๋‘ ์‹œ๊ณ„์—ด ๊ฐ„์˜ Temporal Distortion Index๋ฅผ ๊ณ„์‚ฐํ•ฉ๋‹ˆ๋‹ค.
"""
try:
def custom_euclidean(u, v):
u = float(u)
v = float(v)
return abs(u - v)
# ๋ฐ์ดํ„ฐ 1์ฐจ์›ํ™” ๋ฐ float๋กœ ๋ณ€ํ™˜
predictions = [float(x) for x in np.array(predictions).flatten()]
actual_returns = [float(x) for x in np.array(actual_returns).flatten()]
# ๊ธธ์ด ๋งž์ถ”๊ธฐ
min_len = min(len(predictions), len(actual_returns))
predictions = predictions[:min_len]
actual_returns = actual_returns[:min_len]
_, path = fastdtw(predictions, actual_returns, dist=custom_euclidean)
path = np.array(path)
# ์‹œํ€€์Šค ๊ธธ์ด
P = len(predictions)
# TDI ๊ณ„์‚ฐ
squared_offsets = (path[:, 0] - path[:, 1])**2
tdi = np.sum(squared_offsets) / (P**2)
return tdi
except Exception as e:
print(f"TDI ๊ณ„์‚ฐ ์ค‘ ์˜ค๋ฅ˜: {e}")
return float('inf')
def calculate_combined_score(backtest_result, min_trades=75, max_trades=125):
"""
๊ฑฐ๋ž˜ ํšŸ์ˆ˜์™€ ์ƒคํ”„ ๋น„์œจ์„ ๋™์‹œ์— ๊ณ ๋ คํ•˜๋Š” ๋ณตํ•ฉ ์ ์ˆ˜ ๊ณ„์‚ฐ
"""
portfolio = backtest_result.get('portfolio', {})
trades = len(portfolio.get('trades', []))
sharpe = portfolio.get('sharpe_ratio', 0)
# DTW์™€ TDI ๊ฐ’ ๊ฐ€์ ธ์˜ค๊ธฐ
dtw = max(portfolio.get('dtw', 1.0), 1e-6)
tdi = max(portfolio.get('tdi', 1.0), 1e-6)
# ๊ฑฐ๋ž˜ ํšŸ์ˆ˜์— ๋”ฐ๋ฅธ ๊ฐ€์ค‘์น˜
if trades < min_trades:
trade_weight = (trades / min_trades) ** 0.5 # ๋ถ€๋“œ๋Ÿฌ์šด ์ฆ๊ฐ€
elif trades > max_trades:
trade_weight = np.exp(-(trades - max_trades) / max_trades) # ์ง€์ˆ˜์  ๊ฐ์†Œ
else:
trade_weight = 1.0 # ์ตœ์  ๊ตฌ๊ฐ„
combined_score = (
0.7 * sharpe * trade_weight + # ์ƒคํ”„ ๋น„์œจ (๊ฑฐ๋ž˜ ํšŸ์ˆ˜ ๋ณด์ • ํฌํ•จ)
0.15 * (1 / dtw) + # DTW (๋‚ฎ์„์ˆ˜๋ก ์ข‹์Œ)
0.15 * (1 / tdi) # TDI (๋‚ฎ์„์ˆ˜๋ก ์ข‹์Œ)
)
return combined_score