Riy777 commited on
Commit
642bf96
·
verified ·
1 Parent(s): 9bd090f

Update ml_engine/data_manager.py

Browse files
Files changed (1) hide show
  1. ml_engine/data_manager.py +88 -103
ml_engine/data_manager.py CHANGED
@@ -1,6 +1,6 @@
1
  # ============================================================
2
  # 📂 ml_engine/data_manager.py
3
- # (V40.0 - GEM-Architect: The Scanner Matrix)
4
  # ============================================================
5
 
6
  import asyncio
@@ -20,24 +20,12 @@ except ImportError:
20
  class SystemLimits:
21
  L1_MIN_AFFINITY_SCORE = 15.0
22
  CURRENT_REGIME = "RANGE"
23
- # أوزان الكاشفات الجديدة (تتغير بالباكتست)
24
- SCANNER_WEIGHTS = {
25
- "RSI_MOMENTUM": 0.3,
26
- "BB_BREAKOUT": 0.3,
27
- "MACD_CROSS": 0.2,
28
- "VOLUME_FLOW": 0.2
29
- }
30
 
31
  logging.getLogger("httpx").setLevel(logging.WARNING)
32
  logging.getLogger("ccxt").setLevel(logging.WARNING)
33
 
34
  class DataManager:
35
- """
36
- DataManager V40.0 (The Scanner Matrix)
37
- - L1 Screening uses a multi-strategy ensemble approach.
38
- - Optimized for speed using batch processing.
39
- """
40
-
41
  def __init__(self, contracts_db, whale_monitor, r2_service=None):
42
  self.contracts_db = contracts_db or {}
43
  self.whale_monitor = whale_monitor
@@ -49,7 +37,7 @@ class DataManager:
49
  })
50
  self.http_client = None
51
  self.BLACKLIST_TOKENS = ['USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L']
52
- print(f"📦 [DataManager V40.0] Scanner Matrix Online.")
53
 
54
  async def initialize(self):
55
  self.http_client = httpx.AsyncClient(timeout=60.0)
@@ -63,42 +51,31 @@ class DataManager:
63
  if self.http_client: await self.http_client.aclose()
64
  if self.exchange: await self.exchange.close()
65
 
66
- # ==================================================================
67
- # 🛡️ Layer 1: The Scanner Matrix (New Logic)
68
- # ==================================================================
69
  async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
70
- """
71
- تنفيذ الفحص المتعدد (Matrix Scan).
72
- 1. جلب أفضل 80 عملة سيولة.
73
- 2. جلب شموع 15m لهذه العملات.
74
- 3. تطبيق 4 استراتيجيات كشف مختلفة.
75
- 4. حساب النتيجة الموزونة.
76
- """
77
  current_regime = getattr(SystemLimits, "CURRENT_REGIME", "RANGE")
78
  scanner_weights = getattr(SystemLimits, "SCANNER_WEIGHTS", {"RSI_MOMENTUM": 1.0})
79
  min_score = getattr(SystemLimits, "L1_MIN_AFFINITY_SCORE", 15.0)
80
 
81
  print(f"🔍 [L1 Matrix] Regime: {current_regime} | Weights: {scanner_weights}")
82
 
83
- # 1. تصفية الكون الأولي (High Volume Universe)
84
  tickers = await self._fetch_universe_tickers()
85
- if not tickers: return []
 
 
86
 
87
- # نأخذ أفضل 80 عملة فقط لتجنب قتل الـ API Rate Limits
88
  top_candidates = tickers[:80]
89
-
90
- # 2. جلب البيانات الفنية دفعة واحدة (Batch Fetch 15m)
91
  enriched_data = await self._batch_fetch_ta_data(top_candidates, timeframe='15m', limit=100)
92
 
93
  scored_candidates = []
 
 
94
  for item in enriched_data:
95
  df = item.get('df')
96
  if df is None or len(df) < 50: continue
97
 
98
- # 3. تطبيق الكاشفات (Scanners)
99
  scores = self._apply_scanner_strategies(df)
100
 
101
- # 4. حساب النتيجة النهائية الموزونة
102
  final_score = 0.0
103
  tags = []
104
 
@@ -107,22 +84,26 @@ class DataManager:
107
  final_score += (val['score'] * w)
108
  if val['active']: tags.append(strategy)
109
 
110
- # إضافة نقاط إضافية بناءً على النظام القديم (Volume/Price Action)
111
- # للحفاظ على التوافقية
112
- if item['change_24h'] > 5.0 and current_regime == "BULL": final_score += 10
113
 
114
  item['l1_score'] = final_score
115
  item['tags'] = tags
116
 
 
 
 
 
 
117
  if final_score >= min_score:
118
  scored_candidates.append(item)
119
 
120
  scored_candidates.sort(key=lambda x: x['l1_score'], reverse=True)
121
 
