# -*- coding: utf-8 -*-
"""
K1RL QUANT - INSTITUTIONAL REWARDS SYSTEM v5.2.1-V25
HuggingFace Spaces Edition - Maximum Performance

Pipeline: trading signals arrive on a Redis channel; each signal key is
tracked, evaluated EVALUATION_DELAY seconds after it arrived against the live
Deriv tick stream, turned into a normalized reward, and published in batches.

NAMESPACE ISOLATION:
    ✅ Channel names are built with prefixed_channel() from redis_config_v25,
       so they carry that module's CHANNEL_PREFIX (no cross-talk with other
       Spaces)
    ✅ The Redis client uses REDIS_DB_FEATURES from redis_config_v25
    ✅ All Space-specific configuration is imported from redis_config_v25

REVIEW FIXES (on top of v5.2.1):
    ✅ File restored to one-statement-per-line Python (it had been collapsed
       onto a handful of lines and could not be parsed)
    ✅ Signal bookkeeping shared by the Redis listener thread and the event
       loop is guarded by a threading.Lock
    ✅ Deriv reconnects are serialized; failed handshakes close their socket;
       the bridge keeps retrying after the attempt cap instead of giving up
    ✅ Evaluation delay is measured from the signal timestamp, so queueing on
       the reward semaphore no longer stretches the horizon
    ✅ Health monitor survives exceptions and its health flag can recover
    ✅ Logger setup is genuinely idempotent

CRITICAL FIX (v5.2.1):
    ✅ asyncio.get_event_loop() from the listener thread returned the WRONG
       loop, so reward tasks were silently dropped (never scheduled). The
       running loop is now captured with asyncio.get_running_loop() in start()

CRITICAL FIX (v5.2.0):
    ✅ Removed the duplicate RedisAblyClient; uses
       redis_connection_manager.RedisAblyClient
    ✅ Connection health monitoring with auto-reconnection
    ✅ Bounded reward task concurrency (semaphore)
    ✅ Deriv WebSocket auto-reconnection
    ✅ Pub/sub silence detection

PREVIOUS OPTIMIZATIONS (v5.1.0):
    ✅ Async price streaming with reconnection
    ✅ TTL cache for price data
    ✅ O(1) signal tracking with hash maps
    ✅ Batch processing
    ✅ Memory-efficient deque buffers
    ✅ HuggingFace Spaces compatibility, container-safe logging and paths
"""
import asyncio
import json
import logging
import os
import ssl
import sys
import threading
import time
import traceback
from collections import OrderedDict, deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from functools import lru_cache
from pathlib import Path
from typing import Any, Deque, Dict, List, Optional

import numpy as np
import websockets

# Async compatibility
try:
    import nest_asyncio
    nest_asyncio.apply()
except ImportError:
    pass

# ============================================================================
# FIX #1: Import the ROBUST RedisAblyClient from redis_connection_manager
# instead of defining a local version with no reconnection.
# ============================================================================
import redis
from redis_config_v25 import (
    REDIS_URL,
    REDIS_PASSWORD,
    REDIS_DB_FEATURES,
    REDIS_DB_REWARDS,
    CHANNEL_PREFIX,
    prefixed_channel,
    QUASAR_VERSION,
)
from redis_connection_manager import (
    RedisAblyClient,
    RedisMessage,
    DedicatedRedisConnectionManager,
    diagnose_redis_connection,
    IS_HF_SPACES,
)

# ============================================================================
# HUGGINGFACE SPACES CONFIGURATION
# ============================================================================
# NOTE(review): the fallback below is a real-looking token committed to
# source. Anyone who can read this file can use it. Rotate it and supply
# DERIV_API_KEY through the Space's secrets instead of relying on a default.
DERIV_API_KEY = os.environ.get('DERIV_API_KEY', '1KJKxIJKR8LCyKB')
DERIV_WS_URL = "wss://ws.binaryws.com/websockets/v3?app_id=1089"

# Display name -> Deriv symbol. A duplicated "Volatility 25 Index" entry with
# the same value was removed; the resulting dict is unchanged.
SYMBOL_MAP = {
    "Volatility 25 Index": "R_25",
    "Crash 500 Index": "CRASH500",
    "Volatility 100 Index": "R_100",
    "Volatility 50 Index": "R_50",
}
SYMBOL = "Volatility 25 Index"  # ✅ V25
DERIV_SYMBOL = "R_25"           # ✅ V25: Volatility 25 Index Deriv symbol

# Redis channels (historically Ably), namespaced via prefixed_channel() from
# redis_config_v25.
ABLY_SIGNAL_CHANNEL = prefixed_channel("final_signals")
ABLY_REWARD_CHANNEL = prefixed_channel("rewards")
ABLY_BATCH_CHANNEL = prefixed_channel("reward-batches")

ACTION_MAP = {0: 'BUY', 1: 'SELL', 2: 'HOLD'}
ACTION_REVERSE = {'BUY': 0, 'SELL': 1, 'HOLD': 2}

# Performance tuning
EVALUATION_DELAY = 60        # seconds between signal arrival and reward evaluation
BATCH_SIZE = 10
PRICE_CACHE_TTL = 5.0        # seconds - price considered stale after this
MAX_TRACKED_SIGNALS = 10000
RECONNECT_DELAY = 5          # seconds (base of exponential backoff)
MAX_RECONNECT_ATTEMPTS = 10

# FIX #2: Bounded concurrency for reward calculation tasks
MAX_CONCURRENT_REWARD_TASKS = 200

