ostock-backend / model /src /data /optimization.py
johnaness's picture
Deploy OStock FastAPI backend to HF Space (Docker SDK, port 7860)
4be2d4d
"""
Functions for optimizing technical-indicator parameters.
"""
import numpy as np
import pandas as pd
import yfinance as yf
from ..evaluation.backtest import get_risk_free_rate
from .technical_indicators import calculate_ema_series, calculate_macd, calculate_cmf, calculate_rsi
def calculate_risk_free_rate(data, risk_free_rates=None):
    """Resolve the annual risk-free rate to use for a backtest window.

    Args:
        data: Object whose ``index`` supplies the first and last labels of the
            window. It is only read when ``risk_free_rates`` is an indexed
            object.
        risk_free_rates: ``None``, a scalar rate (Python or NumPy number), or
            an object exposing ``index``/``loc``/``mean`` such as a pandas
            Series or DataFrame.

    Returns:
        ``float(risk_free_rates)`` for scalar input. For indexed input, the
        ``mean()`` of the rows between ``data.index[0]`` and
        ``data.index[-1]``, or the ``mean()`` of all rows when that slice is
        empty or cannot be taken. 0.01 when nothing is supplied, the input is
        of an unsupported kind, or processing fails.
    """
    default_rate = 0.01
    if risk_free_rates is None:
        return default_rate

    try:
        # NumPy scalars such as np.float32 / np.int64 are not subclasses of
        # float / int, so they are listed explicitly; otherwise a supplied
        # rate would silently be replaced by the default.
        if isinstance(risk_free_rates, (float, int, np.integer, np.floating)):
            return float(risk_free_rates)

        if hasattr(risk_free_rates, 'index'):
            start_date = data.index[0]
            end_date = data.index[-1]
            try:
                rf_subset = risk_free_rates.loc[start_date:end_date]
                if not rf_subset.empty:
                    return rf_subset.mean()
                return risk_free_rates.mean()
            except Exception:
                # Best effort: if the window cannot be sliced or averaged,
                # use the whole sample. ``Exception`` (not a bare except)
                # keeps KeyboardInterrupt/SystemExit propagating.
                return risk_free_rates.mean()
    except Exception as e:
        print(f"๋ฌด์œ„ํ—˜ ์ˆ˜์ต๋ฅ  ์ฒ˜๋ฆฌ ์˜ค๋ฅ˜: {e}")
        return default_rate

    return default_rate
def minmax_scale(value, min_val, max_val, inverse=False):
    """Min-max normalize ``value`` against the range ``[min_val, max_val]``.

    Args:
        value: Number to rescale.
        min_val: Lower end of the reference range.
        max_val: Upper end of the reference range.
        inverse: When true, return ``1 - normalized`` so that smaller inputs
            score higher.

    Returns:
        ``(value - min_val) / (max_val - min_val)``, or its complement when
        ``inverse`` is set. 0 when the range is degenerate
        (``min_val == max_val``), regardless of ``inverse``.
    """
    if min_val == max_val:
        # A zero-width range would divide by zero.
        return 0

    fraction = (value - min_val) / (max_val - min_val)
    if inverse:
        return 1 - fraction
    return fraction
def calculate_performance_metrics(total_return, returns, portfolio_values, mdd, data_length, risk_free_rate):
    """Derive annualized return and Sharpe ratio from backtest output.

    Args:
        total_return: Compounded return over the whole backtest.
        returns: Sequence of per-trade returns (list, ndarray or Series);
            ``None`` or an empty sequence yields zero volatility.
        portfolio_values: Accepted for signature compatibility; not read.
        mdd: Maximum drawdown, passed through unchanged.
        data_length: Number of observations in the backtest, used as the
            divisor when scaling to 252 periods per year.
        risk_free_rate: Annual risk-free rate subtracted in the Sharpe ratio.

    Returns:
        Dict with keys ``'annual_return'``, ``'sharpe_ratio'``, ``'mdd'`` and
        ``'total_return'``. The Sharpe ratio is 0 when the volatility is 0.

    Raises:
        ZeroDivisionError: If ``data_length`` is 0.
    """
    annual_return = ((1 + total_return) ** (252 / data_length)) - 1

    # Explicit length check instead of truthiness: ``if returns`` raises
    # "truth value is ambiguous" for ndarray / Series inputs.
    has_returns = returns is not None and len(returns) > 0
    annual_std = np.std(returns) * np.sqrt(252) if has_returns else 0

    sharpe_ratio = (annual_return - risk_free_rate) / annual_std if annual_std != 0 else 0

    return {
        'annual_return': annual_return,
        'sharpe_ratio': sharpe_ratio,
        'mdd': mdd,
        'total_return': total_return
    }
