# ============================================================ # 📂 ml_engine/data_manager.py # (V45.0 - GEM-Architect: Anti-FOMO Revival) # ============================================================ import asyncio import httpx import traceback import logging import pandas as pd import numpy as np import pandas_ta as ta # سنحتاج بعض الدوال المساعدة from typing import List, Dict, Any import ccxt.async_support as ccxt # ✅ استيراد الدستور الديناميكي (للحفاظ على توافق النظام) try: from ml_engine.processor import SystemLimits except ImportError: class SystemLimits: L1_MIN_AFFINITY_SCORE = 15.0 CURRENT_REGIME = "RANGE" logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("ccxt").setLevel(logging.WARNING) class DataManager: """ DataManager V45.0 (Anti-FOMO Revival) - Restores the STRICT Logic Tree from V15.2. - Filters: 8% Max Pump, 12% Max Daily, RSI < 70 strict limit. - Targets: Clean Breakouts & Oversold Reversals ONLY. """ def __init__(self, contracts_db, whale_monitor, r2_service=None): self.contracts_db = contracts_db or {} self.whale_monitor = whale_monitor self.r2_service = r2_service self.exchange = ccxt.kucoin({ 'enableRateLimit': True, 'timeout': 60000, 'options': {'defaultType': 'spot'} }) self.http_client = None self.market_cache = {} # القائمة السوداء self.BLACKLIST_TOKENS = [ 'USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'FDUSD', 'EUR', 'PAX', 'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L', 'USDD', 'USDP', 'HT', 'KCS' ] print(f"📦 [DataManager V45.0] Anti-FOMO Shield Active.") async def initialize(self): print(" > [DataManager] Starting initialization...") try: self.http_client = httpx.AsyncClient(timeout=30.0) await self._load_markets() print(f"✅ [DataManager] Ready (Logic: STRICT Anti-FOMO).") except Exception as e: print(f"❌ [DataManager] Init Error: {e}") traceback.print_exc() async def _load_markets(self): try: if self.exchange and not self.exchange.markets: await self.exchange.load_markets() self.market_cache = self.exchange.markets except Exception: pass async def close(self): if self.http_client: await self.http_client.aclose() if self.exchange: await self.exchange.close() async def load_contracts_from_r2(self): if not self.r2_service: return try: self.contracts_db = await self.r2_service.load_contracts_db_async() except: self.contracts_db = {} def get_contracts_db(self): return self.contracts_db # ================================================================== # 🛡️ Layer 1: The Strict Logic Tree (From V15.2) # ================================================================== async def layer1_rapid_screening(self) -> List[Dict[str, Any]]: print(f"🔍 [L1 Anti-FOMO] Filtering Universe...") # 1. المرحلة 0: فلتر الكون (السيولة العالية فقط) # V15.2 كان يطلب مليون دولار سيولة، سنبقيه كما هو للصرامة initial_candidates = await self._stage0_universe_filter() if not initial_candidates: print("⚠️ [Layer 1] Universe empty.") return [] # 2. جلب البيانات الفنية لأفضل 300 عملة (كما في V15.2) top_liquid_candidates = initial_candidates[:300] print(f" -> Analyzing top {len(top_liquid_candidates)} liquid assets...") enriched_data = await self._fetch_technical_data_batch(top_liquid_candidates) # 3. تطبيق شجرة القرار الصارمة breakout_list = [] reversal_list = [] for item in enriched_data: # هنا نستخدم منطق V15.2 الأصلي classification = self._apply_logic_tree(item) if classification['type'] == 'BREAKOUT': item['l1_score'] = classification['score'] item['type'] = 'BREAKOUT' breakout_list.append(item) elif classification['type'] == 'REVERSAL': item['l1_score'] = classification['score'] item['type'] = 'REVERSAL' reversal_list.append(item) print(f" -> [L1 Logic] Found: {len(breakout_list)} Breakouts, {len(reversal_list)} Reversals.") # 4. الترتيب والدمج النهائي # الـ Breakout نرتبهم بالأعلى سكور (فوليوم) breakout_list.sort(key=lambda x: x['l1_score'], reverse=True) # الـ Reversal نرتبهم بالأعلى سكور (سكور الارتداد في V15.2 كان 100-RSI، يعني الأعلى أفضل) reversal_list.sort(key=lambda x: x['l1_score'], reverse=True) # نختار الأفضل فقط (مزيج متوازن) final_selection = breakout_list[:25] + reversal_list[:15] return [ { 'symbol': c['symbol'], 'quote_volume': c.get('quote_volume', 0), 'current_price': c.get('current_price', 0), 'type': c.get('type', 'UNKNOWN'), 'l1_score': c.get('l1_score', 0) } for c in final_selection ] # ================================================================== # 🔗 Bridge for Backtest Engine Compatibility (IMPORTANT) # ================================================================== def _calculate_structural_score(self, df: pd.DataFrame, symbol: str, regime: str) -> (float, List[str]): """ [Compatibility Wrapper] هذه الدالة موجودة لكي لا يتعطل محرك الباكتست (backtest_engine.py). تقوم بتحويل بيانات الباكتست إلى تنسيق يفهمه منطق V15.2. """ # محاكاة تنسيق البيانات الذي يطلبه _apply_logic_tree # نحتاج تقسيم الـ DF إلى 1H و 15M تقريبياً try: # Resample لإنشاء بيانات 1H و 15M من البيانات المدخلة (التي غالباً تكون 15M) df_15m = df.copy() agg_dict = {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'} df_1h = df.resample('1H').agg(agg_dict).dropna() # تحويلها لقوائم كما يتوقع الكود القديم ohlcv_1h = df_1h.reset_index()[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist() ohlcv_15m = df_15m.reset_index()[['timestamp', 'open', 'high', 'low', 'close', 'volume']].values.tolist() dummy_data = { 'ohlcv_1h': ohlcv_1h, 'ohlcv_15m': ohlcv_15m, 'change_24h': 0.0 # غير متوفر بدقة في الباكتست الجزئي، نتجاوزه } res = self._apply_logic_tree(dummy_data) score = res.get('score', 0.0) # تحويل السكور ليكون متوافقاً مع الباكتست (حول 20-80) if res['type'] == 'BREAKOUT': return score * 20.0, ["BREAKOUT"] # Breakout score is usually low (ratio), boost it elif res['type'] == 'REVERSAL': return score, ["REVERSAL"] # Reversal score is already 0-100 return 0.0, ["NONE"] except Exception: return 0.0, ["ERROR"] # ================================================================== # 🏗️ V15.2 Logic Core (Unchanged Logic) # ================================================================== def _apply_logic_tree(self, data: Dict[str, Any]) -> Dict[str, Any]: try: df_1h = self._calc_indicators(data['ohlcv_1h']) df_15m = self._calc_indicators(data['ohlcv_15m']) except: return {'type': 'NONE'} if df_1h.empty or df_15m.empty: return {'type': 'NONE'} curr_1h = df_1h.iloc[-1] curr_15m = df_15m.iloc[-1] # --- Stage 2: Anti-FOMO Filters (STRICT) --- try: # حساب التغير في آخر 4 ساعات if len(df_1h) >= 5: close_4h_ago = df_1h.iloc[-5]['close'] change_4h = ((curr_1h['close'] - close_4h_ago) / close_4h_ago) * 100 else: change_4h = 0.0 except: change_4h = 0.0 # 1. فلتر المضخات: ممنوع الدخول إذا صعدت أكثر من 8% في 4 ساعات if change_4h > 8.0: return {'type': 'NONE'} # 2. فلتر التذبذب اليومي: ممنوع أكثر من 12% (للبعد عن العملات المجنونة) if data.get('change_24h', 0) > 12.0: return {'type': 'NONE'} # 3. فلتر القمة: ممنوع RSI فوق 70 قطعاً if curr_1h['rsi'] > 70: return {'type': 'NONE'} # 4. فلتر الامتداد: ممنوع الابتعاد عن المتوسط كثيراً deviation = (curr_1h['close'] - curr_1h['ema20']) / curr_1h['atr'] if curr_1h['atr'] > 0 else 0 if deviation > 1.8: return {'type': 'NONE'} # --- Stage 3: Setup Classification --- # === A. Breakout Logic === is_breakout = False breakout_score = 0.0 # تريند صاعد bullish_structure = (curr_1h['ema20'] > curr_1h['ema50']) or (curr_1h['close'] > curr_1h['ema20']) if bullish_structure: # RSI يجب أن يكون فيه مساحة للصعود (ليس منخفضاً جداً ولا مرتفعاً جداً) if 45 <= curr_1h['rsi'] <= 68: if curr_15m['close'] >= curr_15m['ema20']: # Volatility Squeeze (هدوء ما قبل العاصفة) avg_range = (df_15m['high'] - df_15m['low']).rolling(10).mean().iloc[-1] current_range = curr_15m['high'] - curr_15m['low'] if current_range <= avg_range * 1.8: vol_ma20 = df_15m['volume'].rolling(20).mean().iloc[-1] # شرط الفوليوم: شمعة الحالية فيها سيولة 1.5 ضعف المتوسط if curr_15m['volume'] >= 1.5 * vol_ma20: is_breakout = True breakout_score = curr_15m['volume'] / vol_ma20 if vol_ma20 > 0 else 1.0 if is_breakout: return {'type': 'BREAKOUT', 'score': breakout_score} # === B. Reversal Logic === is_reversal = False reversal_score = 0.0 # تشبع بيعي واضح if 20 <= curr_1h['rsi'] <= 40: # السعر هبط مؤخراً if change_4h <= -2.0: # البحث عن شمعة انعكاسية (Hammer / Green Body) last_3 = df_15m.iloc[-3:] found_rejection = False for _, row in last_3.iterrows(): rng = row['high'] - row['low'] if rng > 0: is_green = row['close'] > row['open'] # Hammer pattern logic lower_wick = min(row['open'], row['close']) - row['low'] body = abs(row['close'] - row['open']) hammer_shape = lower_wick > (body * 1.5) if is_green or hammer_shape: found_rejection = True break if found_rejection: is_reversal = True # السكور كلما قل الـ RSI كان أفضل للارتداد reversal_score = (100 - curr_1h['rsi']) if is_reversal: return {'type': 'REVERSAL', 'score': reversal_score} return {'type': 'NONE'} # ------------------------------------------------------------------ # Manual Indicator Calculation (Pandas pure - Exactly like V15.2) # ------------------------------------------------------------------ def _calc_indicators(self, ohlcv_list): if not ohlcv_list: return pd.DataFrame() df = pd.DataFrame(ohlcv_list, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) # RSI Calculation delta = df['close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() rs = gain / loss df['rsi'] = 100 - (100 / (1 + rs)) # EMA df['ema20'] = df['close'].ewm(span=20, adjust=False).mean() df['ema50'] = df['close'].ewm(span=50, adjust=False).mean() # ATR high_low = df['high'] - df['low'] high_close = np.abs(df['high'] - df['close'].shift()) low_close = np.abs(df['low'] - df['close'].shift()) ranges = pd.concat([high_low, high_close, low_close], axis=1) true_range = np.max(ranges, axis=1) df['atr'] = true_range.rolling(14).mean() df.fillna(0, inplace=True) return df # ================================================================== # 🌌 Stage 0: Universe Filter (V15.2 Logic) # ================================================================== async def _stage0_universe_filter(self) -> List[Dict[str, Any]]: try: tickers = await self.exchange.fetch_tickers() candidates = [] for symbol, ticker in tickers.items(): if not symbol.endswith('/USDT'): continue base_curr = symbol.split('/')[0] if any(bad in base_curr for bad in self.BLACKLIST_TOKENS): continue # شرط السيولة الصارم: 1 مليون دولار quote_vol = ticker.get('quoteVolume') if not quote_vol or quote_vol < 1_000_000: continue last_price = ticker.get('last') if not last_price or last_price < 0.0005: continue candidates.append({ 'symbol': symbol, 'quote_volume': quote_vol, 'current_price': last_price, 'change_24h': float(ticker.get('percentage', 0.0)) }) # ترتيب مبدئي بالحجم candidates.sort(key=lambda x: x['quote_volume'], reverse=True) return candidates except Exception as e: print(f"❌ [L1 Error] Universe filter failed: {e}") return [] # ================================================================== # 🔄 Batch Fetching # ================================================================== async def _fetch_technical_data_batch(self, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]: chunk_size = 15 results = [] for i in range(0, len(candidates), chunk_size): chunk = candidates[i:i + chunk_size] chunk_tasks = [self._fetch_single_tech_data(c) for c in chunk] chunk_results = await asyncio.gather(*chunk_tasks) results.extend([r for r in chunk_results if r is not None]) await asyncio.sleep(0.05) return results async def _fetch_single_tech_data(self, candidate: Dict[str, Any]) -> Any: symbol = candidate['symbol'] try: # V15.2 Requires 1H and 15M ohlcv_1h = await self.exchange.fetch_ohlcv(symbol, '1h', limit=60) ohlcv_15m = await self.exchange.fetch_ohlcv(symbol, '15m', limit=60) if not ohlcv_1h or len(ohlcv_1h) < 55 or not ohlcv_15m or len(ohlcv_15m) < 55: return None candidate['ohlcv_1h'] = ohlcv_1h candidate['ohlcv_15m'] = ohlcv_15m return candidate except Exception: return None # ================================================================== # 🎯 Public Helpers # ================================================================== async def get_latest_price_async(self, symbol: str) -> float: try: ticker = await self.exchange.fetch_ticker(symbol) return float(ticker['last']) except Exception: return 0.0 async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]: try: candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit) return candles or [] except Exception: return [] async def get_order_book_snapshot(self, symbol: str, limit: int = 20) -> Dict[str, Any]: try: ob = await self.exchange.fetch_order_book(symbol, limit) return ob except Exception: return {}