import asyncio
import json
import os
import time
from datetime import datetime

import ccxt.pro as ccxt
import httpx
import pandas as pd
import pandas_ta as ta
from ccxt.base.errors import DDoSProtection, RateLimitExceeded


# Placeholder for future on-chain/sentiment data fetching
def get_on_chain_data(symbol):
    """Placeholder: log the request and return an empty dict."""
    print(f"🚧 Placeholder: Fetching on-chain data for {symbol}...")
    return {}


def get_sentiment_data(symbol):
    """Placeholder: log the request and return an empty dict."""
    print(f"🚧 Placeholder: Fetching sentiment data for {symbol}...")
    return {}


def _classify_fear_and_greed(fng_value):
    """Map a numeric Fear & Greed index value to a sentiment label."""
    if fng_value >= 70:
        return 'EXTREME_GREED'
    if fng_value >= 50:
        return 'GREED'
    if fng_value > 30:
        return 'NEUTRAL'
    if fng_value >= 15:
        return 'FEAR'
    return 'EXTREME_FEAR'


def _classify_btc_price(btc_price):
    """Map a BTC/USD price to a sentiment label; unknown price is NEUTRAL."""
    if btc_price is None:
        return 'NEUTRAL'
    if btc_price > 60000:
        return 'BULLISH'
    if btc_price < 55000:
        return 'BEARISH'
    return 'NEUTRAL'


