| |
| """ |
| Binance DNS Connector with Multi-Endpoint Failover |
| Handles Binance API connections with automatic DNS-based failover across multiple mirror endpoints |
| """ |
|
|
| import httpx |
| from typing import Optional, Dict, Any, List |
| import asyncio |
| import logging |
| from datetime import datetime |
| import time |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class BinanceDNSConnector: |
| """ |
| Binance API connector with DNS-based failover support |
| |
| Features: |
| - Multiple DNS endpoints for Binance global distribution |
| - Automatic failover on connection errors |
| - Health tracking per endpoint |
| - Round-robin with health-based selection |
| - Exponential backoff for failed endpoints |
| """ |
| |
| |
| BINANCE_GLOBAL_ENDPOINTS = [ |
| "https://api.binance.com", |
| "https://api1.binance.com", |
| "https://api2.binance.com", |
| "https://api3.binance.com", |
| "https://api4.binance.com", |
| ] |
| |
| BINANCE_US_ENDPOINTS = [ |
| "https://api.binance.us", |
| ] |
| |
| def __init__(self, use_us: bool = False, timeout: float = 10.0): |
| """ |
| Initialize Binance DNS connector |
| |
| Args: |
| use_us: If True, use Binance US endpoints |
| timeout: Request timeout in seconds |
| """ |
| self.endpoints = self.BINANCE_US_ENDPOINTS if use_us else self.BINANCE_GLOBAL_ENDPOINTS |
| self.timeout = timeout |
| self.use_us = use_us |
| |
| |
| self.endpoint_health: Dict[str, Dict[str, Any]] = { |
| endpoint: { |
| "available": True, |
| "consecutive_failures": 0, |
| "last_success": None, |
| "last_failure": None, |
| "total_requests": 0, |
| "successful_requests": 0, |
| "failed_requests": 0, |
| "avg_response_time": 0.0, |
| "backoff_until": 0.0 |
| } |
| for endpoint in self.endpoints |
| } |
| |
| self.current_endpoint_index = 0 |
| |
| logger.info(f"π Binance DNS Connector initialized: {len(self.endpoints)} endpoints available") |
| |
| def _get_next_healthy_endpoint(self) -> Optional[str]: |
| """ |
| Get next healthy endpoint using intelligent selection |
| |
| Strategy: |
| 1. Filter endpoints not in backoff |
| 2. Prefer endpoints with recent success |
| 3. Round-robin among healthy endpoints |
| |
| Returns: |
| Next healthy endpoint URL or None if all down |
| """ |
| now = time.time() |
| |
| |
| available = [ |
| endpoint for endpoint in self.endpoints |
| if self.endpoint_health[endpoint]["backoff_until"] <= now |
| ] |
| |
| if not available: |
| |
| logger.warning("π¨ All Binance endpoints in backoff! Using least recently failed.") |
| return min( |
| self.endpoints, |
| key=lambda e: self.endpoint_health[e]["backoff_until"] |
| ) |
| |
| |
| def score_endpoint(endpoint: str) -> float: |
| health = self.endpoint_health[endpoint] |
| total = health["total_requests"] |
| |
| if total == 0: |
| return 0 |
| |
| success_rate = health["successful_requests"] / total |
| score = (1 - success_rate) * 100 |
| |
| |
| score += health["consecutive_failures"] * 10 |
| |
| return score |
| |
| |
| best_endpoint = min(available, key=score_endpoint) |
| |
| return best_endpoint |
| |
| def _record_success(self, endpoint: str, response_time: float): |
| """Record successful request""" |
| health = self.endpoint_health[endpoint] |
| health["consecutive_failures"] = 0 |
| health["last_success"] = datetime.now().isoformat() |
| health["total_requests"] += 1 |
| health["successful_requests"] += 1 |
| health["backoff_until"] = 0.0 |
| |
| |
| if health["avg_response_time"] == 0: |
| health["avg_response_time"] = response_time |
| else: |
| |
| health["avg_response_time"] = 0.7 * health["avg_response_time"] + 0.3 * response_time |
| |
| def _record_failure(self, endpoint: str, error: str): |
| """Record failed request with exponential backoff""" |
| health = self.endpoint_health[endpoint] |
| health["consecutive_failures"] += 1 |
| health["last_failure"] = datetime.now().isoformat() |
| health["total_requests"] += 1 |
| health["failed_requests"] += 1 |
| |
| |
| backoff_duration = min(2 ** health["consecutive_failures"], 300) |
| health["backoff_until"] = time.time() + backoff_duration |
| |
| logger.warning( |
| f"β Binance endpoint failed: {endpoint} - {error} " |
| f"(failures: {health['consecutive_failures']}, backoff: {backoff_duration}s)" |
| ) |
| |
| async def get( |
| self, |
| path: str, |
| params: Optional[Dict] = None, |
| max_retries: int = None |
| ) -> Optional[Dict[str, Any]]: |
| """ |
| Make GET request with automatic DNS failover |
| |
| Args: |
| path: API endpoint path (e.g., "/api/v3/ticker/price") |
| params: Query parameters |
| max_retries: Maximum retry attempts (default: number of endpoints) |
| |
| Returns: |
| JSON response or None if all endpoints failed |
| """ |
| if max_retries is None: |
| max_retries = len(self.endpoints) |
| |
| last_error = None |
| |
| for attempt in range(max_retries): |
| endpoint = self._get_next_healthy_endpoint() |
| |
| if not endpoint: |
| logger.error("π¨ No Binance endpoints available!") |
| break |
| |
| url = f"{endpoint}{path}" |
| start_time = time.time() |
| |
| try: |
| async with httpx.AsyncClient(timeout=self.timeout) as client: |
| response = await client.get(url, params=params) |
| response.raise_for_status() |
| |
| response_time = time.time() - start_time |
| self._record_success(endpoint, response_time) |
| |
| logger.info( |
| f"β
Binance {path} - {endpoint} - {response_time*1000:.0f}ms" |
| ) |
| |
| return response.json() |
| |
| except httpx.HTTPStatusError as e: |
| last_error = f"HTTP {e.response.status_code}" |
| self._record_failure(endpoint, last_error) |
| |
| |
| if e.response.status_code == 429: |
| logger.warning(f"β οΈ Binance rate limit hit on {endpoint}, trying next...") |
| continue |
| |
| |
| if attempt < max_retries - 1: |
| await asyncio.sleep(0.3) |
| |
| except httpx.TimeoutException: |
| last_error = "Timeout" |
| self._record_failure(endpoint, last_error) |
| |
| if attempt < max_retries - 1: |
| await asyncio.sleep(0.3) |
| |
| except Exception as e: |
| last_error = str(e) |
| self._record_failure(endpoint, last_error) |
| |
| if attempt < max_retries - 1: |
| await asyncio.sleep(0.3) |
| |
| logger.error(f"β All Binance endpoints failed for {path}: {last_error}") |
| return None |
| |
| async def post( |
| self, |
| path: str, |
| data: Optional[Dict] = None, |
| params: Optional[Dict] = None, |
| max_retries: int = None |
| ) -> Optional[Dict[str, Any]]: |
| """ |
| Make POST request with automatic DNS failover |
| |
| Args: |
| path: API endpoint path |
| data: Request body data |
| params: Query parameters |
| max_retries: Maximum retry attempts |
| |
| Returns: |
| JSON response or None if all endpoints failed |
| """ |
| if max_retries is None: |
| max_retries = len(self.endpoints) |
| |
| last_error = None |
| |
| for attempt in range(max_retries): |
| endpoint = self._get_next_healthy_endpoint() |
| |
| if not endpoint: |
| logger.error("π¨ No Binance endpoints available!") |
| break |
| |
| url = f"{endpoint}{path}" |
| start_time = time.time() |
| |
| try: |
| async with httpx.AsyncClient(timeout=self.timeout) as client: |
| response = await client.post(url, json=data, params=params) |
| response.raise_for_status() |
| |
| response_time = time.time() - start_time |
| self._record_success(endpoint, response_time) |
| |
| logger.info( |
| f"β
Binance POST {path} - {endpoint} - {response_time*1000:.0f}ms" |
| ) |
| |
| return response.json() |
| |
| except Exception as e: |
| last_error = str(e) |
| self._record_failure(endpoint, last_error) |
| |
| if attempt < max_retries - 1: |
| await asyncio.sleep(0.3) |
| |
| logger.error(f"β All Binance endpoints failed for POST {path}: {last_error}") |
| return None |
| |
| def get_health_status(self) -> Dict[str, Any]: |
| """ |
| Get health status of all Binance endpoints |
| |
| Returns: |
| Dict with health information for each endpoint |
| """ |
| now = time.time() |
| |
| return { |
| "connector_type": "Binance US" if self.use_us else "Binance Global", |
| "total_endpoints": len(self.endpoints), |
| "endpoints": [ |
| { |
| "url": endpoint, |
| "available": health["backoff_until"] <= now, |
| "consecutive_failures": health["consecutive_failures"], |
| "success_rate": ( |
| 100 * health["successful_requests"] / health["total_requests"] |
| if health["total_requests"] > 0 else 0 |
| ), |
| "total_requests": health["total_requests"], |
| "avg_response_time_ms": health["avg_response_time"] * 1000, |
| "last_success": health["last_success"], |
| "last_failure": health["last_failure"], |
| "backoff_until": ( |
| datetime.fromtimestamp(health["backoff_until"]).isoformat() |
| if health["backoff_until"] > now else None |
| ) |
| } |
| for endpoint, health in self.endpoint_health.items() |
| ] |
| } |
| |
| def reset_health(self, endpoint: Optional[str] = None): |
| """ |
| Reset health tracking for endpoint(s) |
| |
| Args: |
| endpoint: Specific endpoint to reset, or None to reset all |
| """ |
| if endpoint: |
| if endpoint in self.endpoint_health: |
| self.endpoint_health[endpoint]["consecutive_failures"] = 0 |
| self.endpoint_health[endpoint]["backoff_until"] = 0.0 |
| logger.info(f"π Reset health for {endpoint}") |
| else: |
| for ep in self.endpoint_health: |
| self.endpoint_health[ep]["consecutive_failures"] = 0 |
| self.endpoint_health[ep]["backoff_until"] = 0.0 |
| logger.info("π Reset health for all endpoints") |
|
|
|
|
| |
|
|
| _binance_global_connector: Optional[BinanceDNSConnector] = None |
| _binance_us_connector: Optional[BinanceDNSConnector] = None |
|
|
|
|
| def get_binance_connector(use_us: bool = False) -> BinanceDNSConnector: |
| """ |
| Get singleton Binance connector instance |
| |
| Args: |
| use_us: If True, return US connector, else global connector |
| |
| Returns: |
| BinanceDNSConnector instance |
| """ |
| global _binance_global_connector, _binance_us_connector |
| |
| if use_us: |
| if _binance_us_connector is None: |
| _binance_us_connector = BinanceDNSConnector(use_us=True) |
| return _binance_us_connector |
| else: |
| if _binance_global_connector is None: |
| _binance_global_connector = BinanceDNSConnector(use_us=False) |
| return _binance_global_connector |
|
|
|
|
| |
|
|
| async def binance_get(path: str, params: Optional[Dict] = None, use_us: bool = False) -> Optional[Dict]: |
| """ |
| Convenience function for Binance GET requests with failover |
| |
| Args: |
| path: API path (e.g., "/api/v3/ticker/price") |
| params: Query parameters |
| use_us: Use Binance US endpoints |
| |
| Returns: |
| JSON response or None |
| """ |
| connector = get_binance_connector(use_us=use_us) |
| return await connector.get(path, params=params) |
|
|
|
|
| async def binance_post( |
| path: str, |
| data: Optional[Dict] = None, |
| params: Optional[Dict] = None, |
| use_us: bool = False |
| ) -> Optional[Dict]: |
| """ |
| Convenience function for Binance POST requests with failover |
| |
| Args: |
| path: API path |
| data: Request body |
| params: Query parameters |
| use_us: Use Binance US endpoints |
| |
| Returns: |
| JSON response or None |
| """ |
| connector = get_binance_connector(use_us=use_us) |
| return await connector.post(path, data=data, params=params) |
|
|
|
|
| |
|
|
| if __name__ == "__main__": |
| async def test(): |
| print("=" * 70) |
| print("Testing Binance DNS Connector") |
| print("=" * 70) |
| |
| connector = get_binance_connector(use_us=False) |
| |
| |
| print("\n1. Testing BTC price fetch:") |
| result = await connector.get("/api/v3/ticker/price", params={"symbol": "BTCUSDT"}) |
| if result: |
| print(f" β
BTC Price: ${float(result.get('price', 0)):,.2f}") |
| else: |
| print(" β Failed to fetch BTC price") |
| |
| |
| print("\n2. Testing multiple price fetch:") |
| result = await connector.get("/api/v3/ticker/price") |
| if result: |
| print(f" β
Fetched {len(result)} prices") |
| else: |
| print(" β Failed to fetch prices") |
| |
| |
| print("\n3. Health Status:") |
| health = connector.get_health_status() |
| print(f" Total endpoints: {health['total_endpoints']}") |
| for ep in health['endpoints']: |
| status = "β
" if ep['available'] else "β" |
| print(f" {status} {ep['url']}: {ep['success_rate']:.1f}% success, {ep['total_requests']} requests") |
| |
| print("\n" + "=" * 70) |
| print("Test completed!") |
| |
| asyncio.run(test()) |
|
|