def find_optimal_parameters(all_results, params_keys, metric_weights=None):
    """Pick the parameter set with the best weighted, normalized score.

    Each result's Sharpe ratio and maximum drawdown are min-max normalized
    across ``all_results`` (drawdown inverted so that smaller is better) and
    combined using ``metric_weights``.

    Args:
        all_results: List of dicts, each holding the parameter values under
            the names in ``params_keys`` plus a ``'metrics'`` dict with
            ``'sharpe_ratio'`` and ``'mdd'``.
        params_keys: Names of the parameter entries to copy into the result.
        metric_weights: Dict with ``'sharpe_ratio'`` and ``'mdd'`` weights.
            Defaults to ``{'sharpe_ratio': 0.9, 'mdd': 0.1}``.

    Returns:
        Dict with the winning parameters and their ``'metrics'``, or ``None``
        when ``all_results`` is empty or no score compares greater than
        negative infinity (for example when every score is NaN).
    """
    if not all_results:
        return None

    # Built per call rather than as a default argument so the default dict is
    # never shared between calls.
    if metric_weights is None:
        metric_weights = {'sharpe_ratio': 0.9, 'mdd': 0.1}

    # Normalization bounds across all candidates.
    sharpe_ratios = [r['metrics']['sharpe_ratio'] for r in all_results]
    mdds = [r['metrics']['mdd'] for r in all_results]
    min_sharpe, max_sharpe = min(sharpe_ratios), max(sharpe_ratios)
    min_mdd, max_mdd = min(mdds), max(mdds)

    best_score = float('-inf')
    best_params = None
    for result in all_results:
        metrics = result['metrics']
        normalized_sharpe = minmax_scale(metrics['sharpe_ratio'], min_sharpe, max_sharpe)
        normalized_mdd = minmax_scale(metrics['mdd'], min_mdd, max_mdd, inverse=True)
        score = (metric_weights['sharpe_ratio'] * normalized_sharpe +
                 metric_weights['mdd'] * normalized_mdd)

        # Strict comparison: the first candidate wins ties.
        if score > best_score:
            best_score = score
            best_params = {key: result[key] for key in params_keys}
            best_params['metrics'] = metrics

    return best_params
def backtest_ema(data, short_ema, long_ema):
    """Backtest a long-only EMA crossover strategy on closing prices.

    A position is opened when the short EMA crosses above the long EMA while
    flat, and closed when it crosses below while long. Trade returns and the
    equity curve come from the same position state, so a repeated buy signal
    or a position still open at the end never produces a trade return.

    Args:
        data: Mapping/DataFrame whose ``'Close'`` entry exposes ``.values``.
        short_ema: Short-period EMA series (``.values`` is read).
        long_ema: Long-period EMA series (``.values`` is read).

    Returns:
        Tuple ``(total_return, returns, portfolio_values, mdd)``:
        compounded return of closed trades (0 if none), list of per-trade
        returns, equity curve starting at 1.0 with one entry per EMA
        observation, and the maximum drawdown of that curve.
    """
    close_values = data['Close'].values
    short_values = short_ema.values
    long_values = long_ema.values

    returns = []
    portfolio_values = [1.0]
    in_position = False
    entry_price = None

    for i in range(1, len(short_values)):
        # Carry the previous equity forward; it only changes when a trade closes.
        portfolio_values.append(portfolio_values[-1])

        crossed_up = (short_values[i - 1] <= long_values[i - 1]) and (short_values[i] > long_values[i])
        crossed_down = (short_values[i - 1] >= long_values[i - 1]) and (short_values[i] < long_values[i])

        if crossed_up and not in_position:
            in_position = True
            entry_price = close_values[i]
        elif crossed_down and in_position:
            in_position = False
            trade_return = (close_values[i] - entry_price) / entry_price
            returns.append(trade_return)
            portfolio_values[-1] *= (1 + trade_return)

    # Maximum drawdown of the realized equity curve.
    cummax = np.maximum.accumulate(portfolio_values)
    drawdowns = (cummax - portfolio_values) / cummax
    mdd = np.max(drawdowns) if len(drawdowns) > 0 else 0

    total_return = np.prod([1 + r for r in returns]) - 1 if returns else 0
    return total_return, returns, portfolio_values, mdd
