| | import os |
| | import asyncio |
| | import httpx |
| | import json |
| | from datetime import datetime |
| | import ccxt.pro as ccxt |
| | from ccxt.base.errors import RateLimitExceeded, DDoSProtection |
| | import pandas as pd |
| | import pandas_ta as ta |
| | import time |
| |
|
| | |
def get_on_chain_data(symbol):
    """Placeholder for on-chain metrics: log the request and return no data.

    Args:
        symbol: Market symbol; only interpolated into the log line.

    Returns:
        A new, empty dict on every call.
    """
    placeholder_notice = f"🚧 Placeholder: Fetching on-chain data for {symbol}..."
    print(placeholder_notice)
    on_chain_metrics = {}
    return on_chain_metrics
| |
|
def get_sentiment_data(symbol):
    """Placeholder for sentiment metrics: log the request and return no data.

    Args:
        symbol: Market symbol; only interpolated into the log line.

    Returns:
        A new, empty dict on every call.
    """
    placeholder_notice = f"🚧 Placeholder: Fetching sentiment data for {symbol}..."
    print(placeholder_notice)
    sentiment_metrics = {}
    return sentiment_metrics
| |
|
class DataManager:
    """Manages all data fetching and processing.

    Holds one ccxt KuCoin exchange handle for ticker/OHLCV requests and a
    lazily created ``httpx.AsyncClient`` for the market-context HTTP calls.
    Call ``close()`` when done so both connections are released.
    """

    def __init__(self, contracts_db):
        """Store the contracts DB and create the exchange handle.

        Args:
            contracts_db: Opaque contracts database object; only stored here.
        """
        self.contracts_db = contracts_db
        self.exchange = ccxt.kucoin()
        self.exchange.rateLimit = 1000
        self._whale_data_cache = {}
        # Created on demand by ``initialize`` (needs a running event loop).
        self.http_client = None

    async def initialize(self):
        """Initialize the async HTTP client if it does not exist yet."""
        # Re-creating the client would orphan the previous connection pool.
        if self.http_client is None:
            self.http_client = httpx.AsyncClient(timeout=30.0)

    async def close(self):
        """Close the HTTP client (if any) and the exchange connection.

        The exchange is closed even when closing the HTTP client raises.
        """
        client, self.http_client = self.http_client, None
        try:
            if client is not None:
                await client.aclose()
        finally:
            await self.exchange.close()

    async def get_market_context_async(self):
        """Fetch BTC/ETH prices and the Fear & Greed index and classify them.

        Returns:
            A dict with ``timestamp``, ``bitcoin_price_usd``,
            ``ethereum_price_usd``, ``fear_and_greed_index``, ``sentiment``
            and ``btc_sentiment``; or ``None`` when a request times out or
            any other error occurs.
        """
        try:
            if not self.http_client:
                await self.initialize()
            top_coins_response = await self.http_client.get(
                'https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd'
            )
            top_coins_data = top_coins_response.json()
            fng_response = await self.http_client.get('https://api.alternative.me/fng/')
            fng_data = fng_response.json()
            market_context = {
                'timestamp': datetime.now().isoformat(),
                'bitcoin_price_usd': top_coins_data.get('bitcoin', {}).get('usd'),
                'ethereum_price_usd': top_coins_data.get('ethereum', {}).get('usd'),
                'fear_and_greed_index': fng_data['data'][0]['value'] if fng_data.get('data') else None
            }

            # A missing index is treated as the midpoint value 50.
            raw_fng = market_context['fear_and_greed_index']
            fng_value = int(raw_fng) if raw_fng else 50
            if fng_value >= 70:
                market_context['sentiment'] = 'EXTREME_GREED'
            elif fng_value >= 50:
                market_context['sentiment'] = 'GREED'
            elif fng_value > 30:
                market_context['sentiment'] = 'NEUTRAL'
            elif fng_value >= 15:
                market_context['sentiment'] = 'FEAR'
            else:
                market_context['sentiment'] = 'EXTREME_FEAR'

            # The key is always present, so a ``.get`` default never applies;
            # the value itself is None when the price API returned no quote.
            # An unknown price must not be compared with a number (TypeError)
            # nor be read as a market signal, so it maps to NEUTRAL.
            btc_price = market_context['bitcoin_price_usd']
            if btc_price is None:
                market_context['btc_sentiment'] = 'NEUTRAL'
            elif btc_price > 60000:
                market_context['btc_sentiment'] = 'BULLISH'
            elif btc_price < 55000:
                market_context['btc_sentiment'] = 'BEARISH'
            else:
                market_context['btc_sentiment'] = 'NEUTRAL'
            print("📊 Market context fetched successfully.")
            return market_context
        except httpx.TimeoutException:
            print("⏱️ Timeout fetching market context")
            return None
        except Exception as e:
            print(f"❌ Failed to get market context: {e}")
            return None

    async def get_top_symbols(self, n):
        """Fetch the top ``n`` USDT symbols ordered by quote volume.

        Args:
            n: Maximum number of symbols to return.

        Returns:
            A list of symbol strings, highest quote volume first; an empty
            list on timeout, rate limiting, or any other failure.
        """
        try:
            # wait_for is used instead of asyncio.timeout so the method also
            # works on interpreters older than Python 3.11.
            markets = await asyncio.wait_for(self.exchange.fetch_tickers(), timeout=60)
            usdt_markets = {symbol: data for symbol, data in markets.items() if '/USDT' in symbol}
            # Tickers may carry quoteVolume=None; rank those as zero volume
            # instead of letting the sort raise TypeError.
            sorted_by_volume = sorted(
                usdt_markets.items(),
                key=lambda item: item[1].get('quoteVolume') or 0,
                reverse=True,
            )
            top_symbols = [symbol for symbol, _ in sorted_by_volume[:n]]
            print(f"✔️ Found {len(top_symbols)} top symbols from KuCoin.")
            return top_symbols
        except asyncio.TimeoutError:
            print("⏱️ Timeout fetching top symbols")
            return []
        except (RateLimitExceeded, DDoSProtection) as e:
            print(f"❌ Rate limit exceeded while fetching symbols: {e}")
            # Back off before the caller issues further requests.
            await asyncio.sleep(60)
            return []
        except Exception as e:
            print(f"❌ Failed to fetch top symbols: {e}")
            return []

    async def fetch_ohlcv_with_retry(self, symbol, timeframe, limit, retries=3):
        """Fetch OHLCV candles with a per-attempt timeout and retries.

        Args:
            symbol: Market symbol passed to the exchange.
            timeframe: Candle timeframe string (e.g. ``'1h'``).
            limit: Maximum number of candles to request.
            retries: Total number of attempts.

        Returns:
            The candle list returned by the exchange, or ``None`` when every
            attempt timed out / was rate limited or an unexpected error
            occurred (unexpected errors are not retried).
        """
        for attempt in range(retries):
            attempts_remain = attempt < retries - 1
            try:
                return await asyncio.wait_for(
                    self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit),
                    timeout=30,
                )
            except asyncio.TimeoutError:
                print(f"⏱️ Timeout for {symbol} on {timeframe}. Retry {attempt+1}/{retries}")
                if attempts_remain:
                    await asyncio.sleep(5)
            except (RateLimitExceeded, DDoSProtection):
                # Only wait when another attempt will actually follow.
                if attempts_remain:
                    print(f"⚠️ Rate limit for {symbol} on {timeframe}. Waiting 90s...")
                    await asyncio.sleep(90)
                else:
                    print(f"⚠️ Rate limit for {symbol} on {timeframe}. Giving up.")
            except Exception as e:
                print(f"❌ Failed to fetch {symbol} on {timeframe}: {e}")
                return None
        return None

    async def get_fast_pass_data_async(self, symbols):
        """Fetch OHLCV data for many symbols with bounded concurrency.

        At most four symbols are fetched at a time; for each symbol the
        1h/4h/1d/1w timeframes are requested sequentially.

        Args:
            symbols: Sequence of market symbols.

        Returns:
            A list of ``{'symbol': ..., 'ohlcv': {timeframe: candles}}`` dicts
            for every symbol with at least one non-empty timeframe. Timeframes
            that failed map to an empty list.
        """
        timeframes = ['1h', '4h', '1d', '1w']
        total = len(symbols)
        if total == 0:
            # Nothing to do; also avoids dividing by zero in the percentages.
            print("⚠️ No symbols to fetch.")
            return []

        data = []
        completed = 0
        success = 0
        failed = 0
        semaphore = asyncio.Semaphore(4)

        def progress():
            """Render the shared 'done/total (pct%)' progress suffix."""
            return f"{completed}/{total} ({int(completed/total*100)}%)"

        async def fetch_symbol_data(symbol, index):
            nonlocal completed, success, failed
            async with semaphore:
                try:
                    print(f"⏳ [{index+1}/{total}] Fetching {symbol}...")
                    ohlcv_data = {}
                    timeframes_fetched = 0
                    for timeframe in timeframes:
                        candles = await self.fetch_ohlcv_with_retry(symbol, timeframe, limit=500)
                        if candles:
                            ohlcv_data[timeframe] = candles
                            timeframes_fetched += 1
                        else:
                            ohlcv_data[timeframe] = []
                        # Small pause between requests to stay under rate limits.
                        await asyncio.sleep(0.25)
                    completed += 1
                    if any(ohlcv_data.values()):
                        success += 1
                        print(f"✅ [{index+1}/{total}] {symbol} - {timeframes_fetched}/{len(timeframes)} timeframes | Progress: {progress()}")
                        return {
                            'symbol': symbol,
                            'ohlcv': ohlcv_data,
                        }
                    failed += 1
                    print(f"⚠️ [{index+1}/{total}] {symbol} - No data | Progress: {progress()}")
                    return None
                except Exception as e:
                    completed += 1
                    failed += 1
                    print(f"❌ [{index+1}/{total}] {symbol} - Error: {str(e)[:50]} | Progress: {progress()}")
                    return None

        print(f"\n{'='*70}")
        print(f"📊 Starting data fetch for {total} symbols")
        print(f"{'='*70}\n")
        tasks = [fetch_symbol_data(symbol, i) for i, symbol in enumerate(symbols)]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            # BaseException also covers CancelledError, which gather can hand
            # back as a result and must never be stored as data.
            if isinstance(result, BaseException):
                print(f"❌ Exception in fetch task: {result}")
            elif result is not None:
                data.append(result)
        print(f"\n{'='*70}")
        print(f"✅ Data fetching complete!")
        print(f" Total: {total} | Success: {success} | Failed: {failed}")
        print(f" Success Rate: {int(success/total*100)}%")
        print(f"{'='*70}\n")
        return data

    async def fetch_and_update_contracts_db_async(self):
        """Return the contracts database.

        Currently a stub: it only logs that the local database is used and
        returns an empty dict.
        """
        print("💾 Using local contracts database.")
        return {}
