Spaces:
Paused
Paused
| # -*- coding: utf-8 -*- | |
| """ | |
| K1RL QUANT - INSTITUTIONAL REWARDS SYSTEM v5.2.1-V75 | |
| HuggingFace Spaces Edition - Maximum Performance | |
| V75 NAMESPACE ISOLATION: | |
| ✅ All channels prefixed with "V75:" — zero cross-talk with other spaces | |
| ✅ Uses DB 0/1 (features/rewards) — isolated per Space container | |
| ✅ Imports from redis_config_v75 for V75-specific configuration | |
| CRITICAL FIX (v5.2.1): | |
| ✅ FIXED: asyncio.get_event_loop() from listener thread returned WRONG loop | |
| → Reward tasks silently dropped (never scheduled) | |
| → Now stores loop reference via asyncio.get_running_loop() in start() | |
| ✅ All v5.2.0 fixes retained | |
| CRITICAL FIX (v5.2.0): | |
| ✅ REMOVED duplicate RedisAblyClient - uses redis_connection_manager.RedisAblyClient | |
| ✅ Added connection health monitoring with auto-reconnection | |
| ✅ Bounded reward task pool (prevents coroutine leak) | |
| ✅ Deriv WebSocket auto-reconnection loop | |
| ✅ Pub/sub heartbeat detection (detects silent disconnects) | |
| PREVIOUS OPTIMIZATIONS (v5.1.0): | |
| ✅ Proper async price streaming with reconnection | |
| ✅ LRU cache for price data with TTL | |
| ✅ O(1) signal tracking with hash maps | |
| ✅ Batch processing with backpressure | |
| ✅ Connection health monitoring | |
| ✅ Memory-efficient deque buffers | |
| ✅ HuggingFace Spaces compatibility | |
| ✅ Container-safe logging and paths | |
| """ | |
| import asyncio | |
| import logging | |
| import sys | |
| import time | |
| import json | |
| import traceback | |
| import ssl | |
| import websockets | |
| import os | |
| from datetime import datetime, timezone | |
| from collections import deque, OrderedDict | |
| from dataclasses import dataclass, field | |
| from typing import Optional, Dict, List, Any, Deque | |
| from functools import lru_cache | |
| import numpy as np | |
| from pathlib import Path | |
| # Async compatibility | |
| try: | |
| import nest_asyncio | |
| nest_asyncio.apply() | |
| except ImportError: | |
| pass | |
| # ============================================================================ | |
| # ✅ FIX #1: Import the ROBUST RedisAblyClient from redis_connection_manager | |
| # instead of defining a broken local version with no reconnection | |
| # ============================================================================ | |
| import redis | |
| from redis_config_v75 import ( | |
| REDIS_URL, REDIS_PASSWORD, | |
| REDIS_DB_FEATURES, REDIS_DB_REWARDS, | |
| CHANNEL_PREFIX, prefixed_channel, QUASAR_VERSION | |
| ) | |
| from redis_connection_manager import ( | |
| RedisAblyClient, | |
| RedisMessage, | |
| DedicatedRedisConnectionManager, | |
| diagnose_redis_connection, | |
| IS_HF_SPACES | |
| ) | |
| # ============================================================================ | |
| # HUGGINGFACE SPACES CONFIGURATION | |
| # ============================================================================ | |
| # ✅ FIXED: Environment variable for API key | |
# --- Deriv endpoint -----------------------------------------------------------
# Tick streaming uses the public endpoint, so an empty key is acceptable.
DERIV_API_KEY = os.getenv('DERIV_API_KEY', '')
DERIV_WS_URL = "wss://api.derivws.com/trading/v1/options/ws/public"

# Human-readable index name -> Deriv symbol code.
SYMBOL_MAP = dict([
    ("Volatility 25 Index", "R_25"),
    ("Crash 500 Index", "CRASH500"),
    ("Volatility 100 Index", "R_100"),
    ("Volatility 50 Index", "R_50"),
    ("Volatility 75 Index", "R_75"),  # V75
])
SYMBOL = "Volatility 75 Index"  # V75
DERIV_SYMBOL = SYMBOL_MAP[SYMBOL]  # resolves to "R_75"

# --- Redis channels (legacy "ABLY_" names, V75-namespaced) --------------------
ABLY_SIGNAL_CHANNEL, ABLY_REWARD_CHANNEL, ABLY_BATCH_CHANNEL = (
    prefixed_channel(suffix)
    for suffix in ("final_signals", "rewards", "reward-batches")
)

# --- Action encoding ----------------------------------------------------------
ACTION_MAP = {0: 'BUY', 1: 'SELL', 2: 'HOLD'}
ACTION_REVERSE = {label: code for code, label in ACTION_MAP.items()}

# --- Performance tuning -------------------------------------------------------
EVALUATION_DELAY = 60            # seconds between signal and reward evaluation
BATCH_SIZE = 10                  # rewards per published batch
PRICE_CACHE_TTL = 5.0            # seconds; a cached price older than this is stale
MAX_TRACKED_SIGNALS = 10_000     # upper bound on signals awaiting evaluation
RECONNECT_DELAY = 5              # seconds; base of the reconnect backoff
MAX_RECONNECT_ATTEMPTS = 10
MAX_CONCURRENT_REWARD_TASKS = 200  # bound on concurrent reward calculations

# --- Container-safe log location ----------------------------------------------
BASE_DIR = Path('/home/user/app')
LOG_DIR = BASE_DIR.joinpath('logs')
LOG_DIR.mkdir(parents=True, exist_ok=True)
# Install the stdout and file handlers at most once per logger object.
#
# logging.getLogger() returns the same Logger instance for a given name for
# the lifetime of the process, and its `handlers` list survives re-execution
# of this module body (e.g. importlib.reload). Testing that list is therefore
# a reliable "already configured" check. The earlier guard assigned the flag
# to False immediately before testing it, so it was always true and every
# re-execution attached another pair of handlers (each record emitted twice).
#
# `_REWARDS_LOGGER_CONFIGURED` is kept as a module attribute for any code that
# still reads it; it now reflects the real handler state.
logger = logging.getLogger(__name__)
_REWARDS_LOGGER_CONFIGURED = bool(logger.handlers)
if not _REWARDS_LOGGER_CONFIGURED:
    logger.setLevel(logging.INFO)
    _fmt = logging.Formatter("%(asctime)s [REWARDS] %(levelname)s: %(message)s")
    _stream_h = logging.StreamHandler(sys.stdout)
    _stream_h.setFormatter(_fmt)
    _file_h = logging.FileHandler(LOG_DIR / 'rewards.log', encoding='utf-8')
    _file_h.setFormatter(_fmt)
    logger.addHandler(_stream_h)
    logger.addHandler(_file_h)
    logger.propagate = False  # prevent root-logger handlers from duplicating
    _REWARDS_LOGGER_CONFIGURED = True
