AITEST / model_predictor.py
AlanRex's picture
Update model_predictor.py
8fffdff verified
raw
history blame
9.24 kB
# 修正後的 model_predictor.py
import xgboost as xgb
import pandas as pd
import numpy as np
class XGBoostModel:
# 使用類別變數儲存所有可用的模型名稱及其對應的檔案名稱
MODELS = {
'xgboost_model': 'xgboost_model.json'
}
def __init__(self, default_model='xgboost_model'):
# 建立物件時,自動載入預設模型
self.current_model_name = default_model
self.model = self._load_model(self.current_model_name)
def _load_model(self, model_name):
if model_name not in self.MODELS:
raise ValueError(f"找不到模型 '{model_name}'。可用的模型名稱:{list(self.MODELS.keys())}")
filename = self.MODELS[model_name]
try:
# 建立一個新的 XGBoost 模型實例
model = xgb.XGBRegressor()
# 使用 XGBoost 內建的 load_model 方法載入檔案
model.load_model(filename)
return model
except Exception as e:
raise FileNotFoundError(f"無法在本地找到或載入模型檔案 '{filename}':{e}")
def _prepare_features(self, input_df):
"""
將 yfinance 的數據格式轉換為模型期望的格式
"""
# 創建新的 DataFrame 來存放轉換後的特徵
features_df = pd.DataFrame()
# 基本價格和交易量特徵(轉換為小寫)
if 'Close' in input_df.columns:
features_df['close'] = input_df['Close']
if 'Volume' in input_df.columns:
features_df['volume'] = input_df['Volume']
# 計算技術指標(如果不存在的話)
if len(input_df) >= 14: # 確保有足夠的數據計算指標
# RSI
delta = input_df['Close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
features_df['RSI'] = 100 - (100 / (1 + rs))
# MACD
exp1 = input_df['Close'].ewm(span=12).mean()
exp2 = input_df['Close'].ewm(span=26).mean()
features_df['MACD'] = exp1 - exp2
features_df['MACDsign'] = features_df['MACD'].ewm(span=9).mean()
features_df['MACDvol'] = features_df['MACD'] - features_df['MACDsign']
# KD指標
if len(input_df) >= 9:
low_min = input_df['Low'].rolling(window=9).min()
high_max = input_df['High'].rolling(window=9).max()
rsv = (input_df['Close'] - low_min) / (high_max - low_min) * 100
features_df['K'] = rsv.ewm(com=2).mean()
features_df['D'] = features_df['K'].ewm(com=2).mean()
# DMI指標
up_move = input_df['High'] - input_df['High'].shift(1)
down_move = input_df['Low'].shift(1) - input_df['Low']
plus_dm = np.where((up_move > down_move) & (up_move > 0), up_move, 0)
minus_dm = np.where((down_move > up_move) & (down_move > 0), down_move, 0)
tr = np.max([input_df['High'] - input_df['Low'],
abs(input_df['High'] - input_df['Close'].shift(1)),
abs(input_df['Low'] - input_df['Close'].shift(1))], axis=0)
plus_dm_series = pd.Series(plus_dm, index=input_df.index)
minus_dm_series = pd.Series(minus_dm, index=input_df.index)
tr_series = pd.Series(tr, index=input_df.index)
features_df['+DI'] = (plus_dm_series.ewm(com=13, adjust=False).mean() /
tr_series.ewm(com=13, adjust=False).mean()) * 100
features_df['-DI'] = (minus_dm_series.ewm(com=13, adjust=False).mean() /
tr_series.ewm(com=13, adjust=False).mean()) * 100
dx = abs(features_df['+DI'] - features_df['-DI']) / (features_df['+DI'] + features_df['-DI']) * 100
features_df['ADX'] = dx.ewm(com=13, adjust=False).mean()
# 計算報酬率
if 'Close' in input_df.columns:
features_df['rate'] = input_df['Close'].pct_change()
# 模擬缺失的外部數據(使用合理的預設值或簡單的代理值)
# 這些值在實際部署時應該來自真實的數據源
features_df['DJI'] = 0.0 # 道瓊工業指數變化率的代理值
features_df['NAS'] = 0.0 # 納斯達克指數變化率的代理值
features_df['SOX'] = 0.0 # 費城半導體指數變化率的代理值
features_df['S&P_500'] = 0.0 # S&P 500指數變化率的代理值
features_df['TSM_ADR'] = 0.0 # 台積電ADR變化率的代理值
features_df['NEWS'] = 0.0 # 新聞情緒分數的代理值
features_df['business_climate'] = 25.0 # 景氣燈號的代理值
features_df['PMI'] = 50.0 # PMI指標的代理值
# 確保所有必要的欄位都存在,並填充缺失值
required_columns = [
'close', 'volume', 'rate', 'DJI', 'NAS', 'SOX', 'S&P_500', 'TSM_ADR',
'NEWS', 'RSI', 'MACD', 'MACDsign', 'MACDvol', 'K', 'D', '+DI', '-DI',
'ADX', 'business_climate', 'PMI'
]
for col in required_columns:
if col not in features_df.columns:
features_df[col] = 0.0 # 用0填充缺失的欄位
# 只保留模型需要的欄位,並確保順序正確
features_df = features_df[required_columns]
# 填充任何剩餘的NaN值
features_df = features_df.fillna(method='ffill').fillna(0)
return features_df.tail(1) # 只返回最後一行用於預測
def predict(self, model_name, input_df):
# 如果請求的模型名稱與目前載入的不同,則動態載入
if model_name != self.current_model_name:
self.model = self._load_model(model_name)
self.current_model_name = model_name
try:
# 轉換輸入特徵格式
prepared_features = self._prepare_features(input_df)
print(f"準備的特徵形狀: {prepared_features.shape}")
print(f"特徵欄位: {list(prepared_features.columns)}")
# 進行預測
predictions = self.model.predict(prepared_features)
print(f"原始預測結果: {predictions}")
print(f"預測結果形狀: {predictions.shape if hasattr(predictions, 'shape') else 'scalar'}")
print(f"預測結果類型: {type(predictions)}")
# 處理不同的輸出格式
if hasattr(predictions, 'shape'):
if len(predictions.shape) == 2 and predictions.shape[1] == 4:
# 情況1: 二維陣列,4個預測值
result = {
'Close_t0_pred': float(predictions[0][0]),
'Close_t5_pred': float(predictions[0][1]),
'Close_t10_pred': float(predictions[0][2]),
'Close_t20_pred': float(predictions[0][3])
}
elif len(predictions.shape) == 1 and len(predictions) == 4:
# 情況2: 一維陣列,4個預測值
result = {
'Close_t0_pred': float(predictions[0]),
'Close_t5_pred': float(predictions[1]),
'Close_t10_pred': float(predictions[2]),
'Close_t20_pred': float(predictions[3])
}
elif len(predictions.shape) == 1 and len(predictions) == 1:
# 情況3: 一維陣列,1個預測值(使用同一個值代表所有時期)
pred_value = float(predictions[0])
result = {
'Close_t0_pred': pred_value,
'Close_t5_pred': pred_value,
'Close_t10_pred': pred_value,
'Close_t20_pred': pred_value
}
else:
# 其他情況:嘗試使用第一個值
pred_value = float(predictions.flatten()[0])
result = {
'Close_t0_pred': pred_value,
'Close_t5_pred': pred_value,
'Close_t10_pred': pred_value,
'Close_t20_pred': pred_value
}
else:
# 標量值
pred_value = float(predictions)
result = {
'Close_t0_pred': pred_value,
'Close_t5_pred': pred_value,
'Close_t10_pred': pred_value,
'Close_t20_pred': pred_value
}
print(f"最終結果: {result}")
return result
except Exception as e:
print(f"預測過程中發生錯誤: {e}")
import traceback
traceback.print_exc()
raise e