Spaces:
Sleeping
Sleeping
| # ============================================================================== | |
| # 📂 ml_engine/data_manager.py | |
| # (V77.0 - GEM-Architect: Smart Exchange Switcher) | |
| # ============================================================================== | |
| # - Feature: Auto-Failover (Binance -> Bybit -> OKX -> MEXC). | |
| # - Logic: Automatically detects the best available exchange for the current IP. | |
| # - Stability: Zero-config switch if one exchange is blocked (451 Error). | |
| # ============================================================================== | |
| import asyncio | |
| import httpx | |
| import traceback | |
| import ccxt.async_support as ccxt | |
| import logging | |
| import pandas as pd | |
| import numpy as np | |
| import time | |
| from typing import List, Dict, Any, Optional | |
# Optional dependency: the system limits live in ml_engine.processor; when that
# module cannot be imported the name is still defined, bound to None.
try:
    from ml_engine.processor import SystemLimits
except ImportError:
    SystemLimits = None

# Reduce log noise: raise the chatty third-party loggers to WARNING.
for _noisy_logger in ("httpx", "ccxt"):
    logging.getLogger(_noisy_logger).setLevel(logging.WARNING)
del _noisy_logger
class DataManager:
    """Market-data gateway with automatic exchange failover.

    The manager tries each exchange in ``EXCHANGE_CANDIDATES`` in order and
    keeps the first one whose markets load and that lists ``BTC/USDT``.  All
    later data requests (tickers, OHLCV, order books) go through that single
    ccxt exchange instance stored in ``self.exchange``.
    """

    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        """Store collaborators and prepare (but do not open) connections.

        Args:
            contracts_db: Initial contracts mapping; a falsy value becomes ``{}``.
            whale_monitor: Stored as-is; not used inside this class.
            r2_service: Optional object exposing ``load_contracts_db_async()``.
        """
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        self.adaptive_hub_ref = None

        self.exchange = None
        self.exchange_id = None
        self.http_client = None
        self.market_cache = {}

        # Blacklist (stablecoins and leveraged tokens).
        # NOTE(review): matching in _stage0_universe_filter is by substring, so
        # base symbols that merely contain one of these fragments (e.g. "JUP",
        # "SUPER") are excluded as well - confirm this is intended.
        self.BLACKLIST_TOKENS = [
            'USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'FDUSD', 'EUR', 'PAX', 'USDP',
            'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L', '5S', '5L'
        ]

        # Candidate exchanges in priority order (highest priority first).
        self.EXCHANGE_CANDIDATES = [
            {'id': 'binance', 'name': 'Binance', 'options': {'adjustForTimeDifference': True}},
            {'id': 'bybit', 'name': 'Bybit', 'options': {}},
            {'id': 'okx', 'name': 'OKX', 'options': {}},
            {'id': 'mexc', 'name': 'MEXC', 'options': {}}
        ]
        print("📦 [DataManager V77.0] Initialized (Smart Switcher Ready).")

    async def initialize(self):
        """Open the HTTP client, pick the best reachable exchange, load contracts.

        Raises:
            RuntimeError: If no candidate exchange could be connected.
        """
        print(" > [DataManager] Starting initialization & Exchange Discovery...")
        self.http_client = httpx.AsyncClient(timeout=30.0)

        # Run the exchange discovery protocol.
        await self._connect_to_best_exchange()
        await self.load_contracts_from_r2()

    @staticmethod
    async def _close_quietly(exchange_instance) -> None:
        """Close a rejected exchange instance without letting cleanup errors escape."""
        if exchange_instance is None:
            return
        try:
            await exchange_instance.close()
        except Exception as close_err:
            print(f" ⚠️ [CLEANUP] Failed to close exchange session: {close_err}")

    async def _connect_to_best_exchange(self):
        """Try the candidate exchanges in order and adopt the first one that works.

        A candidate is accepted when ``load_markets()`` succeeds and its markets
        contain ``BTC/USDT``.  Any failure is reported and the next candidate is
        tried.

        Raises:
            RuntimeError: If the loop finishes and ``self.exchange`` is unset.
        """
        for cand in self.EXCHANGE_CANDIDATES:
            ex_id = cand['id']
            ex_name = cand['name']
            print(f" 🔄 Testing connection to {ex_name}...")

            # Reset per candidate so cleanup never touches a previous
            # iteration's (already closed) instance.
            exchange_instance = None
            try:
                exchange_class = getattr(ccxt, ex_id)
                exchange_instance = exchange_class({
                    'enableRateLimit': True,
                    'timeout': 30000,
                    'options': {
                        'defaultType': 'spot',
                        **cand['options']
                    }
                })

                # Real connectivity test.
                await exchange_instance.load_markets()

                # Extra sanity check: BTC/USDT must exist for the data to be usable.
                if 'BTC/USDT' in exchange_instance.markets:
                    self.exchange = exchange_instance
                    self.exchange_id = ex_id
                    self.market_cache = self.exchange.markets
                    print(f" ✅ [SUCCESS] Connected to {ex_name} (Markets: {len(self.market_cache)}).")
                    return  # Exchange found; stop searching.

                print(f" ⚠️ [SKIP] {ex_name} connected but BTC/USDT missing (Unusual).")
            except Exception as e:
                error_msg = str(e)
                if "451" in error_msg:
                    print(f" ⛔ [BLOCKED] {ex_name} is restricted in this region (Error 451).")
                else:
                    # Exceptions with an empty message (e.g. bare timeouts) have
                    # no first line; fall back to the exception type name so the
                    # failover loop keeps going instead of raising IndexError.
                    summary = next(iter(error_msg.splitlines()), '') or type(e).__name__
                    print(f" ❌ [FAIL] Could not connect to {ex_name}: {summary}")

            # Release resources before moving on to the next candidate.
            await self._close_quietly(exchange_instance)

        # Every candidate failed.
        if not self.exchange:
            raise RuntimeError("❌ ALL Exchanges Failed! Please check your internet connection or VPN.")

    async def close(self):
        """Close the HTTP client and the exchange session, if they were opened."""
        try:
            if self.http_client:
                await self.http_client.aclose()
        finally:
            # Still close the exchange when closing the HTTP client fails.
            if self.exchange:
                await self.exchange.close()

    async def load_contracts_from_r2(self):
        """Refresh ``contracts_db`` from the R2 service; no-op without a service.

        On any failure, or when the service returns a falsy value, the
        database is reset to an empty dict.
        """
        if not self.r2_service:
            return
        try:
            loaded = await self.r2_service.load_contracts_db_async()
        except Exception as e:
            print(f"⚠️ [DataManager] Failed to load contracts from R2: {e}")
            loaded = None
        self.contracts_db = loaded or {}

    def get_contracts_db(self) -> Dict[str, Any]:
        """Return the current contracts database."""
        return self.contracts_db

    # ==================================================================
    # 🌍 Global Market Validator
    # ==================================================================
    async def check_global_market_health(self) -> Dict[str, Any]:
        """Check BTC's latest daily move and flag a crash.

        Returns:
            ``{'is_safe': bool, 'reason': str}``.  ``is_safe`` is False when no
            exchange is connected or when the last daily close is more than
            10% below the previous one.  Missing data and errors fail open
            (``is_safe`` True), matching the original best-effort policy.
        """
        if not self.exchange:
            return {'is_safe': False, 'reason': 'No Exchange'}
        try:
            btc_ohlcv = await self.exchange.fetch_ohlcv('BTC/USDT', '1d', limit=7)
            # Two candles are needed to compute a day-over-day change.
            if not btc_ohlcv or len(btc_ohlcv) < 2:
                return {'is_safe': True, 'reason': 'No BTC Data'}

            # Index 4 is the close column of each [ts, o, h, l, c, v] row.
            prev_close = float(btc_ohlcv[-2][4])
            last_close = float(btc_ohlcv[-1][4])
            if prev_close == 0:
                return {'is_safe': True, 'reason': 'No BTC Data'}

            daily_change = (last_close - prev_close) / prev_close
            if daily_change < -0.10:
                return {'is_safe': False, 'reason': f'🚨 BTC CRASHING ({daily_change*100:.2f}%)'}
            return {'is_safe': True, 'reason': '✅ Market Open'}
        except Exception as e:
            # Deliberate fail-open: a data hiccup must not halt trading.
            print(f"⚠️ [Market Validator] Health check failed, bypassing: {e}")
            return {'is_safe': True, 'reason': 'Error Bypass'}

    # ==================================================================
    # 🧠 Layer 1: The Open Gate (Volume Only)
    # ==================================================================
    async def layer1_rapid_screening(self, limit=200, adaptive_hub_ref=None) -> List[Dict[str, Any]]:
        """Screen the market by volume and enrich the top candidates.

        Args:
            limit: Maximum number of most-liquid candidates to enrich.
            adaptive_hub_ref: Optional object exposing
                ``get_coin_type_config(name)``; stored on the instance.

        Returns:
            Candidate dicts (sorted by ``quote_volume`` descending) annotated
            with regime, strategy tags and optional dynamic limits; an empty
            list when there is no exchange, the market is unsafe, or nothing
            passes the universe filter.
        """
        self.adaptive_hub_ref = adaptive_hub_ref
        if not self.exchange:
            print("❌ [Layer 1] No active exchange connection.")
            return []

        print(f"🔍 [Layer 1] Screening {self.exchange_id.upper()} Market (Volume Only > $1M)...")

        # 0. Market health check.
        market_health = await self.check_global_market_health()
        if not market_health['is_safe']:
            print(f"⛔ [Market Validator] Trading Halted: {market_health['reason']}")
            return []

        # 1. Strict liquidity filter.
        initial_candidates = await self._stage0_universe_filter()
        if not initial_candidates:
            print(f"⚠️ [Layer 1] No coins met the criteria on {self.exchange_id}.")
            return []

        # Keep only the most liquid assets.
        top_candidates = initial_candidates[:limit]
        print(f" 📥 Fetching data for top {len(top_candidates)} liquid assets...")
        enriched_data = await self._fetch_technical_data_batch(top_candidates)

        final_list = []
        for item in enriched_data:
            regime_info = self._diagnose_asset_regime(item)
            item['asset_regime'] = regime_info['regime']
            item['asset_regime_conf'] = regime_info['conf']
            item['strategy_type'] = 'NEURAL_SCAN'
            item['strategy_tag'] = 'NEURAL'
            item['l1_sort_score'] = 0.5
            if self.adaptive_hub_ref:
                dynamic_config = self.adaptive_hub_ref.get_coin_type_config('SAFE_BOTTOM')
                item['dynamic_limits'] = dynamic_config
            final_list.append(item)

        final_list.sort(key=lambda x: x.get('quote_volume', 0), reverse=True)
        print(f"✅ [Layer 1] Passed {len(final_list)} candidates directly to Neural Layer.")
        return final_list

    # ==================================================================
    # 🧱 Order Book Depth Scanner
    # ==================================================================
    async def get_order_book_snapshot(self, symbol: str, limit=20):
        """Return the exchange order book for ``symbol``, or ``{}`` on any error."""
        try:
            return await self.exchange.fetch_order_book(symbol, limit)
        except Exception:
            # ``Exception`` (not bare ``except``) so task cancellation propagates.
            return {}

    # ==================================================================
    # 🔍 Stage 0: Universe Filter (Multi-Exchange Compatible)
    # ==================================================================
    async def _stage0_universe_filter(self) -> List[Dict[str, Any]]:
        """Build the tradable universe from the exchange tickers.

        Keeps ``*/USDT`` symbols whose base currency contains no blacklisted
        fragment and whose quote volume is at least $1M; the "sovereign" majors
        bypass the volume threshold.

        Returns:
            Dicts with ``symbol``, ``quote_volume``, ``current_price`` and
            ``change_24h``, sorted by ``quote_volume`` descending; ``[]`` when
            fetching or parsing fails.
        """
        try:
            MIN_VOLUME_THRESHOLD = 1000000.0  # $1M
            SOVEREIGN_COINS = {'BTC/USDT', 'ETH/USDT', 'SOL/USDT', 'BNB/USDT', 'XRP/USDT'}

            print(f" 🛡️ [Stage 0] Fetching Tickers from {self.exchange_id.upper()}...")
            tickers = await self.exchange.fetch_tickers()
            candidates = []

            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT'):
                    continue

                # Exclude blacklisted tokens (substring match on the base symbol).
                base_curr = symbol.split('/')[0]
                if any(bad in base_curr for bad in self.BLACKLIST_TOKENS):
                    continue

                last_price = float(ticker.get('last') or 0.0)

                # Normalise volume: exchanges differ in which fields they
                # populate, so derive quote volume from base volume if needed.
                calc_quote_vol = float(ticker.get('quoteVolume') or 0.0)
                if calc_quote_vol == 0.0:
                    base_vol = float(ticker.get('baseVolume') or 0.0)
                    calc_quote_vol = base_vol * last_price

                if symbol not in SOVEREIGN_COINS and calc_quote_vol < MIN_VOLUME_THRESHOLD:
                    continue

                change_pct = ticker.get('percentage')
                if change_pct is None:
                    change_pct = 0.0

                candidates.append({
                    'symbol': symbol,
                    'quote_volume': calc_quote_vol,
                    'current_price': last_price,
                    'change_24h': change_pct
                })

            candidates.sort(key=lambda x: x['quote_volume'], reverse=True)
            print(f" ℹ️ [Stage 0] Found {len(candidates)} valid candidates.")
            return candidates
        except Exception as e:
            print(f"❌ [L1 Error] Universe filter failed: {e}")
            traceback.print_exc()
            return []

    # ------------------------------------------------------------------
    # Helpers & Data Fetching
    # ------------------------------------------------------------------
    def _diagnose_asset_regime(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Classify an asset as BULL, BEAR or RANGE from its 1h indicators.

        Uses ``item['df_1h']``, computing it from ``item['ohlcv_1h_raw']`` (and
        caching it on ``item``) when absent.  BULL means close > EMA20 > EMA50,
        BEAR the mirror image; confidence is 0.8 when RSI agrees (above 55 /
        below 45) and 0.6 otherwise.  Missing data or errors yield RANGE with
        confidence 0.0.
        """
        try:
            if 'df_1h' not in item:
                if 'ohlcv_1h_raw' not in item:
                    return {'regime': 'RANGE', 'conf': 0.0}
                item['df_1h'] = self._calc_indicators(item['ohlcv_1h_raw'])

            df = item['df_1h']
            if df.empty:
                return {'regime': 'RANGE', 'conf': 0.0}

            curr = df.iloc[-1]
            price = curr['close']
            ema20 = curr['ema20']
            ema50 = curr['ema50']
            rsi = curr['rsi']

            regime = "RANGE"
            conf = 0.5
            if price > ema20 and ema20 > ema50:
                regime = "BULL"
                conf = 0.8 if rsi > 55 else 0.6
            elif price < ema20 and ema20 < ema50:
                regime = "BEAR"
                conf = 0.8 if rsi < 45 else 0.6
            return {'regime': regime, 'conf': conf}
        except Exception:
            return {'regime': 'RANGE', 'conf': 0.0}

    async def _fetch_technical_data_batch(self, candidates):
        """Enrich candidates with OHLCV data, ten at a time.

        Candidates whose fetch fails are dropped from the result.
        """
        chunk_size = 10
        results = []
        for i in range(0, len(candidates), chunk_size):
            chunk = candidates[i:i + chunk_size]
            tasks = [self._fetch_single(c) for c in chunk]
            res = await asyncio.gather(*tasks)
            results.extend([r for r in res if r])
            # Short pause between chunks.
            await asyncio.sleep(0.05)
        return results

    async def _fetch_single(self, c):
        """Attach 1h/15m OHLCV and 1h indicators to candidate ``c`` in place.

        Returns:
            The same dict ``c`` on success, or ``None`` when either timeframe
            is empty or any error occurs.
        """
        try:
            h1 = await self.exchange.fetch_ohlcv(c['symbol'], '1h', limit=100)
            m15 = await self.exchange.fetch_ohlcv(c['symbol'], '15m', limit=50)
            if not h1 or not m15:
                return None
            c['ohlcv'] = {'1h': h1, '15m': m15}
            c['ohlcv_1h_raw'] = h1
            c['ohlcv_15m_raw'] = m15
            c['df_1h'] = self._calc_indicators(h1)
            return c
        except Exception:
            # ``Exception`` (not bare ``except``) so task cancellation propagates.
            return None

    def _calc_indicators(self, ohlcv):
        """Build a DataFrame with RSI(14), EMA20, EMA50 and ATR(14) columns.

        Args:
            ohlcv: Rows of ``[ts, open, high, low, close, volume]``.

        Returns:
            DataFrame with columns ``ts, open, high, low, close, volume, rsi,
            ema20, ema50, atr``; NaN values (indicator warm-up rows) are
            replaced by 0.
        """
        df = pd.DataFrame(ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])

        # RSI from simple rolling means of gains and losses.
        delta = df['c'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rs = gain / loss
        df['rsi'] = 100 - (100 / (1 + rs))

        df['ema20'] = df['c'].ewm(span=20).mean()
        df['ema50'] = df['c'].ewm(span=50).mean()

        # True range: the largest of high-low and the gaps to the previous close.
        prev_close = df['c'].shift()
        tr = np.maximum(
            df['h'] - df['l'],
            np.maximum(abs(df['h'] - prev_close), abs(df['l'] - prev_close))
        )
        df['atr'] = tr.rolling(14).mean()

        df.rename(columns={'o': 'open', 'h': 'high', 'l': 'low', 'c': 'close', 'v': 'volume'}, inplace=True)
        return df.fillna(0)

    async def get_latest_price_async(self, symbol):
        """Return the last traded price of ``symbol``, or ``0.0`` on any error."""
        try:
            return float((await self.exchange.fetch_ticker(symbol))['last'])
        except Exception:
            # ``Exception`` (not bare ``except``) so task cancellation propagates.
            return 0.0

    async def get_latest_ohlcv(self, symbol, timeframe='5m', limit=100):
        """Return raw OHLCV rows for ``symbol``, or ``[]`` on any error."""
        try:
            return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
        except Exception:
            # ``Exception`` (not bare ``except``) so task cancellation propagates.
            return []