File size: 10,869 Bytes
2bf9457
ccf02a5
2bf9457
00bb5c9
 
 
 
 
 
2bf9457
d69dead
28fa18b
394e2c7
164b380
 
 
 
 
909d04f
164b380
11b4dc5
 
2bf9457
 
82e6b70
 
 
 
 
53cf6c0
28fa18b
 
2bf9457
53cf6c0
 
29d027f
4437a6f
29d027f
4437a6f
19ecbac
4437a6f
 
53cf6c0
d2775f3
4ace337
c6f72fe
29d027f
 
 
 
c6f72fe
87e3669
56e3f87
11b4dc5
 
2bf9457
82e6b70
909d04f
d50c5b6
909d04f
53cf6c0
d21f065
ccf02a5
248e033
56e3f87
6b00681
ccf02a5
29d027f
 
 
 
 
 
ce364f7
6b00681
ce364f7
ccf02a5
 
24a0949
56e3f87
d21f065
ccf02a5
56e3f87
2bf9457
ccf02a5
2bf9457
29d027f
ccf02a5
 
 
 
 
 
 
 
2bf9457
ccf02a5
 
56e3f87
53cf6c0
d21f065
ccf02a5
909d04f
 
82e6b70
 
 
 
11b4dc5
130af22
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d21f065
 
 
24a0949
d21f065
 
 
 
2bf9457
d21f065
 
 
 
 
29d027f
 
 
4437a6f
909d04f
d21f065
b44825a
2bf9457
909d04f
d21f065
 
 
 
 
 
 
909d04f
 
d21f065
 
 
909d04f
d21f065
 
 
909d04f
d21f065
 
 
 
ccf02a5
d21f065
4437a6f
d21f065
 
909d04f
d21f065
 
 
909d04f
d21f065
82e6b70
d21f065
 
 
 
 
 
ccf02a5
d21f065
 
82e6b70
 
d21f065
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ccf02a5
 
d21f065
 
ccf02a5
d21f065
ccf02a5
d21f065
ccf02a5
 
 
 
 
d21f065
ccf02a5
 
d21f065
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
# ml_engine/data_manager.py
# (V12.5 - Lazy Loading Fix + V15.6 App-Compat Fix + Detailed Logging)

import os
import asyncio
import httpx
import traceback
import time
from datetime import datetime
import ccxt.async_support as ccxt
import numpy as np
import logging
from typing import List, Dict, Any
import pandas as pd

try:
    import pandas_ta as ta
except ImportError:
    print("❌ [DataManager] مكتبة pandas_ta غير موجودة.")
    ta = None

from ml_engine.indicators import AdvancedTechnicalAnalyzer
from ml_engine.monte_carlo import MonteCarloAnalyzer
from ml_engine.ranker import Layer1Ranker
try:
    from ml_engine.patterns import ChartPatternAnalyzer
except ImportError:
    print("⚠️ [DataManager] لم يتم العثور على ml_engine/patterns.py")
    ChartPatternAnalyzer = None

logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)

