Spaces:
Runtime error
Runtime error
File size: 7,460 Bytes
fa61b9f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 | # Centralized Data Management System for Binance Futures API
import requests
import time
from typing import List, Dict, Any, Optional
# --- Configuration ---
API_BASE = 'https://fapi.binance.com'
MAX_SYMBOLS = 500
# Strategy 1 needs: 15m, limit=25 (20+5)
# Strategy 2 needs: 15m, limit=100; 1m, limit=50
# Strategy 3 needs: 15m, limit=51 (50+1)
# Max required klines: 15m (100), 1m (50)
class BinanceDataManager:
"""
Manages market list and provides kline data from Binance Futures API.
Handles caching to minimize API calls and respect rate limits.
"""
def __init__(self):
self.symbols: List[str] = []
self.kline_cache: Dict[str, Dict[str, List[Dict[str, Any]]]] = {} # {symbol: {interval: [klines]}}
self.last_fetch_time: Dict[str, float] = {} # {symbol_interval: timestamp}
self.market_index = 0
self.cycle_count = 0
self.required_intervals = ['1m', '3m', '15m']
def _fetch_api(self, endpoint: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Generic API fetcher with basic error handling."""
url = f"{API_BASE}{endpoint}"
try:
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"API Error fetching {endpoint} with params {params}: {e}")
return None
def fetch_symbols(self, limit: int = MAX_SYMBOLS) -> List[str]:
"""Fetches the list of active USDT perpetual futures symbols."""
print("Fetching active symbols from Binance...")
data = self._fetch_api('/fapi/v1/exchangeInfo', {})
if data and 'symbols' in data:
symbols = [
s['symbol'] for s in data['symbols']
if s['contractType'] == 'PERPETUAL' and s['symbol'].endswith('USDT') and s['status'] == 'TRADING'
]
self.symbols = symbols[:limit]
print(f"Successfully loaded {len(self.symbols)} symbols.")
return self.symbols
print("Failed to fetch symbols. Using fallback list.")
self.symbols = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'XRPUSDT', 'ADAUSDT'] # Fallback
return self.symbols
def get_klines(self, symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]:
"""
Retrieves klines for a symbol and interval, using cache if available.
The kline data format is converted to a dictionary for easier access:
{'openTime': int, 'open': float, 'high': float, 'low': float, 'close': float, 'volume': float}
"""
# Determine required limit based on strategies
if interval == '15m':
required_limit = 100 # Max needed by Strategy 2
elif interval == '1m':
required_limit = 50 # Max needed by Strategy 2
elif interval == '3m':
required_limit = 51 # Max needed by Strategy 3 (LR_PERIOD=50 + 1)
else:
required_limit = limit
# Check cache and freshness (refresh every 1.5 * interval duration)
cache_key = f"{symbol}_{interval}"
current_time = time.time()
# Simple heuristic for refresh time: 1.5 * interval in seconds
interval_seconds = self._interval_to_seconds(interval)
refresh_threshold = self.last_fetch_time.get(cache_key, 0) + (interval_seconds * 1.5)
if cache_key in self.kline_cache and current_time < refresh_threshold:
# Cache hit and fresh enough
klines = self.kline_cache[symbol].get(interval, [])
if len(klines) >= required_limit:
return klines[-required_limit:]
# Fetch from API
params = {'symbol': symbol, 'interval': interval, 'limit': required_limit}
data = self._fetch_api('/fapi/v1/klines', params)
if data:
klines = []
for c in data:
klines.append({
'openTime': c[0],
'open': float(c[1]),
'high': float(c[2]),
'low': float(c[3]),
'close': float(c[4]),
'volume': float(c[5])
})
# Update cache
if symbol not in self.kline_cache:
self.kline_cache[symbol] = {}
self.kline_cache[symbol][interval] = klines
self.last_fetch_time[cache_key] = current_time
return klines[-required_limit:]
return []
def _interval_to_seconds(self, interval: str) -> int:
"""Converts Binance interval string to seconds."""
if interval.endswith('m'):
return int(interval[:-1]) * 60
elif interval.endswith('h'):
return int(interval[:-1]) * 3600
elif interval.endswith('d'):
return int(interval[:-1]) * 86400
return 60 # Default to 1 minute
def get_next_symbol(self) -> Optional[str]:
"""Returns the next symbol in the round-robin cycle."""
if not self.symbols:
self.fetch_symbols()
if not self.symbols:
return None
if self.market_index >= len(self.symbols):
self.market_index = 0
self.cycle_count += 1
print(f"\n--- Starting new market scan cycle: #{self.cycle_count} ---")
symbol = self.symbols[self.market_index]
self.market_index += 1
return symbol
def get_data_for_strategy(self, symbol: str, strategy_id: int) -> Dict[str, Any]:
"""
Provides all necessary data for a given strategy and symbol.
This is the main interface for the strategy scripts.
"""
data = {}
if strategy_id == 1:
# Strategy 1 (BB Re-entry): 15m klines, limit=25
data['klines_15m'] = self.get_klines(symbol, '15m', 25)
elif strategy_id == 2:
# Strategy 2 (Doji Box): 15m klines, limit=100; 1m klines, limit=50
data['klines_15m'] = self.get_klines(symbol, '15m', 100)
data['klines_1m'] = self.get_klines(symbol, '1m', 50)
elif strategy_id == 3:
# Strategy 3 (Trend/Range/Volume): 3m klines, limit=51 (Scalping request)
# Try 3m first, fallback to 1m if 3m fails to return data
klines_3m = self.get_klines(symbol, '3m', 51)
if klines_3m:
data['klines_3m'] = klines_3m
else:
data['klines_1m'] = self.get_klines(symbol, '1m', 51)
return data
if __name__ == '__main__':
# Example usage
manager = BinanceDataManager()
manager.fetch_symbols(limit=10)
symbol = manager.get_next_symbol()
if symbol:
print(f"\nAnalyzing {symbol} for Strategy 2...")
data = manager.get_data_for_strategy(symbol, 2)
print(f"15m klines received: {len(data.get('klines_15m', []))}")
print(f"1m klines received: {len(data.get('klines_1m', []))}")
# Test cache
print(f"\nTesting cache for {symbol} (15m)...")
data_cached = manager.get_data_for_strategy(symbol, 1)
print(f"15m klines received (cached): {len(data_cached.get('klines_15m', []))}")
# Next symbol
next_symbol = manager.get_next_symbol()
print(f"\nNext symbol: {next_symbol}")
|