Spaces:
Sleeping
Sleeping
| # model_predictor.py - 支援漲幅百分比輸出的XGBoost模型預測器 | |
| # 修改版本:輸出改為漲幅百分比而非絕對價格 | |
| # model_predictor.py - 修正版本,對應訓練腳本的確切配置 | |
| import os | |
| import numpy as np | |
| import pandas as pd | |
| import xgboost as xgb | |
| from sklearn.preprocessing import MinMaxScaler | |
| import joblib | |
| import warnings | |
| warnings.filterwarnings('ignore') | |
class XGBoostModel:
    """Inference wrapper for a multi-horizon XGBoost percentage-change model.

    The class builds the 14 input features from raw OHLCV data, loads a
    persisted XGBoost regressor (plus an optional feature scaler) and exposes
    helpers to predict, validate inputs and explain results.

    According to the original comments, the feature list and its order mirror
    ``new_feature_columns`` of the training script
    ``xgboost_for_stock_trend_&_prices_prediction_gpu_v_2_1_3.py``; that script
    is not visible here, so keep the order in sync manually.
    """

    # Inclusive upper bound of a requested horizon (in days) -> prediction key.
    # Anything above the last bound falls back to the 20-day prediction.
    _HORIZON_BUCKETS = (
        (3, 'Change_pct_t1_pred'),
        (7, 'Change_pct_t5_pred'),
        (15, 'Change_pct_t10_pred'),
    )
    _FALLBACK_HORIZON_KEY = 'Change_pct_t20_pred'

    # External index symbol in ``us_market_data`` -> feature column it feeds.
    _US_INDEX_FEATURES = (
        ('DJI', 'dji_return_t-1'),
        ('SOX', 'sox_return_t-1'),
    )

    def __init__(self, model_path='xgboost_model.json', scaler_path='feature_scaler.pkl'):
        """Set up feature metadata and (not yet loaded) model state.

        Args:
            model_path: Path of the persisted XGBoost model file.
            scaler_path: Path of the optional joblib-serialised feature scaler.
        """
        # Order matters: the feature matrix handed to the model is built by
        # selecting exactly these columns in exactly this order.
        self.feature_columns = [
            'close',                   # closing price
            'return_t-1',              # one-day return
            'return_t-5',              # cumulative return over the past 5 days
            'MA5_close',               # 5-day moving average of the close
            'volatility_5d',           # 5-day standard deviation of returns
            'volume_ratio_5d',         # volume / 5-day average volume
            'MACD_diff',               # MACD line minus signal line
            'dji_return_t-1',          # previous-day Dow Jones return
            'sox_return_t-1',          # previous-day SOX index return
            'NEWS',                    # news sentiment score
            'MACDvol',                 # MACD histogram
            'RSI_14',                  # 14-day RSI
            'ADX',                     # average directional index
            'volume_weighted_return',  # |return| * volume
        ]
        # Prediction key -> horizon in days. Insertion order is the order of
        # the model's output columns.
        self.prediction_mapping = {
            'Change_pct_t1_pred': 1,
            'Change_pct_t5_pred': 5,
            'Change_pct_t10_pred': 10,
            'Change_pct_t20_pred': 20,
        }
        self.model = None
        self.scaler = None
        self.is_model_loaded = False
        self.model_path = model_path
        self.scaler_path = scaler_path

    def create_features_from_stock_data(self, stock_data):
        """Derive all model features from raw price/volume data.

        Args:
            stock_data: DataFrame with at least the columns ``Close``,
                ``Volume``, ``High`` and ``Low`` (yfinance-style capitalised
                names), one row per trading day in chronological order.

        Returns:
            A copy of ``stock_data`` with the lower-case ``close``/``volume``
            aliases and every column of ``self.feature_columns`` added.
            Infinite feature values are treated as missing; missing values are
            forward-filled and any remainder is set to 0.

        Raises:
            ValueError: If one of the required base columns is absent.
        """
        df = stock_data.copy()

        required_base_columns = ['Close', 'Volume', 'High', 'Low']
        for col in required_base_columns:
            if col not in df.columns:
                raise ValueError(f"缺少必要的基礎欄位: {col}")

        # Lower-case aliases used by the feature names.
        df['close'] = df['Close']
        df['volume'] = df['Volume']
        close = df['close']
        volume = df['volume']

        # 1-4: returns, moving average and short-term volatility.
        df['return_t-1'] = close.pct_change()
        df['return_t-5'] = close / close.shift(5) - 1
        df['MA5_close'] = close.rolling(window=5).mean()
        df['volatility_5d'] = df['return_t-1'].rolling(window=5).std()

        # 5: today's volume relative to its 5-day average.
        df['volume_ratio_5d'] = volume / volume.rolling(window=5).mean()

        # 6: MACD(12, 26, 9) line minus its signal line.
        macd_line = close.ewm(span=12).mean() - close.ewm(span=26).mean()
        signal_line = macd_line.ewm(span=9).mean()
        macd_histogram = macd_line - signal_line
        df['MACD_diff'] = macd_histogram

        # 7-9: externally sourced features; placeholders until the caller
        # overrides them (see predict_single_timeframe).
        df['dji_return_t-1'] = 0.0
        df['sox_return_t-1'] = 0.0
        df['NEWS'] = 0.0

        # 10: the original computes this with the same expression as MACD_diff.
        df['MACDvol'] = macd_histogram

        # 11: RSI over 14 days using simple rolling means of gains and losses.
        delta = close.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        df['RSI_14'] = 100 - (100 / (1 + rs))

        # 12: ADX with exponential smoothing (com=13).
        high = df['High']
        low = df['Low']
        prev_close = close.shift(1)
        up_move = high - high.shift(1)
        down_move = low.shift(1) - low
        plus_dm = pd.Series(
            np.where((up_move > down_move) & (up_move > 0), up_move, 0),
            index=df.index,
        )
        minus_dm = pd.Series(
            np.where((down_move > up_move) & (down_move > 0), down_move, 0),
            index=df.index,
        )
        # np.maximum propagates NaN, so the first row (no previous close) has
        # a NaN true range, exactly as in the original implementation.
        true_range = pd.Series(
            np.maximum.reduce([
                high - low,
                np.abs(high - prev_close),
                np.abs(low - prev_close),
            ]),
            index=df.index,
        )
        smoothed_tr = true_range.ewm(com=13, adjust=False).mean()
        plus_di = plus_dm.ewm(com=13, adjust=False).mean() / smoothed_tr * 100
        minus_di = minus_dm.ewm(com=13, adjust=False).mean() / smoothed_tr * 100
        dx = np.abs(plus_di - minus_di) / (plus_di + minus_di) * 100
        df['ADX'] = dx.ewm(com=13, adjust=False).mean()

        # 13: absolute return scaled by traded volume.
        df['volume_weighted_return'] = np.abs(df['return_t-1']) * volume

        # Divisions by zero can leave +/-inf behind; treat those as missing so
        # they are filled below instead of reaching the model.
        df[self.feature_columns] = df[self.feature_columns].replace(
            [np.inf, -np.inf], np.nan
        )

        # fillna(method='ffill') is deprecated in recent pandas; ffill() is the
        # supported spelling of the same operation.
        df = df.ffill()
        df = df.fillna(0)
        return df

    def load_model(self, model_name='xgboost_model'):
        """Load the persisted model and, if present, the feature scaler.

        Args:
            model_name: Accepted for interface compatibility; the files that
                are actually read are ``self.model_path`` and
                ``self.scaler_path``.

        Returns:
            bool: True when the model was loaded, False otherwise. On failure
            the previously held model/scaler state is left untouched.
        """
        try:
            if not os.path.exists(self.model_path):
                print(f"警告:模型檔案 {self.model_path} 不存在")
                return False

            # Load into locals first so a failure cannot leave a
            # half-initialised regressor on the instance.
            loaded_model = xgb.XGBRegressor()
            loaded_model.load_model(self.model_path)
            print(f"成功載入模型: {self.model_path}")

            loaded_scaler = None
            if os.path.exists(self.scaler_path):
                # NOTE(security): joblib.load unpickles arbitrary objects;
                # only point scaler_path at files from a trusted source.
                loaded_scaler = joblib.load(self.scaler_path)
                print(f"成功載入標準化器: {self.scaler_path}")
            else:
                # Per the original comment the training script used no
                # scaling, so a missing scaler is the expected case.
                print(f"警告:未找到標準化器檔案 {self.scaler_path},將使用原始數據進行預測")

            self.model = loaded_model
            self.scaler = loaded_scaler
            self.is_model_loaded = True
            return True
        except Exception as e:
            print(f"載入模型時發生錯誤: {e}")
            return False

    def predict(self, model_name, input_data):
        """Predict the percentage change for every supported horizon.

        Only the first row of ``input_data`` contributes to the result.

        Args:
            model_name: Model name, forwarded to ``load_model`` when the model
                has not been loaded yet.
            input_data: Feature DataFrame containing every column of
                ``self.feature_columns``, or a numpy array whose columns follow
                that order. A 1-D array is treated as a single sample.

        Returns:
            dict: Prediction key (see ``self.prediction_mapping``) -> float.

        Raises:
            RuntimeError: If the model cannot be loaded.
            ValueError: If the input is of an unsupported type, empty, lacks
                features, or the model output does not have one value per
                prediction key.
        """
        if not self.is_model_loaded:
            if not self.load_model(model_name):
                raise RuntimeError("模型載入失敗,無法進行預測")

        try:
            if isinstance(input_data, np.ndarray):
                # Accept a flat feature vector as one sample.
                samples = np.atleast_2d(input_data)
                if samples.shape[1] != len(self.feature_columns):
                    raise ValueError(f"輸入特徵數量不匹配。期望: {len(self.feature_columns)}, 實際: {samples.shape[1]}")
                input_df = pd.DataFrame(samples, columns=self.feature_columns)
            elif isinstance(input_data, pd.DataFrame):
                input_df = input_data.copy()
            else:
                raise ValueError("輸入數據必須是 DataFrame 或 numpy array")

            missing_features = [col for col in self.feature_columns if col not in input_df.columns]
            if missing_features:
                raise ValueError(f"缺少必要的特徵欄位: {missing_features}")

            # Select and order the columns exactly as the model expects.
            input_features = input_df[self.feature_columns]
            if len(input_features) == 0:
                raise ValueError("輸入數據為空")

            if input_features.isnull().any().any():
                print("警告:輸入數據包含 NaN 值,將用 0 填補")
                input_features = input_features.fillna(0)

            if self.scaler is not None:
                input_features_scaled = self.scaler.transform(input_features)
            else:
                input_features_scaled = input_features.values

            predictions = np.asarray(self.model.predict(input_features_scaled))

            prediction_keys = list(self.prediction_mapping)
            expected_outputs = len(prediction_keys)

            # A flat result is only accepted when it is one sample's outputs.
            if predictions.ndim == 1:
                if len(predictions) == expected_outputs:
                    predictions = predictions.reshape(1, -1)
                else:
                    raise ValueError(f"預測結果維度不正確: {predictions.shape}")

            if predictions.shape[1] != expected_outputs:
                raise ValueError(f"模型預測輸出維度錯誤,期望 {expected_outputs} 個輸出,實際: {predictions.shape[1]}")

            return {
                key: float(predictions[0, position])
                for position, key in enumerate(prediction_keys)
            }
        except Exception as e:
            print(f"預測過程中發生錯誤: {e}")
            raise

    @staticmethod
    def _latest_return(series):
        """Return the simple return between the last two values, or None.

        None is returned when fewer than two observations exist, when either
        value is not finite, or when the previous value is zero.
        """
        values = np.asarray(series, dtype=float).ravel()
        if values.size < 2:
            return None
        previous, latest = values[-2], values[-1]
        if not (np.isfinite(previous) and np.isfinite(latest)) or previous == 0:
            return None
        return float((latest - previous) / previous)

    def predict_single_timeframe(self, stock_data, days, news_score=0.0, us_market_data=None):
        """Predict the percentage change for one horizon from raw stock data.

        Args:
            stock_data: Historical OHLCV DataFrame (see
                ``create_features_from_stock_data``).
            days: Requested horizon. Values up to 3 use the 1-day output, up
                to 7 the 5-day output, up to 15 the 10-day output and anything
                larger the 20-day output.
            news_score: News sentiment score written into the ``NEWS`` feature.
            us_market_data: Optional mapping (dict or DataFrame) with ``'DJI'``
                and/or ``'SOX'`` price sequences; the return between the last
                two values of each is written into the matching feature.

        Returns:
            float: The predicted percentage change, or 0.0 when anything goes
            wrong (the error is printed, not raised).
        """
        try:
            processed_df = self.create_features_from_stock_data(stock_data)

            # Only the most recent row is used for inference.
            latest_data = processed_df.iloc[-1:].copy()
            latest_data['NEWS'] = news_score

            # `is not None` instead of truthiness: a DataFrame has no
            # unambiguous truth value.
            if us_market_data is not None:
                for symbol, feature in self._US_INDEX_FEATURES:
                    if symbol not in us_market_data:
                        continue
                    # Check the length of the series itself, not the number of
                    # symbols supplied.
                    index_return = self._latest_return(us_market_data[symbol])
                    if index_return is not None:
                        latest_data[feature] = index_return

            predictions = self.predict('xgboost_model', latest_data)

            for upper_bound, key in self._HORIZON_BUCKETS:
                if days <= upper_bound:
                    return predictions[key]
            return predictions[self._FALLBACK_HORIZON_KEY]
        except Exception as e:
            # Deliberate best effort: callers get a neutral prediction.
            print(f"單一時間框架預測失敗: {e}")
            return 0.0

    def validate_input_features(self, input_data):
        """Check that ``input_data`` carries usable values for every feature.

        Args:
            input_data: Feature DataFrame or numpy array (a 1-D array is
                treated as a single sample).

        Returns:
            dict with the keys ``is_valid`` (bool), ``missing_features``
            (list), ``invalid_values`` (list of messages about NaN/inf values;
            these do not clear ``is_valid``) and ``warnings`` (list).
        """
        validation_result = {
            'is_valid': True,
            'missing_features': [],
            'invalid_values': [],
            'warnings': [],
        }
        try:
            if isinstance(input_data, np.ndarray):
                samples = np.atleast_2d(input_data)
                if samples.shape[1] != len(self.feature_columns):
                    validation_result['is_valid'] = False
                    validation_result['warnings'].append(f"特徵數量不匹配: 期望{len(self.feature_columns)}, 實際{samples.shape[1]}")
                return validation_result

            if not isinstance(input_data, pd.DataFrame):
                # predict() rejects such inputs, so they must not be reported
                # as valid here.
                validation_result['is_valid'] = False
                validation_result['warnings'].append("輸入數據必須是 DataFrame 或 numpy array")
                return validation_result

            missing_features = [col for col in self.feature_columns if col not in input_data.columns]
            if missing_features:
                validation_result['missing_features'] = missing_features
                validation_result['is_valid'] = False

            for feature in self.feature_columns:
                if feature not in input_data.columns:
                    continue
                column = input_data[feature]
                if column.isnull().any():
                    validation_result['invalid_values'].append(f"{feature}: 包含NaN值")
                if np.isinf(column).any():
                    validation_result['invalid_values'].append(f"{feature}: 包含無限值")
            return validation_result
        except Exception as e:
            validation_result['is_valid'] = False
            validation_result['warnings'].append(f"驗證過程出錯: {e}")
            return validation_result

    def get_prediction_confidence(self, input_data):
        """Estimate a heuristic confidence score from input data quality.

        Args:
            input_data: Feature DataFrame or numpy array.

        Returns:
            float: 0.3 for invalid input; otherwise 0.7 minus 0.05 per
            NaN/inf finding, clamped to the range [0.3, 0.9]. 0.5 is returned
            if the computation itself fails.
        """
        try:
            validation_result = self.validate_input_features(input_data)
            if not validation_result['is_valid']:
                return 0.3

            # Missing features always make the input invalid (handled above),
            # so only value-level findings can lower the score here.
            base_confidence = 0.7
            base_confidence -= len(validation_result['invalid_values']) * 0.05
            return max(0.3, min(0.9, base_confidence))
        except Exception as e:
            print(f"計算預測信心度失敗: {e}")
            return 0.5

    def validate_input(self, input_df):
        """Run basic sanity checks on the first row of a feature DataFrame.

        Args:
            input_df (pd.DataFrame): Input features.

        Returns:
            tuple: ``(is_valid, errors)`` where ``errors`` is a list of
            messages; ``is_valid`` is True only when the list is empty.
        """
        errors = []
        try:
            has_rows = not input_df.empty
            if not has_rows:
                errors.append("輸入數據為空")

            required_features = ['close', 'return_t-1']
            for feature in required_features:
                if feature not in input_df.columns:
                    errors.append(f"缺少必要特徵:{feature}")
                elif has_rows and pd.isna(input_df[feature].iloc[0]):
                    errors.append(f"必要特徵包含空值:{feature}")

            # Value checks need a first row to look at.
            if has_rows:
                if 'close' in input_df.columns:
                    close_price = input_df['close'].iloc[0]
                    if close_price <= 0:
                        errors.append(f"收盤價不合理:{close_price}")

                if 'return_t-1' in input_df.columns:
                    return_val = input_df['return_t-1'].iloc[0]
                    # A one-day move above 50% is treated as suspicious.
                    if abs(return_val) > 0.5:
                        errors.append(f"報酬率異常:{return_val:.3f}")

            return len(errors) == 0, errors
        except Exception as e:
            errors.append(f"驗證過程發生錯誤:{e}")
            return False, errors

    def get_feature_importance(self):
        """Return the model's feature importances, most important first.

        Returns:
            dict | None: Feature name -> importance, sorted in descending
            order, or None when no model is loaded or the importances cannot
            be read.
        """
        try:
            if self.model is None:
                return None

            importance_scores = self.model.feature_importances_
            # zip() stops at the shorter sequence, so a model with fewer
            # scores than feature names cannot cause an index error.
            ranked = sorted(
                ((name, float(score)) for name, score in zip(self.feature_columns, importance_scores)),
                key=lambda item: item[1],
                reverse=True,
            )
            return dict(ranked)
        except Exception as e:
            print(f"獲取特徵重要性時發生錯誤:{e}")
            return None

    def explain_prediction(self, input_df, predictions):
        """Build a human-readable explanation of a prediction.

        Args:
            input_df (pd.DataFrame): Input features; the first row is shown.
            predictions (dict): Result of ``predict``.

        Returns:
            str: Multi-line explanation listing the three most important
            features and a trend label per horizon (above +1 is bullish, below
            -1 bearish, otherwise flat), or an error message if generation
            fails.
        """
        try:
            explanation = ["=== 預測解釋 ==="]

            feature_importance = self.get_feature_importance()
            if feature_importance:
                explanation.append("主要影響因素:")
                for feature in list(feature_importance)[:3]:
                    if feature in input_df.columns:
                        value = input_df[feature].iloc[0]
                        importance = feature_importance[feature]
                        explanation.append(f" - {feature}: {value:.4f} (重要性: {importance:.3f})")

            explanation.append("\n預測趨勢分析:")
            for key, value in predictions.items():
                # Prefer the declared horizon; fall back to parsing the key
                # ('Change_pct_t5_pred' -> '5') for unknown keys.
                days = self.prediction_mapping.get(key)
                if days is None:
                    key_parts = key.split('_')
                    days = key_parts[2][1:] if len(key_parts) > 2 else key
                if value > 1:
                    trend = "看漲"
                elif value < -1:
                    trend = "看跌"
                else:
                    trend = "持平"
                explanation.append(f" - {days}日: {value:+.2f}% ({trend})")

            return "\n".join(explanation)
        except Exception as e:
            return f"解釋生成失敗: {e}"