if IS_HF_SPACES:
    logger.info("🤗 HuggingFace Spaces environment detected")
| # ============================================================================ | |
| # HIGH-PERFORMANCE DATA STRUCTURES (Unchanged - Already Optimized) | |
| # ============================================================================ | |
class PriceData:
    """Price snapshot (bid / ask / last) with its local receive time.

    ``__slots__`` keeps instances small. ``timestamp`` is the local
    ``time.time()`` value supplied by the caller; ``epoch`` is the integer
    epoch supplied with the tick.
    """

    __slots__ = ('bid', 'ask', 'last', 'timestamp', 'epoch')

    def __init__(self, bid: float, ask: float, last: float, timestamp: float, epoch: int):
        self.bid = bid
        self.ask = ask
        self.last = last
        self.timestamp = timestamp
        self.epoch = epoch

    def age(self) -> float:
        """Return the seconds elapsed since ``timestamp``."""
        return time.time() - self.timestamp

    def is_stale(self, ttl: Optional[float] = None) -> bool:
        """Return True when the snapshot is older than the allowed TTL.

        Args:
            ttl: Maximum acceptable age in seconds. ``None`` (the default)
                uses the module-level ``PRICE_CACHE_TTL``.

        The previous implementation compared the bound method ``self.age``
        (not its result) against the TTL, which raises ``TypeError``.
        """
        limit = PRICE_CACHE_TTL if ttl is None else ttl
        return self.age() > limit

    def get_price(self, action: str) -> float:
        """Return the ask for a ``"BUY"`` action and the bid for anything else."""
        return self.ask if action == "BUY" else self.bid
class TrackedSignal:
    """One signal awaiting reward evaluation.

    Holds the signal key, the action taken, the entry price, the time the
    signal was tracked and the originating agent label. ``__slots__`` avoids
    a per-instance ``__dict__``.
    """

    __slots__ = ('signal_key', 'action', 'entry_price', 'timestamp', 'agent')

    def __init__(self, signal_key: str, action: str, entry_price: float, timestamp: float, agent: str = "unknown"):
        self.signal_key, self.action = signal_key, action
        self.entry_price, self.timestamp = entry_price, timestamp
        self.agent = agent
class TTLCache:
    """Size-bounded key/value cache whose entries expire after ``ttl`` seconds.

    Entries are kept in insertion/refresh order; when the cache is full the
    oldest entry is evicted to make room for a new key. Expired entries are
    removed lazily, when ``get`` is called for them.
    """

    __slots__ = ('_cache', '_ttl', '_max_size')

    def __init__(self, ttl: float = 5.0, max_size: int = 1000):
        # key -> (value, time.time() at which the value was stored)
        self._cache: OrderedDict = OrderedDict()
        self._ttl = ttl
        self._max_size = max_size

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value, or ``None`` if the key is absent or expired."""
        entry = self._cache.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if time.time() - stored_at > self._ttl:
            del self._cache[key]
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` and mark it as the newest entry.

        Refreshing an existing key never evicts anything: the cache does not
        grow, so the capacity check applies only to new keys. (Previously a
        refresh at capacity threw away an unrelated entry.)
        """
        if key in self._cache:
            self._cache[key] = (value, time.time())
            self._cache.move_to_end(key)
            return
        # New key: evict oldest entries until there is room. The emptiness
        # guard keeps a non-positive max_size from popping an empty dict.
        while self._cache and len(self._cache) >= self._max_size:
            self._cache.popitem(last=False)
        self._cache[key] = (value, time.time())

    def __len__(self) -> int:
        return len(self._cache)
