Spaces:
Paused
Paused
| # whale_monitor/core.py | |
| # (V3.3 - GEM-Architect: Enterprise Edition - Smart Web3 Limits) | |
# Includes: Web3 Engine, Smart Retry for Limits, Solscan Fixes, & Full Logic.
| import os | |
| import asyncio | |
| import httpx | |
| import json | |
| import traceback | |
| import time | |
| from datetime import datetime, timedelta, timezone | |
| from collections import defaultdict, deque | |
| import logging | |
| import ssl | |
| from typing import List, Dict, Any | |
| # Web3 Integration | |
| from web3 import AsyncWeb3 | |
| # Local Imports | |
| from .config import ( | |
| DEFAULT_WHALE_THRESHOLD_USD, TRANSFER_EVENT_SIGNATURE, NATIVE_COINS, | |
| DEFAULT_EXCHANGE_ADDRESSES, COINGECKO_BASE_URL, COINGECKO_SYMBOL_MAPPING, | |
| ERC20_ABI | |
| ) | |
| from .rpc_manager import AdaptiveRpcManager | |
# Silence noisy HTTP / Web3 client loggers: only WARNING and above get through.
for _noisy_logger_name in ("httpx", "httpcore", "web3"):
    logging.getLogger(_noisy_logger_name).setLevel(logging.WARNING)
del _noisy_logger_name  # do not leave the loop variable in the module namespace
| class EnhancedWhaleMonitor: | |
def __init__(self, contracts_db=None, r2_service=None):
    """Build the in-memory lookup tables and optionally preload contracts from R2.

    Args:
        contracts_db: Optional mapping of symbol -> contract address string, or
            symbol -> ``{'address': ..., 'network': ...}`` dict, used to seed
            ``self.contracts_db``.
        r2_service: Optional storage service object. When truthy and an event
            loop is running, a background load of the stored contracts is
            scheduled.
    """
    print("๐ [WhaleMonitor V3.3] Initializing Enterprise Engine...")
    self.r2_service = r2_service
    self.data_manager = None
    # Injected later through set_rpc_manager().
    self.rpc_manager: AdaptiveRpcManager = None
    self.whale_threshold_usd = DEFAULT_WHALE_THRESHOLD_USD

    self.contracts_db = {}
    self._initialize_contracts_db(contracts_db or {})

    self.address_labels = {}
    self.address_categories = {
        'exchange': set(), 'cex': set(), 'dex': set(),
        'bridge': set(), 'whale': set(), 'unknown': set(),
    }
    self._initialize_comprehensive_exchange_addresses()

    self.token_price_cache = {}
    self.token_decimals_cache = {}

    # Keep a reference to the background task: the event loop only holds weak
    # references to tasks, so an unreferenced task may be garbage-collected
    # before it finishes.
    self._r2_load_task = None
    if self.r2_service:
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Constructed outside a running event loop: scheduling a task here
            # would raise, so skip the preload instead of failing construction.
            print("[WhaleMonitor] No running event loop; skipping R2 contract preload.")
        else:
            self._r2_load_task = loop.create_task(self._load_contracts_from_r2())
    print("โ [WhaleMonitor V3.3] System Ready.")
def set_rpc_manager(self, rpc_manager: AdaptiveRpcManager):
    """Attach the RPC manager that every network/API call of this monitor goes through.

    Args:
        rpc_manager: The manager instance to store on ``self.rpc_manager``.
    """
    self.rpc_manager = rpc_manager
    print("โ [WhaleMonitor] RPC Manager Linked.")
| # ============================================================================== | |
# Initialization & Database Management
| # ============================================================================== | |
def _initialize_contracts_db(self, initial_contracts):
    """Seed ``self.contracts_db`` from a symbol -> contract mapping.

    Dict entries are stored as-is, but only when they carry both an
    ``'address'`` and a ``'network'`` key. Plain string entries are treated as
    a bare address and the network is guessed from the address format.
    Anything else is ignored. Keys are stored lower-cased.

    Args:
        initial_contracts: Mapping of symbol -> dict or address string.
    """
    required_keys = ('address', 'network')
    for raw_symbol, entry in initial_contracts.items():
        db_key = raw_symbol.lower()
        if isinstance(entry, dict):
            if all(field in entry for field in required_keys):
                self.contracts_db[db_key] = entry
            continue
        if isinstance(entry, str):
            self.contracts_db[db_key] = {
                'address': entry,
                'network': self._detect_network_from_address(entry),
            }
