File size: 21,693 Bytes
c994b92
 
369a5e8
c994b92
c1bc90c
 
 
 
e0ee6fd
c1bc90c
691c3dd
 
369a5e8
8c2f875
c994b92
eb23359
c27f49c
 
 
 
 
eb23359
c1bc90c
 
 
 
fa9f6bd
9180094
fa9f6bd
 
eb23359
e0ee6fd
369a5e8
9180094
ac0cc30
e0ee6fd
9180094
 
f60c8b6
fa9f6bd
f60c8b6
 
369a5e8
f60c8b6
 
369a5e8
f60c8b6
 
369a5e8
c1bc90c
 
369a5e8
e0ee6fd
0884f94
 
e0ee6fd
c1bc90c
 
f60c8b6
e0ee6fd
b938152
f60c8b6
369a5e8
 
 
c1bc90c
 
b938152
 
916aa63
8b1c1fd
 
 
 
c27f49c
8b1c1fd
e0ee6fd
 
c27f49c
e2c3c8c
a9f0141
e2c3c8c
 
81ee03c
a9f0141
 
369a5e8
81ee03c
e2c3c8c
a9f0141
e2c3c8c
 
 
 
 
 
 
a9f0141
 
e2c3c8c
a9f0141
 
e2c3c8c
a9f0141
e2c3c8c
a9f0141
 
e2c3c8c
a9f0141
 
e2c3c8c
 
81ee03c
a9f0141
 
 
 
 
 
 
 
 
 
 
 
 
 
 
369a5e8
a9f0141
 
 
 
 
 
 
 
 
 
 
 
e2c3c8c
a9f0141
e2c3c8c
 
 
 
 
f60c8b6
a9f0141
f60c8b6
369a5e8
e0ee6fd
a9f0141
e0ee6fd
a9f0141
e2c3c8c
8ca3d29
e2c3c8c
 
 
8ca3d29
a9f0141
e2c3c8c
a9f0141
e0ee6fd
4ddd041
eb23359
4ddd041
e0ee6fd
369a5e8
 
1766ccb
c27f49c
e0ee6fd
8c2f875
e0ee6fd
a9f0141
e0ee6fd
c81aa0a
e0ee6fd
c27f49c
369a5e8
c27f49c
8c2f875
81ee03c
c27f49c
c81aa0a
 
c27f49c
c81aa0a
a9f0141
8c2f875
 
 
e0ee6fd
8c2f875
 
a9f0141
8c2f875
 
369a5e8
49fec9c
 
a9f0141
8c2f875
 
369a5e8
8c2f875
 
 
 
81ee03c
8c2f875
 
 
81ee03c
 
 
 
 
 
8c2f875
 
369a5e8
8c2f875
81ee03c
 
8c2f875
 
 
 
369a5e8
8c2f875
369a5e8
8c2f875
49fec9c
c27f49c
e0ee6fd
81ee03c
 
 
8c2f875
369a5e8
8c2f875
 
81ee03c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8c2f875
81ee03c
8e8afba
81ee03c
c81aa0a
369a5e8
c81aa0a
 
 
8c2f875
c81aa0a
 
81ee03c
 
 
 
 
8c2f875
c81aa0a
81ee03c
 
 
8c2f875
8e8afba
81ee03c
8e8afba
8c2f875
369a5e8
 
8e8afba
 
 
81ee03c
c81aa0a
369a5e8
 
8e8afba
 
 
 
 
369a5e8
 
8e8afba
 
81ee03c
369a5e8
81ee03c
 
8e8afba
369a5e8
8e8afba
 
c81aa0a
 
 
81ee03c
a9f0141
81ee03c
c7a21ab
369a5e8
c7a21ab
81ee03c
 
 
c7a21ab
 
 
81ee03c
 
 
 
 
 
 
 
 
 
 
 
 
 
c7a21ab
81ee03c
eb23359
81ee03c
 
 
 
 
 
 
 
369a5e8
81ee03c
 
 
 
 
 
 
 
 
 
 
 
 
 
c7a21ab
81ee03c
 
 
 
c7a21ab
81ee03c
369a5e8
81ee03c
 