122
- print(f" -> Matrix selected {len(scored_candidates)} candidates.")
123
- if scored_candidates:
124
- print(f" -> Top Pick: {scored_candidates[0]['symbol']} (Score: {scored_candidates[0]['l1_score']:.1f})")
125
-
126
  return [
127
  {
128
  'symbol': c['symbol'],
@@ -131,70 +112,79 @@ class DataManager:
131
  'type': ','.join(c['tags']),
132
  'l1_score': c['l1_score']
133
  }
134
- for c in scored_candidates[:40] # تمرير أفضل 40
135
  ]
136
 
137
- # ------------------------------------------------------------------
138
- # 🧩 Scanner Strategies Logic
139
- # ------------------------------------------------------------------
140
  def _apply_scanner_strategies(self, df: pd.DataFrame) -> Dict[str, Any]:
141
- """تطبيق مؤشرات فنية متعددة على البيانات"""
142
  results = {}
143
- close = df['close']
144
-
145
- # Strategy A: RSI Momentum (زخم)
146
- rsi = ta.rsi(close, length=14)
147
- curr_rsi = rsi.iloc[-1]
148
- score_rsi = 0
149
- active_rsi = False
150
- if 50 < curr_rsi < 75:
151
- score_rsi = 100
152
- active_rsi = True
153
- elif curr_rsi <= 30: # Oversold Bounce
154
- score_rsi = 80
155
- active_rsi = True
156
- results["RSI_MOMENTUM"] = {'score': score_rsi, 'active': active_rsi}
157
-
158
- # Strategy B: Bollinger Band Breakout (انفجار سعري)
159
- bb = ta.bbands(close, length=20, std=2)
160
- upper = bb['BBU_20_2.0'].iloc[-1]
161
- width = bb['BBB_20_2.0'].iloc[-1]
162
- curr_price = close.iloc[-1]
163
- score_bb = 0
164
- active_bb = False
165
- if curr_price > upper and width > 0.1: # اختراق حقيقي
166
- score_bb = 100
167
- active_bb = True
168
- results["BB_BREAKOUT"] = {'score': score_bb, 'active': active_bb}
169
-
170
- # Strategy C: MACD Cross (تغير اتجاه)
171
- macd = ta.macd(close)
172
- macd_line = macd['MACD_12_26_9'].iloc[-1]
173
- signal_line = macd['MACDS_12_26_9'].iloc[-1]
174
- hist = macd['MACDh_12_26_9'].iloc[-1]
175
- score_macd = 0
176
- active_macd = False
177
- if macd_line > signal_line and hist > 0:
178
- score_macd = 100
179
- active_macd = True
180
- results["MACD_CROSS"] = {'score': score_macd, 'active': active_macd}
181
-
182
- # Strategy D: Volume Flow (تدفق سيولة)
183
- vol = df['volume']
184
- vol_ma = ta.sma(vol, length=20).iloc[-1]
185
- curr_vol = vol.iloc[-1]
186
- score_vol = 0
187
- active_vol = False
188
- if curr_vol > (vol_ma * 1.5): # حجم تداول ضخم مفاجئ
189
- score_vol = 100
190
- active_vol = True
191
- results["VOLUME_FLOW"] = {'score': score_vol, 'active': active_vol}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
192
 
193
  return results
194
 
195
- # ------------------------------------------------------------------
196
- # ⚡ Batch & Async Helpers
197
- # ------------------------------------------------------------------
198
  async def _fetch_universe_tickers(self):
199
  try:
200
  tickers = await self.exchange.fetch_tickers()
@@ -202,7 +192,7 @@ class DataManager:
202
  for symbol, ticker in tickers.items():
203
  if not symbol.endswith('/USDT'): continue
204
  if any(bad in symbol for bad in self.BLACKLIST_TOKENS): continue