class DataManager:
    """Manages all data fetching and processing."""

    def __init__(self, contracts_db):
        """Store the contracts DB and create the KuCoin exchange client.

        Args:
            contracts_db: Contracts database object kept on the instance.
        """
        self.contracts_db = contracts_db
        self.exchange = ccxt.kucoin()
        self.exchange.rateLimit = 1000  # Adjusted rateLimit to 1 second
        self._whale_data_cache = {}
        self.http_client = None

    async def initialize(self):
        """Initialize async HTTP client (no-op if one already exists)."""
        # Guard against repeated calls so an open client is never orphaned.
        if self.http_client is None:
            self.http_client = httpx.AsyncClient(timeout=30.0)

    async def close(self):
        """Close connections properly.

        The exchange connection is closed even if closing the HTTP client
        raises.
        """
        try:
            if self.http_client:
                await self.http_client.aclose()
                self.http_client = None
        finally:
            await self.exchange.close()

    async def get_market_context_async(self):
        """Fetches and compiles data on market health from various sources.

        Returns:
            A dict with ``timestamp``, ``bitcoin_price_usd``,
            ``ethereum_price_usd``, ``fear_and_greed_index``, ``sentiment``
            and ``btc_sentiment`` keys, or ``None`` if fetching failed.
        """
        try:
            if not self.http_client:
                await self.initialize()

            top_coins_response = await self.http_client.get(
                'https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd'
            )
            top_coins_data = top_coins_response.json()

            fng_response = await self.http_client.get('https://api.alternative.me/fng/')
            fng_data = fng_response.json()

            market_context = {
                'timestamp': datetime.now().isoformat(),
                'bitcoin_price_usd': top_coins_data.get('bitcoin', {}).get('usd'),
                'ethereum_price_usd': top_coins_data.get('ethereum', {}).get('usd'),
                'fear_and_greed_index': fng_data['data'][0]['value'] if fng_data.get('data') else None
            }

            # A missing index falls back to 50, as in the original logic.
            raw_fng = market_context['fear_and_greed_index']
            fng_value = int(raw_fng) if raw_fng else 50
            market_context['sentiment'] = _classify_fear_and_greed(fng_value)

            # The price key is always present but may hold None; comparing
            # None with a number would raise, so classify it explicitly.
            market_context['btc_sentiment'] = _classify_btc_price(
                market_context.get('bitcoin_price_usd')
            )

            print("📊 Market context fetched successfully.")
            return market_context
        except httpx.TimeoutException:
            print("⏱️ Timeout fetching market context")
            return None
        except Exception as e:
            print(f"❌ Failed to get market context: {e}")
            return None

    async def get_top_symbols(self, n):
        """Fetches the top N symbols by volume.

        Args:
            n: Maximum number of symbols to return.

        Returns:
            A list of symbol strings containing ``/USDT``, ordered by
            descending ``quoteVolume``; an empty list on any failure.
        """
        try:
            async with asyncio.timeout(60):
                markets = await self.exchange.fetch_tickers()

            usdt_markets = {
                symbol: ticker
                for symbol, ticker in markets.items()
                if '/USDT' in symbol
            }
            # Treat a missing/None quoteVolume as 0 so the sort cannot fail
            # on mixed None/float values.
            sorted_by_volume = sorted(
                usdt_markets.items(),
                key=lambda item: item[1].get('quoteVolume') or 0,
                reverse=True,
            )
            top_symbols = [symbol for symbol, _ in sorted_by_volume[:n]]
            print(f"✔️ Found {len(top_symbols)} top symbols from KuCoin.")
            return top_symbols
        except asyncio.TimeoutError:
            print("⏱️ Timeout fetching top symbols")
            return []
        except (RateLimitExceeded, DDoSProtection) as e:
            print(f"❌ Rate limit exceeded while fetching symbols: {e}")
            await asyncio.sleep(60)
            return []
        except Exception as e:
            print(f"❌ Failed to fetch top symbols: {e}")
            return []

    async def fetch_ohlcv_with_retry(self, symbol, timeframe, limit, retries=3):
        """Fetch OHLCV data with timeout and retry logic.

        Args:
            symbol: Market symbol to fetch.
            timeframe: Candle timeframe string.
            limit: Number of candles requested from the exchange.
            retries: Maximum number of attempts.

        Returns:
            The value returned by ``exchange.fetch_ohlcv``, or ``None`` when
            all attempts fail or a non-retryable error occurs.
        """
        for attempt in range(retries):
            try:
                async with asyncio.timeout(30):
                    # Honour the caller's limit instead of a hard-coded 500.
                    return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
            except asyncio.TimeoutError:
                print(f"⏱️ Timeout for {symbol} on {timeframe}. Retry {attempt+1}/{retries}")
                if attempt < retries - 1:
                    await asyncio.sleep(5)
            except (RateLimitExceeded, DDoSProtection):
                print(f"⚠️ Rate limit for {symbol} on {timeframe}. Waiting 90s...")
                # Back off even on the last attempt, as the original did.
                await asyncio.sleep(90)
            except Exception as e:
                print(f"❌ Failed to fetch {symbol} on {timeframe}: {e}")
                return None
        return None

    async def get_fast_pass_data_async(self, symbols):
        """Fetches OHLCV data with concurrent limiting and progress tracking.

        Args:
            symbols: Sequence of market symbols to fetch.

        Returns:
            A list of ``{'symbol': ..., 'ohlcv': {timeframe: candles}}``
            dicts for every symbol with at least one non-empty timeframe.
        """
        timeframes = ['1h', '4h', '1d', '1w']
        data = []
        total = len(symbols)
        completed = 0
        success = 0
        failed = 0
        semaphore = asyncio.Semaphore(4)  # Increased concurrency to 4

        def progress():
            # Only called from inside a task, so total is never 0 here.
            return f"Progress: {completed}/{total} ({int(completed/total*100)}%)"

        async def fetch_symbol_data(symbol, index):
            nonlocal completed, success, failed
            async with semaphore:
                try:
                    print(f"⏳ [{index+1}/{total}] Fetching {symbol}...")
                    ohlcv_data = {}
                    timeframes_fetched = 0
                    for timeframe in timeframes:
                        candles = await self.fetch_ohlcv_with_retry(symbol, timeframe, limit=500)
                        if candles:
                            ohlcv_data[timeframe] = candles
                            timeframes_fetched += 1
                        else:
                            ohlcv_data[timeframe] = []
                        # Add a short delay to mitigate rate limits
                        await asyncio.sleep(0.25)

                    completed += 1
                    if any(ohlcv_data.values()):
                        success += 1
                        print(f"✅ [{index+1}/{total}] {symbol} - {timeframes_fetched}/{len(timeframes)} timeframes | {progress()}")
                        return {
                            'symbol': symbol,
                            'ohlcv': ohlcv_data,
                        }

                    failed += 1
                    print(f"⚠️ [{index+1}/{total}] {symbol} - No data | {progress()}")
                    return None
                except Exception as e:
                    completed += 1
                    failed += 1
                    print(f"❌ [{index+1}/{total}] {symbol} - Error: {str(e)[:50]} | {progress()}")
                    return None

        print(f"\n{'='*70}")
        print(f"📊 Starting data fetch for {total} symbols")
        print(f"{'='*70}\n")

        tasks = [fetch_symbol_data(symbol, i) for i, symbol in enumerate(symbols)]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for result in results:
            if isinstance(result, Exception):
                print(f"❌ Exception in fetch task: {result}")
            elif result is not None:
                data.append(result)

        # An empty symbol list would otherwise divide by zero.
        success_rate = int(success / total * 100) if total else 0

        print(f"\n{'='*70}")
        print("✅ Data fetching complete!")
        print(f" Total: {total} | Success: {success} | Failed: {failed}")
        print(f" Success Rate: {success_rate}%")
        print(f"{'='*70}\n")
        return data

    async def fetch_and_update_contracts_db_async(self):
        """Fetches the contracts DB from R2 or initializes an empty one.

        Currently only logs a message and returns an empty dict.
        """
        print("💾 Using local contracts database.")
        return {}


