| | |
| | |
| | |
| | |
| |
|
| | import asyncio |
| | import httpx |
| | import traceback |
| | import ccxt.async_support as ccxt |
| | import logging |
| | import pandas as pd |
| | import numpy as np |
| | from typing import List, Dict, Any, Optional |
| |
|
| | |
# Optional dependency: when ml_engine.processor cannot be imported the name
# is still bound (to None) so later references do not raise NameError.
try:
    from ml_engine.processor import SystemLimits
except ImportError:
    SystemLimits = None

# Raise the threshold of the chatty third-party loggers to WARNING.
for _noisy_logger in ("httpx", "ccxt"):
    logging.getLogger(_noisy_logger).setLevel(logging.WARNING)
| |
|
class DataManager:
    """Market-data access and first-layer candidate screening.

    Wraps an async ccxt KuCoin spot client and exposes:
      * lifecycle helpers (`initialize`, `close`),
      * a global market-health gate (`check_global_market_health`),
      * the Layer-1 screening pipeline (`layer1_rapid_screening`),
      * thin best-effort accessors for price, OHLCV and order-book data.
    """

    def __init__(self, contracts_db, whale_monitor, r2_service=None):
        """Store collaborators and create the exchange client.

        Args:
            contracts_db: Initial contracts mapping; a falsy value becomes {}.
            whale_monitor: Stored as-is; not used by the methods in this class.
            r2_service: Optional service providing `load_contracts_db_async()`.
        """
        self.contracts_db = contracts_db or {}
        self.whale_monitor = whale_monitor
        self.r2_service = r2_service
        self.adaptive_hub_ref = None

        self.exchange = ccxt.kucoin({
            'enableRateLimit': True,
            'timeout': 60000,
            'options': {'defaultType': 'spot'},
        })

        self.http_client = None
        self.market_cache = {}

        # Tokens matched against the base currency in _stage0_universe_filter.
        self.BLACKLIST_TOKENS = [
            'USDT', 'USDC', 'DAI', 'TUSD', 'BUSD', 'FDUSD', 'EUR', 'PAX',
            'UP', 'DOWN', 'BEAR', 'BULL', '3S', '3L'
        ]

        print("📦 [DataManager V67.5] Initialized (Smart Breadth Scanner).")

    async def initialize(self):
        """Create the HTTP client, load exchange markets, then load contracts."""
        print(" > [DataManager] Starting initialization...")
        self.http_client = httpx.AsyncClient(timeout=30.0)
        await self._load_markets()
        await self.load_contracts_from_r2()

    async def _load_markets(self):
        """Load exchange markets into `market_cache` (best effort)."""
        if not self.exchange:
            return
        try:
            await self.exchange.load_markets()
            self.market_cache = self.exchange.markets
        except Exception as e:
            # Best effort: keep the previous cache, but do not hide the failure.
            print(f"⚠️ [DataManager] load_markets failed: {e}")

    async def close(self):
        """Close the HTTP client and the exchange session.

        The exchange is closed even if closing the HTTP client raises, so one
        failing resource cannot leak the other.
        """
        try:
            if self.http_client:
                await self.http_client.aclose()
        finally:
            if self.exchange:
                await self.exchange.close()

    async def load_contracts_from_r2(self):
        """Refresh `contracts_db` from the R2 service, if one is configured.

        On failure, or when the service returns a falsy value, the database is
        reset to an empty dict so `get_contracts_db` always returns a mapping.
        """
        if not self.r2_service:
            return
        try:
            loaded = await self.r2_service.load_contracts_db_async()
        except Exception as e:
            print(f"⚠️ [DataManager] Contracts load failed: {e}")
            loaded = None
        self.contracts_db = loaded or {}

    def get_contracts_db(self) -> Dict[str, Any]:
        """Return the current contracts mapping."""
        return self.contracts_db

    async def check_global_market_health(self) -> Dict[str, Any]:
        """Check overall market health using a two-part logic.

        1. BTC integrity check (avoid crashes / deep drawdowns).
        2. Altcoin activity check ("Altcoin Pulse") so trading can continue
           even when BTC itself is quiet.

        Returns:
            Dict with `is_safe` (bool) and `reason` (str). Any error, or
            missing BTC data, yields `is_safe=True` (fail-open bypass).
        """
        try:
            btc_ohlcv = await self.exchange.fetch_ohlcv('BTC/USDT', '1d', limit=30)
            # Two candles are needed to compute a day-over-day change.
            if not btc_ohlcv or len(btc_ohlcv) < 2:
                return {'is_safe': True, 'reason': 'No BTC Data - Bypassed'}

            df = pd.DataFrame(btc_ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])
            closes = df['c']
            volumes = df['v']
            current_close = closes.iloc[-1]
            prev_close = closes.iloc[-2]

            # Crash guard: last close more than 4.5% below the previous close.
            daily_change = (current_close - prev_close) / prev_close
            if daily_change < -0.045:
                return {'is_safe': False, 'reason': f'🚨 BTC CRASHING ({daily_change*100:.2f}%)'}

            # Bear guard: close more than 8% under the 20-period SMA. With
            # fewer than 20 rows the SMA is NaN and the comparison is False.
            sma20 = closes.rolling(20).mean().iloc[-1]
            if current_close < sma20 * 0.92:
                return {'is_safe': False, 'reason': '📉 Deep Bear Market (Risk Off)'}

            # NOTE(review): the last row is presumably the still-open daily
            # candle, so its volume may be partial — confirm this is intended.
            avg_vol = volumes.rolling(7).mean().iloc[-1]
            curr_vol = volumes.iloc[-1]
            if curr_vol < (avg_vol * 0.4):
                print(" ⚠️ [Validator] BTC Volume Low.. Scanning Altcoin Pulse...")
                return await self._scan_altcoin_pulse()

            return {'is_safe': True, 'reason': '✅ Market Healthy'}

        except Exception as e:
            print(f"⚠️ [Market Validator] Error: {e}")
            return {'is_safe': True, 'reason': 'Error Bypass'}

    async def _scan_altcoin_pulse(self) -> Dict[str, Any]:
        """Judge market activity from liquid USDT pairs when BTC volume is low."""
        tickers = await self.exchange.fetch_tickers()

        green_coins = 0
        pump_coins = 0
        total_checked = 0

        for sym, data in tickers.items():
            if not sym.endswith('/USDT'):
                continue
            if float(data.get('quoteVolume') or 0) < 1000000:
                continue

            change = float(data.get('percentage') or 0)
            total_checked += 1
            if change > 1.0:
                green_coins += 1
            if change > 5.0:
                pump_coins += 1

        breadth_ok = total_checked > 0 and (green_coins / total_checked) > 0.4
        if pump_coins >= 3 or breadth_ok:
            return {'is_safe': True, 'reason': f'✅ Decoupled Alts Active ({pump_coins} Pumping)'}
        return {'is_safe': False, 'reason': '💤 Dead Market (BTC & Alts Flat)'}

    async def layer1_rapid_screening(self, adaptive_hub_ref=None) -> List[Dict[str, Any]]:
        """Run the Layer-1 pipeline and return the ranked candidate list.

        Steps: market-health gate -> universe filter -> OHLCV enrichment ->
        opportunity classification -> order-book depth adjustment -> ranking.

        Args:
            adaptive_hub_ref: Optional object providing
                `get_coin_type_config(coin_type)`; stored on the instance.

        Returns:
            Up to 300 candidate dicts sorted by `l1_sort_score` descending, or
            an empty list when trading is halted or nothing qualifies.
        """
        self.adaptive_hub_ref = adaptive_hub_ref
        print(f"🔍 [Layer 1] Screening Market (Smart Breadth)...")

        market_health = await self.check_global_market_health()
        if not market_health['is_safe']:
            print(f"⛔ [Market Validator] Trading Halted: {market_health['reason']}")
            return []
        print(f" 🌍 [Market Validator] Status: {market_health['reason']}")

        initial_candidates = await self._stage0_universe_filter()
        if not initial_candidates:
            print("⚠️ [Layer 1] Stage 0 returned 0 candidates.")
            return []

        top_candidates = initial_candidates[:600]
        enriched_data = await self._fetch_technical_data_batch(top_candidates)

        semi_final_list = []
        for item in enriched_data:
            classification = self._classify_opportunity_type(item)
            if classification['type'] == 'NONE':
                continue

            regime_info = self._diagnose_asset_regime(item)
            item['asset_regime'] = regime_info['regime']
            item['asset_regime_conf'] = regime_info['conf']

            item['strategy_type'] = classification['type']
            item['l1_sort_score'] = classification['score']
            item['strategy_tag'] = classification['type']

            # Momentum setups on a DEAD asset are dropped unless flagged as a
            # squeeze. _classify_opportunity_type only sets 'is_squeeze' on
            # ACCUMULATION_SQUEEZE results, so these are always skipped.
            if regime_info['regime'] == 'DEAD' and classification['type'] == 'MOMENTUM_LAUNCH':
                if not classification.get('is_squeeze', False):
                    continue

            semi_final_list.append(item)

        final_list = []
        semi_final_list.sort(key=lambda x: x['l1_sort_score'], reverse=True)
        candidates_for_depth = semi_final_list[:300]

        if candidates_for_depth:
            print(f" 🛡️ [Layer 1.5] Checking Depth for {len(candidates_for_depth)} candidates...")

        for item in candidates_for_depth:
            if item['strategy_type'] in ['ACCUMULATION_SQUEEZE', 'SAFE_BOTTOM']:
                await self._apply_depth_adjustment(item)

            if self.adaptive_hub_ref:
                coin_type = item.get('strategy_type', 'SAFE_BOTTOM')
                item['dynamic_limits'] = self.adaptive_hub_ref.get_coin_type_config(coin_type)

            final_list.append(item)

        final_list.sort(key=lambda x: x['l1_sort_score'], reverse=True)
        selection = final_list[:300]

        print(f"✅ [Layer 1] Passed {len(selection)} active candidates.")
        return selection

    async def _apply_depth_adjustment(self, item: Dict[str, Any]) -> None:
        """Nudge `l1_sort_score` by order-book pressure within 2x ATR of price."""
        try:
            atr_val = item.get('atr_value', 0.0)
            curr_price = item.get('current_price', 0.0)
            if not (atr_val > 0 and curr_price > 0):
                return

            ob_score = await self._check_ob_pressure(item['symbol'], curr_price, atr_val * 2.0)
            if ob_score > 0.6:
                item['l1_sort_score'] += 0.15
                item['note'] = f"Strong Depth Support ({ob_score:.2f})"
            elif ob_score < 0.4:
                item['l1_sort_score'] -= 0.10
        except Exception as e:
            # Depth is an optional score tweak; a failure must not drop the
            # candidate, but it should be visible.
            print(f"⚠️ [Layer 1.5] Depth check skipped for {item.get('symbol')}: {e}")

    async def _check_ob_pressure(self, symbol: str, current_price: float, price_range: float) -> float:
        """Return the bid share of order-book volume near the current price.

        Sums bid amounts at or above `current_price - price_range` and ask
        amounts at or below `current_price + price_range` from the top 50
        levels, stopping at the first level outside the window.

        Returns:
            support / (support + resistance), or 0.5 when there is no volume
            in the window or the order book cannot be fetched or parsed.
        """
        try:
            ob = await self.exchange.fetch_order_book(symbol, limit=50)
            min_price = current_price - price_range
            max_price = current_price + price_range

            support_vol = 0.0
            resistance_vol = 0.0

            # Index access instead of 2-tuple unpacking so levels carrying
            # extra fields beyond price and amount are still usable.
            for level in ob['bids']:
                if level[0] < min_price:
                    break
                support_vol += level[1]

            for level in ob['asks']:
                if level[0] > max_price:
                    break
                resistance_vol += level[1]

            total_vol = support_vol + resistance_vol
            if total_vol == 0:
                return 0.5
            return support_vol / total_vol
        except Exception:
            return 0.5

    def _classify_opportunity_type(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Classify a candidate from its latest 1h indicator row.

        Reads the cached `data['df_1h']` when present, otherwise computes it
        from `data['ohlcv_1h_raw']`. Stores the latest ATR in
        `data['atr_value']` as a side effect.

        Returns:
            Dict with `type` ('SAFE_BOTTOM', 'ACCUMULATION_SQUEEZE',
            'MOMENTUM_LAUNCH' or 'NONE') and `score`; squeeze results also
            carry `is_squeeze=True`. Unusable data yields type 'NONE'.
        """
        try:
            df_1h = data.get('df_1h')
            if df_1h is None:
                df_1h = self._calc_indicators(data['ohlcv_1h_raw'])
            curr = df_1h.iloc[-1]
            data['atr_value'] = curr['atr']

            rsi = curr['rsi']
            close = curr['close']
            ema20 = curr['ema20']
            ema50 = curr['ema50']
            atr = curr['atr']
        except Exception:
            return {'type': 'NONE', 'score': 0}

        lower_bb = curr['lower_bb'] if 'lower_bb' in curr else (ema20 - (2 * atr))
        upper_bb = curr['upper_bb'] if 'upper_bb' in curr else (ema20 + (2 * atr))
        bb_width = (upper_bb - lower_bb) / ema20 if ema20 > 0 else 1.0

        # Ignore assets whose ATR is under 0.3% of price.
        volatility_pct = (atr / close) * 100 if close > 0 else 0
        if volatility_pct < 0.3:
            return {'type': 'NONE', 'score': 0}

        # NOTE(review): the branches form one if/elif chain, so the squeeze
        # branch is only reached for RSI in [55, 65] and the momentum branch
        # for RSI in (65, 85); the lower bounds 40 and 50 never bind. Confirm
        # the overlap is intentional before changing it.
        if rsi < 55:
            if close <= lower_bb * 1.08:
                score = (60 - rsi) / 20.0
                return {'type': 'SAFE_BOTTOM', 'score': min(score, 1.0)}
        elif 40 <= rsi <= 65:
            if bb_width < 0.18:
                score = 1.0 - (bb_width * 3.0)
                return {'type': 'ACCUMULATION_SQUEEZE', 'score': max(score, 0.5), 'is_squeeze': True}
        elif 50 < rsi < 85:
            if close > ema50:
                dist_to_upper = (upper_bb - close) / close
                if dist_to_upper < 0.12:
                    return {'type': 'MOMENTUM_LAUNCH', 'score': rsi / 100.0}

        # Fallback: sufficiently volatile assets get a low-score SAFE_BOTTOM.
        if volatility_pct > 1.5:
            return {'type': 'SAFE_BOTTOM', 'score': 0.4}

        return {'type': 'NONE', 'score': 0}

    async def _stage0_universe_filter(self) -> List[Dict[str, Any]]:
        """Build the tradable universe from exchange tickers.

        Keeps `/USDT` pairs that are not blacklisted, have at least $1M of
        computed quote volume (baseVolume * last; waived for a fixed list of
        major pairs) and moved no more than 35% in absolute terms.

        Returns:
            Candidate dicts sorted by quote volume descending; an empty list
            on any error.
        """
        try:
            MIN_VOLUME_THRESHOLD = 1000000.0

            print(f" 🛡️ [Stage 0] Fetching Tickers (Min Vol: ${MIN_VOLUME_THRESHOLD:,.0f})...")
            tickers = await self.exchange.fetch_tickers()
            candidates = []

            SOVEREIGN_COINS = ['BTC/USDT', 'ETH/USDT', 'SOL/USDT', 'BNB/USDT', 'XRP/USDT']
            reject_stats = {"volume": 0, "change": 0, "blacklist": 0}

            for symbol, ticker in tickers.items():
                if not symbol.endswith('/USDT'):
                    continue

                # NOTE(review): this is a substring match, so e.g. 'UP' also
                # rejects bases such as 'JUP' or 'SUPER' — confirm intended.
                base_curr = symbol.split('/')[0]
                if any(bad in base_curr for bad in self.BLACKLIST_TOKENS):
                    reject_stats["blacklist"] += 1
                    continue

                base_vol = float(ticker.get('baseVolume') or 0.0)
                last_price = float(ticker.get('last') or 0.0)
                calc_quote_vol = base_vol * last_price

                if symbol not in SOVEREIGN_COINS and calc_quote_vol < MIN_VOLUME_THRESHOLD:
                    reject_stats["volume"] += 1
                    continue

                # Coerce like check_global_market_health does, so a missing or
                # non-float value cannot abort the whole scan inside abs().
                change_pct = float(ticker.get('percentage') or 0.0)
                if abs(change_pct) > 35.0:
                    reject_stats["change"] += 1
                    continue

                candidates.append({
                    'symbol': symbol,
                    'quote_volume': calc_quote_vol,
                    'current_price': last_price,
                    'change_24h': change_pct
                })

            candidates.sort(key=lambda x: x['quote_volume'], reverse=True)
            print(f" ℹ️ [Stage 0] Ignored {reject_stats['volume']} low-vol coins.")
            return candidates

        except Exception as e:
            print(f"❌ [L1 Error] Universe filter failed: {e}")
            traceback.print_exc()
            return []

    def _diagnose_asset_regime(self, item: Dict[str, Any]) -> Dict[str, Any]:
        """Label the asset regime from the latest 1h indicator row.

        Computes and caches `item['df_1h']` from `item['ohlcv_1h_raw']` when
        the frame is missing.

        Returns:
            Dict with `regime` ('DEAD', 'BULL', 'BEAR' or 'RANGE') and `conf`.
            Missing data or any error yields RANGE with confidence 0.0.
        """
        try:
            if 'df_1h' not in item:
                if 'ohlcv_1h_raw' not in item:
                    return {'regime': 'RANGE', 'conf': 0.0}
                item['df_1h'] = self._calc_indicators(item['ohlcv_1h_raw'])

            df = item['df_1h']
            if df.empty:
                return {'regime': 'RANGE', 'conf': 0.0}

            curr = df.iloc[-1]
            price = curr['close']
            ema20 = curr['ema20']
            ema50 = curr['ema50']
            rsi = curr['rsi']
            atr_pct = (curr['atr'] / price) * 100 if price > 0 else 0

            # ATR under 0.4% of price: treat the asset as inactive.
            if atr_pct < 0.4:
                return {'regime': 'DEAD', 'conf': 0.9}

            if price > ema20 and ema20 > ema50 and rsi > 50:
                return {'regime': 'BULL', 'conf': 0.8 if rsi > 55 else 0.6}
            if price < ema20 and ema20 < ema50 and rsi < 50:
                return {'regime': 'BEAR', 'conf': 0.8 if rsi < 45 else 0.6}

            return {'regime': 'RANGE', 'conf': 0.5}
        except Exception:
            return {'regime': 'RANGE', 'conf': 0.0}

    async def _fetch_technical_data_batch(self, candidates):
        """Enrich candidates with OHLCV data, 10 symbols at a time.

        Candidates whose fetch fails are dropped from the result.
        """
        chunk_size = 10
        results = []
        for start in range(0, len(candidates), chunk_size):
            chunk = candidates[start:start + chunk_size]
            fetched = await asyncio.gather(*(self._fetch_single(c) for c in chunk))
            results.extend(r for r in fetched if r)
            # Short pause between chunks.
            await asyncio.sleep(0.05)
        return results

    async def _fetch_single(self, c):
        """Attach 1h/15m OHLCV and 1h indicators to candidate `c`.

        Returns:
            The same dict, mutated in place, or None when either timeframe is
            empty or the fetch fails.
        """
        try:
            h1 = await self.exchange.fetch_ohlcv(c['symbol'], '1h', limit=210)
            m15 = await self.exchange.fetch_ohlcv(c['symbol'], '15m', limit=60)
            if not h1 or not m15:
                return None
            c['ohlcv'] = {'1h': h1, '15m': m15}
            c['ohlcv_1h_raw'] = h1
            c['ohlcv_15m_raw'] = m15
            c['df_1h'] = self._calc_indicators(h1)
            return c
        except Exception:
            # `Exception`, not a bare except, so task cancellation propagates.
            return None

    def _calc_indicators(self, ohlcv):
        """Build an indicator DataFrame from raw OHLCV rows.

        Args:
            ohlcv: Rows of [ts, open, high, low, close, volume].

        Returns:
            DataFrame with columns ts/open/high/low/close/volume plus rsi
            (14-period rolling mean), ema20/ema50/ema200, atr (14-period
            rolling mean of true range) and upper_bb/lower_bb (ema20 +/- 2
            rolling-20 standard deviations). NaN values are replaced by 0.
        """
        df = pd.DataFrame(ohlcv, columns=['ts', 'o', 'h', 'l', 'c', 'v'])

        delta = df['c'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
        rs = gain / loss
        df['rsi'] = 100 - (100 / (1 + rs))

        df['ema20'] = df['c'].ewm(span=20).mean()
        df['ema50'] = df['c'].ewm(span=50).mean()
        df['ema200'] = df['c'].ewm(span=200).mean()

        prev_close = df['c'].shift()
        tr = np.maximum(
            df['h'] - df['l'],
            np.maximum(abs(df['h'] - prev_close), abs(df['l'] - prev_close)),
        )
        df['atr'] = tr.rolling(14).mean()

        # Bands are centred on ema20 (not a simple moving average).
        std = df['c'].rolling(20).std()
        df['upper_bb'] = df['ema20'] + (2 * std)
        df['lower_bb'] = df['ema20'] - (2 * std)

        df.rename(columns={'o': 'open', 'h': 'high', 'l': 'low', 'c': 'close', 'v': 'volume'}, inplace=True)
        return df.fillna(0)

    async def get_latest_price_async(self, symbol):
        """Return the last traded price for `symbol`, or 0.0 on any error."""
        try:
            ticker = await self.exchange.fetch_ticker(symbol)
            return float(ticker['last'])
        except Exception:
            return 0.0

    async def get_latest_ohlcv(self, symbol, timeframe='5m', limit=100):
        """Return recent OHLCV rows for `symbol`, or [] on any error."""
        try:
            return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
        except Exception:
            return []

    async def get_order_book_snapshot(self, symbol, limit=20):
        """Return the order book for `symbol`, or {} on any error."""
        try:
            return await self.exchange.fetch_order_book(symbol, limit)
        except Exception:
            return {}