File size: 13,246 Bytes
c994b92
 
eb23359
c994b92
c1bc90c
 
 
 
e0ee6fd
c1bc90c
691c3dd
 
e0ee6fd
c994b92
eb23359
c27f49c
 
 
 
 
eb23359
c1bc90c
 
 
 
fa9f6bd
9180094
fa9f6bd
 
eb23359
e0ee6fd
9180094
ac0cc30
e0ee6fd
9180094
 
f60c8b6
fa9f6bd
f60c8b6
 
eb23359
f60c8b6
 
e0ee6fd
f60c8b6
 
eb23359
c1bc90c
 
e0ee6fd
0884f94
 
e0ee6fd
c1bc90c
 
f60c8b6
e0ee6fd
b938152
f60c8b6
c27f49c
c1bc90c
 
b938152
 
916aa63
8b1c1fd
 
 
 
c27f49c
8b1c1fd
e0ee6fd
 
c27f49c
f60c8b6
eb23359
f60c8b6
e0ee6fd
 
eb23359
e0ee6fd
eb23359
e0ee6fd
4ddd041
eb23359
4ddd041
e0ee6fd
eb23359
d6c5756
c27f49c
e0ee6fd
c27f49c
e0ee6fd
 
eb23359
c27f49c
e0ee6fd
c27f49c
eb23359
c27f49c
eb23359
 
 
 
 
 
 
 
 
 
 
c27f49c
 
eb23359
c27f49c
eb23359
c27f49c
 
e0ee6fd
c27f49c
 
e0ee6fd
eb23359
c27f49c
e0ee6fd
c27f49c
eb23359
c27f49c
e0ee6fd
c7a21ab
eb23359
c7a21ab
 
 
 
 
 
 
eb23359
 
 
b61c69b
c7a21ab
 
 
 
 
 
 
 
 
 
eb23359
b61c69b
 
 
c7a21ab
eb23359
 
 
c7a21ab
eb23359
c7a21ab
 
eb23359
 
b3b9df0
eb23359
 
c7a21ab
eb23359
c7a21ab
 
 
 
 
 
 
 
 
eb23359
b61c69b
c7a21ab
 
 
b61c69b
c7a21ab
 
 
 
 
 
 
 
 
 
e0ee6fd
eb23359
e0ee6fd
c27f49c
642bf96
c27f49c
 
 
 
 
 
 
 
 
0884f94
c27f49c
 
0884f94
4ddd041
b61c69b
c27f49c
 
 
 
 
 
 
 
4ddd041
0884f94
e0ee6fd
eb23359
e0ee6fd
c27f49c
e0ee6fd
 
 
c27f49c
 
e0ee6fd
 
 
 
 
 
 
 
 
eb23359
4ddd041
 
c27f49c
4ddd041
e0ee6fd
eb23359
c27f49c
c7a21ab
c27f49c
4ddd041
c27f49c
 
e0ee6fd
eb23359
c27f49c
 
 
e0ee6fd
c27f49c
 
 
 
 
 
 
 
 
 
 
 
 
 
e0ee6fd
c27f49c
 
 
 
 
4ddd041
 
c27f49c
 
 
e0ee6fd
c27f49c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e0ee6fd
c27f49c
 
eb23359
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
# ============================================================
# 📂 ml_engine/data_manager.py 
# (V60.4 - GEM-Architect: Regime Gating + Robust Data)
# ============================================================

import asyncio
import httpx
import traceback
import ccxt.async_support as ccxt
import logging
import pandas as pd
import numpy as np
from typing import List, Dict, Any

# محاولة استيراد حدود النظام
try:
    from ml_engine.processor import SystemLimits
except ImportError:
    SystemLimits = None

# تقليل ضوضاء السجلات
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)

