Spaces:
Paused
Paused
File size: 5,501 Bytes
c1bc90c c1f9548 c1bc90c c1f9548 c1bc90c c1f9548 c1bc90c c1f9548 c1bc90c c1f9548 c1bc90c c1f9548 c1bc90c 3d4ebf5 c1bc90c c1f9548 c1bc90c c1f9548 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | # ml_engine/data_manager.py
# (V13.2 - GEM-Architect: Added Order Book Capability)
import os
import asyncio
import httpx
import traceback
import ccxt.async_support as ccxt
import logging
from typing import List, Dict, Any
# إعدادات التسجيل
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)
class DataManager:
def __init__(self, contracts_db, whale_monitor, r2_service=None):
# ==================================================================
# ⚙️ إعدادات التحكم
# ==================================================================
self.contracts_db = contracts_db or {}
self.whale_monitor = whale_monitor
self.r2_service = r2_service
# إعداد المنصة (KuCoin)
self.exchange = ccxt.kucoin({
'enableRateLimit': True,
'timeout': 30000,
})
self.http_client = None
self.market_cache = {}
async def initialize(self):
"""تهيئة مدير البيانات والاتصالات"""
print(" > [DataManager] Starting initialization...")
self.http_client = httpx.AsyncClient(timeout=30.0)
await self._load_markets()
print(f"✅ [DataManager V13.2] Ready (Order Book Enabled).")
async def _load_markets(self):
"""تحميل بيانات الأسواق وتخزينها مؤقتاً"""
try:
if self.exchange:
await self.exchange.load_markets()
self.market_cache = self.exchange.markets
except Exception as e:
print(f"❌ [DataManager] Market load failed: {e}")
traceback.print_exc()
async def close(self):
"""إغلاق جميع الاتصالات بأمان"""
print(" > [DataManager] Closing connections...")
if self.http_client: await self.http_client.aclose()
if self.exchange: await self.exchange.close()
# ==================================================================
# 🚀 R2 Compatibility
# ==================================================================
async def load_contracts_from_r2(self):
if not self.r2_service: return
try:
self.contracts_db = await self.r2_service.load_contracts_db_async()
print(f"✅ [DataManager] Contracts loaded: {len(self.contracts_db)}")
except Exception:
self.contracts_db = {}
def get_contracts_db(self) -> Dict[str, Any]:
return self.contracts_db
# ==================================================================
# 🛡️ Layer 1 Screening (Data Filter)
# ==================================================================
async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
"""الغربلة الأولية السريعة بناءً على الحجم والسيولة."""
print(f"🔍 [Layer 1] Screening top liquid assets...")
volume_data = await self._get_volume_data_live()
if not volume_data:
return []
# نأخذ أعلى 150 عملة
candidates = volume_data[:150]
return candidates
async def _get_volume_data_live(self):
"""جلب وترتيب العملات حسب حجم التداول"""
try:
tickers = await self.exchange.fetch_tickers()
data = []
for symbol, ticker in tickers.items():
if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000:
data.append({
'symbol': symbol,
'dollar_volume': ticker['quoteVolume'],
'current_price': ticker['last']
})
data.sort(key=lambda x: x['dollar_volume'], reverse=True)
return data
except Exception:
return []
# ==================================================================
# 🎯 Data Fetching Helpers
# ==================================================================
async def get_latest_price_async(self, symbol: str) -> float:
try:
ticker = await self.exchange.fetch_ticker(symbol)
return float(ticker['last'])
except Exception:
return 0.0
async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
try:
candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
if candles: return candles
return []
except Exception:
return []
# 🔥 [NEW] Order Book Fetcher
async def get_order_book_snapshot(self, symbol: str, limit: int = 20) -> Dict[str, Any]:
"""
جلب لقطة سريعة لدفتر الطلبات (Bids/Asks).
يستخدم لتحليل عمق السوق وتأكيد القناص.
"""
try:
# ccxt يرجع الدفتر بصيغة {'bids': [[price, size], ...], 'asks': ...}
ob = await self.exchange.fetch_order_book(symbol, limit)
return ob
except Exception as e:
# طباعة خطأ خفيف لتجنب إغراق السجلات
# print(f"⚠️ [DataManager] Failed to fetch OB for {symbol}: {e}")
return {} |