# Centralized Data Management System for Binance Futures API
import time
from typing import Any, Dict, List, Optional

import requests

# --- Configuration ---
API_BASE = 'https://fapi.binance.com'
MAX_SYMBOLS = 500

# Strategy 1 needs: 15m, limit=25 (20+5)
# Strategy 2 needs: 15m, limit=100; 1m, limit=50
# Strategy 3 needs: 3m, limit=51 (50+1); falls back to 1m, limit=51
# Minimum klines fetched per interval so that one cached fetch can serve
# every strategy that shares the interval.
_MIN_KLINES_BY_INTERVAL = {'15m': 100, '1m': 50, '3m': 51}

# Seconds per interval unit suffix. Suffixes not listed here (including the
# monthly 'M') fall back to the 60-second default in _interval_to_seconds.
_SECONDS_PER_UNIT = {'m': 60, 'h': 3600, 'd': 86400, 'w': 604800}

# Symbols used when the exchange info request fails.
_FALLBACK_SYMBOLS = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'XRPUSDT', 'ADAUSDT']


class BinanceDataManager:
    """
    Manages the market list and provides kline data from the Binance
    Futures API.

    Klines are cached per symbol and interval so that repeated requests
    within the freshness window do not trigger another API call.
    """

    def __init__(self):
        self.symbols: List[str] = []
        # {symbol: {interval: [klines]}}
        self.kline_cache: Dict[str, Dict[str, List[Dict[str, Any]]]] = {}
        # {"<symbol>_<interval>": time.time() of the last successful fetch}
        self.last_fetch_time: Dict[str, float] = {}
        # Position of the next symbol handed out by get_next_symbol().
        self.market_index = 0
        # Number of times the round-robin has wrapped around.
        self.cycle_count = 0
        self.required_intervals = ['1m', '3m', '15m']

    def _fetch_api(self, endpoint: str, params: Dict[str, Any]) -> Optional[Any]:
        """
        Perform a GET request against the API and return the decoded JSON.

        Returns the parsed payload (a dict or a list, depending on the
        endpoint), or None when the request fails, the server answers with
        an error status, or the body is not valid JSON. Errors are printed,
        never raised.
        """
        url = f"{API_BASE}{endpoint}"
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on requests versions
            # whose JSONDecodeError is not a RequestException subclass.
            print(f"API Error fetching {endpoint} with params {params}: {e}")
            return None

    def fetch_symbols(self, limit: int = MAX_SYMBOLS) -> List[str]:
        """
        Fetch the list of active USDT perpetual futures symbols.

        Stores at most `limit` symbols in self.symbols and returns them.
        When the exchange info request fails, a small hard-coded fallback
        list is stored and returned instead.
        """
        print("Fetching active symbols from Binance...")
        data = self._fetch_api('/fapi/v1/exchangeInfo', {})

        if isinstance(data, dict) and 'symbols' in data:
            # .get() keeps one malformed entry from aborting the whole load.
            symbols = [
                s['symbol']
                for s in data['symbols']
                if s.get('contractType') == 'PERPETUAL'
                and s.get('symbol', '').endswith('USDT')
                and s.get('status') == 'TRADING'
            ]
            self.symbols = symbols[:limit]
            print(f"Successfully loaded {len(self.symbols)} symbols.")
            return self.symbols

        print("Failed to fetch symbols. Using fallback list.")
        self.symbols = list(_FALLBACK_SYMBOLS)
        return self.symbols

    def get_klines(self, symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]:
        """
        Retrieve klines for a symbol and interval, using the cache if fresh.

        Each kline is converted to a dictionary for easier access:
        {'openTime': int, 'open': float, 'high': float, 'low': float,
         'close': float, 'volume': float}

        For the intervals shared by several strategies (15m, 1m, 3m) at
        least the per-interval minimum is fetched, so the returned list may
        hold MORE than `limit` entries; it never holds fewer than `limit`
        unless the API itself returned fewer. Callers that need exactly
        `limit` klines should slice the tail.

        Returns an empty list when the API request fails.
        """
        # Never fetch less than the caller asked for, and never less than
        # the shared minimum for this interval.
        needed = max(limit, _MIN_KLINES_BY_INTERVAL.get(interval, limit))

        cache_key = f"{symbol}_{interval}"
        current_time = time.time()

        # Simple freshness heuristic: cached data is reused for up to
        # 1.5 * interval duration after the last successful fetch.
        max_age = self._interval_to_seconds(interval) * 1.5
        last_fetch = self.last_fetch_time.get(cache_key, 0)

        # kline_cache is keyed by symbol first, then by interval.
        cached = self.kline_cache.get(symbol, {}).get(interval)
        if (
            cached is not None
            and current_time < last_fetch + max_age
            and len(cached) >= needed
        ):
            return cached[-needed:]

        # Cache miss, stale data, or too few cached klines: fetch from API.
        params = {'symbol': symbol, 'interval': interval, 'limit': needed}
        data = self._fetch_api('/fapi/v1/klines', params)
        if not data or not isinstance(data, list):
            return []

        klines = [
            {
                'openTime': c[0],
                'open': float(c[1]),
                'high': float(c[2]),
                'low': float(c[3]),
                'close': float(c[4]),
                'volume': float(c[5]),
            }
            for c in data
        ]

        self.kline_cache.setdefault(symbol, {})[interval] = klines
        self.last_fetch_time[cache_key] = current_time
        return klines[-needed:]

    def _interval_to_seconds(self, interval: str) -> int:
        """
        Convert a Binance interval string (e.g. '15m', '4h', '1d', '1w')
        to seconds.

        Unknown suffixes and malformed values default to 60 seconds.
        """
        unit_seconds = _SECONDS_PER_UNIT.get(interval[-1:])
        if unit_seconds is None:
            return 60  # Default to 1 minute
        try:
            return int(interval[:-1]) * unit_seconds
        except ValueError:
            # No numeric part before the unit suffix (e.g. "m").
            return 60

    def get_next_symbol(self) -> Optional[str]:
        """
        Return the next symbol in the round-robin cycle.

        Loads the symbol list on first use. Returns None when no symbols
        are available. When the end of the list is reached, the index
        wraps to the start and the cycle counter is incremented.
        """
        if not self.symbols:
            self.fetch_symbols()
            if not self.symbols:
                return None

        if self.market_index >= len(self.symbols):
            self.market_index = 0
            self.cycle_count += 1
            print(f"\n--- Starting new market scan cycle: #{self.cycle_count} ---")

        symbol = self.symbols[self.market_index]
        self.market_index += 1
        return symbol

    def get_data_for_strategy(self, symbol: str, strategy_id: int) -> Dict[str, Any]:
        """
        Provide all necessary data for a given strategy and symbol.

        This is the main interface for the strategy scripts. The returned
        dict maps 'klines_<interval>' to a list of kline dicts; it is empty
        for an unknown strategy_id.
        """
        data: Dict[str, Any] = {}

        if strategy_id == 1:
            # Strategy 1 (BB Re-entry): 15m klines, limit=25
            data['klines_15m'] = self.get_klines(symbol, '15m', 25)
        elif strategy_id == 2:
            # Strategy 2 (Doji Box): 15m klines, limit=100; 1m klines, limit=50
            data['klines_15m'] = self.get_klines(symbol, '15m', 100)
            data['klines_1m'] = self.get_klines(symbol, '1m', 50)
        elif strategy_id == 3:
            # Strategy 3 (Trend/Range/Volume): 3m klines, limit=51
            # Try 3m first, fall back to 1m if 3m returns no data.
            klines_3m = self.get_klines(symbol, '3m', 51)
            if klines_3m:
                data['klines_3m'] = klines_3m
            else:
                data['klines_1m'] = self.get_klines(symbol, '1m', 51)

        return data


def main() -> None:
    """Example usage: load a few symbols and exercise the kline cache."""
    manager = BinanceDataManager()
    manager.fetch_symbols(limit=10)

    symbol = manager.get_next_symbol()
    if symbol:
        print(f"\nAnalyzing {symbol} for Strategy 2...")
        data = manager.get_data_for_strategy(symbol, 2)
        print(f"15m klines received: {len(data.get('klines_15m', []))}")
        print(f"1m klines received: {len(data.get('klines_1m', []))}")

        # Test cache
        print(f"\nTesting cache for {symbol} (15m)...")
        data_cached = manager.get_data_for_strategy(symbol, 1)
        print(f"15m klines received (cached): {len(data_cached.get('klines_15m', []))}")

        # Next symbol
        next_symbol = manager.get_next_symbol()
        print(f"\nNext symbol: {next_symbol}")


if __name__ == '__main__':
    main()