| # ============================================================================ | |
| # OPTIMIZED DERIV BRIDGE - HuggingFace Spaces Edition | |
| # ============================================================================ | |
class DerivStreamingBridge:
    """
    Deriv WebSocket bridge: streams ticks for ``DERIV_SYMBOL`` into a price cache.

    Lifecycle:
    - ``connect()`` opens the socket, verifies it with a ping and starts the
      background streaming task.
    - ``_reconnect()`` tears the connection down and reconnects after an
      exponential backoff; concurrent callers are serialized.
    - ``get_current_price()`` returns the latest cached price while it is
      fresher than ``PRICE_CACHE_TTL``.
    - ``shutdown()`` cancels the streaming task and closes the socket.
    """

    def __init__(self):
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.is_connected = False
        self.is_authorized = False
        # Latest PriceData per Deriv symbol, guarded by _cache_lock
        self._price_cache: Dict[str, PriceData] = {}
        self._cache_lock = asyncio.Lock()
        # Connection management
        self._reconnect_attempts = 0
        # Created lazily inside _reconnect() so it is built on the running loop
        self._reconnect_lock: Optional[asyncio.Lock] = None
        self._last_tick_time: Dict[str, float] = {}
        self._streaming = False
        self._stream_task: Optional[asyncio.Task] = None
        # HF Spaces features
        self._hf_spaces_mode = IS_HF_SPACES
        self._max_connection_attempts = 10
        # Stats
        self.ticks_received = 0
        self.reconnections = 0

    @staticmethod
    async def _close_quietly(ws) -> None:
        """Close ``ws`` if there is one; a failure to close is only logged."""
        if ws is None:
            return
        try:
            await ws.close()
        except Exception as e:
            logger.debug(f"WebSocket close failed (ignored): {e}")

    async def _cancel_stream_task(self) -> None:
        """Cancel the streaming task, if any, and wait until it has finished."""
        task = self._stream_task
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def connect(self) -> bool:
        """Open the WebSocket, verify it with a ping and start streaming.

        Returns:
            True when the ping was answered and streaming was started,
            False on timeout, error, unexpected ping reply or when the
            attempt budget is exhausted.
        """
        # V9.0 FIX: Recreate lock on the RUNNING loop
        self._cache_lock = asyncio.Lock()
        ws = None
        ready = False
        try:
            self._reconnect_attempts += 1
            logger.info(f"🔄 Connecting to Deriv WebSocket... (attempt {self._reconnect_attempts})")
            if self._reconnect_attempts > self._max_connection_attempts:
                logger.error("❌ Max connection attempts exceeded")
                # Open a fresh attempt budget. Without this reset the counter
                # stayed above the limit forever (only a successful connect
                # cleared it), so every later reconnect was refused.
                self._reconnect_attempts = 0
                return False
            # NOTE(review): certificate and hostname verification are disabled
            # here, so the server's identity is not authenticated. Left as-is
            # to preserve behavior — confirm whether this is still required.
            ssl_context = ssl.create_default_context()
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE
            ws = await asyncio.wait_for(
                websockets.connect(
                    DERIV_WS_URL,
                    ssl=ssl_context,
                    ping_interval=25,
                    ping_timeout=10,
                    close_timeout=5,
                    max_size=2**20
                ),
                timeout=15.0 if IS_HF_SPACES else 30.0
            )
            # ping/pong handshake — no authorize call is made
            await ws.send(json.dumps({"ping": 1}))
            response = await asyncio.wait_for(ws.recv(), timeout=10.0)
            data = json.loads(response)
            if data.get('ping') == 'pong' or 'pong' in str(data):
                self.ws = ws
                self.is_connected = True
                self.is_authorized = True
                self._reconnect_attempts = 0
                logger.info("✅ Deriv public WebSocket ready (ping/pong OK — no auth required)")
                await self._start_streaming()
                ready = True
            else:
                logger.error(f"❌ Ping failed, unexpected response: {data}")
        except asyncio.TimeoutError:
            logger.warning(f"⏰ Connection timeout (attempt {self._reconnect_attempts})")
        except Exception as e:
            logger.warning(f"⚠️ Connection error: {e}")
        if not ready:
            # Do not leak a socket that was opened but failed the handshake.
            await self._close_quietly(ws)
        return ready

    async def _start_streaming(self):
        """Start the background task that reads ticks from the socket."""
        self._stream_task = asyncio.create_task(self._real_price_stream())
        self._streaming = True
        logger.info(f"📡 Streaming started")

    async def _real_price_stream(self):
        """Subscribe to ``DERIV_SYMBOL`` ticks and feed them into the cache.

        Runs until the connection closes, an error occurs, ``is_connected``
        is cleared, or the task is cancelled. The status flags are always
        cleared on exit so the health monitor can see that streaming stopped.
        """
        # Bind the socket once so this task keeps reading from the connection
        # it was started for, even if self.ws is replaced later.
        ws = self.ws
        try:
            await ws.send(json.dumps({"ticks": DERIV_SYMBOL, "subscribe": 1}))
            logger.info(f"📡 Subscribed to {DERIV_SYMBOL}")
            while self.is_connected:
                try:
                    data = await ws.recv()
                    json_data = json.loads(data)
                    if 'tick' in json_data:
                        await self._process_tick(json_data['tick'])
                        self.ticks_received += 1
                except websockets.exceptions.ConnectionClosed:
                    logger.warning("📡 WebSocket connection closed")
                    break
                except Exception as e:
                    logger.error(f"❌ Stream error: {e}")
                    break
        except Exception as e:
            logger.error(f"❌ Real price streaming error: {e}")
        finally:
            self._streaming = False
            self.is_connected = False
            logger.warning("⚠️ _real_price_stream exited — flags cleared for health monitor")

    async def _process_tick(self, tick_data):
        """Convert one tick payload into a ``PriceData`` and cache it.

        Reads ``tick_data['quote']`` and ``tick_data['epoch']``; bid and ask
        are derived from the quote with a fixed ±0.0005 offset. Malformed
        ticks are logged and skipped.
        """
        try:
            price = float(tick_data['quote'])
            epoch = int(tick_data['epoch'])
            price_data = PriceData(
                bid=price - 0.0005,
                ask=price + 0.0005,
                last=price,
                timestamp=time.time(),
                epoch=epoch
            )
            async with self._cache_lock:
                self._price_cache[DERIV_SYMBOL] = price_data
                self._last_tick_time[DERIV_SYMBOL] = time.time()
        except Exception as e:
            logger.error(f"❌ Tick processing error: {e}")

    async def _reconnect(self) -> bool:
        """Tear down the connection and reconnect with exponential backoff.

        Callers are serialized: this method is invoked both by the health
        monitor and by reward tasks, and unsynchronized concurrent calls each
        opened their own socket and streaming task. A caller that acquires
        the lock after another caller has already restored a live stream
        returns True without reconnecting again.

        Returns:
            True if the bridge is connected and streaming on return.
        """
        if self._reconnect_lock is None:
            self._reconnect_lock = asyncio.Lock()
        async with self._reconnect_lock:
            task = self._stream_task
            if self.is_connected and self._streaming and task is not None and not task.done():
                return True
            self.is_connected = False
            self._streaming = False
            # Wait for the old stream task to finish so its cleanup cannot
            # clear the flags of the connection established below.
            await self._cancel_stream_task()
            await self._close_quietly(self.ws)
            delay = min(60, RECONNECT_DELAY * (2 ** min(self._reconnect_attempts, 5)))
            logger.info(f"🔄 Reconnecting in {delay}s...")
            await asyncio.sleep(delay)
            success = await self.connect()
            if success:
                self.reconnections += 1
                logger.info(f"✅ Reconnected successfully (#{self.reconnections})")
            return success

    async def get_current_price(self, symbol: str = DERIV_SYMBOL) -> Optional[PriceData]:
        """Return the cached price for ``symbol``, or None if missing or stale.

        A price is stale when its age exceeds ``PRICE_CACHE_TTL``. The age is
        computed here via ``PriceData.age()``: the previous check read
        ``price_data.is_stale`` without calling it, which is always truthy,
        so this method returned None for every request.
        """
        async with self._cache_lock:
            price_data = self._price_cache.get(symbol)
        if price_data is not None and price_data.age() <= PRICE_CACHE_TTL:
            return price_data
        return None

    def get_stats(self) -> Dict:
        """Return connection flags and counters as a plain dict."""
        return {
            'connected': self.is_connected,
            'authorized': self.is_authorized,
            'ticks_received': self.ticks_received,
            'reconnections': self.reconnections,
            'streaming': self._streaming
        }

    async def shutdown(self):
        """Stop streaming and close the socket."""
        logger.info("🛑 Shutting down Deriv bridge...")
        self.is_connected = False
        self._streaming = False
        await self._cancel_stream_task()
        await self._close_quietly(self.ws)
        logger.info("✅ Deriv bridge shutdown complete")