| |
|
class FeatureProcessor:
    """Processes raw data into tradable features and scores opportunities."""

    def __init__(self, market_context):
        """Store the market context used to scale scores.

        Args:
            market_context: Dict that may carry a ``'btc_sentiment'`` entry
                (``'BULLISH'`` / ``'BEARISH'`` / other), or a falsy value to
                skip the market adjustment.
        """
        self.market_context = market_context

    def _calculate_indicators(self, df):
        """Compute the latest RSI(14), MACD line and MACD signal values.

        Adds ``rsi``, ``macd`` and ``macd_signal`` columns to ``df``.

        Args:
            df: DataFrame with a numeric ``close`` column.

        Returns:
            ``(rsi, macd, macd_signal)`` for the last row, or
            ``(None, None, None)`` when there are fewer than 20 rows or the
            indicators could not be computed (library returned nothing, or
            the latest value is NaN).
        """
        # Always a 3-tuple: the caller unpacks three values.
        unavailable = (None, None, None)
        if len(df) < 20:
            return unavailable

        rsi_series = ta.rsi(df['close'], length=14)
        macd_frame = ta.macd(df['close'])
        # pandas_ta returns None when the series is too short for the
        # indicator's warm-up period.
        if rsi_series is None or macd_frame is None:
            return unavailable

        df['rsi'] = rsi_series
        # pandas_ta column naming: MACD_* is the MACD line, MACDs_* the
        # signal line, MACDh_* the histogram (line minus signal).
        df['macd'] = macd_frame['MACD_12_26_9']
        df['macd_signal'] = macd_frame['MACDs_12_26_9']

        latest = (df['rsi'].iloc[-1], df['macd'].iloc[-1], df['macd_signal'].iloc[-1])
        if any(pd.isna(value) for value in latest):
            return unavailable
        return latest

    def _calculate_liquidity_score(self, df_1h):
        """Return the mean dollar volume (``volume * close``) of the frame.

        Adds a ``dollar_volume`` column to ``df_1h``.

        Args:
            df_1h: DataFrame with numeric ``volume`` and ``close`` columns.

        Returns:
            The average dollar volume, or ``0`` for an empty frame.
        """
        if df_1h.empty:
            return 0
        df_1h['dollar_volume'] = df_1h['volume'] * df_1h['close']
        return df_1h['dollar_volume'].mean()

    def _calculate_fib_levels(self, df_1d):
        """Compute Fibonacci retracement levels over the last 50 rows.

        Args:
            df_1d: DataFrame with numeric ``high`` and ``low`` columns.

        Returns:
            A dict mapping level labels (``"0.0%"`` … ``"100.0%"``) to prices,
            measured down from the 50-row high; ``{}`` when fewer than 50
            rows are available.
        """
        if len(df_1d) < 50:
            return {}

        recent_high = df_1d['high'].iloc[-50:].max()
        recent_low = df_1d['low'].iloc[-50:].min()
        diff = recent_high - recent_low

        return {
            "0.0%": recent_high,
            "23.6%": recent_high - 0.236 * diff,
            "38.2%": recent_high - 0.382 * diff,
            "50.0%": recent_high - 0.50 * diff,
            "61.8%": recent_high - 0.618 * diff,
            "78.6%": recent_high - 0.786 * diff,
            "100.0%": recent_low,
        }

    def process_and_score_symbol(self, raw_data):
        """Process raw market data, calculate features and assign a score.

        Args:
            raw_data: Dict with ``'symbol'`` and ``'ohlcv'``; ``'ohlcv'`` maps
                timeframe strings to candle rows of
                ``[time, open, high, low, close, volume]``.

        Returns:
            A dict with ``symbol``, ``current_price``, ``features``,
            ``fibonacci_levels`` and ``final_score``; or ``None`` when 1h/1d
            candles are missing, indicators are unavailable, or processing
            fails.
        """
        symbol = raw_data['symbol']
        ohlcv = raw_data['ohlcv']

        if not ohlcv.get('1d') or not ohlcv.get('1h'):
            return None

        try:
            columns = ['time', 'open', 'high', 'low', 'close', 'volume']
            numeric = ['open', 'high', 'low', 'close', 'volume']
            df_1h = pd.DataFrame(ohlcv['1h'], columns=columns)
            df_1d = pd.DataFrame(ohlcv['1d'], columns=columns)
            df_1h[numeric] = df_1h[numeric].astype(float)
            df_1d[numeric] = df_1d[numeric].astype(float)

            rsi, macd, macd_signal = self._calculate_indicators(df_1h)
            if rsi is None:
                return None

            liquidity_score = self._calculate_liquidity_score(df_1h)
            avg_daily_volume = df_1d['volume'].mean()
            fib_levels = self._calculate_fib_levels(df_1d)

            score = 0.5
            if rsi < 50:
                score += 0.15
            if macd > 0 and macd_signal > 0 and macd > macd_signal:
                score += 0.25

            # Last hourly close above the previous one.
            if df_1h['close'].iloc[-1] > df_1h['close'].iloc[-2]:
                score += 0.1

            if rsi > 70 or rsi < 30:
                score += 0.2
            if liquidity_score > 100000:
                score += 0.2
            if avg_daily_volume > 500000:
                score += 0.2

            if self.market_context:
                # .get: a context without the key means "no adjustment",
                # not a reason to discard the candidate.
                btc_sentiment = self.market_context.get('btc_sentiment')
                if btc_sentiment == 'BULLISH':
                    score *= 1.1
                elif btc_sentiment == 'BEARISH':
                    score *= 0.9

            return {
                'symbol': symbol,
                'current_price': df_1h['close'].iloc[-1],
                'features': {
                    'rsi': float(rsi),
                    'macd': float(macd),
                    'macd_signal': float(macd_signal),
                    'liquidity_score': float(liquidity_score),
                    'avg_daily_volume': float(avg_daily_volume)
                },
                'fibonacci_levels': fib_levels,
                'final_score': score
            }
        except KeyError as e:
            print(f"⚠️ Missing key in data for {raw_data.get('symbol', 'unknown')}: {e}")
            return None
        except Exception as e:
            print(f"❌ Failed to process {raw_data.get('symbol', 'unknown')}: {e}")
            return None

    def filter_top_candidates(self, candidates, n=10):
        """Return the ``n`` highest-scoring candidates, best first.

        Args:
            candidates: Iterable of candidate dicts (with ``'final_score'``)
                and/or ``None`` entries, which are dropped.
            n: Maximum number of candidates to keep.
        """
        valid_candidates = [c for c in candidates if c is not None]
        return sorted(valid_candidates, key=lambda x: x['final_score'], reverse=True)[:n]