class DataManager:
    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        self.adaptive_hub_ref = None 
        
        self.exchange = ccxt.kucoin({
            'enableRateLimit': True,
            'timeout': 60000, 
            'options': {'defaultType': 'spot'} 
        })
            
        self.http_client = None
        self.market_cache = {}
        
        # القائمة السوداء
        self.BLACKLIST_TOKENS = [
            'USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'FDUSD', 'EUR', 'PAX',
            'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L'
        ]
        
        print(f"📦 [DataManager V60.4] Regime Gating (Range Protection) Active.")

    async def initialize(self):
        print("   > [DataManager] Starting initialization...")
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()
        await self.load_contracts_from_r2()

    async def _load_markets(self):
        try:
            if self.exchange:
                await self.exchange.load_markets()
                self.market_cache = self.exchange.markets
        except Exception: pass

    async def close(self):
        if self.http_client: await self.http_client.aclose()
        if self.exchange: await self.exchange.close()

    async def load_contracts_from_r2(self):
        if not self.r2_service: return
        try:
            self.contracts_db = await self.r2_service.load_contracts_db_async()
        except Exception: self.contracts_db = {}

    def get_contracts_db(self) -> Dict[str, Any]:
        return self.contracts_db

    # ==================================================================
    # 🧠 Layer 1: Screening + Diagnosis + Regime Gating
    # ==================================================================
    async def layer1_rapid_screening(self, adaptive_hub_ref=None) -> List[Dict[str, Any]]:
        self.adaptive_hub_ref = adaptive_hub_ref
        print(f"🔍 [Layer 1] Screening with Regime Gating...")
        
        # 1. فلتر السيولة
        initial_candidates = await self._stage0_universe_filter()
        if not initial_candidates:
            print("⚠️ [Layer 1] Stage 0 returned 0 candidates.")
            return []

        # 2. جلب البيانات الفنية
        top_candidates = initial_candidates[:300] 
        enriched_data = await self._fetch_technical_data_batch(top_candidates)
        
        final_list = []

        for item in enriched_data:
            # 3. التصنيف الفني (Breakout vs Reversal)
            classification = self._apply_strict_logic_tree(item)
            
            if classification['type'] != 'NONE':
                # 4. التشخيص (Diagnosis)
                regime_info = self._diagnose_asset_regime(item)
                current_regime = regime_info['regime']
                
                # 🔥 5. Regime Gating (بوابة النظام - الحماية من المصيدة)
                # إذا السوق عرضي (RANGE) أو ميت (DEAD)، نمنع الاختراقات (BREAKOUT)
                # لأن الاختراقات تفشل في هذه الظروف وتصبح مصيدة ثيران.
                if current_regime in ['RANGE', 'DEAD'] and classification['type'] == 'BREAKOUT':
                    # تخطي بصمت (حماية)
                    continue

                # إذا مر من البوابة، نتابع
                item['asset_regime'] = current_regime
                item['asset_regime_conf'] = regime_info['conf']
                
                # حقن العتبات
                if self.adaptive_hub_ref:
                    dynamic_config = self.adaptive_hub_ref.get_regime_config(current_regime)
                    item['dynamic_limits'] = dynamic_config
                
                item['l1_sort_score'] = classification['score'] 
                item['strategy_tag'] = classification['type']
                final_list.append(item)

        # 6. الترتيب النهائي
        final_list.sort(key=lambda x: x['l1_sort_score'], reverse=True)
        
        selection = final_list[:50]
        print(f"✅ [Layer 1] Passed {len(selection)} candidates (Safe Strategies Only).")
        return selection

    # ==================================================================
    # 🔍 Stage 0: Universe Filter (Robust USD Calc)
    # ==================================================================
    async def _stage0_universe_filter(self) -> List[Dict[str, Any]]:
        try:
            print("   🛡️ [Stage 0] Fetching Tickers...")
            tickers = await self.exchange.fetch_tickers()
            candidates = []
            
            # القائمة السيادية (تمر دائماً)
            SOVEREIGN_COINS = ['BTC/USDT', 'ETH/USDT', 'SOL/USDT', 'BNB/USDT', 'XRP/USDT']
            
            reject_stats = {"volume": 0, "change": 0, "blacklist": 0}
            debug_printed = False

            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT'): continue
                
                base_curr = symbol.split('/')[0]
                if any(bad in base_curr for bad in self.BLACKLIST_TOKENS): 
                    reject_stats["blacklist"] += 1
                    continue
                
                # حساب الحجم يدوياً لضمان الدقة
                base_vol = float(ticker.get('baseVolume') or 0.0)
                last_price = float(ticker.get('last') or 0.0)
                calc_quote_vol = base_vol * last_price
                
                is_sovereign = symbol in SOVEREIGN_COINS
                
                # طباعة فحص BTC مرة واحدة
                if "BTC/USDT" in symbol and not debug_printed:
                    print(f"   🐛 [DEBUG] BTC Vol: ${calc_quote_vol:,.0f}")
                    debug_printed = True

                # فلتر السيولة (500k) - نتجاوزه للعملات السيادية
                if not is_sovereign:
                    if calc_quote_vol < 500000: 
                        reject_stats["volume"] += 1
                        continue
                
                # فلتر التغير (20%)
                change_pct = ticker.get('percentage')
                if change_pct is None: change_pct = 0.0
                
                if abs(change_pct) > 20.0: 
                    reject_stats["change"] += 1
                    continue
                
                candidates.append({
                    'symbol': symbol,
                    'quote_volume': calc_quote_vol,
                    'current_price': last_price,
                    'change_24h': change_pct
                })
            
            print(f"   📊 [Filter Stats] Total: {len(tickers)} | Passed: {len(candidates)}")
            print(f"      ❌ Rejected: Vol < 500k ({reject_stats['volume']}) | Change > 20% ({reject_stats['change']})")

            candidates.sort(key=lambda x: x['quote_volume'], reverse=True)
            return candidates
            
        except Exception as e:
            print(f"❌ [L1 Error] Universe filter failed: {e}")
            traceback.print_exc()
            return []

    # ------------------------------------------------------------------
    # 🧭 The Diagnoser
    # ------------------------------------------------------------------
    def _diagnose_asset_regime(self, item: Dict[str, Any]) -> Dict[str, Any]:
        try:
            if 'df_1h' not in item: return {'regime': 'RANGE', 'conf': 0.0}
            df = item['df_1h']
            curr = df.iloc[-1]
            price = curr['close']
            ema20 = curr['ema20']
            ema50 = curr['ema50']
            rsi = curr['rsi']
            atr = curr['atr']
            atr_pct = (atr / price) * 100 if price > 0 else 0
            
            regime = "RANGE"
            conf = 0.5
            
            if atr_pct < 0.5: return {'regime': 'DEAD', 'conf': 0.9}
            
            if price > ema20 and ema20 > ema50 and rsi > 50:
                regime = "BULL"
                conf = 0.8 if rsi > 55 else 0.6
            elif price < ema20 and ema20 < ema50 and rsi < 50:
                regime = "BEAR"
                conf = 0.8 if rsi < 45 else 0.6
                
            return {'regime': regime, 'conf': conf}
        except Exception: return {'regime': 'RANGE', 'conf': 0.0}

    # ------------------------------------------------------------------
    # 🛡️ The Logic Tree (Anti-FOMO Tuned)
    # ------------------------------------------------------------------
    def _apply_strict_logic_tree(self, data: Dict[str, Any]) -> Dict[str, Any]:
        try:
            df_1h = self._calc_indicators(data['ohlcv_1h_raw'])
            df_15m = self._calc_indicators(data['ohlcv_15m_raw'])
            data['df_1h'] = df_1h
        except: return {'type': 'NONE', 'score': 0}

        curr_1h = df_1h.iloc[-1]
        curr_15m = df_15m.iloc[-1]
        
        try:
            close_4h_ago = df_1h.iloc[-5]['close']
            change_4h = ((curr_1h['close'] - close_4h_ago) / close_4h_ago) * 100
        except: change_4h = 0.0

        # Gates
        if change_4h > 12.0: return {'type': 'NONE', 'score': 0}
        if curr_1h['rsi'] > 75: return {'type': 'NONE', 'score': 0} 
        dev = (curr_1h['close'] - curr_1h['ema20']) / curr_1h['atr'] if curr_1h['atr'] > 0 else 0
        if dev > 2.2: return {'type': 'NONE', 'score': 0}

        # A. Breakout
        is_bullish = (curr_1h['ema20'] > curr_1h['ema50']) or (curr_1h['close'] > curr_1h['ema20'])
        if is_bullish and (45 <= curr_1h['rsi'] <= 75): 
            vol_ma = df_15m['volume'].rolling(20).mean().iloc[-1]
            if curr_15m['volume'] >= 1.2 * vol_ma: 
                score = curr_15m['volume'] / vol_ma if vol_ma > 0 else 1.0
                return {'type': 'BREAKOUT', 'score': score}

        # B. Reversal
        if 20 <= curr_1h['rsi'] <= 40 and change_4h <= -2.0:
            score = (100 - curr_1h['rsi'])
            return {'type': 'REVERSAL', 'score': score}

        return {'type': 'NONE', 'score': 0}

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    async def _fetch_technical_data_batch(self, candidates):
        chunk_size = 10; results = []
        for i in range(0, len(candidates), chunk_size):
            chunk = candidates[i:i+chunk_size]
            tasks = [self._fetch_single(c) for c in chunk]
            res = await asyncio.gather(*tasks)
            results.extend([r for r in res if r])
            await asyncio.sleep(0.05)
        return results

    async def _fetch_single(self, c):
        try:
            h1 = await self.exchange.fetch_ohlcv(c['symbol'], '1h', limit=60)
            m15 = await self.exchange.fetch_ohlcv(c['symbol'], '15m', limit=60)
            if not h1 or not m15: return None
            c['ohlcv'] = {'1h': h1, '15m': m15} 
            c['ohlcv_1h_raw'] = h1 
            c['ohlcv_15m_raw'] = m15
            return c
        except: return None

    def _calc_indicators(self, ohlcv):
        df = pd.DataFrame(ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])
        delta = df['c'].diff()
        gain = (delta.where(delta>0, 0)).rolling(14).mean()
        loss = (-delta.where(delta<0, 0)).rolling(14).mean()
        rs = gain/loss
        df['rsi'] = 100 - (100/(1+rs))
        df['ema20'] = df['c'].ewm(span=20).mean()
        df['ema50'] = df['c'].ewm(span=50).mean()
        tr = np.maximum(df['h']-df['l'], np.maximum(abs(df['h']-df['c'].shift()), abs(df['l']-df['c'].shift())))
        df['atr'] = tr.rolling(14).mean()
        df.rename(columns={'o':'open', 'h':'high', 'l':'low', 'c':'close', 'v':'volume'}, inplace=True)
        return df.fillna(0)
        
    async def get_latest_price_async(self, symbol):
        try: return float((await self.exchange.fetch_ticker(symbol))['last'])
        except: return 0.0
    
    async def get_latest_ohlcv(self, symbol, timeframe='5m', limit=100):
        try: return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
        except: return []

    async def get_order_book_snapshot(self, symbol, limit=20):
        try: return await self.exchange.fetch_order_book(symbol, limit)
        except: return {}