def evaluate_ema_strategy(data, short_period, long_period, risk_free_rates=None):
    """Score one EMA crossover parameter pair on ``data``.

    Args:
        data: Price data with a ``'Close'`` column.
        short_period: Period passed to ``calculate_ema_series`` for the short EMA.
        long_period: Period passed to ``calculate_ema_series`` for the long EMA.
        risk_free_rates: Forwarded to ``calculate_risk_free_rate``.

    Returns:
        The metrics dict from ``calculate_performance_metrics`` extended with
        ``'short_period'`` and ``'long_period'``. When the backtest closes no
        trades, a fixed dict with zero return, zero Sharpe ratio and
        ``'mdd'`` of 1 (without the period keys).
    """
    closes = data['Close']
    fast_line = calculate_ema_series(closes, short_period)
    slow_line = calculate_ema_series(closes, long_period)

    total_return, returns, portfolio_values, mdd = backtest_ema(data, fast_line, slow_line)

    # No closed trades: report the worst-case placeholder metrics.
    if not returns:
        return {'annual_return': 0, 'sharpe_ratio': 0, 'mdd': 1, 'total_return': 0}

    rf_rate = calculate_risk_free_rate(data, risk_free_rates)
    metrics = calculate_performance_metrics(
        total_return, returns, portfolio_values, mdd, len(data), rf_rate
    )

    # Record which parameters produced these metrics.
    metrics['short_period'] = short_period
    metrics['long_period'] = long_period
    return metrics
def optimize_ema_parameters(data, risk_free_rates=None):
    """Grid-search EMA crossover periods and return the best pair.

    Short periods run over ``range(5, 50, 5)`` and long periods over
    ``range(50, 200, 10)``. Combinations whose evaluation raises are reported
    on stdout and skipped.

    Args:
        data: Price data forwarded to ``evaluate_ema_strategy``.
        risk_free_rates: Forwarded to ``evaluate_ema_strategy``.

    Returns:
        Dict with ``'short'``, ``'long'`` and ``'metrics'`` for the winning
        combination. ``{'short': 10, 'long': 50}`` when no combination could
        be evaluated or no winner could be selected.
    """
    default_params = {'short': 10, 'long': 50}
    short_periods = range(5, 50, 5)
    long_periods = range(50, 200, 10)

    all_results = []
    for short in short_periods:
        for long in long_periods:
            if short >= long:
                continue
            try:
                result = evaluate_ema_strategy(data, short, long, risk_free_rates)
                all_results.append({
                    'short': short,
                    'long': long,
                    'metrics': result
                })
            except Exception as e:
                # Best effort: one failing combination must not abort the search.
                print(f"Error with EMA params {short}-{long}: {e}")
                continue

    if not all_results:
        return default_params

    best_params = find_optimal_parameters(all_results, ['short', 'long'])
    # find_optimal_parameters can return None (e.g. every score is NaN);
    # fall back to the defaults, as optimize_cmf_period does.
    return best_params if best_params is not None else default_params
def backtest_macd(data, macd, signal):
    """Backtest a long-only MACD/signal-line crossover strategy.

    A position is opened when MACD crosses above the signal line while flat,
    and closed when it crosses below while long. Trade returns and the equity
    curve come from the same position state, so a repeated buy signal or a
    position still open at the end never produces a trade return.

    Args:
        data: Mapping/DataFrame whose ``'Close'`` entry exposes ``.values``.
        macd: MACD series (``.values`` is read).
        signal: Signal-line series (``.values`` is read).

    Returns:
        Tuple ``(total_return, returns, portfolio_values, mdd)``:
        compounded return of closed trades (0 if none), list of per-trade
        returns, equity curve starting at 1.0 and padded to the number of
        closing prices, and the maximum drawdown of that curve.
    """
    macd_values = macd.values
    signal_values = signal.values
    close_values = data['Close'].values

    # Raw crossover signals as (bar index, action) pairs.
    positions = []
    for i in range(1, len(macd_values)):
        if (macd_values[i - 1] <= signal_values[i - 1]) and (macd_values[i] > signal_values[i]):
            positions.append((i, 'buy'))
        elif (macd_values[i - 1] >= signal_values[i - 1]) and (macd_values[i] < signal_values[i]):
            positions.append((i, 'sell'))

    returns = []
    portfolio_values = [1.0]
    in_position = False
    buy_idx = 0
    for idx, action in positions:
        # Carry the last equity value forward up to this signal's bar.
        while len(portfolio_values) <= idx:
            portfolio_values.append(portfolio_values[-1])

        if action == 'buy' and not in_position:
            in_position = True
            buy_idx = idx
        elif action == 'sell' and in_position:
            in_position = False
            buy_price = close_values[buy_idx]
            trade_return = (close_values[idx] - buy_price) / buy_price
            returns.append(trade_return)
            portfolio_values[-1] *= (1 + trade_return)

    # Pad the curve so it spans every closing price.
    while len(portfolio_values) < len(close_values):
        portfolio_values.append(portfolio_values[-1])

    # Maximum drawdown of the realized equity curve.
    cummax = np.maximum.accumulate(portfolio_values)
    drawdowns = (cummax - portfolio_values) / cummax
    mdd = np.max(drawdowns) if len(drawdowns) > 0 else 0

    total_return = np.prod([1 + r for r in returns]) - 1 if returns else 0
    return total_return, returns, portfolio_values, mdd