| # ============================================================================ | |
| # REWARD CALCULATION COMPONENTS (Updated for HF Spaces) | |
| # ============================================================================ | |
class RewardNormalizer:
    """Turns an entry/exit price pair into a bounded, direction-aware reward."""

    def __init__(self, base_multiplier=1000):
        self.base_multiplier = base_multiplier
        self.volatility_buffer = deque(maxlen=100)

    def normalize(self, entry_price, exit_price, action):
        """Score a price move for the given action.

        Returns:
            A 4-tuple ``(normalized, regime, confidence, raw_bps)``:
            - ``normalized``: ``tanh`` of the directional move (in bps) / 100
            - ``regime``: ``"high_vol"`` (|move| > 50 bps), ``"low_vol"``
              (|move| < 5 bps) or ``"normal"``
            - ``confidence``: |directional bps| / 20, capped at 1.0
            - ``raw_bps``: signed move from entry to exit in basis points
            A non-positive entry price yields ``(0, "invalid", 0, 0)``.
        """
        if entry_price <= 0:
            return 0, "invalid", 0, 0

        raw_bps = ((exit_price - entry_price) / entry_price) * 10000
        magnitude = abs(raw_bps)

        # BUY profits from a rise, SELL from a fall; any other action (HOLD)
        # takes a small penalty proportional to the size of the move.
        if action == "BUY":
            directional_bps = raw_bps
        else:
            directional_bps = -raw_bps if action == "SELL" else -magnitude * 0.1

        # Regime is classified on the size of the raw move only.
        if magnitude > 50:
            regime = "high_vol"
        elif magnitude < 5:
            regime = "low_vol"
        else:
            regime = "normal"

        confidence = min(abs(directional_bps) / 20, 1.0)
        return np.tanh(directional_bps / 100), regime, confidence, raw_bps
class AgentTracker:
    """Per-agent streak bookkeeping: current action, run length, completed cycles."""

    def __init__(self):
        self.agents = {}
        self.reset()

    def reset(self):
        """Restore every known agent timeframe to a zeroed record."""
        timeframes = ("5s", "15s", "30s", "1m", "2m", "5m", "10m", "15m")
        self.agents = {
            tf: {"count": 0, "action": None, "cycles": 0} for tf in timeframes
        }

    def update(self, agent, action):
        """Record ``action`` for ``agent``; return True when a cycle completes.

        A cycle completes when the action changes after a run of at least
        three identical actions. Unknown agents are ignored (returns False).
        """
        record = self.agents.get(agent)
        if record is None:
            return False

        # Same action as before: the streak simply grows.
        if record["action"] == action:
            record["count"] += 1
            return False

        # Action changed: a run of 3+ counts as a finished cycle, and a new
        # streak starts with this action either way.
        cycle_completed = record["count"] >= 3
        if cycle_completed:
            record["cycles"] += 1
        record["count"] = 1
        record["action"] = action
        return cycle_completed

    def get_info(self, agent):
        """Return the record for ``agent``, or a zeroed record if unknown."""
        blank = {"count": 0, "action": None, "cycles": 0}
        return self.agents.get(agent, blank)
| # ============================================================================ | |
| # MAIN REWARDS ENGINE - HuggingFace Spaces Edition v5.2.0 | |
| # ============================================================================ | |
| class RewardsEngine: | |
| """ | |
| Main rewards calculation engine with HF Spaces compatibility. | |
| v5.2.0 FIXES: | |
| - Uses robust RedisAblyClient from redis_connection_manager.py | |
| - Bounded reward task pool (semaphore) | |
| - Connection health heartbeat | |
| - Deriv auto-reconnection loop | |
| """ | |
def __init__(self):
    """Create an engine with empty state; nothing is connected yet.

    Attributes initialised to ``None`` here (lock, semaphore, loop, shutdown
    event, Redis client and channels) are expected to be filled in later,
    outside this method.
    """
    # --- asyncio plumbing (populated later) ---------------------------------
    self._loop: Optional[asyncio.AbstractEventLoop] = None  # loop used for thread -> asyncio scheduling
    self._shutdown: Optional[asyncio.Event] = None
    self._batch_lock: Optional[asyncio.Lock] = None
    self._reward_semaphore: Optional[asyncio.Semaphore] = None  # bounds concurrent reward tasks

    # --- Redis client and channels (populated by initialize()) --------------
    self.ably_realtime: Optional[RedisAblyClient] = None
    self.signal_channel = None
    self.reward_channel_batch = None
    self.reward_channel_individual = None

    # --- scoring helpers -----------------------------------------------------
    self.normalizer = RewardNormalizer()
    self.agent_tracker = AgentTracker()

    # --- signal bookkeeping --------------------------------------------------
    self._tracked: Dict[str, TrackedSignal] = {}   # signals awaiting evaluation
    self._processed_keys = TTLCache(ttl=300)       # keys evaluated in the last 5 minutes

    # --- outgoing reward batch ----------------------------------------------
    self._batch: List[Dict] = []
    self._last_batch_time = time.time()

    # --- counters ------------------------------------------------------------
    self.signals_received = self.rewards_sent = 0
    self.correct = self.wrong = 0
    self._active_reward_tasks = 0

    # --- health tracking -----------------------------------------------------
    self._last_signal_time = 0.0
    self._last_heartbeat_time = 0.0
    self._connection_healthy = True