205
- if not ticker.get('quoteVolume') or ticker['quoteVolume'] < 500_000: continue # Min 500k Vol
206
  candidates.append({
207
  'symbol': symbol,
208
  'quote_volume': ticker['quoteVolume'],
@@ -215,7 +205,7 @@ class DataManager:
215
 
216
  async def _batch_fetch_ta_data(self, candidates, timeframe='15m', limit=100):
217
  results = []
218
- chunk_size = 15 # لعدم تجاوز الحدود
219
  for i in range(0, len(candidates), chunk_size):
220
  chunk = candidates[i:i+chunk_size]
221
  tasks = [self._fetch_ohlcv_safe(c, timeframe, limit) for c in chunk]
@@ -235,17 +225,12 @@ class DataManager:
235
  return candidate
236
  except: return None
237
 
238
- # Helpers needed for Processor/TradeManager
239
  async def get_latest_price_async(self, symbol):
240
  t = await self.exchange.fetch_ticker(symbol)
241
  return float(t['last'])
242
-
243
  async def get_latest_ohlcv(self, symbol, tf, limit=100):
244
  return await self.exchange.fetch_ohlcv(symbol, tf, limit=limit)
245
-
246
  async def get_order_book_snapshot(self, symbol, limit=20):
247
  return await self.exchange.fetch_order_book(symbol, limit)
248
-
249
- # R2 Placeholder
250
  async def load_contracts_from_r2(self): pass
251
  def get_contracts_db(self): return self.contracts_db
 
1
  # ============================================================
2
  # 📂 ml_engine/data_manager.py
3
+ # (V41.0 - GEM-Architect: Debugger & Logic Fix)
4
  # ============================================================
5
 
6
  import asyncio
 
20
  class SystemLimits:
21
  L1_MIN_AFFINITY_SCORE = 15.0
22
  CURRENT_REGIME = "RANGE"
23
+ SCANNER_WEIGHTS = {"RSI_MOMENTUM": 0.3, "BB_BREAKOUT": 0.3, "MACD_CROSS": 0.2, "VOLUME_FLOW": 0.2}
 
 
 
 
 
 
24
 
25
  logging.getLogger("httpx").setLevel(logging.WARNING)
26
  logging.getLogger("ccxt").setLevel(logging.WARNING)
27
 
28
  class DataManager:
 
 
 
 
 
 
29
  def __init__(self, contracts_db, whale_monitor, r2_service=None):
30
  self.contracts_db = contracts_db or {}
31
  self.whale_monitor = whale_monitor
 
37
  })
38
  self.http_client = None
39
  self.BLACKLIST_TOKENS = ['USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L']
40
+ print(f"📦 [DataManager V41.0] Scanner Matrix (Debug Mode) Online.")
41
 
42
  async def initialize(self):
43
  self.http_client = httpx.AsyncClient(timeout=60.0)
 
51
  if self.http_client: await self.http_client.aclose()
52
  if self.exchange: await self.exchange.close()
53
 
 
 
 
54
  async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
 
 
 
 
 
 
 
55
  current_regime = getattr(SystemLimits, "CURRENT_REGIME", "RANGE")
56
  scanner_weights = getattr(SystemLimits, "SCANNER_WEIGHTS", {"RSI_MOMENTUM": 1.0})
57
  min_score = getattr(SystemLimits, "L1_MIN_AFFINITY_SCORE", 15.0)
58
 
59
  print(f"🔍 [L1 Matrix] Regime: {current_regime} | Weights: {scanner_weights}")
60
 
 
61
  tickers = await self._fetch_universe_tickers()
62
+ if not tickers:
63
+ print("⚠️ [L1] Universe fetch returned empty.")
64
+ return []
65
 
 
66
  top_candidates = tickers[:80]
 
 
67
  enriched_data = await self._batch_fetch_ta_data(top_candidates, timeframe='15m', limit=100)
68
 
69
  scored_candidates = []
70
+ debug_log_sample = [] # لعرض عينة من الدرجات
71
+
72
  for item in enriched_data:
73
  df = item.get('df')
74
  if df is None or len(df) < 50: continue
75
 
76
+ # تطبيق الكاشفات
77
  scores = self._apply_scanner_strategies(df)
78
 
 
79
  final_score = 0.0
80
  tags = []
81
 
 
84
  final_score += (val['score'] * w)
85
  if val['active']: tags.append(strategy)
86
 
87
+ # توافقية النظام القديم (Boost)
88
+ if item['change_24h'] > 3.0 and current_regime == "BULL": final_score += 10
 
89
 
90
  item['l1_score'] = final_score
91
  item['tags'] = tags
92
 
93
+ # تسجيل عينة للمراقبة (أول 5 عملات فقط لتجنب إغراق السجل)
94
+ if len(debug_log_sample) < 5:
95
+ debug_details = f"{item['symbol']}: {final_score:.1f} (RSI:{scores['RSI_MOMENTUM']['val']:.1f}|Score:{scores['RSI_MOMENTUM']['score']})"
96
+ debug_log_sample.append(debug_details)
97
+
98
  if final_score >= min_score:
99
  scored_candidates.append(item)
100
 
101
  scored_candidates.sort(key=lambda x: x['l1_score'], reverse=True)
102
 
103
+ # طباعة تقرير التصحيح
104
+ print(f" -> [DEBUG L1] Sample Scores: { ' | '.join(debug_log_sample) }")
105
+ print(f" -> Matrix selected {len(scored_candidates)} candidates (Threshold: {min_score}).")
106
+
107
  return [
108
  {
109
  'symbol': c['symbol'],
 
112
  'type': ','.join(c['tags']),
113
  'l1_score': c['l1_score']
114
  }
115
+ for c in scored_candidates[:40]
116
  ]
117
 
 
 
 
118
  def _apply_scanner_strategies(self, df: pd.DataFrame) -> Dict[str, Any]:
 