class FeatureProcessor:
    """Processes raw data into tradable features and scores opportunities."""

    _OHLCV_COLUMNS = ['time', 'open', 'high', 'low', 'close', 'volume']
    _NUMERIC_COLUMNS = ['open', 'high', 'low', 'close', 'volume']

    def __init__(self, market_context):
        """Keep the market context dict (may be ``None``) used for scoring."""
        self.market_context = market_context

    def _calculate_indicators(self, df):
        """Calculates key technical indicators for a given DataFrame.

        Adds ``rsi``, ``macd`` and ``macd_signal`` columns to ``df``.

        Returns:
            A ``(rsi, macd, macd_signal)`` tuple of the latest values, or
            ``(None, None, None)`` when there is too little data.
        """
        # Always return three values: the caller unpacks a 3-tuple.
        if len(df) < 20:
            return None, None, None

        df['rsi'] = ta.rsi(df['close'], length=14)
        macd = ta.macd(df['close'])
        # pandas_ta yields None when the series is too short for MACD.
        if macd is None:
            return None, None, None

        # MACD_* is the MACD line and MACDs_* the signal line
        # (MACDh_* is the histogram).
        df['macd'] = macd['MACD_12_26_9']
        df['macd_signal'] = macd['MACDs_12_26_9']
        return df['rsi'].iloc[-1], df['macd'].iloc[-1], df['macd_signal'].iloc[-1]

    def _calculate_liquidity_score(self, df_1h):
        """Calculates a simple liquidity score based on volume and price.

        Adds a ``dollar_volume`` column (volume * close) to ``df_1h`` and
        returns its mean; returns 0 for an empty frame.
        """
        if df_1h.empty:
            return 0
        df_1h['dollar_volume'] = df_1h['volume'] * df_1h['close']
        return df_1h['dollar_volume'].mean()

    def _calculate_fib_levels(self, df_1d):
        """Calculates Fibonacci retracement levels based on a price swing.

        Uses the highest high and lowest low of the last 50 rows. Returns an
        empty dict when fewer than 50 rows are available.
        """
        if len(df_1d) < 50:
            return {}

        # Find the highest high and lowest low in the last 50 days
        recent_high = df_1d['high'].iloc[-50:].max()
        recent_low = df_1d['low'].iloc[-50:].min()
        diff = recent_high - recent_low

        return {
            "0.0%": recent_high,
            "23.6%": recent_high - 0.236 * diff,
            "38.2%": recent_high - 0.382 * diff,
            "50.0%": recent_high - 0.50 * diff,
            "61.8%": recent_high - 0.618 * diff,
            "78.6%": recent_high - 0.786 * diff,
            "100.0%": recent_low,
        }

    def process_and_score_symbol(self, raw_data):
        """
        Processes raw market data, calculates features, and assigns a final score.

        Args:
            raw_data: Dict with ``symbol`` and ``ohlcv`` keys, where ``ohlcv``
                maps timeframe strings to candle rows.

        Returns:
            A dict with ``symbol``, ``current_price``, ``features``,
            ``fibonacci_levels`` and ``final_score``; ``None`` when the 1h or
            1d data is missing, too short, or processing fails.
        """
        symbol = raw_data['symbol']
        ohlcv = raw_data['ohlcv']
        if not ohlcv.get('1d') or not ohlcv.get('1h'):
            return None

        try:
            numeric = self._NUMERIC_COLUMNS
            df_1h = pd.DataFrame(ohlcv['1h'], columns=self._OHLCV_COLUMNS)
            df_1d = pd.DataFrame(ohlcv['1d'], columns=self._OHLCV_COLUMNS)
            df_1h[numeric] = df_1h[numeric].astype(float)
            df_1d[numeric] = df_1d[numeric].astype(float)

            rsi, macd, macd_signal = self._calculate_indicators(df_1h)
            if rsi is None:
                return None

            liquidity_score = self._calculate_liquidity_score(df_1h)
            avg_daily_volume = df_1d['volume'].mean()
            fib_levels = self._calculate_fib_levels(df_1d)

            # --- Scoring Logic ---
            score = 0.5  # Base score
            if rsi < 50:
                score += 0.15
            if macd > 0 and macd_signal > 0 and macd > macd_signal:
                score += 0.25

            # Simple bonus for bullish momentum
            if df_1h['close'].iloc[-1] > df_1h['close'].iloc[-2]:
                score += 0.1

            # Additional scores based on conditions
            if rsi > 70 or rsi < 30:
                score += 0.2
            if liquidity_score > 100000:
                score += 0.2
            if avg_daily_volume > 500000:
                score += 0.2

            if self.market_context:
                # .get(): a context without btc_sentiment must not discard
                # the symbol via the KeyError handler below.
                btc_sentiment = self.market_context.get('btc_sentiment')
                if btc_sentiment == 'BULLISH':
                    score *= 1.1
                elif btc_sentiment == 'BEARISH':
                    score *= 0.9

            return {
                'symbol': symbol,
                'current_price': df_1h['close'].iloc[-1],
                'features': {
                    'rsi': float(rsi),
                    'macd': float(macd),
                    'macd_signal': float(macd_signal),
                    'liquidity_score': float(liquidity_score),
                    'avg_daily_volume': float(avg_daily_volume)
                },
                'fibonacci_levels': fib_levels,
                'final_score': score
            }
        except KeyError as e:
            print(f"⚠️ Missing key in data for {raw_data.get('symbol', 'unknown')}: {e}")
            return None
        except Exception as e:
            print(f"❌ Failed to process {raw_data.get('symbol', 'unknown')}: {e}")
            return None

    def filter_top_candidates(self, candidates, n=10):
        """Filters the top N candidates based on their final score.

        ``None`` entries are dropped; the rest are sorted by ``final_score``
        in descending order and truncated to ``n`` items.
        """
        valid_candidates = [c for c in candidates if c is not None]
        return sorted(valid_candidates, key=lambda x: x['final_score'], reverse=True)[:n]