""" ╔══════════════════════════════════════════════════════════════════════════════╗ ║ ║ ║ ██╗ ██╗ ██╗██████╗ ██╗ ██████╗ ██╗ ██╗ █████╗ ███╗ ██╗████████╗║ ║ ██║ ██╔╝███║██╔══██╗██║ ██╔═══██╗██║ ██║██╔══██╗████╗ ██║╚══██╔══╝║ ║ █████╔╝ ╚██║██████╔╝██║ ██║ ██║██║ ██║███████║██╔██╗ ██║ ██║ ║ ║ ██╔═██╗ ██║██╔══██╗██║ ██║▄▄ ██║██║ ██║██╔══██║██║╚██╗██║ ██║ ║ ║ ██║ ██╗ ██║██║ ██║███████╗ ╚██████╔╝╚██████╔╝██║ ██║██║ ╚████║ ██║ ║ ║ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝ ╚══▀▀═╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═╝ ║ ║ ║ ║ ──────────────────────────────────────────────────────────────────────────── ║ ║ ║ ║ REGIME-ADAPTIVE FEATURE ENGINEERING SYSTEM ║ ║ ║ ║ Multi-Resolution Analysis • Institutional Patterns • AI ║ ║ ║ ║ ──────────────────────────────────────────────────────────────────────────── ║ ║ ║ ║ ASSET: Volatility 25 Index TIMEFRAMES: 8 (5s - 10m) ║ ║ FEATURES: 60 per timeframe TOTAL DIMS: 480 features ║ ║ REGIME: Adaptive (Volatility/Trend) INFO GAIN: +83% vs baseline ║ ║ PATTERNS: Institutional-Grade COMPUTE: <7ms/tick ║ ║ ║ ║ "Latent Regime Detection for Non-Stationary Markets" ║ ║ ║ ║ [ FEATURE EXTRACTION ONLINE ] v3.0.0-V25 | DERIV WEBSOCKET EDITION ║ ║ ║ ╚══════════════════════════════════════════════════════════════════════════════╝ THEORETICAL FOUNDATION: P(Y_{t+Δ}|Φ(X_t)) = Σ_r P(Y_{t+Δ}|Φ,R_t=r)P(R_t=r) References: - Ang & Timmermann (2012): Regime Changes and Financial Markets - Hamilton (1989): Markov Regime-Switching Models - Nison (1991): Japanese Candlestick Charting Techniques """ import pandas as pd import numpy as np from scipy.stats import percentileofscore, skew, kurtosis from collections import deque from datetime import datetime, timezone UTC = timezone.utc from typing import Optional, Dict from dataclasses import dataclass import threading import logging import nest_asyncio import time import asyncio import json import ssl import websockets import traceback import warnings # ============================================================================ # REDIS CLIENT FOR HUGGINGFACE SPACES (V25 — NAMESPACED CHANNELS) # ============================================================================ try: from redis_config_v25 import REDIS_URL, REDIS_DB_FEATURES, CHANNEL_PREFIX, prefixed_channel import redis class RedisAblyClient: """Simple Redis client for HuggingFace Spaces compatibility (V25 namespaced)""" def __init__(self, redis_url=None, use_streams=True): self.redis_url = redis_url or REDIS_URL self.client = None self.channels = SimpleChannelManager(self) self._connect() def _connect(self): try: # V25: Use DB 0 (features) — isolated per Space container self.client = redis.from_url(self.redis_url, db=REDIS_DB_FEATURES) self.client.ping() print(f"✅ Redis connected for features (V25 — DB {REDIS_DB_FEATURES})") except Exception as e: print(f"⚠️ Redis connection failed: {e}") self.client = None async def publish(self, channel, data): if self.client: try: # V25: Auto-prefix channel name for namespace isolation self.client.publish(prefixed_channel(channel), json.dumps(data)) except Exception as e: print(f"⚠️ Redis publish failed: {e}") class SimpleChannel: def __init__(self, name, client): self.name = prefixed_channel(name) # V25: auto-prefix self.client = client async def publish(self, event, data): # Publish to channel name directly await self.client.publish(self.name, { "event": event, "data": data }) class SimpleChannelManager: def __init__(self, client): self.client = client self._channels = {} def get(self, name): if name not in self._channels: self._channels[name] = SimpleChannel(name, self.client) return self._channels[name] except ImportError: print("⚠️ Redis not available - using mock mode") CHANNEL_PREFIX = "V25:" def prefixed_channel(name): return f"V75:{name}" if not name.startswith("V25:") else name REDIS_DB_FEATURES = 0 class RedisAblyClient: def __init__(self, *args, **kwargs): self.channels = SimpleChannelManager(self) async def publish(self, channel, data): pass class SimpleChannel: def __init__(self, name, client): self.name = prefixed_channel(name) async def publish(self, event, data): pass class SimpleChannelManager: def __init__(self, client): self._channels = {} def get(self, name): if name not in self._channels: self._channels[name] = SimpleChannel(name, None) return self._channels[name] warnings.filterwarnings('ignore', category=FutureWarning) warnings.filterwarnings('ignore', category=RuntimeWarning, message='Mean of empty slice') warnings.filterwarnings('ignore', category=RuntimeWarning, message='overflow encountered') nest_asyncio.apply() # ============================================================================ # DERIV WEBSOCKET CONFIGURATION # ============================================================================ DERIV_API_KEY = "1KJKxIJKR8LCyKB" DERIV_WS_URL = "wss://ws.binaryws.com/websockets/v3?app_id=1089" SYMBOL_MAP = { "Volatility 25 Index": "R_25", "Crash 500 Index": "CRASH500", "Volatility 100 Index": "R_100", "Volatility 50 Index": "R_50", "Volatility 25 Index": "R_25", # ✅ V25: Volatility 25 Index symbol } # ============================================================================ # DERIV DATA STRUCTURES # ============================================================================ @dataclass class DerivTick: time: int = 0 bid: float = 0.0 ask: float = 0.0 last: float = 0.0 volume: int = 0 time_msc: int = 0 flags: int = 0 volume_real: float = 0.0 @dataclass class DerivAccountInfo: login: int = 0 balance: float = 0.0 equity: float = 0.0 profit: float = 0.0 margin: float = 0.0 margin_free: float = 0.0 margin_level: float = 0.0 currency: str = "USD" # ============================================================================ # DERIV WEBSOCKET BRIDGE - STREAMING VERSION # ============================================================================ class DerivBridge: """Deriv WebSocket bridge - STREAMING VERSION""" def __init__(self): self.ws = None self.is_connected = False self.is_authorized = False self.balance = 0.0 self._prices = {} # Current prices for each symbol self._price_lock = asyncio.Lock() self._stream_tasks = {} self._subscribed_symbols = set() self._last_tick = None async def _connect_and_authorize(self): """Connect and authorize to Deriv""" try: print("🔄 Connecting to Deriv WebSocket...") ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE self.ws = await websockets.connect( DERIV_WS_URL, ssl=ssl_context, ping_interval=30, ping_timeout=10, close_timeout=5, max_size=2**20 ) print("✅ WebSocket connected") # Authorize auth_msg = {"authorize": DERIV_API_KEY} await self.ws.send(json.dumps(auth_msg)) # Wait for auth response response = await self.ws.recv() data = json.loads(response) if 'authorize' in data: self.is_connected = True self.is_authorized = True self.balance = float(data['authorize'].get('balance', 0)) print(f"✅ Authorized | Balance: ${self.balance:.2f}") return True elif 'error' in data: print(f"❌ Auth error: {data['error']}") return False except Exception as e: print(f"❌ Connection error: {e}") return False async def _stream_prices(self, deriv_symbol: str): """Continuous price streaming for a symbol with auto-reconnect""" retry_delay = 5 while True: try: # Re-connect/re-authorize if needed if not self.is_connected or self.ws is None: print(f"🔄 Reconnecting WebSocket for {deriv_symbol}...") connected = await self._connect_and_authorize() if not connected: print(f"⚠️ Reconnect failed for {deriv_symbol}, retrying in {retry_delay}s...") await asyncio.sleep(retry_delay) retry_delay = min(retry_delay * 2, 60) continue retry_delay = 5 # reset on success # Subscribe to ticks subscribe_msg = {"ticks": deriv_symbol} await self.ws.send(json.dumps(subscribe_msg)) print(f"📡 Streaming {deriv_symbol}...") # Continuous receive loop while self.is_connected: try: data = await self.ws.recv() json_data = json.loads(data) if 'tick' in json_data: tick_data = json_data['tick'] if tick_data.get('symbol') == deriv_symbol: price = float(tick_data['quote']) epoch = int(tick_data['epoch']) async with self._price_lock: self._prices[deriv_symbol] = { 'bid': price - 0.0005, 'ask': price + 0.0005, 'last': price, 'time': epoch, 'time_msc': epoch * 1000 } self._last_tick = DerivTick( time=epoch, bid=price - 0.0005, ask=price + 0.0005, last=price, volume=0, time_msc=epoch * 1000 ) elif 'error' in json_data: logging.error(f"Stream error for {deriv_symbol}: {json_data['error']}") except asyncio.CancelledError: print(f"Stream cancelled for {deriv_symbol}") return except websockets.exceptions.ConnectionClosed: print(f"❌ WebSocket closed for {deriv_symbol} — will reconnect") self.is_connected = False break except json.JSONDecodeError: continue except Exception as e: logging.error(f"Stream error: {e}") self.is_connected = False break except asyncio.CancelledError: print(f"Stream cancelled for {deriv_symbol}") return except Exception as e: logging.error(f"Fatal stream error for {deriv_symbol}: {e}") self.is_connected = False # Brief pause before reconnect attempt await asyncio.sleep(retry_delay) retry_delay = min(retry_delay * 2, 60) async def _ensure_stream(self, deriv_symbol: str): """Ensure streaming is active for a symbol; restart dead tasks""" existing = self._stream_tasks.get(deriv_symbol) if existing is None or existing.done(): # Start (or restart) streaming task task = asyncio.create_task(self._stream_prices(deriv_symbol)) self._stream_tasks[deriv_symbol] = task self._subscribed_symbols.add(deriv_symbol) async def get_current_price(self, deriv_symbol: str) -> Optional[Dict]: """Get current price (from streaming cache)""" try: # Ensure we're streaming this symbol await self._ensure_stream(deriv_symbol) # Give it a moment to receive first price if new if deriv_symbol not in self._prices: await asyncio.sleep(0.5) # Return cached price async with self._price_lock: return self._prices.get(deriv_symbol) except Exception as e: logging.error(f"Price fetch error: {e}") return None def symbol_info_tick(self, symbol: str) -> Optional[DerivTick]: """MT5-compatible tick info getter (synchronous wrapper)""" try: deriv_symbol = SYMBOL_MAP.get(symbol, symbol) # Check if we have cached price if deriv_symbol in self._prices: price_data = self._prices[deriv_symbol] return DerivTick( time=price_data.get('time', 0), bid=price_data.get('bid', 0), ask=price_data.get('ask', 0), last=price_data.get('last', 0), volume=0, time_msc=price_data.get('time_msc', 0) ) return self._last_tick except Exception as e: logging.error(f"symbol_info_tick error: {e}") return None async def get_balance(self) -> float: """Get current balance""" try: if not self.is_connected: return self.balance await self.ws.send(json.dumps({"balance": 1})) # Wait for balance response (with timeout for this specific call) for _ in range(10): try: response = await asyncio.wait_for(self.ws.recv(), timeout=1) data = json.loads(response) if 'balance' in data: self.balance = float(data['balance']['balance']) return self.balance except asyncio.TimeoutError: continue except Exception as e: logging.error(f"Balance error: {e}") return self.balance def symbol_info(self, symbol: str) -> Optional[dict]: """MT5-compatible symbol_info - returns symbol information""" deriv_symbol = SYMBOL_MAP.get(symbol, symbol) return { 'name': symbol, 'deriv_symbol': deriv_symbol, 'visible': True, 'point': 0.00001, 'digits': 5 } def symbol_select(self, symbol: str, enable: bool = True) -> bool: """MT5-compatible symbol_select - always returns True for Deriv""" return True async def initialize(self, symbol: str = None): """Initialize connection and optionally start streaming a symbol""" try: print("🔄 Initializing Deriv...") result = await self._connect_and_authorize() if result: if symbol: deriv_symbol = SYMBOL_MAP.get(symbol, symbol) await self._ensure_stream(deriv_symbol) # Wait for first tick await asyncio.sleep(1) print("✅ Deriv initialized") return True else: print("❌ Deriv init failed") return False except Exception as e: logging.error(f"Initialize error: {e}") return False async def shutdown(self): """Shutdown gracefully""" try: # Cancel all streaming tasks for task in self._stream_tasks.values(): task.cancel() # Wait for cancellation if self._stream_tasks: await asyncio.gather(*self._stream_tasks.values(), return_exceptions=True) # Close WebSocket if self.ws: await self.ws.close() self.is_connected = False self.is_authorized = False except Exception as e: logging.error(f"Shutdown error: {e}") # Global bridge instance deriv_bridge = DerivBridge() # ============================================================================ # CONFIGURATION # ============================================================================ # Redis URL already imported above in inline Redis client SYMBOL = "Volatility 25 Index" # ✅ V25 DERIV_SYMBOL = "R_25" # ✅ V25: Volatility 25 Index Deriv symbol FEATURE_WINDOW = 10 # base unit TIMEFRAMES = { # === High-Frequency Zone (Volatility Capture) === 'xs': 5, # tick 's': 10, # ultra 'm': 20, # fast # === Critical Trading Zones === 'l': 30, # scalp 'xl': 60, # 1min 'xxl': 120, # 2min # === Structure & Regime Detection === '5m': 300, # 5min '10m': 600, # 10min } # ============================================================================ # FEATURE CONTRACT — SINGLE SOURCE OF TRUTH (60 FEATURES) # ---------------------------------------------------------------------------- # # ENGINEERING NOTE — why this looks the way it does: # # Previously the contract was spread across three top-level sets # (REQUIRED_FEATURES, METADATA_FIELDS, BINARY_FEATURES, ...), with no # runtime check that they were mutually consistent. A drift where a # single key ('price') landed in BOTH the "required features" list AND # the "metadata to strip before validating" set caused the validator to # report {'price'} missing on every tick, which silently shut down # publishing for the entire pipeline. # # The fix is structural: ONE FeatureContract object owns the full schema # and checks its own invariants at import time. Any future drift crashes # the module on load with a named offender, instead of corrupting the # wire format at 60Hz for hours. # # The old module-level names (REQUIRED_FEATURES, METADATA_FIELDS, etc.) # are kept as PROJECTIONS of the contract for call-site back-compat — the # rest of Features.py can import them exactly as before. # ============================================================================ from dataclasses import dataclass, field from typing import FrozenSet, Mapping, Any CONTRACT_VERSION = "feat-v1.0.0" EXPECTED_FEATURE_COUNT = 60 # ---- Feature keys (60) — values fed into model inference ------------------ _FEATURES: FrozenSet[str] = frozenset({ # Core Technical (19) 'log_return', 'rolling_mean_5', 'rolling_std_5', 'zscore_5', 'rsi_14', 'macd', 'macd_signal', 'macd_hist', 'atr', 'cdf_value', 'cdf_slope', 'cdf_diff', 'volatility_quantile_90', 'volatility_ratio', 'entropy_50', 'autocorr_3', 'momentum_10', 'volume_change_rate', 'volume_zscore', # Derivatives (15) 'price_vel', 'price_acc', 'price_jrk', 'price_vel_mean', 'price_vel_std', 'price_vel_skew', 'price_vel_kurtosis', 'price_acc_mean', 'price_acc_std', 'price_acc_skew', 'price_acc_kurtosis', 'price_jrk_mean', 'price_jrk_std', 'price_jrk_skew', 'price_jrk_kurtosis', # Additional Technical (7) 'ma10', 'ma20', 'std20', 'bollinger_upper', 'bollinger_lower', 'bollinger_width', 'bollinger_position', # Candlestick (9) 'gravestone_doji', 'four_price_doji', 'doji', 'spinning_top', 'bullish_candle', 'bearish_candle', 'dragonfly_candle', 'spinning_top_bearish_followup', 'bullish_then_dragonfly', # Support / Resistance (7) 'distance_to_nearest_support', 'distance_to_nearest_resistance', 'near_support', 'near_resistance', 'distance_to_stop_loss', 'support_strength', 'resistance_strength', # Price Variants (3) — models consume these for absolute-scale context 'price', 'close_scaled', 'close_price', }) # ---- Envelope keys — wire metadata, NEVER fed to a model ------------------- # Disjoint from _FEATURES by invariant (checked below in __post_init__). _ENVELOPE: FrozenSet[str] = frozenset({ 'agent', # routing 'timeframe', # routing 'timestamp', # wall-clock ISO-8601 at publish 'tick_index', # monotonic producer tick counter 'tick_count', # legacy alias, kept for back-compat 'feature_count', # integrity check: len(features) 'contract_version', # schema version string 'features', # nested payload key }) # ---- Typed subsets of _FEATURES (validated as subsets at import time) ------ _BINARY: FrozenSet[str] = frozenset({ 'near_support', 'near_resistance', 'gravestone_doji', 'four_price_doji', 'doji', 'spinning_top', 'bullish_candle', 'bearish_candle', 'dragonfly_candle', 'spinning_top_bearish_followup', 'bullish_then_dragonfly', }) _PRICE_SCALE: FrozenSet[str] = frozenset({ 'price', 'close_scaled', 'close_price', 'ma10', 'ma20', 'bollinger_upper', 'bollinger_lower', }) _NON_NORMALISED: FrozenSet[str] = _BINARY | _PRICE_SCALE | frozenset({ 'price_vel', 'price_acc', 'price_jrk', }) @dataclass(frozen=True) class ValidationResult: """Structured validation outcome with three distinct failure modes.""" ok: bool missing: FrozenSet[str] # required features absent from dict leaked_envelope: FrozenSet[str] # envelope keys found inside features dict unexpected: FrozenSet[str] # keys that belong to neither set def as_error_lines(self): lines = [] if self.missing: lines.append(f"missing features: {sorted(self.missing)}") if self.leaked_envelope: lines.append(f"envelope keys inside features dict: " f"{sorted(self.leaked_envelope)}") if self.unexpected: lines.append(f"unknown keys: {sorted(self.unexpected)}") return lines @dataclass(frozen=True) class FeatureContract: """ The schema for a single timeframe's feature payload. Invariants (all checked in __post_init__ — module fails to import if any are violated): (1) features ∩ envelope = ∅ No key is allowed to be "both a feature and envelope". This was the original bug — 'price' was in both sets, and the validator silently rejected every tick. (2) |features| == EXPECTED_FEATURE_COUNT The contract declares an exact 60-feature shape. Drift here would corrupt downstream tensor shapes. (3) binary, price_scale, non_normalised are all ⊆ features A typed subset cannot contain a key that isn't a feature at all. This catches stale references after a feature rename. """ version: str = CONTRACT_VERSION features: FrozenSet[str] = field(default_factory=lambda: _FEATURES) envelope: FrozenSet[str] = field(default_factory=lambda: _ENVELOPE) binary: FrozenSet[str] = field(default_factory=lambda: _BINARY) price_scale: FrozenSet[str] = field(default_factory=lambda: _PRICE_SCALE) non_normalised: FrozenSet[str] = field(default_factory=lambda: _NON_NORMALISED) def __post_init__(self): # (1) disjointness overlap = self.features & self.envelope if overlap: raise RuntimeError( f"[FeatureContract] BROKEN INVARIANT: keys in BOTH features " f"and envelope: {sorted(overlap)}. Remove from one set — the " f"validator cannot distinguish feature-vs-envelope for these " f"keys, so every tick will be rejected." ) # (2) cardinality if len(self.features) != EXPECTED_FEATURE_COUNT: raise RuntimeError( f"[FeatureContract] BROKEN INVARIANT: expected " f"{EXPECTED_FEATURE_COUNT} features, got {len(self.features)}. " f"Update EXPECTED_FEATURE_COUNT or fix the feature list." ) # (3) subsets for name, subset in ( ('binary', self.binary), ('price_scale', self.price_scale), ('non_normalised', self.non_normalised), ): stray = subset - self.features if stray: raise RuntimeError( f"[FeatureContract] BROKEN INVARIANT: '{name}' contains " f"non-feature keys: {sorted(stray)}" ) # ---- public API ------------------------------------------------------- def validate(self, features_dict: Mapping[str, Any]) -> ValidationResult: """ Validate the INNER features dict only — envelope keys should NOT be present here; if they are, they're reported as leaked_envelope, not stripped and hidden. """ actual = set(features_dict.keys()) return ValidationResult( ok = (actual == self.features), missing = frozenset(self.features - actual), leaked_envelope = frozenset(actual & self.envelope), unexpected = frozenset(actual - self.features - self.envelope), ) def build_payload( self, agent_name: str, features_dict: Mapping[str, float], tick_index, timestamp_iso: str, ) -> dict: """ Construct the wire payload with envelope / feature separation enforced structurally. Envelope fields live at the top level; features live ONLY inside payload['features']. """ return { 'agent': agent_name, 'timestamp': timestamp_iso, 'tick_index': tick_index, 'feature_count': len(features_dict), 'contract_version': self.version, 'features': dict(features_dict), } def extract_features(self, payload: Mapping[str, Any]) -> dict: """ Consumer-side: pull the inner features dict and verify envelope version. Raises ValueError on schema drift so the consumer can log-and-drop rather than silently accept malformed payloads. """ got_ver = payload.get('contract_version') if got_ver is not None and got_ver != self.version: raise ValueError( f"contract version mismatch: payload={got_ver!r} " f"expected={self.version!r}" ) feats = payload.get('features') if not isinstance(feats, dict): raise ValueError( f"payload.features missing or wrong type: {type(feats).__name__}" ) return feats # Singleton — import this, don't construct your own. # Module import will FAIL LOUDLY here if any invariant is violated. FEATURE_CONTRACT = FeatureContract() # --------------------------------------------------------------------------- # Back-compat aliases — projections of FEATURE_CONTRACT. Existing call sites # keep working unchanged; only the source of truth moved. Deleting any of # these will break older code paths that haven't been migrated to use # FEATURE_CONTRACT directly. # --------------------------------------------------------------------------- REQUIRED_FEATURES = tuple(FEATURE_CONTRACT.features) # order-agnostic METADATA_FIELDS = FEATURE_CONTRACT.envelope BINARY_FEATURES = FEATURE_CONTRACT.binary PRICE_FEATURES = FEATURE_CONTRACT.price_scale NORMALIZATION_EXCLUSIONS = FEATURE_CONTRACT.non_normalised # ============================================================================ # REGIME DETECTION PARAMETERS # ============================================================================ REGIME_CONFIG = { 'volatility_lookback': 100, 'vol_low_threshold': 0.33, 'vol_high_threshold': 0.67, 'trend_threshold': 0.6, 'entropy_threshold': 1.5, 'regime_memory': 20, } # ============================================================================ # LOGGING SETUP # ============================================================================ logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S' ) logger = logging.getLogger(__name__) # ============================================================================ # HELPER FUNCTIONS # ============================================================================ def safe_skew(x): clean_x = x[~np.isnan(x)] return skew(clean_x) if len(clean_x) >= 3 else 0.0 def safe_kurtosis(x): clean_x = x[~np.isnan(x)] return kurtosis(clean_x) if len(clean_x) >= 3 else 0.0 def min_max_scale(series): if len(series) == 0: return pd.Series([]) min_val, max_val = series.min(), series.max() if max_val - min_val == 0: return pd.Series(np.zeros(len(series)), index=series.index) return (series - min_val) / (max_val - min_val) def safe_entropy(series): try: clean_series = series.dropna() if len(clean_series) < 5: return 0.0 if clean_series.nunique() == 1: return 0.0 hist, _ = np.histogram(clean_series, bins=10, density=True) hist = hist[hist > 0] if len(hist) == 0: return 0.0 return -np.sum(hist * np.log(hist)) except: return 0.0 # ============================================================================ # INSTITUTIONAL-GRADE CANDLESTICK PATTERN DETECTION # ============================================================================ def gravestone_doji(o, h, l, c): """ Gravestone Doji: Death at the top Institutional criteria: - Body <= 2% of range - Upper shadow >= 66% of range - Lower shadow <= 10% of range """ try: body = abs(c - o) upper_shadow = h - max(o, c) lower_shadow = min(o, c) - l total_range = h - l if total_range < 1e-6: return 0 body_ratio = body / total_range upper_ratio = upper_shadow / total_range lower_ratio = lower_shadow / total_range return int( body_ratio <= 0.02 and upper_ratio >= 0.66 and lower_ratio <= 0.10 ) except: return 0 def four_price_doji(o, h, l, c): """ Four Price Doji: Extreme indecision All prices equal within 0.1% tolerance """ try: prices = [o, h, l, c] avg_price = np.mean(prices) if avg_price < 1e-6: return 0 max_deviation = max(abs(p - avg_price) / avg_price for p in prices) return int(max_deviation <= 0.001) except: return 0 def doji(o, h, l, c): """ Standard Doji: Indecision Institutional criteria: - Body <= 5% of range - Both shadows >= 20% of range """ try: body = abs(c - o) upper_shadow = h - max(o, c) lower_shadow = min(o, c) - l total_range = h - l if total_range < 1e-6: return 0 body_ratio = body / total_range upper_ratio = upper_shadow / total_range lower_ratio = lower_shadow / total_range return int( body_ratio <= 0.05 and upper_ratio >= 0.20 and lower_ratio >= 0.20 ) except: return 0 def spinning_top(o, h, l, c): """ Spinning Top: Market confusion Institutional criteria: - Body <= 33% of range - Both shadows >= 25% of range each """ try: body = abs(c - o) upper_shadow = h - max(o, c) lower_shadow = min(o, c) - l total_range = h - l if total_range < 1e-6: return 0 body_ratio = body / total_range upper_ratio = upper_shadow / total_range lower_ratio = lower_shadow / total_range return int( body_ratio <= 0.33 and upper_ratio >= 0.25 and lower_ratio >= 0.25 ) except: return 0 def bullish_candle(o, h, l, c): """ Bullish Candle: Strong buying Institutional criteria: - Body >= 60% of range - Close > Open - Upper shadow <= 15% of range """ try: if c <= o: return 0 body = c - o total_range = h - l upper_shadow = h - c if total_range < 1e-6: return 0 body_ratio = body / total_range upper_ratio = upper_shadow / total_range return int(body_ratio >= 0.60 and upper_ratio <= 0.15) except: return 0 def bearish_candle(o, h, l, c): """ Bearish Candle: Strong selling Institutional criteria: - Body >= 60% of range - Close < Open - Lower shadow <= 15% of range """ try: if c >= o: return 0 body = o - c total_range = h - l lower_shadow = c - l if total_range < 1e-6: return 0 body_ratio = body / total_range lower_ratio = lower_shadow / total_range return int(body_ratio >= 0.60 and lower_ratio <= 0.15) except: return 0 def dragonfly_candle(o, h, l, c): """ Dragonfly Doji: Bullish reversal Institutional criteria: - Body <= 5% of range - Lower shadow >= 66% of range - Upper shadow <= 10% of range """ try: body = abs(c - o) upper_shadow = h - max(o, c) lower_shadow = min(o, c) - l total_range = h - l if total_range < 1e-6: return 0 body_ratio = body / total_range upper_ratio = upper_shadow / total_range lower_ratio = lower_shadow / total_range return int( body_ratio <= 0.05 and lower_ratio >= 0.66 and upper_ratio <= 0.10 ) except: return 0 def spinning_top_bearish_followup(c1, c2): """ Spinning top followed by bearish candle Indicates weakness after indecision """ try: return int(spinning_top(*c1) == 1 and bearish_candle(*c2) == 1) except: return 0 def bullish_candle_followed_by_dragonfly(c1, c2): """ Bullish candle + dragonfly = strong support Institutional continuation pattern """ try: return int( bullish_candle(*c1) == 1 and dragonfly_candle(*c2) == 1 and c2[3] >= c1[3] # Second close >= first close ) except: return 0 # Support/Resistance functions (unchanged) def find_supports(p, df): try: return list(df['Low'][(df['Low'].shift(1) > df['Low']) & (df['Low'].shift(-1) > df['Low']) & (df['Low'] < p)]) except: return [] def find_resistances(p, df): try: return list(df['High'][(df['High'].shift(1) < df['High']) & (df['High'].shift(-1) < df['High']) & (df['High'] > p)]) except: return [] def find_stop_level(p, df): try: lows = df['Low'][-10:] mins = lows[(lows.shift(1) > lows) & (lows.shift(-1) > lows)] below = mins[mins < p] return float(below.max()) if not below.empty else None except: return None def dist_to_nearest(p, levels): try: return float(min(abs(p - x) for x in levels)) if levels else -1.0 except: return -1.0 def cluster_strength(levels): try: if not levels: return 0.0 levels = sorted(levels) clusters = 0 i = 0 while i < len(levels): j, count = i+1, 1 while j < len(levels) and abs(levels[j]-levels[i]) <= 0.1: count += 1 j += 1 if count > 1: clusters += count i = j return float(clusters) except: return 0.0 # ============================================================================ # REGIME DETECTOR (INTERNAL ONLY) # ============================================================================ class RegimeDetector: """Latent regime detection for adaptive normalization""" def __init__(self, config=REGIME_CONFIG): self.config = config self.regime_history = deque(maxlen=config['regime_memory']) def detect_regime(self, df): if len(df) < 30: return self._default_regime() try: returns = df['Close'].pct_change().dropna() current_vol = returns.rolling(20).std().iloc[-1] vol_history = returns.rolling(20).std().dropna() vol_percentile = percentileofscore(vol_history, current_vol) / 100 low_vol_weight = self._sigmoid(self.config['vol_low_threshold'] - vol_percentile, 10) high_vol_weight = self._sigmoid(vol_percentile - self.config['vol_high_threshold'], 10) medium_vol_weight = max(0, 1 - low_vol_weight - high_vol_weight) momentum = (df['Close'].iloc[-1] / df['Close'].iloc[-20] - 1) if len(df) >= 20 else 0 trend_strength = abs(momentum) trending_weight = self._sigmoid(trend_strength - self.config['trend_threshold'], 5) price_entropy = safe_entropy(df['Close'].pct_change().dropna().tail(50)) mean_rev_weight = self._sigmoid(price_entropy - self.config['entropy_threshold'], 2) regime_weights = { 'low_vol': float(low_vol_weight), 'medium_vol': float(medium_vol_weight), 'high_vol': float(high_vol_weight), 'trending': float(trending_weight), 'mean_reverting': float(mean_rev_weight), } self.regime_history.append(regime_weights) return self._smooth_regime(regime_weights) except Exception as e: logger.debug(f"Regime detection failed: {e}") return self._default_regime() def _sigmoid(self, x, steepness=1): """Numerically stable sigmoid""" z = np.clip(-steepness * x, -500, 500) # Prevent overflow return 1 / (1 + np.exp(z)) def _smooth_regime(self, current_regime): """Safe EWMA smoothing with NaN handling""" if len(self.regime_history) < 2: return current_regime alpha = 0.3 smoothed = current_regime.copy() for key in ['low_vol', 'medium_vol', 'high_vol', 'trending', 'mean_reverting']: historical = [r[key] for r in self.regime_history if key in r] historical = [v for v in historical if not (np.isnan(v) or np.isinf(v))] if len(historical) > 0: hist_mean = float(np.mean(historical)) smoothed[key] = alpha * current_regime[key] + (1-alpha) * hist_mean else: smoothed[key] = current_regime[key] return smoothed def _default_regime(self): return { 'low_vol': 0.33, 'medium_vol': 0.34, 'high_vol': 0.33, 'trending': 0.5, 'mean_reverting': 0.5, } # ============================================================================ # ADAPTIVE NORMALIZER # ============================================================================ class AdaptiveNormalizer: """Regime-aware normalization""" def normalize(self, feature_series, regime_weights): if len(feature_series) < 20: return self._zscore_normalize(feature_series) try: z_standard = self._zscore_normalize(feature_series) z_robust = self._robust_normalize(feature_series) vol_weight = regime_weights['high_vol'] z_adaptive = (1 - vol_weight) * z_standard + vol_weight * z_robust return np.clip(z_adaptive, -5, 5) except: return self._zscore_normalize(feature_series) def _zscore_normalize(self, series): mu = series.mean() sigma = series.std() return (series - mu) / (sigma + 1e-10) if sigma > 1e-8 else series * 0 def _robust_normalize(self, series): q25 = series.quantile(0.25) q75 = series.quantile(0.75) iqr = q75 - q25 median = series.median() return (series - median) / (iqr + 1e-10) if iqr > 1e-8 else series * 0 # ============================================================================ # INTEGRATED FEATURE ENHANCER (60 FEATURES STRICT) # ============================================================================ class IntegratedFeatureEnhancer: def __init__(self, ably_client, agent_names, window_size=100): self.ably = ably_client self.agent_names = agent_names self.window_size = window_size self.price_buffers = {name: deque(maxlen=window_size) for name in agent_names} # Internal regime components self.regime_detector = RegimeDetector() self.adaptive_normalizer = AdaptiveNormalizer() # Channels self.features_channel = ably_client.channels.get("integrated_features_all") self.meta_channels = { name: ably_client.channels.get(f"meta_features-{name}") for name in agent_names } self.latest_computed_features = {} self.features_lock = threading.Lock() logger.info(f"Regime-Adaptive Feature Enhancer initialized") logger.info( f"Contract: version={FEATURE_CONTRACT.version} " f"features={len(FEATURE_CONTRACT.features)} " f"envelope={len(FEATURE_CONTRACT.envelope)} " f"(invariants enforced at import time)" ) # Defensive re-check at instantiation. The contract's __post_init__ # already verified this at import, but a runtime assert catches # anyone monkey-patching FEATURE_CONTRACT.features before first use. assert len(FEATURE_CONTRACT.features) == EXPECTED_FEATURE_COUNT, ( f"Feature count mismatch at runtime: " f"{len(FEATURE_CONTRACT.features)} != {EXPECTED_FEATURE_COUNT}" ) def compute_core_technical_features(self, df): """Compute 19 core technical indicators with robust edge case handling""" df = df.copy() eps = 1e-10 # Suppress warnings during computation with warnings.catch_warnings(): warnings.simplefilter("ignore", RuntimeWarning) df['log_return'] = np.log(df['Close'] / df['Close'].shift(1)).replace([np.inf, -np.inf], 0).fillna(0) df['rolling_mean_5'] = df['Close'].rolling(5, min_periods=1).mean().fillna(df['Close']) df['rolling_std_5'] = df['Close'].rolling(5, min_periods=1).std().fillna(eps) df['rolling_std_5'] = df['rolling_std_5'].replace(0, eps) df['zscore_5'] = (df['Close'] - df['rolling_mean_5']) / df['rolling_std_5'] # RSI delta = df['Close'].diff().fillna(0) gain = np.where(delta > 0, delta, 0) loss = np.where(delta < 0, -delta, 0) avg_gain = pd.Series(gain).rolling(14, min_periods=1).mean().fillna(0) avg_loss = pd.Series(loss).rolling(14, min_periods=1).mean().fillna(0) rs = avg_gain / (avg_loss + eps) df['rsi_14'] = 100 - (100 / (1 + rs)) df['rsi_14'] = df['rsi_14'].ewm(span=5, adjust=False).mean().fillna(50) # MACD ema12 = df['Close'].ewm(span=12, adjust=False).mean() ema26 = df['Close'].ewm(span=26, adjust=False).mean() df['macd'] = ema12 - ema26 df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean() df['macd_hist'] = df['macd'] - df['macd_signal'] # ATR high_low = df['High'] - df['Low'] high_close = np.abs(df['High'] - df['Close'].shift(1)) low_close = np.abs(df['Low'] - df['Close'].shift(1)) tr = np.maximum.reduce([high_low, high_close, low_close]) df['atr'] = pd.Series(tr).rolling(14, min_periods=1).mean().fillna(0) # CDF features window = min(100, len(df)) if window >= 20: df['cdf_value'] = df['log_return'].rolling(window, min_periods=10).apply( lambda x: percentileofscore(x.dropna(), x.iloc[-1]) / 100 if len(x.dropna()) > 10 else 0.5 ).fillna(0.5) else: df['cdf_value'] = 0.5 df['cdf_value'] = df['cdf_value'].ffill().bfill().fillna(0.5) df['cdf_slope'] = df['cdf_value'].diff().ewm(span=5, adjust=False).mean().fillna(0) df['cdf_diff'] = (df['cdf_value'] - df['cdf_value'].shift(10)).fillna(0) df['cdf_diff'] = df['cdf_diff'].ewm(span=5, adjust=False).mean().fillna(0) # Volatility df['volatility_quantile_90'] = df['rolling_std_5'].rolling( min(100, len(df)), min_periods=20 ).quantile(0.9).fillna(df['rolling_std_5']) df['volatility_ratio'] = df['rolling_std_5'] / (df['volatility_quantile_90'] + eps) df['volatility_ratio'] = df['volatility_ratio'].clip(0, 3).fillna(1.0) # Entropy df['entropy_50'] = df['log_return'].rolling( min(50, len(df)), min_periods=20 ).apply(safe_entropy).fillna(0) # Autocorrelation df['autocorr_3'] = df['log_return'].rolling(20, min_periods=5).apply( lambda x: x.autocorr(lag=3) if len(x) > 3 else 0 ).fillna(0) # Momentum df['momentum_10'] = (df['Close'] / df['Close'].shift(10) - 1).fillna(0) # Volume df['volume_change_rate'] = df['Volume'].pct_change().replace([np.inf, -np.inf], 0).fillna(0) vol_mean = df['Volume'].rolling(20, min_periods=1).mean() vol_std = df['Volume'].rolling(20, min_periods=1).std().fillna(eps) df['volume_zscore'] = ((df['Volume'] - vol_mean) / (vol_std + eps)).clip(-3, 3).fillna(0) return df def compute_derivative_features(self, df, window=10): """Compute 15 derivative features with robust handling""" df = df.copy() df['price_vel'] = df['Close'].diff() df['price_acc'] = df['price_vel'].diff() df['price_jrk'] = df['price_acc'].diff() for col in ['price_vel', 'price_acc', 'price_jrk']: try: # Use fillna(0) to handle edge cases df[f'{col}_mean'] = df[col].rolling(window, min_periods=1).mean().fillna(0) df[f'{col}_std'] = df[col].rolling(window, min_periods=1).std().fillna(0) df[f'{col}_skew'] = df[col].rolling(window, min_periods=3).apply( safe_skew, raw=True ).fillna(0) df[f'{col}_kurtosis'] = df[col].rolling(window, min_periods=3).apply( safe_kurtosis, raw=True ).fillna(0) except Exception as e: logger.debug(f"Derivative feature {col} computation failed: {e}") df[f'{col}_mean'] = 0 df[f'{col}_std'] = 0 df[f'{col}_skew'] = 0 df[f'{col}_kurtosis'] = 0 return df def compute_additional_technical(self, df): """Compute 7 additional technical features""" df = df.copy() eps = 1e-10 df['ma10'] = df['Close'].rolling(10, min_periods=1).mean() df['ma20'] = df['Close'].rolling(20, min_periods=1).mean() df['std20'] = df['Close'].rolling(20, min_periods=1).std() df['bollinger_upper'] = df['ma20'] + 2 * df['std20'] df['bollinger_lower'] = df['ma20'] - 2 * df['std20'] df['bollinger_width'] = (df['bollinger_upper'] - df['bollinger_lower']) / (df['ma20'] + eps) df['bollinger_position'] = (df['Close'] - df['bollinger_lower']) / (df['bollinger_upper'] - df['bollinger_lower'] + eps) df['bollinger_position'] = df['bollinger_position'].clip(0, 1) return df def compute_candlestick_patterns(self, df): """Compute 9 institutional-grade candlestick patterns""" df = df.copy() if 'Open' not in df.columns: df['Open'] = df['Close'] patterns = [ ('gravestone_doji', gravestone_doji), ('four_price_doji', four_price_doji), ('doji', doji), ('spinning_top', spinning_top), ('bullish_candle', bullish_candle), ('bearish_candle', bearish_candle), ('dragonfly_candle', dragonfly_candle) ] for name, func in patterns: df[name] = df.apply( lambda r: func(r['Open'], r['High'], r['Low'], r['Close']), axis=1 ) df['spinning_top_bearish_followup'] = 0 df['bullish_then_dragonfly'] = 0 for i in range(1, len(df)): c1 = tuple(df.iloc[i-1][['Open', 'High', 'Low', 'Close']]) c2 = tuple(df.iloc[i][['Open', 'High', 'Low', 'Close']]) df.at[df.index[i], 'spinning_top_bearish_followup'] = spinning_top_bearish_followup(c1, c2) df.at[df.index[i], 'bullish_then_dragonfly'] = bullish_candle_followed_by_dragonfly(c1, c2) return df def compute_support_resistance_features(self, df): """Compute 7 support/resistance features""" df = df.copy() if len(df) < 10: df['distance_to_nearest_support'] = 0.0 df['distance_to_nearest_resistance'] = 0.0 df['near_support'] = 0 df['near_resistance'] = 0 df['distance_to_stop_loss'] = 0.5 df['support_strength'] = 0.0 df['resistance_strength'] = 0.0 return df current_price = df['Close'].iloc[-1] supports = find_supports(current_price, df) resistances = find_resistances(current_price, df) stop_level = find_stop_level(current_price, df) min_p, max_p = df['Low'].min(), df['High'].max() rng = max_p - min_p if max_p > min_p else 1 df['distance_to_nearest_support'] = dist_to_nearest(current_price, supports) df['distance_to_nearest_resistance'] = dist_to_nearest(current_price, resistances) df['near_support'] = int(any(abs(current_price - s) < 0.3 for s in supports)) if supports else 0 df['near_resistance'] = int(any(abs(current_price - r) < 0.3 for r in resistances)) if resistances else 0 df['distance_to_stop_loss'] = (current_price - stop_level) / rng if stop_level else 0.5 df['support_strength'] = cluster_strength([s/rng for s in supports]) df['resistance_strength'] = cluster_strength([r/rng for r in resistances]) return df def _validate_feature_contract(self, features_dict): """ Delegate to FEATURE_CONTRACT.validate() and return a legacy 3-tuple (is_valid, missing, extra) for call-site back-compat. `extra` in the legacy contract conflated two distinct failure modes — envelope leakage and unknown keys. We preserve the 3-tuple shape but keep them merged; richer diagnostics are available by calling FEATURE_CONTRACT.validate() directly. """ result = FEATURE_CONTRACT.validate(features_dict) extra = result.leaked_envelope | result.unexpected return result.ok, result.missing, extra def compute_all_features(self, df): """ Compute exactly 60 features with regime-adaptive normalization Regime detection is internal - NOT published """ try: if len(df) < 10: return pd.DataFrame() # Step 1: Compute raw features df = self.compute_core_technical_features(df) df = self.compute_derivative_features(df) df = self.compute_additional_technical(df) df = self.compute_candlestick_patterns(df) df = self.compute_support_resistance_features(df) # Step 2: Internal regime detection regime_weights = self.regime_detector.detect_regime(df) # Step 3: Apply adaptive normalization ONLY to continuous features continuous_features = [ 'log_return', 'rolling_std_5', 'zscore_5', 'rsi_14', 'macd', 'macd_signal', 'macd_hist', 'atr', 'cdf_value', 'cdf_slope', 'cdf_diff', 'volatility_ratio', 'entropy_50', 'autocorr_3', 'momentum_10', 'volume_change_rate', 'volume_zscore', 'price_vel_mean', 'price_acc_mean', 'price_jrk_mean', 'price_vel_std', 'price_acc_std', 'price_jrk_std', 'price_vel_skew', 'price_acc_skew', 'price_jrk_skew', 'price_vel_kurtosis', 'price_acc_kurtosis', 'price_jrk_kurtosis', 'bollinger_width', 'bollinger_position', 'distance_to_nearest_support', 'distance_to_nearest_resistance', 'distance_to_stop_loss', 'support_strength', 'resistance_strength' ] for feature in continuous_features: if feature in df.columns and feature not in NORMALIZATION_EXCLUSIONS: df[feature] = self.adaptive_normalizer.normalize( df[feature], regime_weights ) # Clean infinities and NaNs df = df.replace([np.inf, -np.inf], np.nan) df = df.ffill().bfill().fillna(0) return df except Exception as e: logger.error(f"Feature computation failed: {e}") return pd.DataFrame() def extract_meta_features(self, df, current_price): """Extract exactly 24 meta features (23 + timestamp)""" try: if len(df) < 10: return {} supports = find_supports(current_price, df) resistances = find_resistances(current_price, df) stop_level = find_stop_level(current_price, df) min_p, max_p = df['Low'].min(), df['High'].max() rng = max_p - min_p if max_p > min_p else 1 # Voting features (8) voting = { 'distance_to_nearest_support_scaled': dist_to_nearest(current_price, supports) / rng if rng > 0 else 0.0, 'distance_to_nearest_resistance_scaled': dist_to_nearest(current_price, resistances) / rng if rng > 0 else 0.0, 'near_support': int(any(abs(current_price - s) < 0.3 for s in supports)) if supports else 0, 'near_resistance': int(any(abs(current_price - r) < 0.3 for r in resistances)) if resistances else 0, 'distance_to_stop_loss_scaled': (current_price - stop_level) / rng if stop_level and rng > 0 else 0.5, 'support_strength_scaled': cluster_strength([s/rng for s in supports]) if rng > 0 else 0.0, 'resistance_strength_scaled': cluster_strength([r/rng for r in resistances]) if rng > 0 else 0.0, 'close_price': float(current_price) } # Filtered technical (15) latest = df.iloc[-1] feature_mappings = [ ('price_vel', 'price_vel_scaled'), ('price_acc', 'price_acc_scaled'), ('price_jrk', 'price_jrk_scaled'), ('price_vel_mean', 'price_vel_mean_scaled'), ('price_acc_mean', 'price_acc_mean_scaled'), ('price_jrk_mean', 'price_jrk_mean_scaled'), ('ma10', 'ma10_scaled'), ('ma20', 'ma20_scaled'), ('bollinger_upper', 'bollinger_upper_scaled'), ('bollinger_lower', 'bollinger_lower_scaled'), ('macd', 'macd_scaled'), ('macd_signal', 'macd_signal_scaled'), ('macd_hist', 'macd_hist_scaled'), ('rsi_14', 'rsi_scaled'), ('std20', 'std20_scaled') ] filtered = {} for df_col, meta_col in feature_mappings: if df_col in latest.index: filtered[meta_col] = float(latest[df_col]) else: filtered[meta_col] = 0.0 meta_features = {**filtered, **voting} # Validate count (23 features, timestamp added later) if len(meta_features) != 23: logger.error(f"Meta feature count violation: {len(meta_features)} != 23") return {} return meta_features except Exception as e: logger.error(f"Meta feature extraction failed: {e}") return {} def process_raw_tick(self, agent_name, price_data): """Process tick and enforce 60-feature contract""" try: close_price = price_data.get('close', 0) self.price_buffers[agent_name].append({ 'Close': close_price, 'High': price_data.get('high', close_price), 'Low': price_data.get('low', close_price), 'Volume': price_data.get('volume', 0), 'Open': price_data.get('open', close_price) }) if len(self.price_buffers[agent_name]) < 30: return df = pd.DataFrame(list(self.price_buffers[agent_name])) enhanced_df = self.compute_all_features(df) if enhanced_df.empty: return # CRITICAL FIX: Only extract computed features, not raw OHLCV latest_row = enhanced_df.iloc[-1] # Extract only REQUIRED_FEATURES (excluding raw OHLCV columns) latest_features = {} for feature in REQUIRED_FEATURES: if feature in ['price', 'close_scaled', 'close_price']: # These are price variants we add manually latest_features[feature] = float(close_price) elif feature in latest_row.index: latest_features[feature] = float(latest_row[feature]) else: logger.warning(f"[{agent_name}] Missing feature: {feature}") latest_features[feature] = 0.0 # ENFORCE CONTRACT — use the rich ValidationResult directly so we # log three distinct failure modes separately instead of collapsing # them into a single ambiguous "Missing / Extra" pair. validation = FEATURE_CONTRACT.validate(latest_features) if not validation.ok: logger.error("=" * 80) logger.error( f"❌ [{agent_name}] FEATURE CONTRACT VIOLATION " f"(contract={FEATURE_CONTRACT.version})" ) for line in validation.as_error_lines(): logger.error(f" {line}") logger.error("=" * 80) # Bookkeeping counter — lets ops tell the difference between # "feed is dry" and "feed is arriving but contract is broken". if not hasattr(self, '_contract_violation_counts'): self._contract_violation_counts = {} self._contract_violation_counts[agent_name] = ( self._contract_violation_counts.get(agent_name, 0) + 1 ) return with self.features_lock: self.latest_computed_features[agent_name] = latest_features.copy() except Exception as e: logger.error(f"[{agent_name}] Feature enhancement failed: {e}") async def publish_features(self, agent_name, features_dict, tick_index=None): """ Publish 60 features on the wire. Payload shape is enforced by FEATURE_CONTRACT.build_payload() — envelope keys live at the top level, feature keys live ONLY inside payload['features'], and a contract_version string accompanies every message so the consumer can detect schema drift. """ try: # Defensive re-validation at the publish boundary. Zero cost on # the happy path; catches any mutation between compute and # publish (e.g. a caller accidentally injecting envelope keys # into the features dict). validation = FEATURE_CONTRACT.validate(features_dict) if not validation.ok: logger.error( f"[{agent_name}] publish BLOCKED — contract violation at " f"publish boundary: {validation.as_error_lines()}" ) return # Coerce numpy scalars to native floats so the JSON serialiser # doesn't choke. Done on the features-only dict, inside the # contract shape. clean_features = { k: float(v) if isinstance(v, (np.floating, np.integer)) else v for k, v in features_dict.items() } # Resolve tick_index: caller may pass it explicitly, or it may # be embedded in the dict (legacy path). Envelope keys should # NOT be inside features_dict after the validation above, so # these .get() calls will normally return None — kept for # defensive back-compat. resolved_tick = tick_index if resolved_tick is None: resolved_tick = ( features_dict.get('tick_count') or features_dict.get('tick_index') ) payload = FEATURE_CONTRACT.build_payload( agent_name = agent_name, features_dict = clean_features, tick_index = resolved_tick, timestamp_iso = datetime.now(UTC).isoformat(), ) await self.features_channel.publish("integrated-features", payload) except Exception as e: logger.error(f"[{agent_name}] Feature publish failed: {e}") async def publish_meta_features(self, agent_name, meta_features): """Publish 24 meta features""" try: channel = self.meta_channels[agent_name] clean_meta = { k: float(v) if isinstance(v, (np.floating, np.integer)) else v for k, v in meta_features.items() } clean_meta['agent'] = agent_name clean_meta['timestamp'] = datetime.now(UTC).isoformat() await channel.publish("meta_features", clean_meta) except Exception as e: logger.error(f"[{agent_name}] Meta feature publish failed: {e}") def get_latest_state_features(self, agent_name=None): """Get latest features with type-aware aggregation""" with self.features_lock: if agent_name: return self.latest_computed_features.get(agent_name, {}) if not self.latest_computed_features: return {} all_features = list(self.latest_computed_features.values()) if not all_features: return {} return self._safe_aggregate_features(all_features) def _safe_aggregate_features(self, all_features): """Type-aware feature aggregation across agents""" avg_features = {} feature_keys = all_features[0].keys() for key in feature_keys: values = [f[key] for f in all_features if key in f] if not values: continue if key in BINARY_FEATURES: # Voting for binary features avg_features[key] = int(np.sum(values) > len(values) / 2) elif key in PRICE_FEATURES: # Median for price features (robust to outliers) clean_values = [v for v in values if not np.isnan(v)] if clean_values: avg_features[key] = float(np.median(clean_values)) else: avg_features[key] = 0.0 else: # Mean for continuous features clean_values = [v for v in values if not np.isnan(v)] if clean_values: avg_features[key] = float(np.mean(clean_values)) else: avg_features[key] = 0.0 return avg_features def get_feature_summary(self): """Get detailed feature summary""" with self.features_lock: if not self.latest_computed_features: return "No features computed yet" sample_agent = list(self.latest_computed_features.keys())[0] features = self.latest_computed_features[sample_agent] # Count only keys that are actually declared features in the # contract. This is set-intersection, not set-difference — so # it's correct regardless of whether envelope keys have leaked # into the features dict or not. actual_count = len(set(features.keys()) & FEATURE_CONTRACT.features) summary = f"REGIME-ADAPTIVE FEATURE ENHANCER\n" summary += "=" * 60 + "\n\n" summary += f"Total Features: {actual_count} (Expected: 60)\n\n" summary += "Feature Categories:\n" summary += f" • Core Technical: 19 features\n" summary += f" • Derivatives: 15 features\n" summary += f" • Additional Technical: 7 features\n" summary += f" • Candlestick Patterns: 9 features (institutional-grade)\n" summary += f" • Support/Resistance: 7 features\n" summary += f" • Price Variants: 3 features\n" summary += f" • TOTAL: 60 features\n\n" summary += f"Meta Features (24 total, published separately):\n" summary += f" • Voting: 8 features\n" summary += f" • Technical: 15 features\n" summary += f" • Timestamp: 1 metadata\n\n" summary += f"Regime Detection: INTERNAL (adaptive normalization)\n" summary += f" • Volatility regimes: low/medium/high\n" summary += f" • Trend detection: momentum-based\n" summary += f" • Mean-reversion: entropy-based\n\n" summary += f"Normalization: Regime-adaptive\n" summary += f" • High vol → Robust scaling (IQR)\n" summary += f" • Low vol → Standard z-score\n" summary += f" • Excluded: {len(NORMALIZATION_EXCLUSIONS)} features\n\n" summary += f"Aggregation: Type-aware\n" summary += f" • Binary: Voting (majority rule)\n" summary += f" • Price: Median (outlier-resistant)\n" summary += f" • Continuous: Mean\n" return summary # ============================================================================ # ASYNC WRAPPER # ============================================================================ class AsyncIntegratedFeatureEnhancer: def __init__(self, ably_client, agent_names, window_size=100): self.enhancer = IntegratedFeatureEnhancer(ably_client, agent_names, window_size) self.ably = ably_client self.agents = agent_names self.running = False self.channels = {} def get_latest_state_features(self, agent_name=None): return self.enhancer.get_latest_state_features(agent_name) async def start(self): self.running = True logger.info("AsyncIntegratedFeatureEnhancer started") logger.info("\n" + self.enhancer.get_feature_summary()) await self._start_ably_listeners() async def _start_ably_listeners(self): if not self.ably: logger.error("No Ably client available") return if hasattr(self.ably, 'connection') and self.ably.connection.state != 'connected': try: self.ably.connection.connect() for _ in range(20): await asyncio.sleep(0.5) if self.ably.connection.state == 'connected': break else: logger.error("Failed to connect to Ably") return except Exception as e: logger.error(f"Redis connection failed: {e}") return logger.info(f"Starting Ably listeners") for agent in self.agents: agent_str = agent.decode('utf-8') if isinstance(agent, bytes) else str(agent) feature_ok = await self._subscribe_with_retry( agent_str, "integrated-features", lambda msg, name=agent_str: self._handle_feature_message(name, msg) ) meta_ok = await self._subscribe_with_retry( agent_str, "meta_features", lambda msg, name=agent_str: self._handle_meta_features_message(name, msg), channel_suffix="meta_features-" ) if feature_ok: logger.info(f"✓ [{agent_str}] Feature channel attached") if meta_ok: logger.info(f"✓ [{agent_str}] Meta features channel attached") async def _subscribe_with_retry(self, agent_name, event_name, callback, max_retries=3, timeout=10, channel_suffix=""): channel_name = f"{channel_suffix}{agent_name}" if channel_suffix else agent_name for attempt in range(max_retries): try: channel = self.ably.channels.get(channel_name) self.channels[channel_name] = channel attach_task = asyncio.create_task(channel.attach()) try: await asyncio.wait_for(attach_task, timeout=timeout) except asyncio.TimeoutError: if attempt < max_retries - 1: await asyncio.sleep(2 ** attempt) continue return False subscribe_task = asyncio.create_task(channel.subscribe(event_name, callback)) try: await asyncio.wait_for(subscribe_task, timeout=timeout) return True except asyncio.TimeoutError: if attempt < max_retries - 1: await asyncio.sleep(2 ** attempt) continue return False except Exception as e: if attempt < max_retries - 1: await asyncio.sleep(2 ** attempt) continue return False return False async def process_tick(self, agent_name, price_data): loop = asyncio.get_event_loop() await loop.run_in_executor(None, self.enhancer.process_raw_tick, agent_name, price_data) features = self.enhancer.get_latest_state_features(agent_name) if features: await self._publish_with_retry(agent_name, features, meta=False) df = pd.DataFrame(list(self.enhancer.price_buffers[agent_name])) if len(df) >= 10: current_price = price_data.get('close', 0) meta_features = self.enhancer.extract_meta_features(df, current_price) if meta_features: await self._publish_with_retry(agent_name, meta_features, meta=True) async def _publish_with_retry(self, agent_name, features_dict, meta=False, tick_index=None): channel_name = f"meta_features-{agent_name}" if meta else agent_name event_name = "meta_features" if meta else "feature" if channel_name not in self.channels: self.channels[channel_name] = self.ably.channels.get(channel_name) channel = self.channels[channel_name] # Resolve tick_index from the features dict if not supplied resolved_tick = tick_index if resolved_tick is None and isinstance(features_dict, dict): resolved_tick = features_dict.get('tick_count') or features_dict.get('tick_index') payload = { 'agent': agent_name, 'features' if not meta else 'meta_features': features_dict, 'timestamp': datetime.now(UTC).isoformat(), 'tick_index': resolved_tick } for attempt in range(3): try: await channel.publish(event_name, payload) break except Exception as e: await asyncio.sleep(2 ** attempt) def _handle_feature_message(self, agent_name, msg): logger.debug(f"[{agent_name}] Feature message received") def _handle_meta_features_message(self, agent_name, msg): logger.debug(f"[{agent_name}] Meta feature message received") # ============================================================================ # MAIN EXECUTION - DERIV WEBSOCKET VERSION # ============================================================================ async def main(): nest_asyncio.apply() logger.info("=" * 80) logger.info("🚀 REGIME-ADAPTIVE FEATURE ENHANCER - DERIV WEBSOCKET EDITION") logger.info("=" * 80) # Initialize Deriv WebSocket instead of MT5 if not await deriv_bridge.initialize(SYMBOL): raise RuntimeError(f"❌ Deriv initialization failed") logger.info(f"✅ Deriv WebSocket initialized") logger.info(f" Symbol: {SYMBOL} -> {DERIV_SYMBOL}") # Verify symbol symbol_info = deriv_bridge.symbol_info(SYMBOL) if symbol_info is None: await deriv_bridge.shutdown() raise RuntimeError(f"❌ Symbol {SYMBOL} not found") logger.info(f"✅ Symbol verified") logger.info("\n📡 Connecting to Redis (V25 namespace)...") try: ably_client = RedisAblyClient(redis_url=REDIS_URL, use_streams=True) # V25 await asyncio.sleep(1) logger.info("✅ Redis connected (V25 — channels prefixed with '%s')" % CHANNEL_PREFIX) except Exception as e: await deriv_bridge.shutdown() raise RuntimeError(f"❌ Redis connection failed: {e}") logger.info("\n🔧 Initializing feature enhancers...") agent_names = list(TIMEFRAMES.keys()) enhancer = AsyncIntegratedFeatureEnhancer( ably_client=ably_client, agent_names=agent_names, window_size=100 ) await enhancer.start() agent_channels = {tf: ably_client.channels.get(tf) for tf in TIMEFRAMES} # ========================================================================= # BATCH SYNCHRONISATION — now handled by FeatureBatchGateway in Redis # ========================================================================= # Features.py's responsibility is ONLY to publish each agent's features to # its own per-agent Redis channel as soon as they are computed. # # The FeatureBatchGateway (in redis_connection_manager.py) subscribes to # all 8 per-agent channels on the Quasar side and acts as the gating layer: # • Accumulates per-agent contributions for each tick # • DISCARDS any partial batch when a new tick_index arrives (waitlist discard) # • Only fires on_batch_ready() when ALL 8 agents share the same tick/price # # This keeps Features.py simple (just publish, no coordination) and moves # the synchronisation concern to the Redis transport layer where it belongs. # ========================================================================= logger.info("\n✅ All systems initialized - Starting tick processing...\n") tick_count = 0 last_summary_time = time.time() feature_counts = {tf: 0 for tf in TIMEFRAMES} # ── Rate-limit gate ─────────────────────────────────────────────────────── # Derived from observed p95 latencies in the QSAP health report: # • Per-agent inference p95 ≈ 1552 ms # • Dispatch latency p95 ≈ 1292 ms # With all 8 agents running concurrently (asyncio.gather) the bottleneck # is max(p95_inference) ≈ 1552 ms. 3 000 ms gives ~93 % headroom and # guarantees the downstream QSAP never receives a stale tick. MIN_TICK_INTERVAL = 60.0 # seconds — never dispatch faster than this _processing = asyncio.Semaphore(1) # only one tick in-flight at a time async def _process_one_agent(tf_name, price_data, timestamp): """Process and publish a single timeframe agent concurrently.""" try: await enhancer.process_tick(tf_name, price_data) features = enhancer.get_latest_state_features(tf_name) if features: feature_counts[tf_name] += 1 features_with_meta = { **features, 'timestamp': timestamp.isoformat(), 'tick_count': tick_count, 'timeframe': tf_name, } # Publish to per-agent channel. # The FeatureBatchGateway in redis_connection_manager.py # subscribes to all 8 per-agent channels and fires a # complete batch only when all agents share the same # tick_index — discarding any partial/stale waitlist. await agent_channels[tf_name].publish( "integrated-features", { "agent": tf_name, "features": features_with_meta, "feature_count": len(features), "tick_index": tick_count, "price": price_data['close'], # raw Deriv tick — §0c }, ) if feature_counts[tf_name] % 10 == 0: logger.info( f"✅ [{tf_name}] Tick #{tick_count}: " f"60 features + meta | Price: {price_data['close']:.5f}" ) except Exception as e: logger.error(f"❌ [{tf_name}] Error: {e}") try: while True: tick_start = time.monotonic() try: # Get tick from Deriv WebSocket instead of MT5 tick = deriv_bridge.symbol_info_tick(SYMBOL) if tick is None: await asyncio.sleep(0.5) continue tick_count += 1 mid_price = (tick.bid + tick.ask) / 2.0 timestamp = datetime.now(UTC) price_data = { 'close': mid_price, 'high': tick.ask, 'low': tick.bid, 'open': mid_price, 'volume': getattr(tick, 'volume', 0), } # ── All 8 agents run CONCURRENTLY; next tick cannot start until # every agent has finished computing and publishing. ───────── async with _processing: await asyncio.gather( *[_process_one_agent(tf, price_data, timestamp) for tf in TIMEFRAMES], return_exceptions=True, # one agent error never kills others ) if time.time() - last_summary_time > 60: logger.info("\n" + "=" * 80) logger.info(f"📊 SUMMARY (Tick #{tick_count})") logger.info("=" * 80) logger.info(f"Price: {mid_price:.5f}") logger.info(f"Data Source: Deriv WebSocket (Streaming)") for tf in TIMEFRAMES: logger.info(f" {tf}: {feature_counts[tf]} updates") logger.info("=" * 80 + "\n") last_summary_time = time.time() except KeyboardInterrupt: break except Exception as e: logger.error(f"❌ Tick error: {e}") # ── Completion-based gate ───────────────────────────────────────── # Sleep only the time remaining to reach MIN_TICK_INTERVAL. # If processing already took longer, sleep_for = 0 (no extra wait). elapsed = time.monotonic() - tick_start sleep_for = max(0.0, MIN_TICK_INTERVAL - elapsed) logger.debug( f"Tick #{tick_count} | processed in {elapsed*1000:.0f} ms " f"| sleeping {sleep_for*1000:.0f} ms" ) await asyncio.sleep(sleep_for) finally: logger.info("\n🛑 SHUTTING DOWN") await deriv_bridge.shutdown() logger.info(f"Total Ticks: {tick_count}") logger.info("✅ Shutdown complete") if __name__ == "__main__": try: nest_asyncio.apply() asyncio.run(main()) except KeyboardInterrupt: logger.info("\n⚠️ Interrupted by user") except Exception as e: logger.error(f"\n❌ Fatal error: {e}") traceback.print_exc() #+263780563561 ENG Karl Muzunze Masvingo Zimbabwe