def _detect_network_from_address(self, address):
    """Guess the network of a contract address from its textual shape.

    Args:
        address: Candidate address; non-string values are accepted.

    Returns:
        ``'solana'`` for a string longer than 30 characters that does not
        start with ``'0x'`` (and is not a 42-character ``0x``-style address
        after lower-casing); ``'ethereum'`` in every other case, including
        non-string input.
    """
    if not isinstance(address, str):
        return 'ethereum'

    lowered = address.lower()
    looks_evm = lowered.startswith('0x') and len(lowered) == 42
    # The Solana check is intentionally case-sensitive on the '0x' prefix.
    looks_solana = len(address) > 30 and not address.startswith('0x')

    return 'solana' if (looks_solana and not looks_evm) else 'ethereum'
def _initialize_comprehensive_exchange_addresses(self):
    """Index the default known addresses by label and by coarse category.

    Every string address in ``DEFAULT_EXCHANGE_ADDRESSES`` is lower-cased,
    labelled with its source key in ``self.address_labels``, added to the
    ``'exchange'`` set, and additionally added to exactly one of ``'dex'``,
    ``'bridge'`` or ``'cex'`` depending on the source key. Non-string
    addresses are skipped.
    """
    dex_labels = ['uniswap', 'pancakeswap', 'sushiswap']
    categories = self.address_categories

    for label, addr_list in DEFAULT_EXCHANGE_ADDRESSES.items():
        for raw_address in addr_list:
            if not isinstance(raw_address, str):
                continue
            normalized = raw_address.lower()
            self.address_labels[normalized] = label
            categories['exchange'].add(normalized)

            if label in dex_labels:
                bucket = 'dex'
            elif 'wormhole' in label or 'bridge' in label:
                bucket = 'bridge'
            else:
                # Anything that is neither a listed DEX nor a bridge counts as a CEX.
                bucket = 'cex'
            categories[bucket].add(normalized)
async def _load_contracts_from_r2(self):
    """Merge contracts stored in R2 (``contracts.json``) into ``self.contracts_db``.

    Entries already present locally win; stored entries only fill gaps. This is
    best-effort: any failure is reported on stdout and otherwise ignored, so a
    storage outage never breaks the monitor.
    """
    if not self.r2_service or not hasattr(self.r2_service, 's3_client'):
        return

    def _fetch_contracts():
        # Blocking client call and body read: executed in a worker thread so
        # the event loop is not stalled while the object downloads.
        response = self.r2_service.s3_client.get_object(Bucket="trading", Key="contracts.json")
        return json.loads(response['Body'].read())

    try:
        loop = asyncio.get_running_loop()
        contracts_data = await loop.run_in_executor(None, _fetch_contracts)
    except Exception as exc:
        print(f"[WhaleMonitor] Could not load contracts.json from R2: {exc}")
        return

    if not isinstance(contracts_data, dict):
        print("[WhaleMonitor] contracts.json from R2 is not a JSON object; ignoring it.")
        return

    for symbol, contract_info in contracts_data.items():
        # setdefault keeps any entry that is already known locally.
        self.contracts_db.setdefault(str(symbol).lower(), contract_info)
