V2 / data_manager.py
Alikhani099961's picture
Upload 6 files
fa61b9f verified
# Centralized Data Management System for Binance Futures API
import requests
import time
from typing import List, Dict, Any, Optional
# --- Configuration ---
API_BASE = 'https://fapi.binance.com'
MAX_SYMBOLS = 500
# Strategy 1 needs: 15m, limit=25 (20+5)
# Strategy 2 needs: 15m, limit=100; 1m, limit=50
# Strategy 3 needs: 15m, limit=51 (50+1)
# Max required klines: 15m (100), 1m (50)
class BinanceDataManager:
"""
Manages market list and provides kline data from Binance Futures API.
Handles caching to minimize API calls and respect rate limits.
"""
def __init__(self):
self.symbols: List[str] = []
self.kline_cache: Dict[str, Dict[str, List[Dict[str, Any]]]] = {} # {symbol: {interval: [klines]}}
self.last_fetch_time: Dict[str, float] = {} # {symbol_interval: timestamp}
self.market_index = 0
self.cycle_count = 0
self.required_intervals = ['1m', '3m', '15m']
def _fetch_api(self, endpoint: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Generic API fetcher with basic error handling."""
url = f"{API_BASE}{endpoint}"
try:
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"API Error fetching {endpoint} with params {params}: {e}")
return None
def fetch_symbols(self, limit: int = MAX_SYMBOLS) -> List[str]:
"""Fetches the list of active USDT perpetual futures symbols."""
print("Fetching active symbols from Binance...")
data = self._fetch_api('/fapi/v1/exchangeInfo', {})
if data and 'symbols' in data:
symbols = [
s['symbol'] for s in data['symbols']
if s['contractType'] == 'PERPETUAL' and s['symbol'].endswith('USDT') and s['status'] == 'TRADING'
]
self.symbols = symbols[:limit]
print(f"Successfully loaded {len(self.symbols)} symbols.")
return self.symbols
print("Failed to fetch symbols. Using fallback list.")
self.symbols = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'XRPUSDT', 'ADAUSDT'] # Fallback
return self.symbols
def get_klines(self, symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]:
"""
Retrieves klines for a symbol and interval, using cache if available.
The kline data format is converted to a dictionary for easier access:
{'openTime': int, 'open': float, 'high': float, 'low': float, 'close': float, 'volume': float}
"""
# Determine required limit based on strategies
if interval == '15m':
required_limit = 100 # Max needed by Strategy 2
elif interval == '1m':
required_limit = 50 # Max needed by Strategy 2
elif interval == '3m':
required_limit = 51 # Max needed by Strategy 3 (LR_PERIOD=50 + 1)
else:
required_limit = limit
# Check cache and freshness (refresh every 1.5 * interval duration)
cache_key = f"{symbol}_{interval}"
current_time = time.time()
# Simple heuristic for refresh time: 1.5 * interval in seconds
interval_seconds = self._interval_to_seconds(interval)
refresh_threshold = self.last_fetch_time.get(cache_key, 0) + (interval_seconds * 1.5)
if cache_key in self.kline_cache and current_time < refresh_threshold:
# Cache hit and fresh enough
klines = self.kline_cache[symbol].get(interval, [])
if len(klines) >= required_limit:
return klines[-required_limit:]
# Fetch from API
params = {'symbol': symbol, 'interval': interval, 'limit': required_limit}
data = self._fetch_api('/fapi/v1/klines', params)
if data:
klines = []
for c in data:
klines.append({
'openTime': c[0],
'open': float(c[1]),
'high': float(c[2]),
'low': float(c[3]),
'close': float(c[4]),
'volume': float(c[5])
})
# Update cache
if symbol not in self.kline_cache:
self.kline_cache[symbol] = {}
self.kline_cache[symbol][interval] = klines
self.last_fetch_time[cache_key] = current_time
return klines[-required_limit:]
return []
def _interval_to_seconds(self, interval: str) -> int:
"""Converts Binance interval string to seconds."""
if interval.endswith('m'):
return int(interval[:-1]) * 60
elif interval.endswith('h'):
return int(interval[:-1]) * 3600
elif interval.endswith('d'):
return int(interval[:-1]) * 86400
return 60 # Default to 1 minute
def get_next_symbol(self) -> Optional[str]:
"""Returns the next symbol in the round-robin cycle."""
if not self.symbols:
self.fetch_symbols()
if not self.symbols:
return None
if self.market_index >= len(self.symbols):
self.market_index = 0
self.cycle_count += 1
print(f"\n--- Starting new market scan cycle: #{self.cycle_count} ---")
symbol = self.symbols[self.market_index]
self.market_index += 1
return symbol
def get_data_for_strategy(self, symbol: str, strategy_id: int) -> Dict[str, Any]:
"""
Provides all necessary data for a given strategy and symbol.
This is the main interface for the strategy scripts.
"""
data = {}
if strategy_id == 1:
# Strategy 1 (BB Re-entry): 15m klines, limit=25
data['klines_15m'] = self.get_klines(symbol, '15m', 25)
elif strategy_id == 2:
# Strategy 2 (Doji Box): 15m klines, limit=100; 1m klines, limit=50
data['klines_15m'] = self.get_klines(symbol, '15m', 100)
data['klines_1m'] = self.get_klines(symbol, '1m', 50)
elif strategy_id == 3:
# Strategy 3 (Trend/Range/Volume): 3m klines, limit=51 (Scalping request)
# Try 3m first, fallback to 1m if 3m fails to return data
klines_3m = self.get_klines(symbol, '3m', 51)
if klines_3m:
data['klines_3m'] = klines_3m
else:
data['klines_1m'] = self.get_klines(symbol, '1m', 51)
return data
if __name__ == '__main__':
# Example usage
manager = BinanceDataManager()
manager.fetch_symbols(limit=10)
symbol = manager.get_next_symbol()
if symbol:
print(f"\nAnalyzing {symbol} for Strategy 2...")
data = manager.get_data_for_strategy(symbol, 2)
print(f"15m klines received: {len(data.get('klines_15m', []))}")
print(f"1m klines received: {len(data.get('klines_1m', []))}")
# Test cache
print(f"\nTesting cache for {symbol} (15m)...")
data_cached = manager.get_data_for_strategy(symbol, 1)
print(f"15m klines received (cached): {len(data_cached.get('klines_15m', []))}")
# Next symbol
next_symbol = manager.get_next_symbol()
print(f"\nNext symbol: {next_symbol}")