# Container-safe logging
BASE_DIR = Path('/home/user/app')
LOG_DIR = BASE_DIR / 'logs'
LOG_DIR.mkdir(parents=True, exist_ok=True)

# ── Logger setup (idempotent) ───────────────────────────────────────────────
# The previous guard assigned a module-level flag to False immediately before
# testing it, so the test was always true: every re-execution of this module
# body (e.g. importlib.reload) stacked another pair of handlers onto the same
# logger and printed every line twice. The logger object outlives reloads, so
# its own handler list is a reliable "already configured" marker.
# NOTE: a file executed as __main__ AND imported under its module name gets
# two different loggers (different __name__); each is configured once and
# both append to rewards.log.
logger = logging.getLogger(__name__)
if not logger.handlers:
    logger.setLevel(logging.INFO)
    _fmt = logging.Formatter("%(asctime)s [REWARDS] %(levelname)s: %(message)s")
    _stream_h = logging.StreamHandler(sys.stdout)
    _stream_h.setFormatter(_fmt)
    _file_h = logging.FileHandler(LOG_DIR / 'rewards.log', encoding='utf-8')
    _file_h.setFormatter(_fmt)
    logger.addHandler(_stream_h)
    logger.addHandler(_file_h)
    logger.propagate = False  # prevent root-logger handlers from duplicating
_REWARDS_LOGGER_CONFIGURED = True  # kept for backward compatibility

if IS_HF_SPACES:
    logger.info("🤗 HuggingFace Spaces environment detected")


# ============================================================================
# HIGH-PERFORMANCE DATA STRUCTURES
# ============================================================================

class PriceData:
    """Price snapshot (bid/ask/last) stamped with its local receive time.

    Uses ``__slots__`` to keep per-instance memory small. Attributes are
    plain (mutable) slots.
    """

    __slots__ = ('bid', 'ask', 'last', 'timestamp', 'epoch')

    def __init__(self, bid: float, ask: float, last: float, timestamp: float, epoch: int):
        self.bid = bid
        self.ask = ask
        self.last = last
        self.timestamp = timestamp  # local time.time() when the tick was processed
        self.epoch = epoch          # epoch reported by the tick payload

    @property
    def age(self) -> float:
        """Seconds elapsed since ``timestamp``."""
        return time.time() - self.timestamp

    @property
    def is_stale(self) -> bool:
        """True once the snapshot is older than PRICE_CACHE_TTL seconds."""
        return self.age > PRICE_CACHE_TTL

    def get_price(self, action: str) -> float:
        """Return ``ask`` for "BUY" and ``bid`` for any other action."""
        return self.ask if action == "BUY" else self.bid


class TrackedSignal:
    """One signal key awaiting evaluation (``__slots__`` for small footprint)."""

    __slots__ = ('signal_key', 'action', 'entry_price', 'timestamp', 'agent')

    def __init__(self, signal_key: str, action: str, entry_price: float,
                 timestamp: float, agent: str = "unknown"):
        self.signal_key = signal_key
        self.action = action
        self.entry_price = entry_price
        self.timestamp = timestamp  # time.time() when the signal was tracked
        self.agent = agent


class TTLCache:
    """Bounded key/value cache with time-to-live expiration.

    ``get`` drops and misses entries older than ``ttl`` seconds. When the
    cache is full, ``set`` evicts the least recently *set* entries first.
    """

    __slots__ = ('_cache', '_ttl', '_max_size')

    def __init__(self, ttl: float = 5.0, max_size: int = 1000):
        self._cache: OrderedDict = OrderedDict()
        self._ttl = ttl
        self._max_size = max_size

    def get(self, key: str) -> Optional[Any]:
        """Return the value for *key*, or None if absent or expired."""
        entry = self._cache.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if time.time() - stored_at > self._ttl:
            del self._cache[key]
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        """Insert or refresh *key*, evicting the oldest entries if full."""
        if key in self._cache:
            # Refreshing an existing key must not evict an unrelated entry.
            self._cache.move_to_end(key)
        else:
            while self._cache and len(self._cache) >= self._max_size:
                self._cache.popitem(last=False)
        self._cache[key] = (value, time.time())

    def __len__(self) -> int:
        return len(self._cache)


# ============================================================================
# DERIV BRIDGE - HuggingFace Spaces Edition
# ============================================================================