class DataManager:
    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        # ==================================================================
        # ⚙️ إعدادات التحكم المركزية (V12.3 Hybrid Thresholds)
        # ==================================================================
        self.HYBRID_ENTRY_THRESHOLD = 0.60
        # ==================================================================

        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        
        self.exchange = ccxt.kucoin({
            'enableRateLimit': True,
            'timeout': 30000,
        })
            
        self.http_client = None
        self.market_cache = {}

        self.technical_analyzer = AdvancedTechnicalAnalyzer()
        self.mc_analyzer = MonteCarloAnalyzer()
        
        self.layer1_ranker = None
        self.pattern_analyzer = None

    async def initialize(self):
        """تهيئة مدير البيانات والاتصالات"""
        print("   > [DM Log] 0. بدء تهيئة DataManager...")
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()
        
        print("   > [DataManager] إنشاء النماذج المساندة (Lazy Load)...")
        try:
            self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
            if ChartPatternAnalyzer:
                self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
                
        except Exception as e:
            print(f"⚠️ [DataManager] تحذير أثناء إنشاء النماذج المساندة: {e}")

        print(f"✅ DataManager V12.5 initialized (Hybrid Threshold: {self.HYBRID_ENTRY_THRESHOLD})")
        print("   > [DM Log] 4. اكتملت تهيئة DataManager.")


    async def _load_markets(self):
        """تحميل بيانات الأسواق وتخزينها مؤقتاً"""
        print("   > [DM Log] 1. بدء _load_markets...")
        try:
            if self.exchange:
                print("   > [DM Log] 2. استدعاء exchange.load_markets()... (قد يستغرق وقتاً)")
                await self.exchange.load_markets()
                self.market_cache = self.exchange.markets
                
                if self.market_cache and len(self.market_cache) > 0:
                     print(f"   > [DM Log] 3. ✅ نجاح! تم تحميل {len(self.market_cache)} سوق.")
                else:
                     print("   > [DM Log] 3. ⚠️ تحذير: load_markets() نجح ولكن لم يتم إرجاع أسواق.")
            else:
                print("   > [DM Log] 2. ❌ خطأ: self.exchange هو None.")
                
        except Exception as e:
            print(f"   > [DM Log] 3. ❌❌❌ فشل فادح في _load_markets: {e}")
            traceback.print_exc()

    async def close(self):
        """إغلاق جميع الاتصالات بأمان"""
        print("   > [DM Log] 7. إغلاق اتصالات DataManager...")
        if self.http_client: await self.http_client.aclose()
        if self.exchange: await self.exchange.close()
        if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'):
            self.pattern_analyzer.clear_memory()
        if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'):
            self.layer1_ranker.clear_memory()

    # ==================================================================
    # 🚀 [إضافة جديدة V15.6] دوال التوافق مع App
    # ==================================================================
    async def load_contracts_from_r2(self):
        """
        [جديد] يقوم بتحميل قاعدة بيانات العقود من R2 عند بدء التشغيل.
        """
        print("   > [DataManager] Loading contracts database from R2...")
        if not self.r2_service:
            print("❌ [DataManager] R2Service not available. Cannot load contracts.")
            self.contracts_db = {}
            return
        
        try:
            self.contracts_db = await self.r2_service.load_contracts_db_async()
            print(f"✅ [DataManager] Contracts loaded. Total entries: {len(self.contracts_db)}")
        except Exception as e:
            print(f"❌ [DataManager] Failed to load contracts from R2: {e}")
            self.contracts_db = {}

    def get_contracts_db(self) -> Dict[str, Any]:
        """
        [جديد] إرجاع قاعدة بيانات العقود التي تم تحميلها.
        """
        return self.contracts_db
    
    # ==================================================================
    # 🛡️ دوال الطبقة الأولى (Layer 1 Screening)
    # ==================================================================
    async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
        """
        الغربلة الأولية السريعة جداً بناءً على الحجم فقط.
        """
        print(f"🔍 [Layer 1] بدء الغربلة السريعة (Top Liquid Assets)...")
        volume_data = await self._get_volume_data_live()
        
        if not volume_data:
            print("⚠️ [Layer 1 Warning] لم يتم العثور على بيانات حجم تداول.")
            return []

        candidates = volume_data[:150]
        print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.")
        return candidates

    async def _get_volume_data_live(self):
        """جلب بيانات الحجم الحية لجميع الأزواج"""
        try:
            tickers = await self.exchange.fetch_tickers()
            data = []
            for symbol, ticker in tickers.items():
                if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000:
                    data.append({
                        'symbol': symbol,
                        'dollar_volume': ticker['quoteVolume'],
                        'current_price': ticker['last']
                    })
            data.sort(key=lambda x: x['dollar_volume'], reverse=True)
            return data
        except Exception as e:
            print(f"❌ [DataManager] خطأ في جلب بيانات الحجم: {e}")
            return []

    # ==================================================================
    # 📊 دوال جلب البيانات (Data Fetching Pipeline)
    # ==================================================================
    async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
        """
        مولد بيانات متدفق (Streaming Generator) يجلب شموع OHLCV
        """
        timeframes = ['5m', '15m', '1h', '4h', '1d']
        limit = 500 

        for sym_data in symbols:
            symbol = sym_data['symbol']
            tasks = [self._fetch_ohlcv_live(symbol, tf, limit) for tf in timeframes]
            results = await asyncio.gather(*tasks, return_exceptions=False)
            
            ohlcv_packet = {}
            valid_packet = True
            for i, res in enumerate(results):
                tf = timeframes[i]
                if res and isinstance(res, list) and len(res) >= 200:
                    ohlcv_packet[tf] = res
                else:
                    if tf in ['5m', '1h']: valid_packet = False

            if valid_packet and len(ohlcv_packet) >= 4:
                sym_data['ohlcv'] = ohlcv_packet
                await queue.put([sym_data]) 
            
            await asyncio.sleep(0.05)
        await queue.put(None)

    async def _fetch_ohlcv_live(self, symbol, timeframe, limit):
        """دالة مساعدة لجلب الشموع مع معالجة الأخطاء البسيطة"""
        try:
            return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
        except Exception:
            return None

    # ==================================================================
    # 🎯 دوال مساعدة للحارس والدماغ (Sentry & Brain Helpers)
    # ==================================================================
    async def get_latest_price_async(self, symbol: str) -> float:
        """جلب آخر سعر حقيقي (للتنفيذ والمراقبة)"""
        try:
            ticker = await self.exchange.fetch_ticker(symbol)
            return float(ticker['last'])
        except Exception as e:
            print(f"⚠️ [DataManager] Failed to fetch price for {symbol}: {e}")
            return 0.0

    async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
        """
        جلب عدد محدود من الشموع الأخيرة بسرعة.
        """
        # (إضافة طابع خفيف لتجنب إغراق السجلات)
        # print(f"   > [DM Log] 5. [get_latest_ohlcv] طلب {symbol} {timeframe}...")
        try:
            candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
            
            if candles and len(candles) > 0:
                # (لا نطبع هذا لأنه سينجح)
                return candles
            else:
                # (هذا هو الطابع المهم الذي يكشف الفشل الصامت)
                print(f"   > [DM Log] 6. ⚠️ [get_latest_ohlcv] فشل صامت لـ {symbol} {timeframe}. (أرجع قائمة فارغة).")
                return []
                
        except Exception as e:
            # (هذا هو الطابع المهم الذي يكشف الفشل الفادح)
            print(f"   > [DM Log] 6. ❌ [get_latest_ohlcv] فشل فادح لـ {symbol} {timeframe}: {e}")
            return []