def evaluate_macd_strategy(data, fast_period, slow_period, signal_period, risk_free_rates=None):
    """Score one MACD parameter triple on ``data``.

    Args:
        data: Price data forwarded to ``calculate_macd`` and ``backtest_macd``.
        fast_period: Fast period passed to ``calculate_macd``.
        slow_period: Slow period passed to ``calculate_macd``.
        signal_period: Signal period passed to ``calculate_macd``.
        risk_free_rates: Forwarded to ``calculate_risk_free_rate``.

    Returns:
        The metrics dict from ``calculate_performance_metrics`` extended with
        ``'fast_period'``, ``'slow_period'`` and ``'signal_period'``. When the
        backtest closes no trades, a fixed dict with zero return, zero Sharpe
        ratio and ``'mdd'`` of 1 (without the period keys).
    """
    macd_line, signal_line = calculate_macd(data, fast_period, slow_period, signal_period)
    total_return, returns, portfolio_values, mdd = backtest_macd(data, macd_line, signal_line)

    # No closed trades: report the worst-case placeholder metrics.
    if not returns:
        return {'annual_return': 0, 'sharpe_ratio': 0, 'mdd': 1, 'total_return': 0}

    rf_rate = calculate_risk_free_rate(data, risk_free_rates)
    metrics = calculate_performance_metrics(
        total_return, returns, portfolio_values, mdd, len(data), rf_rate
    )

    # Record which parameters produced these metrics.
    metrics['fast_period'] = fast_period
    metrics['slow_period'] = slow_period
    metrics['signal_period'] = signal_period
    return metrics
def optimize_macd_parameters(data, risk_free_rates=None):
    """Grid-search MACD periods and return the best triple.

    Fast periods run over ``range(5, 20, 2)``, slow periods over
    ``range(20, 60, 5)`` and signal periods over ``range(5, 20, 2)``.
    Combinations whose evaluation raises are reported on stdout and skipped.

    Args:
        data: Price data forwarded to ``evaluate_macd_strategy``.
        risk_free_rates: Forwarded to ``evaluate_macd_strategy``.

    Returns:
        Dict with ``'fast'``, ``'slow'``, ``'signal'`` and ``'metrics'`` for
        the winning combination. ``{'fast': 12, 'slow': 26, 'signal': 9}``
        when no combination could be evaluated or no winner could be
        selected.
    """
    default_params = {'fast': 12, 'slow': 26, 'signal': 9}
    fast_periods = range(5, 20, 2)
    slow_periods = range(20, 60, 5)
    signal_periods = range(5, 20, 2)

    all_results = []
    for fast in fast_periods:
        for slow in slow_periods:
            if fast >= slow:
                continue
            for signal in signal_periods:
                try:
                    result = evaluate_macd_strategy(data, fast, slow, signal, risk_free_rates)
                    all_results.append({
                        'fast': fast,
                        'slow': slow,
                        'signal': signal,
                        'metrics': result
                    })
                except Exception as e:
                    # Best effort: one failing combination must not abort the search.
                    print(f"Error with MACD params {fast}-{slow}-{signal}: {e}")
                    continue

    if not all_results:
        return default_params

    best_params = find_optimal_parameters(all_results, ['fast', 'slow', 'signal'])
    # find_optimal_parameters can return None (e.g. every score is NaN);
    # fall back to the defaults, as optimize_cmf_period does.
    return best_params if best_params is not None else default_params