async def initialize(self) -> bool:
    """Create the Redis client and channels, then connect the Deriv bridge.

    Returns:
        True when the Deriv bridge connected; False if it did not or if any
        step raised.
    """
    try:
        logger.info("📡 Connecting to Redis (V75 namespace)...")
        # RedisAblyClient comes from redis_connection_manager; it is pointed
        # at the features database configured for V75.
        self.ably_realtime = RedisAblyClient(
            redis_url=REDIS_URL,
            password=REDIS_PASSWORD,
            use_streams=True,
            database=REDIS_DB_FEATURES,
        )

        # Channel names are already namespaced by the module constants.
        self.signal_channel = self.ably_realtime.channels.get(ABLY_SIGNAL_CHANNEL)
        self.reward_channel_batch = self.ably_realtime.channels.get(ABLY_BATCH_CHANNEL)
        self.reward_channel_individual = self.ably_realtime.channels.get(ABLY_REWARD_CHANNEL)
        logger.info(f"✅ Redis channels initialized (V75 — prefix='{CHANNEL_PREFIX}', DB={REDIS_DB_FEATURES})")
        logger.info(f" Signal: {ABLY_SIGNAL_CHANNEL}")
        logger.info(f" Rewards: {ABLY_REWARD_CHANNEL}")
        logger.info(f" Batches: {ABLY_BATCH_CHANNEL}")

        logger.info("🔄 Connecting to Deriv...")
        if await deriv_bridge.connect():
            logger.info("✅ All connections established")
            return True
        logger.error("❌ Deriv connection failed")
        return False
    except Exception as e:
        logger.error(f"❌ Initialization error: {e}")
        return False
| def _extract_agent(self, signal_key: str) -> str: | |
| """Extract agent timeframe from signal key. | |
| Signal keys may use either time-based suffixes (e.g. '10m_xxx') or | |
| size-based prefixes (e.g. 'xs_xxx', 'xxl_xxx'). The original code | |
| only checked for time-based tokens — 'xs_xxx' returned 'unknown' for | |
| six of the eight agents, breaking per-agent cycle tracking. | |
| ── BUG-FIX-4 ────────────────────────────────────────────────────────── | |
| Check time-based tokens first (longest match wins to avoid '1m' | |
| matching inside '10m'), then fall back to size-based prefix matching. | |
| """ | |
| # Time-based tokens — longest first to avoid substring false-positives | |
| for tf in ['15m', '10m', '5m', '2m', '1m', '30s', '15s', '5s']: | |
| if tf in signal_key: | |
| return tf | |
| # Size-based prefixes (e.g. xs_17766…, xxl_17766…) | |
| key_lower = signal_key.lower() | |
| for size in ['xxl', 'xl', 'xs', 'l_', 'm_', 's_']: | |
| if key_lower.startswith(size): | |
| return size.rstrip('_') # strip trailing underscore used as delimiter | |
| return "unknown" | |
async def _get_price(self, action: str) -> Optional[float]:
    """Return the bridge's current price for ``action``, or None.

    None is returned when the bridge has no usable price or when the lookup
    raises; the error is logged in the latter case.
    """
    try:
        snapshot = await deriv_bridge.get_current_price()
        return snapshot.get_price(action) if snapshot else None
    except Exception as e:
        logger.error(f"❌ Price fetch error: {e}")
        return None
def _on_signal(self, message: RedisMessage):
    """Handle one incoming signal: validate it, track its keys, schedule rewards.

    Accepts a ``RedisMessage`` (its ``.data`` is used) or a raw payload. The
    payload may be wrapped in an envelope dict with a ``'data'`` entry and may
    be a JSON string. Fields read: ``final_action`` (fallback ``action``),
    ``signal_keys`` and ``price``.

    Only BUY/SELL signals with a usable, non-zero price are tracked, and at
    most the first 8 keys of a signal. Any error is logged and swallowed so
    a bad message cannot take the subscriber down.
    """
    try:
        self.signals_received += 1
        self._last_signal_time = time.time()

        data = message.data if isinstance(message, RedisMessage) else message
        # Unwrap envelope format: {"event": "message", "data": {...}}
        if isinstance(data, dict) and 'data' in data:
            data = data['data']
        if isinstance(data, str):
            data = json.loads(data)
        if not isinstance(data, dict):
            logger.warning(f"⚠️ Unsupported signal payload type: {type(data).__name__}")
            return

        # str() keeps a non-string action from raising on .upper(); it then
        # fails the BUY/SELL check below and is reported as invalid.
        action = str(data.get('final_action', data.get('action', ''))).upper()
        if action not in ('BUY', 'SELL'):
            logger.warning(f"⚠️ Invalid action: {action}")
            return

        # Accept numeric strings; reject missing, zero, NaN or infinite prices.
        raw_price = data.get('price', 0.0)
        try:
            entry_price = float(raw_price)
        except (TypeError, ValueError):
            entry_price = 0.0
        if not entry_price or not np.isfinite(entry_price):
            logger.warning(f"⚠️ No entry price in signal: {raw_price}")
            return

        # Normalise signal_keys BEFORE it is measured for the log line:
        # previously len() ran on the raw value, so a scalar aborted the
        # whole signal and a string logged its character count.
        signal_keys = data.get('signal_keys', [])
        if signal_keys is None:
            signal_keys = []
        elif not isinstance(signal_keys, list):
            signal_keys = [str(signal_keys)]

        logger.info(f"🔔 [SIGNAL] {action} @ {entry_price:.5f} | Keys: {len(signal_keys)} | Loop: {'✅' if self._loop and self._loop.is_running() else '❌'}")

        for key in signal_keys[:8]:  # Limit to 8 signals
            key = str(key)
            # Skip keys that are still tracked or were evaluated recently
            if key in self._tracked or self._processed_keys.get(key):
                continue

            # Memory bound: drop the oldest tracked signal. Entries are only
            # ever added here with the current time, so the first key in
            # insertion order is also the oldest — no full scan needed.
            if len(self._tracked) >= MAX_TRACKED_SIGNALS:
                oldest_key = next(iter(self._tracked), None)
                if oldest_key is not None:
                    self._tracked.pop(oldest_key, None)

            agent = self._extract_agent(key)
            self._tracked[key] = TrackedSignal(
                signal_key=key,
                action=action,
                entry_price=entry_price,
                timestamp=time.time(),
                agent=agent
            )
            logger.debug(f"✅ [TRACKING] {key} | {action} @ {entry_price:.5f}")

            # Streak tracking is only reported for the 10m agent
            if agent == "10m":
                if self.agent_tracker.update(agent, action):
                    info = self.agent_tracker.get_info("10m")
                    logger.info(f"🔥 [10m CYCLE #{info['cycles']}] {action} x{info['count']}")

            # Hand the key over to the event loop for delayed evaluation
            self._schedule_reward(key)
    except Exception as e:
        logger.error(f"❌ Signal processing error: {e}")
        traceback.print_exc()
