AITEST / model_predictor.py
AlanRex's picture
Update model_predictor.py
be3e2e8 verified
# model_predictor.py - 支援漲幅百分比輸出的XGBoost模型預測器
# 修改版本:輸出改為漲幅百分比而非絕對價格
import os
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.preprocessing import StandardScaler
import pickle
import joblib
class XGBoostModel:
def __init__(self):
"""
初始化 XGBoost 模型預測器
【重要更新】
- 模型現在輸出漲幅百分比而非絕對價格
- 支援 1日、5日、10日、20日的漲幅預測
"""
self.model = None
self.scaler = None
# 【【修改點】】更新特徵欄位列表以包含新特徵
self.feature_columns = [
'close', # 前一日收盤價
'return_t-1', # 前一日報酬率
'return_t-5', # 過去 5 日累積報酬率
'MA5_close', # 5 日移動平均價
'volatility_5d', # 5 日報酬標準差
'volume_ratio_5d', # 今日成交量 ÷ 5 日均量
'MACD_diff', # MACD - signal
'dji_return_t-1', # 前一日道瓊指數報酬率
'sox_return_t-1', # 前一日費半指數報酬率
'NEWS', # 新聞情緒分數
'MACDvol', # 成交量MACD
'RSI_14', # 14日RSI
'ADX', # ADX趨勢指標
'volume_weighted_return' # 成交量加權報酬率
]
# 【新增】輸出目標對應表
self.output_targets = {
1: 'Change_pct_t1_pred', # 1天後漲幅%
5: 'Change_pct_t5_pred', # 5天後漲幅%
10: 'Change_pct_t10_pred', # 10天後漲幅%
20: 'Change_pct_t20_pred' # 20天後漲幅%
}
print("XGBoost 模型預測器初始化完成")
print(f"輸出格式:漲幅百分比 (1日, 5日, 10日, 20日)")
print(f"預期特徵數量: {len(self.feature_columns)}")
def load_model(self, model_path):
"""
載入預訓練的 XGBoost 模型
Args:
model_path (str): 模型檔案路徑 (.json 格式)
Returns:
bool: 是否成功載入
"""
try:
# 檢查模型檔案是否存在
if not os.path.exists(model_path):
print(f"錯誤:找不到模型檔案 {model_path}")
return False
# 載入 XGBoost 模型
self.model = xgb.XGBRegressor()
self.model.load_model(model_path)
print(f"成功載入模型:{model_path}")
print(f"預期特徵數量:{len(self.feature_columns)}")
return True
except Exception as e:
print(f"載入模型時發生錯誤:{e}")
return False
def load_scaler(self, scaler_path):
"""停用標準化流程"""
print("⚠️ 已停用標準化:模型使用原始特徵進行預測。")
self.scaler = None
return False
def preprocess_features(self, input_df):
# 確保特徵齊全
missing_features = [f for f in self.feature_columns if f not in input_df.columns]
if missing_features:
print(f"警告:缺少以下特徵:{missing_features}")
for feature in missing_features:
input_df[feature] = 0
input_df = input_df[self.feature_columns].fillna(0)
# ✅ 直接回傳原始特徵
return input_df
def predict(self, model_name, input_df):
"""
進行股價漲幅預測
Args:
model_name (str): 模型名稱(用於載入對應模型)
input_df (pd.DataFrame): 輸入特徵
Returns:
dict: 預測結果,包含各時間點的漲幅百分比
"""
try:
# 載入模型(如果尚未載入)
if self.model is None:
model_path = f"{model_name}.json"
if not self.load_model(model_path):
return None
# 載入標準化器(如果存在)
if self.scaler is None:
scaler_path = f"{model_name}_scaler.pkl"
self.load_scaler(scaler_path)
# 預處理特徵
processed_df = self.preprocess_features(input_df.copy())
# 進行預測
predictions = self.model.predict(processed_df)
# 【重要修改】將預測結果格式化為漲幅百分比
if predictions.ndim == 1:
# 如果只有一個輸出,假設是 1 日預測
result = {
'Change_pct_t1_pred': float(predictions[0])
}
else:
# 多輸出情況:1日, 5日, 10日, 20日
result = {
'Change_pct_t1_pred': float(predictions[0][0]) if len(predictions[0]) > 0 else 0.0,
'Change_pct_t5_pred': float(predictions[0][1]) if len(predictions[0]) > 1 else 0.0,
'Change_pct_t10_pred': float(predictions[0][2]) if len(predictions[0]) > 2 else 0.0,
'Change_pct_t20_pred': float(predictions[0][3]) if len(predictions[0]) > 3 else 0.0
}
# 輸出預測結果摘要
print("=== 漲幅預測結果 ===")
for key, value in result.items():
days = key.split('_')[2][1:] # 提取天數
direction = "上漲" if value > 0 else "下跌"
print(f" {days}日後預測: {value:+.2f}% ({direction})")
return result
except Exception as e:
print(f"預測過程中發生錯誤:{e}")
import traceback
traceback.print_exc()
return None
def predict_single_timeframe(self, model_name, input_df, days):
"""
預測特定時間框架的漲幅
Args:
model_name (str): 模型名稱
input_df (pd.DataFrame): 輸入特徵
days (int): 預測天數 (1, 5, 10, 20)
Returns:
float: 預測的漲幅百分比
"""
try:
predictions = self.predict(model_name, input_df)
if predictions is None:
return None
# 根據天數選擇對應的預測結果
target_key = f'Change_pct_t{days}_pred'
if target_key in predictions:
return predictions[target_key]
else:
print(f"警告:找不到 {days} 日預測結果")
return None
except Exception as e:
print(f"單一時間框架預測時發生錯誤:{e}")
return None
def get_prediction_confidence(self, input_df):
"""
評估預測的信心度
Args:
input_df (pd.DataFrame): 輸入特徵
Returns:
float: 信心度 (0-1)
"""
try:
# 基於特徵完整性和質量評估信心度
feature_completeness = 0
total_features = len(self.feature_columns)
for feature in self.feature_columns:
if feature in input_df.columns:
value = input_df[feature].iloc[0]
if not pd.isna(value) and value != 0:
feature_completeness += 1
completeness_ratio = feature_completeness / total_features
# 基於數據質量調整信心度
base_confidence = max(0.5, completeness_ratio)
# 如果重要特徵缺失,降低信心度
important_features = ['close', 'return_t-1', 'MA5_close']
missing_important = 0
for feature in important_features:
if feature not in input_df.columns or pd.isna(input_df[feature].iloc[0]):
missing_important += 1
if missing_important > 0:
base_confidence *= (1 - missing_important * 0.1)
return min(0.9, max(0.3, base_confidence))
except Exception as e:
print(f"計算信心度時發生錯誤:{e}")
return 0.5
def validate_input(self, input_df):
"""
驗證輸入數據的有效性
Args:
input_df (pd.DataFrame): 輸入特徵
Returns:
tuple: (是否有效, 錯誤訊息列表)
"""
errors = []
try:
# 檢查是否為空
if input_df.empty:
errors.append("輸入數據為空")
# 檢查必要特徵
required_features = ['close', 'return_t-1']
for feature in required_features:
if feature not in input_df.columns:
errors.append(f"缺少必要特徵:{feature}")
elif pd.isna(input_df[feature].iloc[0]):
errors.append(f"必要特徵包含空值:{feature}")
# 檢查數據合理性
if 'close' in input_df.columns:
close_price = input_df['close'].iloc[0]
if close_price <= 0:
errors.append(f"收盤價不合理:{close_price}")
if 'return_t-1' in input_df.columns:
return_val = input_df['return_t-1'].iloc[0]
if abs(return_val) > 0.5: # 單日漲跌幅超過50%可能有問題
errors.append(f"報酬率異常:{return_val:.3f}")
return len(errors) == 0, errors
except Exception as e:
errors.append(f"驗證過程發生錯誤:{e}")
return False, errors
def get_feature_importance(self):
"""
獲取特徵重要性
Returns:
dict: 特徵重要性字典
"""
try:
if self.model is None:
return None
# 獲取特徵重要性
importance_scores = self.model.feature_importances_
# 創建特徵重要性字典
importance_dict = {}
for i, feature in enumerate(self.feature_columns):
if i < len(importance_scores):
importance_dict[feature] = float(importance_scores[i])
# 按重要性排序
sorted_importance = dict(sorted(importance_dict.items(),
key=lambda x: x[1],
reverse=True))
return sorted_importance
except Exception as e:
print(f"獲取特徵重要性時發生錯誤:{e}")
return None
def explain_prediction(self, input_df, predictions):
"""
解釋預測結果
Args:
input_df (pd.DataFrame): 輸入特徵
predictions (dict): 預測結果
Returns:
str: 解釋文本
"""
try:
explanation = []
explanation.append("=== 預測解釋 ===")
# 分析主要驅動因素
feature_importance = self.get_feature_importance()
if feature_importance:
explanation.append("主要影響因素:")
top_features = list(feature_importance.keys())[:3]
for feature in top_features:
if feature in input_df.columns:
value = input_df[feature].iloc[0]
importance = feature_importance[feature]
explanation.append(f" - {feature}: {value:.4f} (重要性: {importance:.3f})")
# 分析預測趨勢
explanation.append("\n預測趨勢分析:")
for key, value in predictions.items():
days = key.split('_')[2][1:]
trend = "看漲" if value > 1 else "看跌" if value < -1 else "持平"
explanation.append(f" - {days}日: {value:+.2f}% ({trend})")
return "\n".join(explanation)
except Exception as e:
return f"解釋生成失敗: {e}"
# 範例使用方式
if __name__ == "__main__":
# 初始化模型
model = XGBoostModel()
# 準備測試數據
test_data = pd.DataFrame({
'close': [150.0],
'return_t-1': [0.02],
'return_t-5': [0.05],
'MA5_close': [148.0],
'volatility_5d': [0.025],
'volume_ratio_5d': [1.2],
'MACD_diff': [0.5],
'dji_return_t-1': [0.01],
'sox_return_t-1': [0.015],
'NEWS': [0.1],
'MACDvol': [0.015],
'RSI_14': [0.015],
'ADX': [0.015],
'volume_weighted_return': [0.015]
})
print("測試模型預測器...")
print("輸入特徵:")
print(test_data)
# 進行預測
predictions = model.predict('xgboost_model', test_data)
if predictions:
print("\n預測成功!")
print("結果說明:輸出為相對於當前價格的漲幅百分比")
# 解釋預測
explanation = model.explain_prediction(test_data, predictions)
print(f"\n{explanation}")
# 計算信心度
confidence = model.get_prediction_confidence(test_data)
print(f"\n預測信心度: {confidence:.2%}")
else:
print("預測失敗!")