def backtest_cmf(data, cmf, threshold=0.05):
    """Backtest a long-only CMF threshold strategy.

    A buy signal fires when CMF crosses above ``threshold`` and a sell signal
    when it crosses below ``-threshold``; bars where either the current or
    previous CMF value is NaN are skipped. Trade returns and the equity curve
    come from the same position state, so repeated buy signals or a position
    still open at the end never produce a trade return.

    Args:
        data: Mapping/DataFrame whose ``'Close'`` entry exposes ``.values``.
        cmf: CMF series (``.values`` is read).
        threshold: Positive band half-width used for both signals.

    Returns:
        Tuple ``(total_return, returns, portfolio_values, mdd)``:
        compounded return of closed trades (0 if none), list of per-trade
        returns, equity curve starting at 1.0 and padded to the number of
        closing prices, and the maximum drawdown of that curve.
    """
    close_values = data['Close'].values
    cmf_values = cmf.values

    # Raw threshold-crossing signals as (bar index, action) pairs.
    positions = []
    for i in range(1, len(cmf_values)):
        if np.isnan(cmf_values[i - 1]) or np.isnan(cmf_values[i]):
            continue
        if (cmf_values[i - 1] <= threshold) and (cmf_values[i] > threshold):
            positions.append((i, 'buy'))
        elif (cmf_values[i - 1] >= -threshold) and (cmf_values[i] < -threshold):
            positions.append((i, 'sell'))

    returns = []
    portfolio_values = [1.0]
    in_position = False
    buy_idx = 0
    for idx, action in positions:
        # Carry the last equity value forward up to this signal's bar.
        while len(portfolio_values) <= idx:
            portfolio_values.append(portfolio_values[-1])

        if action == 'buy' and not in_position:
            in_position = True
            buy_idx = idx
        elif action == 'sell' and in_position:
            in_position = False
            buy_price = close_values[buy_idx]
            trade_return = (close_values[idx] - buy_price) / buy_price
            returns.append(trade_return)
            portfolio_values[-1] *= (1 + trade_return)

    # Pad the curve so it spans every closing price.
    while len(portfolio_values) < len(close_values):
        portfolio_values.append(portfolio_values[-1])

    # Maximum drawdown of the realized equity curve.
    cummax = np.maximum.accumulate(portfolio_values)
    drawdowns = (cummax - portfolio_values) / cummax
    mdd = np.max(drawdowns) if len(drawdowns) > 0 else 0

    total_return = np.prod([1 + r for r in returns]) - 1 if returns else 0
    return total_return, returns, portfolio_values, mdd
def evaluate_cmf_strategy(data, period, threshold=0.05, risk_free_rates=None):
    """Score one CMF period on ``data``.

    Args:
        data: Price data; a copy is handed to ``calculate_cmf``.
        period: CMF period; the indicator is read from column ``CMF_<period>``
            of the frame returned by ``calculate_cmf``.
        threshold: Band half-width forwarded to ``backtest_cmf``.
        risk_free_rates: Forwarded to ``calculate_risk_free_rate``.

    Returns:
        The metrics dict from ``calculate_performance_metrics`` extended with
        ``'period'``. When the backtest closes no trades, a fixed dict with
        zero return, zero Sharpe ratio and ``'mdd'`` of 1 (without
        ``'period'``).
    """
    # Work on a copy so the caller's frame is not handed to calculate_cmf.
    with_indicator = calculate_cmf(data.copy(), period)
    cmf_series = with_indicator[f'CMF_{period}']

    total_return, returns, portfolio_values, mdd = backtest_cmf(data, cmf_series, threshold)

    # No closed trades: report the worst-case placeholder metrics.
    if not returns:
        return {'annual_return': 0, 'sharpe_ratio': 0, 'mdd': 1, 'total_return': 0}

    rf_rate = calculate_risk_free_rate(data, risk_free_rates)
    metrics = calculate_performance_metrics(
        total_return, returns, portfolio_values, mdd, len(data), rf_rate
    )

    # Record which parameter produced these metrics.
    metrics['period'] = period
    return metrics
def optimize_cmf_period(data, risk_free_rates=None):
    """Search CMF periods 10, 15, ..., 45 and return the best one.

    Periods whose evaluation raises are reported on stdout and skipped.

    Args:
        data: Price data forwarded to ``evaluate_cmf_strategy``.
        risk_free_rates: Forwarded to ``evaluate_cmf_strategy``.

    Returns:
        The winning period, or 20 when no period could be evaluated or no
        winner could be selected.
    """
    fallback_period = 20
    candidates = []

    for period in range(10, 50, 5):
        try:
            metrics = evaluate_cmf_strategy(data, period, risk_free_rates=risk_free_rates)
            candidates.append({'period': period, 'metrics': metrics})
        except Exception as e:
            # Best effort: one failing period must not abort the search.
            print(f"CMF period {period} optimization error: {e}")
            continue

    if not candidates:
        return fallback_period

    winner = find_optimal_parameters(candidates, ['period'])
    if winner:
        return winner['period']
    return fallback_period
