# ============================================================
# 📂 ml_engine/data_manager.py
# (V60.4 - GEM-Architect: Regime Gating + Robust Data)
# ============================================================
import asyncio
import httpx
import traceback
import ccxt.async_support as ccxt
import logging
import pandas as pd
import numpy as np
from typing import List, Dict, Any

# Try to import the system limits (optional; None when unavailable).
try:
    from ml_engine.processor import SystemLimits
except ImportError:
    SystemLimits = None

# Reduce log noise from third-party libraries.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)


class DataManager:
    """Fetch KuCoin spot market data and run the Layer-1 candidate screen.

    Wraps a ``ccxt.kucoin`` async exchange (spot) and an ``httpx.AsyncClient``
    created in :meth:`initialize`. Layer 1 filters the */USDT universe by
    liquidity and 24h change, classifies each symbol as BREAKOUT / REVERSAL /
    NONE from 1h and 15m indicators, diagnoses the asset regime and drops
    breakouts in RANGE/DEAD regimes.

    Failure paths catch ``Exception`` (never a bare ``except``) so that
    ``asyncio.CancelledError`` still propagates and tasks stay cancellable.
    """

    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        # Set by layer1_rapid_screening(); provides per-regime configuration.
        self.adaptive_hub_ref = None
        self.exchange = ccxt.kucoin({
            'enableRateLimit': True,
            'timeout': 60000,
            'options': {'defaultType': 'spot'},
        })
        self.http_client = None
        self.market_cache = {}
        # Blacklist: stablecoins / fiat and leveraged-token markers.
        # NOTE(review): Stage 0 matches these as *substrings* of the base
        # currency, so e.g. 'UP' also excludes 'JUP' -- confirm this is intended.
        self.BLACKLIST_TOKENS = [
            'USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'FDUSD', 'EUR', 'PAX',
            'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L',
        ]
        print("📦 [DataManager V60.4] Regime Gating (Range Protection) Active.")

    async def initialize(self):
        """Create the HTTP client, load exchange markets and the contracts DB.

        Safe to call more than once: an existing open HTTP client is reused
        instead of being replaced (and leaked).
        """
        print(" > [DataManager] Starting initialization...")
        if self.http_client is None or self.http_client.is_closed:
            self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()
        await self.load_contracts_from_r2()

    async def _load_markets(self):
        """Load exchange market metadata into ``self.market_cache`` (best effort)."""
        if not self.exchange:
            return
        try:
            await self.exchange.load_markets()
            self.market_cache = self.exchange.markets
        except Exception as e:
            # Best effort: keep the previous cache and continue initialization,
            # but do not hide the failure completely.
            print(f"⚠️ [DataManager] load_markets failed: {e}")

    async def close(self):
        """Close the HTTP client and the exchange session.

        The exchange session is closed even if closing the HTTP client raises.
        """
        try:
            if self.http_client:
                await self.http_client.aclose()
        finally:
            self.http_client = None
            if self.exchange:
                await self.exchange.close()

    async def load_contracts_from_r2(self):
        """Replace ``self.contracts_db`` with the DB loaded from the R2 service.

        No-op without an R2 service. On failure the DB is reset to ``{}``;
        a ``None`` payload is also normalised to ``{}`` so that
        :meth:`get_contracts_db` always returns a dict.
        """
        if not self.r2_service:
            return
        try:
            loaded = await self.r2_service.load_contracts_db_async()
        except Exception as e:
            print(f"⚠️ [DataManager] Failed to load contracts DB from R2: {e}")
            self.contracts_db = {}
            return
        self.contracts_db = loaded or {}

    def get_contracts_db(self) -> Dict[str, Any]:
        """Return the current contracts database."""
        return self.contracts_db

    # ==================================================================
    # 🧠 Layer 1: Screening + Diagnosis + Regime Gating
    # ==================================================================
    async def layer1_rapid_screening(self, adaptive_hub_ref=None) -> List[Dict[str, Any]]:
        """Run the Layer-1 screen and return up to 50 candidates.

        Pipeline: Stage-0 universe filter -> OHLCV fetch for the 300 most
        liquid symbols -> logic-tree classification -> regime diagnosis ->
        regime gate (BREAKOUT rejected in RANGE/DEAD) -> sort by score.

        Each returned item is the Stage-0 dict enriched with OHLCV data,
        ``df_1h``, ``asset_regime``, ``asset_regime_conf``, ``l1_sort_score``,
        ``strategy_tag`` and, when a hub is attached, ``dynamic_limits``.
        """
        self.adaptive_hub_ref = adaptive_hub_ref
        print("🔍 [Layer 1] Screening with Regime Gating...")

        # 1. Liquidity / volatility universe filter.
        initial_candidates = await self._stage0_universe_filter()
        if not initial_candidates:
            print("⚠️ [Layer 1] Stage 0 returned 0 candidates.")
            return []

        # 2. Fetch technical (OHLCV) data for the most liquid symbols.
        top_candidates = initial_candidates[:300]
        enriched_data = await self._fetch_technical_data_batch(top_candidates)

        final_list = []
        for item in enriched_data:
            # 3. Technical classification (BREAKOUT vs REVERSAL).
            classification = self._apply_strict_logic_tree(item)
            if classification['type'] == 'NONE':
                continue

            # 4. Diagnosis (reads item['df_1h'] populated by the logic tree).
            regime_info = self._diagnose_asset_regime(item)
            current_regime = regime_info['regime']

            # 5. Regime gate (trap protection): in a ranging (RANGE) or dead
            # (DEAD) market, breakouts tend to fail and become bull traps,
            # so they are skipped silently.
            if current_regime in ('RANGE', 'DEAD') and classification['type'] == 'BREAKOUT':
                continue

            # Passed the gate.
            item['asset_regime'] = current_regime
            item['asset_regime_conf'] = regime_info['conf']

            # Inject regime-specific thresholds when an adaptive hub is attached.
            if self.adaptive_hub_ref:
                item['dynamic_limits'] = self.adaptive_hub_ref.get_regime_config(current_regime)

            item['l1_sort_score'] = classification['score']
            item['strategy_tag'] = classification['type']
            final_list.append(item)

        # 6. Final ranking.
        final_list.sort(key=lambda x: x['l1_sort_score'], reverse=True)
        selection = final_list[:50]
        print(f"✅ [Layer 1] Passed {len(selection)} candidates (Safe Strategies Only).")
        return selection

    # ==================================================================
    # 🔍 Stage 0: Universe Filter (Robust USD Calc)
    # ==================================================================
    async def _stage0_universe_filter(self) -> List[Dict[str, Any]]:
        """Filter */USDT tickers by blacklist, 24h quote volume and 24h change.

        Quote volume is computed as ``baseVolume * last``. Sovereign coins are
        exempt from the 500k volume floor but NOT from the 20% change cap.

        Returns:
            Candidate dicts (``symbol``, ``quote_volume``, ``current_price``,
            ``change_24h``) sorted by quote volume, descending; ``[]`` on error.
        """
        try:
            print(" 🛡️ [Stage 0] Fetching Tickers...")
            tickers = await self.exchange.fetch_tickers()
            candidates = []

            # Sovereign coins: exempt from the liquidity filter.
            SOVEREIGN_COINS = {'BTC/USDT', 'ETH/USDT', 'SOL/USDT', 'BNB/USDT', 'XRP/USDT'}
            reject_stats = {"volume": 0, "change": 0, "blacklist": 0}

            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT'):
                    continue

                base_curr = symbol.split('/')[0]
                if any(bad in base_curr for bad in self.BLACKLIST_TOKENS):
                    reject_stats["blacklist"] += 1
                    continue

                # Compute quote volume manually (base volume x last price).
                base_vol = float(ticker.get('baseVolume') or 0.0)
                last_price = float(ticker.get('last') or 0.0)
                calc_quote_vol = base_vol * last_price
                is_sovereign = symbol in SOVEREIGN_COINS

                # One-off BTC volume debug print. Exact match: a substring
                # test would also fire on e.g. 'WBTC/USDT'.
                if symbol == "BTC/USDT":
                    print(f" 🐛 [DEBUG] BTC Vol: ${calc_quote_vol:,.0f}")

                # Liquidity filter (500k), skipped for sovereign coins.
                if not is_sovereign and calc_quote_vol < 500000:
                    reject_stats["volume"] += 1
                    continue

                # 24h change filter (20%), applied to every symbol.
                change_pct = ticker.get('percentage')
                if change_pct is None:
                    change_pct = 0.0
                if abs(change_pct) > 20.0:
                    reject_stats["change"] += 1
                    continue

                candidates.append({
                    'symbol': symbol,
                    'quote_volume': calc_quote_vol,
                    'current_price': last_price,
                    'change_24h': change_pct,
                })

            print(f" 📊 [Filter Stats] Total: {len(tickers)} | Passed: {len(candidates)}")
            print(
                f" ❌ Rejected: Vol < 500k ({reject_stats['volume']}) "
                f"| Change > 20% ({reject_stats['change']}) "
                f"| Blacklist ({reject_stats['blacklist']})"
            )
            candidates.sort(key=lambda x: x['quote_volume'], reverse=True)
            return candidates
        except Exception as e:
            print(f"❌ [L1 Error] Universe filter failed: {e}")
            traceback.print_exc()
            return []

    # ------------------------------------------------------------------
    # 🧭 The Diagnoser
    # ------------------------------------------------------------------
    def _diagnose_asset_regime(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Classify the latest 1h bar of ``item['df_1h']`` into a regime.

        Returns ``{'regime': ..., 'conf': ...}`` where regime is:
          * DEAD  -- ATR below 0.5% of price (conf 0.9);
          * BULL  -- close > EMA20 > EMA50 and RSI > 50 (conf 0.8 if RSI > 55);
          * BEAR  -- close < EMA20 < EMA50 and RSI < 50 (conf 0.8 if RSI < 45);
          * RANGE -- otherwise (conf 0.5), or conf 0.0 when data is missing/bad.
        """
        try:
            if 'df_1h' not in item:
                return {'regime': 'RANGE', 'conf': 0.0}
            df = item['df_1h']
            curr = df.iloc[-1]
            price = curr['close']
            ema20 = curr['ema20']
            ema50 = curr['ema50']
            rsi = curr['rsi']
            atr = curr['atr']
            atr_pct = (atr / price) * 100 if price > 0 else 0

            if atr_pct < 0.5:
                return {'regime': 'DEAD', 'conf': 0.9}

            regime = "RANGE"
            conf = 0.5
            if price > ema20 and ema20 > ema50 and rsi > 50:
                regime = "BULL"
                conf = 0.8 if rsi > 55 else 0.6
            elif price < ema20 and ema20 < ema50 and rsi < 50:
                regime = "BEAR"
                conf = 0.8 if rsi < 45 else 0.6
            return {'regime': regime, 'conf': conf}
        except Exception:
            return {'regime': 'RANGE', 'conf': 0.0}

    # ------------------------------------------------------------------
    # 🛡️ The Logic Tree (Anti-FOMO Tuned)
    # ------------------------------------------------------------------
    def _apply_strict_logic_tree(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Classify a candidate as BREAKOUT, REVERSAL or NONE.

        Side effect: stores the 1h indicator frame in ``data['df_1h']``.

        Anti-FOMO gates (-> NONE): 4h change > 12%, 1h RSI > 75, or close
        more than 2.2 ATR above EMA20.
        BREAKOUT: 1h bullish (EMA20 > EMA50 or close > EMA20), 45 <= RSI <= 75
        and latest 15m volume >= 1.2x its 20-bar mean; score = volume ratio.
        REVERSAL: 20 <= RSI <= 40 and 4h change <= -2%; score = 100 - RSI.
        """
        try:
            df_1h = self._calc_indicators(data['ohlcv_1h_raw'])
            df_15m = self._calc_indicators(data['ohlcv_15m_raw'])
            data['df_1h'] = df_1h
        except Exception:
            return {'type': 'NONE', 'score': 0}

        # An empty frame would make iloc[-1] raise and abort the whole screen.
        if df_1h.empty or df_15m.empty:
            return {'type': 'NONE', 'score': 0}

        curr_1h = df_1h.iloc[-1]
        curr_15m = df_15m.iloc[-1]

        # % change over the last 4 hourly closes; 0 when history is too short
        # or the reference close is not positive.
        change_4h = 0.0
        if len(df_1h) >= 5:
            close_4h_ago = df_1h.iloc[-5]['close']
            if close_4h_ago > 0:
                change_4h = ((curr_1h['close'] - close_4h_ago) / close_4h_ago) * 100

        # Gates
        if change_4h > 12.0:
            return {'type': 'NONE', 'score': 0}
        if curr_1h['rsi'] > 75:
            return {'type': 'NONE', 'score': 0}
        # Distance of close above EMA20, in ATR units.
        dev = (curr_1h['close'] - curr_1h['ema20']) / curr_1h['atr'] if curr_1h['atr'] > 0 else 0
        if dev > 2.2:
            return {'type': 'NONE', 'score': 0}

        # A. Breakout
        is_bullish = (curr_1h['ema20'] > curr_1h['ema50']) or (curr_1h['close'] > curr_1h['ema20'])
        if is_bullish and (45 <= curr_1h['rsi'] <= 75):
            vol_ma = df_15m['volume'].rolling(20).mean().iloc[-1]
            if curr_15m['volume'] >= 1.2 * vol_ma:
                score = curr_15m['volume'] / vol_ma if vol_ma > 0 else 1.0
                return {'type': 'BREAKOUT', 'score': score}

        # B. Reversal
        if 20 <= curr_1h['rsi'] <= 40 and change_4h <= -2.0:
            score = (100 - curr_1h['rsi'])
            return {'type': 'REVERSAL', 'score': score}

        return {'type': 'NONE', 'score': 0}

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    async def _fetch_technical_data_batch(self, candidates):
        """Fetch OHLCV for ``candidates`` in concurrent chunks of 10.

        Candidates whose fetch fails are dropped. The short pause between
        chunks is presumably meant to ease exchange rate limiting.
        """
        chunk_size = 10
        results = []
        for i in range(0, len(candidates), chunk_size):
            chunk = candidates[i:i + chunk_size]
            res = await asyncio.gather(*(self._fetch_single(c) for c in chunk))
            results.extend(r for r in res if r)
            await asyncio.sleep(0.05)
        return results

    async def _fetch_single(self, c):
        """Attach 60 bars of 1h and 15m OHLCV to candidate ``c``.

        Returns the (mutated) candidate, or ``None`` if either series is empty
        or the fetch fails. Cancellation is not swallowed.
        """
        try:
            h1 = await self.exchange.fetch_ohlcv(c['symbol'], '1h', limit=60)
            m15 = await self.exchange.fetch_ohlcv(c['symbol'], '15m', limit=60)
        except Exception:
            return None
        if not h1 or not m15:
            return None
        c['ohlcv'] = {'1h': h1, '15m': m15}
        c['ohlcv_1h_raw'] = h1
        c['ohlcv_15m_raw'] = m15
        return c

    def _calc_indicators(self, ohlcv):
        """Build an indicator DataFrame from OHLCV rows.

        Rows are ``[ts, open, high, low, close, volume]``. Adds ``rsi``
        (14-period, simple rolling means of gains/losses rather than Wilder
        smoothing), ``ema20``, ``ema50`` and ``atr`` (14-period rolling mean
        of true range). Warm-up NaNs are filled with 0.
        NOTE(review): a perfectly flat window gives 0/0 -> NaN -> RSI 0.
        """
        df = pd.DataFrame(ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])
        delta = df['c'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rs = gain / loss
        df['rsi'] = 100 - (100 / (1 + rs))
        df['ema20'] = df['c'].ewm(span=20).mean()
        df['ema50'] = df['c'].ewm(span=50).mean()
        tr = np.maximum(
            df['h'] - df['l'],
            np.maximum(abs(df['h'] - df['c'].shift()), abs(df['l'] - df['c'].shift())),
        )
        df['atr'] = tr.rolling(14).mean()
        df.rename(columns={'o': 'open', 'h': 'high', 'l': 'low', 'c': 'close', 'v': 'volume'}, inplace=True)
        return df.fillna(0)

    async def get_latest_price_async(self, symbol):
        """Return the last traded price of ``symbol``, or 0.0 on failure."""
        try:
            return float((await self.exchange.fetch_ticker(symbol))['last'])
        except Exception:
            return 0.0

    async def get_latest_ohlcv(self, symbol, timeframe='5m', limit=100):
        """Return raw OHLCV rows for ``symbol``, or ``[]`` on failure."""
        try:
            return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
        except Exception:
            return []

    async def get_order_book_snapshot(self, symbol, limit=20):
        """Return an order-book snapshot for ``symbol``, or ``{}`` on failure."""
        try:
            return await self.exchange.fetch_order_book(symbol, limit)
        except Exception:
            return {}