def _schedule_reward(self, signal_key: str):
    """Schedule the reward calculation for ``signal_key`` on the stored loop.

    Scheduling goes through ``asyncio.run_coroutine_threadsafe`` with the
    loop reference held in ``self._loop``, so it is safe to call from a
    thread other than the one running the loop.

    If the reward cannot be scheduled, the signal is removed from
    ``_tracked``: nothing would ever evaluate it, and leaving it there would
    also make later deliveries of the same key look like duplicates.
    """
    loop = self._loop
    if loop is None or not loop.is_running():
        logger.warning(f"⚠️ Event loop not available (loop={self._loop}), reward for {signal_key} dropped")
        self._tracked.pop(signal_key, None)
        return

    coro = self._bounded_calculate_reward(signal_key)
    try:
        future = asyncio.run_coroutine_threadsafe(coro, loop)
    except RuntimeError as e:
        coro.close()  # never scheduled; avoid a "never awaited" warning
        logger.warning(f"⚠️ Cannot schedule reward: {e}")
        self._tracked.pop(signal_key, None)
        return

    def _report_failure(done, key=signal_key):
        # The returned future was previously discarded, so an exception in
        # the reward coroutine disappeared without any trace.
        if done.cancelled():
            return
        error = done.exception()
        if error is not None:
            logger.error(f"❌ Reward task for {key} failed: {error!r}")

    future.add_done_callback(_report_failure)
async def _bounded_calculate_reward(self, signal_key: str):
    """Run ``_calculate_reward`` while holding a slot of the reward semaphore.

    ``_active_reward_tasks`` counts the calculations currently holding a
    slot. If the semaphore has not been created yet, the reward is skipped —
    this is now logged instead of happening silently.
    """
    semaphore = self._reward_semaphore
    if semaphore is None:
        logger.warning(f"⚠️ Reward semaphore not initialised, reward for {signal_key} skipped")
        return
    async with semaphore:
        self._active_reward_tasks += 1
        try:
            await self._calculate_reward(signal_key)
        finally:
            self._active_reward_tasks -= 1
async def _calculate_reward(self, signal_key: str) -> None:
    """Evaluate one tracked signal ``EVALUATION_DELAY`` seconds after it arrived.

    Steps: wait out the remaining evaluation delay, take the signal out of
    ``_tracked``, fetch the exit price (up to 3 attempts), score the move
    with the normalizer and append the result to the outgoing batch.
    """
    pending = self._tracked.get(signal_key)
    if pending is None:
        # Already evicted or evaluated — nothing to wait for.
        return

    # Wait relative to the signal's own timestamp, not for a fixed period
    # from now. The caller may have queued for a semaphore slot first; a
    # fixed sleep would push the exit price later than EVALUATION_DELAY.
    remaining = EVALUATION_DELAY - (time.time() - pending.timestamp)
    if remaining > 0:
        await asyncio.sleep(remaining)

    signal = self._tracked.pop(signal_key, None)
    if not signal:
        return
    # Mark as processed so a re-delivered key is not tracked again
    self._processed_keys.set(signal_key, True)

    # Retry the price fetch up to 3 times, 5s apart, asking the bridge to
    # reconnect when it reports being disconnected.
    exit_price = None
    for attempt in range(3):
        exit_price = await self._get_price(signal.action)
        if exit_price:
            break
        logger.warning(f"⚠️ Price fetch attempt {attempt+1}/3 failed for {signal_key}")
        if not deriv_bridge.is_connected:
            logger.warning(f"⚠️ Deriv disconnected, triggering reconnect...")
            await deriv_bridge._reconnect()
        await asyncio.sleep(5)
    if not exit_price:
        logger.error(f"❌ All price fetch attempts failed for {signal_key} — reward dropped permanently")
        return

    normalized, regime, confidence, raw_bps = self.normalizer.normalize(
        signal.entry_price, exit_price, signal.action
    )

    # Accuracy bookkeeping: a positive reward means the executed action was
    # right; otherwise the opposite action (BUY <-> SELL) is reported.
    if normalized > 0:
        self.correct += 1
        correct_action = ACTION_REVERSE[signal.action]
    else:
        self.wrong += 1
        correct_action = 1 - ACTION_REVERSE.get(signal.action, 0)

    price_diff = exit_price - signal.entry_price
    logger.info(
        f"[REWARD] {signal.action} | "
        f"entry={signal.entry_price:.2f} → exit={exit_price:.2f} (Δ{price_diff:+.2f}) | "
        f"reward={normalized:+.4f} | {signal_key[:25]}"
    )

    await self._add_to_batch({
        "signal_key": signal_key,
        "reward": normalized,
        "entry_price": signal.entry_price,
        "exit_price": exit_price,
        "executed_action": signal.action,
        "correct_action": correct_action,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "price_source": "deriv_streaming_v5_live",
        "platform": "huggingface-spaces" if IS_HF_SPACES else "local"
    })
async def _add_to_batch(self, reward_data: Dict) -> None:
    """Queue one reward and flush the batch once it holds ``BATCH_SIZE`` items.

    The append, the ``rewards_sent`` counter update and the flush all happen
    while ``_batch_lock`` is held.
    """
    async with self._batch_lock:
        self._batch.append(reward_data)
        self.rewards_sent += 1
        if len(self._batch) < BATCH_SIZE:
            return
        await self._send_batch()
async def _send_batch(self) -> None:
    """Publish the queued rewards as one batch on the batch channel.

    The rewards that were part of this attempt are removed from the queue
    whether or not the publish succeeded (a failed batch is logged and
    dropped, as before). Rewards appended while the publish was awaited are
    kept for the next batch; the previous ``clear()`` discarded them unsent.

    Rewards are published on the batch channel only. Individual
    "new-reward" publishes were removed earlier because subscribers of both
    channels processed every reward twice.
    """
    if not self._batch:
        return
    # Snapshot first: only these items are sent and later removed.
    outgoing = self._batch.copy()
    try:
        batch_data = {
            # NOTE(review): "rewardz" looks like a misspelling of "rewards".
            # It is part of the wire format, so it is left untouched here —
            # confirm which key the consumer actually reads.
            "rewardz": outgoing,
            "batch_id": f"batch_{int(time.time() * 1000)}",
            "batch_size": len(outgoing),
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "price_source": "deriv_streaming_v5_live",
            "platform": "huggingface-spaces" if IS_HF_SPACES else "local"
        }
        await self.reward_channel_batch.publish("reward-batch", batch_data)
        logger.info(f"📤 Sent batch of {len(outgoing)} rewards | Active tasks: {self._active_reward_tasks}")
        del self._batch[:len(outgoing)]
        self._last_batch_time = time.time()
    except Exception as e:
        logger.error(f"❌ Batch send error: {e}")
        del self._batch[:len(outgoing)]
