# model_predictor.py - 支援漲幅百分比輸出的XGBoost模型預測器 # 修改版本:輸出改為漲幅百分比而非絕對價格 # model_predictor.py - 修正版本,對應訓練腳本的確切配置 import os import numpy as np import pandas as pd import xgboost as xgb from sklearn.preprocessing import MinMaxScaler import joblib import warnings warnings.filterwarnings('ignore') class XGBoostModel: def __init__(self): """ 初始化 XGBoost 模型類別 根據訓練腳本 xgboost_for_stock_trend_&_prices_prediction_gpu_v_2_1_3.py 的配置 """ # 根據訓練腳本的 new_feature_columns,確保順序完全一致 self.feature_columns = [ 'close', # 前一日收盤價 'return_t-1', # 前一日報酬率 'return_t-5', # 過去 5 日累積報酬率 'MA5_close', # 5 日移動平均價 'volatility_5d', # 5 日報酬標準差 'volume_ratio_5d', # 今日成交量 ÷ 5 日均量 'MACD_diff', # MACD - signal 'dji_return_t-1', # 前一日道瓊指數報酬率 'sox_return_t-1', # 前一日費半指數報酬率 'NEWS', # 新聞情緒分數 'MACDvol', # MACD柱狀圖 'RSI_14', # 14日RSI 'ADX', # ADX指標 'volume_weighted_return' # 成交量加權報酬率 ] # 預測目標對應(根據訓練腳本的 train_y) self.prediction_mapping = { 'Change_pct_t1_pred': 1, # 1天後漲幅% 'Change_pct_t5_pred': 5, # 5天後漲幅% 'Change_pct_t10_pred': 10, # 10天後漲幅% 'Change_pct_t20_pred': 20 # 20天後漲幅% } self.model = None self.scaler = None self.is_model_loaded = False # 模型檔案路徑 self.model_path = 'xgboost_model.json' self.scaler_path = 'feature_scaler.pkl' def create_features_from_stock_data(self, stock_data): """ 從股票資料創建所需的特徵 完全對應訓練腳本中的 create_new_features 函數 Args: stock_data: yfinance 格式的股票資料 DataFrame Returns: processed_df: 包含所有特徵的 DataFrame """ df = stock_data.copy() # 確保必要的基礎欄位存在 required_base_columns = ['Close', 'Volume', 'High', 'Low'] for col in required_base_columns: if col not in df.columns: raise ValueError(f"缺少必要的基礎欄位: {col}") # 統一欄位名稱(yfinance 使用大寫) df['close'] = df['Close'] df['volume'] = df['Volume'] # 1. return_t-1 — 前一日報酬率 df['return_t-1'] = df['close'].pct_change() # 2. return_t-5 — 過去 5 日累積報酬率 df['return_t-5'] = (df['close'] / df['close'].shift(5) - 1) # 3. MA5_close — 5 日移動平均價 df['MA5_close'] = df['close'].rolling(window=5).mean() # 4. volatility_5d — 5 日報酬標準差 df['volatility_5d'] = df['return_t-1'].rolling(window=5).std() # 5. volume_ratio_5d — 今日成交量 ÷ 5 日均量 df['volume_5d_avg'] = df['volume'].rolling(window=5).mean() df['volume_ratio_5d'] = df['volume'] / df['volume_5d_avg'] # 6. MACD_diff — MACD - signal exp1 = df['close'].ewm(span=12).mean() exp2 = df['close'].ewm(span=26).mean() macd_line = exp1 - exp2 signal_line = macd_line.ewm(span=9).mean() df['MACD_diff'] = macd_line - signal_line # 7-8. 美股指數報酬率(需要外部資料,暫設為0) df['dji_return_t-1'] = 0.0 # 這需要從外部獲取道瓊指數資料 df['sox_return_t-1'] = 0.0 # 這需要從外部獲取費半指數資料 # 9. NEWS — 新聞情緒分數(需要外部資料,暫設為0) df['NEWS'] = 0.0 # 10. MACDvol — MACD柱狀圖 df['MACDvol'] = macd_line - signal_line # 11. RSI_14 — 14日RSI delta = df['close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() rs = gain / loss df['RSI_14'] = 100 - (100 / (1 + rs)) # 12. ADX — 平均趨向指標 df['up_move'] = df['High'] - df['High'].shift(1) df['down_move'] = df['Low'].shift(1) - df['Low'] df['+DM'] = np.where((df['up_move'] > df['down_move']) & (df['up_move'] > 0), df['up_move'], 0) df['-DM'] = np.where((df['down_move'] > df['up_move']) & (df['down_move'] > 0), df['down_move'], 0) high_low = df['High'] - df['Low'] high_close_prev = np.abs(df['High'] - df['close'].shift(1)) low_close_prev = np.abs(df['Low'] - df['close'].shift(1)) df['TR'] = np.maximum.reduce([high_low, high_close_prev, low_close_prev]) df['+DI'] = (df['+DM'].ewm(com=13, adjust=False).mean() / df['TR'].ewm(com=13, adjust=False).mean()) * 100 df['-DI'] = (df['-DM'].ewm(com=13, adjust=False).mean() / df['TR'].ewm(com=13, adjust=False).mean()) * 100 df['DX'] = np.abs(df['+DI'] - df['-DI']) / (df['+DI'] + df['-DI']) * 100 df['ADX'] = df['DX'].ewm(com=13, adjust=False).mean() # 13. volume_weighted_return — 成交量加權報酬率 df['volume_weighted_return'] = np.abs(df['return_t-1']) * df['volume'] # 清理輔助欄位 cleanup_columns = ['volume_5d_avg', 'up_move', 'down_move', '+DM', '-DM', 'TR', '+DI', '-DI', 'DX'] df.drop(columns=[col for col in cleanup_columns if col in df.columns], inplace=True) # 填補 NaN 值 df.fillna(method='ffill', inplace=True) df.fillna(0, inplace=True) # 剩餘的 NaN 用 0 填補 return df def load_model(self, model_name='xgboost_model'): """ 載入訓練好的模型和標準化器 Args: model_name: 模型名稱 Returns: bool: 載入是否成功 """ try: # 載入 XGBoost 模型 if os.path.exists(self.model_path): self.model = xgb.XGBRegressor() self.model.load_model(self.model_path) print(f"成功載入模型: {self.model_path}") else: print(f"警告:模型檔案 {self.model_path} 不存在") return False # 嘗試載入標準化器(如果存在) if os.path.exists(self.scaler_path): self.scaler = joblib.load(self.scaler_path) print(f"成功載入標準化器: {self.scaler_path}") else: print(f"警告:未找到標準化器檔案 {self.scaler_path},將使用原始數據進行預測") # 根據訓練腳本,模型沒有使用標準化,所以這是正常的 self.scaler = None self.is_model_loaded = True return True except Exception as e: print(f"載入模型時發生錯誤: {e}") return False def predict(self, model_name, input_data): """ 使用載入的模型進行預測 Args: model_name: 模型名稱(保持接口一致性) input_data: 輸入特徵 DataFrame 或 numpy array Returns: dict: 預測結果字典,包含各時間框架的漲幅百分比 """ if not self.is_model_loaded: if not self.load_model(model_name): raise RuntimeError("模型載入失敗,無法進行預測") try: # 確保輸入是 DataFrame 格式 if isinstance(input_data, np.ndarray): if input_data.shape[1] != len(self.feature_columns): raise ValueError(f"輸入特徵數量不匹配。期望: {len(self.feature_columns)}, 實際: {input_data.shape[1]}") input_df = pd.DataFrame(input_data, columns=self.feature_columns) elif isinstance(input_data, pd.DataFrame): input_df = input_data.copy() else: raise ValueError("輸入數據必須是 DataFrame 或 numpy array") # 確保所有必需的特徵都存在 missing_features = [col for col in self.feature_columns if col not in input_df.columns] if missing_features: raise ValueError(f"缺少必要的特徵欄位: {missing_features}") # 選擇並排序特徵 input_features = input_df[self.feature_columns] # 檢查 NaN 值 if input_features.isnull().any().any(): print("警告:輸入數據包含 NaN 值,將用 0 填補") input_features = input_features.fillna(0) # 應用標準化(如果有的話) if self.scaler is not None: input_features_scaled = self.scaler.transform(input_features) else: input_features_scaled = input_features.values # 進行預測 predictions = self.model.predict(input_features_scaled) # 處理預測結果的維度 if predictions.ndim == 1: # 如果是單一樣本的預測,reshape 成 (1, 4) if len(predictions) == 4: predictions = predictions.reshape(1, -1) else: raise ValueError(f"預測結果維度不正確: {predictions.shape}") # 確保結果是 (n_samples, 4) 的形狀 if predictions.shape[1] != 4: raise ValueError(f"模型預測輸出維度錯誤,期望 4 個輸出,實際: {predictions.shape[1]}") # 構建預測結果字典(取第一個樣本的預測) result = {} prediction_keys = ['Change_pct_t1_pred', 'Change_pct_t5_pred', 'Change_pct_t10_pred', 'Change_pct_t20_pred'] for i, key in enumerate(prediction_keys): result[key] = float(predictions[0, i]) # 取第一個樣本的第 i 個預測 return result except Exception as e: print(f"預測過程中發生錯誤: {e}") raise def predict_single_timeframe(self, stock_data, days, news_score=0.0, us_market_data=None): """ 預測單一時間框架的漲幅 Args: stock_data: 股票歷史數據 (yfinance格式) days: 預測天數 (1, 5, 10, 20) news_score: 新聞情緒分數 us_market_data: 美股市場數據 (可選) Returns: float: 預測的漲幅百分比 """ try: # 創建特徵 processed_df = self.create_features_from_stock_data(stock_data) # 使用最新的數據點 latest_data = processed_df.iloc[-1:].copy() # 更新新聞分數 latest_data.loc[latest_data.index[0], 'NEWS'] = news_score # 更新美股數據(如果提供) if us_market_data: if 'DJI' in us_market_data and len(us_market_data) > 1: dji_return = (us_market_data['DJI'][-1] - us_market_data['DJI'][-2]) / us_market_data['DJI'][-2] latest_data.loc[latest_data.index[0], 'dji_return_t-1'] = dji_return if 'SOX' in us_market_data and len(us_market_data) > 1: sox_return = (us_market_data['SOX'][-1] - us_market_data['SOX'][-2]) / us_market_data['SOX'][-2] latest_data.loc[latest_data.index[0], 'sox_return_t-1'] = sox_return # 進行預測 predictions = self.predict('xgboost_model', latest_data) # 根據天數返回對應的預測值 if days == 1: return predictions['Change_pct_t1_pred'] elif days == 5: return predictions['Change_pct_t5_pred'] elif days == 10: return predictions['Change_pct_t10_pred'] elif days == 20: return predictions['Change_pct_t20_pred'] else: # 對於其他天數,使用最接近的預測值 if days <= 3: return predictions['Change_pct_t1_pred'] elif days <= 7: return predictions['Change_pct_t5_pred'] elif days <= 15: return predictions['Change_pct_t10_pred'] else: return predictions['Change_pct_t20_pred'] except Exception as e: print(f"單一時間框架預測失敗: {e}") return 0.0 def validate_input_features(self, input_data): """ 驗證輸入特徵的完整性和有效性 Args: input_data: 輸入的特徵數據 Returns: dict: 驗證結果 """ validation_result = { 'is_valid': True, 'missing_features': [], 'invalid_values': [], 'warnings': [] } try: if isinstance(input_data, np.ndarray): if input_data.shape[1] != len(self.feature_columns): validation_result['is_valid'] = False validation_result['warnings'].append(f"特徵數量不匹配: 期望{len(self.feature_columns)}, 實際{input_data.shape[1]}") return validation_result # 檢查缺失特徵 if isinstance(input_data, pd.DataFrame): missing_features = [col for col in self.feature_columns if col not in input_data.columns] if missing_features: validation_result['missing_features'] = missing_features validation_result['is_valid'] = False # 檢查數值有效性 for feature in self.feature_columns: if feature in input_data.columns: if input_data[feature].isnull().any(): validation_result['invalid_values'].append(f"{feature}: 包含NaN值") if np.isinf(input_data[feature]).any(): validation_result['invalid_values'].append(f"{feature}: 包含無限值") return validation_result except Exception as e: validation_result['is_valid'] = False validation_result['warnings'].append(f"驗證過程出錯: {e}") return validation_result def get_feature_importance(self): """ 獲取模型的特徵重要性 Returns: dict: 特徵重要性字典 """ if not self.is_model_loaded: return {} try: importance_scores = self.model.feature_importances_ importance_dict = {} for i, feature in enumerate(self.feature_columns): importance_dict[feature] = float(importance_scores[i]) # 按重要性排序 sorted_importance = dict(sorted(importance_dict.items(), key=lambda x: x[1], reverse=True)) return sorted_importance except Exception as e: print(f"獲取特徵重要性失敗: {e}") return {} def get_prediction_confidence(self, input_data): """ 估算預測信心度 Args: input_data: 輸入特徵數據 Returns: float: 信心度分數 (0-1) """ try: # 基礎信心度檢查 validation_result = self.validate_input_features(input_data) if not validation_result['is_valid']: return 0.3 # 數據有問題時給予較低信心度 # 根據特徵完整性調整信心度 base_confidence = 0.7 if validation_result['missing_features']: base_confidence -= len(validation_result['missing_features']) * 0.05 if validation_result['invalid_values']: base_confidence -= len(validation_result['invalid_values']) * 0.05 return max(0.3, min(0.9, base_confidence)) except Exception as e: print(f"計算預測信心度失敗: {e}") return 0.5 def validate_input(self, input_df): """ 驗證輸入數據的有效性 Args: input_df (pd.DataFrame): 輸入特徵 Returns: tuple: (是否有效, 錯誤訊息列表) """ errors = [] try: # 檢查是否為空 if input_df.empty: errors.append("輸入數據為空") # 檢查必要特徵 required_features = ['close', 'return_t-1'] for feature in required_features: if feature not in input_df.columns: errors.append(f"缺少必要特徵:{feature}") elif pd.isna(input_df[feature].iloc[0]): errors.append(f"必要特徵包含空值:{feature}") # 檢查數據合理性 if 'close' in input_df.columns: close_price = input_df['close'].iloc[0] if close_price <= 0: errors.append(f"收盤價不合理:{close_price}") if 'return_t-1' in input_df.columns: return_val = input_df['return_t-1'].iloc[0] if abs(return_val) > 0.5: # 單日漲跌幅超過50%可能有問題 errors.append(f"報酬率異常:{return_val:.3f}") return len(errors) == 0, errors except Exception as e: errors.append(f"驗證過程發生錯誤:{e}") return False, errors def get_feature_importance(self): """ 獲取特徵重要性 Returns: dict: 特徵重要性字典 """ try: if self.model is None: return None # 獲取特徵重要性 importance_scores = self.model.feature_importances_ # 創建特徵重要性字典 importance_dict = {} for i, feature in enumerate(self.feature_columns): if i < len(importance_scores): importance_dict[feature] = float(importance_scores[i]) # 按重要性排序 sorted_importance = dict(sorted(importance_dict.items(), key=lambda x: x[1], reverse=True)) return sorted_importance except Exception as e: print(f"獲取特徵重要性時發生錯誤:{e}") return None def explain_prediction(self, input_df, predictions): """ 解釋預測結果 Args: input_df (pd.DataFrame): 輸入特徵 predictions (dict): 預測結果 Returns: str: 解釋文本 """ try: explanation = [] explanation.append("=== 預測解釋 ===") # 分析主要驅動因素 feature_importance = self.get_feature_importance() if feature_importance: explanation.append("主要影響因素:") top_features = list(feature_importance.keys())[:3] for feature in top_features: if feature in input_df.columns: value = input_df[feature].iloc[0] importance = feature_importance[feature] explanation.append(f" - {feature}: {value:.4f} (重要性: {importance:.3f})") # 分析預測趨勢 explanation.append("\n預測趨勢分析:") for key, value in predictions.items(): days = key.split('_')[2][1:] trend = "看漲" if value > 1 else "看跌" if value < -1 else "持平" explanation.append(f" - {days}日: {value:+.2f}% ({trend})") return "\n".join(explanation) except Exception as e: return f"解釋生成失敗: {e}" # 範例使用方式 if __name__ == "__main__": # 初始化模型 model = XGBoostModel() # 準備測試數據 test_data = pd.DataFrame({ 'close': [150.0], 'return_t-1': [0.02], 'return_t-5': [0.05], 'MA5_close': [148.0], 'volatility_5d': [0.025], 'volume_ratio_5d': [1.2], 'MACD_diff': [0.5], 'dji_return_t-1': [0.01], 'sox_return_t-1': [0.015], 'NEWS': [0.1] }) print("測試模型預測器...") print("輸入特徵:") print(test_data) # 進行預測 predictions = model.predict('xgboost_model', test_data) if predictions: print("\n預測成功!") print("結果說明:輸出為相對於當前價格的漲幅百分比") # 解釋預測 explanation = model.explain_prediction(test_data, predictions) print(f"\n{explanation}") # 計算信心度 confidence = model.get_prediction_confidence(test_data) print(f"\n預測信心度: {confidence:.2%}") else: print("預測失敗!")