Riy777 commited on
Commit
0884f94
·
verified ·
1 Parent(s): 98050cc

Update ml_engine/data_manager.py

Browse files
Files changed (1) hide show
  1. ml_engine/data_manager.py +140 -146
ml_engine/data_manager.py CHANGED
@@ -1,6 +1,6 @@
1
  # ============================================================
2
  # 📂 ml_engine/data_manager.py
3
- # (V57.0 - GEM-Architect: Pure Volume Edition)
4
  # ============================================================
5
 
6
  import asyncio
@@ -12,33 +12,26 @@ import numpy as np
12
  import pandas_ta as ta
13
  from typing import List, Dict, Any, Tuple
14
  from datetime import datetime
15
-
16
  import ccxt.async_support as ccxt
17
 
18
- # استيراد الدستور الديناميكي
19
  try:
20
  from ml_engine.processor import SystemLimits
21
  except ImportError:
22
- class SystemLimits:
23
- L1_MIN_AFFINITY_SCORE = 15.0
24
- CURRENT_REGIME = "RANGE"
25
 
26
  logging.getLogger("httpx").setLevel(logging.WARNING)
27
  logging.getLogger("ccxt").setLevel(logging.WARNING)
28
 
29
  class DataManager:
30
- """
31
- DataManager V57.0 (Pure Volume Edition)
32
- - Logic: NO FILTERS. NO CONDITIONS.
33
- - Action: Fetches Top 150 coins by Quote Volume and passes them ALL.
34
- - Speed: Maximum (No secondary inspection requests).
35
- """
36
-
37
  def __init__(self, contracts_db, whale_monitor, r2_service=None):
38
  self.contracts_db = contracts_db or {}
39
  self.whale_monitor = whale_monitor
40
  self.r2_service = r2_service
41
-
 
 
 
