File size: 22,873 Bytes
5f10e37 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 |
# rnn/data_handler_multi_asset.py (NOVO ARQUIVO)
import pandas as pd
import numpy as np
import yfinance as yf # Ou ccxt, dependendo da sua preferência de fonte de dados
import pandas_ta as ta
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional
WINDOW_SIZE = 60
# Importar do seu config.py
# Assumindo que config.py está em ../config.py ou rnn/config.py
# Ajuste o import conforme sua estrutura.
# Se train_rnn_model.py e este data_handler estiverem na mesma pasta 'scripts',
# e config.py estiver um nível acima:
# from ..app/config.py import (
# MULTI_ASSET_LIST, TIMEFRAME, DAYS_OF_DATA_TO_FETCH,
# # etc. para features a serem calculadas
# )
# Por agora, vamos definir aqui para exemplo:
# EXEMPLO DE CONFIGURAÇÃO (Mova para config.py depois)
MULTI_ASSET_SYMBOLS = {
'eth': 'ETH-USD',
'btc': 'BTC-USD',
'ada': 'ADA-USD',
'sol': 'SOL-USD'
} # Use os tickers corretos para yfinance ou ccxt
TIMEFRAME_YFINANCE = '1h' # yfinance suporta '1m', '2m', '5m', '15m', '30m', '60m', '90m', '1h', '1d', '5d', '1wk', '1mo', '3mo'
# Para '1h', yfinance só retorna os últimos 730 dias. Para mais dados, use '1d'.
# Se usar ccxt, TIMEFRAME = '1h' como antes.
DAYS_TO_FETCH = 365 * 2 # 2 anos
# Lista das features base que você quer calcular para CADA ativo
# (as 19 que definimos antes)
INDIVIDUAL_ASSET_BASE_FEATURES = [
'open', 'high', 'low', 'close', 'volume', # OHLCV originais são necessários para os cálculos
'sma_10', 'rsi_14', 'macd', 'macds', 'atr', 'bbp', 'cci_37', 'mfi_37', 'adx_14',
'volume_zscore', 'body_size', 'body_size_norm_atr', 'body_vs_avg_body',
'log_return', 'buy_condition_v1',
'open_div_atr', 'high_div_atr',
'low_div_atr',
'close_div_atr',
'volume_div_atr',
'sma_10_div_atr' # 'sma_50' é calculada dentro de buy_condition_v1
# As colunas _div_atr serão criadas a partir destas
]
# Features que serão normalizadas pelo ATR
COLS_TO_NORM_BY_ATR = ['open', 'high', 'low', 'close', 'volume', 'sma_10', 'macd', 'body_size']
#----------------------------------
# -- New get multi asset data for rl
# ... (imports e outras funções como antes) ...
def get_multi_asset_data_for_rl(
asset_symbols_map: Dict[str, str],
timeframe_yf: str,
days_to_fetch: int,
logger_instance # Adicionar logger como parâmetro
) -> Optional[pd.DataFrame]: # Adicionar logger como parâmetro
all_asset_features_list: List[pd.DataFrame] = [] # Tipagem para clareza
min_data_length = float('inf')
print(all_asset_features_list)
#logger_instance.info(f"Iniciando get_multi_asset_data_for_rl para: {list(asset_symbols_map.keys())}")
for asset_key, yf_ticker in asset_symbols_map.items():
#logger_instance.info(f"\n--- Processando {asset_key} ({yf_ticker}) ---")
# ... (lógica de fetch_single_asset_ohlcv_yf como antes) ...
single_asset_ohlcv = fetch_single_asset_ohlcv_yf(yf_ticker, period=f"{days_to_fetch}d", interval=timeframe_yf) # Passar logger se a função aceitar
# if single_asset_ohlcv.empty:
# logger_instance.warning(f"Sem dados OHLCV para {yf_ticker}, pulando.")
# continue
single_asset_features = calculate_all_features_for_single_asset(single_asset_ohlcv)#, logger_instance)
if single_asset_features is None or single_asset_features.empty:
logger_instance.warning(f"Sem features calculadas para {yf_ticker}, pulando.")
continue
#logger_instance.info(f"Features para {asset_key} shape: {single_asset_features.shape}, Index Min: {single_asset_features.index.min()}, Index Max: {single_asset_features.index.max()}")
# Garantir que o índice é DatetimeIndex e UTC para todos antes de adicionar prefixo e à lista
if not isinstance(single_asset_features.index, pd.DatetimeIndex):
single_asset_features.index = pd.to_datetime(single_asset_features.index)
if single_asset_features.index.tz is None:
single_asset_features.index = single_asset_features.index.tz_localize('UTC')
else:
single_asset_features.index = single_asset_features.index.tz_convert('UTC')
single_asset_features = single_asset_features.add_prefix(f"{asset_key}_")
all_asset_features_list.append(single_asset_features)
min_data_length = min(min_data_length, len(single_asset_features))
if not all_asset_features_list:
logger_instance.error("Nenhum DataFrame de feature de ativo foi adicionado à lista.")
return None
if min_data_length == float('inf') or min_data_length < WINDOW_SIZE: # Adicionada checagem de WINDOW_SIZE
logger_instance.error(f"min_data_length inválido ({min_data_length}) ou menor que WINDOW_SIZE ({WINDOW_SIZE}). Não é possível truncar/usar.")
return None
#logger_instance.info(f"Menor número de linhas de dados encontrado (min_data_length): {min_data_length}")
# Truncar para garantir que todos os DFs tenham o mesmo comprimento ANTES do concat
# E que tenham pelo menos min_data_length.
# É importante que os ÍNDICES de data/hora se sobreponham para o join='inner' funcionar.
# Apenas pegar o .tail() pode não alinhar os timestamps se os DFs tiverem começos diferentes.
# Melhor abordagem: encontrar o índice comum mais recente e o mais antigo.
if not all_asset_features_list: # Checagem se a lista não ficou vazia por algum motivo
logger_instance.error("all_asset_features_list está vazia antes do alinhamento de índice.")
return None
print(all_asset_features_list)
# Alinhar DataFrames por um índice comum antes de concatenar
# 1. Encontrar o primeiro timestamp comum a todos
# 2. Encontrar o último timestamp comum a todos
# Ou, mais simples, confiar no join='inner' do concat, mas garantir que os DFs são válidos.
# Vamos simplificar e manter o truncamento pelo tail, mas com mais logs
# e garantir que são DataFrames.
truncated_asset_features_list = []
for i, df_asset in enumerate(all_asset_features_list):
if isinstance(df_asset, pd.DataFrame) and len(df_asset) >= min_data_length:
truncated_df = df_asset.tail(min_data_length)
#logger_instance.info(f" DF truncado {i} ({df_asset.columns[0].split('_')[0]}): shape {truncated_df.shape}, ")
#f"Index Min: {truncated_df.index.min()}, Max: {truncated_df.index.max()}")
truncated_asset_features_list.append(truncated_df)
else:
asset_name_debug = df_asset.columns[0].split('_')[0] if isinstance(df_asset, pd.DataFrame) and not df_asset.empty else f"DF_{i}"
logger_instance.warning(f" DF {asset_name_debug} inválido ou muito curto (len: {len(df_asset) if isinstance(df_asset, pd.DataFrame) else 'N/A'}) para truncamento. Pulando.")
if not truncated_asset_features_list:
#ogger_instance.error("Nenhum DataFrame válido restou após truncamento para concatenar.")
return None
# Se houver apenas UM DataFrame na lista, não precisa concatenar, apenas retorna ele.
if len(truncated_asset_features_list) == 1:
#logger_instance.info("Apenas um DataFrame de ativo processado, retornando-o diretamente.")
combined_df = truncated_asset_features_list[0]
else:
#logger_instance.info(f"Concatenando {len(truncated_asset_features_list)} DataFrames de ativos com join='inner'...")
try:
combined_df = pd.concat(truncated_asset_features_list, axis=1, join='outer')
print(combined_df)
except Exception as e_concat:
logger_instance.error(f"ERRO CRÍTICO durante pd.concat: {e_concat}", exc_info=True)
return None
combined_df.fillna(method='ffill', inplace=True)
# Depois do ffill, ainda pode haver NaNs no início se algum ativo começar depois dos outros.
if not combined_df.empty:
#logger_instance.info(f"Shape após ffill: {combined_df.shape}. Buscando primeiro/último índice válido...")
first_valid_index = combined_df.first_valid_index()
last_valid_index = combined_df.last_valid_index()
if pd.isna(first_valid_index) or pd.isna(last_valid_index):
print("Não foi possível determinar first/last_valid_index após ffill.")
return None
print(f"Primeiro índice válido: {first_valid_index}, Último índice válido: {last_valid_index}")
combined_df = combined_df.loc[first_valid_index:last_valid_index]
print(f"Shape após fatiar por first/last valid index: {combined_df.shape}")
# Um dropna final pode ser necessário se o ffill não pegou tudo (improvável, mas seguro)
combined_df.dropna(inplace=True)
print(f"Shape após dropna final (pós-fatiamento): {combined_df.shape}")
print("Imprimindo DF_COMBINED com index ")
print(combined_df)
#logger_instance.info(f"Tipo de combined_df após concat: {type(combined_df)}")
if not isinstance(combined_df, pd.DataFrame):
logger_instance.error(f"combined_df NÃO é um DataFrame após concat. Tipo: {type(combined_df)}")
return None
return combined_df
# if combined_df.empty:
# logger_instance.error("DataFrame combinado está VAZIO após concatenação e join='inner'. "
# "Isso geralmente significa que não há timestamps comuns entre TODOS os ativos processados.")
# # Adicionar mais depuração aqui se isso acontecer:
# # for i, df_trunc_debug in enumerate(truncated_asset_features_list):
# # logger_instance.info(f"Debug DF {i} - Head:\n{df_trunc_debug.head(3)}")
# # logger_instance.info(f"Debug DF {i} - Tail:\n{df_trunc_debug.tail(3)}")
# # logger_instance.info(f"DataFrame combinado ANTES do dropna final, shape: {combined_df.shape}")
# print(combined_df.head()) # Descomente para depuração pesada
# return None
# ...
#----------------------------------
# def get_multi_asset_data_for_rl(
# asset_symbols_map: Dict[str, str],
# timeframe_yf: str,
# days_to_fetch: int
# ) -> Optional[pd.DataFrame]:
# """
# Busca, processa e combina dados de múltiplos ativos em um DataFrame achatado.
# """
# all_asset_features_list = []
# # min_data_length = float('inf') # Inicializar com um valor alto
# # Vamos inicializar com 0 e pegar o len do primeiro DF válido
# min_data_length = 0
# first_valid_df_processed = False
# for asset_key, yf_ticker in asset_symbols_map.items():
# print(f"\n--- Processando {asset_key} ({yf_ticker}) ---")
# period_yf = f"{days_to_fetch}d"
# if timeframe_yf == '1h' and days_to_fetch > 730:
# print(f"AVISO: Para {timeframe_yf}, buscando no máximo 730 dias com yfinance para {yf_ticker}.")
# period_yf = "730d"
# single_asset_ohlcv = fetch_single_asset_ohlcv_yf(yf_ticker, period=period_yf, interval=timeframe_yf)
# if single_asset_ohlcv.empty:
# print(f"AVISO: Sem dados OHLCV para {yf_ticker}, pulando este ativo.")
# continue
# single_asset_features = calculate_all_features_for_single_asset(single_asset_ohlcv) # Passar logger se tiver
# if single_asset_features is None or single_asset_features.empty:
# print(f"AVISO: Sem features calculadas para {yf_ticker}, pulando este ativo.")
# continue
# single_asset_features = single_asset_features.add_prefix(f"{asset_key}_")
# all_asset_features_list.append(single_asset_features)
# # Atualizar min_data_length de forma mais segura
# if not first_valid_df_processed:
# min_data_length = len(single_asset_features)
# first_valid_df_processed = True
# else:
# min_data_length = min(min_data_length, len(single_asset_features))
# if not all_asset_features_list: # Se nenhum ativo foi processado com sucesso
# print("ERRO: Nenhum dado de feature de ativo foi processado com sucesso para a lista `all_asset_features_list`.")
# return None # Retorna None, que não tem '.empty' mas será checado por 'is None'
# if min_data_length == 0 : # Checagem adicional se algo deu muito errado
# print("ERRO: min_data_length é zero após processar ativos, não é possível truncar DataFrames.")
# return None
# print(f"Menor número de linhas de dados encontrado entre os ativos (min_data_length): {min_data_length}")
# truncated_asset_features_list = [df.tail(min_data_length) for df in all_asset_features_list if not df.empty and len(df) >= min_data_length]
# # Verificar se a lista de DFs truncados não está vazia ANTES de concatenar
# if not truncated_asset_features_list:
# print("ERRO: Nenhum DataFrame válido restou após o truncamento. Não é possível concatenar.")
# return None
# print(f"Concatenando {len(truncated_asset_features_list)} DataFrames de ativos...")
# try:
# combined_df = pd.concat(truncated_asset_features_list, axis=1, join='inner')
# except Exception as e_concat:
# print(f"ERRO durante pd.concat: {e_concat}")
# return None # Retorna None em caso de erro na concatenação
# # Agora, combined_df DEVE ser um DataFrame (mesmo que vazio se o join falhar)
# if not isinstance(combined_df, pd.DataFrame):
# print(f"ERRO: pd.concat não retornou um DataFrame. Tipo retornado: {type(combined_df)}")
# return None
# if combined_df.empty: # Esta checagem agora deve funcionar ou ser desnecessária se o anterior já retornou None
# print("ERRO: DataFrame combinado está vazio após concatenação e join. Verifique os dados dos ativos e o alinhamento de datas.")
# return None
# combined_df.dropna(inplace=True)
# if combined_df.empty: # Checagem final após dropna
# print("ERRO: DataFrame combinado está vazio após dropna final.")
# return None
# print(f"\nDataFrame multi-ativo final gerado com shape: {combined_df.shape}")
# return combined_df
def fetch_single_asset_ohlcv_yf(ticker_symbol: str, period: str = "2y", interval: str = "1h") -> pd.DataFrame:
""" Adaptação da sua função fetch_historical_ohlcv de financial_data_agent.py """
print(f"Buscando dados para {ticker_symbol} com yfinance (period: {period}, interval: {interval})...")
try:
ticker = yf.Ticker(ticker_symbol)
# Para dados horários, o período máximo é geralmente 730 dias com yfinance
# Se precisar de mais, considere '1d' e depois reamostre, ou use ccxt para cripto.
if interval == '1h' and period.endswith('y') and int(period[:-1]) * 365 > 730:
print(f"AVISO: yfinance pode limitar dados horários a 730 dias. Buscando 'max' para {interval} e depois fatiando.")
data = ticker.history(interval=interval, period="730d") # Pega o máximo possível
elif interval == '1d' and period.endswith('y'):
data = ticker.history(period=period, interval=interval)
else: # Para períodos menores ou outros intervalos
data = ticker.history(period=period, interval=interval)
if data.empty:
print(f"Nenhum dado encontrado para {ticker_symbol}.")
return pd.DataFrame()
data.rename(columns={
"Open": "open", "High": "high", "Low": "low",
"Close": "close", "Volume": "volume", "Adj Close": "adj_close"
}, inplace=True)
# Selecionar apenas as colunas OHLCV e garantir que o índice é DatetimeIndex UTC
data = data[['open', 'high', 'low', 'close', 'volume']]
if data.index.tz is None:
data.index = data.index.tz_localize('UTC')
else:
data.index = data.index.tz_convert('UTC')
# Para dados horários, yfinance pode retornar dados do fim de semana (sem volume)
# e o último candle pode estar incompleto.
# if interval == '1h':
# data = data[data['volume'] > 0] # Remover candles sem volume
# data = data[:-1] # Remover o último candle que pode estar incompleto
print(f"Dados coletados para {ticker_symbol}: {len(data)} linhas.")
return data
except Exception as e:
print(f"Erro ao buscar dados para {ticker_symbol} com yfinance: {e}")
return pd.DataFrame()
def calculate_all_features_for_single_asset(ohlcv_df: pd.DataFrame) -> Optional[pd.DataFrame]:
"""Calcula todas as features base para um único ativo."""
if ohlcv_df.empty: return None
df = ohlcv_df.copy()
print(f"Calculando features para ativo (shape inicial: {df.shape})...")
# GARANTIR que a coluna 'close' original será preservada no DataFrame final
if 'close' in df.columns:
df['close'] = df['close'] # redundante, mas deixa explícito que não será sobrescrita
if ta:
df.ta.sma(length=10, close='close', append=True, col_names=('sma_10',))
df.ta.rsi(length=14, close='close', append=True, col_names=('rsi_14',))
macd_out = df.ta.macd(close='close', append=False)
if macd_out is not None and not macd_out.empty:
df['macd'] = macd_out.iloc[:,0]
df['macds'] = macd_out.iloc[:,2] # Linha de sinal para buy_condition
df.ta.atr(length=14, append=True, col_names=('atr',))
df.ta.bbands(length=20, close='close', append=True, col_names=('bbl', 'bbm', 'bbu', 'bbb', 'bbp'))
df.ta.cci(length=37, append=True, col_names=('cci_37',))
df['volume'] = df['volume'].astype(float)
df.ta.mfi(length=37, close='close', high='high', low='low', volume='volume', append=True, col_names=('mfi_37',))
df.ta.mfi(length=37, append=True, col_names=('mfi_37',))
adx_out = df.ta.adx(length=14, append=False)
if adx_out is not None and not adx_out.empty:
df['adx_14'] = adx_out.iloc[:,0]
rolling_vol_mean = df['volume'].rolling(window=20).mean()
rolling_vol_std = df['volume'].rolling(window=20).std()
df['volume_zscore'] = (df['volume'] - rolling_vol_mean) / (rolling_vol_std + 1e-9)
df['body_size'] = abs(df['close'] - df['open'])
# ATR precisa existir para as próximas. Drop NaNs do ATR primeiro.
df.dropna(subset=['atr'], inplace=True)
df_atr_valid = df[df['atr'] > 1e-9].copy()
if df_atr_valid.empty:
print("AVISO: ATR inválido para todas as linhas restantes, features _div_atr e body_size_norm_atr podem ser todas NaN ou vazias.")
# Criar colunas com NaN para manter a estrutura
df['body_size_norm_atr'] = np.nan
for col in COLS_TO_NORM_BY_ATR:
df[f'{col}_div_atr'] = np.nan
else:
df['body_size_norm_atr'] = df['body_size'] / df['atr'] # ATR já filtrado para > 1e-9
for col in COLS_TO_NORM_BY_ATR:
if col in df.columns:
df[f'{col}_div_atr'] = df[col] / (df['atr'] + 1e-9) # Adicionar 1e-9 aqui também por segurança
else:
df[f'{col}_div_atr'] = np.nan
df['body_vs_avg_body'] = df['body_size'] / (df['body_size'].rolling(window=20).mean() + 1e-9)
df['log_return'] = np.log(df['close'] / df['close'].shift(1))
sma_50_series = df.ta.sma(length=50, close='close', append=False)
if sma_50_series is not None: df['sma_50'] = sma_50_series
else: df['sma_50'] = np.nan
if all(col in df.columns for col in ['macd', 'macds', 'rsi_14', 'close', 'sma_50']):
df['buy_condition_v1'] = ((df['macd'] > df['macds']) & (df['rsi_14'] > 50) & (df['close'] > df['sma_50'])).astype(int)
else:
df['buy_condition_v1'] = 0
# Selecionar apenas as colunas que realmente usaremos como features base para o modelo
# (incluindo as _div_atr e as originais que não foram normalizadas por ATR)
# Esta lista de features é a que será passada para os scalers no script de treino.
# E também as colunas que o rnn_predictor.py precisará ter antes de aplicar seus scalers.
# Esta lista deve vir do config.py (BASE_FEATURE_COLS)
# Exemplo:
# final_feature_columns = [
# 'open_div_atr', 'high_div_atr', 'low_div_atr', 'close_div_atr', 'volume_div_atr',
# 'log_return', 'rsi_14', 'atr', 'bbp', 'cci_37', 'mfi_37',
# 'body_size_norm_atr', 'body_vs_avg_body', 'macd', 'sma_10_div_atr',
# 'adx_14', 'volume_zscore', 'buy_condition_v1'
# ] # Esta é a BASE_FEATURE_COLS do seu config.py
# Verificar se todas as colunas em INDIVIDUAL_ASSET_BASE_FEATURES existem
# (INDIVIDUAL_ASSET_BASE_FEATURES deve ser igual a config.BASE_FEATURE_COLS)
current_feature_cols = [col for col in INDIVIDUAL_ASSET_BASE_FEATURES if col in df.columns]
missing_cols = [col for col in INDIVIDUAL_ASSET_BASE_FEATURES if col not in df.columns]
if missing_cols:
print(f"AVISO: Colunas de features ausentes após cálculo: {missing_cols}. Usando apenas as disponíveis: {current_feature_cols}")
# GARANTIR que 'close' (preço original) está presente nas features finais
if 'close' not in current_feature_cols and 'close' in df.columns:
current_feature_cols.append('close')
df_final_features = df[current_feature_cols].copy()
df_final_features.dropna(inplace=True)
print(f"Features calculadas. Shape após dropna: {df_final_features.shape}. Colunas: {df_final_features.columns.tolist()}")
return df_final_features
else:
print("pandas_ta não está disponível.")
return None
if __name__ == '__main__':
print("Testando data_handler_multi_asset.py...")
# Substitua pelos tickers yfinance reais que você quer usar
test_assets = {
'eth': 'ETH-USD',
'btc': 'BTC-USD',
# 'aapl': 'AAPL' # Exemplo de ação
}
multi_asset_data = get_multi_asset_data_for_rl(
test_assets,
timeframe_yf='1h', # Para teste rápido, período menor
days_to_fetch=90 # Para teste rápido, período menor
)
if multi_asset_data is not None and not multi_asset_data.empty:
print("\n--- Exemplo do DataFrame Multi-Ativo Gerado ---")
print(multi_asset_data.head())
print(f"\nShape: {multi_asset_data.shape}")
print(f"\nInfo:")
multi_asset_data.info()
else:
print("\nFalha ao gerar DataFrame multi-ativo.") |