File size: 23,044 Bytes
5f10e37 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 |
# rnn/app/models/rnn_predictor.py (ou o nome que você deu, ex: data_preprocessing_api.py)
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from typing import Optional, Dict, List, Tuple
try:
import pandas_ta as ta
except ImportError:
print("AVISO URGENTE (RNN PREDICTOR): pandas_ta não instalado! Cálculo de features falhará.")
ta = None
try:
import joblib
except ImportError:
print("AVISO URGENTE (RNN PREDICTOR): joblib não instalado! Carregamento de scalers falhará.")
joblib = None
# Importar DO CONFIG.PY para consistência
from ..config import ( # Supondo que config.py está um nível acima na pasta 'app'
WINDOW_SIZE as DEFAULT_WINDOW_SIZE, # Renomeado para evitar conflito de nome local
# BASE_FEATURE_COLS, # Não precisamos da lista de nomes base aqui diretamente, mas sim das colunas específicas
# NUM_FEATURES, # Será derivado do input_shape do modelo
MODEL_SAVE_DIR as DEFAULT_MODEL_SAVE_DIR, # Para construir caminhos
MODEL_NAME as DEFAULT_MODEL_NAME,
PRICE_VOL_SCALER_NAME as DEFAULT_PRICE_VOL_SCALER_NAME,
INDICATOR_SCALER_NAME as DEFAULT_INDICATOR_SCALER_NAME,
EXPECTED_SCALED_FEATURES_FOR_MODEL # Esta é a ordem final das features escaladas
)
# Defina as colunas EXATAS que cada scaler da API espera, baseado no config e no script de treino
# Estas devem corresponder às colunas usadas para FITAR os scalers no train_rnn_model.py
# ANTES de serem escaladas pelo general_training_scaler.
API_PRICE_VOL_COLS_TO_SCALE = ['close_div_atr', 'volume_div_atr', 'open_div_atr', 'high_div_atr', 'low_div_atr', 'body_size_norm_atr']
API_INDICATOR_COLS_TO_SCALE = ['log_return', 'rsi_14', 'atr', 'bbp', 'cci_37', 'mfi_37', 'body_vs_avg_body', 'macd', 'sma_10_div_atr', 'adx_14', 'volume_zscore', 'buy_condition_v1']
def calculate_features_for_prediction(ohlcv_df: pd.DataFrame, logger_instance) -> Optional[pd.DataFrame]:
"""
Calcula TODAS as features base necessárias para a predição,
EXATAMENTE como no script de treinamento ANTES do escalonamento final.
"""
if logger_instance is None: import logging; logger_instance = logging.getLogger(__name__)
if ta is None: logger_instance.error("pandas_ta não está disponível para calcular features."); return None
df = ohlcv_df.copy()
required_ohlc_cols = ['open', 'high', 'low', 'close', 'volume']
if not all(col in df.columns for col in required_ohlc_cols):
logger_instance.error(f"DataFrame OHLCV não contém todas as colunas necessárias: {required_ohlc_cols}")
return None
try:
# Indicadores base
df.ta.sma(length=10, close='close', append=True, col_names=('sma_10',))
df.ta.rsi(length=14, close='close', append=True, col_names=('rsi_14',))
macd_out = df.ta.macd(close='close', append=False)
if macd_out is not None:
df['macd'] = macd_out.iloc[:,0] # MACD line
df['macds'] = macd_out.iloc[:,2] # Signal line
df.ta.atr(length=14, append=True, col_names=('atr',))
df.ta.bbands(length=20, close='close', append=True, col_names=('bbl', 'bbm', 'bbu', 'bbb', 'bbp'))
df.ta.cci(length=37, append=True, col_names=('cci_37',)) # Usando o período do seu log
df.ta.mfi(length=37, append=True, col_names=('mfi_37',)) # Usando o período do seu log
df.ta.adx(length=14, append=True) # Adiciona ADX_14, DMP_14, DMN_14
if 'ADX_14' not in df.columns and 'ADX_14_ADX' in df.columns: # Handle naming variation
df.rename(columns={'ADX_14_ADX': 'ADX_14'}, inplace=True)
# Features de Volume Z-score
rolling_vol_mean = df['volume'].rolling(window=20).mean()
rolling_vol_std = df['volume'].rolling(window=20).std()
df['volume_zscore'] = (df['volume'] - rolling_vol_mean) / (rolling_vol_std + 1e-9)
# Features de Candle
df['body_size'] = abs(df['close'] - df['open'])
df['body_size_norm_atr'] = df['body_size'] / (df['atr'] + 1e-9)
df['body_vs_avg_body'] = df['body_size'] / (df['body_size'].rolling(window=20).mean() + 1e-9)
# Log Return
df['log_return'] = np.log(df['close'] / df['close'].shift(1))
# Normalização pelo ATR (DEPOIS de calcular ATR e outras features que o usam)
# Garanta que o ATR não é zero para evitar divisão por zero
df_atr_valid = df[df['atr'] > 1e-7].copy() # Trabalhar em cópia para evitar SettingWithCopyWarning
if df_atr_valid.empty: # Se todos os ATRs forem zero ou NaN
logger_instance.warning("ATR é zero ou NaN para todos os dados, não é possível normalizar pelo ATR.")
# Preencher colunas _div_atr com um valor (ex: 0 ou o valor original) ou retornar None
# Para simplificar, vamos preencher com o valor original se ATR for problemático
cols_to_norm_by_atr = ['open', 'high', 'low', 'close', 'volume', 'sma_10', 'macd']
for col in cols_to_norm_by_atr:
df[f'{col}_div_atr'] = df[col] # Fallback
else:
df['open_div_atr'] = df['open'] / (df['atr'] + 1e-9)
df['high_div_atr'] = df['high'] / (df['atr'] + 1e-9)
df['low_div_atr'] = df['low'] / (df['atr'] + 1e-9)
df['close_div_atr'] = df['close'] / (df['atr'] + 1e-9)
df['volume_div_atr'] = df['volume'] / (df['atr'] + 1e-9)
if 'sma_10' in df.columns: df['sma_10_div_atr'] = df['sma_10'] / (df['atr'] + 1e-9)
if 'macd' in df.columns: df['macd_div_atr'] = df['macd'] / (df['atr'] + 1e-9)
# Feature de Condição de Compra
sma_50_series = df.ta.sma(length=50, close='close', append=False)
if sma_50_series is not None: df['sma_50'] = sma_50_series
else: df['sma_50'] = np.nan
if all(col in df.columns for col in ['macd', 'macds', 'rsi_14', 'close', 'sma_50']):
df['buy_condition_v1'] = ((df['macd'] > df['macds']) & (df['rsi_14'] > 50) & (df['close'] > df['sma_50'])).astype(int)
else:
df['buy_condition_v1'] = 0 # Fallback
df.dropna(inplace=True) # Remove todos os NaNs gerados
logger_instance.info("Features para predição calculadas.")
return df
except Exception as e:
logger_instance.error(f"Erro ao calcular features para predição: {e}", exc_info=True)
return None
def preprocess_for_model_prediction(
features_df: pd.DataFrame,
price_vol_scaler,
indicator_scaler,
expected_scaled_feature_order: List[str], # Vem do config.EXPECTED_SCALED_FEATURES_FOR_MODEL
window_size: int,
logger_instance
) -> np.ndarray:
"""
Aplica scalers carregados e formata os dados para a entrada do modelo.
`features_df` deve conter TODAS as colunas de `API_PRICE_VOL_COLS_TO_SCALE` e `API_INDICATOR_COLS_TO_SCALE`.
"""
if features_df.empty or len(features_df) < window_size:
logger_instance.warning(f"Preprocessing API: Dados insuficientes para janela. Necessário: {window_size}, Disponível: {len(features_df)}")
return np.array([])
# Pegar a última janela de dados
window_data_df = features_df.tail(window_size).copy()
if len(window_data_df) < window_size: # Checagem extra
logger_instance.warning(f"Preprocessing API: Janela de dados incompleta após tail. Necessário: {window_size}, Disponível: {len(window_data_df)}")
return np.array([])
scaled_features_dict = {}
# Aplicar scaler de Preço/Volume
if price_vol_scaler and all(col in window_data_df.columns for col in API_PRICE_VOL_COLS_TO_SCALE):
scaled_pv = price_vol_scaler.transform(window_data_df[API_PRICE_VOL_COLS_TO_SCALE])
for i, col_name in enumerate(API_PRICE_VOL_COLS_TO_SCALE):
scaled_features_dict[f"{col_name}_scaled"] = scaled_pv[:, i]
elif not price_vol_scaler:
logger_instance.error("Preprocessing API: price_volume_scaler não carregado!")
return np.array([])
else:
missing = [col for col in API_PRICE_VOL_COLS_TO_SCALE if col not in window_data_df.columns]
logger_instance.error(f"Preprocessing API: Colunas ausentes para price_volume_scaler: {missing}")
return np.array([])
# Aplicar scaler de Indicadores
if indicator_scaler and all(col in window_data_df.columns for col in API_INDICATOR_COLS_TO_SCALE):
scaled_ind = indicator_scaler.transform(window_data_df[API_INDICATOR_COLS_TO_SCALE])
for i, col_name in enumerate(API_INDICATOR_COLS_TO_SCALE):
scaled_features_dict[f"{col_name}_scaled"] = scaled_ind[:, i]
elif not indicator_scaler:
logger_instance.error("Preprocessing API: indicator_scaler não carregado!")
return np.array([])
else:
missing = [col for col in API_INDICATOR_COLS_TO_SCALE if col not in window_data_df.columns]
logger_instance.error(f"Preprocessing API: Colunas ausentes para indicator_scaler: {missing}")
return np.array([])
# Criar DataFrame com todas as features escaladas e na ordem correta
try:
# Construir o array de features na ordem de EXPECTED_SCALED_FEATURES_FOR_MODEL
final_ordered_features_list = []
for scaled_col_name in expected_scaled_feature_order: # Esta é config.EXPECTED_SCALED_FEATURES_FOR_MODEL
if scaled_col_name in scaled_features_dict:
final_ordered_features_list.append(scaled_features_dict[scaled_col_name])
else:
# Se uma feature em EXPECTED_SCALED_FEATURES_FOR_MODEL não foi gerada/escalada
# Isso indica um desalinhamento entre config.py e a lógica aqui.
# Exemplo: se 'buy_condition_v1' não for escalada e seu nome escalado não existir.
# Tentativa: pegar a coluna original se o _scaled não existir E a original estiver em expected_scaled_feature_order
original_col_name = scaled_col_name.replace("_scaled", "")
if original_col_name in window_data_df.columns and original_col_name == scaled_col_name : # Se a feature esperada é a original não escalada
logger_instance.warning(f"Preprocessing API: Usando feature original não escalada '{original_col_name}' conforme esperado.")
final_ordered_features_list.append(window_data_df[original_col_name].values)
else:
logger_instance.error(f"Preprocessing API: Feature escalada '{scaled_col_name}' esperada pelo modelo não foi encontrada no dicionário de features escaladas.")
return np.array([])
# Transpor para ter (timesteps, features) e depois adicionar dimensão de batch
final_features_array = np.stack(final_ordered_features_list, axis=-1) # (window_size, num_features)
except KeyError as e_key:
logger_instance.error(f"Preprocessing API: Erro de chave ao montar features finais. Provavelmente uma feature em "
f"EXPECTED_SCALED_FEATURES_FOR_MODEL não foi corretamente gerada/escalada. Erro: {e_key}", exc_info=True)
return np.array([])
except Exception as e_stack:
logger_instance.error(f"Preprocessing API: Erro ao empilhar features finais: {e_stack}", exc_info=True)
return np.array([])
if final_features_array.shape != (window_size, len(expected_scaled_feature_order)):
logger_instance.error(f"Preprocessing API: Shape incorreto após escalonamento e ordenação. "
f"Esperado: ({window_size}, {len(expected_scaled_feature_order)}), "
f"Obtido: {final_features_array.shape}")
return np.array([])
reshaped_data = np.reshape(final_features_array, (1, window_size, len(expected_scaled_feature_order)))
logger_instance.info(f"Preprocessing API: Dados pré-processados com shape: {reshaped_data.shape}")
return reshaped_data
class RNNModelPredictor:
def __init__(self, model_dir: str, model_filename: str,
pv_scaler_filename: str, ind_scaler_filename: str,
logger_instance):
self.model_path = os.path.join(model_dir, model_filename)
self.pv_scaler_path = os.path.join(model_dir, pv_scaler_filename)
self.ind_scaler_path = os.path.join(model_dir, ind_scaler_filename)
self.model: Optional[tf.keras.Model] = None
self.price_volume_scaler = None
self.indicator_scaler = None
self.logger = logger_instance
self.num_model_features = 0 # Será definido ao carregar o modelo
self._load_model_and_scalers()
def _load_scaler(self, scaler_path: str, scaler_name: str):
if not joblib: self.logger.error(f"Joblib não importado, não é possível carregar scaler {scaler_name}."); return None
try:
if os.path.exists(scaler_path):
scaler = joblib.load(scaler_path)
self.logger.info(f"Scaler {scaler_name} carregado de {scaler_path}.")
return scaler
else:
self.logger.error(f"Arquivo do scaler {scaler_name} NÃO ENCONTRADO em {scaler_path}.")
return None
except Exception as e:
self.logger.error(f"Erro ao carregar scaler {scaler_name} de {scaler_path}: {e}", exc_info=True)
return None
def _load_model_and_scalers(self):
try:
self.logger.info(f"RNNPredictor: Carregando modelo de {self.model_path}...")
if not os.path.exists(self.model_path):
self.logger.error(f"Arquivo do modelo NÃO ENCONTRADO em {self.model_path}")
return
self.model = tf.keras.models.load_model(self.model_path)
self.logger.info(f"RNNPredictor: Modelo carregado. Input shape: {self.model.input_shape}")
# (batch_size, timesteps, features) -> Ex: (None, 60, 19)
if len(self.model.input_shape) == 3:
self.num_model_features = self.model.input_shape[2]
# Validar contra EXPECTED_SCALED_FEATURES_FOR_MODEL do config.py
if self.num_model_features != len(EXPECTED_SCALED_FEATURES_FOR_MODEL):
self.logger.error(f"DISCREPÂNCIA DE FEATURES! Modelo espera {self.num_model_features} features, "
f"mas EXPECTED_SCALED_FEATURES_FOR_MODEL tem {len(EXPECTED_SCALED_FEATURES_FOR_MODEL)} features.")
self.model = None # Invalida o modelo se houver discrepância
return
else:
self.logger.error(f"Input shape do modelo inesperado: {self.model.input_shape}")
self.model = None
return
self.price_volume_scaler = self._load_scaler(self.pv_scaler_path, "Price/Volume (API)")
self.indicator_scaler = self._load_scaler(self.ind_scaler_path, "Indicator (API)")
if self.price_volume_scaler is None or self.indicator_scaler is None:
self.logger.error("Um ou ambos os scalers não puderam ser carregados. O preditor pode não funcionar.")
# Você pode decidir se invalida o modelo aqui também
# self.model = None
except Exception as e:
self.logger.error(f"RNNPredictor: Falha crítica ao carregar modelo ou scalers: {e}", exc_info=True)
self.model = None
self.price_volume_scaler = None
self.indicator_scaler = None
async def predict_for_asset_ohlcv(
self,
ohlcv_df_raw: pd.DataFrame, # Espera um DataFrame pandas com OHLCV bruto
api_operation_threshold: float = 0.60 # Threshold para decisão final
) -> Tuple[Optional[int], Optional[float]]:
current_loop = asyncio.get_event_loop()
if self.model is None or self.price_volume_scaler is None or self.indicator_scaler is None:
self.logger.warning("RNNPredictor: Modelo ou scalers não carregados. Predição pulada.")
return None, None
# 1. Calcular todas as features base (exatamente como no treino)
features_for_scaling_df = await current_loop.run_in_executor(
None,
calculate_features_for_prediction, # Esta função precisa ser síncrona
ohlcv_df_raw,
self.logger
)
if features_for_scaling_df is None or features_for_scaling_df.empty:
self.logger.warning("RNNPredictor: Falha ao calcular features base para predição.")
return None, None
# 2. Aplicar scalers e formatar para o modelo
processed_input_sequence = await current_loop.run_in_executor(
None,
preprocess_for_model_prediction, # Esta função precisa ser síncrona
features_for_scaling_df,
self.price_volume_scaler,
self.indicator_scaler,
EXPECTED_SCALED_FEATURES_FOR_MODEL, # Importado do config.py
DEFAULT_WINDOW_SIZE, # Importado do config.py
self.logger
)
if processed_input_sequence.size == 0:
self.logger.warning(f"RNNPredictor: Pré-processamento da API falhou.")
return None, None
# 3. Fazer a predição
try:
raw_predictions = await current_loop.run_in_executor(None, self.model.predict, processed_input_sequence)
if raw_predictions.ndim == 2 and raw_predictions.shape[0] == 1 and raw_predictions.shape[1] == 1:
prediction_prob = float(raw_predictions[0, 0])
signal = int(prediction_prob > api_operation_threshold)
self.logger.info(f"RNNPredictor: Predição - Prob: {prediction_prob:.4f}, Threshold: {api_operation_threshold}, Sinal: {signal}")
return signal, prediction_prob
else:
self.logger.warning(f"RNNPredictor: Formato de predição inesperado: {raw_predictions.shape}")
return None, None
except Exception as e:
self.logger.error(f"RNNPredictor: Erro durante a predição com o modelo: {e}", exc_info=True)
return None, None
# --- Para teste local do rnn_predictor.py (exemplo) ---
async def test_predictor():
# Este é apenas um exemplo de como você poderia testar o predictor isoladamente.
# Você precisaria fornecer um DataFrame ohlcv_df_raw de teste.
# Configuração de Log para Teste
import logging
test_logger = logging.getLogger("TestRNNPredictor")
test_logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
if not test_logger.handlers: # Evitar adicionar handlers múltiplos se rodar várias vezes
test_logger.addHandler(handler)
# Caminhos (ajuste se config.py não estiver acessível diretamente assim)
# Supondo que este script está em rnn/app/models/ e config.py em rnn/app/
# e os modelos/scalers estão em rnn/app/model/
model_dir_path = os.path.join(os.path.dirname(__file__), "..", "model") # rnn/app/model/
predictor = RNNModelPredictor(
model_dir=model_dir_path,
model_filename=DEFAULT_MODEL_NAME, # Do config
pv_scaler_filename=DEFAULT_PRICE_VOL_SCALER_NAME, # Do config
ind_scaler_filename=DEFAULT_INDICATOR_SCALER_NAME, # Do config
logger_instance=test_logger
)
if predictor.model is None:
test_logger.error("Teste Falhou: Modelo não carregado no preditor.")
return
# Criar um DataFrame OHLCV de exemplo (substitua por dados reais ou carregados de CSV)
# Precisa ter pelo menos WINDOW_SIZE + (maior lookback de indicador) linhas
num_test_rows = DEFAULT_WINDOW_SIZE + 50
example_data = {
'open': np.random.rand(num_test_rows) * 1000 + 30000,
'high': np.random.rand(num_test_rows) * 1200 + 30000,
'low': np.random.rand(num_test_rows) * 800 + 29800,
'close': np.random.rand(num_test_rows) * 1000 + 30000,
'volume': np.random.rand(num_test_rows) * 100 + 10
}
# Gerar timestamps de 1h para trás a partir de agora
end_time = pd.Timestamp.now(tz='UTC')
start_time = end_time - pd.Timedelta(hours=num_test_rows - 1)
timestamps = pd.date_range(start=start_time, end=end_time, freq='h')
# Ajustar se o número de timestamps não bater exatamente com num_test_rows
if len(timestamps) > num_test_rows:
timestamps = timestamps[:num_test_rows]
elif len(timestamps) < num_test_rows:
# Ajustar dados de exemplo para o número de timestamps disponíveis
for key in example_data:
example_data[key] = example_data[key][:len(timestamps)]
test_ohlcv_df = pd.DataFrame(example_data, index=timestamps)
test_ohlcv_df['high'] = np.maximum(test_ohlcv_df['high'], test_ohlcv_df['close'], test_ohlcv_df['open'])
test_ohlcv_df['low'] = np.minimum(test_ohlcv_df['low'], test_ohlcv_df['close'], test_ohlcv_df['open'])
test_logger.info(f"DataFrame de teste criado com {len(test_ohlcv_df)} linhas.")
signal, probability = await predictor.predict_for_asset_ohlcv(test_ohlcv_df, api_operation_threshold=0.65)
if signal is not None:
test_logger.info(f"Resultado do Teste - Sinal: {signal}, Probabilidade: {probability:.4f}")
else:
test_logger.error("Resultado do Teste - Predição falhou.")
if __name__ == '__main__':
# Para rodar o teste: python rnn/app/models/rnn_predictor.py (ou o nome que você deu)
# Certifique-se que o config.py está acessível (ex: no mesmo diretório ou PYTHONPATH)
# E que os arquivos de modelo e scalers existem em app/model/
import asyncio
# Definir um caminho base para que os imports relativos de config funcionem se rodar daqui
# Isso é um pouco hacky para teste direto do script. Idealmente, teste via pytest com estrutura de projeto.
import sys
# Adiciona o diretório 'app' ao sys.path para encontrar config.py
# Supondo que este script rnn_predictor.py está em rnn/app/models/
# e config.py está em rnn/app/
# Sobe dois níveis para 'rnn', depois desce para 'app'
# app_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
# if app_dir not in sys.path:
# sys.path.insert(0, app_dir)
# Para o import `from ..config import ...` funcionar, você geralmente roda o módulo
# como parte de um pacote, não diretamente.
# Para teste direto, você pode precisar de um import absoluto ou ajuste de PYTHONPATH.
# Simplesmente para teste rápido, vamos assumir que o diretório de trabalho é 'rnn'
# e o import de config seria 'from app.config import ...'
# Se você rodar `python rnn/app/models/rnn_predictor.py` da raiz 'rnn', o import ..config deve funcionar.
print("Rodando teste do RNNModelPredictor...")
asyncio.run(test_predictor()) |