async def _health_monitor_loop(self):
    """Every 30s, check Redis state, signal flow and the Deriv stream.

    - Redis: a connection state other than 'connected' is logged.
    - Signals: more than 5 minutes since the last signal is logged.
    - Deriv: a disconnected bridge or a finished stream task triggers a
      reconnect.

    ``_connection_healthy`` is recomputed from the Redis and signal checks on
    every cycle. It was previously only ever set to False, so a single
    hiccup marked the engine unhealthy for the rest of its life.

    An unexpected error in one cycle is logged and the loop carries on;
    otherwise the monitor would stop and no further reconnects would happen.
    """
    SIGNAL_TIMEOUT = 300  # 5 minutes without signals = problem
    while not self._shutdown.is_set():
        await asyncio.sleep(30)
        try:
            now = time.time()
            healthy = True

            # Redis connection state
            if self.ably_realtime:
                try:
                    redis_state = self.ably_realtime.connection.state
                    if redis_state != 'connected':
                        logger.warning(f"⚠️ [HEALTH] Redis state: {redis_state}")
                        healthy = False
                except Exception as e:
                    logger.warning(f"⚠️ [HEALTH] Redis check failed: {e}")

            # Signal flow (only once at least one signal has been seen)
            if self._last_signal_time > 0:
                signal_age = now - self._last_signal_time
                if signal_age > SIGNAL_TIMEOUT:
                    logger.warning(
                        f"⚠️ [HEALTH] No signals for {signal_age:.0f}s! "
                        f"Last signal at {datetime.fromtimestamp(self._last_signal_time).strftime('%H:%M:%S')}. "
                        f"Possible pub/sub disconnection."
                    )
                    healthy = False

            self._connection_healthy = healthy

            # Deriv WebSocket
            if not deriv_bridge.is_connected or not deriv_bridge._streaming:
                logger.warning("⚠️ [HEALTH] Deriv disconnected, attempting reconnect...")
                success = await deriv_bridge._reconnect()
                if success:
                    logger.info("✅ [HEALTH] Deriv reconnected")
                else:
                    logger.error("❌ [HEALTH] Deriv reconnection failed")
            elif deriv_bridge._stream_task is not None and deriv_bridge._stream_task.done():
                # Second detection path: the stream task has finished even
                # though the bridge flags still say it is streaming.
                logger.warning("⚠️ [HEALTH] Stream task finished unexpectedly — reconnecting")
                await deriv_bridge._reconnect()
        except Exception as e:
            logger.error(f"❌ [HEALTH] Monitor cycle failed: {e}")
async def _status_loop(self) -> None:
    """
    Emit a one-line status report every 60 seconds until shutdown.

    The line combines the engine counters, the win rate over decided
    signals, the seconds since the last signal (``-1`` when none was ever
    received), the tick count reported by ``deriv_bridge.get_stats()``,
    the health flag and the platform label. After logging, a non-empty
    batch whose last send is more than 30 seconds old is flushed while
    holding ``self._batch_lock``.
    """
    while True:
        if self._shutdown.is_set():
            break
        await asyncio.sleep(60)

        decided = self.correct + self.wrong
        # max(1, ...) keeps the division defined before any outcome exists.
        win_pct = 100 * self.correct / max(1, decided)
        stats = deriv_bridge.get_stats()

        # Health info: age of the newest signal, -1 meaning "never seen".
        if self._last_signal_time > 0:
            since_signal = time.time() - self._last_signal_time
        else:
            since_signal = -1

        report_fields = [
            f"[STATUS] Signals={self.signals_received}",
            f"Rewards={self.rewards_sent}",
            f"WinRate={win_pct:.1f}%",
            f"Tracked={len(self._tracked)}",
            f"ActiveTasks={self._active_reward_tasks}",
            f"SignalAge={since_signal:.0f}s",
            f"Ticks={stats['ticks_received']}",
            f"Healthy={'✅' if self._connection_healthy else '❌'}",
            f"Platform={'HF Spaces' if IS_HF_SPACES else 'Local'}",
        ]
        logger.info(" | ".join(report_fields))

        # Flush a pending batch that has been waiting for too long.
        async with self._batch_lock:
            batch_is_stale = bool(self._batch) and time.time() - self._last_batch_time > 30
            if batch_is_stale:
                await self._send_batch()