class DerivStreamingBridge:
    """
    Deriv WebSocket bridge - HuggingFace Spaces optimized.

    Keeps the latest tick for DERIV_SYMBOL in an in-memory cache, which
    ``get_current_price`` serves until it is older than PRICE_CACHE_TTL.

    Features:
    - Reconnection with exponential backoff (capped at 60s). Concurrent
      callers share a single attempt instead of racing.
    - Container-safe error handling
    - HF Spaces compatibility (shorter connect timeout)
    """

    def __init__(self):
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self.is_connected = False
        self.is_authorized = False

        # Price cache (latest PriceData per symbol)
        self._price_cache: Dict[str, PriceData] = {}
        self._cache_lock = asyncio.Lock()

        # Connection management
        self._reconnect_attempts = 0
        self._last_tick_time: Dict[str, float] = {}
        self._streaming = False
        self._stream_task: Optional[asyncio.Task] = None
        # Serializes _reconnect(). Created lazily so it binds to the running
        # loop rather than whatever loop exists at import time.
        self._reconnect_lock: Optional[asyncio.Lock] = None

        # HF Spaces features
        self._hf_spaces_mode = IS_HF_SPACES
        self._max_connection_attempts = 10

        # Stats
        self.ticks_received = 0
        self.reconnections = 0

    async def connect(self) -> bool:
        """Open the Deriv WebSocket, authorize, and start tick streaming.

        Returns:
            True when authorized and streaming has started, otherwise False.
            Network and auth failures are logged rather than raised, and any
            socket opened by a failed attempt is closed before returning.
        """
        # V9.0 FIX: Recreate lock on the RUNNING loop
        self._cache_lock = asyncio.Lock()

        try:
            self._reconnect_attempts += 1
            logger.info(f"🔄 Connecting to Deriv WebSocket... (attempt {self._reconnect_attempts})")

            if self._reconnect_attempts > self._max_connection_attempts:
                # Previously this returned False forever once the cap was hit
                # (the counter only resets on success). An outage longer than
                # ~10 attempts left the bridge permanently dead and every
                # reward price fetch failing. Keep retrying instead;
                # _reconnect()'s backoff is already capped at 60s.
                logger.error("❌ Max connection attempts exceeded — still retrying at max backoff")

            ssl_context = ssl.create_default_context()
            # NOTE(review): certificate verification is disabled, so this
            # connection (which carries DERIV_API_KEY) is open to MITM.
            # Restore verification once the container's CA bundle is confirmed.
            ssl_context.check_hostname = False
            ssl_context.verify_mode = ssl.CERT_NONE

            self.ws = await asyncio.wait_for(
                websockets.connect(
                    DERIV_WS_URL,
                    ssl=ssl_context,
                    ping_interval=25,
                    ping_timeout=10,
                    close_timeout=5,
                    max_size=2**20,
                ),
                timeout=15.0 if IS_HF_SPACES else 30.0,
            )

            # Authorize
            await self.ws.send(json.dumps({"authorize": DERIV_API_KEY}))
            response = await asyncio.wait_for(self.ws.recv(), timeout=10.0)
            data = json.loads(response)

            if 'authorize' not in data:
                logger.error(f"❌ Auth failed: {data.get('error', 'Unknown')}")
                await self._close_ws_quietly()
                return False

            self.is_connected = True
            self.is_authorized = True
            self._reconnect_attempts = 0

            balance = data['authorize'].get('balance', 0)
            logger.info(f"✅ Deriv connected | Balance: ${balance:.2f}")

            await self._start_streaming()
            return True

        except asyncio.TimeoutError:
            logger.warning(f"⏰ Connection timeout (attempt {self._reconnect_attempts})")
            await self._close_ws_quietly()
            return False
        except Exception as e:
            logger.warning(f"⚠️ Connection error: {e}")
            await self._close_ws_quietly()
            return False

    async def _close_ws_quietly(self) -> None:
        """Close the current socket (if any), ignoring errors from a dead socket."""
        if self.ws is None:
            return
        try:
            await self.ws.close()
        except Exception as e:
            logger.debug(f"WebSocket close raised (ignored): {e}")

    async def _start_streaming(self):
        """Start the price-streaming task."""
        self._stream_task = asyncio.create_task(self._real_price_stream())
        self._streaming = True
        logger.info("📡 Streaming started")

    async def _real_price_stream(self):
        """Subscribe to DERIV_SYMBOL ticks and feed them into the cache until the socket dies."""
        try:
            await self.ws.send(json.dumps({"ticks": DERIV_SYMBOL}))
            logger.info(f"📡 Subscribed to {DERIV_SYMBOL}")

            while self.is_connected:
                try:
                    raw = await self.ws.recv()
                except websockets.exceptions.ConnectionClosed:
                    logger.warning("📡 WebSocket connection closed")
                    break
                except Exception as e:
                    logger.error(f"❌ Stream error: {e}")
                    break

                try:
                    message = json.loads(raw)
                except ValueError as e:
                    # A single malformed frame should not tear down the stream.
                    logger.warning(f"⚠️ Skipping undecodable frame: {e}")
                    continue

                if 'tick' in message:
                    await self._process_tick(message['tick'])
                    self.ticks_received += 1

        except Exception as e:
            logger.error(f"❌ Real price streaming error: {e}")
        finally:
            # ── BUG-FIX-1 ─────────────────────────────────────────────────────
            # Clear the flags the health monitor watches so a dead stream is
            # detected and reconnected. Only do so while this task is still
            # the bridge's current stream, so a late-finishing old task cannot
            # clobber the flags of a newer connection.
            if self._stream_task is None or self._stream_task is asyncio.current_task():
                self._streaming = False
                self.is_connected = False
                logger.warning("⚠️ _real_price_stream exited — flags cleared for health monitor")

    async def _process_tick(self, tick_data):
        """Convert one Deriv tick into a PriceData and store it in the cache.

        bid/ask are synthesized as quote ∓ 0.0005.
        """
        try:
            price = float(tick_data['quote'])
            epoch = int(tick_data['epoch'])

            price_data = PriceData(
                bid=price - 0.0005,
                ask=price + 0.0005,
                last=price,
                timestamp=time.time(),
                epoch=epoch,
            )

            async with self._cache_lock:
                self._price_cache[DERIV_SYMBOL] = price_data
                self._last_tick_time[DERIV_SYMBOL] = time.time()

        except Exception as e:
            logger.error(f"❌ Tick processing error: {e}")

    def _stream_alive(self) -> bool:
        """True when connected and the streaming task is still running."""
        return (
            self.is_connected
            and self._streaming
            and self._stream_task is not None
            and not self._stream_task.done()
        )

    async def _reconnect(self) -> bool:
        """Tear down the current connection and reconnect with exponential backoff.

        Serialized: callers arriving while a reconnect is in progress wait for
        it. If the stream is alive by the time they get the lock, they return
        True without opening another socket.
        """
        if self._reconnect_lock is None:
            self._reconnect_lock = asyncio.Lock()

        async with self._reconnect_lock:
            if self._stream_alive():
                return True

            self.is_connected = False
            self._streaming = False

            task = self._stream_task
            if task is not None and not task.done():
                task.cancel()
                # Wait for the old task's cleanup without re-raising its
                # CancelledError; our own cancellation still propagates.
                await asyncio.wait({task})

            await self._close_ws_quietly()

            delay = min(60, RECONNECT_DELAY * (2 ** min(self._reconnect_attempts, 5)))
            logger.info(f"🔄 Reconnecting in {delay}s...")
            await asyncio.sleep(delay)

            success = await self.connect()
            if success:
                self.reconnections += 1
                logger.info(f"✅ Reconnected successfully (#{self.reconnections})")
            return success

    async def get_current_price(self, symbol: str = DERIV_SYMBOL) -> Optional[PriceData]:
        """Return the cached price for *symbol*, or None if missing or stale."""
        async with self._cache_lock:
            price_data = self._price_cache.get(symbol)
            if price_data and not price_data.is_stale:
                return price_data
            return None

    def get_stats(self) -> Dict:
        """Return connection statistics."""
        return {
            'connected': self.is_connected,
            'authorized': self.is_authorized,
            'ticks_received': self.ticks_received,
            'reconnections': self.reconnections,
            'streaming': self._streaming,
        }

    async def shutdown(self):
        """Stop streaming and close the socket."""
        logger.info("🛑 Shutting down Deriv bridge...")
        self.is_connected = False
        self._streaming = False

        if self._stream_task:
            self._stream_task.cancel()
            try:
                await self._stream_task
            except asyncio.CancelledError:
                pass

        if self.ws:
            try:
                await self.ws.close()
            except Exception:
                pass

        logger.info("✅ Deriv bridge shutdown complete")


