|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import xgboost as xgb |
|
|
from sklearn.preprocessing import StandardScaler |
|
|
import pickle |
|
|
import joblib |
|
|
|
|
|
class XGBoostModel:
    """Wrapper around a pre-trained XGBoost regressor that forecasts price change.

    The wrapped model outputs percentage price changes (not absolute prices)
    for the horizons listed in ``output_targets`` (1, 5, 10 and 20 days).
    Features are fed to the model unscaled; standardisation is disabled.
    """

    def __init__(self):
        """Set up the feature schema and output mapping; no model is loaded yet."""
        self.model = None
        self.scaler = None

        # Column order the model is fed with (see preprocess_features).
        self.feature_columns = [
            'close',
            'return_t-1',
            'return_t-5',
            'MA5_close',
            'volatility_5d',
            'volume_ratio_5d',
            'MACD_diff',
            'dji_return_t-1',
            'sox_return_t-1',
            'NEWS',
            'MACDvol',
            'RSI_14',
            'ADX',
            'volume_weighted_return'
        ]

        # Horizon in days -> key used in the dict returned by predict().
        # The insertion order is the order of the model's output columns.
        self.output_targets = {
            1: 'Change_pct_t1_pred',
            5: 'Change_pct_t5_pred',
            10: 'Change_pct_t10_pred',
            20: 'Change_pct_t20_pred'
        }

        print("XGBoost 模型預測器初始化完成")
        print("輸出格式:漲幅百分比 (1日, 5日, 10日, 20日)")
        print(f"預期特徵數量: {len(self.feature_columns)}")

    def load_model(self, model_path):
        """Load a pre-trained XGBoost model from disk.

        Args:
            model_path (str): Path of the model file (``.json`` format).

        Returns:
            bool: True when the model was loaded, False when the file is
            missing or loading raised an exception (the error is printed).
        """
        try:
            if not os.path.exists(model_path):
                print(f"錯誤:找不到模型檔案 {model_path}")
                return False

            self.model = xgb.XGBRegressor()
            self.model.load_model(model_path)

            print(f"成功載入模型:{model_path}")
            print(f"預期特徵數量:{len(self.feature_columns)}")
            return True

        except Exception as e:
            print(f"載入模型時發生錯誤:{e}")
            return False

    def load_scaler(self, scaler_path):
        """Standardisation is disabled: print a notice and keep ``scaler`` as None.

        Args:
            scaler_path (str): Ignored; kept for interface compatibility.

        Returns:
            bool: Always False.
        """
        print("⚠️ 已停用標準化:模型使用原始特徵進行預測。")
        self.scaler = None
        return False

    def preprocess_features(self, input_df):
        """Return a new frame holding exactly ``feature_columns``, in order.

        Missing feature columns are reported and created filled with 0;
        NaN values are replaced by 0. The caller's frame is not modified.

        Args:
            input_df (pd.DataFrame): Raw input features.

        Returns:
            pd.DataFrame: Frame restricted to and ordered by ``feature_columns``.
        """
        missing_features = [f for f in self.feature_columns if f not in input_df.columns]
        if missing_features:
            print(f"警告:缺少以下特徵:{missing_features}")

        # reindex selects/orders the columns and creates the absent ones in a
        # fresh frame, so the argument is never mutated.
        return input_df.reindex(columns=self.feature_columns, fill_value=0).fillna(0)

    def predict(self, model_name, input_df):
        """Forecast the percentage price change for the first row of ``input_df``.

        When no model is loaded yet, ``{model_name}.json`` is loaded first.
        NOTE: a model that is already loaded is reused as-is, whatever
        ``model_name`` is passed. ``load_scaler`` is invoked whenever
        ``scaler`` is None, so its notice is printed on every call.

        Args:
            model_name (str): Base name of the model file (without ``.json``).
            input_df (pd.DataFrame): Input features.

        Returns:
            dict | None: Maps result keys (values of ``output_targets``) to
            the predicted change in percent. A model with a 1-D output
            yields only the 1-day key; for a 2-D output, horizons without
            a matching output column are reported as 0.0. None is returned
            when the model cannot be loaded or prediction fails.
        """
        try:
            if self.model is None:
                model_path = f"{model_name}.json"
                if not self.load_model(model_path):
                    return None

            if self.scaler is None:
                scaler_path = f"{model_name}_scaler.pkl"
                self.load_scaler(scaler_path)

            processed_df = self.preprocess_features(input_df.copy())
            predictions = self.model.predict(processed_df)

            target_keys = list(self.output_targets.values())
            if predictions.ndim == 1:
                # Single-output model: only the first horizon is available.
                result = {target_keys[0]: float(predictions[0])}
            else:
                first_row = predictions[0]
                result = {
                    key: float(first_row[i]) if i < len(first_row) else 0.0
                    for i, key in enumerate(target_keys)
                }

            days_by_key = {key: days for days, key in self.output_targets.items()}
            print("=== 漲幅預測結果 ===")
            for key, value in result.items():
                if value > 0:
                    direction = "上漲"
                elif value < 0:
                    direction = "下跌"
                else:
                    # Exactly zero (e.g. a padded horizon) is not a decline.
                    direction = "持平"
                print(f" {days_by_key[key]}日後預測: {value:+.2f}% ({direction})")

            return result

        except Exception as e:
            print(f"預測過程中發生錯誤:{e}")
            import traceback
            traceback.print_exc()
            return None

    def predict_single_timeframe(self, model_name, input_df, days):
        """Forecast the percentage change for one specific horizon.

        Args:
            model_name (str): Base name of the model file.
            input_df (pd.DataFrame): Input features.
            days (int): Horizon in days (1, 5, 10 or 20).

        Returns:
            float | None: Predicted change in percent, or None when the
            prediction failed or the horizon is not in the result.
        """
        try:
            predictions = self.predict(model_name, input_df)
            if predictions is None:
                return None

            target_key = f'Change_pct_t{days}_pred'
            if target_key in predictions:
                return predictions[target_key]

            print(f"警告:找不到 {days} 日預測結果")
            return None

        except Exception as e:
            print(f"單一時間框架預測時發生錯誤:{e}")
            return None

    def get_prediction_confidence(self, input_df):
        """Heuristic confidence score based on how complete the first row is.

        The share of features that are present, non-NaN and non-zero is
        floored at 0.5, reduced by 10% per missing/NaN key feature
        (``close``, ``return_t-1``, ``MA5_close``) and clamped to [0.3, 0.9].

        Args:
            input_df (pd.DataFrame): Input features.

        Returns:
            float: Confidence between 0.3 and 0.9; 0.5 when an error occurs.
        """
        try:
            usable_count = 0
            for feature in self.feature_columns:
                if feature in input_df.columns:
                    value = input_df[feature].iloc[0]
                    if not pd.isna(value) and value != 0:
                        usable_count += 1

            completeness_ratio = usable_count / len(self.feature_columns)
            base_confidence = max(0.5, completeness_ratio)

            important_features = ['close', 'return_t-1', 'MA5_close']
            missing_important = sum(
                1
                for feature in important_features
                if feature not in input_df.columns or pd.isna(input_df[feature].iloc[0])
            )
            if missing_important > 0:
                base_confidence *= (1 - missing_important * 0.1)

            return min(0.9, max(0.3, base_confidence))

        except Exception as e:
            print(f"計算信心度時發生錯誤:{e}")
            return 0.5

    def validate_input(self, input_df):
        """Check that the first row of ``input_df`` is usable for prediction.

        Args:
            input_df (pd.DataFrame): Input features.

        Returns:
            tuple: ``(is_valid, errors)`` where ``errors`` is a list of
            messages; ``is_valid`` is True only when the list is empty.
        """
        errors = []

        try:
            if input_df.empty:
                errors.append("輸入數據為空")
                # Nothing else can be checked: every later test reads row 0,
                # which would raise on an empty frame.
                return False, errors

            for feature in ('close', 'return_t-1'):
                if feature not in input_df.columns:
                    errors.append(f"缺少必要特徵:{feature}")
                elif pd.isna(input_df[feature].iloc[0]):
                    errors.append(f"必要特徵包含空值:{feature}")

            if 'close' in input_df.columns:
                close_price = input_df['close'].iloc[0]
                if close_price <= 0:
                    errors.append(f"收盤價不合理:{close_price}")

            if 'return_t-1' in input_df.columns:
                return_val = input_df['return_t-1'].iloc[0]
                # A return whose magnitude exceeds 0.5 is treated as abnormal.
                if abs(return_val) > 0.5:
                    errors.append(f"報酬率異常:{return_val:.3f}")

            return not errors, errors

        except Exception as e:
            errors.append(f"驗證過程發生錯誤:{e}")
            return False, errors

    def get_feature_importance(self):
        """Return the model's feature importances, most important first.

        Importances are paired with ``feature_columns`` by position.

        Returns:
            dict | None: Feature name -> importance, sorted descending;
            None when no model is loaded or an error occurs.
        """
        try:
            if self.model is None:
                return None

            importance_scores = self.model.feature_importances_
            importance_dict = {
                feature: float(score)
                for feature, score in zip(self.feature_columns, importance_scores)
            }
            return dict(sorted(importance_dict.items(),
                               key=lambda item: item[1],
                               reverse=True))

        except Exception as e:
            print(f"獲取特徵重要性時發生錯誤:{e}")
            return None

    def explain_prediction(self, input_df, predictions):
        """Build a human-readable explanation of a prediction result.

        Lists the three most important features (when a model is loaded)
        with their first-row values, then a trend label per horizon:
        bullish above +1%, bearish below -1%, flat otherwise.

        Args:
            input_df (pd.DataFrame): Input features.
            predictions (dict): Result as returned by ``predict``.

        Returns:
            str: The explanation text, or a failure message on error.
        """
        try:
            explanation = ["=== 預測解釋 ==="]

            feature_importance = self.get_feature_importance()
            if feature_importance:
                explanation.append("主要影響因素:")
                for feature in list(feature_importance)[:3]:
                    if feature in input_df.columns:
                        value = input_df[feature].iloc[0]
                        importance = feature_importance[feature]
                        explanation.append(f" - {feature}: {value:.4f} (重要性: {importance:.3f})")

            explanation.append("\n預測趨勢分析:")
            for key, value in predictions.items():
                # Keys look like 'Change_pct_t5_pred'; extract the '5'.
                days = key.split('_')[2][1:]
                if value > 1:
                    trend = "看漲"
                elif value < -1:
                    trend = "看跌"
                else:
                    trend = "持平"
                explanation.append(f" - {days}日: {value:+.2f}% ({trend})")

            return "\n".join(explanation)

        except Exception as e:
            return f"解釋生成失敗: {e}"
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: build one sample row, run a prediction with the
    # model stored as 'xgboost_model.json', then print the explanation and
    # the confidence score.
    predictor = XGBoostModel()

    sample_values = {
        'close': 150.0,
        'return_t-1': 0.02,
        'return_t-5': 0.05,
        'MA5_close': 148.0,
        'volatility_5d': 0.025,
        'volume_ratio_5d': 1.2,
        'MACD_diff': 0.5,
        'dji_return_t-1': 0.01,
        'sox_return_t-1': 0.015,
        'NEWS': 0.1,
        'MACDvol': 0.015,
        'RSI_14': 0.015,
        'ADX': 0.015,
        'volume_weighted_return': 0.015,
    }
    # One-row frame: each feature becomes a single-element column.
    sample_frame = pd.DataFrame(
        {name: [value] for name, value in sample_values.items()}
    )

    print("測試模型預測器...")
    print("輸入特徵:")
    print(sample_frame)

    forecast = predictor.predict('xgboost_model', sample_frame)

    if not forecast:
        print("預測失敗!")
    else:
        print("\n預測成功!")
        print("結果說明:輸出為相對於當前價格的漲幅百分比")

        report = predictor.explain_prediction(sample_frame, forecast)
        print(f"\n{report}")

        score = predictor.get_prediction_confidence(sample_frame)
        print(f"\n預測信心度: {score:.2%}")