Riy777 commited on
Commit
c1bc90c
·
verified ·
1 Parent(s): 75763fd

Create data_manager.py

Browse files
Files changed (1) hide show
  1. ml_engine/data_manager.py +136 -0
ml_engine/data_manager.py ADDED
@@ -0,0 +1,136 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # ml_engine/data_manager.py
2
+ # (V13.0 - Cleaned Data Provider - Decoupled from ML Logic)
3
+
4
+ import os
5
+ import asyncio
6
+ import httpx
7
+ import traceback
8
+ import ccxt.async_support as ccxt
9
+ import logging
10
+ from typing import List, Dict, Any
11
+
12
+ # إزالة استيراد النماذج الثقيلة من هنا (Patterns/Titan)
13
+ # DataManager يجب أن يركز فقط على جلب البيانات
14
+ from ml_engine.ranker import Layer1Ranker
15
+
16
+ logging.getLogger("httpx").setLevel(logging.WARNING)
17
+ logging.getLogger("httpcore").setLevel(logging.WARNING)
18
+ logging.getLogger("ccxt").setLevel(logging.WARNING)
19
+
20
+ class DataManager:
21
+ def __init__(self, contracts_db, whale_monitor, r2_service=None):
22
+ # ==================================================================
23
+ # ⚙️ إعدادات التحكم
24
+ # ==================================================================
25
+ self.contracts_db = contracts_db or {}
26
+ self.whale_monitor = whale_monitor
27
+ self.r2_service = r2_service
28
+
29
+ self.exchange = ccxt.kucoin({
30
+ 'enableRateLimit': True,
31
+ 'timeout': 30000,
32
+ })
33
+
34
+ self.http_client = None
35
+ self.market_cache = {}
36
+
37
+ # Ranker خفيف جداً لترتيب العملات بناء على الحجم فقط (يمكن بقاؤه هنا كأداة مساعدة للبيانات)
38
+ self.layer1_ranker = None
39
+
40
+ async def initialize(self):
41
+ """تهيئة مدير البيانات والاتصالات"""
42
+ print(" > [DataManager] Starting initialization...")
43
+ self.http_client = httpx.AsyncClient(timeout=30.0)
44
+ await self._load_markets()
45
+
46
+ # تهيئة Ranker البسيط فقط
47
+ try:
48
+ self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
49
+ except Exception as e:
50
+ print(f"⚠️ [DataManager] Ranker init warning: {e}")
51
+
52
+ print(f"✅ [DataManager V13] Ready.")
53
+
54
+ async def _load_markets(self):
55
+ """تحميل بيانات الأسواق وتخزينها مؤقتاً"""
56
+ try:
57
+ if self.exchange:
58
+ await self.exchange.load_markets()
59
+ self.market_cache = self.exchange.markets
60
+ except Exception as e:
61
+ print(f"❌ [DataManager] Market load failed: {e}")
62
+ traceback.print_exc()
63
+
64
+ async def close(self):
65
+ """إغلاق جميع الاتصالات بأمان"""
66
+ print(" > [DataManager] Closing connections...")
67
+ if self.http_client: await self.http_client.aclose()
68
+ if self.exchange: await self.exchange.close()
69
+ if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'):
70
+ self.layer1_ranker.clear_memory()
71
+
72
+ # ==================================================================
73
+ # 🚀 R2 Compatibility
74
+ # ==================================================================
75
+ async def load_contracts_from_r2(self):
76
+ if not self.r2_service: return
77
+ try:
78
+ self.contracts_db = await self.r2_service.load_contracts_db_async()
79
+ print(f"✅ [DataManager] Contracts loaded: {len(self.contracts_db)}")
80
+ except Exception:
81
+ self.contracts_db = {}
82
+
83
+ def get_contracts_db(self) -> Dict[str, Any]:
84
+ return self.contracts_db
85
+
86
+ # ==================================================================
87
+ # 🛡️ Layer 1 Screening (Data Filter)
88
+ # ==================================================================
89
+ async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
90
+ """
91
+ الغربلة الأولية السريعة بناءً على الحجم والسيولة.
92
+ """
93
+ print(f"🔍 [Layer 1] Screening top liquid assets...")
94
+ volume_data = await self._get_volume_data_live()
95
+
96
+ if not volume_data:
97
+ return []
98
+
99
+ # نأخذ أعلى 150 عملة من حيث السيولة لتمريرها للمعالج
100
+ candidates = volume_data[:150]
101
+ return candidates
102
+
103
+ async def _get_volume_data_live(self):
104
+ try:
105
+ tickers = await self.exchange.fetch_tickers()
106
+ data = []
107
+ for symbol, ticker in tickers.items():
108
+ # تصفية أزواج USDT ذات الحجم المحترم
109
+ if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000:
110
+ data.append({
111
+ 'symbol': symbol,
112
+ 'dollar_volume': ticker['quoteVolume'],
113
+ 'current_price': ticker['last']
114
+ })
115
+ data.sort(key=lambda x: x['dollar_volume'], reverse=True)
116
+ return data
117
+ except Exception:
118
+ return []
119
+
120
+ # ==================================================================
121
+ # 🎯 Helper Functions
122
+ # ==================================================================
123
+ async def get_latest_price_async(self, symbol: str) -> float:
124
+ try:
125
+ ticker = await self.exchange.fetch_ticker(symbol)
126
+ return float(ticker['last'])
127
+ except Exception:
128
+ return 0.0
129
+
130
+ async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
131
+ try:
132
+ candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
133
+ if candles: return candles
134
+ return []
135
+ except Exception:
136
+ return []