| # ============================================================================== | |
# Core Analysis Logic
| # ============================================================================== | |
async def get_symbol_whale_activity(self, symbol: str, known_price: float = 0.0) -> Dict[str, Any]:
    """Analyze recent large transfers of a token and derive a trading signal.

    Args:
        symbol: Trading symbol, either a bare token (``"ABC"``) or a pair
            (``"ABC/USDT"``); only the part before ``/`` identifies the token.
        known_price: Current USD price if the caller already has it. When it is
            missing or not positive, the price is looked up.

    Returns:
        On success, a dict with ``data_available=True`` plus ``summary``,
        ``exchange_flows`` (1h window), ``accumulation_analysis_24h``,
        ``trading_signal`` and ``llm_friendly_summary``. When analysis is not
        possible, one of the ``_create_*_response`` dicts. This method never
        raises: unexpected errors are printed and returned as an error response.
    """
    try:
        if not self.rpc_manager:
            return self._create_error_response(symbol, "RPC Manager Not Injected")
        self.rpc_manager.reset_session_stats()

        base_symbol = symbol.split("/")[0].upper()
        if base_symbol in NATIVE_COINS:
            return self._create_native_coin_response(symbol)

        contract_info = await self._find_contract_address_enhanced(symbol)
        if not contract_info or not contract_info.get('address'):
            return self._create_no_contract_response(symbol)
        contract_address = contract_info['address']
        network = contract_info['network']

        # The price lookup may yield None or 0 on failure; treat both (and a
        # None passed by the caller) as "no price" instead of comparing None.
        current_price = known_price
        if not current_price or current_price <= 0:
            current_price = await self._get_token_price(symbol)
        if not current_price or current_price <= 0:
            return self._create_error_response(symbol, "Price unavailable")

        decimals = await self._get_token_decimals(contract_address, network)
        if decimals is None:
            return self._create_error_response(symbol, f"Decimals missing on {network}")

        # [CONFIG] Scan window
        scan_hours = 4
        all_transfers = await self._get_targeted_transfer_data(
            contract_address, network, hours=scan_hours, price=current_price, decimals=decimals
        )
        if not all_transfers:
            return self._create_no_transfers_response(symbol)
        print(f"๐ [WhaleMonitor] Analyzed {symbol}: {len(all_transfers)} transfers found ({scan_hours}h).")

        analysis_windows = [
            {'name': '1h', 'minutes': 60},
            {'name': '4h', 'minutes': 240},
            {'name': '24h', 'minutes': 1440},
        ]
        multi_window_analysis = {}
        current_time_utc = datetime.now(timezone.utc)
        timestamp_cutoff_base = current_time_utc.timestamp()
        for window in analysis_windows:
            cutoff_ts = timestamp_cutoff_base - (window['minutes'] * 60)
            window_transfers = [
                t for t in all_transfers
                # A missing/None timestamp counts as epoch 0, i.e. outside every window.
                if float(t.get('timeStamp') or 0) >= cutoff_ts
            ]
            multi_window_analysis[window['name']] = self._analyze_transfer_list(
                symbol=symbol,
                transfers=window_transfers,
                daily_volume_usd=0,
            )

        if self.r2_service:
            # Fire-and-forget persistence; it must not delay the response.
            asyncio.create_task(self._save_learning_record(
                symbol, current_price, multi_window_analysis, self.rpc_manager.get_session_stats()
            ))

        short_term = multi_window_analysis.get('1h', {})
        # NOTE(review): the value published under 'accumulation_analysis_24h'
        # is the 4h window (the scan itself only requests 4h) - confirm intent.
        long_term = multi_window_analysis.get('4h', {})
        signal = self._generate_enhanced_trading_signal(short_term)
        llm_summary = self._create_enhanced_llm_summary(signal, short_term)
        return {
            'symbol': symbol,
            'data_available': True,
            'analysis_timestamp': current_time_utc.isoformat(),
            'summary': {
                'total_transfers_scanned': len(all_transfers),
                'whale_count_1h': short_term.get('whale_transfers_count', 0),
                'net_flow_1h': short_term.get('net_flow_usd', 0),
            },
            'exchange_flows': short_term,
            'accumulation_analysis_24h': long_term,
            'trading_signal': signal,
            'llm_friendly_summary': llm_summary,
        }
    except Exception as e:
        traceback.print_exc()
        return self._create_error_response(symbol, str(e))
