DeepFin / agents /app /utils /ccxt_utils.py
amos-fernandes's picture
Upload 151 files
b3a7985 verified
import os
import ccxt.async_support as ccxt
from typing import Dict, Any, List, Optional, Tuple
# from .utils.logger import get_logger
# logger = get_logger()
# Ou logger passado como argumento para as funções
# Variáveis de ambiente lidas uma vez no início ou passadas para funções
CCXT_EXCHANGE_ID_ENV = os.environ.get("CCXT_EXCHANGE_ID", "binance")
CCXT_API_KEY_ENV = os.environ.get("CCXT_API_KEY")
CCXT_API_SECRET_ENV = os.environ.get("CCXT_API_SECRET")
CCXT_API_PASSWORD_ENV = os.environ.get("CCXT_API_PASSWORD")
CCXT_SANDBOX_MODE_ENV = os.environ.get("CCXT_SANDBOX_MODE", "false").lower() == "true"
async def get_ccxt_exchange(logger_instance) -> Optional[ccxt.Exchange]:
"""
Inicializa e retorna uma instância da exchange ccxt.
Retorna None se a configuração estiver ausente ou a inicialização falhar.
"""
if not CCXT_API_KEY_ENV or not CCXT_API_SECRET_ENV:
logger_instance.warning("CCXT_API_KEY ou CCXT_API_SECRET não configurados. Não é possível inicializar a exchange.")
return None
try:
exchange_class = getattr(ccxt, CCXT_EXCHANGE_ID_ENV)
config = {
'apiKey': CCXT_API_KEY_ENV,
'secret': CCXT_API_SECRET_ENV,
'enableRateLimit': True,
}
if CCXT_API_PASSWORD_ENV:
config['password'] = CCXT_API_PASSWORD_ENV
exchange = exchange_class(config)
if CCXT_SANDBOX_MODE_ENV:
# Configurar sandbox/testnet
# Método set_sandbox_mode (preferível)
if hasattr(exchange, 'set_sandbox_mode') and callable(exchange.set_sandbox_mode):
try:
exchange.set_sandbox_mode(True)
logger_instance.info(f"CCXT: Modo SANDBOX ativado para {CCXT_EXCHANGE_ID_ENV} via set_sandbox_mode.")
except Exception as e_sandbox:
logger_instance.warning(f"CCXT: Tentativa de set_sandbox_mode para {CCXT_EXCHANGE_ID_ENV} falhou: {e_sandbox}. Tentando URL de teste.")
# Tentar fallback para URL de teste se set_sandbox_mode falhar
if 'test' in exchange.urls:
exchange.urls['api'] = exchange.urls['test']
logger_instance.info(f"CCXT: URLs alteradas para TESTNET para {CCXT_EXCHANGE_ID_ENV} (fallback).")
else:
logger_instance.warning(f"CCXT: Nenhuma URL de teste encontrada para {CCXT_EXCHANGE_ID_ENV} como fallback.")
# Alterar URLs diretamente (se set_sandbox_mode não estiver disponível)
elif 'test' in exchange.urls:
exchange.urls['api'] = exchange.urls['test']
logger_instance.info(f"CCXT: URLs alteradas para TESTNET para {CCXT_EXCHANGE_ID_ENV} (método direto).")
else:
logger_instance.warning(f"CCXT: Modo SANDBOX solicitado, mas não há método set_sandbox_mode nem URL de teste para {CCXT_EXCHANGE_ID_ENV}.")
# Carregar mercados, útil para todas as operações
# await exchange.load_markets()
# logger_instance.info(f"CCXT: Mercados carregados para {CCXT_EXCHANGE_ID_ENV}.")
return exchange
except AttributeError:
logger_instance.error(f"CCXT: Exchange ID '{CCXT_EXCHANGE_ID_ENV}' inválida ou não suportada.")
return None
except Exception as e:
logger_instance.error(f"CCXT: Erro ao inicializar exchange {CCXT_EXCHANGE_ID_ENV}: {str(e)}", exc_info=True)
return None
async def fetch_crypto_data(
exchange: ccxt.Exchange,
pairs: List[str],
logger_instance
) -> Tuple[Dict[str, Any], bool, str]:
"""
Busca dados de mercado (ticker, OHLCV) para uma lista de pares de cripto.
Retorna: (dados_coletados, sucesso, mensagem_de_erro_detalhada)
"""
collected_data: Dict[str, Any] = {}
fetch_successful = True
error_message = ""
logger_instance.info(f"CCXT: Iniciando coleta de dados para pares: {pairs}")
for pair_symbol in pairs:
pair_data_key = pair_symbol.replace("/", "_")
current_pair_data = {}
try:
if exchange.has['fetchTicker']:
ticker = await exchange.fetch_ticker(pair_symbol)
current_pair_data['ticker'] = {
'last': ticker.get('last'), 'bid': ticker.get('bid'), 'ask': ticker.get('ask'),
'volume': ticker.get('baseVolume'), 'timestamp': ticker.get('timestamp')
}
if exchange.has['fetchOHLCV']:
# Parametrizar timeframe e limit
ohlcv = await exchange.fetch_ohlcv(pair_symbol, timeframe='1h', limit=72)
current_pair_data['ohlcv_1h'] = ohlcv
# Adicionar mais tipos de dados necessários (order book, trades)
collected_data[pair_data_key] = current_pair_data
logger_instance.info(f"CCXT: Dados coletados para {pair_symbol}: Ticker OK, OHLCV OK (len: {len(ohlcv if 'ohlcv' in current_pair_data else [])})")
except ccxt.NetworkError as e_net:
logger_instance.error(f"CCXT: Erro de REDE ao buscar dados para {pair_symbol}: {e_net}")
error_message += f"NetworkError for {pair_symbol}: {e_net}; "
fetch_successful = False # Flha parcial ou total
break # Interrompe a coleta para todos
except ccxt.ExchangeError as e_exc:
logger_instance.error(f"CCXT: Erro da EXCHANGE ao buscar dados para {pair_symbol}: {e_exc}")
error_message += f"ExchangeError for {pair_symbol}: {e_exc}; "
fetch_successful = False
# Algumas ExchangeErrors podem ser específicas do par (par não existe),
# Continuar para outros pares. Outras podem ser fatais (API key inválida).
# Considerar falha e parar para este par.
collected_data[pair_data_key] = {"error": str(e_exc)} # Registra o erro para o par
except Exception as e_gen:
logger_instance.error(f"CCXT: Erro GERAL ao buscar dados para {pair_symbol}: {e_gen}", exc_info=True)
error_message += f"General error for {pair_symbol}: {e_gen}; "
fetch_successful = False
collected_data[pair_data_key] = {"error": str(e_gen)}
if not fetch_successful:
logger_instance.warning(f"CCXT: Coleta de dados de cripto encontrou erros. Detalhes: {error_message}")
else:
logger_instance.info("CCXT: Coleta de dados de cripto concluída com sucesso.")
return collected_data, fetch_successful, error_message
# Adicione aqui funções para executar ordens (CREATE_ORDER_PLACEHOLDER)
# async def create_crypto_order(...) -> Tuple[Optional[Dict], bool, str]:
# async def fetch_order_status(...)
# Adicionar funções para fechar posições, buscar balanços.