Tradtesting / ml_engine /data_manager.py
Riy777's picture
Update ml_engine/data_manager.py
c1f9548 verified
raw
history blame
5.5 kB
# ml_engine/data_manager.py
# (V13.2 - GEM-Architect: Added Order Book Capability)
import os
import asyncio
import httpx
import traceback
import ccxt.async_support as ccxt
import logging
from typing import List, Dict, Any
# إعدادات التسجيل
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)
class DataManager:
def __init__(self, contracts_db, whale_monitor, r2_service=None):
# ==================================================================
# ⚙️ إعدادات التحكم
# ==================================================================
self.contracts_db = contracts_db or {}
self.whale_monitor = whale_monitor
self.r2_service = r2_service
# إعداد المنصة (KuCoin)
self.exchange = ccxt.kucoin({
'enableRateLimit': True,
'timeout': 30000,
})
self.http_client = None
self.market_cache = {}
async def initialize(self):
"""تهيئة مدير البيانات والاتصالات"""
print(" > [DataManager] Starting initialization...")
self.http_client = httpx.AsyncClient(timeout=30.0)
await self._load_markets()
print(f"✅ [DataManager V13.2] Ready (Order Book Enabled).")
async def _load_markets(self):
"""تحميل بيانات الأسواق وتخزينها مؤقتاً"""
try:
if self.exchange:
await self.exchange.load_markets()
self.market_cache = self.exchange.markets
except Exception as e:
print(f"❌ [DataManager] Market load failed: {e}")
traceback.print_exc()
async def close(self):
"""إغلاق جميع الاتصالات بأمان"""
print(" > [DataManager] Closing connections...")
if self.http_client: await self.http_client.aclose()
if self.exchange: await self.exchange.close()
# ==================================================================
# 🚀 R2 Compatibility
# ==================================================================
async def load_contracts_from_r2(self):
if not self.r2_service: return
try:
self.contracts_db = await self.r2_service.load_contracts_db_async()
print(f"✅ [DataManager] Contracts loaded: {len(self.contracts_db)}")
except Exception:
self.contracts_db = {}
def get_contracts_db(self) -> Dict[str, Any]:
return self.contracts_db
# ==================================================================
# 🛡️ Layer 1 Screening (Data Filter)
# ==================================================================
async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
"""الغربلة الأولية السريعة بناءً على الحجم والسيولة."""
print(f"🔍 [Layer 1] Screening top liquid assets...")
volume_data = await self._get_volume_data_live()
if not volume_data:
return []
# نأخذ أعلى 150 عملة
candidates = volume_data[:150]
return candidates
async def _get_volume_data_live(self):
"""جلب وترتيب العملات حسب حجم التداول"""
try:
tickers = await self.exchange.fetch_tickers()
data = []
for symbol, ticker in tickers.items():
if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000:
data.append({
'symbol': symbol,
'dollar_volume': ticker['quoteVolume'],
'current_price': ticker['last']
})
data.sort(key=lambda x: x['dollar_volume'], reverse=True)
return data
except Exception:
return []
# ==================================================================
# 🎯 Data Fetching Helpers
# ==================================================================
async def get_latest_price_async(self, symbol: str) -> float:
try:
ticker = await self.exchange.fetch_ticker(symbol)
return float(ticker['last'])
except Exception:
return 0.0
async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
try:
candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
if candles: return candles
return []
except Exception:
return []
# 🔥 [NEW] Order Book Fetcher
async def get_order_book_snapshot(self, symbol: str, limit: int = 20) -> Dict[str, Any]:
"""
جلب لقطة سريعة لدفتر الطلبات (Bids/Asks).
يستخدم لتحليل عمق السوق وتأكيد القناص.
"""
try:
# ccxt يرجع الدفتر بصيغة {'bids': [[price, size], ...], 'asks': ...}
ob = await self.exchange.fetch_order_book(symbol, limit)
return ob
except Exception as e:
# طباعة خطأ خفيف لتجنب إغراق السجلات
# print(f"⚠️ [DataManager] Failed to fetch OB for {symbol}: {e}")
return {}