| # ============================================================================== | |
# Data Fetching & Web3 Logic (Smart Limits)
| # ============================================================================== | |
async def _get_targeted_transfer_data(self, contract_address: str, network: str, hours: int, price: float, decimals: int) -> List[Dict]:
    """Fetch large transfers for a token, trying data sources in priority order.

    Solana tokens are fetched from Solscan only. EVM tokens try direct Web3 log
    scanning first, then Moralis, then the block-explorer API; the first source
    that yields a non-empty list wins.

    Args:
        contract_address: Token contract (or mint) address.
        network: Network name, e.g. ``'solana'``, ``'bsc'``, ``'ethereum'``.
        hours: Size of the look-back window in hours.
        price: Token price in USD, used to value each transfer.
        decimals: Token decimals, used to scale raw amounts.

    Returns:
        A list of transfer dicts, or ``[]`` when every source fails or is empty.
    """
    # A. Solana Logic
    if network == 'solana':
        try:
            transfers = await self._get_solscan_token_data(contract_address, hours, price)
            if transfers:
                return transfers
        except Exception as e:
            print(f" โ ๏ธ [Solana] Solscan failed: {e}")
        return []

    # B. EVM Logic
    # 1. Web3 Direct (with Smart Retry)
    try:
        print(f" โก [Web3] Scanning {network} logs...")
        web3_transfers = await self._get_web3_transfers(contract_address, network, hours, price, decimals)
        if web3_transfers:
            print(f" โ [Web3] Found {len(web3_transfers)} transfers.")
            return web3_transfers
    except Exception as e:
        print(f" โ ๏ธ [Web3] Failed: {e}. Trying fallbacks...")

    # 2. Moralis Fallback
    # DEFAULT_NETWORK_CONFIGS is not among this module's top-level imports, so
    # referencing it directly raised a (silently swallowed) NameError and this
    # fallback never ran. Resolve it here and tolerate its absence.
    try:
        from .config import DEFAULT_NETWORK_CONFIGS as network_configs
    except ImportError:
        network_configs = {}
    try:
        chain_id = network_configs.get(network, {}).get('moralis_chain_id')
        if chain_id:
            print(f" ๐ก๏ธ [Moralis] Fallback scan...")
            moralis_transfers = await self._get_moralis_token_data(contract_address, chain_id, hours, price, decimals)
            if moralis_transfers:
                return moralis_transfers
    except Exception as e:
        print(f" [Moralis] Fallback failed: {e}")

    # 3. Scanners Fallback
    try:
        print(f" ๐ [Scanner] Fallback scan...")
        scanner_transfers = await self._get_scanner_token_data(contract_address, network, hours, price, decimals)
        if scanner_transfers:
            return scanner_transfers
    except Exception as e:
        print(f" [Scanner] Fallback failed: {e}")

    return []
async def _get_web3_transfers(self, address: str, network: str, hours: int, price: float, decimals: int):
    """Scan ERC-20 Transfer logs over RPC and return the large transfers.

    The requested window is converted to a block range using an assumed
    average block time per network. If the provider rejects the range with a
    limit/range error, the scan is retried once with a 1 hour window.

    Args:
        address: Token contract address.
        network: Network name; selects the Web3 provider and the block time.
        hours: Look-back window in hours.
        price: Token price in USD.
        decimals: Token decimals used to scale the raw log value.

    Returns:
        A list of transfer dicts (``hash``, ``from``, ``to``, ``value_usd``,
        ``timeStamp``, ``network``, ``source``) worth at least 20,000 USD.
        ``timeStamp`` is an estimate derived from the log's block number.

    Raises:
        Exception: If no Web3 provider exists for ``network``, or the log query
            fails for a reason other than a range/limit error.
    """
    w3 = self.rpc_manager.get_web3(network)
    if not w3:
        raise Exception(f"No Web3 provider for {network}")

    latest = await w3.eth.block_number
    # Assumed average seconds per block for the range/time conversions below.
    block_time = 3 if network == 'bsc' else 12 if network == 'ethereum' else 2
    checksum_address = w3.to_checksum_address(address)

    async def fetch_logs(window_hours):
        # Query Transfer logs for the last `window_hours` worth of blocks.
        blocks_back = int((window_hours * 3600) / block_time)
        return await w3.eth.get_logs({
            'fromBlock': max(0, latest - blocks_back), 'toBlock': 'latest',
            'address': checksum_address,
            'topics': [TRANSFER_EVENT_SIGNATURE],
        })

    try:
        # First attempt: the full requested window.
        logs = await fetch_logs(hours)
    except Exception as e:
        # [SMART RETRY] If the provider rejected the range, shrink to 1 hour.
        err_str = str(e).lower()
        if not any(marker in err_str for marker in ('limit', 'range', 'exceeded')):
            raise  # a different error that cannot be handled here
        print(f" โ ๏ธ [Web3] Range too big. Retrying with 1 hour window...")
        logs = await fetch_logs(1)

    now = time.time()
    transfers = []
    for log in logs:
        try:
            # A Transfer log carries the signature plus indexed from/to topics.
            if len(log['topics']) < 3:
                continue
            amount = int(log['data'].hex(), 16) / (10 ** decimals)
            val_usd = amount * price
            # Minimum of 20k USD, tuned for fast trading.
            if val_usd < 20000.0:
                continue

            # Stamping every log with "now" made the 1h/4h analysis windows
            # indistinguishable. Estimate the time from the block distance
            # instead; fall back to "now" if the block number is unavailable.
            try:
                blocks_ago = max(0, latest - int(log['blockNumber']))
            except (KeyError, TypeError, ValueError):
                blocks_ago = 0

            transfers.append({
                'hash': log['transactionHash'].hex(),
                # Indexed addresses are left-padded to 32 bytes; keep the last 20.
                'from': '0x' + log['topics'][1].hex()[-40:],
                'to': '0x' + log['topics'][2].hex()[-40:],
                'value_usd': val_usd,
                'timeStamp': now - blocks_ago * block_time,
                'network': network,
                'source': 'web3',
            })
        except Exception:
            # Skip malformed logs rather than failing the whole scan.
            continue
    return transfers