# Example usage: run a single prediction on hand-written feature values.
if __name__ == "__main__":
    model = XGBoostModel()

    # One sample containing every feature the model requires. predict()
    # rejects inputs that lack any column of model.feature_columns.
    test_data = pd.DataFrame({
        'close': [150.0],
        'return_t-1': [0.02],
        'return_t-5': [0.05],
        'MA5_close': [148.0],
        'volatility_5d': [0.025],
        'volume_ratio_5d': [1.2],
        'MACD_diff': [0.5],
        'dji_return_t-1': [0.01],
        'sox_return_t-1': [0.015],
        'NEWS': [0.1],
        'MACDvol': [0.5],
        'RSI_14': [55.0],
        'ADX': [25.0],
        'volume_weighted_return': [20000.0],
    })

    print("測試模型預測器...")
    print("輸入特徵:")
    print(test_data)

    # predict() signals failure by raising (e.g. when the model file is
    # missing), so catch it here to reach the failure message below.
    try:
        predictions = model.predict('xgboost_model', test_data)
    except Exception:
        predictions = None

    if predictions:
        print("\n預測成功!")
        print("結果說明:輸出為相對於當前價格的漲幅百分比")

        explanation = model.explain_prediction(test_data, predictions)
        print(f"\n{explanation}")

        confidence = model.get_prediction_confidence(test_data)
        print(f"\n預測信心度: {confidence:.2%}")
    else:
        print("預測失敗!")