AITEST / model_predictor.py
AlanRex's picture
Update model_predictor.py
f9e7f22 verified
raw
history blame
22.6 kB
# model_predictor.py - 支援漲幅百分比輸出的XGBoost模型預測器
# 修改版本:輸出改為漲幅百分比而非絕對價格
# model_predictor.py - 修正版本,對應訓練腳本的確切配置
import os
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.preprocessing import MinMaxScaler
import joblib
import warnings
warnings.filterwarnings('ignore')
class XGBoostModel:
def __init__(self):
"""
初始化 XGBoost 模型類別
根據訓練腳本 xgboost_for_stock_trend_&_prices_prediction_gpu_v_2_1_3.py 的配置
"""
# 根據訓練腳本的 new_feature_columns,確保順序完全一致
self.feature_columns = [
'close', # 前一日收盤價
'return_t-1', # 前一日報酬率
'return_t-5', # 過去 5 日累積報酬率
'MA5_close', # 5 日移動平均價
'volatility_5d', # 5 日報酬標準差
'volume_ratio_5d', # 今日成交量 ÷ 5 日均量
'MACD_diff', # MACD - signal
'dji_return_t-1', # 前一日道瓊指數報酬率
'sox_return_t-1', # 前一日費半指數報酬率
'NEWS', # 新聞情緒分數
'MACDvol', # MACD柱狀圖
'RSI_14', # 14日RSI
'ADX', # ADX指標
'volume_weighted_return' # 成交量加權報酬率
]
# 預測目標對應(根據訓練腳本的 train_y)
self.prediction_mapping = {
'Change_pct_t1_pred': 1, # 1天後漲幅%
'Change_pct_t5_pred': 5, # 5天後漲幅%
'Change_pct_t10_pred': 10, # 10天後漲幅%
'Change_pct_t20_pred': 20 # 20天後漲幅%
}
self.model = None
self.scaler = None
self.is_model_loaded = False
# 模型檔案路徑
self.model_path = 'xgboost_model.json'
self.scaler_path = 'feature_scaler.pkl'
def create_features_from_stock_data(self, stock_data):
"""
從股票資料創建所需的特徵
完全對應訓練腳本中的 create_new_features 函數
Args:
stock_data: yfinance 格式的股票資料 DataFrame
Returns:
processed_df: 包含所有特徵的 DataFrame
"""
df = stock_data.copy()
# 確保必要的基礎欄位存在
required_base_columns = ['Close', 'Volume', 'High', 'Low']
for col in required_base_columns:
if col not in df.columns:
raise ValueError(f"缺少必要的基礎欄位: {col}")
# 統一欄位名稱(yfinance 使用大寫)
df['close'] = df['Close']
df['volume'] = df['Volume']
# 1. return_t-1 — 前一日報酬率
df['return_t-1'] = df['close'].pct_change()
# 2. return_t-5 — 過去 5 日累積報酬率
df['return_t-5'] = (df['close'] / df['close'].shift(5) - 1)
# 3. MA5_close — 5 日移動平均價
df['MA5_close'] = df['close'].rolling(window=5).mean()
# 4. volatility_5d — 5 日報酬標準差
df['volatility_5d'] = df['return_t-1'].rolling(window=5).std()
# 5. volume_ratio_5d — 今日成交量 ÷ 5 日均量
df['volume_5d_avg'] = df['volume'].rolling(window=5).mean()
df['volume_ratio_5d'] = df['volume'] / df['volume_5d_avg']
# 6. MACD_diff — MACD - signal
exp1 = df['close'].ewm(span=12).mean()
exp2 = df['close'].ewm(span=26).mean()
macd_line = exp1 - exp2
signal_line = macd_line.ewm(span=9).mean()
df['MACD_diff'] = macd_line - signal_line
# 7-8. 美股指數報酬率(需要外部資料,暫設為0)
df['dji_return_t-1'] = 0.0 # 這需要從外部獲取道瓊指數資料
df['sox_return_t-1'] = 0.0 # 這需要從外部獲取費半指數資料
# 9. NEWS — 新聞情緒分數(需要外部資料,暫設為0)
df['NEWS'] = 0.0
# 10. MACDvol — MACD柱狀圖
df['MACDvol'] = macd_line - signal_line
# 11. RSI_14 — 14日RSI
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
df['RSI_14'] = 100 - (100 / (1 + rs))
# 12. ADX — 平均趨向指標
df['up_move'] = df['High'] - df['High'].shift(1)
df['down_move'] = df['Low'].shift(1) - df['Low']
df['+DM'] = np.where((df['up_move'] > df['down_move']) & (df['up_move'] > 0), df['up_move'], 0)
df['-DM'] = np.where((df['down_move'] > df['up_move']) & (df['down_move'] > 0), df['down_move'], 0)
high_low = df['High'] - df['Low']
high_close_prev = np.abs(df['High'] - df['close'].shift(1))
low_close_prev = np.abs(df['Low'] - df['close'].shift(1))
df['TR'] = np.maximum.reduce([high_low, high_close_prev, low_close_prev])
df['+DI'] = (df['+DM'].ewm(com=13, adjust=False).mean() / df['TR'].ewm(com=13, adjust=False).mean()) * 100
df['-DI'] = (df['-DM'].ewm(com=13, adjust=False).mean() / df['TR'].ewm(com=13, adjust=False).mean()) * 100
df['DX'] = np.abs(df['+DI'] - df['-DI']) / (df['+DI'] + df['-DI']) * 100
df['ADX'] = df['DX'].ewm(com=13, adjust=False).mean()
# 13. volume_weighted_return — 成交量加權報酬率
df['volume_weighted_return'] = np.abs(df['return_t-1']) * df['volume']
# 清理輔助欄位
cleanup_columns = ['volume_5d_avg', 'up_move', 'down_move', '+DM', '-DM', 'TR', '+DI', '-DI', 'DX']
df.drop(columns=[col for col in cleanup_columns if col in df.columns], inplace=True)
# 填補 NaN 值
df.fillna(method='ffill', inplace=True)
df.fillna(0, inplace=True) # 剩餘的 NaN 用 0 填補
return df
def load_model(self, model_name='xgboost_model'):
"""
載入訓練好的模型和標準化器
Args:
model_name: 模型名稱
Returns:
bool: 載入是否成功
"""
try:
# 載入 XGBoost 模型
if os.path.exists(self.model_path):
self.model = xgb.XGBRegressor()
self.model.load_model(self.model_path)
print(f"成功載入模型: {self.model_path}")
else:
print(f"警告:模型檔案 {self.model_path} 不存在")
return False
# 嘗試載入標準化器(如果存在)
if os.path.exists(self.scaler_path):
self.scaler = joblib.load(self.scaler_path)
print(f"成功載入標準化器: {self.scaler_path}")
else:
print(f"警告:未找到標準化器檔案 {self.scaler_path},將使用原始數據進行預測")
# 根據訓練腳本,模型沒有使用標準化,所以這是正常的
self.scaler = None
self.is_model_loaded = True
return True
except Exception as e:
print(f"載入模型時發生錯誤: {e}")
return False
def predict(self, model_name, input_data):
"""
使用載入的模型進行預測
Args:
model_name: 模型名稱(保持接口一致性)
input_data: 輸入特徵 DataFrame 或 numpy array
Returns:
dict: 預測結果字典,包含各時間框架的漲幅百分比
"""
if not self.is_model_loaded:
if not self.load_model(model_name):
raise RuntimeError("模型載入失敗,無法進行預測")
try:
# 確保輸入是 DataFrame 格式
if isinstance(input_data, np.ndarray):
if input_data.shape[1] != len(self.feature_columns):
raise ValueError(f"輸入特徵數量不匹配。期望: {len(self.feature_columns)}, 實際: {input_data.shape[1]}")
input_df = pd.DataFrame(input_data, columns=self.feature_columns)
elif isinstance(input_data, pd.DataFrame):
input_df = input_data.copy()
else:
raise ValueError("輸入數據必須是 DataFrame 或 numpy array")
# 確保所有必需的特徵都存在
missing_features = [col for col in self.feature_columns if col not in input_df.columns]
if missing_features:
raise ValueError(f"缺少必要的特徵欄位: {missing_features}")
# 選擇並排序特徵
input_features = input_df[self.feature_columns]
# 檢查 NaN 值
if input_features.isnull().any().any():
print("警告:輸入數據包含 NaN 值,將用 0 填補")
input_features = input_features.fillna(0)
# 應用標準化(如果有的話)
if self.scaler is not None:
input_features_scaled = self.scaler.transform(input_features)
else:
input_features_scaled = input_features.values
# 進行預測
predictions = self.model.predict(input_features_scaled)
# 處理預測結果的維度
if predictions.ndim == 1:
# 如果是單一樣本的預測,reshape 成 (1, 4)
if len(predictions) == 4:
predictions = predictions.reshape(1, -1)
else:
raise ValueError(f"預測結果維度不正確: {predictions.shape}")
# 確保結果是 (n_samples, 4) 的形狀
if predictions.shape[1] != 4:
raise ValueError(f"模型預測輸出維度錯誤,期望 4 個輸出,實際: {predictions.shape[1]}")
# 構建預測結果字典(取第一個樣本的預測)
result = {}
prediction_keys = ['Change_pct_t1_pred', 'Change_pct_t5_pred', 'Change_pct_t10_pred', 'Change_pct_t20_pred']
for i, key in enumerate(prediction_keys):
result[key] = float(predictions[0, i]) # 取第一個樣本的第 i 個預測
return result
except Exception as e:
print(f"預測過程中發生錯誤: {e}")
raise
def predict_single_timeframe(self, stock_data, days, news_score=0.0, us_market_data=None):
"""
預測單一時間框架的漲幅
Args:
stock_data: 股票歷史數據 (yfinance格式)
days: 預測天數 (1, 5, 10, 20)
news_score: 新聞情緒分數
us_market_data: 美股市場數據 (可選)
Returns:
float: 預測的漲幅百分比
"""
try:
# 創建特徵
processed_df = self.create_features_from_stock_data(stock_data)
# 使用最新的數據點
latest_data = processed_df.iloc[-1:].copy()
# 更新新聞分數
latest_data.loc[latest_data.index[0], 'NEWS'] = news_score
# 更新美股數據(如果提供)
if us_market_data:
if 'DJI' in us_market_data and len(us_market_data) > 1:
dji_return = (us_market_data['DJI'][-1] - us_market_data['DJI'][-2]) / us_market_data['DJI'][-2]
latest_data.loc[latest_data.index[0], 'dji_return_t-1'] = dji_return
if 'SOX' in us_market_data and len(us_market_data) > 1:
sox_return = (us_market_data['SOX'][-1] - us_market_data['SOX'][-2]) / us_market_data['SOX'][-2]
latest_data.loc[latest_data.index[0], 'sox_return_t-1'] = sox_return
# 進行預測
predictions = self.predict('xgboost_model', latest_data)
# 根據天數返回對應的預測值
if days == 1:
return predictions['Change_pct_t1_pred']
elif days == 5:
return predictions['Change_pct_t5_pred']
elif days == 10:
return predictions['Change_pct_t10_pred']
elif days == 20:
return predictions['Change_pct_t20_pred']
else:
# 對於其他天數,使用最接近的預測值
if days <= 3:
return predictions['Change_pct_t1_pred']
elif days <= 7:
return predictions['Change_pct_t5_pred']
elif days <= 15:
return predictions['Change_pct_t10_pred']
else:
return predictions['Change_pct_t20_pred']
except Exception as e:
print(f"單一時間框架預測失敗: {e}")
return 0.0
def validate_input_features(self, input_data):
"""
驗證輸入特徵的完整性和有效性
Args:
input_data: 輸入的特徵數據
Returns:
dict: 驗證結果
"""
validation_result = {
'is_valid': True,
'missing_features': [],
'invalid_values': [],
'warnings': []
}
try:
if isinstance(input_data, np.ndarray):
if input_data.shape[1] != len(self.feature_columns):
validation_result['is_valid'] = False
validation_result['warnings'].append(f"特徵數量不匹配: 期望{len(self.feature_columns)}, 實際{input_data.shape[1]}")
return validation_result
# 檢查缺失特徵
if isinstance(input_data, pd.DataFrame):
missing_features = [col for col in self.feature_columns if col not in input_data.columns]
if missing_features:
validation_result['missing_features'] = missing_features
validation_result['is_valid'] = False
# 檢查數值有效性
for feature in self.feature_columns:
if feature in input_data.columns:
if input_data[feature].isnull().any():
validation_result['invalid_values'].append(f"{feature}: 包含NaN值")
if np.isinf(input_data[feature]).any():
validation_result['invalid_values'].append(f"{feature}: 包含無限值")
return validation_result
except Exception as e:
validation_result['is_valid'] = False
validation_result['warnings'].append(f"驗證過程出錯: {e}")
return validation_result
def get_feature_importance(self):
"""
獲取模型的特徵重要性
Returns:
dict: 特徵重要性字典
"""
if not self.is_model_loaded:
return {}
try:
importance_scores = self.model.feature_importances_
importance_dict = {}
for i, feature in enumerate(self.feature_columns):
importance_dict[feature] = float(importance_scores[i])
# 按重要性排序
sorted_importance = dict(sorted(importance_dict.items(), key=lambda x: x[1], reverse=True))
return sorted_importance
except Exception as e:
print(f"獲取特徵重要性失敗: {e}")
return {}
def get_prediction_confidence(self, input_data):
"""
估算預測信心度
Args:
input_data: 輸入特徵數據
Returns:
float: 信心度分數 (0-1)
"""
try:
# 基礎信心度檢查
validation_result = self.validate_input_features(input_data)
if not validation_result['is_valid']:
return 0.3 # 數據有問題時給予較低信心度
# 根據特徵完整性調整信心度
base_confidence = 0.7
if validation_result['missing_features']:
base_confidence -= len(validation_result['missing_features']) * 0.05
if validation_result['invalid_values']:
base_confidence -= len(validation_result['invalid_values']) * 0.05
return max(0.3, min(0.9, base_confidence))
except Exception as e:
print(f"計算預測信心度失敗: {e}")
return 0.5
def validate_input(self, input_df):
"""
驗證輸入數據的有效性
Args:
input_df (pd.DataFrame): 輸入特徵
Returns:
tuple: (是否有效, 錯誤訊息列表)
"""
errors = []
try:
# 檢查是否為空
if input_df.empty:
errors.append("輸入數據為空")
# 檢查必要特徵
required_features = ['close', 'return_t-1']
for feature in required_features:
if feature not in input_df.columns:
errors.append(f"缺少必要特徵:{feature}")
elif pd.isna(input_df[feature].iloc[0]):
errors.append(f"必要特徵包含空值:{feature}")
# 檢查數據合理性
if 'close' in input_df.columns:
close_price = input_df['close'].iloc[0]
if close_price <= 0:
errors.append(f"收盤價不合理:{close_price}")
if 'return_t-1' in input_df.columns:
return_val = input_df['return_t-1'].iloc[0]
if abs(return_val) > 0.5: # 單日漲跌幅超過50%可能有問題
errors.append(f"報酬率異常:{return_val:.3f}")
return len(errors) == 0, errors
except Exception as e:
errors.append(f"驗證過程發生錯誤:{e}")
return False, errors
def get_feature_importance(self):
"""
獲取特徵重要性
Returns:
dict: 特徵重要性字典
"""
try:
if self.model is None:
return None
# 獲取特徵重要性
importance_scores = self.model.feature_importances_
# 創建特徵重要性字典
importance_dict = {}
for i, feature in enumerate(self.feature_columns):
if i < len(importance_scores):
importance_dict[feature] = float(importance_scores[i])
# 按重要性排序
sorted_importance = dict(sorted(importance_dict.items(),
key=lambda x: x[1],
reverse=True))
return sorted_importance
except Exception as e:
print(f"獲取特徵重要性時發生錯誤:{e}")
return None
def explain_prediction(self, input_df, predictions):
"""
解釋預測結果
Args:
input_df (pd.DataFrame): 輸入特徵
predictions (dict): 預測結果
Returns:
str: 解釋文本
"""
try:
explanation = []
explanation.append("=== 預測解釋 ===")
# 分析主要驅動因素
feature_importance = self.get_feature_importance()
if feature_importance:
explanation.append("主要影響因素:")
top_features = list(feature_importance.keys())[:3]
for feature in top_features:
if feature in input_df.columns:
value = input_df[feature].iloc[0]
importance = feature_importance[feature]
explanation.append(f" - {feature}: {value:.4f} (重要性: {importance:.3f})")
# 分析預測趨勢
explanation.append("\n預測趨勢分析:")
for key, value in predictions.items():
days = key.split('_')[2][1:]
trend = "看漲" if value > 1 else "看跌" if value < -1 else "持平"
explanation.append(f" - {days}日: {value:+.2f}% ({trend})")
return "\n".join(explanation)
except Exception as e:
return f"解釋生成失敗: {e}"
# 範例使用方式
if __name__ == "__main__":
# 初始化模型
model = XGBoostModel()
# 準備測試數據
test_data = pd.DataFrame({
'close': [150.0],
'return_t-1': [0.02],
'return_t-5': [0.05],
'MA5_close': [148.0],
'volatility_5d': [0.025],
'volume_ratio_5d': [1.2],
'MACD_diff': [0.5],
'dji_return_t-1': [0.01],
'sox_return_t-1': [0.015],
'NEWS': [0.1]
})
print("測試模型預測器...")
print("輸入特徵:")
print(test_data)
# 進行預測
predictions = model.predict('xgboost_model', test_data)
if predictions:
print("\n預測成功!")
print("結果說明:輸出為相對於當前價格的漲幅百分比")
# 解釋預測
explanation = model.explain_prediction(test_data, predictions)
print(f"\n{explanation}")
# 計算信心度
confidence = model.get_prediction_confidence(test_data)
print(f"\n預測信心度: {confidence:.2%}")
else:
print("預測失敗!")