File size: 13,448 Bytes
b3a7985 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 |
# rnn/data_handler_multi_asset.py (NOVO ARQUIVO)
import pandas as pd
import numpy as np
import yfinance as yf # Ou ccxt, dependendo da sua preferência de fonte de dados
import pandas_ta as ta
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Optional
# Importar do seu config.py
# Assumindo que config.py está em ../config.py ou rnn/config.py
# Ajuste o import conforme sua estrutura.
# Se train_rnn_model.py e este data_handler estiverem na mesma pasta 'scripts',
# e config.py estiver um nível acima:
# from ..app/config.py import (
# MULTI_ASSET_LIST, TIMEFRAME, DAYS_OF_DATA_TO_FETCH,
# # etc. para features a serem calculadas
# )
# Por agora, vamos definir aqui para exemplo:
# EXEMPLO DE CONFIGURAÇÃO (Mova para config.py depois)
MULTI_ASSET_SYMBOLS = {
'crypto_eth': 'ETH-USD', # yfinance ticker para ETH/USD
'crypto_ada': 'ADA-USD', # yfinance ticker para ADA/USD
'stock_aapl': 'AAPL', # NASDAQ
'stock_petr': 'PETR4.SA' # B3
} # Use os tickers corretos para yfinance ou ccxt
TIMEFRAME_YFINANCE = '1h' # yfinance suporta '1m', '2m', '5m', '15m', '30m', '60m', '90m', '1h', '1d', '5d', '1wk', '1mo', '3mo'
# Para '1h', yfinance só retorna os últimos 730 dias. Para mais dados, use '1d'.
# Se usar ccxt, TIMEFRAME = '1h' como antes.
DAYS_TO_FETCH = 365 * 2 # 2 anos
# Lista das features base que você quer calcular para CADA ativo
# (as 19 que definimos antes)
INDIVIDUAL_ASSET_BASE_FEATURES = [
'open', 'high', 'low', 'close', 'volume', # OHLCV originais são necessários para os cálculos
'sma_10', 'rsi_14', 'macd', 'macds', 'atr', 'bbp', 'cci_37', 'mfi_37', 'adx_14',
'volume_zscore', 'body_size', 'body_size_norm_atr', 'body_vs_avg_body',
'log_return', 'buy_condition_v1', # 'sma_50' é calculada dentro de buy_condition_v1
# As colunas _div_atr serão criadas a partir destas
]
# Features que serão normalizadas pelo ATR
COLS_TO_NORM_BY_ATR = ['open', 'high', 'low', 'close', 'volume', 'sma_10', 'macd', 'body_size']
def fetch_single_asset_ohlcv_yf(ticker_symbol: str, period: str = "2y", interval: str = "1h") -> pd.DataFrame:
""" Adaptação da sua função fetch_historical_ohlcv de financial_data_agent.py """
print(f"Buscando dados para {ticker_symbol} com yfinance (period: {period}, interval: {interval})...")
try:
ticker = yf.Ticker(ticker_symbol)
# Para dados horários, o período máximo é geralmente 730 dias com yfinance
# Se precisar de mais, considere '1d' e depois reamostre, ou use ccxt para cripto.
if interval == '1h' and period.endswith('y') and int(period[:-1]) * 365 > 730:
print(f"AVISO: yfinance pode limitar dados horários a 730 dias. Buscando 'max' para {interval} e depois fatiando.")
data = ticker.history(interval=interval, period="730d") # Pega o máximo possível
elif interval == '1d' and period.endswith('y'):
data = ticker.history(period=period, interval=interval)
else: # Para períodos menores ou outros intervalos
data = ticker.history(period=period, interval=interval)
if data.empty:
print(f"Nenhum dado encontrado para {ticker_symbol}.")
return pd.DataFrame()
data.rename(columns={
"Open": "open", "High": "high", "Low": "low",
"Close": "close", "Volume": "volume", "Adj Close": "adj_close"
}, inplace=True)
# Selecionar apenas as colunas OHLCV e garantir que o índice é DatetimeIndex UTC
data = data[['open', 'high', 'low', 'close', 'volume']]
if data.index.tz is None:
data.index = data.index.tz_localize('UTC')
else:
data.index = data.index.tz_convert('UTC')
# Para dados horários, yfinance pode retornar dados do fim de semana (sem volume)
# e o último candle pode estar incompleto.
# if interval == '1h':
# data = data[data['volume'] > 0] # Remover candles sem volume
# data = data[:-1] # Remover o último candle que pode estar incompleto
print(f"Dados coletados para {ticker_symbol}: {len(data)} linhas.")
return data
except Exception as e:
print(f"Erro ao buscar dados para {ticker_symbol} com yfinance: {e}")
return pd.DataFrame()
def calculate_all_features_for_single_asset(ohlcv_df: pd.DataFrame) -> Optional[pd.DataFrame]:
"""Calcula todas as features base para um único ativo."""
if ohlcv_df.empty: return None
df = ohlcv_df.copy()
print(f"Calculando features para ativo (shape inicial: {df.shape})...")
if ta:
df.ta.sma(length=10, close='close', append=True, col_names=('sma_10',))
df.ta.rsi(length=14, close='close', append=True, col_names=('rsi_14',))
macd_out = df.ta.macd(close='close', append=False)
if macd_out is not None and not macd_out.empty:
df['macd'] = macd_out.iloc[:,0]
df['macds'] = macd_out.iloc[:,2] # Linha de sinal para buy_condition
df.ta.atr(length=14, append=True, col_names=('atr',))
df.ta.bbands(length=20, close='close', append=True, col_names=('bbl', 'bbm', 'bbu', 'bbb', 'bbp'))
df.ta.cci(length=37, append=True, col_names=('cci_37',))
df.ta.mfi(length=37, append=True, col_names=('mfi_37',))
adx_out = df.ta.adx(length=14, append=False)
if adx_out is not None and not adx_out.empty:
df['adx_14'] = adx_out.iloc[:,0]
rolling_vol_mean = df['volume'].rolling(window=20).mean()
rolling_vol_std = df['volume'].rolling(window=20).std()
df['volume_zscore'] = (df['volume'] - rolling_vol_mean) / (rolling_vol_std + 1e-9)
df['body_size'] = abs(df['close'] - df['open'])
# ATR precisa existir para as próximas. Drop NaNs do ATR primeiro.
df.dropna(subset=['atr'], inplace=True)
df_atr_valid = df[df['atr'] > 1e-9].copy()
if df_atr_valid.empty:
print("AVISO: ATR inválido para todas as linhas restantes, features _div_atr e body_size_norm_atr podem ser todas NaN ou vazias.")
# Criar colunas com NaN para manter a estrutura
df['body_size_norm_atr'] = np.nan
for col in COLS_TO_NORM_BY_ATR:
df[f'{col}_div_atr'] = np.nan
else:
df['body_size_norm_atr'] = df['body_size'] / df['atr'] # ATR já filtrado para > 1e-9
for col in COLS_TO_NORM_BY_ATR:
if col in df.columns:
df[f'{col}_div_atr'] = df[col] / (df['atr'] + 1e-9) # Adicionar 1e-9 aqui também por segurança
else:
df[f'{col}_div_atr'] = np.nan
df['body_vs_avg_body'] = df['body_size'] / (df['body_size'].rolling(window=20).mean() + 1e-9)
df['log_return'] = np.log(df['close'] / df['close'].shift(1))
sma_50_series = df.ta.sma(length=50, close='close', append=False)
if sma_50_series is not None: df['sma_50'] = sma_50_series
else: df['sma_50'] = np.nan
if all(col in df.columns for col in ['macd', 'macds', 'rsi_14', 'close', 'sma_50']):
df['buy_condition_v1'] = ((df['macd'] > df['macds']) & (df['rsi_14'] > 50) & (df['close'] > df['sma_50'])).astype(int)
else:
df['buy_condition_v1'] = 0
# Selecionar apenas as colunas que realmente usaremos como features base para o modelo
# (incluindo as _div_atr e as originais que não foram normalizadas por ATR)
# Esta lista de features é a que será passada para os scalers no script de treino.
# E também as colunas que o rnn_predictor.py precisará ter antes de aplicar seus scalers.
# Esta lista deve vir do config.py (BASE_FEATURE_COLS)
# Exemplo:
# final_feature_columns = [
# 'open_div_atr', 'high_div_atr', 'low_div_atr', 'close_div_atr', 'volume_div_atr',
# 'log_return', 'rsi_14', 'atr', 'bbp', 'cci_37', 'mfi_37',
# 'body_size_norm_atr', 'body_vs_avg_body', 'macd', 'sma_10_div_atr',
# 'adx_14', 'volume_zscore', 'buy_condition_v1'
# ] # Esta é a BASE_FEATURE_COLS do seu config.py
# Verificar se todas as colunas em INDIVIDUAL_ASSET_BASE_FEATURES existem
# (INDIVIDUAL_ASSET_BASE_FEATURES deve ser igual a config.BASE_FEATURE_COLS)
current_feature_cols = [col for col in INDIVIDUAL_ASSET_BASE_FEATURES if col in df.columns]
missing_cols = [col for col in INDIVIDUAL_ASSET_BASE_FEATURES if col not in df.columns]
if missing_cols:
print(f"AVISO: Colunas de features ausentes após cálculo: {missing_cols}. Usando apenas as disponíveis: {current_feature_cols}")
df_final_features = df[current_feature_cols].copy()
df_final_features.dropna(inplace=True)
print(f"Features calculadas. Shape após dropna: {df_final_features.shape}. Colunas: {df_final_features.columns.tolist()}")
return df_final_features
else:
print("pandas_ta não está disponível.")
return None
def get_multi_asset_data_for_rl(
asset_symbols_map: Dict[str, str], # Ex: {'crypto_eth': 'ETH-USD', ...}
timeframe_yf: str,
days_to_fetch: int
) -> Optional[pd.DataFrame]:
"""
Busca, processa e combina dados de múltiplos ativos em um DataFrame achatado.
"""
all_asset_features_list = []
min_data_length = float('inf') # Para truncar todos os DFs para o mesmo comprimento
# Usar os tickers yfinance do asset_symbols_map
for asset_key, yf_ticker in asset_symbols_map.items():
print(f"\n--- Processando {asset_key} ({yf_ticker}) ---")
# Para yfinance, '1h' retorna max 730d. '1d' retorna mais.
# Se days_to_fetch for > 730 e timeframe_yf for '1h', ajuste o período.
period_yf = f"{days_to_fetch}d" # yfinance aceita "Xd" para dias
if timeframe_yf == '1h' and days_to_fetch > 730:
print(f"AVISO: Para {timeframe_yf}, buscando no máximo 730 dias com yfinance para {yf_ticker}.")
period_yf = "730d"
single_asset_ohlcv = fetch_single_asset_ohlcv_yf(yf_ticker, period=period_yf, interval=timeframe_yf)
if single_asset_ohlcv.empty:
print(f"AVISO: Sem dados OHLCV para {yf_ticker}, pulando este ativo.")
continue
single_asset_features = calculate_all_features_for_single_asset(single_asset_ohlcv)
if single_asset_features is None or single_asset_features.empty:
print(f"AVISO: Sem features calculadas para {yf_ticker}, pulando este ativo.")
continue
# Adicionar prefixo para achatar
single_asset_features = single_asset_features.add_prefix(f"{asset_key}_")
all_asset_features_list.append(single_asset_features)
min_data_length = min(min_data_length, len(single_asset_features))
if not all_asset_features_list:
print("ERRO: Nenhum dado de feature de ativo foi processado com sucesso.")
return None
# Truncar todos os DataFrames para o mesmo comprimento (o do menor) ANTES de concatenar
# para garantir alinhamento temporal mais robusto se os históricos tiverem começos diferentes.
# Isso é feito alinhando pelo final dos DataFrames.
if min_data_length == float('inf') or min_data_length == 0 :
print("ERRO: min_data_length inválido, não é possível truncar DataFrames.")
return None
truncated_asset_features_list = [df.tail(min_data_length) for df in all_asset_features_list]
# Concatenar todos os DataFrames de features (alinhados por timestamp/índice)
# join='inner' garante que só teremos timestamps onde TODOS os ativos (que retornaram dados) têm dados.
combined_df = pd.concat(truncated_asset_features_list, axis=1, join='inner')
# Verificar se o resultado não está vazio
if combined_df.empty:
print("ERRO: DataFrame combinado está vazio após concatenação e join. Verifique os dados dos ativos.")
return None
# Drop quaisquer linhas que ainda possam ter NaNs após o join (improvável se cada df individual já foi tratado)
combined_df.dropna(inplace=True)
if combined_df.empty:
print("ERRO: DataFrame combinado está vazio após dropna final.")
return None
print(f"\nDataFrame multi-ativo final gerado com shape: {combined_df.shape}")
print(f"Exemplo de colunas: {combined_df.columns.tolist()[:10]}...") # Mostra as primeiras 10
return combined_df
if __name__ == '__main__':
print("Testando data_handler_multi_asset.py...")
# Substitua pelos tickers yfinance reais que você quer usar
test_assets = {
'eth': 'ETH-USD',
'btc': 'BTC-USD',
# 'aapl': 'AAPL' # Exemplo de ação
}
multi_asset_data = get_multi_asset_data_for_rl(
test_assets,
timeframe_yf='1h', # Para teste rápido, período menor
days_to_fetch=90 # Para teste rápido, período menor
)
if multi_asset_data is not None and not multi_asset_data.empty:
print("\n--- Exemplo do DataFrame Multi-Ativo Gerado ---")
print(multi_asset_data.head())
print(f"\nShape: {multi_asset_data.shape}")
print(f"\nInfo:")
multi_asset_data.info()
else:
print("\nFalha ao gerar DataFrame multi-ativo.") |