File size: 6,904 Bytes
b3a7985 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
import os
import ccxt.async_support as ccxt
from typing import Dict, Any, List, Optional, Tuple
# from .utils.logger import get_logger
# logger = get_logger()
# Ou logger passado como argumento para as funções
# Variáveis de ambiente lidas uma vez no início ou passadas para funções
CCXT_EXCHANGE_ID_ENV = os.environ.get("CCXT_EXCHANGE_ID", "binance")
CCXT_API_KEY_ENV = os.environ.get("CCXT_API_KEY")
CCXT_API_SECRET_ENV = os.environ.get("CCXT_API_SECRET")
CCXT_API_PASSWORD_ENV = os.environ.get("CCXT_API_PASSWORD")
CCXT_SANDBOX_MODE_ENV = os.environ.get("CCXT_SANDBOX_MODE", "false").lower() == "true"
async def get_ccxt_exchange(logger_instance) -> Optional[ccxt.Exchange]:
"""
Inicializa e retorna uma instância da exchange ccxt.
Retorna None se a configuração estiver ausente ou a inicialização falhar.
"""
if not CCXT_API_KEY_ENV or not CCXT_API_SECRET_ENV:
logger_instance.warning("CCXT_API_KEY ou CCXT_API_SECRET não configurados. Não é possível inicializar a exchange.")
return None
try:
exchange_class = getattr(ccxt, CCXT_EXCHANGE_ID_ENV)
config = {
'apiKey': CCXT_API_KEY_ENV,
'secret': CCXT_API_SECRET_ENV,
'enableRateLimit': True,
}
if CCXT_API_PASSWORD_ENV:
config['password'] = CCXT_API_PASSWORD_ENV
exchange = exchange_class(config)
if CCXT_SANDBOX_MODE_ENV:
# Configurar sandbox/testnet
# Método set_sandbox_mode (preferível)
if hasattr(exchange, 'set_sandbox_mode') and callable(exchange.set_sandbox_mode):
try:
exchange.set_sandbox_mode(True)
logger_instance.info(f"CCXT: Modo SANDBOX ativado para {CCXT_EXCHANGE_ID_ENV} via set_sandbox_mode.")
except Exception as e_sandbox:
logger_instance.warning(f"CCXT: Tentativa de set_sandbox_mode para {CCXT_EXCHANGE_ID_ENV} falhou: {e_sandbox}. Tentando URL de teste.")
# Tentar fallback para URL de teste se set_sandbox_mode falhar
if 'test' in exchange.urls:
exchange.urls['api'] = exchange.urls['test']
logger_instance.info(f"CCXT: URLs alteradas para TESTNET para {CCXT_EXCHANGE_ID_ENV} (fallback).")
else:
logger_instance.warning(f"CCXT: Nenhuma URL de teste encontrada para {CCXT_EXCHANGE_ID_ENV} como fallback.")
# Alterar URLs diretamente (se set_sandbox_mode não estiver disponível)
elif 'test' in exchange.urls:
exchange.urls['api'] = exchange.urls['test']
logger_instance.info(f"CCXT: URLs alteradas para TESTNET para {CCXT_EXCHANGE_ID_ENV} (método direto).")
else:
logger_instance.warning(f"CCXT: Modo SANDBOX solicitado, mas não há método set_sandbox_mode nem URL de teste para {CCXT_EXCHANGE_ID_ENV}.")
# Carregar mercados, útil para todas as operações
# await exchange.load_markets()
# logger_instance.info(f"CCXT: Mercados carregados para {CCXT_EXCHANGE_ID_ENV}.")
return exchange
except AttributeError:
logger_instance.error(f"CCXT: Exchange ID '{CCXT_EXCHANGE_ID_ENV}' inválida ou não suportada.")
return None
except Exception as e:
logger_instance.error(f"CCXT: Erro ao inicializar exchange {CCXT_EXCHANGE_ID_ENV}: {str(e)}", exc_info=True)
return None
async def fetch_crypto_data(
exchange: ccxt.Exchange,
pairs: List[str],
logger_instance
) -> Tuple[Dict[str, Any], bool, str]:
"""
Busca dados de mercado (ticker, OHLCV) para uma lista de pares de cripto.
Retorna: (dados_coletados, sucesso, mensagem_de_erro_detalhada)
"""
collected_data: Dict[str, Any] = {}
fetch_successful = True
error_message = ""
logger_instance.info(f"CCXT: Iniciando coleta de dados para pares: {pairs}")
for pair_symbol in pairs:
pair_data_key = pair_symbol.replace("/", "_")
current_pair_data = {}
try:
if exchange.has['fetchTicker']:
ticker = await exchange.fetch_ticker(pair_symbol)
current_pair_data['ticker'] = {
'last': ticker.get('last'), 'bid': ticker.get('bid'), 'ask': ticker.get('ask'),
'volume': ticker.get('baseVolume'), 'timestamp': ticker.get('timestamp')
}
if exchange.has['fetchOHLCV']:
# Parametrizar timeframe e limit
ohlcv = await exchange.fetch_ohlcv(pair_symbol, timeframe='1h', limit=72)
current_pair_data['ohlcv_1h'] = ohlcv
# Adicionar mais tipos de dados necessários (order book, trades)
collected_data[pair_data_key] = current_pair_data
logger_instance.info(f"CCXT: Dados coletados para {pair_symbol}: Ticker OK, OHLCV OK (len: {len(ohlcv if 'ohlcv' in current_pair_data else [])})")
except ccxt.NetworkError as e_net:
logger_instance.error(f"CCXT: Erro de REDE ao buscar dados para {pair_symbol}: {e_net}")
error_message += f"NetworkError for {pair_symbol}: {e_net}; "
fetch_successful = False # Flha parcial ou total
break # Interrompe a coleta para todos
except ccxt.ExchangeError as e_exc:
logger_instance.error(f"CCXT: Erro da EXCHANGE ao buscar dados para {pair_symbol}: {e_exc}")
error_message += f"ExchangeError for {pair_symbol}: {e_exc}; "
fetch_successful = False
# Algumas ExchangeErrors podem ser específicas do par (par não existe),
# Continuar para outros pares. Outras podem ser fatais (API key inválida).
# Considerar falha e parar para este par.
collected_data[pair_data_key] = {"error": str(e_exc)} # Registra o erro para o par
except Exception as e_gen:
logger_instance.error(f"CCXT: Erro GERAL ao buscar dados para {pair_symbol}: {e_gen}", exc_info=True)
error_message += f"General error for {pair_symbol}: {e_gen}; "
fetch_successful = False
collected_data[pair_data_key] = {"error": str(e_gen)}
if not fetch_successful:
logger_instance.warning(f"CCXT: Coleta de dados de cripto encontrou erros. Detalhes: {error_message}")
else:
logger_instance.info("CCXT: Coleta de dados de cripto concluída com sucesso.")
return collected_data, fetch_successful, error_message
# Adicione aqui funções para executar ordens (CREATE_ORDER_PLACEHOLDER)
# async def create_crypto_order(...) -> Tuple[Optional[Dict], bool, str]:
# async def fetch_order_status(...)
# Adicionar funções para fechar posições, buscar balanços. |