119
  results = {}
120
+ try:
121
+ close = df['close']
122
+
123
+ # 1. RSI (تم إصلاح المنطقة العمياء)
124
+ rsi = ta.rsi(close, length=14)
125
+ curr_rsi = rsi.iloc[-1] if rsi is not None else 50
126
+
127
+ score_rsi = 0
128
+ active_rsi = False
129
+
130
+ if 50 < curr_rsi < 75:
131
+ score_rsi = 100
132
+ active_rsi = True
133
+ elif curr_rsi <= 30: # Oversold
134
+ score_rsi = 80
135
+ active_rsi = True
136
+ elif 30 < curr_rsi <= 50: # ✅ المنطقة المحايدة (تمت إضافتها)
137
+ score_rsi = 40 # نعطيها بعض النقاط بدلاً من الصفر
138
+
139
+ results["RSI_MOMENTUM"] = {'score': score_rsi, 'active': active_rsi, 'val': curr_rsi}
140
+
141
+ # 2. BB Breakout
142
+ bb = ta.bbands(close, length=20, std=2)
143
+ if bb is not None:
144
+ upper = bb[f'BBU_20_2.0'].iloc[-1]
145
+ width = bb[f'BBB_20_2.0'].iloc[-1]
146
+ curr_price = close.iloc[-1]
147
+ score_bb = 0
148
+ active_bb = False
149
+ if curr_price > upper and width > 0.1:
150
+ score_bb = 100
151
+ active_bb = True
152
+ else:
153
+ score_bb = 0; active_bb = False
154
+ results["BB_BREAKOUT"] = {'score': score_bb, 'active': active_bb}
155
+
156
+ # 3. MACD
157
+ macd = ta.macd(close)
158
+ if macd is not None:
159
+ hist = macd[f'MACDh_12_26_9'].iloc[-1]
160
+ score_macd = 0
161
+ active_macd = False
162
+ if hist > 0: # الهستوجرام إيجابي
163
+ score_macd = 100
164
+ active_macd = True
165
+ else:
166
+ score_macd = 0; active_macd = False
167
+ results["MACD_CROSS"] = {'score': score_macd, 'active': active_macd}
168
+
169
+ # 4. Volume Flow
170
+ vol = df['volume']
171
+ vol_ma = ta.sma(vol, length=20).iloc[-1]
172
+ curr_vol = vol.iloc[-1]
173
+ score_vol = 0
174
+ active_vol = False
175
+ if curr_vol > (vol_ma * 1.2): # خففنا الشرط من 1.5 إلى 1.2
176
+ score_vol = 100
177
+ active_vol = True
178
+ results["VOLUME_FLOW"] = {'score': score_vol, 'active': active_vol}
179
+
180
+ except Exception as e:
181
+ # في حال حدوث خطأ في الحساب، نعيد قيم صفرية لعدم إيقاف النظام
182
+ # print(f"Indicator Error: {e}")
183
+ return {k: {'score': 0, 'active': False, 'val': 0} for k in ["RSI_MOMENTUM", "BB_BREAKOUT", "MACD_CROSS", "VOLUME_FLOW"]}
184
 
185
  return results
186
 
187
+ # --- Helpers ---
 
 
188
  async def _fetch_universe_tickers(self):
189
  try:
190
  tickers = await self.exchange.fetch_tickers()
 
192
  for symbol, ticker in tickers.items():
193
  if not symbol.endswith('/USDT'): continue
194
  if any(bad in symbol for bad in self.BLACKLIST_TOKENS): continue
195
+ if not ticker.get('quoteVolume') or ticker['quoteVolume'] < 300_000: continue # تخفيف شرط السيولة قليلاً
196
  candidates.append({
197
  'symbol': symbol,
198
  'quote_volume': ticker['quoteVolume'],
 
205
 
206
  async def _batch_fetch_ta_data(self, candidates, timeframe='15m', limit=100):
207
  results = []
208
+ chunk_size = 15
209
  for i in range(0, len(candidates), chunk_size):
210
  chunk = candidates[i:i+chunk_size]
211
  tasks = [self._fetch_ohlcv_safe(c, timeframe, limit) for c in chunk]
 
225
  return candidate
226
  except: return None
227
 
 
228
  async def get_latest_price_async(self, symbol):
229
  t = await self.exchange.fetch_ticker(symbol)
230
  return float(t['last'])
 
231
  async def get_latest_ohlcv(self, symbol, tf, limit=100):
232
  return await self.exchange.fetch_ohlcv(symbol, tf, limit=limit)
 
233
  async def get_order_book_snapshot(self, symbol, limit=20):
234
  return await self.exchange.fetch_order_book(symbol, limit)
 
 
235
  async def load_contracts_from_r2(self): pass
236
  def get_contracts_db(self): return self.contracts_db