# ml_engine/data_manager.py # (V13.2 - GEM-Architect: Added Order Book Capability) import os import asyncio import httpx import traceback import ccxt.async_support as ccxt import logging from typing import List, Dict, Any # إعدادات التسجيل logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("ccxt").setLevel(logging.WARNING) class DataManager: def __init__(self, contracts_db, whale_monitor, r2_service=None): # ================================================================== # ⚙️ إعدادات التحكم # ================================================================== self.contracts_db = contracts_db or {} self.whale_monitor = whale_monitor self.r2_service = r2_service # إعداد المنصة (KuCoin) self.exchange = ccxt.kucoin({ 'enableRateLimit': True, 'timeout': 30000, }) self.http_client = None self.market_cache = {} async def initialize(self): """تهيئة مدير البيانات والاتصالات""" print(" > [DataManager] Starting initialization...") self.http_client = httpx.AsyncClient(timeout=30.0) await self._load_markets() print(f"✅ [DataManager V13.2] Ready (Order Book Enabled).") async def _load_markets(self): """تحميل بيانات الأسواق وتخزينها مؤقتاً""" try: if self.exchange: await self.exchange.load_markets() self.market_cache = self.exchange.markets except Exception as e: print(f"❌ [DataManager] Market load failed: {e}") traceback.print_exc() async def close(self): """إغلاق جميع الاتصالات بأمان""" print(" > [DataManager] Closing connections...") if self.http_client: await self.http_client.aclose() if self.exchange: await self.exchange.close() # ================================================================== # 🚀 R2 Compatibility # ================================================================== async def load_contracts_from_r2(self): if not self.r2_service: return try: self.contracts_db = await self.r2_service.load_contracts_db_async() print(f"✅ [DataManager] Contracts loaded: {len(self.contracts_db)}") except Exception: self.contracts_db = {} def get_contracts_db(self) -> Dict[str, Any]: return self.contracts_db # ================================================================== # 🛡️ Layer 1 Screening (Data Filter) # ================================================================== async def layer1_rapid_screening(self) -> List[Dict[str, Any]]: """الغربلة الأولية السريعة بناءً على الحجم والسيولة.""" print(f"🔍 [Layer 1] Screening top liquid assets...") volume_data = await self._get_volume_data_live() if not volume_data: return [] # نأخذ أعلى 150 عملة candidates = volume_data[:150] return candidates async def _get_volume_data_live(self): """جلب وترتيب العملات حسب حجم التداول""" try: tickers = await self.exchange.fetch_tickers() data = [] for symbol, ticker in tickers.items(): if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000: data.append({ 'symbol': symbol, 'dollar_volume': ticker['quoteVolume'], 'current_price': ticker['last'] }) data.sort(key=lambda x: x['dollar_volume'], reverse=True) return data except Exception: return [] # ================================================================== # 🎯 Data Fetching Helpers # ================================================================== async def get_latest_price_async(self, symbol: str) -> float: try: ticker = await self.exchange.fetch_ticker(symbol) return float(ticker['last']) except Exception: return 0.0 async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]: try: candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit) if candles: return candles return [] except Exception: return [] # 🔥 [NEW] Order Book Fetcher async def get_order_book_snapshot(self, symbol: str, limit: int = 20) -> Dict[str, Any]: """ جلب لقطة سريعة لدفتر الطلبات (Bids/Asks). يستخدم لتحليل عمق السوق وتأكيد القناص. """ try: # ccxt يرجع الدفتر بصيغة {'bids': [[price, size], ...], 'asks': ...} ob = await self.exchange.fetch_order_book(symbol, limit) return ob except Exception as e: # طباعة خطأ خفيف لتجنب إغراق السجلات # print(f"⚠️ [DataManager] Failed to fetch OB for {symbol}: {e}") return {}