Spaces:
Paused
Paused
| # ml_engine/data_manager.py | |
| # (V13.2 - GEM-Architect: Added Order Book Capability) | |
| import os | |
| import asyncio | |
| import httpx | |
| import traceback | |
| import ccxt.async_support as ccxt | |
| import logging | |
| from typing import List, Dict, Any | |
| # إعدادات التسجيل | |
| logging.getLogger("httpx").setLevel(logging.WARNING) | |
| logging.getLogger("httpcore").setLevel(logging.WARNING) | |
| logging.getLogger("ccxt").setLevel(logging.WARNING) | |
| class DataManager: | |
| def __init__(self, contracts_db, whale_monitor, r2_service=None): | |
| # ================================================================== | |
| # ⚙️ إعدادات التحكم | |
| # ================================================================== | |
| self.contracts_db = contracts_db or {} | |
| self.whale_monitor = whale_monitor | |
| self.r2_service = r2_service | |
| # إعداد المنصة (KuCoin) | |
| self.exchange = ccxt.kucoin({ | |
| 'enableRateLimit': True, | |
| 'timeout': 30000, | |
| }) | |
| self.http_client = None | |
| self.market_cache = {} | |
| async def initialize(self): | |
| """تهيئة مدير البيانات والاتصالات""" | |
| print(" > [DataManager] Starting initialization...") | |
| self.http_client = httpx.AsyncClient(timeout=30.0) | |
| await self._load_markets() | |
| print(f"✅ [DataManager V13.2] Ready (Order Book Enabled).") | |
| async def _load_markets(self): | |
| """تحميل بيانات الأسواق وتخزينها مؤقتاً""" | |
| try: | |
| if self.exchange: | |
| await self.exchange.load_markets() | |
| self.market_cache = self.exchange.markets | |
| except Exception as e: | |
| print(f"❌ [DataManager] Market load failed: {e}") | |
| traceback.print_exc() | |
| async def close(self): | |
| """إغلاق جميع الاتصالات بأمان""" | |
| print(" > [DataManager] Closing connections...") | |
| if self.http_client: await self.http_client.aclose() | |
| if self.exchange: await self.exchange.close() | |
| # ================================================================== | |
| # 🚀 R2 Compatibility | |
| # ================================================================== | |
| async def load_contracts_from_r2(self): | |
| if not self.r2_service: return | |
| try: | |
| self.contracts_db = await self.r2_service.load_contracts_db_async() | |
| print(f"✅ [DataManager] Contracts loaded: {len(self.contracts_db)}") | |
| except Exception: | |
| self.contracts_db = {} | |
| def get_contracts_db(self) -> Dict[str, Any]: | |
| return self.contracts_db | |
| # ================================================================== | |
| # 🛡️ Layer 1 Screening (Data Filter) | |
| # ================================================================== | |
| async def layer1_rapid_screening(self) -> List[Dict[str, Any]]: | |
| """الغربلة الأولية السريعة بناءً على الحجم والسيولة.""" | |
| print(f"🔍 [Layer 1] Screening top liquid assets...") | |
| volume_data = await self._get_volume_data_live() | |
| if not volume_data: | |
| return [] | |
| # نأخذ أعلى 150 عملة | |
| candidates = volume_data[:150] | |
| return candidates | |
| async def _get_volume_data_live(self): | |
| """جلب وترتيب العملات حسب حجم التداول""" | |
| try: | |
| tickers = await self.exchange.fetch_tickers() | |
| data = [] | |
| for symbol, ticker in tickers.items(): | |
| if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000: | |
| data.append({ | |
| 'symbol': symbol, | |
| 'dollar_volume': ticker['quoteVolume'], | |
| 'current_price': ticker['last'] | |
| }) | |
| data.sort(key=lambda x: x['dollar_volume'], reverse=True) | |
| return data | |
| except Exception: | |
| return [] | |
| # ================================================================== | |
| # 🎯 Data Fetching Helpers | |
| # ================================================================== | |
| async def get_latest_price_async(self, symbol: str) -> float: | |
| try: | |
| ticker = await self.exchange.fetch_ticker(symbol) | |
| return float(ticker['last']) | |
| except Exception: | |
| return 0.0 | |
| async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]: | |
| try: | |
| candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit) | |
| if candles: return candles | |
| return [] | |
| except Exception: | |
| return [] | |
| # 🔥 [NEW] Order Book Fetcher | |
| async def get_order_book_snapshot(self, symbol: str, limit: int = 20) -> Dict[str, Any]: | |
| """ | |
| جلب لقطة سريعة لدفتر الطلبات (Bids/Asks). | |
| يستخدم لتحليل عمق السوق وتأكيد القناص. | |
| """ | |
| try: | |
| # ccxt يرجع الدفتر بصيغة {'bids': [[price, size], ...], 'asks': ...} | |
| ob = await self.exchange.fetch_order_book(symbol, limit) | |
| return ob | |
| except Exception as e: | |
| # طباعة خطأ خفيف لتجنب إغراق السجلات | |
| # print(f"⚠️ [DataManager] Failed to fetch OB for {symbol}: {e}") | |
| return {} |