Spaces:
Runtime error
Runtime error
| # Centralized Data Management System for Binance Futures API | |
| import requests | |
| import time | |
| from typing import List, Dict, Any, Optional | |
| # --- Configuration --- | |
| API_BASE = 'https://fapi.binance.com' | |
| MAX_SYMBOLS = 500 | |
| # Strategy 1 needs: 15m, limit=25 (20+5) | |
| # Strategy 2 needs: 15m, limit=100; 1m, limit=50 | |
| # Strategy 3 needs: 15m, limit=51 (50+1) | |
| # Max required klines: 15m (100), 1m (50) | |
| class BinanceDataManager: | |
| """ | |
| Manages market list and provides kline data from Binance Futures API. | |
| Handles caching to minimize API calls and respect rate limits. | |
| """ | |
| def __init__(self): | |
| self.symbols: List[str] = [] | |
| self.kline_cache: Dict[str, Dict[str, List[Dict[str, Any]]]] = {} # {symbol: {interval: [klines]}} | |
| self.last_fetch_time: Dict[str, float] = {} # {symbol_interval: timestamp} | |
| self.market_index = 0 | |
| self.cycle_count = 0 | |
| self.required_intervals = ['1m', '3m', '15m'] | |
| def _fetch_api(self, endpoint: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]: | |
| """Generic API fetcher with basic error handling.""" | |
| url = f"{API_BASE}{endpoint}" | |
| try: | |
| response = requests.get(url, params=params, timeout=10) | |
| response.raise_for_status() | |
| return response.json() | |
| except requests.exceptions.RequestException as e: | |
| print(f"API Error fetching {endpoint} with params {params}: {e}") | |
| return None | |
| def fetch_symbols(self, limit: int = MAX_SYMBOLS) -> List[str]: | |
| """Fetches the list of active USDT perpetual futures symbols.""" | |
| print("Fetching active symbols from Binance...") | |
| data = self._fetch_api('/fapi/v1/exchangeInfo', {}) | |
| if data and 'symbols' in data: | |
| symbols = [ | |
| s['symbol'] for s in data['symbols'] | |
| if s['contractType'] == 'PERPETUAL' and s['symbol'].endswith('USDT') and s['status'] == 'TRADING' | |
| ] | |
| self.symbols = symbols[:limit] | |
| print(f"Successfully loaded {len(self.symbols)} symbols.") | |
| return self.symbols | |
| print("Failed to fetch symbols. Using fallback list.") | |
| self.symbols = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'XRPUSDT', 'ADAUSDT'] # Fallback | |
| return self.symbols | |
| def get_klines(self, symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieves klines for a symbol and interval, using cache if available. | |
| The kline data format is converted to a dictionary for easier access: | |
| {'openTime': int, 'open': float, 'high': float, 'low': float, 'close': float, 'volume': float} | |
| """ | |
| # Determine required limit based on strategies | |
| if interval == '15m': | |
| required_limit = 100 # Max needed by Strategy 2 | |
| elif interval == '1m': | |
| required_limit = 50 # Max needed by Strategy 2 | |
| elif interval == '3m': | |
| required_limit = 51 # Max needed by Strategy 3 (LR_PERIOD=50 + 1) | |
| else: | |
| required_limit = limit | |
| # Check cache and freshness (refresh every 1.5 * interval duration) | |
| cache_key = f"{symbol}_{interval}" | |
| current_time = time.time() | |
| # Simple heuristic for refresh time: 1.5 * interval in seconds | |
| interval_seconds = self._interval_to_seconds(interval) | |
| refresh_threshold = self.last_fetch_time.get(cache_key, 0) + (interval_seconds * 1.5) | |
| if cache_key in self.kline_cache and current_time < refresh_threshold: | |
| # Cache hit and fresh enough | |
| klines = self.kline_cache[symbol].get(interval, []) | |
| if len(klines) >= required_limit: | |
| return klines[-required_limit:] | |
| # Fetch from API | |
| params = {'symbol': symbol, 'interval': interval, 'limit': required_limit} | |
| data = self._fetch_api('/fapi/v1/klines', params) | |
| if data: | |
| klines = [] | |
| for c in data: | |
| klines.append({ | |
| 'openTime': c[0], | |
| 'open': float(c[1]), | |
| 'high': float(c[2]), | |
| 'low': float(c[3]), | |
| 'close': float(c[4]), | |
| 'volume': float(c[5]) | |
| }) | |
| # Update cache | |
| if symbol not in self.kline_cache: | |
| self.kline_cache[symbol] = {} | |
| self.kline_cache[symbol][interval] = klines | |
| self.last_fetch_time[cache_key] = current_time | |
| return klines[-required_limit:] | |
| return [] | |
| def _interval_to_seconds(self, interval: str) -> int: | |
| """Converts Binance interval string to seconds.""" | |
| if interval.endswith('m'): | |
| return int(interval[:-1]) * 60 | |
| elif interval.endswith('h'): | |
| return int(interval[:-1]) * 3600 | |
| elif interval.endswith('d'): | |
| return int(interval[:-1]) * 86400 | |
| return 60 # Default to 1 minute | |
| def get_next_symbol(self) -> Optional[str]: | |
| """Returns the next symbol in the round-robin cycle.""" | |
| if not self.symbols: | |
| self.fetch_symbols() | |
| if not self.symbols: | |
| return None | |
| if self.market_index >= len(self.symbols): | |
| self.market_index = 0 | |
| self.cycle_count += 1 | |
| print(f"\n--- Starting new market scan cycle: #{self.cycle_count} ---") | |
| symbol = self.symbols[self.market_index] | |
| self.market_index += 1 | |
| return symbol | |
| def get_data_for_strategy(self, symbol: str, strategy_id: int) -> Dict[str, Any]: | |
| """ | |
| Provides all necessary data for a given strategy and symbol. | |
| This is the main interface for the strategy scripts. | |
| """ | |
| data = {} | |
| if strategy_id == 1: | |
| # Strategy 1 (BB Re-entry): 15m klines, limit=25 | |
| data['klines_15m'] = self.get_klines(symbol, '15m', 25) | |
| elif strategy_id == 2: | |
| # Strategy 2 (Doji Box): 15m klines, limit=100; 1m klines, limit=50 | |
| data['klines_15m'] = self.get_klines(symbol, '15m', 100) | |
| data['klines_1m'] = self.get_klines(symbol, '1m', 50) | |
| elif strategy_id == 3: | |
| # Strategy 3 (Trend/Range/Volume): 3m klines, limit=51 (Scalping request) | |
| # Try 3m first, fallback to 1m if 3m fails to return data | |
| klines_3m = self.get_klines(symbol, '3m', 51) | |
| if klines_3m: | |
| data['klines_3m'] = klines_3m | |
| else: | |
| data['klines_1m'] = self.get_klines(symbol, '1m', 51) | |
| return data | |
| if __name__ == '__main__': | |
| # Example usage | |
| manager = BinanceDataManager() | |
| manager.fetch_symbols(limit=10) | |
| symbol = manager.get_next_symbol() | |
| if symbol: | |
| print(f"\nAnalyzing {symbol} for Strategy 2...") | |
| data = manager.get_data_for_strategy(symbol, 2) | |
| print(f"15m klines received: {len(data.get('klines_15m', []))}") | |
| print(f"1m klines received: {len(data.get('klines_1m', []))}") | |
| # Test cache | |
| print(f"\nTesting cache for {symbol} (15m)...") | |
| data_cached = manager.get_data_for_strategy(symbol, 1) | |
| print(f"15m klines received (cached): {len(data_cached.get('klines_15m', []))}") | |
| # Next symbol | |
| next_symbol = manager.get_next_symbol() | |
| print(f"\nNext symbol: {next_symbol}") | |