async def _get_solscan_token_data(self, address: str, hours: int, price: float):
    """Fetch recent token transfers from Solscan and keep the large, recent ones.

    Args:
        address: Token mint address.
        hours: Look-back window in hours; older transfers are dropped.
        price: Token price in USD.

    Returns:
        A list of transfer dicts worth at least 20,000 USD, or ``[]`` when the
        API returns nothing.
    """
    params = {"address": address, "limit": 50}
    data = await self.rpc_manager.get_solscan_api("/v2.0/token/transfer", params)
    if not data or not data.get('data'):
        return []

    cutoff = time.time() - (hours * 3600)
    transfers = []
    for tx in data['data']:
        try:
            # NOTE(review): field names ('blockTime', 'decimals', 'signature')
            # are taken as-is from the existing code - confirm against the
            # Solscan response schema actually returned by the RPC manager.
            ts = tx.get('blockTime', 0)
            if ts < cutoff:
                continue
            amount_raw = int(tx.get('amount', 0))
            dec = tx.get('decimals', 9)
            val_usd = (amount_raw / (10 ** dec)) * price
            if val_usd < 20000.0:
                continue
            transfers.append({
                'hash': tx.get('signature'),
                'from': tx.get('from_address'),
                'to': tx.get('to_address'),
                'value_usd': val_usd,
                'timeStamp': ts,
                'network': 'solana',
                'source': 'solscan',
            })
        except Exception:
            # Skip malformed rows; do not trap KeyboardInterrupt/SystemExit.
            continue
    return transfers
async def _get_moralis_token_data(self, address, chain, hours, price, decimals):
    """Fetch recent token transfers from Moralis and keep the large, recent ones.

    Args:
        address: Token contract address.
        chain: Moralis chain identifier.
        hours: Look-back window in hours; older transfers are dropped, in line
            with the Solscan fetcher.
        price: Token price in USD.
        decimals: Token decimals used to scale the raw value.

    Returns:
        A list of transfer dicts worth at least 20,000 USD, or ``[]`` when the
        API returns nothing.
    """
    params = {
        "chain": chain, "contract_address": address,
        "order": "DESC", "limit": 100,
    }
    data = await self.rpc_manager.get_moralis_api(params)
    if not data or not data.get('result'):
        return []

    cutoff = time.time() - (hours * 3600)
    transfers = []
    for tx in data['result']:
        try:
            val = int(tx['value']) / (10 ** decimals) * price
            if val < 20000.0:
                continue

            # Parse as an aware UTC datetime. Stripping the trailing 'Z' and
            # calling .timestamp() on the naive result interpreted the value as
            # local time, shifting it by the host's UTC offset.
            raw_ts = tx['block_timestamp']
            if raw_ts.endswith('Z'):
                raw_ts = raw_ts[:-1] + '+00:00'
            parsed = datetime.fromisoformat(raw_ts)
            if parsed.tzinfo is None:
                parsed = parsed.replace(tzinfo=timezone.utc)
            ts = parsed.timestamp()
            if ts < cutoff:
                continue

            transfers.append({
                'hash': tx['transaction_hash'],
                'from': tx['from_address'],
                'to': tx['to_address'],
                'value_usd': val,
                'timeStamp': ts,
                'source': 'moralis',
            })
        except Exception:
            # Skip malformed rows; do not trap KeyboardInterrupt/SystemExit.
            continue
    return transfers