# ============================================================================
# REWARD CALCULATION COMPONENTS
# ============================================================================

class RewardNormalizer:
    """Map an entry/exit price pair and an action to a bounded reward."""

    def __init__(self, base_multiplier=1000):
        self.base_multiplier = base_multiplier
        self.volatility_buffer = deque(maxlen=100)

    def normalize(self, entry_price, exit_price, action):
        """Return ``(normalized, regime, confidence, raw_bps)``.

        ``normalized`` is tanh(directional_bps / 100), in (-1, 1). BUY keeps the
        sign of the move, SELL flips it, and any other action gets a small
        penalty proportional to the absolute move. A non-positive entry price
        yields ``(0, "invalid", 0, 0)``.
        """
        if entry_price <= 0:
            return 0, "invalid", 0, 0

        raw_bps = ((exit_price - entry_price) / entry_price) * 10000

        if action == "BUY":
            directional_bps = raw_bps
        elif action == "SELL":
            directional_bps = -raw_bps
        else:  # HOLD
            directional_bps = -abs(raw_bps) * 0.1  # Small penalty for holding

        # Simple regime detection from the size of the move
        regime = "normal"
        if abs(raw_bps) > 50:
            regime = "high_vol"
        elif abs(raw_bps) < 5:
            regime = "low_vol"

        normalized = np.tanh(directional_bps / 100)
        confidence = min(abs(directional_bps) / 20, 1.0)

        return normalized, regime, confidence, raw_bps


class AgentTracker:
    """Track consecutive same-action streaks per agent timeframe."""

    def __init__(self):
        self.agents = {}
        self.reset()

    def reset(self):
        """Reset tracking data for all known agents."""
        self.agents = {
            "5s": {"count": 0, "action": None, "cycles": 0},
            "15s": {"count": 0, "action": None, "cycles": 0},
            "30s": {"count": 0, "action": None, "cycles": 0},
            "1m": {"count": 0, "action": None, "cycles": 0},
            "2m": {"count": 0, "action": None, "cycles": 0},
            "5m": {"count": 0, "action": None, "cycles": 0},
            "10m": {"count": 0, "action": None, "cycles": 0},
            "15m": {"count": 0, "action": None, "cycles": 0},
        }

    def update(self, agent, action):
        """Record *action* for *agent*.

        Returns True when the action changes after a streak of at least 3,
        which counts as one completed cycle.
        """
        if agent not in self.agents:
            return False

        state = self.agents[agent]
        if state["action"] == action:
            state["count"] += 1
            return False

        completed = state["count"] >= 3
        if completed:
            state["cycles"] += 1
        state["count"] = 1
        state["action"] = action
        return completed

    def get_info(self, agent):
        """Return the tracking dict for *agent* (zeroed default if unknown)."""
        return self.agents.get(agent, {"count": 0, "action": None, "cycles": 0})


# ============================================================================
# MAIN REWARDS ENGINE - HuggingFace Spaces Edition
# ============================================================================

