File size: 14,428 Bytes
a96dda4
c994b92
f04d47e
a96dda4
f04d47e
 
 
a96dda4
c1bc90c
 
 
 
e0ee6fd
c1bc90c
691c3dd
 
369a5e8
8c2f875
c994b92
eb23359
c27f49c
 
 
 
 
eb23359
c1bc90c
 
 
 
fa9f6bd
9180094
fa9f6bd
 
eb23359
e0ee6fd
f04d47e
 
fa9f6bd
f60c8b6
 
f04d47e
f60c8b6
871fa87
369a5e8
f60c8b6
 
f04d47e
 
 
 
 
 
 
 
 
c1bc90c
 
f04d47e
 
0884f94
f04d47e
 
 
 
e0ee6fd
c1bc90c
f04d47e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c1bc90c
 
b938152
 
916aa63
8b1c1fd
 
 
 
c27f49c
8b1c1fd
e0ee6fd
 
c27f49c
e2c3c8c
f04d47e
e2c3c8c
 
f04d47e
e2c3c8c
a96dda4
 
e2c3c8c
 
a96dda4
e2c3c8c
a96dda4
a9f0141
e2c3c8c
a96dda4
e2c3c8c
 
 
 
f60c8b6
a96dda4
f60c8b6
a96dda4
e0ee6fd
f04d47e
 
 
 
 
 
e0ee6fd
a96dda4
e2c3c8c
 
 
 
 
f04d47e
e0ee6fd
4ddd041
f04d47e
4ddd041
e0ee6fd
871fa87
a96dda4
 
 
c27f49c
e0ee6fd
a96dda4
e0ee6fd
a96dda4
 
 
 
 
871fa87
a96dda4
8c2f875
a96dda4
8c2f875
 
 
 
a96dda4
 
 
e0ee6fd
81ee03c
871fa87
81ee03c
a96dda4
 
 
c81aa0a
81ee03c
f04d47e
81ee03c
c7a21ab
 
f04d47e
81ee03c
f04d47e
c7a21ab
 
 
81ee03c
f04d47e
81ee03c
 
 
a96dda4
81ee03c
f04d47e
81ee03c
f04d47e
871fa87
 
 
 
 
c7a21ab
81ee03c
eb23359
81ee03c
f04d47e
81ee03c
 
 
 
 
 
 
871fa87
81ee03c
 
 
 
f04d47e
81ee03c
c7a21ab
81ee03c
 
 
 
c7a21ab
81ee03c
f04d47e
81ee03c
 
 
 
f04d47e
 
81ee03c
 
 
f04d47e
 
a96dda4
f04d47e
a96dda4
f04d47e
81ee03c
 
756f999
c27f49c
 
 
 
 
 
 
8e52f1b
c27f49c
e0ee6fd
c27f49c
 
8e52f1b
 
c27f49c
4ddd041
 
c27f49c
756f999
c27f49c
 
e0ee6fd
c27f49c
 
81ee03c
 
 
 
 
c27f49c
 
81ee03c
 
 
 
 
c27f49c
 
 
 
 
 
a96dda4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
# ==============================================================================
# 📂 ml_engine/data_manager.py 
# (V77.0 - GEM-Architect: Smart Exchange Switcher)
# ==============================================================================
# - Feature: Auto-Failover (Binance -> Bybit -> OKX -> MEXC).
# - Logic: Automatically detects the best available exchange for the current IP.
# - Stability: Zero-config switch if one exchange is blocked (451 Error).
# ==============================================================================

import asyncio
import httpx
import traceback
import ccxt.async_support as ccxt
import logging
import pandas as pd
import numpy as np
import time
from typing import List, Dict, Any, Optional

# Try to import the system limits (optional dependency)
try:
    from ml_engine.processor import SystemLimits
except ImportError:
    SystemLimits = None

# Reduce log noise from chatty third-party loggers
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)