async def start(self) -> None:
    """
    Start the rewards engine and run until shutdown is requested.

    Steps, in order:

    1. Recreate the asyncio primitives (batch lock, shutdown event, reward
       semaphore) so they are created while the current loop is running
       (V9.0 fix).
    2. Store the running loop in ``self._loop`` (v5.2.1 fix). Per the
       original fix note, ``_on_signal`` runs in the Redis listener thread
       and needs this reference to schedule coroutines on this loop.
    3. Call ``self.initialize()``.
    4. Subscribe ``self._on_signal`` to the ``"message"`` event of
       ``self.signal_channel`` (FIX #12: the V10.1 channel expects
       ``(event_name, callback)``).
    5. Spawn the health-monitor and status tasks, then poll the shutdown
       event once per second.

    On exit (normal, error or cancellation) both background tasks are
    cancelled and given up to 5 seconds to unwind; a task that had already
    failed with an exception gets that exception logged.

    Raises:
        RuntimeError: if ``self.initialize()`` returns a falsy value.
    """
    # V9.0 FIX: recreate asyncio primitives on the running loop.
    self._batch_lock = asyncio.Lock()
    self._shutdown = asyncio.Event()
    self._reward_semaphore = asyncio.Semaphore(MAX_CONCURRENT_REWARD_TASKS)

    # FIX v5.2.1: capture the loop before anything else can need it.
    self._loop = asyncio.get_running_loop()

    logger.info("=" * 70)
    logger.info("K1RL QUANT - INSTITUTIONAL REWARDS v5.2.1-V75")
    # Fixed: the banner previously ended with a doubled ")".
    logger.info("HuggingFace Spaces Edition 🤗 (V75 NAMESPACE ISOLATION)")
    logger.info("=" * 70)

    if IS_HF_SPACES:
        # NOTE(review): indentation was lost in the reviewed copy; the
        # bullet lines below are assumed to belong to this branch — confirm.
        logger.info("🤗 Running in HuggingFace Spaces environment")
        logger.info(" - V75 channel prefix: '%s'", CHANNEL_PREFIX)
        logger.info(
            " - V75 databases: features=DB%d, rewards=DB%d",
            REDIS_DB_FEATURES,
            REDIS_DB_REWARDS,
        )
        logger.info(" - Robust RedisAblyClient V10.1 (blocking listener)")
        logger.info(" - Bounded reward task pool (max=%d)", MAX_CONCURRENT_REWARD_TASKS)
        logger.info(" - Connection health monitoring")
        logger.info(" - ✅ Event loop captured for thread→asyncio bridge")

    if not await self.initialize():
        # RuntimeError is still an Exception, so existing handlers keep working.
        raise RuntimeError("Initialization failed")

    # FIX #12: V10.1 RedisAblyChannel.subscribe() takes (event_name, callback);
    # the callback receives a RedisMessage object rather than a raw dict.
    await self.signal_channel.subscribe("message", self._on_signal)
    logger.info(f"✅ Subscribed to {ABLY_SIGNAL_CHANNEL}:message (V75 namespace, robust V10.1 listener)")
    logger.info(f" Event loop captured: {self._loop is not None} | Running: {self._loop.is_running() if self._loop else False}")

    # Background workers: health monitor and periodic status report.
    health_task = asyncio.create_task(self._health_monitor_loop())
    status_task = asyncio.create_task(self._status_loop())
    background = [health_task, status_task]

    try:
        # Main loop: idle until shutdown() sets the event.
        while not self._shutdown.is_set():
            await asyncio.sleep(1)
    finally:
        for task in background:
            task.cancel()
        # Bounded wait so the tasks actually finish unwinding (releasing
        # locks, closing sockets) without risking a hang at shutdown.
        done, _pending = await asyncio.wait(background, timeout=5)
        for task in done:
            if not task.cancelled() and task.exception() is not None:
                logger.error(f"❌ Background task failed: {task.exception()}")
async def shutdown(self) -> None:
    """
    Stop the engine: signal shutdown, flush pending rewards, close connections.

    Sets ``self._shutdown`` first so the loops polling it stop, then sends
    any rewards still waiting in ``self._batch`` (under
    ``self._batch_lock``), closes the Redis client if one exists and shuts
    down the module-level ``deriv_bridge``.

    The cleanup steps are chained with ``try``/``finally`` so that a
    failure in an earlier step (batch flush or Redis close) can no longer
    skip the later ones and leave the Deriv bridge open. The error still
    propagates to the caller after the remaining cleanup has run, in which
    case the final "Shutdown complete" line is not logged.
    """
    logger.info("🛑 Shutting down rewards engine...")
    self._shutdown.set()

    try:
        # Flush whatever is still queued.
        async with self._batch_lock:
            if self._batch:
                await self._send_batch()
    finally:
        try:
            # NOTE(review): close() is called without await, as before —
            # confirm the client's close() is a plain (non-coroutine) method.
            if self.ably_realtime:
                self.ably_realtime.close()
        finally:
            await deriv_bridge.shutdown()

    logger.info("✅ Shutdown complete")
| # ============================================================================ | |
| # GLOBAL INSTANCES | |
| # ============================================================================ | |
# Module-level DerivStreamingBridge instance, created when this module is
# imported. The engine methods above reach it through this global name:
# the health monitor (reconnect checks), the status loop (get_stats) and
# shutdown().
deriv_bridge = DerivStreamingBridge()
| # ============================================================================ | |
| # MAIN | |
| # ============================================================================ | |
async def main():
    """
    Print the startup banner, then run a RewardsEngine until it stops.

    ``engine.shutdown()`` is always awaited on the way out. A
    KeyboardInterrupt prints a short notice; any other exception is logged
    and its traceback printed.
    """
    rule = "=" * 70
    banner_lines = (
        "\n" + rule,
        "K1RL QUANT - INSTITUTIONAL REWARDS v5.2.1-V75",
        "HuggingFace Spaces Edition 🤗 (V75 NAMESPACE ISOLATION)",
        rule,
        f"✅ V75 channel prefix: '{CHANNEL_PREFIX}'",
        f"✅ V75 databases: features=DB{REDIS_DB_FEATURES}, rewards=DB{REDIS_DB_REWARDS}",
        "✅ FIXED: Uses robust RedisAblyClient V10.1 (blocking listener)",
        "✅ FIXED: Bounded reward task pool (no coroutine leak)",
        "✅ FIXED: Connection health monitoring",
        "✅ FIXED: Deriv auto-reconnection",
        "✅ FIXED: Event loop reference for thread→asyncio bridge",
        "✅ Auto-reconnecting WebSocket",
        "✅ O(1) signal tracking",
        "✅ TTL price cache",
        "✅ Batch processing with backpressure",
        "✅ Memory-bounded buffers",
        "✅ HuggingFace Spaces compatibility",
        "✅ ZERO cross-talk with other Spaces (V75 namespace)",
        rule + "\n",
    )
    for banner_line in banner_lines:
        print(banner_line)

    if IS_HF_SPACES:
        # NOTE(review): indentation was lost in the reviewed copy; the blank
        # line is assumed to belong to this branch — confirm.
        print("🤗 HuggingFace Spaces environment detected")
        print("")

    engine = RewardsEngine()
    try:
        await engine.start()
    except KeyboardInterrupt:
        print("\n>>> Shutdown requested")
    except Exception as exc:
        logger.error(f"Fatal error: {exc}")
        traceback.print_exc()
    finally:
        # Runs on every exit path so connections are closed and the last
        # batch is flushed.
        await engine.shutdown()
# Script entry point: only runs when the file is executed directly, not
# when it is imported.
if __name__ == "__main__":
    try:
        # asyncio.run creates the event loop, runs main() to completion and
        # closes the loop afterwards.
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C that surfaces outside main(): exit with a short notice
        # instead of a traceback.
        print("\n>>> Stopped")