async def _get_scanner_token_data(self, address, network, hours, price, decimals):
    """Fetch recent token transfers from a block-explorer API and keep the large, recent ones.

    Args:
        address: Token contract address.
        network: Network name used to look up the explorer configuration.
        hours: Look-back window in hours; older transfers are dropped, in line
            with the Solscan fetcher.
        price: Token price in USD.
        decimals: Token decimals used to scale the raw value.

    Returns:
        A list of transfer dicts worth at least 20,000 USD, or ``[]`` when no
        explorer is configured or the API returns nothing usable.
    """
    config = self.rpc_manager.get_explorer_config(network)
    if not config:
        return []

    params = {
        "module": "account", "action": "tokentx",
        "contractaddress": address, "page": 1, "offset": 100,
        "sort": "desc", "apikey": config.get('api_key'),
    }
    data = await self.rpc_manager.get_scanner_api(config['api_url'], params)
    # A string 'result' is treated as "no data" rather than a list of transfers.
    if not data or not data.get('result') or isinstance(data['result'], str):
        return []

    cutoff = time.time() - (hours * 3600)
    transfers = []
    for tx in data['result']:
        try:
            val = int(tx['value']) / (10 ** decimals) * price
            if val < 20000.0:
                continue
            ts = int(tx['timeStamp'])
            if ts < cutoff:
                continue
            transfers.append({
                'hash': tx['hash'],
                'from': tx['from'],
                'to': tx['to'],
                'value_usd': val,
                'timeStamp': ts,
                'source': 'scanner',
            })
        except Exception:
            # Skip malformed rows; do not trap KeyboardInterrupt/SystemExit.
            continue
    return transfers
| # ============================================================================== | |
# Contract Finding & Search
| # ============================================================================== | |
async def _find_contract_address_enhanced(self, symbol: str):
    """Resolve a symbol to ``{'address': ..., 'network': ...}``.

    The local contract table is consulted first. On a miss, CoinGecko is
    searched for the base symbol (exact symbol match preferred, otherwise the
    first search hit), and the first platform found in the priority order
    BSC, Polygon, Ethereum, Solana, Avalanche is cached and returned.

    Args:
        symbol: Bare token symbol or ``BASE/QUOTE`` pair.

    Returns:
        The contract info dict, or ``None`` when nothing could be resolved or
        any lookup step failed.
    """
    base_symbol = symbol.split('/')[0].lower()
    if base_symbol in self.contracts_db:
        return self.contracts_db[base_symbol]

    print(f" ๐ Searching CoinGecko for {base_symbol}...")
    # (CoinGecko platform key, internal network name), highest priority first.
    network_priority = (
        ('binance-smart-chain', 'bsc'),
        ('polygon-pos', 'polygon'),
        ('ethereum', 'ethereum'),
        ('solana', 'solana'),
        ('avalanche', 'avalanche'),
    )
    try:
        search = await self.rpc_manager.get_coingecko_api('/search', params={'query': base_symbol})
        if not search or not search.get('coins'):
            return None
        candidates = search['coins']

        coin_id = next(
            (coin['id'] for coin in candidates if coin['symbol'].lower() == base_symbol),
            None,
        )
        if not coin_id:
            # No exact symbol match: fall back to the top search result.
            coin_id = candidates[0]['id']

        details = await self.rpc_manager.get_coingecko_api(f'/coins/{coin_id}', params={
            "localization": "false", "tickers": "false", "market_data": "false",
            "community_data": "false", "developer_data": "false"
        })
        if not details or 'platforms' not in details:
            return None

        platforms = details['platforms']
        for platform_key, network_name in network_priority:
            if platform_key in platforms and platforms[platform_key]:
                resolved = {'address': platforms[platform_key], 'network': network_name}
                self.contracts_db[base_symbol] = resolved
                return resolved
    except Exception:
        pass
    return None
