Riy777 commited on
Commit
c1f9548
·
verified ·
1 Parent(s): 2167505

Update ml_engine/data_manager.py

Browse files
Files changed (1) hide show
  1. ml_engine/data_manager.py +23 -18
ml_engine/data_manager.py CHANGED
@@ -1,5 +1,5 @@
1
  # ml_engine/data_manager.py
2
- # (V13.1 - Cleaned & Fixed - Removed Obsolete Ranker)
3
 
4
  import os
5
  import asyncio
@@ -9,7 +9,7 @@ import ccxt.async_support as ccxt
9
  import logging
10
  from typing import List, Dict, Any
11
 
12
- # إعدادات التسجيل لتقليل الضجيج
13
  logging.getLogger("httpx").setLevel(logging.WARNING)
14
  logging.getLogger("httpcore").setLevel(logging.WARNING)
15
  logging.getLogger("ccxt").setLevel(logging.WARNING)
@@ -23,7 +23,7 @@ class DataManager:
23
  self.whale_monitor = whale_monitor
24
  self.r2_service = r2_service
25
 
26
- # إعداد المنصة (KuCoin كمثال)
27
  self.exchange = ccxt.kucoin({
28
  'enableRateLimit': True,
29
  'timeout': 30000,
@@ -36,11 +36,8 @@ class DataManager:
36
  """تهيئة مدير البيانات والاتصالات"""
37
  print(" > [DataManager] Starting initialization...")
38
  self.http_client = httpx.AsyncClient(timeout=30.0)
39
-
40
- # تحميل الأسواق
41
  await self._load_markets()
42
-
43
- print(f"✅ [DataManager V13.1] Ready (Titan Architecture).")
44
 
45
  async def _load_markets(self):
46
  """تحميل بيانات الأسواق وتخزينها مؤقتاً"""
@@ -62,7 +59,6 @@ class DataManager:
62
  # 🚀 R2 Compatibility
63
  # ==================================================================
64
  async def load_contracts_from_r2(self):
65
- """تحميل قاعدة بيانات العقود من R2"""
66
  if not self.r2_service: return
67
  try:
68
  self.contracts_db = await self.r2_service.load_contracts_db_async()
@@ -77,16 +73,14 @@ class DataManager:
77
  # 🛡️ Layer 1 Screening (Data Filter)
78
  # ==================================================================
79
  async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
80
- """
81
- الغربلة الأولية السريعة بناءً على الحجم والسيولة (بدون Ranker ML).
82
- """
83
  print(f"🔍 [Layer 1] Screening top liquid assets...")
84
  volume_data = await self._get_volume_data_live()
85
 
86
  if not volume_data:
87
  return []
88
 
89
- # نأخذ أعلى 150 عملة من حيث السيولة لتمريرها للمعالج (Titan)
90
  candidates = volume_data[:150]
91
  return candidates
92
 
@@ -96,24 +90,21 @@ class DataManager:
96
  tickers = await self.exchange.fetch_tickers()
97
  data = []
98
  for symbol, ticker in tickers.items():
99
- # تصفية أزواج USDT ذات الحجم المحترم
100
  if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000:
101
  data.append({
102
  'symbol': symbol,
103
  'dollar_volume': ticker['quoteVolume'],
104
  'current_price': ticker['last']
105
  })
106
- # الترتيب تنازلياً حسب الحجم
107
  data.sort(key=lambda x: x['dollar_volume'], reverse=True)
108
  return data
109
  except Exception:
110
  return []
111
 
112
  # ==================================================================
113
- # 🎯 Helper Functions
114
  # ==================================================================
115
  async def get_latest_price_async(self, symbol: str) -> float:
116
- """جلب السعر الحالي"""
117
  try:
118
  ticker = await self.exchange.fetch_ticker(symbol)
119
  return float(ticker['last'])
@@ -121,10 +112,24 @@ class DataManager:
121
  return 0.0
122
 
123
  async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
124
- """جلب الشموع"""
125
  try:
126
  candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
127
  if candles: return candles
128
  return []
129
  except Exception:
130
- return []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  # ml_engine/data_manager.py
2
+ # (V13.2 - GEM-Architect: Added Order Book Capability)
3
 
4
  import os
5
  import asyncio
 
9
  import logging
10
  from typing import List, Dict, Any
11
 
12
+ # إعدادات التسجيل
13
  logging.getLogger("httpx").setLevel(logging.WARNING)
14
  logging.getLogger("httpcore").setLevel(logging.WARNING)
15
  logging.getLogger("ccxt").setLevel(logging.WARNING)
 
23
  self.whale_monitor = whale_monitor
24
  self.r2_service = r2_service
25
 
26
+ # إعداد المنصة (KuCoin)
27
  self.exchange = ccxt.kucoin({
28
  'enableRateLimit': True,
29
  'timeout': 30000,
 
36
  """تهيئة مدير البيانات والاتصالات"""
37
  print(" > [DataManager] Starting initialization...")
38
  self.http_client = httpx.AsyncClient(timeout=30.0)
 
 
39
  await self._load_markets()
40
+ print(f"✅ [DataManager V13.2] Ready (Order Book Enabled).")
 
41
 
42
  async def _load_markets(self):
43
  """تحميل بيانات الأسواق وتخزينها مؤقتاً"""
 
59
  # 🚀 R2 Compatibility
60
  # ==================================================================
61
  async def load_contracts_from_r2(self):
 
62
  if not self.r2_service: return
63
  try:
64
  self.contracts_db = await self.r2_service.load_contracts_db_async()
 
73
  # 🛡️ Layer 1 Screening (Data Filter)
74
  # ==================================================================
75
  async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
76
+ """الغربلة الأولية السريعة بناءً على الحجم والسيولة."""
 
 
77
  print(f"🔍 [Layer 1] Screening top liquid assets...")
78
  volume_data = await self._get_volume_data_live()
79
 
80
  if not volume_data:
81
  return []
82
 
83
+ # نأخذ أعلى 150 عملة
84
  candidates = volume_data[:150]
85
  return candidates
86
 
 
90
  tickers = await self.exchange.fetch_tickers()
91
  data = []
92
  for symbol, ticker in tickers.items():
 
93
  if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000:
94
  data.append({
95
  'symbol': symbol,
96
  'dollar_volume': ticker['quoteVolume'],
97
  'current_price': ticker['last']
98
  })
 
99
  data.sort(key=lambda x: x['dollar_volume'], reverse=True)
100
  return data
101
  except Exception:
102
  return []
103
 
104
  # ==================================================================
105
+ # 🎯 Data Fetching Helpers
106
  # ==================================================================
107
  async def get_latest_price_async(self, symbol: str) -> float:
 
108
  try:
109
  ticker = await self.exchange.fetch_ticker(symbol)
110
  return float(ticker['last'])
 
112
  return 0.0
113
 
114
  async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
 
115
  try:
116
  candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
117
  if candles: return candles
118
  return []
119
  except Exception:
120
+ return []
121
+
122
+ # 🔥 [NEW] Order Book Fetcher
123
+ async def get_order_book_snapshot(self, symbol: str, limit: int = 20) -> Dict[str, Any]:
124
+ """
125
+ جلب لقطة سريعة لدفتر الطلبات (Bids/Asks).
126
+ يستخدم لتحليل عمق السوق وتأكيد القناص.
127
+ """
128
+ try:
129
+ # ccxt يرجع الدفتر بصيغة {'bids': [[price, size], ...], 'asks': ...}
130
+ ob = await self.exchange.fetch_order_book(symbol, limit)
131
+ return ob
132
+ except Exception as e:
133
+ # طباعة خطأ خفيف لتجنب إغراق السجلات
134
+ # print(f"⚠️ [DataManager] Failed to fetch OB for {symbol}: {e}")
135
+ return {}