369a5e8
81ee03c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
756f999
81ee03c
 
 
c27f49c
369a5e8
c27f49c
 
 
 
 
 
369a5e8
c27f49c
e0ee6fd
c27f49c
 
1766ccb
c27f49c
 
4ddd041
 
c27f49c
756f999
c27f49c
 
e0ee6fd
c27f49c
369a5e8
c27f49c
369a5e8
 
81ee03c
 
 
 
 
 
369a5e8
c27f49c
 
81ee03c
 
369a5e8
81ee03c
 
 
369a5e8
81ee03c
 
 
 
 
 
 
c27f49c
 
 
 
 
 
 
e0ee6fd
c27f49c
 
9f79335
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
# ============================================================
# 📂 ml_engine/data_manager.py 
# (V68.5 - GEM-Architect: Full Integrity & Breadth Scanner)
# ============================================================

import asyncio
import httpx
import traceback
import ccxt.async_support as ccxt
import logging
import pandas as pd
import numpy as np
import time
from typing import List, Dict, Any, Optional

# محاولة استيراد حدود النظام
try:
    from ml_engine.processor import SystemLimits
except ImportError:
    SystemLimits = None

# تقليل ضوضاء السجلات
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)

class DataManager:
    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        self.adaptive_hub_ref = None 
        
        # إعداد الاتصال بـ KuCoin
        self.exchange = ccxt.kucoin({
            'enableRateLimit': True,
            'timeout': 60000, 
            'options': {'defaultType': 'spot'} 
        })
            
        self.http_client = None
        self.market_cache = {}
        
        # القائمة السوداء للعملات غير المرغوبة (Leveraged/Stable/Fiat)
        self.BLACKLIST_TOKENS = [
            'USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'FDUSD', 'EUR', 'PAX',
            'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L', '5S', '5L'
        ]
        
        print(f"📦 [DataManager V68.5] Initialized (Full Integrity Mode).")

    async def initialize(self):
        """تهيئة الاتصال والأسواق"""
        print("   > [DataManager] Starting initialization...")
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()
        await self.load_contracts_from_r2()

    async def _load_markets(self):
        try:
            if self.exchange:
                await self.exchange.load_markets()
                self.market_cache = self.exchange.markets
                print(f"   > [DataManager] Markets loaded: {len(self.market_cache)}")
        except Exception as e:
            print(f"   ⚠️ [DataManager] Market load warning: {e}")

    async def close(self):
        if self.http_client: await self.http_client.aclose()
        if self.exchange: await self.exchange.close()

    async def load_contracts_from_r2(self):
        if not self.r2_service: return
        try:
            self.contracts_db = await self.r2_service.load_contracts_db_async()
        except Exception: self.contracts_db = {}

    def get_contracts_db(self) -> Dict[str, Any]:
        return self.contracts_db

    # ==================================================================
    # 🌍 Global Market Validator V2 (Smart Breadth Scanner)
    # ==================================================================
    async def check_global_market_health(self) -> Dict[str, Any]:
        """
        يفحص صحة السوق العامة باستخدام منطق مزدوج:
        1. فحص سلامة BTC (تجنب الانهيارات).
        2. فحص نشاط العملات البديلة (Altcoin Pulse).
        """
        try:
            # 1. جلب بيانات البيتكوين الأساسية
            btc_ohlcv = await self.exchange.fetch_ohlcv('BTC/USDT', '1d', limit=30)
            if not btc_ohlcv: return {'is_safe': True, 'reason': 'No BTC Data - Bypassed'}

            df = pd.DataFrame(btc_ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])
            current_close = df['c'].iloc[-1]
            prev_close = df['c'].iloc[-2]
            
            # --- [ CRITICAL CHECK ] ---
            # إذا كان البيتكوين ينهار، نوقف كل شيء لأن السيولة ستجف
            daily_change = (current_close - prev_close) / prev_close
            if daily_change < -0.045: # السماح بمرونة أكبر قليلاً (-4.5%)
                return {'is_safe': False, 'reason': f'🚨 BTC CRASHING ({daily_change*100:.2f}%)'}

            # فحص المتوسطات (Trend Check)
            sma20 = df['c'].rolling(20).mean().iloc[-1]
            if current_close < sma20 * 0.92: # إذا كان السعر تحت المتوسط بـ 8% (سوق هابط عنيف)
                 return {'is_safe': False, 'reason': '📉 Deep Bear Market (Risk Off)'}

            # --- [ ALTCOIN PULSE CHECK ] ---
            # بدلاً من إيقاف السوق بسبب ضعف فوليوم البيتكوين، نفحص هل هناك عملات تتحرك؟
            avg_vol = df['v'].rolling(7).mean().iloc[-1]
            curr_vol = df['v'].iloc[-1]
            
            btc_is_dead = curr_vol < (avg_vol * 0.4) # البيتكوين ميت
            
            if btc_is_dead:
                # 🕵️ فحص النبض: هل هناك عملات تنفصل عن البيتكوين؟
                print("   ⚠️ [Validator] BTC Volume Low.. Scanning Altcoin Pulse...")
                tickers = await self.exchange.fetch_tickers()
                
                green_coins = 0
                pump_coins = 0
                total_checked = 0
                
                # نفحص العملات ذات الفوليوم العالي فقط
                for sym, data in tickers.items():
                    if not sym.endswith('/USDT'): continue
                    vol = float(data.get('quoteVolume') or 0)
                    if vol < 1000000: continue # تجاهل العملات الصغيرة جداً
                    
                    change = float(data.get('percentage') or 0)
                    total_checked += 1
                    
                    if change > 1.0: green_coins += 1
                    if change > 5.0: pump_coins += 1
                
                # المنطق: إذا وجدنا أكثر من 3 عملات تضخ بقوة، أو 40% من السوق أخضر -> السوق يعمل
                if pump_coins >= 3 or (total_checked > 0 and (green_coins / total_checked) > 0.4):
                    return {'is_safe': True, 'reason': f'✅ Decoupled Alts Active ({pump_coins} Pumping)'}
                else:
                    return {'is_safe': False, 'reason': '💤 Dead Market (BTC & Alts Flat)'}

            return {'is_safe': True, 'reason': '✅ Market Healthy'}

        except Exception as e:
            print(f"⚠️ [Market Validator] Error: {e}")
            return {'is_safe': True, 'reason': 'Error Bypass'}

    # ==================================================================
    # 🧠 Layer 1: Classification (Relaxed Funnel)
    # ==================================================================
    async def layer1_rapid_screening(self, limit=300, adaptive_hub_ref=None) -> List[Dict[str, Any]]:
        self.adaptive_hub_ref = adaptive_hub_ref
        print(f"🔍 [Layer 1] Screening Market (Smart Breadth)...")
        
        # 0. فحص صحة السوق
        market_health = await self.check_global_market_health()
        
        if not market_health['is_safe']:
            print(f"⛔ [Market Validator] Trading Halted: {market_health['reason']}")
            return []
        else:
            print(f"   🌍 [Market Validator] Status: {market_health['reason']}")

        # 1. فلتر السيولة الأساسي
        initial_candidates = await self._stage0_universe_filter()
        if not initial_candidates:
            print("⚠️ [Layer 1] Stage 0 returned 0 candidates.")
            return []

        # 2. جلب البيانات الفنية (Batch Fetching)
        # نأخذ أعلى 600 عملة من حيث الحجم لفحصها فنياً
        top_candidates = initial_candidates[:600] 
        enriched_data = await self._fetch_technical_data_batch(top_candidates)
        
        semi_final_list = []

        # 3. التصنيف الفني
        for item in enriched_data:
            classification = self._classify_opportunity_type(item)
            
            if classification['type'] != 'NONE':
                # تشخيص الحالة (BULL, BEAR, RANGE)
                regime_info = self._diagnose_asset_regime(item)
                item['asset_regime'] = regime_info['regime']
                item['asset_regime_conf'] = regime_info['conf']
                
                item['strategy_type'] = classification['type'] 
                item['l1_sort_score'] = classification['score']
                item['strategy_tag'] = classification['type']
                
                # إذا كان التشخيص العام "ميت" لكن العملة في حالة ضغط (Squeeze)، نمررها
                if regime_info['regime'] == 'DEAD' and classification['type'] == 'MOMENTUM_LAUNCH':
                    if not classification.get('is_squeeze', False):
                        continue

                semi_final_list.append(item)

        # 4. فحص العمق وحقن الإعدادات
        final_list = []
        semi_final_list.sort(key=lambda x: x['l1_sort_score'], reverse=True)
        candidates_for_depth = semi_final_list[:limit] # نأخذ العدد المطلوب للفحص العميق
        
        if candidates_for_depth:
            print(f"   🛡️ [Layer 1.5] Checking Depth for {len(candidates_for_depth)} candidates...")
        
        for item in candidates_for_depth:
            # أ. فحص العمق (Depth Check)
            if item['strategy_type'] in ['ACCUMULATION_SQUEEZE', 'SAFE_BOTTOM']:
                try:
                    atr_val = item.get('atr_value', 0.0)
                    curr_price = item.get('current_price', 0.0)
                    
                    if atr_val > 0 and curr_price > 0:
                        range_2h = atr_val * 2.0
                        ob_score = await self._check_ob_pressure(item['symbol'], curr_price, range_2h)
                        
                        if ob_score > 0.6: 
                            item['l1_sort_score'] += 0.15 
                            item['note'] = f"Strong Depth Support ({ob_score:.2f})"
                        elif ob_score < 0.4: 
                            item['l1_sort_score'] -= 0.10 
                except Exception: pass
            
            # ب. حقن الإعدادات من AdaptiveHub
            if self.adaptive_hub_ref:
                coin_type = item.get('strategy_type', 'SAFE_BOTTOM')
                dynamic_config = self.adaptive_hub_ref.get_coin_type_config(coin_type)
                item['dynamic_limits'] = dynamic_config

            final_list.append(item)

        # الترتيب النهائي
        final_list.sort(key=lambda x: x['l1_sort_score'], reverse=True)
        selection = final_list[:limit]
        
        print(f"✅ [Layer 1] Passed {len(selection)} active candidates.")
        return selection

    # ==================================================================
    # 🧱 Order Book Depth Scanner
    # ==================================================================
    async def _check_ob_pressure(self, symbol: str, current_price: float, price_range: float) -> float:
        """فحص ضغط الشراء مقابل البيع في عمق السوق"""
        try:
            ob = await self.exchange.fetch_order_book(symbol, limit=50)
            bids = ob['bids']
            asks = ob['asks']
            
            min_price = current_price - price_range
            max_price = current_price + price_range
            
            support_vol = 0.0
            resistance_vol = 0.0
            
            for p, v in bids:
                if p >= min_price: support_vol += v
                else: break
                
            for p, v in asks:
                if p <= max_price: resistance_vol += v
                else: break
                
            if (support_vol + resistance_vol) == 0: return 0.5
            return support_vol / (support_vol + resistance_vol)
        except Exception:
            return 0.5

    # ==================================================================
    # ⚖️ The Dual-Classifier Logic (RELAXED FUNNEL)
    # ==================================================================
    def _classify_opportunity_type(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """تصنيف العملة إلى نوع استراتيجية محدد"""
        try:
            df_1h = self._calc_indicators(data['ohlcv_1h_raw'])
            curr = df_1h.iloc[-1]
            data['atr_value'] = curr['atr']
        except: return {'type': 'NONE', 'score': 0}

        rsi = curr['rsi']
        close = curr['close']
        ema20 = curr['ema20']
        ema50 = curr['ema50']
        ema200 = curr['ema200'] if 'ema200' in curr else ema50 
        atr = curr['atr']
        
        lower_bb = curr['lower_bb'] if 'lower_bb' in curr else (curr['ema20'] - (2*curr['atr']))
        upper_bb = curr['upper_bb'] if 'upper_bb' in curr else (curr['ema20'] + (2*curr['atr']))
        bb_width = (upper_bb - lower_bb) / curr['ema20'] if curr['ema20'] > 0 else 1.0

        # 🔥 1. Dead Coin Filter (Relaxed to 0.3%)
        volatility_pct = (atr / close) * 100 if close > 0 else 0
        if volatility_pct < 0.3: return {'type': 'NONE', 'score': 0}

        # 🛡️ TYPE 1: SAFE_BOTTOM (القيعان الآمنة)
        # تشبع بيعي مع كسر للحد السفلي للبولنجر
        if rsi < 55:
            if close <= lower_bb * 1.08:
                score = (60 - rsi) / 20.0 
                return {'type': 'SAFE_BOTTOM', 'score': min(score, 1.0)}

        # 🔋 TYPE 2: ACCUMULATION_SQUEEZE (التجميع والضغط)
        # RSI محايد، نطاق ضيق جداً (BB Width قليل)
        elif 40 <= rsi <= 65:
            if bb_width < 0.18:
                score = 1.0 - (bb_width * 3.0) 
                return {'type': 'ACCUMULATION_SQUEEZE', 'score': max(score, 0.5), 'is_squeeze': True}

        # 🚀 TYPE 3: MOMENTUM_LAUNCH (انطلاق الزخم)
        # RSI قوي، السعر فوق المتوسطات، واقتراب من الحد العلوي
        elif 50 < rsi < 85:
            if close > ema50:
                dist_to_upper = (upper_bb - close) / close
                if dist_to_upper < 0.12: # قريب من الاختراق
                    score = rsi / 100.0
                    return {'type': 'MOMENTUM_LAUNCH', 'score': score}
        
        # 🃏 Special Case: High Volatility Catch
        if volatility_pct > 1.5:
             return {'type': 'SAFE_BOTTOM', 'score': 0.4}

        return {'type': 'NONE', 'score': 0}

    # ==================================================================
    # 🔍 Stage 0: Universe Filter
    # ==================================================================
    async def _stage0_universe_filter(self) -> List[Dict[str, Any]]:
        """جلب كل العملات وتصفيتها حسب الحجم"""
        try:
            MIN_VOLUME_THRESHOLD = 1000000.0  # 1 Million USDT
            
            print(f"   🛡️ [Stage 0] Fetching Tickers (Min Vol: ${MIN_VOLUME_THRESHOLD:,.0f})...")
            tickers = await self.exchange.fetch_tickers()
            candidates = []
            
            SOVEREIGN_COINS = ['BTC/USDT', 'ETH/USDT', 'SOL/USDT', 'BNB/USDT', 'XRP/USDT']
            reject_stats = {"volume": 0, "change": 0, "blacklist": 0}

            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT'): continue
                
                base_curr = symbol.split('/')[0]
                if any(bad in base_curr for bad in self.BLACKLIST_TOKENS): 
                    reject_stats["blacklist"] += 1
                    continue
                
                base_vol = float(ticker.get('baseVolume') or 0.0)
                last_price = float(ticker.get('last') or 0.0)
                calc_quote_vol = base_vol * last_price
                
                is_sovereign = symbol in SOVEREIGN_COINS
                
                if not is_sovereign:
                    if calc_quote_vol < MIN_VOLUME_THRESHOLD: 
                        reject_stats["volume"] += 1
                        continue
                
                change_pct = ticker.get('percentage')
                if change_pct is None: change_pct = 0.0
                
                # استبعاد العملات التي تحركت بشكل جنوني (>35%) لتجنب القمم
                if abs(change_pct) > 35.0: 
                    reject_stats["change"] += 1
                    continue
                
                candidates.append({
                    'symbol': symbol,
                    'quote_volume': calc_quote_vol,
                    'current_price': last_price,
                    'change_24h': change_pct
                })
            
            candidates.sort(key=lambda x: x['quote_volume'], reverse=True)
            print(f"   ℹ️ [Stage 0] Ignored {reject_stats['volume']} low-vol coins.")
            return candidates
            
        except Exception as e:
            print(f"❌ [L1 Error] Universe filter failed: {e}")
            traceback.print_exc()
            return []

    # ------------------------------------------------------------------
    # 🧭 The Diagnoser (Market Regime Detection)
    # ------------------------------------------------------------------
    def _diagnose_asset_regime(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """تشخيص حالة العملة الفردية (BULL/BEAR/RANGE/DEAD)"""
        try:
            if 'df_1h' not in item: 
                if 'ohlcv_1h_raw' in item:
                    item['df_1h'] = self._calc_indicators(item['ohlcv_1h_raw'])
                else:
                    return {'regime': 'RANGE', 'conf': 0.0}

            df = item['df_1h']
            if df.empty: return {'regime': 'RANGE', 'conf': 0.0}

            curr = df.iloc[-1]
            price = curr['close']
            ema20 = curr['ema20']
            ema50 = curr['ema50']
            rsi = curr['rsi']
            atr = curr['atr']
            atr_pct = (atr / price) * 100 if price > 0 else 0
            
            regime = "RANGE"
            conf = 0.5
            
            if atr_pct < 0.4: return {'regime': 'DEAD', 'conf': 0.9}
            
            if price > ema20 and ema20 > ema50 and rsi > 50:
                regime = "BULL"
                conf = 0.8 if rsi > 55 else 0.6
            elif price < ema20 and ema20 < ema50 and rsi < 50:
                regime = "BEAR"
                conf = 0.8 if rsi < 45 else 0.6
                
            return {'regime': regime, 'conf': conf}
        except Exception: return {'regime': 'RANGE', 'conf': 0.0}

    # ------------------------------------------------------------------
    # Helpers & Indicators
    # ------------------------------------------------------------------
    async def _fetch_technical_data_batch(self, candidates):
        """جلب البيانات الفنية (1h, 15m) على دفعات"""
        chunk_size = 10; results = []
        for i in range(0, len(candidates), chunk_size):
            chunk = candidates[i:i+chunk_size]
            tasks = [self._fetch_single(c) for c in chunk]
            res = await asyncio.gather(*tasks)
            results.extend([r for r in res if r])
            await asyncio.sleep(0.05) # Rate Limit Protection
        return results

    async def _fetch_single(self, c):
        try:
            h1 = await self.exchange.fetch_ohlcv(c['symbol'], '1h', limit=210) 
            m15 = await self.exchange.fetch_ohlcv(c['symbol'], '15m', limit=60)
            if not h1 or not m15: return None
            c['ohlcv'] = {'1h': h1, '15m': m15} 
            c['ohlcv_1h_raw'] = h1 
            c['ohlcv_15m_raw'] = m15
            c['df_1h'] = self._calc_indicators(h1)
            return c
        except: return None

    def _calc_indicators(self, ohlcv):
        """حساب المؤشرات يدوياً باستخدام Pandas"""
        df = pd.DataFrame(ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])
        
        # RSI
        delta = df['c'].diff()
        gain = (delta.where(delta>0, 0)).rolling(14).mean()
        loss = (-delta.where(delta<0, 0)).rolling(14).mean()
        rs = gain/loss
        df['rsi'] = 100 - (100/(1+rs))
        
        # EMAs
        df['ema20'] = df['c'].ewm(span=20).mean()
        df['ema50'] = df['c'].ewm(span=50).mean()
        df['ema200'] = df['c'].ewm(span=200).mean() 
        
        # ATR
        tr = np.maximum(df['h']-df['l'], np.maximum(abs(df['h']-df['c'].shift()), abs(df['l']-df['c'].shift())))
        df['atr'] = tr.rolling(14).mean()
        
        # Bollinger Bands
        std = df['c'].rolling(20).std()
        df['upper_bb'] = df['ema20'] + (2 * std)
        df['lower_bb'] = df['ema20'] - (2 * std)
        
        df.rename(columns={'o':'open', 'h':'high', 'l':'low', 'c':'close', 'v':'volume'}, inplace=True)
        return df.fillna(0)
        
    async def get_latest_price_async(self, symbol):
        try: return float((await self.exchange.fetch_ticker(symbol))['last'])
        except: return 0.0
    
    async def get_latest_ohlcv(self, symbol, timeframe='5m', limit=100):
        try: return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
        except: return []

    async def get_order_book_snapshot(self, symbol, limit=20):
        try: return await self.exchange.fetch_order_book(symbol, limit)
        except: return {}