# rnn/data_handler_multi_asset.py (NOVO ARQUIVO) import pandas as pd import numpy as np import yfinance as yf # Ou ccxt, dependendo da sua preferência de fonte de dados import pandas_ta as ta from datetime import datetime, timedelta, timezone from typing import List, Dict, Optional WINDOW_SIZE = 60 # Importar do seu config.py # Assumindo que config.py está em ../config.py ou rnn/config.py # Ajuste o import conforme sua estrutura. # Se train_rnn_model.py e este data_handler estiverem na mesma pasta 'scripts', # e config.py estiver um nível acima: # from ..app/config.py import ( # MULTI_ASSET_LIST, TIMEFRAME, DAYS_OF_DATA_TO_FETCH, # # etc. para features a serem calculadas # ) # Por agora, vamos definir aqui para exemplo: # EXEMPLO DE CONFIGURAÇÃO (Mova para config.py depois) MULTI_ASSET_SYMBOLS = { 'eth': 'ETH-USD', 'btc': 'BTC-USD', 'ada': 'ADA-USD', 'sol': 'SOL-USD' } # Use os tickers corretos para yfinance ou ccxt TIMEFRAME_YFINANCE = '1h' # yfinance suporta '1m', '2m', '5m', '15m', '30m', '60m', '90m', '1h', '1d', '5d', '1wk', '1mo', '3mo' # Para '1h', yfinance só retorna os últimos 730 dias. Para mais dados, use '1d'. # Se usar ccxt, TIMEFRAME = '1h' como antes. DAYS_TO_FETCH = 365 * 2 # 2 anos # Lista das features base que você quer calcular para CADA ativo # (as 19 que definimos antes) INDIVIDUAL_ASSET_BASE_FEATURES = [ 'open', 'high', 'low', 'close', 'volume', # OHLCV originais são necessários para os cálculos 'sma_10', 'rsi_14', 'macd', 'macds', 'atr', 'bbp', 'cci_37', 'mfi_37', 'adx_14', 'volume_zscore', 'body_size', 'body_size_norm_atr', 'body_vs_avg_body', 'log_return', 'buy_condition_v1', 'open_div_atr', 'high_div_atr', 'low_div_atr', 'close_div_atr', 'volume_div_atr', 'sma_10_div_atr' # 'sma_50' é calculada dentro de buy_condition_v1 # As colunas _div_atr serão criadas a partir destas ] # Features que serão normalizadas pelo ATR COLS_TO_NORM_BY_ATR = ['open', 'high', 'low', 'close', 'volume', 'sma_10', 'macd', 'body_size'] #---------------------------------- # -- New get multi asset data for rl # ... (imports e outras funções como antes) ... def get_multi_asset_data_for_rl( asset_symbols_map: Dict[str, str], timeframe_yf: str, days_to_fetch: int, logger_instance # Adicionar logger como parâmetro ) -> Optional[pd.DataFrame]: # Adicionar logger como parâmetro all_asset_features_list: List[pd.DataFrame] = [] # Tipagem para clareza min_data_length = float('inf') print(all_asset_features_list) #logger_instance.info(f"Iniciando get_multi_asset_data_for_rl para: {list(asset_symbols_map.keys())}") for asset_key, yf_ticker in asset_symbols_map.items(): #logger_instance.info(f"\n--- Processando {asset_key} ({yf_ticker}) ---") # ... (lógica de fetch_single_asset_ohlcv_yf como antes) ... single_asset_ohlcv = fetch_single_asset_ohlcv_yf(yf_ticker, period=f"{days_to_fetch}d", interval=timeframe_yf) # Passar logger se a função aceitar # if single_asset_ohlcv.empty: # logger_instance.warning(f"Sem dados OHLCV para {yf_ticker}, pulando.") # continue single_asset_features = calculate_all_features_for_single_asset(single_asset_ohlcv)#, logger_instance) if single_asset_features is None or single_asset_features.empty: logger_instance.warning(f"Sem features calculadas para {yf_ticker}, pulando.") continue #logger_instance.info(f"Features para {asset_key} shape: {single_asset_features.shape}, Index Min: {single_asset_features.index.min()}, Index Max: {single_asset_features.index.max()}") # Garantir que o índice é DatetimeIndex e UTC para todos antes de adicionar prefixo e à lista if not isinstance(single_asset_features.index, pd.DatetimeIndex): single_asset_features.index = pd.to_datetime(single_asset_features.index) if single_asset_features.index.tz is None: single_asset_features.index = single_asset_features.index.tz_localize('UTC') else: single_asset_features.index = single_asset_features.index.tz_convert('UTC') single_asset_features = single_asset_features.add_prefix(f"{asset_key}_") all_asset_features_list.append(single_asset_features) min_data_length = min(min_data_length, len(single_asset_features)) if not all_asset_features_list: logger_instance.error("Nenhum DataFrame de feature de ativo foi adicionado à lista.") return None if min_data_length == float('inf') or min_data_length < WINDOW_SIZE: # Adicionada checagem de WINDOW_SIZE logger_instance.error(f"min_data_length inválido ({min_data_length}) ou menor que WINDOW_SIZE ({WINDOW_SIZE}). Não é possível truncar/usar.") return None #logger_instance.info(f"Menor número de linhas de dados encontrado (min_data_length): {min_data_length}") # Truncar para garantir que todos os DFs tenham o mesmo comprimento ANTES do concat # E que tenham pelo menos min_data_length. # É importante que os ÍNDICES de data/hora se sobreponham para o join='inner' funcionar. # Apenas pegar o .tail() pode não alinhar os timestamps se os DFs tiverem começos diferentes. # Melhor abordagem: encontrar o índice comum mais recente e o mais antigo. if not all_asset_features_list: # Checagem se a lista não ficou vazia por algum motivo logger_instance.error("all_asset_features_list está vazia antes do alinhamento de índice.") return None print(all_asset_features_list) # Alinhar DataFrames por um índice comum antes de concatenar # 1. Encontrar o primeiro timestamp comum a todos # 2. Encontrar o último timestamp comum a todos # Ou, mais simples, confiar no join='inner' do concat, mas garantir que os DFs são válidos. # Vamos simplificar e manter o truncamento pelo tail, mas com mais logs # e garantir que são DataFrames. truncated_asset_features_list = [] for i, df_asset in enumerate(all_asset_features_list): if isinstance(df_asset, pd.DataFrame) and len(df_asset) >= min_data_length: truncated_df = df_asset.tail(min_data_length) #logger_instance.info(f" DF truncado {i} ({df_asset.columns[0].split('_')[0]}): shape {truncated_df.shape}, ") #f"Index Min: {truncated_df.index.min()}, Max: {truncated_df.index.max()}") truncated_asset_features_list.append(truncated_df) else: asset_name_debug = df_asset.columns[0].split('_')[0] if isinstance(df_asset, pd.DataFrame) and not df_asset.empty else f"DF_{i}" logger_instance.warning(f" DF {asset_name_debug} inválido ou muito curto (len: {len(df_asset) if isinstance(df_asset, pd.DataFrame) else 'N/A'}) para truncamento. Pulando.") if not truncated_asset_features_list: #ogger_instance.error("Nenhum DataFrame válido restou após truncamento para concatenar.") return None # Se houver apenas UM DataFrame na lista, não precisa concatenar, apenas retorna ele. if len(truncated_asset_features_list) == 1: #logger_instance.info("Apenas um DataFrame de ativo processado, retornando-o diretamente.") combined_df = truncated_asset_features_list[0] else: #logger_instance.info(f"Concatenando {len(truncated_asset_features_list)} DataFrames de ativos com join='inner'...") try: combined_df = pd.concat(truncated_asset_features_list, axis=1, join='outer') print(combined_df) except Exception as e_concat: logger_instance.error(f"ERRO CRÍTICO durante pd.concat: {e_concat}", exc_info=True) return None combined_df.fillna(method='ffill', inplace=True) # Depois do ffill, ainda pode haver NaNs no início se algum ativo começar depois dos outros. if not combined_df.empty: #logger_instance.info(f"Shape após ffill: {combined_df.shape}. Buscando primeiro/último índice válido...") first_valid_index = combined_df.first_valid_index() last_valid_index = combined_df.last_valid_index() if pd.isna(first_valid_index) or pd.isna(last_valid_index): print("Não foi possível determinar first/last_valid_index após ffill.") return None print(f"Primeiro índice válido: {first_valid_index}, Último índice válido: {last_valid_index}") combined_df = combined_df.loc[first_valid_index:last_valid_index] print(f"Shape após fatiar por first/last valid index: {combined_df.shape}") # Um dropna final pode ser necessário se o ffill não pegou tudo (improvável, mas seguro) combined_df.dropna(inplace=True) print(f"Shape após dropna final (pós-fatiamento): {combined_df.shape}") print("Imprimindo DF_COMBINED com index ") print(combined_df) #logger_instance.info(f"Tipo de combined_df após concat: {type(combined_df)}") if not isinstance(combined_df, pd.DataFrame): logger_instance.error(f"combined_df NÃO é um DataFrame após concat. Tipo: {type(combined_df)}") return None return combined_df # if combined_df.empty: # logger_instance.error("DataFrame combinado está VAZIO após concatenação e join='inner'. " # "Isso geralmente significa que não há timestamps comuns entre TODOS os ativos processados.") # # Adicionar mais depuração aqui se isso acontecer: # # for i, df_trunc_debug in enumerate(truncated_asset_features_list): # # logger_instance.info(f"Debug DF {i} - Head:\n{df_trunc_debug.head(3)}") # # logger_instance.info(f"Debug DF {i} - Tail:\n{df_trunc_debug.tail(3)}") # # logger_instance.info(f"DataFrame combinado ANTES do dropna final, shape: {combined_df.shape}") # print(combined_df.head()) # Descomente para depuração pesada # return None # ... #---------------------------------- # def get_multi_asset_data_for_rl( # asset_symbols_map: Dict[str, str], # timeframe_yf: str, # days_to_fetch: int # ) -> Optional[pd.DataFrame]: # """ # Busca, processa e combina dados de múltiplos ativos em um DataFrame achatado. # """ # all_asset_features_list = [] # # min_data_length = float('inf') # Inicializar com um valor alto # # Vamos inicializar com 0 e pegar o len do primeiro DF válido # min_data_length = 0 # first_valid_df_processed = False # for asset_key, yf_ticker in asset_symbols_map.items(): # print(f"\n--- Processando {asset_key} ({yf_ticker}) ---") # period_yf = f"{days_to_fetch}d" # if timeframe_yf == '1h' and days_to_fetch > 730: # print(f"AVISO: Para {timeframe_yf}, buscando no máximo 730 dias com yfinance para {yf_ticker}.") # period_yf = "730d" # single_asset_ohlcv = fetch_single_asset_ohlcv_yf(yf_ticker, period=period_yf, interval=timeframe_yf) # if single_asset_ohlcv.empty: # print(f"AVISO: Sem dados OHLCV para {yf_ticker}, pulando este ativo.") # continue # single_asset_features = calculate_all_features_for_single_asset(single_asset_ohlcv) # Passar logger se tiver # if single_asset_features is None or single_asset_features.empty: # print(f"AVISO: Sem features calculadas para {yf_ticker}, pulando este ativo.") # continue # single_asset_features = single_asset_features.add_prefix(f"{asset_key}_") # all_asset_features_list.append(single_asset_features) # # Atualizar min_data_length de forma mais segura # if not first_valid_df_processed: # min_data_length = len(single_asset_features) # first_valid_df_processed = True # else: # min_data_length = min(min_data_length, len(single_asset_features)) # if not all_asset_features_list: # Se nenhum ativo foi processado com sucesso # print("ERRO: Nenhum dado de feature de ativo foi processado com sucesso para a lista `all_asset_features_list`.") # return None # Retorna None, que não tem '.empty' mas será checado por 'is None' # if min_data_length == 0 : # Checagem adicional se algo deu muito errado # print("ERRO: min_data_length é zero após processar ativos, não é possível truncar DataFrames.") # return None # print(f"Menor número de linhas de dados encontrado entre os ativos (min_data_length): {min_data_length}") # truncated_asset_features_list = [df.tail(min_data_length) for df in all_asset_features_list if not df.empty and len(df) >= min_data_length] # # Verificar se a lista de DFs truncados não está vazia ANTES de concatenar # if not truncated_asset_features_list: # print("ERRO: Nenhum DataFrame válido restou após o truncamento. Não é possível concatenar.") # return None # print(f"Concatenando {len(truncated_asset_features_list)} DataFrames de ativos...") # try: # combined_df = pd.concat(truncated_asset_features_list, axis=1, join='inner') # except Exception as e_concat: # print(f"ERRO durante pd.concat: {e_concat}") # return None # Retorna None em caso de erro na concatenação # # Agora, combined_df DEVE ser um DataFrame (mesmo que vazio se o join falhar) # if not isinstance(combined_df, pd.DataFrame): # print(f"ERRO: pd.concat não retornou um DataFrame. Tipo retornado: {type(combined_df)}") # return None # if combined_df.empty: # Esta checagem agora deve funcionar ou ser desnecessária se o anterior já retornou None # print("ERRO: DataFrame combinado está vazio após concatenação e join. Verifique os dados dos ativos e o alinhamento de datas.") # return None # combined_df.dropna(inplace=True) # if combined_df.empty: # Checagem final após dropna # print("ERRO: DataFrame combinado está vazio após dropna final.") # return None # print(f"\nDataFrame multi-ativo final gerado com shape: {combined_df.shape}") # return combined_df def fetch_single_asset_ohlcv_yf(ticker_symbol: str, period: str = "2y", interval: str = "1h") -> pd.DataFrame: """ Adaptação da sua função fetch_historical_ohlcv de financial_data_agent.py """ print(f"Buscando dados para {ticker_symbol} com yfinance (period: {period}, interval: {interval})...") try: ticker = yf.Ticker(ticker_symbol) # Para dados horários, o período máximo é geralmente 730 dias com yfinance # Se precisar de mais, considere '1d' e depois reamostre, ou use ccxt para cripto. if interval == '1h' and period.endswith('y') and int(period[:-1]) * 365 > 730: print(f"AVISO: yfinance pode limitar dados horários a 730 dias. Buscando 'max' para {interval} e depois fatiando.") data = ticker.history(interval=interval, period="730d") # Pega o máximo possível elif interval == '1d' and period.endswith('y'): data = ticker.history(period=period, interval=interval) else: # Para períodos menores ou outros intervalos data = ticker.history(period=period, interval=interval) if data.empty: print(f"Nenhum dado encontrado para {ticker_symbol}.") return pd.DataFrame() data.rename(columns={ "Open": "open", "High": "high", "Low": "low", "Close": "close", "Volume": "volume", "Adj Close": "adj_close" }, inplace=True) # Selecionar apenas as colunas OHLCV e garantir que o índice é DatetimeIndex UTC data = data[['open', 'high', 'low', 'close', 'volume']] if data.index.tz is None: data.index = data.index.tz_localize('UTC') else: data.index = data.index.tz_convert('UTC') # Para dados horários, yfinance pode retornar dados do fim de semana (sem volume) # e o último candle pode estar incompleto. # if interval == '1h': # data = data[data['volume'] > 0] # Remover candles sem volume # data = data[:-1] # Remover o último candle que pode estar incompleto print(f"Dados coletados para {ticker_symbol}: {len(data)} linhas.") return data except Exception as e: print(f"Erro ao buscar dados para {ticker_symbol} com yfinance: {e}") return pd.DataFrame() def calculate_all_features_for_single_asset(ohlcv_df: pd.DataFrame) -> Optional[pd.DataFrame]: """Calcula todas as features base para um único ativo.""" if ohlcv_df.empty: return None df = ohlcv_df.copy() print(f"Calculando features para ativo (shape inicial: {df.shape})...") # GARANTIR que a coluna 'close' original será preservada no DataFrame final if 'close' in df.columns: df['close'] = df['close'] # redundante, mas deixa explícito que não será sobrescrita if ta: df.ta.sma(length=10, close='close', append=True, col_names=('sma_10',)) df.ta.rsi(length=14, close='close', append=True, col_names=('rsi_14',)) macd_out = df.ta.macd(close='close', append=False) if macd_out is not None and not macd_out.empty: df['macd'] = macd_out.iloc[:,0] df['macds'] = macd_out.iloc[:,2] # Linha de sinal para buy_condition df.ta.atr(length=14, append=True, col_names=('atr',)) df.ta.bbands(length=20, close='close', append=True, col_names=('bbl', 'bbm', 'bbu', 'bbb', 'bbp')) df.ta.cci(length=37, append=True, col_names=('cci_37',)) df['volume'] = df['volume'].astype(float) df.ta.mfi(length=37, close='close', high='high', low='low', volume='volume', append=True, col_names=('mfi_37',)) df.ta.mfi(length=37, append=True, col_names=('mfi_37',)) adx_out = df.ta.adx(length=14, append=False) if adx_out is not None and not adx_out.empty: df['adx_14'] = adx_out.iloc[:,0] rolling_vol_mean = df['volume'].rolling(window=20).mean() rolling_vol_std = df['volume'].rolling(window=20).std() df['volume_zscore'] = (df['volume'] - rolling_vol_mean) / (rolling_vol_std + 1e-9) df['body_size'] = abs(df['close'] - df['open']) # ATR precisa existir para as próximas. Drop NaNs do ATR primeiro. df.dropna(subset=['atr'], inplace=True) df_atr_valid = df[df['atr'] > 1e-9].copy() if df_atr_valid.empty: print("AVISO: ATR inválido para todas as linhas restantes, features _div_atr e body_size_norm_atr podem ser todas NaN ou vazias.") # Criar colunas com NaN para manter a estrutura df['body_size_norm_atr'] = np.nan for col in COLS_TO_NORM_BY_ATR: df[f'{col}_div_atr'] = np.nan else: df['body_size_norm_atr'] = df['body_size'] / df['atr'] # ATR já filtrado para > 1e-9 for col in COLS_TO_NORM_BY_ATR: if col in df.columns: df[f'{col}_div_atr'] = df[col] / (df['atr'] + 1e-9) # Adicionar 1e-9 aqui também por segurança else: df[f'{col}_div_atr'] = np.nan df['body_vs_avg_body'] = df['body_size'] / (df['body_size'].rolling(window=20).mean() + 1e-9) df['log_return'] = np.log(df['close'] / df['close'].shift(1)) sma_50_series = df.ta.sma(length=50, close='close', append=False) if sma_50_series is not None: df['sma_50'] = sma_50_series else: df['sma_50'] = np.nan if all(col in df.columns for col in ['macd', 'macds', 'rsi_14', 'close', 'sma_50']): df['buy_condition_v1'] = ((df['macd'] > df['macds']) & (df['rsi_14'] > 50) & (df['close'] > df['sma_50'])).astype(int) else: df['buy_condition_v1'] = 0 # Selecionar apenas as colunas que realmente usaremos como features base para o modelo # (incluindo as _div_atr e as originais que não foram normalizadas por ATR) # Esta lista de features é a que será passada para os scalers no script de treino. # E também as colunas que o rnn_predictor.py precisará ter antes de aplicar seus scalers. # Esta lista deve vir do config.py (BASE_FEATURE_COLS) # Exemplo: # final_feature_columns = [ # 'open_div_atr', 'high_div_atr', 'low_div_atr', 'close_div_atr', 'volume_div_atr', # 'log_return', 'rsi_14', 'atr', 'bbp', 'cci_37', 'mfi_37', # 'body_size_norm_atr', 'body_vs_avg_body', 'macd', 'sma_10_div_atr', # 'adx_14', 'volume_zscore', 'buy_condition_v1' # ] # Esta é a BASE_FEATURE_COLS do seu config.py # Verificar se todas as colunas em INDIVIDUAL_ASSET_BASE_FEATURES existem # (INDIVIDUAL_ASSET_BASE_FEATURES deve ser igual a config.BASE_FEATURE_COLS) current_feature_cols = [col for col in INDIVIDUAL_ASSET_BASE_FEATURES if col in df.columns] missing_cols = [col for col in INDIVIDUAL_ASSET_BASE_FEATURES if col not in df.columns] if missing_cols: print(f"AVISO: Colunas de features ausentes após cálculo: {missing_cols}. Usando apenas as disponíveis: {current_feature_cols}") # GARANTIR que 'close' (preço original) está presente nas features finais if 'close' not in current_feature_cols and 'close' in df.columns: current_feature_cols.append('close') df_final_features = df[current_feature_cols].copy() df_final_features.dropna(inplace=True) print(f"Features calculadas. Shape após dropna: {df_final_features.shape}. Colunas: {df_final_features.columns.tolist()}") return df_final_features else: print("pandas_ta não está disponível.") return None if __name__ == '__main__': print("Testando data_handler_multi_asset.py...") # Substitua pelos tickers yfinance reais que você quer usar test_assets = { 'eth': 'ETH-USD', 'btc': 'BTC-USD', # 'aapl': 'AAPL' # Exemplo de ação } multi_asset_data = get_multi_asset_data_for_rl( test_assets, timeframe_yf='1h', # Para teste rápido, período menor days_to_fetch=90 # Para teste rápido, período menor ) if multi_asset_data is not None and not multi_asset_data.empty: print("\n--- Exemplo do DataFrame Multi-Ativo Gerado ---") print(multi_asset_data.head()) print(f"\nShape: {multi_asset_data.shape}") print(f"\nInfo:") multi_asset_data.info() else: print("\nFalha ao gerar DataFrame multi-ativo.")