def backtest_rsi(data, rsi, upper_threshold, lower_threshold):
    """Backtest a long-only RSI threshold strategy.

    A buy signal fires when RSI crosses above ``lower_threshold`` and a sell
    signal when it crosses below ``upper_threshold``; bars where either the
    current or previous RSI value is NaN are skipped. Trade returns and the
    equity curve come from the same position state, so repeated buy signals
    or a position still open at the end never produce a trade return.

    Args:
        data: Mapping/DataFrame whose ``'Close'`` entry exposes ``.values``.
        rsi: RSI series (``.values`` is read).
        upper_threshold: Level whose downward crossing triggers a sell.
        lower_threshold: Level whose upward crossing triggers a buy.

    Returns:
        Tuple ``(total_return, returns, portfolio_values, mdd)``:
        compounded return of closed trades (0 if none), list of per-trade
        returns, equity curve starting at 1.0 and padded to the number of
        closing prices, and the maximum drawdown of that curve.
    """
    close_values = data['Close'].values
    rsi_values = rsi.values

    # Raw threshold-crossing signals as (bar index, action) pairs.
    positions = []
    for i in range(1, len(rsi_values)):
        if np.isnan(rsi_values[i - 1]) or np.isnan(rsi_values[i]):
            continue
        # Rebound out of the oversold zone: buy.
        if (rsi_values[i - 1] <= lower_threshold) and (rsi_values[i] > lower_threshold):
            positions.append((i, 'buy'))
        # Pullback out of the overbought zone: sell.
        elif (rsi_values[i - 1] >= upper_threshold) and (rsi_values[i] < upper_threshold):
            positions.append((i, 'sell'))

    returns = []
    portfolio_values = [1.0]
    in_position = False
    buy_idx = 0
    for idx, action in positions:
        # Carry the last equity value forward up to this signal's bar.
        while len(portfolio_values) <= idx:
            portfolio_values.append(portfolio_values[-1])

        if action == 'buy' and not in_position:
            in_position = True
            buy_idx = idx
        elif action == 'sell' and in_position:
            in_position = False
            buy_price = close_values[buy_idx]
            trade_return = (close_values[idx] - buy_price) / buy_price
            returns.append(trade_return)
            portfolio_values[-1] *= (1 + trade_return)

    # Pad the curve so it spans every closing price.
    while len(portfolio_values) < len(close_values):
        portfolio_values.append(portfolio_values[-1])

    # Maximum drawdown of the realized equity curve.
    cummax = np.maximum.accumulate(portfolio_values)
    drawdowns = (cummax - portfolio_values) / cummax
    mdd = np.max(drawdowns) if len(drawdowns) > 0 else 0

    total_return = np.prod([1 + r for r in returns]) - 1 if returns else 0
    return total_return, returns, portfolio_values, mdd
def evaluate_rsi_strategy(data, period, upper_threshold, lower_threshold, risk_free_rates=None):
    """Score one RSI parameter triple on ``data``.

    Args:
        data: Price data; a copy is handed to ``calculate_rsi``.
        period: RSI period; the indicator is read from column ``RSI_<period>``
            of the frame returned by ``calculate_rsi``.
        upper_threshold: Sell level forwarded to ``backtest_rsi``.
        lower_threshold: Buy level forwarded to ``backtest_rsi``.
        risk_free_rates: Forwarded to ``calculate_risk_free_rate``.

    Returns:
        The metrics dict from ``calculate_performance_metrics`` extended with
        ``'period'``, ``'upper_threshold'`` and ``'lower_threshold'``. When
        the backtest closes no trades, a fixed dict with zero return, zero
        Sharpe ratio and ``'mdd'`` of 1 (without the parameter keys).
    """
    # Work on a copy so the caller's frame is not handed to calculate_rsi.
    with_indicator = calculate_rsi(data.copy(), period)
    rsi_series = with_indicator[f'RSI_{period}']

    total_return, returns, portfolio_values, mdd = backtest_rsi(
        data, rsi_series, upper_threshold, lower_threshold
    )

    # No closed trades: report the worst-case placeholder metrics.
    if not returns:
        return {'annual_return': 0, 'sharpe_ratio': 0, 'mdd': 1, 'total_return': 0}

    rf_rate = calculate_risk_free_rate(data, risk_free_rates)
    metrics = calculate_performance_metrics(
        total_return, returns, portfolio_values, mdd, len(data), rf_rate
    )

    # Record which parameters produced these metrics.
    metrics['period'] = period
    metrics['upper_threshold'] = upper_threshold
    metrics['lower_threshold'] = lower_threshold
    return metrics
def optimize_rsi_parameters(data, risk_free_rates=None):
    """Grid-search RSI period and thresholds and return the best triple.

    Periods run over ``range(5, 30, 2)``, upper thresholds over
    ``range(65, 85, 5)`` and lower thresholds over ``range(15, 35, 5)``.
    Combinations whose evaluation raises are reported on stdout and skipped.

    Args:
        data: Price data forwarded to ``evaluate_rsi_strategy``.
        risk_free_rates: Forwarded to ``evaluate_rsi_strategy``.

    Returns:
        Dict with ``'period'``, ``'upper_threshold'``, ``'lower_threshold'``
        and ``'metrics'`` for the winning combination.
        ``{'period': 14, 'upper_threshold': 70, 'lower_threshold': 30}`` when
        no combination could be evaluated or no winner could be selected.
    """
    default_params = {'period': 14, 'upper_threshold': 70, 'lower_threshold': 30}
    periods = range(5, 30, 2)
    upper_thresholds = range(65, 85, 5)
    lower_thresholds = range(15, 35, 5)

    all_results = []
    for period in periods:
        for upper in upper_thresholds:
            for lower in lower_thresholds:
                try:
                    result = evaluate_rsi_strategy(data, period, upper, lower, risk_free_rates)
                    all_results.append({
                        'period': period,
                        'upper_threshold': upper,
                        'lower_threshold': lower,
                        'metrics': result
                    })
                except Exception as e:
                    # Best effort: one failing combination must not abort the search.
                    print(f"Error with RSI params {period}-{upper}-{lower}: {e}")
                    continue

    if not all_results:
        return default_params

    best_params = find_optimal_parameters(all_results, ['period', 'upper_threshold', 'lower_threshold'])
    # find_optimal_parameters can return None (e.g. every score is NaN);
    # fall back to the defaults, as optimize_cmf_period does.
    return best_params if best_params is not None else default_params