class RewardsEngine:
    """
    Main rewards calculation engine with HF Spaces compatibility.

    Threading model: ``_on_signal`` is invoked on the RedisAblyClient
    listener thread; everything else runs on the asyncio loop captured in
    ``start()``. ``_tracked`` and ``_processed_keys`` are touched from both
    sides and are guarded by ``_state_lock``.
    """

    def __init__(self):
        # Core components
        self.normalizer = RewardNormalizer()
        self.agent_tracker = AgentTracker()

        # Tracking (shared between listener thread and event loop)
        self._tracked: Dict[str, TrackedSignal] = {}
        self._processed_keys = TTLCache(ttl=300)  # 5 minutes
        self._state_lock = threading.Lock()

        # Batch processing
        self._batch: List[Dict] = []
        self._batch_lock: Optional[asyncio.Lock] = None
        self._last_batch_time = time.time()

        # FIX #3: Bounded concurrency for reward calculations
        self._reward_semaphore: Optional[asyncio.Semaphore] = None
        self._active_reward_tasks = 0

        # Statistics
        self.signals_received = 0
        self.rewards_sent = 0
        self.correct = 0
        self.wrong = 0

        # FIX #4: Track last signal time for health monitoring
        self._last_signal_time = 0.0
        self._last_heartbeat_time = 0.0
        self._connection_healthy = True

        # FIX v5.2.1: Event loop reference for the thread→asyncio bridge
        self._loop: Optional[asyncio.AbstractEventLoop] = None

        # Connections
        self.ably_realtime: Optional[RedisAblyClient] = None
        self.signal_channel = None
        self.reward_channel_batch = None
        self.reward_channel_individual = None

        # Control
        self._shutdown: Optional[asyncio.Event] = None

    async def initialize(self) -> bool:
        """Create the Redis client and channels, then connect the Deriv bridge."""
        try:
            logger.info("📡 Connecting to Redis (CRASH1000 namespace)...")

            # FIX #5: Use the robust RedisAblyClient from redis_connection_manager.py
            self.ably_realtime = RedisAblyClient(
                redis_url=REDIS_URL,
                password=REDIS_PASSWORD,
                use_streams=True,
                database=REDIS_DB_FEATURES,
            )

            # Channel names are already prefixed via the constants above
            self.signal_channel = self.ably_realtime.channels.get(ABLY_SIGNAL_CHANNEL)
            self.reward_channel_batch = self.ably_realtime.channels.get(ABLY_BATCH_CHANNEL)
            self.reward_channel_individual = self.ably_realtime.channels.get(ABLY_REWARD_CHANNEL)

            logger.info(f"✅ Redis channels initialized (CRASH1000 — prefix='{CHANNEL_PREFIX}', DB={REDIS_DB_FEATURES})")
            logger.info(f"   Signal: {ABLY_SIGNAL_CHANNEL}")
            logger.info(f"   Rewards: {ABLY_REWARD_CHANNEL}")
            logger.info(f"   Batches: {ABLY_BATCH_CHANNEL}")

            logger.info("🔄 Connecting to Deriv...")
            success = await deriv_bridge.connect()
            if not success:
                logger.error("❌ Deriv connection failed")
                return False

            logger.info("✅ All connections established")
            return True

        except Exception as e:
            logger.error(f"❌ Initialization error: {e}")
            return False

    def _extract_agent(self, signal_key: str) -> str:
        """Extract the agent identifier from a signal key.

        Keys may carry a time-based token anywhere (e.g. '10m_xxx') or a
        size-based prefix (e.g. 'xs_xxx', 'xxl_xxx').

        ── BUG-FIX-4 ──────────────────────────────────────────────────────────
        Time tokens are checked first, in an order that prevents '15m'/'15s'
        from being reported as '5m'/'5s'. Size prefixes are checked longest
        first so 'xxl' wins over 'xl'. Returns "unknown" when nothing matches.
        """
        for tf in ('15m', '10m', '5m', '2m', '1m', '30s', '15s', '5s'):
            if tf in signal_key:
                return tf

        key_lower = signal_key.lower()
        for size in ('xxl', 'xl', 'xs', 'l_', 'm_', 's_'):
            if key_lower.startswith(size):
                return size.rstrip('_')  # strip the delimiter used in the prefix

        return "unknown"

    async def _get_price(self, action: str) -> Optional[float]:
        """Return the current cached price for *action*, or None on miss/error."""
        try:
            price_data = await deriv_bridge.get_current_price()
            if price_data:
                return price_data.get_price(action)
            return None
        except Exception as e:
            logger.error(f"❌ Price fetch error: {e}")
            return None

    def _on_signal(self, message: RedisMessage):
        """
        Handle an incoming signal (runs on the RedisAblyClient listener THREAD).

        Accepts a RedisMessage (payload in ``.data``) or a raw payload.
        Unwraps an envelope ``{"data": {...}}`` and decodes JSON strings.
        Tracks up to 8 signal keys and schedules one reward evaluation per
        newly tracked key on the event loop.
        """
        try:
            self.signals_received += 1
            self._last_signal_time = time.time()

            data = message.data if isinstance(message, RedisMessage) else message

            # Envelope format: {"event": "message", "data": {...}}
            if isinstance(data, dict) and 'data' in data:
                data = data['data']
            if isinstance(data, str):
                data = json.loads(data)
            if not isinstance(data, dict):
                logger.warning(f"⚠️ Unexpected signal payload type: {type(data).__name__}")
                return

            # `or ''` tolerates an explicit null action field.
            raw_action = data.get('final_action', data.get('action', ''))
            action = str(raw_action or '').upper()

            raw_price = data.get('price', 0.0)
            try:
                entry_price = float(raw_price or 0.0)
            except (TypeError, ValueError):
                entry_price = 0.0

            signal_keys = data.get('signal_keys', [])
            if signal_keys is None:
                signal_keys = []
            elif not isinstance(signal_keys, list):
                signal_keys = [str(signal_keys)]

            if action not in ['BUY', 'SELL']:
                logger.warning(f"⚠️ Invalid action: {action}")
                return

            if not entry_price:
                logger.warning(f"⚠️ No entry price in signal: {raw_price}")
                return

            logger.info(f"🔔 [SIGNAL] {action} @ {entry_price:.5f} | Keys: {len(signal_keys)} | Loop: {'✅' if self._loop and self._loop.is_running() else '❌'}")

            for key in signal_keys[:8]:  # Limit to 8 signals
                key = str(key)

                with self._state_lock:
                    # Skip duplicates - O(1)
                    if key in self._tracked or self._processed_keys.get(key):
                        continue

                    if len(self._tracked) >= MAX_TRACKED_SIGNALS:
                        # Dicts preserve insertion order and entries are
                        # inserted with increasing time.time(), so the first
                        # key is the oldest: O(1) instead of a min() scan.
                        del self._tracked[next(iter(self._tracked))]

                    agent = self._extract_agent(key)
                    self._tracked[key] = TrackedSignal(
                        signal_key=key,
                        action=action,
                        entry_price=entry_price,
                        timestamp=time.time(),
                        agent=agent,
                    )

                logger.debug(f"✅ [TRACKING] {key} | {action} @ {entry_price:.5f}")

                # Agent streak tracking
                if agent == "10m":
                    if self.agent_tracker.update(agent, action):
                        info = self.agent_tracker.get_info("10m")
                        logger.info(f"🔥 [10m CYCLE #{info['cycles']}] {action} x{info['count']}")

                # FIX #7: Schedule the reward on the main event loop
                self._schedule_reward(key)

        except Exception as e:
            logger.error(f"❌ Signal processing error: {e}")
            traceback.print_exc()

    def _schedule_reward(self, signal_key: str):
        """
        Schedule the reward calculation on the captured event loop.

        FIX v5.2.1: called from the listener THREAD, so it must use the loop
        stored by start(); asyncio.get_event_loop() on a non-main thread does
        not return the running loop. If scheduling fails, the tracked entry is
        released because nothing would ever evaluate it.
        """
        loop = self._loop
        try:
            if loop is not None and loop.is_running():
                coro = self._bounded_calculate_reward(signal_key)
                try:
                    future = asyncio.run_coroutine_threadsafe(coro, loop)
                except RuntimeError:
                    coro.close()  # avoid "coroutine was never awaited"
                    raise
                # Without this, exceptions raised inside the task vanish silently.
                future.add_done_callback(lambda fut: self._log_reward_failure(fut, signal_key))
                return
            logger.warning(f"⚠️ Event loop not available (loop={loop}), reward for {signal_key} dropped")
        except RuntimeError as e:
            logger.warning(f"⚠️ Cannot schedule reward: {e}")

        with self._state_lock:
            self._tracked.pop(signal_key, None)

    @staticmethod
    def _log_reward_failure(future, signal_key: str) -> None:
        """Done-callback: log an exception raised by a scheduled reward task."""
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error(f"❌ Reward task failed for {signal_key}: {exc!r}")

    async def _bounded_calculate_reward(self, signal_key: str):
        """
        FIX #8: Run _calculate_reward under the reward semaphore.

        This caps how many evaluations run concurrently (not how many are
        queued). Queued time counts toward the evaluation delay; see
        _calculate_reward.
        """
        if self._reward_semaphore is None:
            return

        async with self._reward_semaphore:
            self._active_reward_tasks += 1
            try:
                await self._calculate_reward(signal_key)
            finally:
                self._active_reward_tasks -= 1

    async def _calculate_reward(self, signal_key: str) -> None:
        """Evaluate *signal_key* EVALUATION_DELAY seconds after it was tracked.

        Returns early if the signal was evicted. Retries the exit-price fetch
        up to 3 times (triggering a Deriv reconnect when disconnected), then
        appends the reward to the outgoing batch.
        """
        with self._state_lock:
            pending = self._tracked.get(signal_key)
        if pending is None:
            return  # evicted before evaluation started

        # Measure the horizon from the signal's own timestamp, so time spent
        # queued on the semaphore does not push the evaluation past 60s.
        remaining = pending.timestamp + EVALUATION_DELAY - time.time()
        if remaining > 0:
            await asyncio.sleep(remaining)

        with self._state_lock:
            signal = self._tracked.pop(signal_key, None)
            if not signal:
                return
            self._processed_keys.set(signal_key, True)

        # BUG FIX 3: Retry price fetch up to 3 times with 5s backoff
        exit_price = None
        for attempt in range(3):
            exit_price = await self._get_price(signal.action)
            if exit_price:
                break
            logger.warning(f"⚠️ Price fetch attempt {attempt+1}/3 failed for {signal_key}")
            if not deriv_bridge.is_connected:
                logger.warning("⚠️ Deriv disconnected, triggering reconnect...")
                await deriv_bridge._reconnect()  # serialized inside the bridge
            await asyncio.sleep(5)

        if not exit_price:
            logger.error(f"❌ All price fetch attempts failed for {signal_key} — reward dropped permanently")
            return

        normalized, regime, confidence, raw_bps = self.normalizer.normalize(
            signal.entry_price, exit_price, signal.action
        )

        # Track accuracy (a zero reward counts as wrong)
        if normalized > 0:
            self.correct += 1
            correct_action = ACTION_REVERSE[signal.action]
        else:
            self.wrong += 1
            correct_action = 1 - ACTION_REVERSE.get(signal.action, 0)

        price_diff = exit_price - signal.entry_price
        logger.info(
            f"[REWARD] {signal.action} | "
            f"entry={signal.entry_price:.2f} → exit={exit_price:.2f} (Δ{price_diff:+.2f}) | "
            f"reward={normalized:+.4f} | {signal_key[:25]}"
        )

        await self._add_to_batch({
            "signal_key": signal_key,
            "reward": normalized,
            "entry_price": signal.entry_price,
            "exit_price": exit_price,
            "executed_action": signal.action,
            "correct_action": correct_action,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "price_source": "deriv_streaming_v5_live",
            "platform": "huggingface-spaces" if IS_HF_SPACES else "local",
        })

    async def _add_to_batch(self, reward_data: Dict) -> None:
        """Append a reward; publish the batch once it reaches BATCH_SIZE.

        ``rewards_sent`` counts rewards queued for publishing.
        """
        async with self._batch_lock:
            self._batch.append(reward_data)
            self.rewards_sent += 1
            if len(self._batch) >= BATCH_SIZE:
                await self._send_batch()

    async def _send_batch(self) -> None:
        """Publish the pending batch on the batch channel (caller holds _batch_lock).

        The batch is cleared whether or not the publish succeeds, so a failed
        publish drops those rewards.
        """
        if not self._batch:
            return

        try:
            batch_data = {
                "rewardz": self._batch.copy(),
                "batch_id": f"batch_{int(time.time() * 1000)}",
                "batch_size": len(self._batch),
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "price_source": "deriv_streaming_v5_live",
                "platform": "huggingface-spaces" if IS_HF_SPACES else "local",
            }

            # FIX #10: RedisAblyChannel.publish() is awaited here.
            await self.reward_channel_batch.publish("reward-batch", batch_data)

            # §P2-fix-6: rewards are NOT also published individually to
            # "new-reward". The consumer subscribes to both the batch and the
            # individual channel, so double publishing made every reward arrive
            # twice. The batch channel is authoritative.

            logger.info(f"📤 Sent batch of {len(self._batch)} rewards | Active tasks: {self._active_reward_tasks}")
            self._batch.clear()
            self._last_batch_time = time.time()

        except Exception as e:
            logger.error(f"❌ Batch send error: {e}")
            self._batch.clear()

    async def _health_monitor_loop(self):
        """
        FIX #11: Every 30s, check Redis state, signal flow and the Deriv stream.

        Each round is isolated in try/except so a single failing check (for
        example, a reconnect that raises) cannot silently kill the monitor.
        """
        SIGNAL_TIMEOUT = 300  # 5 minutes without signals = problem

        while not self._shutdown.is_set():
            await asyncio.sleep(30)
            try:
                await self._run_health_checks(SIGNAL_TIMEOUT)
            except Exception as e:
                logger.error(f"❌ [HEALTH] Health check round failed: {e}")

    async def _run_health_checks(self, signal_timeout: float) -> None:
        """Run one round of health checks and reconnect Deriv if needed."""
        now = time.time()
        healthy = True

        # Redis connection state
        if self.ably_realtime:
            try:
                redis_state = self.ably_realtime.connection.state
                if redis_state != 'connected':
                    logger.warning(f"⚠️ [HEALTH] Redis state: {redis_state}")
                    healthy = False
            except Exception as e:
                logger.warning(f"⚠️ [HEALTH] Redis check failed: {e}")

        # Signal flow
        if self._last_signal_time > 0:
            signal_age = now - self._last_signal_time
            if signal_age > signal_timeout:
                logger.warning(
                    f"⚠️ [HEALTH] No signals for {signal_age:.0f}s! "
                    f"Last signal at {datetime.fromtimestamp(self._last_signal_time).strftime('%H:%M:%S')}. "
                    f"Possible pub/sub disconnection."
                )
                healthy = False

        # Recomputed every round so the flag recovers once the problem clears
        self._connection_healthy = healthy

        # Deriv WebSocket
        if not deriv_bridge.is_connected or not deriv_bridge._streaming:
            logger.warning("⚠️ [HEALTH] Deriv disconnected, attempting reconnect...")
            success = await deriv_bridge._reconnect()
            if success:
                logger.info("✅ [HEALTH] Deriv reconnected")
            else:
                logger.error("❌ [HEALTH] Deriv reconnection failed")
        elif deriv_bridge._stream_task is not None and deriv_bridge._stream_task.done():
            # ── BUG-FIX-2 ───────────────────────────────────────────────────
            # Second detection path: the stream task finished but the flags
            # have not been cleared yet.
            logger.warning("⚠️ [HEALTH] Stream task finished unexpectedly — reconnecting")
            await deriv_bridge._reconnect()

    async def _status_loop(self) -> None:
        """Every 60s, log a status line and flush a batch idle for >30s."""
        while not self._shutdown.is_set():
            await asyncio.sleep(60)

            total = self.correct + self.wrong
            win_rate = (100 * self.correct / max(1, total))
            bridge_stats = deriv_bridge.get_stats()
            signal_age = time.time() - self._last_signal_time if self._last_signal_time > 0 else -1

            logger.info(
                f"[STATUS] Signals={self.signals_received} | "
                f"Rewards={self.rewards_sent} | "
                f"WinRate={win_rate:.1f}% | "
                f"Tracked={len(self._tracked)} | "
                f"ActiveTasks={self._active_reward_tasks} | "
                f"SignalAge={signal_age:.0f}s | "
                f"Ticks={bridge_stats['ticks_received']} | "
                f"Healthy={'✅' if self._connection_healthy else '❌'} | "
                f"Platform={'HF Spaces' if IS_HF_SPACES else 'Local'}"
            )

            async with self._batch_lock:
                if self._batch and time.time() - self._last_batch_time > 30:
                    await self._send_batch()

    async def start(self) -> None:
        """Create loop-bound primitives, connect, subscribe, and run until shutdown."""
        # V9.0 FIX: Create asyncio primitives on the running loop
        self._batch_lock = asyncio.Lock()
        self._shutdown = asyncio.Event()
        self._reward_semaphore = asyncio.Semaphore(MAX_CONCURRENT_REWARD_TASKS)

        # FIX v5.2.1: capture the loop BEFORE subscribing, because _on_signal
        # runs on the Redis listener thread and schedules coroutines on it.
        self._loop = asyncio.get_running_loop()

        logger.info("=" * 70)
        logger.info("K1RL QUANT - INSTITUTIONAL REWARDS v5.2.1-V25")
        logger.info("HuggingFace Spaces Edition 🤗 (V25 NAMESPACE ISOLATION))")
        logger.info("=" * 70)

        if IS_HF_SPACES:
            logger.info("🤗 Running in HuggingFace Spaces environment")
        logger.info("   - V25 channel prefix: '%s'" % CHANNEL_PREFIX)
        logger.info("   - V25 databases: features=DB%d, rewards=DB%d" % (REDIS_DB_FEATURES, REDIS_DB_REWARDS))
        logger.info("   - Robust RedisAblyClient V10.1 (blocking listener)")
        logger.info("   - Bounded reward task pool (max=%d)" % MAX_CONCURRENT_REWARD_TASKS)
        logger.info("   - Connection health monitoring")
        logger.info("   - ✅ Event loop captured for thread→asyncio bridge")

        if not await self.initialize():
            raise Exception("Initialization failed")

        # FIX #12: RedisAblyChannel.subscribe(event_name, callback); the
        # callback receives a RedisMessage.
        await self.signal_channel.subscribe("message", self._on_signal)
        logger.info(f"✅ Subscribed to {ABLY_SIGNAL_CHANNEL}:message (V25 namespace, robust V10.1 listener)")
        logger.info(f"   Event loop captured: {self._loop is not None} | Running: {self._loop.is_running() if self._loop else False}")

        background = [
            asyncio.create_task(self._health_monitor_loop()),
            asyncio.create_task(self._status_loop()),
        ]

        try:
            await self._shutdown.wait()
        finally:
            for task in background:
                task.cancel()
            await asyncio.gather(*background, return_exceptions=True)

    async def shutdown(self) -> None:
        """Signal shutdown, flush pending rewards, and close connections.

        Safe to call even if start() never ran.
        """
        logger.info("🛑 Shutting down rewards engine...")
        if self._shutdown is not None:
            self._shutdown.set()

        if self._batch_lock is not None:
            async with self._batch_lock:
                if self._batch:
                    await self._send_batch()

        if self.ably_realtime:
            try:
                self.ably_realtime.close()
            except Exception as e:
                logger.warning(f"⚠️ Redis client close failed: {e}")

        await deriv_bridge.shutdown()
        logger.info("✅ Shutdown complete")


