# Trad / ml_engine / data_manager.py
# Riy777's picture
# Update ml_engine/data_manager.py
# f04d47e verified
# ==============================================================================
# 📂 ml_engine/data_manager.py
# (V77.0 - GEM-Architect: Smart Exchange Switcher)
# ==============================================================================
# - Feature: Auto-Failover (Binance -> Bybit -> OKX -> MEXC).
# - Logic: Automatically detects the best available exchange for the current IP.
# - Stability: Zero-config switch if one exchange is blocked (451 Error).
# ==============================================================================
import asyncio
import httpx
import traceback
import ccxt.async_support as ccxt
import logging
import pandas as pd
import numpy as np
import time
from typing import List, Dict, Any, Optional
# Try to import the system limits; fall back to None when the processor
# module is not importable so this module can still be loaded on its own.
try:
    from ml_engine.processor import SystemLimits
except ImportError:
    SystemLimits = None

# Reduce log noise from the HTTP and exchange client libraries.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("ccxt").setLevel(logging.WARNING)
class DataManager:
    """Market-data gateway with automatic exchange failover.

    On ``initialize()`` the manager walks ``EXCHANGE_CANDIDATES`` in order and
    keeps the first exchange whose markets load and contain ``BTC/USDT``.
    All later data calls (tickers, OHLCV, order book) go through that single
    exchange instance stored in ``self.exchange``.
    """

    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        """Store collaborators and prepare (but do not open) connections.

        Args:
            contracts_db: Initial contracts mapping; a falsy value becomes ``{}``.
            whale_monitor: Stored as-is; not used by the methods in this class.
            r2_service: Optional service exposing ``load_contracts_db_async()``.
        """
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        self.adaptive_hub_ref = None

        self.exchange = None
        self.exchange_id = None
        self.http_client = None
        self.market_cache = {}

        # Blacklist (stablecoins and leveraged tokens).
        # NOTE(review): _stage0_universe_filter matches these as *substrings*
        # of the base currency, so short entries such as 'UP' also exclude
        # assets like 'JUP' or 'SUPER'. Confirm whether that is intended.
        self.BLACKLIST_TOKENS = [
            'USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'FDUSD', 'EUR', 'PAX', 'USDP',
            'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L', '5S', '5L'
        ]

        # Candidate exchanges in priority order (highest priority first).
        self.EXCHANGE_CANDIDATES = [
            {'id': 'binance', 'name': 'Binance', 'options': {'adjustForTimeDifference': True}},
            {'id': 'bybit', 'name': 'Bybit', 'options': {}},
            {'id': 'okx', 'name': 'OKX', 'options': {}},
            {'id': 'mexc', 'name': 'MEXC', 'options': {}}
        ]
        print(f"📦 [DataManager V77.0] Initialized (Smart Switcher Ready).")

    async def initialize(self):
        """Open the HTTP client, pick the best reachable exchange, load contracts.

        Raises:
            RuntimeError: If no candidate exchange could be connected.
        """
        print(" > [DataManager] Starting initialization & Exchange Discovery...")
        self.http_client = httpx.AsyncClient(timeout=30.0)

        # Run the exchange discovery protocol.
        await self._connect_to_best_exchange()
        await self.load_contracts_from_r2()

    @staticmethod
    async def _close_quietly(exchange_instance):
        """Close a rejected exchange instance without letting errors escape."""
        if exchange_instance is None:
            return
        try:
            await exchange_instance.close()
        except Exception as close_err:
            # A failed cleanup must not abort the failover loop.
            print(f" ⚠️ [DataManager] Failed to close exchange session: {close_err}")

    async def _connect_to_best_exchange(self):
        """Try the candidates in order and adopt the first one that works.

        A candidate is accepted when ``load_markets()`` succeeds and the
        resulting markets contain ``BTC/USDT``. Rejected candidates are closed
        before moving on.

        Raises:
            RuntimeError: If every candidate failed.
        """
        for cand in self.EXCHANGE_CANDIDATES:
            ex_id = cand['id']
            ex_name = cand['name']
            # Reset per candidate so a failure before construction never
            # re-closes the instance left over from a previous iteration.
            exchange_instance = None
            print(f" 🔄 Testing connection to {ex_name}...")
            try:
                exchange_class = getattr(ccxt, ex_id)
                exchange_instance = exchange_class({
                    'enableRateLimit': True,
                    'timeout': 30000,
                    'options': {
                        'defaultType': 'spot',
                        **cand['options']
                    }
                })
                # Real connectivity test.
                await exchange_instance.load_markets()

                # Extra sanity check: BTC/USDT must exist for the data to be usable.
                if 'BTC/USDT' in exchange_instance.markets:
                    self.exchange = exchange_instance
                    self.exchange_id = ex_id
                    self.market_cache = self.exchange.markets
                    print(f" ✅ [SUCCESS] Connected to {ex_name} (Markets: {len(self.market_cache)}).")
                    return  # Found a working exchange; stop searching.

                print(f" ⚠️ [SKIP] {ex_name} connected but BTC/USDT missing (Unusual).")
            except Exception as e:
                error_msg = str(e)
                if "451" in error_msg:
                    print(f" ⛔ [BLOCKED] {ex_name} is restricted in this region (Error 451).")
                else:
                    # str(e) can be empty (e.g. a bare TimeoutError); fall back
                    # to repr so indexing the first line cannot raise.
                    first_line = (error_msg.splitlines() or [repr(e)])[0]
                    print(f" ❌ [FAIL] Could not connect to {ex_name}: {first_line}")

            # Release resources before moving to the next candidate.
            await self._close_quietly(exchange_instance)

        # Every candidate failed.
        if not self.exchange:
            raise RuntimeError("❌ ALL Exchanges Failed! Please check your internet connection or VPN.")

    async def close(self):
        """Close the HTTP client and the active exchange session, if any."""
        try:
            if self.http_client:
                await self.http_client.aclose()
        finally:
            # Still release the exchange session if the HTTP client close raised.
            if self.exchange:
                await self.exchange.close()

    async def load_contracts_from_r2(self):
        """Refresh ``contracts_db`` from the R2 service (no-op without one).

        Any failure, or an empty result, leaves ``contracts_db`` as ``{}``.
        """
        if not self.r2_service:
            return
        try:
            # `or {}` keeps the same invariant as __init__: never None.
            self.contracts_db = await self.r2_service.load_contracts_db_async() or {}
        except Exception as e:
            print(f"⚠️ [DataManager] Could not load contracts DB from R2: {e}")
            self.contracts_db = {}

    def get_contracts_db(self) -> Dict[str, Any]:
        """Return the current contracts mapping."""
        return self.contracts_db

    # ==================================================================
    # 🌍 Global Market Validator
    # ==================================================================
    async def check_global_market_health(self) -> Dict[str, Any]:
        """Report whether the market is considered safe to trade.

        Compares the last two daily BTC/USDT closes; a drop of more than 10%
        marks the market unsafe.

        Returns:
            ``{'is_safe': bool, 'reason': str}``. Without an exchange the
            result is unsafe; missing data or fetch errors deliberately
            fail open (``is_safe=True``).
        """
        if not self.exchange:
            return {'is_safe': False, 'reason': 'No Exchange'}
        try:
            btc_ohlcv = await self.exchange.fetch_ohlcv('BTC/USDT', '1d', limit=7)
            if not btc_ohlcv:
                return {'is_safe': True, 'reason': 'No BTC Data'}

            df = pd.DataFrame(btc_ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])
            prev_close = df['c'].iloc[-2]
            daily_change = (df['c'].iloc[-1] - prev_close) / prev_close

            if daily_change < -0.10:
                return {'is_safe': False, 'reason': f'🚨 BTC CRASHING ({daily_change*100:.2f}%)'}

            return {'is_safe': True, 'reason': '✅ Market Open'}
        except Exception as e:
            # Fail-open on purpose, but make the bypass visible.
            print(f"⚠️ [Market Validator] Health check failed, bypassing: {e}")
            return {'is_safe': True, 'reason': 'Error Bypass'}

    # ==================================================================
    # 🧠 Layer 1: The Open Gate (Volume Only)
    # ==================================================================
    async def layer1_rapid_screening(self, limit=200, adaptive_hub_ref=None) -> List[Dict[str, Any]]:
        """Screen the market by volume and return enriched candidates.

        Args:
            limit: Maximum number of top-volume candidates to enrich.
            adaptive_hub_ref: Optional object exposing ``get_coin_type_config``;
                when given, its 'SAFE_BOTTOM' config is attached to each item.

        Returns:
            Candidate dicts sorted by ``quote_volume`` descending, or ``[]``
            when there is no exchange, the market is unsafe, or nothing passes.
        """
        self.adaptive_hub_ref = adaptive_hub_ref

        if not self.exchange:
            print("❌ [Layer 1] No active exchange connection.")
            return []

        print(f"🔍 [Layer 1] Screening {self.exchange_id.upper()} Market (Volume Only > $1M)...")

        # 0. Market health check.
        market_health = await self.check_global_market_health()
        if not market_health['is_safe']:
            print(f"⛔ [Market Validator] Trading Halted: {market_health['reason']}")
            return []

        # 1. Strict liquidity filter.
        initial_candidates = await self._stage0_universe_filter()
        if not initial_candidates:
            print(f"⚠️ [Layer 1] No coins met the criteria on {self.exchange_id}.")
            return []

        # Keep the most liquid assets.
        top_candidates = initial_candidates[:limit]
        print(f" 📥 Fetching data for top {len(top_candidates)} liquid assets...")
        enriched_data = await self._fetch_technical_data_batch(top_candidates)

        final_list = []
        for item in enriched_data:
            regime_info = self._diagnose_asset_regime(item)
            item['asset_regime'] = regime_info['regime']
            item['asset_regime_conf'] = regime_info['conf']
            item['strategy_type'] = 'NEURAL_SCAN'
            item['strategy_tag'] = 'NEURAL'
            item['l1_sort_score'] = 0.5
            if self.adaptive_hub_ref:
                dynamic_config = self.adaptive_hub_ref.get_coin_type_config('SAFE_BOTTOM')
                item['dynamic_limits'] = dynamic_config
            final_list.append(item)

        final_list.sort(key=lambda x: x.get('quote_volume', 0), reverse=True)
        print(f"✅ [Layer 1] Passed {len(final_list)} candidates directly to Neural Layer.")
        return final_list

    # ==================================================================
    # 🧱 Order Book Depth Scanner
    # ==================================================================
    async def get_order_book_snapshot(self, symbol: str, limit=20):
        """Return the order book for ``symbol``, or ``{}`` on any error."""
        try:
            return await self.exchange.fetch_order_book(symbol, limit)
        except Exception:
            # `Exception` (not a bare except) so task cancellation propagates.
            return {}

    # ==================================================================
    # 🔍 Stage 0: Universe Filter (Multi-Exchange Compatible)
    # ==================================================================
    async def _stage0_universe_filter(self) -> List[Dict[str, Any]]:
        """Build the tradable universe from the exchange tickers.

        Keeps ``*/USDT`` symbols whose base currency does not contain a
        blacklisted token and whose quote volume is at least $1M (the
        "sovereign" majors are exempt from the volume floor).

        Returns:
            Dicts with ``symbol``, ``quote_volume``, ``current_price`` and
            ``change_24h``, sorted by volume descending; ``[]`` on error.
        """
        try:
            MIN_VOLUME_THRESHOLD = 1000000.0  # $1M
            print(f" 🛡️ [Stage 0] Fetching Tickers from {self.exchange_id.upper()}...")
            tickers = await self.exchange.fetch_tickers()

            candidates = []
            SOVEREIGN_COINS = ['BTC/USDT', 'ETH/USDT', 'SOL/USDT', 'BNB/USDT', 'XRP/USDT']

            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT'):
                    continue

                # Exclude blacklisted assets (substring match on the base currency).
                base_curr = symbol.split('/')[0]
                if any(bad in base_curr for bad in self.BLACKLIST_TOKENS):
                    continue

                # Normalise the volume: not every exchange fills quoteVolume,
                # so fall back to baseVolume * last price.
                calc_quote_vol = float(ticker.get('quoteVolume') or 0.0)
                if calc_quote_vol == 0.0:
                    base_vol = float(ticker.get('baseVolume') or 0.0)
                    last_price = float(ticker.get('last') or 0.0)
                    calc_quote_vol = base_vol * last_price

                is_sovereign = symbol in SOVEREIGN_COINS
                if not is_sovereign and calc_quote_vol < MIN_VOLUME_THRESHOLD:
                    continue

                change_pct = ticker.get('percentage')
                if change_pct is None:
                    change_pct = 0.0

                candidates.append({
                    'symbol': symbol,
                    'quote_volume': calc_quote_vol,
                    'current_price': float(ticker.get('last') or 0.0),
                    'change_24h': change_pct
                })

            candidates.sort(key=lambda x: x['quote_volume'], reverse=True)
            print(f" ℹ️ [Stage 0] Found {len(candidates)} valid candidates.")
            return candidates
        except Exception as e:
            print(f"❌ [L1 Error] Universe filter failed: {e}")
            traceback.print_exc()
            return []

    # ------------------------------------------------------------------
    # Helpers & Data Fetching
    # ------------------------------------------------------------------
    def _diagnose_asset_regime(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Classify an asset as BULL / BEAR / RANGE from its latest 1h row.

        Uses ``item['df_1h']`` when present, otherwise builds it from
        ``item['ohlcv_1h_raw']`` (and caches it back into ``item``).

        Returns:
            ``{'regime': str, 'conf': float}``; RANGE with 0.0 confidence when
            no data is available or anything goes wrong.
        """
        try:
            if 'df_1h' not in item:
                if 'ohlcv_1h_raw' in item:
                    item['df_1h'] = self._calc_indicators(item['ohlcv_1h_raw'])
                else:
                    return {'regime': 'RANGE', 'conf': 0.0}

            df = item['df_1h']
            if df.empty:
                return {'regime': 'RANGE', 'conf': 0.0}

            curr = df.iloc[-1]
            price = curr['close']
            ema20 = curr['ema20']
            ema50 = curr['ema50']
            rsi = curr['rsi']

            regime = "RANGE"
            conf = 0.5
            # Trend = price and both EMAs stacked in the same direction;
            # RSI only raises the confidence.
            if price > ema20 and ema20 > ema50:
                regime = "BULL"
                conf = 0.8 if rsi > 55 else 0.6
            elif price < ema20 and ema20 < ema50:
                regime = "BEAR"
                conf = 0.8 if rsi < 45 else 0.6
            return {'regime': regime, 'conf': conf}
        except Exception:
            return {'regime': 'RANGE', 'conf': 0.0}

    async def _fetch_technical_data_batch(self, candidates):
        """Fetch OHLCV data for ``candidates`` in concurrent chunks of 10.

        Returns:
            The candidates that were enriched successfully, in order.
        """
        chunk_size = 10
        results = []
        for i in range(0, len(candidates), chunk_size):
            chunk = candidates[i:i + chunk_size]
            tasks = [self._fetch_single(c) for c in chunk]
            res = await asyncio.gather(*tasks)
            results.extend([r for r in res if r])
            # Short pause between chunks.
            await asyncio.sleep(0.05)
        return results

    async def _fetch_single(self, c):
        """Attach 1h/15m OHLCV and 1h indicators to candidate ``c`` in place.

        Returns:
            The same dict ``c`` on success, ``None`` if data is missing or the
            fetch failed.
        """
        try:
            h1 = await self.exchange.fetch_ohlcv(c['symbol'], '1h', limit=100)
            m15 = await self.exchange.fetch_ohlcv(c['symbol'], '15m', limit=50)
            if not h1 or not m15:
                return None
            c['ohlcv'] = {'1h': h1, '15m': m15}
            c['ohlcv_1h_raw'] = h1
            c['ohlcv_15m_raw'] = m15
            c['df_1h'] = self._calc_indicators(h1)
            return c
        except Exception:
            # `Exception` (not a bare except) so task cancellation propagates.
            return None

    def _calc_indicators(self, ohlcv):
        """Build an indicator DataFrame from raw 6-column OHLCV rows.

        Adds ``rsi`` (14-period rolling mean of gains/losses), ``ema20``,
        ``ema50`` and ``atr`` (14-period rolling mean of the true range), and
        renames the price columns to open/high/low/close/volume. NaN values
        (e.g. warm-up rows) are replaced with 0.
        """
        df = pd.DataFrame(ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])

        delta = df['c'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rs = gain / loss
        df['rsi'] = 100 - (100 / (1 + rs))

        df['ema20'] = df['c'].ewm(span=20).mean()
        df['ema50'] = df['c'].ewm(span=50).mean()

        # True range: the largest of high-low and the two gaps to the prior close.
        prev_close = df['c'].shift()
        tr = np.maximum(df['h'] - df['l'],
                        np.maximum(abs(df['h'] - prev_close), abs(df['l'] - prev_close)))
        df['atr'] = tr.rolling(14).mean()

        df.rename(columns={'o': 'open', 'h': 'high', 'l': 'low', 'c': 'close', 'v': 'volume'}, inplace=True)
        return df.fillna(0)

    async def get_latest_price_async(self, symbol):
        """Return the last traded price for ``symbol``, or ``0.0`` on any error."""
        try:
            return float((await self.exchange.fetch_ticker(symbol))['last'])
        except Exception:
            # `Exception` (not a bare except) so task cancellation propagates.
            return 0.0

    async def get_latest_ohlcv(self, symbol, timeframe='5m', limit=100):
        """Return recent OHLCV rows for ``symbol``, or ``[]`` on any error."""
        try:
            return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
        except Exception:
            # `Exception` (not a bare except) so task cancellation propagates.
            return []