# -----------------------------
# Entry point: run the full optimization
# -----------------------------
def run_technical_optimization(tickers, start_date, end_date):
    """Optimize EMA, MACD, CMF and RSI parameters per ticker and average them.

    For every ticker, price data is downloaded with ``yf.download`` and each
    indicator's optimizer is run on a copy of it. Tickers with fewer than 100
    rows, or whose processing raises, are skipped. An optimizer that raises,
    or returns something that lacks the expected keys, contributes that
    indicator's default parameters instead. Progress is printed to stdout.

    Args:
        tickers: Iterable of ticker symbols passed to ``yf.download``.
        start_date: Start of the window, passed to ``get_risk_free_rate`` and
            ``yf.download``.
        end_date: End of the window, passed to the same two calls.

    Returns:
        Dict with keys ``'ema'`` (``short``/``long``), ``'macd'``
        (``fast``/``slow``/``signal``), ``'cmf'`` (a period) and ``'rsi'``
        (``period``/``upper_threshold``/``lower_threshold``). Each value is
        the mean over all processed tickers truncated with ``int``, or the
        default (10/50, 12/26/9, 20, 14/70/30) when no ticker was processed.
    """
    print("์ฃผ์‹ ๋ฐ์ดํ„ฐ ๋‹ค์šด๋กœ๋“œ ๋ฐ ๊ธฐ์ˆ ์  ์ง€ํ‘œ ์ตœ์ ํ™” ์ค‘...")

    # Load the risk-free rate; any failure falls back to 0.01.
    # NOTE(review): the ``:.4f`` format assumes get_risk_free_rate returns a
    # scalar. A non-scalar result would raise here and be replaced by the
    # fallback — confirm against get_risk_free_rate.
    try:
        risk_free_rate = get_risk_free_rate(start_date=start_date, end_date=end_date)
        print(f"๋ฌด์œ„ํ—˜ ์ˆ˜์ต๋ฅ  ๋กœ๋“œ ์„ฑ๊ณต: {risk_free_rate:.4f}")
    except Exception as e:
        print(f"๋ฌด์œ„ํ—˜ ์ˆ˜์ต๋ฅ  ๋กœ๋“œ ์˜ค๋ฅ˜: {e}")
        risk_free_rate = 0.01
        print(f"๊ธฐ๋ณธ ๋ฌด์œ„ํ—˜ ์ˆ˜์ต๋ฅ  ์‚ฌ์šฉ: {risk_free_rate:.4f}")

    ema_params_list = []
    macd_params_list = []
    cmf_period_list = []
    rsi_params_list = []

    for ticker in tickers:
        print(f"\n{ticker} ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์ค‘...")
        try:
            df = yf.download(ticker, start=start_date, end=end_date)
            if len(df) < 100:
                print(f"{ticker}: ์ถฉ๋ถ„ํ•œ ๋ฐ์ดํ„ฐ๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.")
                continue

            # Flatten multi-level columns by dropping the second level.
            if isinstance(df.columns, pd.MultiIndex):
                df.columns = df.columns.droplevel(1)

            # In each section below the result is recorded only AFTER the
            # success message has been built and printed. Building the message
            # indexes the expected keys, so a None or malformed result raises
            # before anything is appended and exactly one entry (the result
            # or the default) is stored per ticker.

            # 1. EMA
            try:
                print(f" {ticker} EMA ์ตœ์ ํ™” ์ค‘...")
                ema_opt = optimize_ema_parameters(df.copy(), risk_free_rates=risk_free_rate)
                print(f" EMA ์ตœ์ ํ™” ์™„๋ฃŒ: ๋‹จ๊ธฐ={ema_opt['short']}, ์žฅ๊ธฐ={ema_opt['long']}")
                ema_params_list.append(ema_opt)
            except Exception as e:
                print(f" EMA ์ตœ์ ํ™” ์˜ค๋ฅ˜: {e}")
                ema_params_list.append({'short': 10, 'long': 50})

            # 2. MACD
            try:
                print(f" {ticker} MACD ์ตœ์ ํ™” ์ค‘...")
                macd_opt = optimize_macd_parameters(df.copy(), risk_free_rates=risk_free_rate)
                print(f" MACD ์ตœ์ ํ™” ์™„๋ฃŒ: ๋น ๋ฆ„={macd_opt['fast']}, ๋А๋ฆผ={macd_opt['slow']}, ์‹ ํ˜ธ={macd_opt['signal']}")
                macd_params_list.append(macd_opt)
            except Exception as e:
                print(f" MACD ์ตœ์ ํ™” ์˜ค๋ฅ˜: {e}")
                macd_params_list.append({'fast': 12, 'slow': 26, 'signal': 9})

            # 3. CMF
            try:
                print(f" {ticker} CMF ์ตœ์ ํ™” ์ค‘...")
                cmf_period = optimize_cmf_period(df.copy(), risk_free_rates=risk_free_rate)
                print(f" CMF ์ตœ์ ํ™” ์™„๋ฃŒ: ๊ธฐ๊ฐ„={cmf_period}")
                cmf_period_list.append(cmf_period)
            except Exception as e:
                print(f" CMF ์ตœ์ ํ™” ์˜ค๋ฅ˜: {e}")
                cmf_period_list.append(20)

            # 4. RSI
            try:
                print(f" {ticker} RSI ์ตœ์ ํ™” ์ค‘...")
                rsi_opt = optimize_rsi_parameters(df.copy(), risk_free_rates=risk_free_rate)
                print(f" RSI ์ตœ์ ํ™” ์™„๋ฃŒ: ๊ธฐ๊ฐ„={rsi_opt['period']}, ์ƒํ•œ={rsi_opt['upper_threshold']}, ํ•˜ํ•œ={rsi_opt['lower_threshold']}")
                rsi_params_list.append(rsi_opt)
            except Exception as e:
                print(f" RSI ์ตœ์ ํ™” ์˜ค๋ฅ˜: {e}")
                rsi_params_list.append({'period': 14, 'upper_threshold': 70, 'lower_threshold': 30})

        except Exception as e:
            print(f"{ticker} ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์˜ค๋ฅ˜: {e}")
            continue

    # Average each parameter across tickers; int() truncates toward zero.
    if ema_params_list:
        avg_ema_short = int(np.mean([p['short'] for p in ema_params_list]))
        avg_ema_long = int(np.mean([p['long'] for p in ema_params_list]))
    else:
        avg_ema_short, avg_ema_long = 10, 50

    if macd_params_list:
        avg_macd_fast = int(np.mean([p['fast'] for p in macd_params_list]))
        avg_macd_slow = int(np.mean([p['slow'] for p in macd_params_list]))
        avg_macd_signal = int(np.mean([p['signal'] for p in macd_params_list]))
    else:
        avg_macd_fast, avg_macd_slow, avg_macd_signal = 12, 26, 9

    avg_cmf_period = int(np.mean(cmf_period_list)) if cmf_period_list else 20

    if rsi_params_list:
        avg_rsi_period = int(np.mean([p['period'] for p in rsi_params_list]))
        avg_rsi_upper = int(np.mean([p['upper_threshold'] for p in rsi_params_list]))
        avg_rsi_lower = int(np.mean([p['lower_threshold'] for p in rsi_params_list]))
    else:
        avg_rsi_period, avg_rsi_upper, avg_rsi_lower = 14, 70, 30

    optimal_params = {
        'ema': {'short': avg_ema_short, 'long': avg_ema_long},
        'macd': {'fast': avg_macd_fast, 'slow': avg_macd_slow, 'signal': avg_macd_signal},
        'cmf': avg_cmf_period,
        'rsi': {'period': avg_rsi_period, 'upper_threshold': avg_rsi_upper, 'lower_threshold': avg_rsi_lower}
    }

    print("\n===== ์ตœ์ ํ™”๋œ (ํ‰๊ท ) ํŒŒ๋ผ๋ฏธํ„ฐ =====")
    print(f"EMA: ๋‹จ๊ธฐ={optimal_params['ema']['short']}, ์žฅ๊ธฐ={optimal_params['ema']['long']}")
    print(f"MACD: ๋น ๋ฆ„={optimal_params['macd']['fast']}, ๋А๋ฆผ={optimal_params['macd']['slow']}, ์‹ ํ˜ธ={optimal_params['macd']['signal']}")
    print(f"CMF ๊ธฐ๊ฐ„: {optimal_params['cmf']}")
    print(f"RSI: ๊ธฐ๊ฐ„={optimal_params['rsi']['period']}, ์ƒํ•œ={optimal_params['rsi']['upper_threshold']}, ํ•˜ํ•œ={optimal_params['rsi']['lower_threshold']}")

    return optimal_params