Spaces:
Paused
Paused
| # ml_engine/data_manager.py | |
| # (V12.5 - Lazy Loading Fix + V15.6 App-Compat Fix + Detailed Logging) | |
| import os | |
| import asyncio | |
| import httpx | |
| import traceback | |
| import time | |
| from datetime import datetime | |
| import ccxt.async_support as ccxt | |
| import numpy as np | |
| import logging | |
| from typing import List, Dict, Any | |
| import pandas as pd | |
| try: | |
| import pandas_ta as ta | |
| except ImportError: | |
| print("❌ [DataManager] مكتبة pandas_ta غير موجودة.") | |
| ta = None | |
| from ml_engine.indicators import AdvancedTechnicalAnalyzer | |
| from ml_engine.monte_carlo import MonteCarloAnalyzer | |
| from ml_engine.ranker import Layer1Ranker | |
| try: | |
| from ml_engine.patterns import ChartPatternAnalyzer | |
| except ImportError: | |
| print("⚠️ [DataManager] لم يتم العثور على ml_engine/patterns.py") | |
| ChartPatternAnalyzer = None | |
| logging.getLogger("httpx").setLevel(logging.WARNING) | |
| logging.getLogger("httpcore").setLevel(logging.WARNING) | |
| logging.getLogger("ccxt").setLevel(logging.WARNING) | |
| class DataManager: | |
| def __init__(self, contracts_db, whale_monitor, r2_service=None): | |
| # ================================================================== | |
| # ⚙️ إعدادات التحكم المركزية (V12.3 Hybrid Thresholds) | |
| # ================================================================== | |
| self.HYBRID_ENTRY_THRESHOLD = 0.60 | |
| # ================================================================== | |
| self.contracts_db = contracts_db or {} | |
| self.whale_monitor = whale_monitor | |
| self.r2_service = r2_service | |
| self.exchange = ccxt.kucoin({ | |
| 'enableRateLimit': True, | |
| 'timeout': 30000, | |
| }) | |
| self.http_client = None | |
| self.market_cache = {} | |
| self.technical_analyzer = AdvancedTechnicalAnalyzer() | |
| self.mc_analyzer = MonteCarloAnalyzer() | |
| self.layer1_ranker = None | |
| self.pattern_analyzer = None | |
| async def initialize(self): | |
| """تهيئة مدير البيانات والاتصالات""" | |
| print(" > [DM Log] 0. بدء تهيئة DataManager...") | |
| self.http_client = httpx.AsyncClient(timeout=30.0) | |
| await self._load_markets() | |
| print(" > [DataManager] إنشاء النماذج المساندة (Lazy Load)...") | |
| try: | |
| self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm") | |
| if ChartPatternAnalyzer: | |
| self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service) | |
| except Exception as e: | |
| print(f"⚠️ [DataManager] تحذير أثناء إنشاء النماذج المساندة: {e}") | |
| print(f"✅ DataManager V12.5 initialized (Hybrid Threshold: {self.HYBRID_ENTRY_THRESHOLD})") | |
| print(" > [DM Log] 4. اكتملت تهيئة DataManager.") | |
| async def _load_markets(self): | |
| """تحميل بيانات الأسواق وتخزينها مؤقتاً""" | |
| print(" > [DM Log] 1. بدء _load_markets...") | |
| try: | |
| if self.exchange: | |
| print(" > [DM Log] 2. استدعاء exchange.load_markets()... (قد يستغرق وقتاً)") | |
| await self.exchange.load_markets() | |
| self.market_cache = self.exchange.markets | |
| if self.market_cache and len(self.market_cache) > 0: | |
| print(f" > [DM Log] 3. ✅ نجاح! تم تحميل {len(self.market_cache)} سوق.") | |
| else: | |
| print(" > [DM Log] 3. ⚠️ تحذير: load_markets() نجح ولكن لم يتم إرجاع أسواق.") | |
| else: | |
| print(" > [DM Log] 2. ❌ خطأ: self.exchange هو None.") | |
| except Exception as e: | |
| print(f" > [DM Log] 3. ❌❌❌ فشل فادح في _load_markets: {e}") | |
| traceback.print_exc() | |
| async def close(self): | |
| """إغلاق جميع الاتصالات بأمان""" | |
| print(" > [DM Log] 7. إغلاق اتصالات DataManager...") | |
| if self.http_client: await self.http_client.aclose() | |
| if self.exchange: await self.exchange.close() | |
| if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'): | |
| self.pattern_analyzer.clear_memory() | |
| if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'): | |
| self.layer1_ranker.clear_memory() | |
| # ================================================================== | |
| # 🚀 [إضافة جديدة V15.6] دوال التوافق مع App | |
| # ================================================================== | |
| async def load_contracts_from_r2(self): | |
| """ | |
| [جديد] يقوم بتحميل قاعدة بيانات العقود من R2 عند بدء التشغيل. | |
| """ | |
| print(" > [DataManager] Loading contracts database from R2...") | |
| if not self.r2_service: | |
| print("❌ [DataManager] R2Service not available. Cannot load contracts.") | |
| self.contracts_db = {} | |
| return | |
| try: | |
| self.contracts_db = await self.r2_service.load_contracts_db_async() | |
| print(f"✅ [DataManager] Contracts loaded. Total entries: {len(self.contracts_db)}") | |
| except Exception as e: | |
| print(f"❌ [DataManager] Failed to load contracts from R2: {e}") | |
| self.contracts_db = {} | |
| def get_contracts_db(self) -> Dict[str, Any]: | |
| """ | |
| [جديد] إرجاع قاعدة بيانات العقود التي تم تحميلها. | |
| """ | |
| return self.contracts_db | |
| # ================================================================== | |
| # 🛡️ دوال الطبقة الأولى (Layer 1 Screening) | |
| # ================================================================== | |
| async def layer1_rapid_screening(self) -> List[Dict[str, Any]]: | |
| """ | |
| الغربلة الأولية السريعة جداً بناءً على الحجم فقط. | |
| """ | |
| print(f"🔍 [Layer 1] بدء الغربلة السريعة (Top Liquid Assets)...") | |
| volume_data = await self._get_volume_data_live() | |
| if not volume_data: | |
| print("⚠️ [Layer 1 Warning] لم يتم العثور على بيانات حجم تداول.") | |
| return [] | |
| candidates = volume_data[:150] | |
| print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.") | |
| return candidates | |
| async def _get_volume_data_live(self): | |
| """جلب بيانات الحجم الحية لجميع الأزواج""" | |
| try: | |
| tickers = await self.exchange.fetch_tickers() | |
| data = [] | |
| for symbol, ticker in tickers.items(): | |
| if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000: | |
| data.append({ | |
| 'symbol': symbol, | |
| 'dollar_volume': ticker['quoteVolume'], | |
| 'current_price': ticker['last'] | |
| }) | |
| data.sort(key=lambda x: x['dollar_volume'], reverse=True) | |
| return data | |
| except Exception as e: | |
| print(f"❌ [DataManager] خطأ في جلب بيانات الحجم: {e}") | |
| return [] | |
| # ================================================================== | |
| # 📊 دوال جلب البيانات (Data Fetching Pipeline) | |
| # ================================================================== | |
| async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue): | |
| """ | |
| مولد بيانات متدفق (Streaming Generator) يجلب شموع OHLCV | |
| """ | |
| timeframes = ['5m', '15m', '1h', '4h', '1d'] | |
| limit = 500 | |
| for sym_data in symbols: | |
| symbol = sym_data['symbol'] | |
| tasks = [self._fetch_ohlcv_live(symbol, tf, limit) for tf in timeframes] | |
| results = await asyncio.gather(*tasks, return_exceptions=False) | |
| ohlcv_packet = {} | |
| valid_packet = True | |
| for i, res in enumerate(results): | |
| tf = timeframes[i] | |
| if res and isinstance(res, list) and len(res) >= 200: | |
| ohlcv_packet[tf] = res | |
| else: | |
| if tf in ['5m', '1h']: valid_packet = False | |
| if valid_packet and len(ohlcv_packet) >= 4: | |
| sym_data['ohlcv'] = ohlcv_packet | |
| await queue.put([sym_data]) | |
| await asyncio.sleep(0.05) | |
| await queue.put(None) | |
| async def _fetch_ohlcv_live(self, symbol, timeframe, limit): | |
| """دالة مساعدة لجلب الشموع مع معالجة الأخطاء البسيطة""" | |
| try: | |
| return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit) | |
| except Exception: | |
| return None | |
| # ================================================================== | |
| # 🎯 دوال مساعدة للحارس والدماغ (Sentry & Brain Helpers) | |
| # ================================================================== | |
| async def get_latest_price_async(self, symbol: str) -> float: | |
| """جلب آخر سعر حقيقي (للتنفيذ والمراقبة)""" | |
| try: | |
| ticker = await self.exchange.fetch_ticker(symbol) | |
| return float(ticker['last']) | |
| except Exception as e: | |
| print(f"⚠️ [DataManager] Failed to fetch price for {symbol}: {e}") | |
| return 0.0 | |
| async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]: | |
| """ | |
| جلب عدد محدود من الشموع الأخيرة بسرعة. | |
| """ | |
| # (إضافة طابع خفيف لتجنب إغراق السجلات) | |
| # print(f" > [DM Log] 5. [get_latest_ohlcv] طلب {symbol} {timeframe}...") | |
| try: | |
| candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit) | |
| if candles and len(candles) > 0: | |
| # (لا نطبع هذا لأنه سينجح) | |
| return candles | |
| else: | |
| # (هذا هو الطابع المهم الذي يكشف الفشل الصامت) | |
| print(f" > [DM Log] 6. ⚠️ [get_latest_ohlcv] فشل صامت لـ {symbol} {timeframe}. (أرجع قائمة فارغة).") | |
| return [] | |
| except Exception as e: | |
| # (هذا هو الطابع المهم الذي يكشف الفشل الفادح) | |
| print(f" > [DM Log] 6. ❌ [get_latest_ohlcv] فشل فادح لـ {symbol} {timeframe}: {e}") | |
| return [] |