# File size: 7,460 Bytes
# fa61b9f
# Centralized Data Management System for Binance Futures API

import requests
import time
from typing import List, Dict, Any, Optional

# --- Configuration ---
# Base URL that every request endpoint is appended to.
API_BASE: str = "https://fapi.binance.com"
# Default cap on the number of symbols kept by fetch_symbols().
MAX_SYMBOLS: int = 500
# Kline requests made per strategy (see get_data_for_strategy):
#   Strategy 1: 15m, limit=25 (20+5)
#   Strategy 2: 15m, limit=100; 1m, limit=50
#   Strategy 3: 3m, limit=51 (50+1); falls back to 1m, limit=51 when 3m returns no data
# Largest request per interval: 15m (100), 3m (51), 1m (51, Strategy 3 fallback)

class BinanceDataManager:
    """
    Manages the market list and provides kline data from the Binance Futures API.

    Klines are cached per symbol and interval; a cached series is reused until
    1.5x the interval duration has elapsed since it was fetched, which keeps
    the number of API calls down.
    """

    # Minimum number of klines fetched per interval, so that one cached
    # response can serve every strategy that shares the interval.
    _MIN_KLINES_BY_INTERVAL = {'15m': 100, '1m': 50, '3m': 51}

    # Seconds per interval-suffix unit understood by _interval_to_seconds.
    _SECONDS_PER_UNIT = {'m': 60, 'h': 3600, 'd': 86400, 'w': 604800}

    def __init__(self):
        self.symbols: List[str] = []
        self.kline_cache: Dict[str, Dict[str, List[Dict[str, Any]]]] = {} # {symbol: {interval: [klines]}}
        self.last_fetch_time: Dict[str, float] = {} # {symbol_interval: timestamp}
        self.market_index = 0  # position of the next symbol handed out by get_next_symbol
        self.cycle_count = 0   # number of completed passes over self.symbols
        self.required_intervals = ['1m', '3m', '15m']

    def _fetch_api(self, endpoint: str, params: Dict[str, Any]) -> Optional[Any]:
        """
        Performs a GET request against API_BASE + endpoint and decodes the JSON body.

        Returns the decoded payload (a dict or a list, depending on the
        endpoint), or None when the request fails, the server answers with an
        error status, or the body is not valid JSON. Failures are printed,
        never raised.
        """
        url = f"{API_BASE}{endpoint}"
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decode failures on requests versions whose
            # decode error is not a RequestException subclass.
            print(f"API Error fetching {endpoint} with params {params}: {e}")
            return None

    def fetch_symbols(self, limit: int = MAX_SYMBOLS) -> List[str]:
        """
        Fetches the list of active USDT perpetual futures symbols.

        Keeps at most `limit` symbols in self.symbols and returns that list.
        When the exchange info cannot be retrieved, a small hard-coded
        fallback list is stored and returned instead.
        """
        print("Fetching active symbols from Binance...")
        data = self._fetch_api('/fapi/v1/exchangeInfo', {})
        if data and 'symbols' in data:
            # .get() so that an entry lacking one of the fields is skipped
            # instead of aborting the whole load with a KeyError.
            symbols = [
                s['symbol'] for s in data['symbols']
                if s.get('contractType') == 'PERPETUAL'
                and s.get('status') == 'TRADING'
                and s.get('symbol', '').endswith('USDT')
            ]
            self.symbols = symbols[:limit]
            print(f"Successfully loaded {len(self.symbols)} symbols.")
            return self.symbols

        print("Failed to fetch symbols. Using fallback list.")
        self.symbols = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'XRPUSDT', 'ADAUSDT'] # Fallback
        return self.symbols

    def get_klines(self, symbol: str, interval: str, limit: int) -> List[Dict[str, Any]]:
        """
        Retrieves klines for a symbol and interval, using the cache if it is fresh.

        The number of klines fetched and returned is the larger of `limit` and
        the per-interval minimum in _MIN_KLINES_BY_INTERVAL, so a request for
        fewer klines than the minimum still receives the minimum.

        Each kline is converted to a dictionary for easier access:
        {'openTime': int, 'open': float, 'high': float, 'low': float, 'close': float, 'volume': float}

        Returns an empty list when the API call fails or yields no rows.
        """
        # Never fetch fewer than the per-interval minimum, but honour larger requests.
        required_limit = max(limit, self._MIN_KLINES_BY_INTERVAL.get(interval, limit))

        cache_key = f"{symbol}_{interval}"
        current_time = time.time()

        # kline_cache is keyed by symbol and then interval; last_fetch_time is
        # keyed by the combined "symbol_interval" string.
        cached = self.kline_cache.get(symbol, {}).get(interval)
        last_fetch = self.last_fetch_time.get(cache_key)
        if cached and last_fetch is not None:
            # Simple heuristic for refresh time: 1.5 * interval in seconds
            refresh_threshold = last_fetch + self._interval_to_seconds(interval) * 1.5
            if current_time < refresh_threshold and len(cached) >= required_limit:
                return cached[-required_limit:]

        # Cache miss, stale entry, or too few cached klines: fetch from API
        params = {'symbol': symbol, 'interval': interval, 'limit': required_limit}
        data = self._fetch_api('/fapi/v1/klines', params)

        # Rows are indexed positionally below, so anything other than a
        # non-empty list cannot be parsed.
        if not data or not isinstance(data, list):
            return []

        klines = [
            {
                'openTime': c[0],
                'open': float(c[1]),
                'high': float(c[2]),
                'low': float(c[3]),
                'close': float(c[4]),
                'volume': float(c[5]),
            }
            for c in data
        ]

        # Update cache
        self.kline_cache.setdefault(symbol, {})[interval] = klines
        self.last_fetch_time[cache_key] = current_time

        return klines[-required_limit:]

    def _interval_to_seconds(self, interval: str) -> int:
        """
        Converts a Binance interval string such as '15m', '4h', '1d' or '1w' to seconds.

        Unknown suffixes and malformed values fall back to 60 seconds.
        """
        unit_seconds = self._SECONDS_PER_UNIT.get(interval[-1:])
        if unit_seconds is None:
            return 60 # Default to 1 minute
        try:
            return int(interval[:-1]) * unit_seconds
        except ValueError:
            return 60 # Default to 1 minute

    def get_next_symbol(self) -> Optional[str]:
        """
        Returns the next symbol in the round-robin cycle.

        Loads the symbol list first if it is empty, and returns None only when
        no symbols are available afterwards. Wrapping around to the start of
        the list increments cycle_count.
        """
        if not self.symbols:
            self.fetch_symbols()
            if not self.symbols:
                return None

        if self.market_index >= len(self.symbols):
            self.market_index = 0
            self.cycle_count += 1
            print(f"\n--- Starting new market scan cycle: #{self.cycle_count} ---")

        symbol = self.symbols[self.market_index]
        self.market_index += 1
        return symbol

    def get_data_for_strategy(self, symbol: str, strategy_id: int) -> Dict[str, Any]:
        """
        Provides all necessary data for a given strategy and symbol.
        This is the main interface for the strategy scripts.

        The returned dict maps 'klines_<interval>' to the kline list for that
        interval. An unrecognised strategy_id yields an empty dict.
        """
        data = {}

        if strategy_id == 1:
            # Strategy 1 (BB Re-entry): 15m klines, limit=25
            data['klines_15m'] = self.get_klines(symbol, '15m', 25)
        elif strategy_id == 2:
            # Strategy 2 (Doji Box): 15m klines, limit=100; 1m klines, limit=50
            data['klines_15m'] = self.get_klines(symbol, '15m', 100)
            data['klines_1m'] = self.get_klines(symbol, '1m', 50)
        elif strategy_id == 3:
            # Strategy 3 (Trend/Range/Volume): 3m klines, limit=51 (Scalping request)
            # Try 3m first, fallback to 1m if 3m fails to return data
            klines_3m = self.get_klines(symbol, '3m', 51)
            if klines_3m:
                data['klines_3m'] = klines_3m
            else:
                data['klines_1m'] = self.get_klines(symbol, '1m', 51)

        return data

if __name__ == '__main__':
    # Manual walkthrough: load a handful of symbols, request data for the
    # first one under two strategies, then advance the round-robin once.
    demo = BinanceDataManager()
    demo.fetch_symbols(limit=10)

    symbol = demo.get_next_symbol()
    if symbol:
        print(f"\nAnalyzing {symbol} for Strategy 2...")
        strategy2_data = demo.get_data_for_strategy(symbol, 2)
        count_15m = len(strategy2_data.get('klines_15m', []))
        count_1m = len(strategy2_data.get('klines_1m', []))
        print(f"15m klines received: {count_15m}")
        print(f"1m klines received: {count_1m}")

        # Ask for the same symbol's 15m klines again (via Strategy 1); this
        # second request is meant to exercise the kline cache.
        print(f"\nTesting cache for {symbol} (15m)...")
        strategy1_data = demo.get_data_for_strategy(symbol, 1)
        repeat_count_15m = len(strategy1_data.get('klines_15m', []))
        print(f"15m klines received (cached): {repeat_count_15m}")

        # Advance the round-robin to the following symbol.
        upcoming = demo.get_next_symbol()
        print(f"\nNext symbol: {upcoming}")