DeepFin / agents /scripts /data_handler.py
amos-fernandes's picture
Upload 151 files
b3a7985 verified
from typing import List, Tuple
import pandas as pd
import numpy as np
import ccxt
import pandas_ta as ta
from datetime import datetime, timedelta, timezone # Adicionado timezone
# Importa constantes do config.py
from config import PREDICTION_HORIZON, PRICE_CHANGE_THRESHOLD, BASE_FEATURE_COLS, WINDOW_SIZE
# Novos inidcadores mover para config
# Períodos para as novas MAs
SHORT_MA_PERIOD = 10 # Exemplo: SMA de 10 períodos
LONG_MA_PERIOD = 50 # Exemplo: SMA de 50 períodos
# Períodos para ADX
ADX_PERIOD = 14
# Níveis para Osciladores Extremos
RSI_OVERBOUGHT = 70
RSI_OVERSOLD = 30
STOCH_OVERBOUGHT = 80 # Para Estocástico %K
STOCH_OVERSOLD = 20
CCI_OVERBOUGHT = 100
CCI_OVERSOLD = -100
MFI_OVERBOUGHT = 80
MFI_OVERSOLD = 20
# Período para Média Móvel do Volume
VOLUME_AVG_PERIOD = 20
#Fim NI
def fetch_ohlcv_data_ccxt(symbol: str, timeframe: str, days_to_fetch: int, limit_per_call: int = 1000) -> pd.DataFrame:
exchange = ccxt.binance()
print(f"Buscando dados para {symbol} na {exchange.id} com timeframe {timeframe}...")
all_ohlcv = []
since_dt = datetime.now(timezone.utc) - timedelta(days=days_to_fetch)
since = exchange.parse8601(since_dt.isoformat())
while True:
try:
print(f"Buscando {limit_per_call} candles desde {exchange.iso8601(since)}...")
ohlcv = exchange.fetch_ohlcv(symbol, timeframe, since, limit_per_call)
if not ohlcv: break
all_ohlcv.extend(ohlcv)
last_timestamp_in_batch = ohlcv[-1][0]
since = last_timestamp_in_batch + exchange.rateLimit
print(f"Coletados {len(ohlcv)} candles. Último: {exchange.iso8601(last_timestamp_in_batch)}. Total: {len(all_ohlcv)}")
if len(ohlcv) < limit_per_call: break
# Condição de parada mais robusta
processed_days = (last_timestamp_in_batch - exchange.parse8601(since_dt.isoformat())) / (1000 * 60 * 60 * 24)
if processed_days >= days_to_fetch:
break
except ccxt.NetworkError as e:
print(f"Erro de rede CCXT: {e}. Tentando novamente em 5s...")
# Adicionar um retry real aqui seria bom, ou time.sleep(5)
except ccxt.ExchangeError as e:
print(f"Erro da Exchange CCXT: {e}. Parando busca.")
break
except Exception as e:
print(f"Erro inesperado no fetch: {e}. Parando busca.")
break
if not all_ohlcv:
raise ValueError("Nenhum dado OHLCV foi coletado.")
df = pd.DataFrame(all_ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
print(f"Total de {len(df)} candles OHLCV coletados para {symbol}.")
return df
def calculate_technical_indicators(ohlcv_df: pd.DataFrame) -> pd.DataFrame:
print("Calculando indicadores técnicos...")
df = ohlcv_df.copy()
if ta:
# Indicadores base
df.ta.sma(length=10, close='close', append=True, col_names=('sma_10',))
df.ta.rsi(length=14, close='close', append=True, col_names=('rsi_14',))
df.ta.log_return(length=14, close='close', append=True, col_names=('log_return',))
df.ta.macd(close='close', append=True, col_names=('macd', 'macdh', 'macds')) # Pega só 'macd' depois
df.ta.atr(length=14, append=True, col_names=('atr',))
df.ta.bbands(length=20, close='close', append=True, col_names=('bbl', 'bbm', 'bbu', 'bbb', 'bbp')) # Pega só 'bbp' depois
df.ta.cci(length=37, append=True, col_names=('cci_37',)) # Usando o período do exemplo
df.ta.mfi(length=37, append=True, col_names=('mfi_37',)) # Usando o período do exemplo
df['body_size'] = abs(df['close'] - df['open'])
df['body_size_norm_atr'] = df['body_size'] / (df['atr'] + 1e-7)
avg_body_period = 12 # Do exemplo do EA
df['avg_body_prev_12'] = df['body_size'].shift(1).rolling(window=avg_body_period).mean() # Média dos 12 corpos anteriores
df['body_vs_avg_body'] = df['body_size'] / (df['avg_body_prev_12'] + 1e-7)
# Normalização pelo ATR (APÓS cálculo de ATR e outros indicadores base)
# Garantir que 'atr' não é zero ou NaN antes da divisão
df.dropna(subset=['atr'], inplace=True) # Remove linhas onde ATR não pôde ser calculado
df = df[df['atr'] > 1e-7] # Remove linhas com ATR muito pequeno ou zero
df['open_div_atr'] = df['open'] / df['atr']
df['high_div_atr'] = df['high'] / df['atr']
df['low_div_atr'] = df['low'] / df['atr']
df['close_div_atr'] = df['close'] / df['atr']
df['close_div_atr'] = df['close'] / df['atr']
df['volume_div_atr'] = df['volume'] / df['atr']
# Assegura que as colunas base para estas normalizações existam
if 'sma_10' in df.columns:
df['sma_10_div_atr'] = df['sma_10'] / df['atr']
if 'macd' in df.columns: # 'macd' é a linha principal do MACD
df['macd_div_atr'] = df['macd'] / df['atr']
# ------ Novos indicadores -- #
# --- Indicadores Base anteriores ---
if 'sma_10' in BASE_FEATURE_COLS or 'sma_10_div_atr' in BASE_FEATURE_COLS: # Só calcula se for usar
df.ta.sma(length=10, close='close', append=True, col_names=('sma_10',))
if 'rsi_14' in BASE_FEATURE_COLS:
df.ta.rsi(length=14, close='close', append=True, col_names=('rsi_14',))
# Calcula componentes MACD, mesmo que só use 'macd' depois
if 'macd' in BASE_FEATURE_COLS or 'macd_div_atr' in BASE_FEATURE_COLS or 'macd_cross_signal' in BASE_FEATURE_COLS:
macd_results = df.ta.macd(close='close', append=False)
if macd_results is not None and not macd_results.empty:
df['macd'] = macd_results.iloc[:,0] # Linha MACD
df['macds'] = macd_results.iloc[:,1] # Linha de Sinal MACD
# df['macdh'] = macd_results.iloc[:,2] # Histograma MACD (opcional)
if 'atr' in BASE_FEATURE_COLS or any('_div_atr' in col for col in BASE_FEATURE_COLS):
df.ta.atr(length=14, append=True, col_names=('atr',))
if 'bbp' in BASE_FEATURE_COLS or 'dist_bbu_norm_atr' in BASE_FEATURE_COLS or 'dist_bbl_norm_atr' in BASE_FEATURE_COLS:
df.ta.bbands(length=20, close='close', append=True, col_names=('bbl', 'bbm', 'bbu', 'bbb', 'bbp'))
if 'cci_37' in BASE_FEATURE_COLS:
df.ta.cci(length=37, append=True, col_names=('cci_37',))
if 'mfi_37' in BASE_FEATURE_COLS:
df.ta.mfi(length=37, append=True, col_names=('mfi_37',))
if 'body_size_norm_atr' in BASE_FEATURE_COLS or 'body_vs_avg_body' in BASE_FEATURE_COLS:
if 'open' in df.columns and 'close' in df.columns:
df['body_size'] = abs(df['close'] - df['open'])
# --- 1. Cruzamentos de Médias Móveis ---
if 'ma_short' not in df.columns and ('ma_cross_signal' in BASE_FEATURE_COLS or 'ma_diff_norm_atr' in BASE_FEATURE_COLS):
df.ta.sma(length=SHORT_MA_PERIOD, close='close', append=True, col_names=('ma_short',))
if 'ma_long' not in df.columns and ('ma_cross_signal' in BASE_FEATURE_COLS or 'ma_diff_norm_atr' in BASE_FEATURE_COLS):
df.ta.sma(length=LONG_MA_PERIOD, close='close', append=True, col_names=('ma_long',))
if 'ma_cross_signal' in BASE_FEATURE_COLS and 'ma_short' in df.columns and 'ma_long' in df.columns:
df['ma_short_prev'] = df['ma_short'].shift(1)
df['ma_long_prev'] = df['ma_long'].shift(1)
# Sinal de Compra: curta cruza longa para cima
buy_cross = (df['ma_short_prev'] < df['ma_long_prev']) & (df['ma_short'] > df['ma_long'])
# Sinal de Venda: curta cruza longa para baixo
sell_cross = (df['ma_short_prev'] > df['ma_long_prev']) & (df['ma_short'] < df['ma_long'])
df['ma_cross_signal'] = 0
df.loc[buy_cross, 'ma_cross_signal'] = 1 # Compra
df.loc[sell_cross, 'ma_cross_signal'] = -1 # Venda (ou use 2 para outra classe)
print("TA: Feature 'ma_cross_signal' calculada.")
if 'ma_diff_norm_atr' in BASE_FEATURE_COLS and 'ma_short' in df.columns and 'ma_long' in df.columns and 'atr' in df.columns:
df['ma_diff'] = df['ma_short'] - df['ma_long']
df['ma_diff_norm_atr'] = df['ma_diff'] / (df['atr'] + 1e-9) # Evitar divisão por zero no ATR
print("TA: Feature 'ma_diff_norm_atr' calculada.")
# --- 2. Força da Tendência (ADX) ---
if 'adx_14' in BASE_FEATURE_COLS or 'adx_trend_signal' in BASE_FEATURE_COLS:
adx_results = df.ta.adx(length=ADX_PERIOD, append=False)
if adx_results is not None and not adx_results.empty:
df['adx_14'] = adx_results.iloc[:,0] # ADX
df['dmp_14'] = adx_results.iloc[:,1] # +DI ou DMP
df['dmn_14'] = adx_results.iloc[:,2] # -DI ou DMN
if 'adx_trend_signal' in BASE_FEATURE_COLS:
df['adx_trend_signal'] = 0
# ADX > 20-25 geralmente indica tendência. +DI > -DI = tendência de alta.
strong_uptrend = (df['adx_14'] > 20) & (df['dmp_14'] > df['dmn_14'])
strong_downtrend = (df['adx_14'] > 20) & (df['dmn_14'] > df['dmp_14'])
df.loc[strong_uptrend, 'adx_trend_signal'] = 1
df.loc[strong_downtrend, 'adx_trend_signal'] = -1
print("TA: Features ADX calculadas.")
# --- 3. Níveis de Suporte/Resistência Dinâmicos (Distância para BBands normalizada por ATR) ---
if 'dist_bbu_norm_atr' in BASE_FEATURE_COLS and 'close' in df.columns and 'bbu' in df.columns and 'atr' in df.columns:
df['dist_bbu_norm_atr'] = (df['bbu'] - df['close']) / (df['atr'] + 1e-9)
print("TA: Feature 'dist_bbu_norm_atr' calculada.")
if 'dist_bbl_norm_atr' in BASE_FEATURE_COLS and 'close' in df.columns and 'bbl' in df.columns and 'atr' in df.columns:
df['dist_bbl_norm_atr'] = (df['close'] - df['bbl']) / (df['atr'] + 1e-9)
print("TA: Feature 'dist_bbl_norm_atr' calculada.")
# --- 4. Osciladores em Níveis Extremos ---
# Estocástico (preciso calcular primeiro)
if 'stoch_k_extreme' in BASE_FEATURE_COLS or 'stoch_d_extreme' in BASE_FEATURE_COLS:
stoch_results = df.ta.stoch(k=14, d=3, smooth_k=3, append=False) # Configurações padrão
if stoch_results is not None and not stoch_results.empty:
df['stoch_k'] = stoch_results.iloc[:,0] # %K
df['stoch_d'] = stoch_results.iloc[:,1] # %D
if 'stoch_k_extreme' in BASE_FEATURE_COLS:
df['stoch_k_extreme'] = 0
df.loc[df['stoch_k'] > STOCH_OVERBOUGHT, 'stoch_k_extreme'] = 1 # Sobrecomprado
df.loc[df['stoch_k'] < STOCH_OVERSOLD, 'stoch_k_extreme'] = -1 # Sobrevendido
print("TA: Features Estocástico e stoch_k_extreme calculadas.")
if 'rsi_extreme' in BASE_FEATURE_COLS and 'rsi_14' in df.columns:
df['rsi_extreme'] = 0
df.loc[df['rsi_14'] > RSI_OVERBOUGHT, 'rsi_extreme'] = 1
df.loc[df['rsi_14'] < RSI_OVERSOLD, 'rsi_extreme'] = -1
print("TA: Feature 'rsi_extreme' calculada.")
if 'cci_extreme' in BASE_FEATURE_COLS and 'cci_37' in df.columns:
df['cci_extreme'] = 0
df.loc[df['cci_37'] > CCI_OVERBOUGHT, 'cci_extreme'] = 1
df.loc[df['cci_37'] < CCI_OVERSOLD, 'cci_extreme'] = -1
print("TA: Feature 'cci_extreme' calculada.")
if 'mfi_extreme' in BASE_FEATURE_COLS and 'mfi_37' in df.columns:
df['mfi_extreme'] = 0
df.loc[df['mfi_37'] > MFI_OVERBOUGHT, 'mfi_extreme'] = 1
df.loc[df['mfi_37'] < MFI_OVERSOLD, 'mfi_extreme'] = -1
print("TA: Feature 'mfi_extreme' calculada.")
# --- 5. Sinais Combinados (Simples) ---
# Ex: RSI sobrevendido E MACD cruzou para cima recentemente?
if 'rsi_macd_buy_combo' in BASE_FEATURE_COLS and 'rsi_14' in df.columns and 'macd' in df.columns and 'macds' in df.columns:
rsi_is_oversold = df['rsi_14'] < RSI_OVERSOLD
# MACD cruzou sinal para cima no candle anterior ou atual
macd_crossed_up = (df['macd'].shift(1) < df['macds'].shift(1)) & (df['macd'] > df['macds'])
df['rsi_macd_buy_combo'] = (rsi_is_oversold & macd_crossed_up).astype(int)
print("TA: Feature 'rsi_macd_buy_combo' calculada.")
# --- 6. Volume Anômalo ---
if 'volume_anom_signal' in BASE_FEATURE_COLS and 'volume' in df.columns:
df['volume_avg'] = df['volume'].rolling(window=VOLUME_AVG_PERIOD).mean()
# Sinal se volume atual > 2x a média
df['volume_anom_signal'] = (df['volume'] > 2 * df['volume_avg']).astype(int)
print("TA: Feature 'volume_anom_signal' calculada.")
# --- Features de Corpo de Candle (já tinha antes, mas garantindo que serão usadas se estiverem em BASE_FEATURE_COLS) ---
if 'body_size_norm_atr' in BASE_FEATURE_COLS:
if 'body_size' in df.columns and 'atr' in df.columns:
# Re-checar se ATR é válido, pois body_size pode ter menos NaNs
temp_df_bs = df[['body_size', 'atr']].copy()
temp_df_bs.dropna(subset=['atr'], inplace=True)
temp_df_bs = temp_df_bs[temp_df_bs['atr'] > 1e-9]
if not temp_df_bs.empty:
df['body_size_norm_atr'] = temp_df_bs['body_size'] / temp_df_bs['atr']
# print("TA: body_size_norm_atr (re)calculado/verificado.")
# else:
# print("AVISO: 'body_size' ou 'atr' ausente para 'body_size_norm_atr'.")
if 'body_vs_avg_body' in BASE_FEATURE_COLS:
if 'body_size' in df.columns:
if 'avg_body_prev' not in df.columns: # Evitar recalcular se já existe
df['avg_body_prev'] = df['body_size'].shift(1).rolling(window=12).mean()
df['body_vs_avg_body'] = df['body_size'] / (df['avg_body_prev'] + 1e-7)
# print("TA: body_vs_avg_body (re)calculado/verificado.")
# else:
# print("AVISO: 'body_size' ausente para 'body_vs_avg_body'.")
# --- Features Derivadas _div_atr (já tinha antes, garantindo que serão usadas se estiverem em BASE_FEATURE_COLS) ---
df.dropna(subset=['atr'], inplace=True)
df = df[df['atr'] > 1e-9]
if 'open_div_atr' in BASE_FEATURE_COLS and 'open' in df.columns: df['open_div_atr'] = df['open'] / df['atr']
if 'high_div_atr' in BASE_FEATURE_COLS and 'high' in df.columns: df['high_div_atr'] = df['high'] / df['atr']
if 'low_div_atr' in BASE_FEATURE_COLS and 'low' in df.columns: df['low_div_atr'] = df['low'] / df['atr']
if 'close_div_atr' in BASE_FEATURE_COLS and 'close' in df.columns: df['close_div_atr'] = df['close'] / df['atr']
if 'volume_div_atr' in BASE_FEATURE_COLS and 'volume' in df.columns: df['volume_div_atr'] = df['volume'] / df['atr']
if 'sma_10_div_atr' in BASE_FEATURE_COLS and 'sma_10' in df.columns: df['sma_10_div_atr'] = df['sma_10'] / df['atr']
if 'macd_div_atr' in BASE_FEATURE_COLS and 'macd' in df.columns: df['macd_div_atr'] = df['macd'] / df['atr']
if 'log_return_1' in BASE_FEATURE_COLS and 'close' in df.columns:
df['log_return_1'] = np.log(df['close'] / df['close'].shift(1))
# Novas features AD e volume
rolling_vol_mean = df['volume'].rolling(window=20).mean()
rolling_vol_std = df['volume'].rolling(window=20).std()
df['volume_zscore'] = (df['volume'] - rolling_vol_mean) / (rolling_vol_std + 1e-7)
# Certifique-se que MACD, MACDS, SMA_50 já foram calculados
df.ta.sma(length=50, close='close', append=True, col_names=('sma_50',))
df['buy_condition_v1'] = ((df['macd'] > df['macds']) & (df['rsi_14'] > 50) & (df['close'] > df['sma_50'])).astype(int)
df['cond_compra_v1'] = ((df['macd'] > df['macds']) & (df['rsi_14'] > 50) & (df['close'] > df.ta.sma(length=50, append=False))).astype(int)
df.dropna(inplace=True)
final_cols_present = [col for col in BASE_FEATURE_COLS if col in df.columns]
if len(final_cols_present) != len(BASE_FEATURE_COLS):
missing = list(set(BASE_FEATURE_COLS) - set(final_cols_present))
print(f"ALERTA: Após todos os cálculos, colunas de BASE_FEATURE_COLS estão faltando: {missing}")
print(f"Verifique os cálculos e se as colunas base para eles (open, high, low, close, volume) existem no input.")
print(f"Colunas disponíveis: {df.columns.tolist()}")
# raise ValueError(f"Nem todas as features base foram geradas: {missing}")
# Selecionar apenas as colunas que realmente existem e estão em BASE_FEATURE_COLS
# para evitar erros se alguma não pôde ser calculada.
# O script de treino verificará se todas as BASE_FEATURE_COLS existem antes de escalar.
existing_base_features = [col for col in BASE_FEATURE_COLS if col in df.columns]
print(f"Indicadores técnicos e features derivadas calculadas. Features retornadas: {existing_base_features}")
return df[existing_base_features + (['open', 'high', 'low', 'close', 'volume'] if not any(c in existing_base_features for c in ['open', 'high', 'low', 'close', 'volume']) else [])] # Garante que OHLCV original está lá para calculate_targets, se não for parte das features
# ---- Fim NI
# Remover todos os NaNs restantes gerados pelos indicadores
df.dropna(inplace=True)
print("Indicadores técnicos calculados e features normalizadas pelo ATR criadas.")
else:
print("pandas_ta não disponível. Verifique a instalação.")
return df
def calculate_targets(df: pd.DataFrame, horizon: int, threshold: float) -> pd.DataFrame:
print("Criando coluna alvo para predição...")
data = df.copy()
data['future_price'] = data['close'].shift(-horizon)
data['price_change_pct'] = (data['future_price'] - data['close']) / data['close']
data['target'] = (data['price_change_pct'] > threshold).astype(int)
data.dropna(subset=['future_price', 'price_change_pct', 'target'], inplace=True)
print(f"Distribuição do Alvo:\n{data['target'].value_counts(normalize=True, dropna=False)}")
return data
def create_sequences(data: pd.DataFrame, target_col_name: str, window_size: int, feature_col_names: List[str]) -> tuple[np.ndarray, np.ndarray]:
print(f"Criando sequências com window_size={window_size} usando features: {feature_col_names}")
X_list, y_list = [], []
# Verificar se todas as feature_col_names e target_col_name existem no DataFrame
required_cols = feature_col_names + [target_col_name]
missing_cols = [col for col in required_cols if col not in data.columns]
if missing_cols:
raise ValueError(f"Colunas ausentes no DataFrame para criar sequências: {missing_cols}. Colunas disponíveis: {data.columns.tolist()}")
# Usar .values pode ser mais rápido para DataFrames grandes, mas indexar por nome é mais seguro
feature_values = data[feature_col_names].values
target_values = data[target_col_name].values
for i in range(len(feature_values) - window_size + 1): # Ajuste no loop para incluir o último elemento possível
X_list.append(feature_values[i : i + window_size])
y_list.append(target_values[i + window_size - 1]) # Alvo correspondente ao final da janela
X = np.array(X_list)
y = np.array(y_list)
print(f"Shape de X (sequências): {X.shape}, Shape de y (alvos): {y.shape}")
return X, y