# model_predictor.py - XGBoost model predictor with percentage-change output.
# Modified version: the output is the percentage price change, not the absolute price.
# Revised version: matches the exact configuration of the training script.
import os
import numpy as np
import pandas as pd
import xgboost as xgb
from sklearn.preprocessing import MinMaxScaler
import joblib
import warnings
warnings.filterwarnings('ignore')
class XGBoostModel:
    """Inference wrapper around a saved XGBoost regressor.

    The instance stores the ordered list of input features, the names of the
    four model outputs (percentage change after 1, 5, 10 and 20 days), and the
    file paths the model and the optional feature scaler are loaded from.
    """

    def __init__(self):
        """Set up feature/output metadata and default paths; nothing is loaded.

        Per the original author's note, the feature order mirrors
        ``new_feature_columns`` of the training script
        ``xgboost_for_stock_trend_&_prices_prediction_gpu_v_2_1_3.py``
        (not verifiable from this file).
        """
        # Order matters: columns are selected in exactly this order for the model.
        self.feature_columns = [
            'close',                   # previous day's closing price
            'return_t-1',              # previous day's return
            'return_t-5',              # cumulative return over the past 5 days
            'MA5_close',               # 5-day moving average of the close
            'volatility_5d',           # 5-day standard deviation of returns
            'volume_ratio_5d',         # today's volume / 5-day average volume
            'MACD_diff',               # MACD - signal
            'dji_return_t-1',          # previous day's Dow Jones return
            'sox_return_t-1',          # previous day's PHLX Semiconductor return
            'NEWS',                    # news sentiment score
            'MACDvol',                 # MACD histogram
            'RSI_14',                  # 14-day RSI
            'ADX',                     # ADX indicator
            'volume_weighted_return',  # volume-weighted return
        ]
        # Output name -> forecast horizon in days. The insertion order is the
        # order in which the model's output columns are read.
        self.prediction_mapping = {
            'Change_pct_t1_pred': 1,    # % change after 1 day
            'Change_pct_t5_pred': 5,    # % change after 5 days
            'Change_pct_t10_pred': 10,  # % change after 10 days
            'Change_pct_t20_pred': 20,  # % change after 20 days
        }
        self.model = None
        self.scaler = None
        self.is_model_loaded = False
        # Model artefact paths (relative to the working directory).
        self.model_path = 'xgboost_model.json'
        self.scaler_path = 'feature_scaler.pkl'

    def create_features_from_stock_data(self, stock_data):
        """Derive every model feature from raw OHLCV data.

        Per the original author's note this mirrors ``create_new_features`` in
        the training script (not verifiable from this file).

        Args:
            stock_data: DataFrame with at least the columns ``Close``,
                ``Volume``, ``High`` and ``Low`` (yfinance-style names).

        Returns:
            A copy of ``stock_data`` with all feature columns added. Remaining
            NaN values are forward-filled and then replaced with 0.

        Raises:
            ValueError: if one of the required base columns is missing.
        """
        df = stock_data.copy()
        for col in ('Close', 'Volume', 'High', 'Low'):
            if col not in df.columns:
                raise ValueError(f"缺少必要的基礎欄位: {col}")

        # Lower-case aliases used by the feature names.
        df['close'] = df['Close']
        df['volume'] = df['Volume']

        # 1. One-day return.
        df['return_t-1'] = df['close'].pct_change()
        # 2. Cumulative 5-day return.
        df['return_t-5'] = (df['close'] / df['close'].shift(5) - 1)
        # 3. 5-day moving average of the close.
        df['MA5_close'] = df['close'].rolling(window=5).mean()
        # 4. 5-day standard deviation of the one-day return.
        df['volatility_5d'] = df['return_t-1'].rolling(window=5).std()
        # 5. Volume relative to its 5-day average.
        df['volume_5d_avg'] = df['volume'].rolling(window=5).mean()
        df['volume_ratio_5d'] = df['volume'] / df['volume_5d_avg']

        # 6. MACD minus its signal line (12/26/9 exponential spans).
        exp1 = df['close'].ewm(span=12).mean()
        exp2 = df['close'].ewm(span=26).mean()
        macd_line = exp1 - exp2
        signal_line = macd_line.ewm(span=9).mean()
        df['MACD_diff'] = macd_line - signal_line

        # 7-9. Externally sourced features default to 0 here; callers such as
        # predict_single_timeframe overwrite them on the row they predict on.
        df['dji_return_t-1'] = 0.0
        df['sox_return_t-1'] = 0.0
        df['NEWS'] = 0.0

        # 10. MACD histogram. Computed with the same expression as MACD_diff,
        # so both columns are identical here.
        # NOTE(review): confirm this matches the training script.
        df['MACDvol'] = macd_line - signal_line

        # 11. 14-day RSI based on simple rolling means of gains and losses.
        delta = df['close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        df['RSI_14'] = 100 - (100 / (1 + rs))

        # 12. ADX from directional movement and true range (ewm with com=13).
        df['up_move'] = df['High'] - df['High'].shift(1)
        df['down_move'] = df['Low'].shift(1) - df['Low']
        df['+DM'] = np.where((df['up_move'] > df['down_move']) & (df['up_move'] > 0), df['up_move'], 0)
        df['-DM'] = np.where((df['down_move'] > df['up_move']) & (df['down_move'] > 0), df['down_move'], 0)
        high_low = df['High'] - df['Low']
        high_close_prev = np.abs(df['High'] - df['close'].shift(1))
        low_close_prev = np.abs(df['Low'] - df['close'].shift(1))
        df['TR'] = np.maximum.reduce([high_low, high_close_prev, low_close_prev])
        smoothed_tr = df['TR'].ewm(com=13, adjust=False).mean()
        df['+DI'] = (df['+DM'].ewm(com=13, adjust=False).mean() / smoothed_tr) * 100
        df['-DI'] = (df['-DM'].ewm(com=13, adjust=False).mean() / smoothed_tr) * 100
        df['DX'] = np.abs(df['+DI'] - df['-DI']) / (df['+DI'] + df['-DI']) * 100
        df['ADX'] = df['DX'].ewm(com=13, adjust=False).mean()

        # 13. Absolute one-day return scaled by volume.
        df['volume_weighted_return'] = np.abs(df['return_t-1']) * df['volume']

        # Drop the intermediate helper columns.
        cleanup_columns = ['volume_5d_avg', 'up_move', 'down_move', '+DM', '-DM', 'TR', '+DI', '-DI', 'DX']
        df.drop(columns=[col for col in cleanup_columns if col in df.columns], inplace=True)

        # Forward-fill, then zero-fill what is left (the warm-up rows).
        # DataFrame.ffill replaces the deprecated fillna(method='ffill').
        df.ffill(inplace=True)
        df.fillna(0, inplace=True)
        return df

    def load_model(self, model_name='xgboost_model'):
        """Load the model from ``self.model_path`` and the scaler if present.

        Args:
            model_name: Accepted for interface compatibility; currently not
                used, the paths stored on the instance decide what is loaded.

        Returns:
            bool: True when the model was loaded, False when the model file is
            missing or loading raised an exception.
        """
        try:
            if not os.path.exists(self.model_path):
                print(f"警告:模型檔案 {self.model_path} 不存在")
                return False
            self.model = xgb.XGBRegressor()
            self.model.load_model(self.model_path)
            print(f"成功載入模型: {self.model_path}")

            if os.path.exists(self.scaler_path):
                # NOTE(security): joblib.load unpickles the file, which can run
                # arbitrary code; only load scaler files from a trusted source.
                self.scaler = joblib.load(self.scaler_path)
                print(f"成功載入標準化器: {self.scaler_path}")
            else:
                # Per the original author's note the training script applies
                # no scaling, so a missing scaler file is expected.
                print(f"警告:未找到標準化器檔案 {self.scaler_path},將使用原始數據進行預測")
                self.scaler = None

            self.is_model_loaded = True
            return True
        except Exception as e:
            print(f"載入模型時發生錯誤: {e}")
            # Do not leave a half-initialised model behind.
            self.model = None
            self.is_model_loaded = False
            return False

    def predict(self, model_name, input_data):
        """Run the model and return the predicted percentage changes.

        Args:
            model_name: Passed to ``load_model`` when no model is loaded yet.
            input_data: DataFrame containing every column in
                ``self.feature_columns``, or a numpy array whose columns are in
                that order. A 1-D array is treated as a single sample.

        Returns:
            dict: one float per key of ``self.prediction_mapping``. Only the
            prediction for the first input row is returned.

        Raises:
            RuntimeError: if the model cannot be loaded.
            ValueError: on unsupported input type, wrong feature count,
                missing feature columns or unexpected model output shape.
        """
        if not self.is_model_loaded and not self.load_model(model_name):
            raise RuntimeError("模型載入失敗,無法進行預測")
        try:
            if isinstance(input_data, np.ndarray):
                array_2d = np.atleast_2d(input_data)
                if array_2d.shape[1] != len(self.feature_columns):
                    raise ValueError(f"輸入特徵數量不匹配。期望: {len(self.feature_columns)}, 實際: {array_2d.shape[1]}")
                input_df = pd.DataFrame(array_2d, columns=self.feature_columns)
            elif isinstance(input_data, pd.DataFrame):
                input_df = input_data.copy()
            else:
                raise ValueError("輸入數據必須是 DataFrame 或 numpy array")

            missing_features = [col for col in self.feature_columns if col not in input_df.columns]
            if missing_features:
                raise ValueError(f"缺少必要的特徵欄位: {missing_features}")

            # Select the features in the order stored on the instance.
            input_features = input_df[self.feature_columns]
            if input_features.isnull().any().any():
                print("警告:輸入數據包含 NaN 值,將用 0 填補")
                input_features = input_features.fillna(0)

            if self.scaler is not None:
                model_input = self.scaler.transform(input_features)
            else:
                model_input = input_features.values

            predictions = np.asarray(self.model.predict(model_input))
            n_outputs = len(self.prediction_mapping)
            if predictions.ndim == 1:
                # A flat vector is accepted only as a single sample's outputs.
                if len(predictions) != n_outputs:
                    raise ValueError(f"預測結果維度不正確: {predictions.shape}")
                predictions = predictions.reshape(1, -1)
            if predictions.shape[1] != n_outputs:
                raise ValueError(f"模型預測輸出維度錯誤,期望 4 個輸出,實際: {predictions.shape[1]}")

            # Output column i belongs to the i-th key of prediction_mapping.
            return {
                key: float(predictions[0, i])
                for i, key in enumerate(self.prediction_mapping)
            }
        except Exception as e:
            print(f"預測過程中發生錯誤: {e}")
            raise

    @staticmethod
    def _latest_return(values):
        """Return the relative change between the last two entries, or None.

        None is returned when fewer than two values are available or the
        previous value is zero.
        """
        series = np.asarray(values, dtype=float).ravel()
        if series.size < 2 or series[-2] == 0:
            return None
        return float((series[-1] - series[-2]) / series[-2])

    def predict_single_timeframe(self, stock_data, days, news_score=0.0, us_market_data=None):
        """Predict the percentage change for one horizon from raw stock data.

        Args:
            stock_data: Historical stock data (yfinance-style columns).
            days: Forecast horizon. Values up to 3 use the 1-day output, up to
                7 the 5-day output, up to 15 the 10-day output, anything
                larger the 20-day output.
            news_score: News sentiment score written to the ``NEWS`` feature.
            us_market_data: Optional mapping with ``'DJI'`` and/or ``'SOX'``
                price sequences; the change between the last two points of each
                is used as the corresponding index-return feature.

        Returns:
            float: the predicted percentage change, or 0.0 if anything fails
            (the error is printed).
        """
        try:
            processed_df = self.create_features_from_stock_data(stock_data)
            if processed_df.empty:
                raise ValueError("stock_data contains no rows")

            # Predict on the most recent row only.
            latest_data = processed_df.iloc[-1:].copy()
            row_label = latest_data.index[0]
            latest_data.loc[row_label, 'NEWS'] = news_score

            if us_market_data is not None:
                index_features = (('DJI', 'dji_return_t-1'), ('SOX', 'sox_return_t-1'))
                for index_key, feature in index_features:
                    if index_key not in us_market_data:
                        continue
                    # The length check applies to the price sequence itself,
                    # not to the number of keys in the mapping.
                    index_return = self._latest_return(us_market_data[index_key])
                    if index_return is not None:
                        latest_data.loc[row_label, feature] = index_return

            predictions = self.predict('xgboost_model', latest_data)

            # The thresholds also cover the exact horizons 1, 5, 10 and 20.
            if days <= 3:
                key = 'Change_pct_t1_pred'
            elif days <= 7:
                key = 'Change_pct_t5_pred'
            elif days <= 15:
                key = 'Change_pct_t10_pred'
            else:
                key = 'Change_pct_t20_pred'
            return predictions[key]
        except Exception as e:
            print(f"單一時間框架預測失敗: {e}")
            return 0.0

    def validate_input_features(self, input_data):
        """Check that the input holds all features and flag NaN/inf values.

        Args:
            input_data: DataFrame, or numpy array with one column per feature
                (a 1-D array is treated as a single sample).

        Returns:
            dict with the keys ``is_valid`` (bool), ``missing_features``,
            ``invalid_values`` and ``warnings`` (lists of str). NaN/inf values
            are reported in ``invalid_values`` but do not clear ``is_valid``.
        """
        validation_result = {
            'is_valid': True,
            'missing_features': [],
            'invalid_values': [],
            'warnings': [],
        }
        try:
            if isinstance(input_data, np.ndarray):
                array_2d = np.atleast_2d(input_data)
                if array_2d.shape[1] != len(self.feature_columns):
                    validation_result['is_valid'] = False
                    validation_result['warnings'].append(
                        f"特徵數量不匹配: 期望{len(self.feature_columns)}, 實際{array_2d.shape[1]}"
                    )
                    return validation_result
                # Give the array column names so the value checks below apply.
                input_data = pd.DataFrame(array_2d, columns=self.feature_columns)

            if isinstance(input_data, pd.DataFrame):
                missing_features = [col for col in self.feature_columns if col not in input_data.columns]
                if missing_features:
                    validation_result['missing_features'] = missing_features
                    validation_result['is_valid'] = False

            for feature in self.feature_columns:
                if feature not in input_data.columns:
                    continue
                column = input_data[feature]
                if column.isnull().any():
                    validation_result['invalid_values'].append(f"{feature}: 包含NaN值")
                if np.isinf(column).any():
                    validation_result['invalid_values'].append(f"{feature}: 包含無限值")
            return validation_result
        except Exception as e:
            validation_result['is_valid'] = False
            validation_result['warnings'].append(f"驗證過程出錯: {e}")
            return validation_result

    def get_prediction_confidence(self, input_data):
        """Return a heuristic confidence score derived from input quality.

        Args:
            input_data: Feature data accepted by ``validate_input_features``.

        Returns:
            float: 0.3 when validation fails; otherwise 0.7 minus 0.05 per
            reported NaN/inf feature, clamped to the range [0.3, 0.9].
        """
        validation_result = self.validate_input_features(input_data)
        if not validation_result['is_valid']:
            return 0.3
        # Missing features already make the input invalid above, so only the
        # NaN/inf findings can lower the score here.
        base_confidence = 0.7 - len(validation_result['invalid_values']) * 0.05
        return max(0.3, min(0.9, base_confidence))

    def validate_input(self, input_df):
        """Run basic sanity checks on the first row of a feature frame.

        Args:
            input_df (pd.DataFrame): Input features.

        Returns:
            tuple: ``(is_valid, errors)`` where ``errors`` is a list of
            messages; ``is_valid`` is True only when the list is empty.
        """
        errors = []
        try:
            has_data = not input_df.empty
            if not has_data:
                errors.append("輸入數據為空")

            for feature in ('close', 'return_t-1'):
                if feature not in input_df.columns:
                    errors.append(f"缺少必要特徵:{feature}")
                elif has_data and pd.isna(input_df[feature].iloc[0]):
                    errors.append(f"必要特徵包含空值:{feature}")

            # Value checks need at least one row to look at.
            if has_data and 'close' in input_df.columns:
                close_price = input_df['close'].iloc[0]
                if close_price <= 0:
                    errors.append(f"收盤價不合理:{close_price}")
            if has_data and 'return_t-1' in input_df.columns:
                return_val = input_df['return_t-1'].iloc[0]
                # A one-day move of more than 50% is treated as suspicious.
                if abs(return_val) > 0.5:
                    errors.append(f"報酬率異常:{return_val:.3f}")
            return len(errors) == 0, errors
        except Exception as e:
            errors.append(f"驗證過程發生錯誤:{e}")
            return False, errors

    def get_feature_importance(self):
        """Return the model's feature importances, highest first.

        Returns:
            dict mapping feature name to importance (float) in descending
            order, or None when no model is set or reading the importances
            fails (the error is printed).
        """
        if self.model is None:
            return None
        try:
            importance_scores = self.model.feature_importances_
            # zip stops at the shorter sequence, so a length mismatch between
            # the scores and the feature list cannot raise an IndexError.
            ranked = sorted(
                ((name, float(score)) for name, score in zip(self.feature_columns, importance_scores)),
                key=lambda item: item[1],
                reverse=True,
            )
            return dict(ranked)
        except Exception as e:
            print(f"獲取特徵重要性時發生錯誤:{e}")
            return None

    def explain_prediction(self, input_df, predictions):
        """Build a human-readable summary of a prediction.

        Args:
            input_df (pd.DataFrame): Input features; the first row is quoted.
            predictions (dict): Result dict as returned by ``predict``.

        Returns:
            str: multi-line explanation, or a failure message if building it
            raised an exception.
        """
        try:
            explanation = ["=== 預測解釋 ==="]

            # Quote the three most important features present in the input.
            feature_importance = self.get_feature_importance()
            if feature_importance:
                explanation.append("主要影響因素:")
                for feature in list(feature_importance)[:3]:
                    if feature in input_df.columns:
                        value = input_df[feature].iloc[0]
                        importance = feature_importance[feature]
                        explanation.append(f" - {feature}: {value:.4f} (重要性: {importance:.3f})")

            explanation.append("\n預測趨勢分析:")
            for key, value in predictions.items():
                days = self.prediction_mapping.get(key)
                if days is None:
                    # Unknown key: take the digits from a 'Change_pct_t<N>_pred' name.
                    days = key.split('_')[2][1:]
                # More than +/-1 is labelled as a rise/fall, otherwise flat.
                if value > 1:
                    trend = "看漲"
                elif value < -1:
                    trend = "看跌"
                else:
                    trend = "持平"
                explanation.append(f" - {days}日: {value:+.2f}% ({trend})")
            return "\n".join(explanation)
        except Exception as e:
            return f"解釋生成失敗: {e}"
# Example usage: run a single prediction on hand-written sample features.
if __name__ == "__main__":
    model = XGBoostModel()

    # One sample row; predict() requires every column in model.feature_columns,
    # so all 14 features are provided.
    test_data = pd.DataFrame({
        'close': [150.0],
        'return_t-1': [0.02],
        'return_t-5': [0.05],
        'MA5_close': [148.0],
        'volatility_5d': [0.025],
        'volume_ratio_5d': [1.2],
        'MACD_diff': [0.5],
        'dji_return_t-1': [0.01],
        'sox_return_t-1': [0.015],
        'NEWS': [0.1],
        'MACDvol': [0.5],
        'RSI_14': [55.0],
        'ADX': [25.0],
        'volume_weighted_return': [20000.0],
    })
    print("測試模型預測器...")
    print("輸入特徵:")
    print(test_data)

    # predict() raises when the model file is missing or the input is
    # rejected; treat that as a failed prediction instead of crashing the demo.
    try:
        predictions = model.predict('xgboost_model', test_data)
    except Exception as exc:
        print(f"預測過程中發生錯誤: {exc}")
        predictions = None

    if predictions:
        print("\n預測成功!")
        print("結果說明:輸出為相對於當前價格的漲幅百分比")
        # Explain the prediction.
        explanation = model.explain_prediction(test_data, predictions)
        print(f"\n{explanation}")
        # Report the heuristic confidence score.
        confidence = model.get_prediction_confidence(test_data)
        print(f"\n預測信心度: {confidence:.2%}")
    else:
        print("預測失敗!")