# ============================================================================
# GLOBAL INSTANCES
# ============================================================================

deriv_bridge = DerivStreamingBridge()


# ============================================================================
# MAIN
# ============================================================================

async def main():
    """Print the banner, run the engine, and always attempt a clean shutdown."""
    print("\n" + "=" * 70)
    print("K1RL QUANT - INSTITUTIONAL REWARDS v5.2.1-V25")
    print("HuggingFace Spaces Edition 🤗 (V25 NAMESPACE ISOLATION)")
    print("=" * 70)
    print(f"✅ V25 channel prefix: '{CHANNEL_PREFIX}'")
    print(f"✅ V25 databases: features=DB{REDIS_DB_FEATURES}, rewards=DB{REDIS_DB_REWARDS}")
    print("✅ FIXED: Uses robust RedisAblyClient V10.1 (blocking listener)")
    print("✅ FIXED: Bounded reward task pool (no coroutine leak)")
    print("✅ FIXED: Connection health monitoring")
    print("✅ FIXED: Deriv auto-reconnection")
    print("✅ FIXED: Event loop reference for thread→asyncio bridge")
    print("✅ Auto-reconnecting WebSocket")
    print("✅ O(1) signal tracking")
    print("✅ TTL price cache")
    print("✅ Batch processing with backpressure")
    print("✅ Memory-bounded buffers")
    print("✅ HuggingFace Spaces compatibility")
    print("✅ ZERO cross-talk with other Spaces (V25 namespace)")
    print("=" * 70 + "\n")

    if IS_HF_SPACES:
        print("🤗 HuggingFace Spaces environment detected")
        print("")

    engine = RewardsEngine()

    try:
        await engine.start()
    except KeyboardInterrupt:
        print("\n>>> Shutdown requested")
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        traceback.print_exc()
    finally:
        await engine.shutdown()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n>>> Stopped")