| # ============================================================================== | |
# Analysis Logic
| # ============================================================================== | |
def _analyze_transfer_list(self, symbol: str, transfers: List[Dict], daily_volume_usd: float) -> Dict[str, Any]:
    """Aggregate a list of transfers into exchange inflow/outflow statistics.

    A transfer whose destination is a known exchange address counts as a
    deposit; one whose source is a known exchange address counts as a
    withdrawal. A transfer can count as both.

    Args:
        symbol: Token symbol (not used in the computation).
        transfers: Transfer dicts with ``value_usd``, ``to`` and ``from``.
        daily_volume_usd: Not used in the computation.

    Returns:
        Dict with ``to_exchanges_usd``, ``from_exchanges_usd``,
        ``deposit_count``, ``withdrawal_count``, ``whale_transfers_count``,
        ``total_volume`` and ``net_flow_usd`` (inflow minus outflow).
    """
    stats = {
        'to_exchanges_usd': 0.0, 'from_exchanges_usd': 0.0,
        'deposit_count': 0, 'withdrawal_count': 0,
        'whale_transfers_count': 0, 'total_volume': 0.0,
    }
    exchange_addresses = self.address_categories['exchange']

    for tx in transfers:
        # Upstream fetchers may emit explicit None for value or addresses
        # (keys present, value missing); normalize instead of crashing.
        val = tx.get('value_usd') or 0.0
        to_addr = str(tx.get('to') or '').lower()
        from_addr = str(tx.get('from') or '').lower()

        stats['whale_transfers_count'] += 1
        stats['total_volume'] += val

        if to_addr in exchange_addresses:
            stats['to_exchanges_usd'] += val
            stats['deposit_count'] += 1
        if from_addr in exchange_addresses:
            stats['from_exchanges_usd'] += val
            stats['withdrawal_count'] += 1

    stats['net_flow_usd'] = stats['to_exchanges_usd'] - stats['from_exchanges_usd']
    return stats
def _generate_enhanced_trading_signal(self, analysis: Dict) -> Dict:
    """Turn window statistics into a coarse trading signal.

    Args:
        analysis: Statistics dict; ``net_flow_usd`` and
            ``whale_transfers_count`` are read and default to 0.

    Returns:
        ``{'action', 'confidence', 'reason'}``: HOLD with fewer than three
        transfers, SELL on net exchange inflow above 500k USD, BUY on net
        outflow beyond 500k USD, otherwise WATCH.
    """
    net_flow = analysis.get('net_flow_usd', 0)
    whale_count = analysis.get('whale_transfers_count', 0)

    # Too few transfers to read anything into the flow direction.
    if whale_count < 3:
        action, confidence, reason = 'HOLD', 0.1, 'Low Activity'
    elif net_flow > 500_000:
        action, confidence, reason = 'SELL', 0.8, 'Exchange Inflow'
    elif net_flow < -500_000:
        action, confidence, reason = 'BUY', 0.8, 'Exchange Outflow'
    else:
        action, confidence, reason = 'WATCH', 0.5, 'Mixed Activity'

    return {'action': action, 'confidence': confidence, 'reason': reason}
def _create_enhanced_llm_summary(self, signal, analysis):
    """Build the compact summary dict handed to the LLM layer.

    Args:
        signal: Trading signal dict; must contain ``'action'``.
        analysis: Statistics dict; ``whale_transfers_count`` is read and may
            be absent (rendered as ``None``).

    Returns:
        ``{'summary': 'Whales: <count>', 'action': <signal action>}``.
    """
    whale_count = analysis.get('whale_transfers_count')
    summary_text = f"Whales: {whale_count}"
    return {'summary': summary_text, 'action': signal['action']}