42
  self.exchange = ccxt.kucoin({
43
  'enableRateLimit': True,
44
  'timeout': 30000,
@@ -48,27 +41,17 @@ class DataManager:
48
  self.http_client = None
49
  self.market_cache = {}
50
 
51
- self.NETWORK_HEADS = ['ETH/USDT', 'SOL/USDT', 'BNB/USDT', 'AVAX/USDT']
52
- self.BENCHMARK_SYMBOL = 'BTC/USDT'
53
-
54
- # القائمة السوداء (للعملات الأساسية فقط)
55
  self.BLACKLIST_TOKENS = [
56
  'USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'FDUSD', 'EUR', 'PAX',
57
  'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L', 'USDD', 'USDP', 'HT', 'KCS'
58
  ]
59
 
60
- print(f"📦 [DataManager V57.0] Pure Volume Engine Active.")
61
 
62
  async def initialize(self):
63
- print(" > [DataManager] Starting Matrix Initialization...")
64
- try:
65
- self.http_client = httpx.AsyncClient(timeout=30.0)
66
- await self._load_markets()
67
- await self.update_hyper_regime()
68
- print(f"✅ [DataManager] Ready | Regime: {SystemLimits.CURRENT_REGIME}")
69
- except Exception as e:
70
- print(f"❌ [DataManager] Init Error: {e}")
71
- traceback.print_exc()
72
 
73
  async def _load_markets(self):
74
  try:
@@ -81,167 +64,178 @@ class DataManager:
81
  if self.http_client: await self.http_client.aclose()
82
  if self.exchange: await self.exchange.close()
83
 
84
- async def load_contracts_from_r2(self):
85
- if not self.r2_service: return
86
- try:
87
- self.contracts_db = await self.r2_service.load_contracts_db_async()
88
- except: self.contracts_db = {}
89
-
90
- def get_contracts_db(self): return self.contracts_db
91
-
92
  # ==================================================================
93
- # 🧠 The Matrix (Regime Sensor) - JUST FOR THRESHOLDS
94
- # ==================================================================
95
- async def update_hyper_regime(self):
96
- print(" 🧠 [Matrix] Scanning Market Dimensions...")
97
- try:
98
- fg_index, fg_label = await self._fetch_fear_greed()
99
- btc_data = await self._analyze_single_asset(self.BENCHMARK_SYMBOL)
100
- breadth_score, heads_details = await self._analyze_market_breadth()
101
-
102
- regime = "RANGE"
103
-
104
- if btc_data['trend'] == 'BULL':
105
- if breadth_score >= 0.50: regime = "BULL"
106
- else: regime = "RANGE"
107
- elif btc_data['trend'] == 'BEAR':
108
- if fg_index < 20: regime = "BEAR"
109
- else: regime = "BEAR"
110
- elif btc_data['trend'] == 'NEUTRAL':
111
- if btc_data['volatility_state'] == 'LOW' and breadth_score < 0.3: regime = "DEAD"
112
- else: regime = "RANGE"
113
-
114
- SystemLimits.CURRENT_REGIME = regime
115
- print(f" 🌍 Regime Updated: {regime} (FG:{fg_index} | Breadth:{breadth_score*100:.0f}%)")
116
-
117
- except Exception as e:
118
- print(f"❌ [Matrix Error] {e}")
119
- SystemLimits.CURRENT_REGIME = "RANGE"
120
-
121
- # --- Sensor Helpers ---
122
- async def _fetch_fear_greed(self) -> Tuple[int, str]:
123
- try:
124
- resp = await self.http_client.get("https://api.alternative.me/fng/?limit=1")
125
- data = resp.json()
126
- return int(data['data'][0]['value']), data['data'][0]['value_classification']
127
- except: return 50, "Neutral"
128
-
129
- async def _analyze_single_asset(self, symbol) -> Dict[str, Any]:
130
- try:
131
- ohlcv = await self.exchange.fetch_ohlcv(symbol, '1d', limit=100)
132
- df = pd.DataFrame(ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])
133
- c = df['c']
134
- ema50 = ta.ema(c, length=50).iloc[-1]
135
- ema200 = ta.ema(c, length=200).iloc[-1]
136
- atr = ta.atr(df['h'], df['l'], c, length=14).iloc[-1]
137
- price = c.iloc[-1]
138
-
139
- trend = "NEUTRAL"
140
- if price > ema50 and ema50 > ema200: trend = "BULL"
141
- elif price < ema50 and price < ema200: trend = "BEAR"
142
- vol_state = "NORMAL"
143
- if (atr / price) < 0.025: vol_state = "LOW"
144
- return {'trend': trend, 'volatility_state': vol_state}
145
- except: return {'trend': 'NEUTRAL', 'volatility_state': 'NORMAL'}
146
-
147
- async def _analyze_market_breadth(self) -> Tuple[float, str]:
148
- tasks = [self._analyze_single_asset(sym) for sym in self.NETWORK_HEADS]
149
- results = await asyncio.gather(*tasks, return_exceptions=True)
150
- bull_count = 0; valid = 0; details = []
151
- for i, res in enumerate(results):
152
- if isinstance(res, dict):
153
- valid += 1
154
- if res['trend'] == 'BULL': bull_count += 1
155
- details.append(f"{self.NETWORK_HEADS[i].split('/')[0]}:{res['trend'][0]}")
156
- return (bull_count / valid) if valid > 0 else 0.0, "|".join(details)
157
-
158
- # ==================================================================
159
- # 🌊 Stage 0: Pure Volume Flood (The Open Gate)
160
  # ==================================================================
161
  async def _stage0_universe_filter(self) -> List[Dict[str, Any]]:
162
  """
163
- FETCH -> SORT BY VOLUME -> RETURN TOP 150.
164
- NO structural checks. NO spread checks. NO wall checks.
 
 
 
165
  """
166
  try:
167
- print(f" 🌊 [Volume Gate] Opening floodgates...")
168
  tickers = await self.exchange.fetch_tickers()
169
 
170
- pre_candidates = []
 
171
  for symbol, ticker in tickers.items():
172
  if not symbol.endswith('/USDT'): continue
173
 
174
- # Check Blacklist (Base only)
175
  base_curr = symbol.split('/')[0]
176
  if any(bad in base_curr for bad in self.BLACKLIST_TOKENS): continue
177
 
178
- # Handle Volume
179
  quote_vol = ticker.get('quoteVolume')
180
  if quote_vol is None or quote_vol == 0:
181
  base_vol = ticker.get('baseVolume')
182
  last_p = ticker.get('last')
183
  if base_vol and last_p:
184
  quote_vol = float(base_vol) * float(last_p)
185
- else: quote_vol = 0.0
186
- else: quote_vol = float(quote_vol)
187
-
188
- # Minimal floor to avoid bugs ($10k)
189
- if quote_vol < 10_000: continue
190
-
191
- pre_candidates.append({
 
 
 
 
 
 
 
 
 
 
 
 
 
192
  'symbol': symbol,
193
  'quote_volume': quote_vol,
194
  'current_price': float(ticker.get('last', 0)),
195
- 'sort_score': quote_vol # Score is just volume
 
196
  })
197
 
198
- # Sort Descending
199
- pre_candidates.sort(key=lambda x: x['quote_volume'], reverse=True)
200
 
201
- # Take Top 150
202
- final_candidates = pre_candidates[:150]
203
 
204
- print(f" -> [Volume Gate] Passed Top {len(final_candidates)} coins by Volume directly to Processor.")
205
- return final_candidates
206
 
207
  except Exception as e:
208
- print(f"❌ [Filter Error] {e}")
209
  traceback.print_exc()
210
  return []
211
 
212
  # ==================================================================
213
- # 🛡️ Layer 1: Pass-Through
214
  # ==================================================================
215
- async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
216
- # Update thresholds only
217
- await self.update_hyper_regime()
218
-
219
- # Get raw candidates
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
220
  candidates = await self._stage0_universe_filter()
221
  if not candidates: return []
222
 
223
- # Format for Layer 2
 
 
 
 
 
 
 
 
 
224
  final_list = []
225
- for c in candidates:
226
- final_list.append({
227
- 'symbol': c['symbol'],
228
- 'quote_volume': c.get('quote_volume', 0),
229
- 'current_price': c.get('current_price', 0),
230
- 'type': 'CANDIDATE',
231
- 'l1_score': c.get('sort_score', 0)
232
- })
233
-
 
 
 
 
 
 
 
 
234
  return final_list
235
 
236
- # ==================================================================
237
- # 🏗️ Helpers (Minimal)
238
- # ==================================================================
239
  async def get_latest_price_async(self, symbol: str) -> float:
240
  try: return float((await self.exchange.fetch_ticker(symbol))['last'])
241
  except: return 0.0
242
  async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
243
  try: return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
244
  except: return []
245
- async def get_order_book_snapshot(self, symbol: str, limit: int = 20) -> Dict[str, Any]:
246
- try: return await self.exchange.fetch_order_book(symbol, limit)
247
- except: return {}
 
1
  # ============================================================
2
  # 📂 ml_engine/data_manager.py
3
+ # (V58.0 - GEM-Architect: Asset-Context Edition)
4
  # ============================================================
5
 
6
  import asyncio
 
12
  import pandas_ta as ta
13
  from typing import List, Dict, Any, Tuple
14
  from datetime import datetime
 
15
  import ccxt.async_support as ccxt
16
 
17
+ # Keep SystemLimits import for fallbacks
18
  try:
19
  from ml_engine.processor import SystemLimits
20
  except ImportError:
21
+ SystemLimits = None
 
 
22
 
23
  logging.getLogger("httpx").setLevel(logging.WARNING)
24
  logging.getLogger("ccxt").setLevel(logging.WARNING)
25
 
26
  class DataManager:
 
 
 
 
 
 
 
27
  def __init__(self, contracts_db, whale_monitor, r2_service=None):
28
  self.contracts_db = contracts_db or {}
29
  self.whale_monitor = whale_monitor
30
  self.r2_service = r2_service
31
+ # Pass the hub instance later or via global context if strictly necessary,
32
+ # but better to handle regime logic internally or pass config.
33
+ self.adaptive_hub_ref = None
34
+
35
  self.exchange = ccxt.kucoin({
36
  'enableRateLimit': True,
37
  'timeout': 30000,
 
41
  self.http_client = None
42
  self.market_cache = {}
43
 
44
+ # Core Blacklist (Stablecoins & Leveraged Tokens)
 
 
 
45
  self.BLACKLIST_TOKENS = [
46
  'USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'FDUSD', 'EUR', 'PAX',
47
  'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L', 'USDD', 'USDP', 'HT', 'KCS'
48
  ]
49
 
50
+ print(f"📦 [DataManager V58.0] Quality Gate & Context Engine Active.")
51
 
52
  async def initialize(self):
53
+ self.http_client = httpx.AsyncClient(timeout=30.0)
54
+ await self._load_markets()
 
 
 
 
 
 
 
55
 
56
  async def _load_markets(self):
57
  try:
 
64
  if self.http_client: await self.http_client.aclose()
65
  if self.exchange: await self.exchange.close()
66
 
 
 
 
 
 
 
 
 
67
  # ==================================================================
68
+ # 🛡️ Stage 0: The "Anti-Junk" Gate (Quality Control)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69
  # ==================================================================
70
  async def _stage0_universe_filter(self) -> List[Dict[str, Any]]:
71
  """
72
+ Strict Quality Control:
73
+ 1. Liquidity: > $2M Quote Volume.
74
+ 2. Integrity: Spread < 2.0%.
75
+ 3. Sanity: 24h Change < 30% (Avoid chasing massive pumps).
76
+ 4. Safety: No Blacklisted tokens.
77
  """
78
  try:
79
+ print(f" 🛡️ [Stage 0] Filtering Junk (Vol>2M, Spread<2%, No Pumps)...")
80
  tickers = await self.exchange.fetch_tickers()
81
 
82
+ valid_candidates = []
83
+
84
  for symbol, ticker in tickers.items():
85
  if not symbol.endswith('/USDT'): continue
86
 
87
+ # 1. Blacklist Check
88
  base_curr = symbol.split('/')[0]
89
  if any(bad in base_curr for bad in self.BLACKLIST_TOKENS): continue
90
 
91
+ # 2. Volume Check (Strict > 2,000,000 USD)
92
  quote_vol = ticker.get('quoteVolume')
93
  if quote_vol is None or quote_vol == 0:
94
  base_vol = ticker.get('baseVolume')
95
  last_p = ticker.get('last')
96
  if base_vol and last_p:
97
  quote_vol = float(base_vol) * float(last_p)
98
+ else:
99
+ quote_vol = 0.0
100
+
101
+ if quote_vol < 2_000_000: continue
102
+
103
+ # 3. Spread Check (Avoid Orderbook manipulation/illiquidity)
104
+ bid = ticker.get('bid')
105
+ ask = ticker.get('ask')
106
+ if bid and ask and ask > 0:
107
+ spread_pct = ((ask - bid) / ask) * 100
108
+ if spread_pct > 2.0: continue # Skip if spread > 2%
109
+ else:
110
+ continue # Broken book
111
+
112
+ # 4. Sanity Check (Avoid extreme FOMO/Dumps)
113
+ change_24h = ticker.get('percentage')
114
+ if change_24h is not None:
115
+ if abs(change_24h) > 30.0: continue # Skip huge pumps/dumps
116
+
117
+ valid_candidates.append({
118
  'symbol': symbol,
119
  'quote_volume': quote_vol,
120
  'current_price': float(ticker.get('last', 0)),
121
+ 'change_24h': change_24h,
122
+ 'spread': spread_pct if 'spread_pct' in locals() else 0.0
123
  })
124
 
125
+ # Sort by Volume (Liquidity King)
126
+ valid_candidates.sort(key=lambda x: x['quote_volume'], reverse=True)
127
 
128
+ # Cap at Top 80 to ensure we have rate-limit room for 4H analysis
129
+ final_list = valid_candidates[:80]
130
 
131
+ print(f" -> [Stage 0] Passed {len(final_list)} High-Quality Assets.")
132
+ return final_list
133
 
134
  except Exception as e:
135
+ print(f"❌ [Stage 0 Error] {e}")
136
  traceback.print_exc()
137
  return []
138
 
139
  # ==================================================================
140
+ # 🧭 Stage 1: Context Diagnosis (4H Regime)
141
  # ==================================================================
142
+ async def _determine_4h_regime(self, symbol: str) -> Dict[str, Any]:
143
+ """
144
+ Diagnose the Asset's specific regime using 4H data.
145
+ Returns: BULL, BEAR, DEAD, or RANGE + Tech Data.
146
+ """
147
+ try:
148
+ ohlcv = await self.exchange.fetch_ohlcv(symbol, '4h', limit=50)
149
+ if not ohlcv or len(ohlcv) < 50: return {'regime': 'RANGE', 'conf': 0.0}
150
+
151
+ df = pd.DataFrame(ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])
152
+ c = df['c']
153
+
154
+ # Indicators
155
+ ema50 = ta.ema(c, length=50).iloc[-1]
156
+ ema200 = ta.ema(c, length=200).iloc[-1]
157
+ rsi = ta.rsi(c, length=14).iloc[-1]
158
+ atr = ta.atr(df['h'], df['l'], c, length=14).iloc[-1]
159
+ price = c.iloc[-1]
160
+
161
+ # Logic
162
+ regime = "RANGE"
163
+ conf = 0.5
164
+
165
+ # 1. Check DEAD (Low Volatility)
166
+ atr_pct = (atr / price) * 100
167
+ # Calculate Range of last 20 candles
168
+ high_20 = df['h'].iloc[-20:].max()
169
+ low_20 = df['l'].iloc[-20:].min()
170
+ range_pct = ((high_20 - low_20) / low_20) * 100
171
+
172
+ if atr_pct < 0.8 and range_pct < 4.0:
173
+ regime = "DEAD"
174
+ conf = 0.9
175
+
176
+ # 2. Check BULL
177
+ elif price > ema50 and ema50 > ema200 and rsi > 50:
178
+ regime = "BULL"
179
+ conf = 0.8 if rsi > 55 else 0.6
180
+
181
+ # 3. Check BEAR
182
+ elif price < ema50 and ema50 < ema200 and rsi < 50:
183
+ regime = "BEAR"
184
+ conf = 0.8 if rsi < 45 else 0.6
185
+
186
+ return {
187
+ 'regime': regime,
188
+ 'conf': conf,
189
+ 'tech': {
190
+ 'ema50': ema50, 'ema200': ema200, 'rsi': rsi, 'atr_pct': atr_pct
191
+ }
192
+ }
193
+ except Exception:
194
+ return {'regime': 'RANGE', 'conf': 0.0}
195
+
196
+ async def layer1_rapid_screening(self, adaptive_hub_ref=None) -> List[Dict[str, Any]]:
197
+ """
198
+ Orchestrates Stage 0 (Filter) -> Stage 1 (Diagnose & Inject Limits).
199
+ """
200
+ # 1. Get High Quality Candidates
201
  candidates = await self._stage0_universe_filter()
202
  if not candidates: return []
203
 
204
+ print(f" 🧬 [Stage 1] Diagnosing 4H Regime for {len(candidates)} assets...")
205
+
206
+ # 2. Parallel Diagnosis
207
+ # We need to fetch 4H data for all of them.
208
+ # To avoid rate limits, we might batch them or use asyncio.gather with care.
209
+ # Assuming KuCoin handles ~80 requests quickly enough or ccxt throttles automatically.
210
+
211
+ tasks = [self._determine_4h_regime(c['symbol']) for c in candidates]
212
+ regime_results = await asyncio.gather(*tasks, return_exceptions=True)
213
+
214
  final_list = []
215
+ for i, res in enumerate(regime_results):
216
+ if isinstance(res, dict):
217
+ cand = candidates[i]
218
+ cand['asset_regime'] = res['regime']
219
+ cand['asset_regime_conf'] = res['conf']
220
+ cand['type'] = 'CANDIDATE'
221
+
222
+ # 3. INJECT DYNAMIC LIMITS
223
+ # This is where we break the "Global SystemLimits" dependency.
224
+ if adaptive_hub_ref:
225
+ # Get DNA for this specific asset's regime
226
+ dynamic_config = adaptive_hub_ref.get_regime_config(res['regime'])
227
+ cand['dynamic_limits'] = dynamic_config
228
+
229
+ final_list.append(cand)
230
+
231
+ print(f" -> [Stage 1] Prepared {len(final_list)} Context-Aware Candidates.")
232
  return final_list
233
 
234
+ # Keep helper methods...
 
 
235
  async def get_latest_price_async(self, symbol: str) -> float:
236
  try: return float((await self.exchange.fetch_ticker(symbol))['last'])
237
  except: return 0.0
238
  async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
239
  try: return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
240
  except: return []
241
+ def get_contracts_db(self): return self.contracts_db