Riy777 commited on
Commit
ac0cc30
·
verified ·
1 Parent(s): c5214aa

Update ml_engine/data_manager.py

Browse files
Files changed (1) hide show
  1. ml_engine/data_manager.py +28 -25
ml_engine/data_manager.py CHANGED
@@ -1,6 +1,6 @@
1
  # ============================================================
2
  # 📂 ml_engine/data_manager.py
3
- # (V54.0 - GEM-Architect: Stability & Debug Mode)
4
  # ============================================================
5
 
6
  import asyncio
@@ -15,6 +15,7 @@ from datetime import datetime
15
 
16
  import ccxt.async_support as ccxt
17
 
 
18
  try:
19
  from ml_engine.processor import SystemLimits
20
  except ImportError:
@@ -27,10 +28,9 @@ logging.getLogger("ccxt").setLevel(logging.WARNING)
27
 
28
  class DataManager:
29
  """
30
- DataManager V54.0 (Stability & Debug)
31
- - Reduced Concurrency: Batch size lowered to 5 to prevent Rate Limits.
32
- - Explicit Debugging: Prints exact errors causing 'other' rejections.
33
- - Logic: Strict Structural Integrity (V53 Logic Kept Intact).
34
  """
35
 
36
  def __init__(self, contracts_db, whale_monitor, r2_service=None):
@@ -39,7 +39,7 @@ class DataManager:
39
  self.r2_service = r2_service
40
 
41
  self.exchange = ccxt.kucoin({
42
- 'enableRateLimit': True, # CCXT handles pacing internally
43
  'timeout': 30000,
44
  'options': {'defaultType': 'spot'}
45
  })
@@ -55,7 +55,7 @@ class DataManager:
55
  'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L', 'USDD', 'USDP', 'HT', 'KCS'
56
  ]
57
 
58
- print(f"📦 [DataManager V54.0] Stability Engine Active.")
59
 
60
  async def initialize(self):
61
  print(" > [DataManager] Starting Matrix Initialization...")
@@ -162,7 +162,6 @@ class DataManager:
162
  for symbol, ticker in tickers.items():
163
  if not symbol.endswith('/USDT'): continue
164
 
165
- # Check Base Currency against blacklist
166
  base_curr = symbol.split('/')[0]
167
  if any(bad in base_curr for bad in self.BLACKLIST_TOKENS): continue
168
 
@@ -186,16 +185,14 @@ class DataManager:
186
 
187
  # --- Step 2: Deep Inspection (Safe Batching) ---
188
  final_candidates = []
189
- # ⬇️ Reduced batch size to 5 to prevent API Rate Limits/Timeouts
190
- chunk_size = 5
191
  reject_stats = {'spread': 0, 'depth': 0, 'age': 0, 'integrity': 0, 'wall': 0, 'other': 0}
192
 
193
  for i in range(0, len(top_150), chunk_size):
194
  chunk = top_150[i:i+chunk_size]
195
  inspected_chunk = await asyncio.gather(*[self._inspect_structure(c, reject_stats) for c in chunk])
196
  final_candidates.extend([c for c in inspected_chunk if c is not None])
197
- # ⬇️ Increased sleep to be nice to KuCoin
198
- await asyncio.sleep(0.2)
199
 
200
  if len(final_candidates) == 0:
201
  print(f" ⚠️ ALL REJECTED! Stats: {reject_stats}")
@@ -213,17 +210,15 @@ class DataManager:
213
  symbol = candidate['symbol']
214
  ticker = candidate['ticker']
215
  try:
216
- # RESTORED STRICT CHECKS
217
  tasks = [
218
  self.exchange.fetch_ohlcv(symbol, '5m', limit=50),
219
  self.exchange.fetch_ohlcv(symbol, '1d', limit=200),
220
- self.exchange.fetch_order_book(symbol, limit=50)
221
  ]
222
  ohlcv_5m, ohlcv_1d, orderbook = await asyncio.gather(*tasks)
223
 
224
  if not ohlcv_5m or not ohlcv_1d or not orderbook:
225
- # This should NOT happen often with small batches.
226
- # If it does, we need to know why.
227
  stats['other'] += 1
228
  return None
229
 
@@ -239,26 +234,35 @@ class DataManager:
239
  return None
240
 
241
  current_price = float(ticker['last'])
242
- bid = float(orderbook['bids'][0][0]) if orderbook['bids'] else 0
243
- ask = float(orderbook['asks'][0][0]) if orderbook['asks'] else 0
 
 
 
 
 
 
244
  if bid == 0 or ask == 0:
245
  stats['other'] += 1
246
  return None
247
 
248
  # 3. Spread Check (< 2.5% OR Dynamic)
249
  spread_abs = (ask - bid) / bid
 
 
250
  df_5m = pd.DataFrame(ohlcv_5m, columns=['ts', 'o', 'h', 'l', 'c', 'v'])
251
  h_l = df_5m['h'] - df_5m['l']
252
  atr_val = h_l.rolling(14).mean().iloc[-1]
253
  atr_threshold = 0.35 * (atr_val / current_price) if current_price > 0 else 0
254
 
255
- limit_spread = max(0.015, atr_threshold) # Min 1.5%, or dynamic
256
 
257
- # Hard limit 2.5% as requested
258
  if spread_abs > 0.025:
259
  stats['spread'] += 1
260
  return None
261
-
 
262
  if spread_abs > limit_spread:
263
  stats['spread'] += 1
264
  return None
@@ -291,10 +295,9 @@ class DataManager:
291
  return candidate
292
 
293
  except Exception as e:
294
- # 🛑 HERE IS THE DEBUGGER
295
- # If we hit rate limits, we will see the error now.
296
- if stats['other'] < 1:
297
- print(f" ⚠️ ERROR inspecting {symbol}: {e}")
298
  stats['other'] += 1
299
  return None
300
 
 
1
  # ============================================================
2
  # 📂 ml_engine/data_manager.py
3
+ # (V55.0 - GEM-Architect: KuCoin API Compliance Fix)
4
  # ============================================================
5
 
6
  import asyncio
 
15
 
16
  import ccxt.async_support as ccxt
17
 
18
+ # ✅ استيراد الدستور الديناميكي
19
  try:
20
  from ml_engine.processor import SystemLimits
21
  except ImportError:
 
28
 
29
  class DataManager:
30
  """
31
+ DataManager V55.0 (API Compliance Fix)
32
+ - FIXED: fetch_order_book limit set to 20 (KuCoin requirement: 20 or 100).
33
+ - Logic: Strict Structural Integrity (V53 Logic).
 
34
  """
35
 
36
  def __init__(self, contracts_db, whale_monitor, r2_service=None):
 
39
  self.r2_service = r2_service
40
 
41
  self.exchange = ccxt.kucoin({
42
+ 'enableRateLimit': True,
43
  'timeout': 30000,
44
  'options': {'defaultType': 'spot'}
45
  })
 
55
  'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L', 'USDD', 'USDP', 'HT', 'KCS'
56
  ]
57
 
58
+ print(f"📦 [DataManager V55.0] API Compliance Engine Active.")
59
 
60
  async def initialize(self):
61
  print(" > [DataManager] Starting Matrix Initialization...")
 
162
  for symbol, ticker in tickers.items():
163
  if not symbol.endswith('/USDT'): continue
164
 
 
165
  base_curr = symbol.split('/')[0]
166
  if any(bad in base_curr for bad in self.BLACKLIST_TOKENS): continue
167
 
 
185
 
186
  # --- Step 2: Deep Inspection (Safe Batching) ---
187
  final_candidates = []
188
+ chunk_size = 10 # Slightly increased for speed since limit is fixed
 
189
  reject_stats = {'spread': 0, 'depth': 0, 'age': 0, 'integrity': 0, 'wall': 0, 'other': 0}
190
 
191
  for i in range(0, len(top_150), chunk_size):
192
  chunk = top_150[i:i+chunk_size]
193
  inspected_chunk = await asyncio.gather(*[self._inspect_structure(c, reject_stats) for c in chunk])
194
  final_candidates.extend([c for c in inspected_chunk if c is not None])
195
+ await asyncio.sleep(0.1)
 
196
 
197
  if len(final_candidates) == 0:
198
  print(f" ⚠️ ALL REJECTED! Stats: {reject_stats}")
 
210
  symbol = candidate['symbol']
211
  ticker = candidate['ticker']
212
  try:
213
+ # FIXED: limit=20 (KuCoin accepts 20 or 100)
214
  tasks = [
215
  self.exchange.fetch_ohlcv(symbol, '5m', limit=50),
216
  self.exchange.fetch_ohlcv(symbol, '1d', limit=200),
217
+ self.exchange.fetch_order_book(symbol, limit=20)
218
  ]
219
  ohlcv_5m, ohlcv_1d, orderbook = await asyncio.gather(*tasks)
220
 
221
  if not ohlcv_5m or not ohlcv_1d or not orderbook:
 
 
222
  stats['other'] += 1
223
  return None
224
 
 
234
  return None
235
 
236
  current_price = float(ticker['last'])
237
+ # KuCoin orderbook might be empty or have fewer bids
238
+ if not orderbook.get('bids') or not orderbook.get('asks'):
239
+ stats['other'] += 1
240
+ return None
241
+
242
+ bid = float(orderbook['bids'][0][0])
243
+ ask = float(orderbook['asks'][0][0])
244
+
245
  if bid == 0 or ask == 0:
246
  stats['other'] += 1
247
  return None
248
 
249
  # 3. Spread Check (< 2.5% OR Dynamic)
250
  spread_abs = (ask - bid) / bid
251
+
252
+ # Dynamic calculation
253
  df_5m = pd.DataFrame(ohlcv_5m, columns=['ts', 'o', 'h', 'l', 'c', 'v'])
254
  h_l = df_5m['h'] - df_5m['l']
255
  atr_val = h_l.rolling(14).mean().iloc[-1]
256
  atr_threshold = 0.35 * (atr_val / current_price) if current_price > 0 else 0
257
 
258
+ limit_spread = max(0.015, atr_threshold)
259
 
260
+ # Hard limit check
261
  if spread_abs > 0.025:
262
  stats['spread'] += 1
263
  return None
264
+
265
+ # Dynamic check
266
  if spread_abs > limit_spread:
267
  stats['spread'] += 1
268
  return None
 
295
  return candidate
296
 
297
  except Exception as e:
298
+ # Only print real errors, ignore expected ones to keep logs clean
299
+ # stats['other'] += 1
300
+ if "limit argument" in str(e): print(f"API ERROR: {e}") # Should not happen now
 
301
  stats['other'] += 1
302
  return None
303