| # ============================================================================== | |
# Helpers
| # ============================================================================== | |
async def _get_token_price(self, symbol):
    """Look up the current USD price of a token on CoinGecko.

    Args:
        symbol: Bare token symbol or ``BASE/QUOTE`` pair. The base part is
            mapped to a CoinGecko id through ``COINGECKO_SYMBOL_MAPPING``; when
            no mapping exists the symbol is sent unchanged.

    Returns:
        The USD price as a float, or ``0.0`` when the price could not be
        obtained. The result is never ``None``, so callers can compare it
        numerically.
    """
    try:
        coin_id = COINGECKO_SYMBOL_MAPPING.get(symbol.split('/')[0], symbol)
        res = await self.rpc_manager.get_coingecko_api(
            '/simple/price',
            params={'ids': coin_id, 'vs_currencies': 'usd'},
        )
        if res:
            # The response is keyed by coin id; only one id was requested.
            return float(next(iter(res.values()))['usd'])
    except Exception as exc:
        # `except Exception` lets task cancellation propagate.
        print(f"[WhaleMonitor] Price lookup failed for {symbol}: {exc}")
    return 0.0
async def _get_token_decimals(self, address, network):
    """Return the token's decimals, reading them on-chain for EVM networks.

    Successful on-chain reads are cached per ``address``/``network``.

    Args:
        address: Token contract (or mint) address.
        network: Network name.

    Returns:
        The cached or on-chain decimals; ``9`` for Solana; ``18`` as a
        fallback when the on-chain read is unavailable or fails.
    """
    key = f"{address}_{network}"
    if key in self.token_decimals_cache:
        return self.token_decimals_cache[key]

    # Solana is not read through Web3 here; the fixed default of 9 is used.
    if network == 'solana':
        return 9

    try:
        w3 = self.rpc_manager.get_web3(network)
        if w3:
            contract = w3.eth.contract(address=w3.to_checksum_address(address), abi=ERC20_ABI)
            dec = await contract.functions.decimals().call()
            self.token_decimals_cache[key] = dec
            return dec
    except Exception as exc:
        # `except Exception` lets task cancellation propagate. The fallback
        # below can badly skew USD values, so make it visible.
        print(f"[WhaleMonitor] decimals() failed for {address} on {network}: {exc}; assuming 18.")
    return 18
async def _save_learning_record(self, symbol, price, analysis, api_stats):
    """Persist one analysis snapshot through the R2 service (best-effort).

    Does nothing when no R2 service is configured or it has no
    ``save_record`` method. Save failures are reported on stdout and never
    raised; task cancellation is allowed to propagate.

    Args:
        symbol: Symbol that was analyzed.
        price: Price used for the analysis.
        analysis: Per-window analysis results.
        api_stats: RPC/API usage statistics for the session.
    """
    if not self.r2_service:
        return
    try:
        record = {
            'id': f"{symbol}_{int(time.time())}",
            'symbol': symbol,
            'price': price,
            'analysis': analysis,
            'api_stats': api_stats,
            'timestamp': datetime.now(timezone.utc).isoformat(),
        }
        if hasattr(self.r2_service, 'save_record'):
            await self.r2_service.save_record(record)
    except Exception as exc:
        # A bare `except` here also swallowed asyncio.CancelledError.
        print(f"[WhaleMonitor] Could not save learning record for {symbol}: {exc}")
def _create_error_response(self, symbol, err):
    """Build the response returned when analysis failed with ``err``."""
    response = {'symbol': symbol, 'data_available': False}
    response['error'] = err
    response['trading_signal'] = {}
    return response
def _create_native_coin_response(self, symbol):
    """Build the response returned for native coins, which are not analyzed."""
    response = {'symbol': symbol, 'data_available': False}
    response['error'] = 'Native Coin'
    response['trading_signal'] = {}
    return response
def _create_no_contract_response(self, symbol):
    """Build the response returned when no contract address could be resolved."""
    response = {'symbol': symbol, 'data_available': False}
    response['error'] = 'No Contract'
    response['trading_signal'] = {}
    return response
def _create_no_transfers_response(self, symbol):
    """Build the response returned when the scan found no qualifying transfers.

    Unlike the error responses, data is reported as available and the signal
    is a plain HOLD.
    """
    empty_summary = {'total': 0}
    hold_signal = {'action': 'HOLD'}
    return {
        'symbol': symbol,
        'data_available': True,
        'summary': empty_summary,
        'trading_signal': hold_signal,
    }
async def cleanup(self):
    """Shutdown hook; currently only announces that cleanup ran."""
    print("๐ [WhaleMonitor] Cleanup.")