Riy777 commited on
Commit
8b1c1fd
·
verified ·
1 Parent(s): b57fc28

Update ml_engine/data_manager.py

Browse files
Files changed (1) hide show
  1. ml_engine/data_manager.py +19 -9
ml_engine/data_manager.py CHANGED
@@ -1,6 +1,6 @@
1
  # ============================================================
2
  # 📂 ml_engine/data_manager.py
3
- # (V58.0 - GEM-Architect: Asset-Context Edition)
4
  # ============================================================
5
 
6
  import asyncio
@@ -28,8 +28,7 @@ class DataManager:
28
  self.contracts_db = contracts_db or {}
29
  self.whale_monitor = whale_monitor
30
  self.r2_service = r2_service
31
- # Pass the hub instance later or via global context if strictly necessary,
32
- # but better to handle regime logic internally or pass config.
33
  self.adaptive_hub_ref = None
34
 
35
  self.exchange = ccxt.kucoin({
@@ -47,7 +46,7 @@ class DataManager:
47
  'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L', 'USDD', 'USDP', 'HT', 'KCS'
48
  ]
49
 
50
- print(f"📦 [DataManager V58.0] Quality Gate & Context Engine Active.")
51
 
52
  async def initialize(self):
53
  self.http_client = httpx.AsyncClient(timeout=30.0)
@@ -64,6 +63,18 @@ class DataManager:
64
  if self.http_client: await self.http_client.aclose()
65
  if self.exchange: await self.exchange.close()
66
 
 
 
 
 
 
 
 
 
 
 
 
 
67
  # ==================================================================
68
  # 🛡️ Stage 0: The "Anti-Junk" Gate (Quality Control)
69
  # ==================================================================
@@ -204,10 +215,7 @@ class DataManager:
204
  print(f" 🧬 [Stage 1] Diagnosing 4H Regime for {len(candidates)} assets...")
205
 
206
  # 2. Parallel Diagnosis
207
- # We need to fetch 4H data for all of them.
208
- # To avoid rate limits, we might batch them or use asyncio.gather with care.
209
- # Assuming KuCoin handles ~80 requests quickly enough or ccxt throttles automatically.
210
-
211
  tasks = [self._determine_4h_regime(c['symbol']) for c in candidates]
212
  regime_results = await asyncio.gather(*tasks, return_exceptions=True)
213
 
@@ -238,4 +246,6 @@ class DataManager:
238
  async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
239
  try: return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
240
  except: return []
241
- def get_contracts_db(self): return self.contracts_db
 
 
 
1
  # ============================================================
2
  # 📂 ml_engine/data_manager.py
3
+ # (V58.1 - GEM-Architect: Asset-Context Edition + Fixed)
4
  # ============================================================
5
 
6
  import asyncio
 
28
  self.contracts_db = contracts_db or {}
29
  self.whale_monitor = whale_monitor
30
  self.r2_service = r2_service
31
+ # Pass the hub instance later via method arguments
 
32
  self.adaptive_hub_ref = None
33
 
34
  self.exchange = ccxt.kucoin({
 
46
  'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L', 'USDD', 'USDP', 'HT', 'KCS'
47
  ]
48
 
49
+ print(f"📦 [DataManager V58.1] Quality Gate & Context Engine Active.")
50
 
51
  async def initialize(self):
52
  self.http_client = httpx.AsyncClient(timeout=30.0)
 
63
  if self.http_client: await self.http_client.aclose()
64
  if self.exchange: await self.exchange.close()
65
 
66
+ # ✅ FIXED: Restored Missing Method for Startup Sequence
67
+ async def load_contracts_from_r2(self):
68
+ if not self.r2_service: return
69
+ try:
70
+ self.contracts_db = await self.r2_service.load_contracts_db_async()
71
+ print(f" 📂 [DataManager] Contracts DB loaded: {len(self.contracts_db)} items.")
72
+ except Exception as e:
73
+ print(f" ⚠️ [DataManager] Failed to load contracts: {e}")
74
+ self.contracts_db = {}
75
+
76
+ def get_contracts_db(self): return self.contracts_db
77
+
78
  # ==================================================================
79
  # 🛡️ Stage 0: The "Anti-Junk" Gate (Quality Control)
80
  # ==================================================================
 
215
  print(f" 🧬 [Stage 1] Diagnosing 4H Regime for {len(candidates)} assets...")
216
 
217
  # 2. Parallel Diagnosis
218
+ # Fetching 4H data for all candidates
 
 
 
219
  tasks = [self._determine_4h_regime(c['symbol']) for c in candidates]
220
  regime_results = await asyncio.gather(*tasks, return_exceptions=True)
221
 
 
246
  async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
247
  try: return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
248
  except: return []
249
+ async def get_order_book_snapshot(self, symbol: str, limit: int = 20) -> Dict[str, Any]:
250
+ try: return await self.exchange.fetch_order_book(symbol, limit)
251
+ except: return {}