class DataManager:
    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        self.adaptive_hub_ref = None 
        
        self.exchange = None
        self.exchange_id = None
        self.http_client = None
        self.market_cache = {}
        
        # القائمة السوداء (العملات المستقرة والرافعة)
        self.BLACKLIST_TOKENS = [
            'USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'FDUSD', 'EUR', 'PAX', 'USDP',
            'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L', '5S', '5L'
        ]
        
        # قائمة المنصات المرشحة بالترتيب (الأولوية للأعلى)
        self.EXCHANGE_CANDIDATES = [
            {'id': 'binance', 'name': 'Binance', 'options': {'adjustForTimeDifference': True}},
            {'id': 'bybit',   'name': 'Bybit',   'options': {}},
            {'id': 'okx',     'name': 'OKX',     'options': {}},
            {'id': 'mexc',    'name': 'MEXC',    'options': {}}
        ]
        
        print(f"📦 [DataManager V77.0] Initialized (Smart Switcher Ready).")

    async def initialize(self):
        """Open the HTTP client, select a working exchange, then load contracts.

        The three steps run strictly in this order. Exceptions raised by the
        exchange discovery step are not handled here.
        """
        print("   > [DataManager] Starting initialization & Exchange Discovery...")

        # Shared async HTTP client with a 30 second timeout.
        client = httpx.AsyncClient(timeout=30.0)
        self.http_client = client

        # Exchange discovery: try the candidates and keep the first that works.
        await self._connect_to_best_exchange()

        # Finally refresh the contracts database from R2.
        await self.load_contracts_from_r2()

    async def _connect_to_best_exchange(self):
        """Try the candidate exchanges in order and adopt the first usable one.

        For each entry in ``self.EXCHANGE_CANDIDATES`` a spot-mode ccxt client
        is created and its markets are loaded. A candidate is accepted only if
        ``BTC/USDT`` is present in the loaded markets; on success
        ``self.exchange``, ``self.exchange_id`` and ``self.market_cache`` are
        set and the method returns. Rejected or failing candidates are closed
        before moving on to the next one.

        Raises:
            RuntimeError: If no exchange is set once every candidate was tried.
        """
        for cand in self.EXCHANGE_CANDIDATES:
            ex_id = cand['id']
            ex_name = cand['name']
            print(f"   🔄 Testing connection to {ex_name}...")

            # Reset per candidate so the cleanup below can never touch an
            # instance left over from a previous loop iteration.
            exchange_instance = None
            try:
                exchange_class = getattr(ccxt, ex_id)
                exchange_instance = exchange_class({
                    'enableRateLimit': True,
                    'timeout': 30000,
                    'options': {
                        'defaultType': 'spot',
                        **cand['options']
                    }
                })

                # Real connectivity test.
                await exchange_instance.load_markets()

                # Extra sanity check: BTC/USDT must exist for the data to be usable.
                if 'BTC/USDT' in exchange_instance.markets:
                    self.exchange = exchange_instance
                    self.exchange_id = ex_id
                    self.market_cache = self.exchange.markets
                    print(f"   ✅ [SUCCESS] Connected to {ex_name} (Markets: {len(self.market_cache)}).")
                    return  # A working exchange was found; stop searching.

                print(f"   ⚠️ [SKIP] {ex_name} connected but BTC/USDT missing (Unusual).")

            except Exception as e:
                error_msg = str(e)
                if "451" in error_msg:
                    print(f"   ⛔ [BLOCKED] {ex_name} is restricted in this region (Error 451).")
                else:
                    # str(e) may be empty (e.g. a bare timeout error); indexing
                    # the first line directly would raise inside this handler
                    # and abort the failover, so fall back to the class name.
                    first_line = (error_msg.splitlines() or [type(e).__name__])[0]
                    print(f"   ❌ [FAIL] Could not connect to {ex_name}: {first_line}")

            # Only reached when this candidate was rejected or failed:
            # release its resources before trying the next one. A failing
            # close must not stop the remaining candidates from being tried.
            if exchange_instance is not None:
                try:
                    await exchange_instance.close()
                except Exception as close_err:
                    print(f"   ⚠️ [WARN] Could not close {ex_name} cleanly: {close_err}")

        # Every candidate failed.
        if not self.exchange:
            raise RuntimeError("❌ ALL Exchanges Failed! Please check your internet connection or VPN.")

    async def close(self):
        if self.http_client: await self.http_client.aclose()
        if self.exchange: await self.exchange.close()

    async def load_contracts_from_r2(self):
        if not self.r2_service: return
        try:
            self.contracts_db = await self.r2_service.load_contracts_db_async()
        except Exception: self.contracts_db = {}

    def get_contracts_db(self) -> Dict[str, Any]:
        return self.contracts_db

    # ==================================================================
    # 🌍 Global Market Validator
    # ==================================================================
    async def check_global_market_health(self) -> Dict[str, Any]:
        if not self.exchange: return {'is_safe': False, 'reason': 'No Exchange'}
        try:
            btc_ohlcv = await self.exchange.fetch_ohlcv('BTC/USDT', '1d', limit=7)
            if not btc_ohlcv: return {'is_safe': True, 'reason': 'No BTC Data'}

            df = pd.DataFrame(btc_ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])
            daily_change = (df['c'].iloc[-1] - df['c'].iloc[-2]) / df['c'].iloc[-2]
            
            if daily_change < -0.10: 
                return {'is_safe': False, 'reason': f'🚨 BTC CRASHING ({daily_change*100:.2f}%)'}

            return {'is_safe': True, 'reason': '✅ Market Open'}

        except Exception as e:
            return {'is_safe': True, 'reason': 'Error Bypass'}

    # ==================================================================
    # 🧠 Layer 1: The Open Gate (Volume Only)
    # ==================================================================
    async def layer1_rapid_screening(self, limit=200, adaptive_hub_ref=None) -> List[Dict[str, Any]]:
        self.adaptive_hub_ref = adaptive_hub_ref
        
        if not self.exchange:
            print("❌ [Layer 1] No active exchange connection.")
            return []

        print(f"🔍 [Layer 1] Screening {self.exchange_id.upper()} Market (Volume Only > $1M)...")
        
        # 0. فحص السوق
        market_health = await self.check_global_market_health()
        if not market_health['is_safe']:
            print(f"⛔ [Market Validator] Trading Halted: {market_health['reason']}")
            return []

        # 1. فلتر السيولة الصارم
        initial_candidates = await self._stage0_universe_filter()
        if not initial_candidates:
            print(f"⚠️ [Layer 1] No coins met the criteria on {self.exchange_id}.")
            return []

        # نأخذ أعلى العملات سيولة
        top_candidates = initial_candidates[:limit] 
        print(f"   📥 Fetching data for top {len(top_candidates)} liquid assets...")
        
        enriched_data = await self._fetch_technical_data_batch(top_candidates)
        
        final_list = []
        for item in enriched_data:
            regime_info = self._diagnose_asset_regime(item)
            item['asset_regime'] = regime_info['regime']
            item['asset_regime_conf'] = regime_info['conf']
            item['strategy_type'] = 'NEURAL_SCAN' 
            item['strategy_tag'] = 'NEURAL'
            item['l1_sort_score'] = 0.5 

            if self.adaptive_hub_ref:
                dynamic_config = self.adaptive_hub_ref.get_coin_type_config('SAFE_BOTTOM')
                item['dynamic_limits'] = dynamic_config

            final_list.append(item)

        final_list.sort(key=lambda x: x.get('quote_volume', 0), reverse=True)
        print(f"✅ [Layer 1] Passed {len(final_list)} candidates directly to Neural Layer.")
        return final_list

    # ==================================================================
    # 🧱 Order Book Depth Scanner
    # ==================================================================
    async def get_order_book_snapshot(self, symbol: str, limit=20):
        try: return await self.exchange.fetch_order_book(symbol, limit)
        except: return {}

    # ==================================================================
    # 🔍 Stage 0: Universe Filter (Multi-Exchange Compatible)
    # ==================================================================
    async def _stage0_universe_filter(self) -> List[Dict[str, Any]]:
        try:
            MIN_VOLUME_THRESHOLD = 1000000.0 # $1M
            
            print(f"   🛡️ [Stage 0] Fetching Tickers from {self.exchange_id.upper()}...")
            tickers = await self.exchange.fetch_tickers()
            candidates = []
            
            SOVEREIGN_COINS = ['BTC/USDT', 'ETH/USDT', 'SOL/USDT', 'BNB/USDT', 'XRP/USDT']
            
            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT'): continue
                
                # إقصاء العملات المحظورة
                base_curr = symbol.split('/')[0]
                if any(bad in base_curr for bad in self.BLACKLIST_TOKENS): continue
                
                # توحيد حساب الفوليوم (لأن المنصات تختلف في التسميات)
                calc_quote_vol = float(ticker.get('quoteVolume') or 0.0)
                if calc_quote_vol == 0.0:
                    base_vol = float(ticker.get('baseVolume') or 0.0)
                    last_price = float(ticker.get('last') or 0.0)
                    calc_quote_vol = base_vol * last_price
                
                is_sovereign = symbol in SOVEREIGN_COINS
                
                if not is_sovereign:
                    if calc_quote_vol < MIN_VOLUME_THRESHOLD: continue
                
                change_pct = ticker.get('percentage')
                if change_pct is None: change_pct = 0.0
                
                candidates.append({
                    'symbol': symbol,
                    'quote_volume': calc_quote_vol,
                    'current_price': float(ticker.get('last') or 0.0),
                    'change_24h': change_pct
                })
            
            candidates.sort(key=lambda x: x['quote_volume'], reverse=True)
            print(f"   ℹ️ [Stage 0] Found {len(candidates)} valid candidates.")
            return candidates
            
        except Exception as e:
            print(f"❌ [L1 Error] Universe filter failed: {e}")
            traceback.print_exc()
            return []

    # ------------------------------------------------------------------
    # Helpers & Data Fetching
    # ------------------------------------------------------------------
    def _diagnose_asset_regime(self, item: Dict[str, Any]) -> Dict[str, Any]:
        try:
            if 'df_1h' not in item: 
                if 'ohlcv_1h_raw' in item: item['df_1h'] = self._calc_indicators(item['ohlcv_1h_raw'])
                else: return {'regime': 'RANGE', 'conf': 0.0}
            df = item['df_1h']
            if df.empty: return {'regime': 'RANGE', 'conf': 0.0}
            curr = df.iloc[-1]
            price = curr['close']; ema20 = curr['ema20']; ema50 = curr['ema50']; rsi = curr['rsi']
            regime = "RANGE"; conf = 0.5
            if price > ema20 and ema20 > ema50:
                regime = "BULL"; conf = 0.8 if rsi > 55 else 0.6
            elif price < ema20 and ema20 < ema50:
                regime = "BEAR"; conf = 0.8 if rsi < 45 else 0.6
            return {'regime': regime, 'conf': conf}
        except Exception: return {'regime': 'RANGE', 'conf': 0.0}

    async def _fetch_technical_data_batch(self, candidates):
        chunk_size = 10; results = []
        for i in range(0, len(candidates), chunk_size):
            chunk = candidates[i:i+chunk_size]
            tasks = [self._fetch_single(c) for c in chunk]
            res = await asyncio.gather(*tasks)
            results.extend([r for r in res if r])
            await asyncio.sleep(0.05) 
        return results

    async def _fetch_single(self, c):
        try:
            h1 = await self.exchange.fetch_ohlcv(c['symbol'], '1h', limit=100) 
            m15 = await self.exchange.fetch_ohlcv(c['symbol'], '15m', limit=50)
            if not h1 or not m15: return None
            c['ohlcv'] = {'1h': h1, '15m': m15} 
            c['ohlcv_1h_raw'] = h1 
            c['ohlcv_15m_raw'] = m15
            c['df_1h'] = self._calc_indicators(h1)
            return c
        except: return None

    def _calc_indicators(self, ohlcv):
        df = pd.DataFrame(ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])
        delta = df['c'].diff()
        gain = (delta.where(delta>0, 0)).rolling(14).mean()
        loss = (-delta.where(delta<0, 0)).rolling(14).mean()
        rs = gain/loss
        df['rsi'] = 100 - (100/(1+rs))
        df['ema20'] = df['c'].ewm(span=20).mean()
        df['ema50'] = df['c'].ewm(span=50).mean()
        tr = np.maximum(df['h']-df['l'], np.maximum(abs(df['h']-df['c'].shift()), abs(df['l']-df['c'].shift())))
        df['atr'] = tr.rolling(14).mean()
        df.rename(columns={'o':'open', 'h':'high', 'l':'low', 'c':'close', 'v':'volume'}, inplace=True)
        return df.fillna(0)
        
    async def get_latest_price_async(self, symbol):
        try: return float((await self.exchange.fetch_ticker(symbol))['last'])
        except: return 0.0
    
    async def get_latest_ohlcv(self, symbol, timeframe='5m', limit=100):
        try: return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
        except: return []