File size: 5,501 Bytes
c1bc90c
c1f9548
c1bc90c
 
 
 
 
 
 
 
 
c1f9548
c1bc90c
 
 
 
 
 
 
 
 
 
 
 
 
c1f9548
c1bc90c
 
 
 
 
 
 
 
 
 
 
 
 
c1f9548
c1bc90c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c1f9548
c1bc90c
 
 
 
 
 
c1f9548
c1bc90c
 
 
 
3d4ebf5
c1bc90c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c1f9548
c1bc90c
 
 
 
 
 
 
 
 
 
 
 
 
 
c1f9548
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# ml_engine/data_manager.py
# (V13.2 - GEM-Architect: Added Order Book Capability)

import os
import asyncio
import httpx
import traceback
import ccxt.async_support as ccxt
import logging
from typing import List, Dict, Any

# إعدادات التسجيل
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)

class DataManager:
    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        # ==================================================================
        # ⚙️ إعدادات التحكم
        # ==================================================================
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        
        # إعداد المنصة (KuCoin)
        self.exchange = ccxt.kucoin({
            'enableRateLimit': True,
            'timeout': 30000,
        })
            
        self.http_client = None
        self.market_cache = {}

    async def initialize(self):
        """تهيئة مدير البيانات والاتصالات"""
        print("   > [DataManager] Starting initialization...")
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()
        print(f"✅ [DataManager V13.2] Ready (Order Book Enabled).")

    async def _load_markets(self):
        """تحميل بيانات الأسواق وتخزينها مؤقتاً"""
        try:
            if self.exchange:
                await self.exchange.load_markets()
                self.market_cache = self.exchange.markets
        except Exception as e:
            print(f"❌ [DataManager] Market load failed: {e}")
            traceback.print_exc()

    async def close(self):
        """إغلاق جميع الاتصالات بأمان"""
        print("   > [DataManager] Closing connections...")
        if self.http_client: await self.http_client.aclose()
        if self.exchange: await self.exchange.close()

    # ==================================================================
    # 🚀 R2 Compatibility
    # ==================================================================
    async def load_contracts_from_r2(self):
        if not self.r2_service: return
        try:
            self.contracts_db = await self.r2_service.load_contracts_db_async()
            print(f"✅ [DataManager] Contracts loaded: {len(self.contracts_db)}")
        except Exception:
            self.contracts_db = {}

    def get_contracts_db(self) -> Dict[str, Any]:
        return self.contracts_db
    
    # ==================================================================
    # 🛡️ Layer 1 Screening (Data Filter)
    # ==================================================================
    async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
        """الغربلة الأولية السريعة بناءً على الحجم والسيولة."""
        print(f"🔍 [Layer 1] Screening top liquid assets...")
        volume_data = await self._get_volume_data_live()
        
        if not volume_data:
            return []

        # نأخذ أعلى 150 عملة
        candidates = volume_data[:150]
        return candidates

    async def _get_volume_data_live(self):
        """جلب وترتيب العملات حسب حجم التداول"""
        try:
            tickers = await self.exchange.fetch_tickers()
            data = []
            for symbol, ticker in tickers.items():
                if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000:
                    data.append({
                        'symbol': symbol,
                        'dollar_volume': ticker['quoteVolume'],
                        'current_price': ticker['last']
                    })
            data.sort(key=lambda x: x['dollar_volume'], reverse=True)
            return data
        except Exception:
            return []

    # ==================================================================
    # 🎯 Data Fetching Helpers
    # ==================================================================
    async def get_latest_price_async(self, symbol: str) -> float:
        try:
            ticker = await self.exchange.fetch_ticker(symbol)
            return float(ticker['last'])
        except Exception:
            return 0.0

    async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
        try:
            candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
            if candles: return candles
            return []
        except Exception:
            return []

    # 🔥 [NEW] Order Book Fetcher
    async def get_order_book_snapshot(self, symbol: str, limit: int = 20) -> Dict[str, Any]:
        """
        جلب لقطة سريعة لدفتر الطلبات (Bids/Asks).
        يستخدم لتحليل عمق السوق وتأكيد القناص.
        """
        try:
            # ccxt يرجع الدفتر بصيغة {'bids': [[price, size], ...], 'asks': ...}
            ob = await self.exchange.fetch_order_book(symbol, limit)
            return ob
        except Exception as e:
            # طباعة خطأ خفيف لتجنب إغراق السجلات
            # print(f"⚠️ [DataManager] Failed to fetch OB for {symbol}: {e}")
            return {}