Spaces:
Running
Running
| """ | |
| ╔══════════════════════════════════════════════════════════════════════════════╗ | |
| ║ ║ | |
| ║ ██╗ ██╗ ██╗██████╗ ██╗ ██████╗ ██╗ ██╗ █████╗ ███╗ ██╗████████╗║ | |
| ║ ██║ ██╔╝███║██╔══██╗██║ ██╔═══██╗██║ ██║██╔══██╗████╗ ██║╚══██╔══╝║ | |
| ║ █████╔╝ ╚██║██████╔╝██║ ██║ ██║██║ ██║███████║██╔██╗ ██║ ██║ ║ | |
| ║ ██╔═██╗ ██║██╔══██╗██║ ██║▄▄ ██║██║ ██║██╔══██║██║╚██╗██║ ██║ ║ | |
| ║ ██║ ██╗ ██║██║ ██║███████╗ ╚██████╔╝╚██████╔╝██║ ██║██║ ╚████║ ██║ ║ | |
| ║ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝ ╚══▀▀═╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═╝ ║ | |
| ║ ║ | |
| ║ ──────────────────────────────────────────────────────────────────────────── ║ | |
| ║ ║ | |
| ║ REGIME-ADAPTIVE FEATURE ENGINEERING SYSTEM ║ | |
| ║ ║ | |
| ║ Multi-Resolution Analysis • Institutional Patterns • AI ║ | |
| ║ ║ | |
| ║ ──────────────────────────────────────────────────────────────────────────── ║ | |
| ║ ║ | |
| ║ ASSET: Volatility 75 Index TIMEFRAMES: 8 (5s - 10m) ║ | |
| ║ FEATURES: 60 per timeframe TOTAL DIMS: 480 features ║ | |
| ║ REGIME: Adaptive (Volatility/Trend) INFO GAIN: +83% vs baseline ║ | |
| ║ PATTERNS: Institutional-Grade COMPUTE: <7ms/tick ║ | |
| ║ ║ | |
| ║ "Latent Regime Detection for Non-Stationary Markets" ║ | |
| ║ ║ | |
| ║ [ FEATURE EXTRACTION ONLINE ] v3.0.0-V75 | DERIV WEBSOCKET EDITION ║ | |
| ║ ║ | |
| ╚══════════════════════════════════════════════════════════════════════════════╝ | |
| THEORETICAL FOUNDATION: | |
| P(Y_{t+Δ}|Φ(X_t)) = Σ_r P(Y_{t+Δ}|Φ,R_t=r)P(R_t=r) | |
| References: | |
| - Ang & Timmermann (2012): Regime Changes and Financial Markets | |
| - Hamilton (1989): Markov Regime-Switching Models | |
| - Nison (1991): Japanese Candlestick Charting Techniques | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from scipy.stats import percentileofscore, skew, kurtosis | |
| from collections import deque | |
| from datetime import datetime, timezone | |
| UTC = timezone.utc | |
| from typing import Optional, Dict | |
| from dataclasses import dataclass | |
| import threading | |
| import logging | |
| import nest_asyncio | |
| import time | |
| import asyncio | |
| import json | |
| import ssl | |
| import websockets | |
| import traceback | |
| import warnings | |
| # ============================================================================ | |
| # REDIS CLIENT FOR HUGGINGFACE SPACES (V75 — NAMESPACED CHANNELS) | |
| # ============================================================================ | |
try:
    from redis_config_v75 import REDIS_URL, REDIS_DB_FEATURES, CHANNEL_PREFIX, prefixed_channel
    import redis

    class RedisAblyClient:
        """Simple Redis client for HuggingFace Spaces compatibility (V75 namespaced).

        Exposes ``publish(channel, data)`` plus a ``channels`` manager whose
        ``get(name)`` returns objects with ``publish(event, data)``.
        """

        def __init__(self, redis_url=None, use_streams=True):
            # ``use_streams`` is accepted for call-site compatibility; nothing
            # in this class reads it.
            self.redis_url = redis_url or REDIS_URL
            self.client = None
            self.channels = SimpleChannelManager(self)
            self._connect()

        def _connect(self):
            """Open the Redis connection; on failure leave ``self.client`` as None."""
            try:
                # V75: Use DB 0 (features) — isolated per Space container
                self.client = redis.from_url(self.redis_url, db=REDIS_DB_FEATURES)
                self.client.ping()
                print(f"✅ Redis connected for features (V75 — DB {REDIS_DB_FEATURES})")
            except Exception as e:
                print(f"⚠️ Redis connection failed: {e}")
                self.client = None

        async def publish(self, channel, data):
            """JSON-encode ``data`` and publish it on the prefixed ``channel``.

            A no-op when the connection could not be established; publish
            errors are printed, never raised.
            """
            if self.client:
                try:
                    # V75: Auto-prefix channel name for namespace isolation
                    self.client.publish(prefixed_channel(channel), json.dumps(data))
                except Exception as e:
                    print(f"⚠️ Redis publish failed: {e}")

    class SimpleChannel:
        """A named channel that wraps payloads as ``{"event": ..., "data": ...}``."""

        def __init__(self, name, client):
            self.name = prefixed_channel(name)  # V75: auto-prefix
            self.client = client

        async def publish(self, event, data):
            # Publish to channel name directly.
            # NOTE(review): the name is prefixed here and again inside
            # RedisAblyClient.publish — this relies on prefixed_channel being
            # idempotent; confirm against redis_config_v75.
            await self.client.publish(self.name, {
                "event": event,
                "data": data
            })

    class SimpleChannelManager:
        """Lazily creates and caches one SimpleChannel per requested name."""

        def __init__(self, client):
            self.client = client
            self._channels = {}

        def get(self, name):
            if name not in self._channels:
                self._channels[name] = SimpleChannel(name, self.client)
            return self._channels[name]

except ImportError:
    print("⚠️ Redis not available - using mock mode")
    CHANNEL_PREFIX = "V75:"
    REDIS_DB_FEATURES = 0
    # Bound so that every name the real branch provides also exists in mock mode.
    REDIS_URL = None

    def prefixed_channel(name):
        """Return ``name`` with the V75 namespace prefix, adding it at most once."""
        return name if name.startswith(CHANNEL_PREFIX) else f"{CHANNEL_PREFIX}{name}"

    class RedisAblyClient:
        """Mock client: same attributes and methods as the real one, publishes nothing."""

        def __init__(self, *args, **kwargs):
            # Mirror the real client's attributes so code that inspects
            # ``.client`` / ``.redis_url`` behaves the same in mock mode.
            self.redis_url = None
            self.client = None
            self.channels = SimpleChannelManager(self)

        async def publish(self, channel, data):
            pass

    class SimpleChannel:
        """Mock channel: keeps the prefixed name, drops every publish."""

        def __init__(self, name, client):
            self.name = prefixed_channel(name)
            self.client = client

        async def publish(self, event, data):
            pass

    class SimpleChannelManager:
        """Mock manager: caches one SimpleChannel per requested name."""

        def __init__(self, client):
            self.client = client
            self._channels = {}

        def get(self, name):
            if name not in self._channels:
                self._channels[name] = SimpleChannel(name, self.client)
            return self._channels[name]
# Process-wide warning filters: ignore every FutureWarning and the two
# RuntimeWarning messages matched below.
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=RuntimeWarning, message='Mean of empty slice')
warnings.filterwarnings('ignore', category=RuntimeWarning, message='overflow encountered')
# Import-time side effect: patches asyncio so an event loop can be re-entered
# (per the nest_asyncio documentation).
nest_asyncio.apply()
| # ============================================================================ | |
| # DERIV WEBSOCKET CONFIGURATION | |
| # ============================================================================ | |
import os  # local to this config section; the file already uses mid-file imports

# SECURITY: an API token committed to source is readable by anyone who can see
# the file. Prefer supplying it through the DERIV_API_KEY environment variable;
# the literal remains only as a backward-compatible fallback and should be
# rotated and removed.
DERIV_API_KEY = os.environ.get("DERIV_API_KEY") or "1KJKxIJKR8LCyKB"
DERIV_WS_URL = "wss://ws.binaryws.com/websockets/v3?app_id=1089"
# Human-readable instrument name -> Deriv symbol code.
SYMBOL_MAP = {
    "Volatility 25 Index": "R_25",
    "Crash 500 Index": "CRASH500",
    "Volatility 100 Index": "R_100",
    "Volatility 50 Index": "R_50",
    "Volatility 75 Index": "R_75",  # ✅ V75: Volatility 75 Index symbol
}
| # ============================================================================ | |
| # DERIV DATA STRUCTURES | |
| # ============================================================================ | |
@dataclass
class DerivTick:
    """Tick record with MT5-style field names.

    Declared as a dataclass so it can be built with keyword arguments, which
    is how the bridge constructs it; every field defaults to zero.
    """
    time: int = 0            # epoch of the quote
    bid: float = 0.0
    ask: float = 0.0
    last: float = 0.0
    volume: int = 0
    time_msc: int = 0        # the bridge fills this with ``time * 1000``
    flags: int = 0
    volume_real: float = 0.0
@dataclass
class DerivAccountInfo:
    """Account snapshot with MT5-style field names.

    Declared as a dataclass so instances can be built with keyword arguments;
    every numeric field defaults to zero and ``currency`` to "USD".
    """
    login: int = 0
    balance: float = 0.0
    equity: float = 0.0
    profit: float = 0.0
    margin: float = 0.0
    margin_free: float = 0.0
    margin_level: float = 0.0
    currency: str = "USD"
| # ============================================================================ | |
| # DERIV WEBSOCKET BRIDGE - STREAMING VERSION | |
| # ============================================================================ | |
| class DerivBridge: | |
| """Deriv WebSocket bridge - STREAMING VERSION""" | |
| def __init__(self): | |
| self.ws = None | |
| self.is_connected = False | |
| self.is_authorized = False | |
| self.balance = 0.0 | |
| self._prices = {} # Current prices for each symbol | |
| self._price_lock = asyncio.Lock() | |
| self._stream_tasks = {} | |
| self._subscribed_symbols = set() | |
| self._last_tick = None | |
| async def _connect_and_authorize(self): | |
| """Connect and authorize to Deriv""" | |
| try: | |
| print("🔄 Connecting to Deriv WebSocket...") | |
| ssl_context = ssl.create_default_context() | |
| ssl_context.check_hostname = False | |
| ssl_context.verify_mode = ssl.CERT_NONE | |
| self.ws = await websockets.connect( | |
| DERIV_WS_URL, | |
| ssl=ssl_context, | |
| ping_interval=30, | |
| ping_timeout=10, | |
| close_timeout=5, | |
| max_size=2**20 | |
| ) | |
| print("✅ WebSocket connected") | |
| # Authorize | |
| auth_msg = {"authorize": DERIV_API_KEY} | |
| await self.ws.send(json.dumps(auth_msg)) | |
| # Wait for auth response | |
| response = await self.ws.recv() | |
| data = json.loads(response) | |
| if 'authorize' in data: | |
| self.is_connected = True | |
| self.is_authorized = True | |
| self.balance = float(data['authorize'].get('balance', 0)) | |
| print(f"✅ Authorized | Balance: ${self.balance:.2f}") | |
| return True | |
| elif 'error' in data: | |
| print(f"❌ Auth error: {data['error']}") | |
| return False | |
| except Exception as e: | |
| print(f"❌ Connection error: {e}") | |
| return False | |
| async def _stream_prices(self, deriv_symbol: str): | |
| """Continuous price streaming for a symbol with auto-reconnect""" | |
| retry_delay = 5 | |
| while True: | |
| try: | |
| # Re-connect/re-authorize if needed | |
| if not self.is_connected or self.ws is None: | |
| print(f"🔄 Reconnecting WebSocket for {deriv_symbol}...") | |
| connected = await self._connect_and_authorize() | |
| if not connected: | |
| print(f"⚠️ Reconnect failed for {deriv_symbol}, retrying in {retry_delay}s...") | |
| await asyncio.sleep(retry_delay) | |
| retry_delay = min(retry_delay * 2, 60) | |
| continue | |
| retry_delay = 5 # reset on success | |
| # Subscribe to ticks | |
| subscribe_msg = {"ticks": deriv_symbol} | |
| await self.ws.send(json.dumps(subscribe_msg)) | |
| print(f"📡 Streaming {deriv_symbol}...") | |
| # Continuous receive loop | |
| while self.is_connected: | |
| try: | |
| data = await self.ws.recv() | |
| json_data = json.loads(data) | |
| if 'tick' in json_data: | |
| tick_data = json_data['tick'] | |
| if tick_data.get('symbol') == deriv_symbol: | |
| price = float(tick_data['quote']) | |
| epoch = int(tick_data['epoch']) | |
| async with self._price_lock: | |
| self._prices[deriv_symbol] = { | |
| 'bid': price - 0.0005, | |
| 'ask': price + 0.0005, | |
| 'last': price, | |
| 'time': epoch, | |
| 'time_msc': epoch * 1000 | |
| } | |
| self._last_tick = DerivTick( | |
| time=epoch, | |
| bid=price - 0.0005, | |
| ask=price + 0.0005, | |
| last=price, | |
| volume=0, | |
| time_msc=epoch * 1000 | |
| ) | |
| elif 'error' in json_data: | |
| logging.error(f"Stream error for {deriv_symbol}: {json_data['error']}") | |
| except asyncio.CancelledError: | |
| print(f"Stream cancelled for {deriv_symbol}") | |
| return | |
| except websockets.exceptions.ConnectionClosed: | |
| print(f"❌ WebSocket closed for {deriv_symbol} — will reconnect") | |
| self.is_connected = False | |
| break | |
| except json.JSONDecodeError: | |
| continue | |
| except Exception as e: | |
| logging.error(f"Stream error: {e}") | |
| self.is_connected = False | |
| break | |
| except asyncio.CancelledError: | |
| print(f"Stream cancelled for {deriv_symbol}") | |
| return | |
| except Exception as e: | |
| logging.error(f"Fatal stream error for {deriv_symbol}: {e}") | |
| self.is_connected = False | |
| # Brief pause before reconnect attempt | |
| await asyncio.sleep(retry_delay) | |
| retry_delay = min(retry_delay * 2, 60) | |
| async def _ensure_stream(self, deriv_symbol: str): | |
| """Ensure streaming is active for a symbol; restart dead tasks""" | |
| existing = self._stream_tasks.get(deriv_symbol) | |
| if existing is None or existing.done(): | |
| # Start (or restart) streaming task | |
| task = asyncio.create_task(self._stream_prices(deriv_symbol)) | |
| self._stream_tasks[deriv_symbol] = task | |
| self._subscribed_symbols.add(deriv_symbol) | |
| async def get_current_price(self, deriv_symbol: str) -> Optional[Dict]: | |
| """Get current price (from streaming cache)""" | |
| try: | |
| # Ensure we're streaming this symbol | |
| await self._ensure_stream(deriv_symbol) | |
| # Give it a moment to receive first price if new | |
| if deriv_symbol not in self._prices: | |
| await asyncio.sleep(0.5) | |
| # Return cached price | |
| async with self._price_lock: | |
| return self._prices.get(deriv_symbol) | |
| except Exception as e: | |
| logging.error(f"Price fetch error: {e}") | |
| return None | |
| def symbol_info_tick(self, symbol: str) -> Optional[DerivTick]: | |
| """MT5-compatible tick info getter (synchronous wrapper)""" | |
| try: | |
| deriv_symbol = SYMBOL_MAP.get(symbol, symbol) | |
| # Check if we have cached price | |
| if deriv_symbol in self._prices: | |
| price_data = self._prices[deriv_symbol] | |
| return DerivTick( | |
| time=price_data.get('time', 0), | |
| bid=price_data.get('bid', 0), | |
| ask=price_data.get('ask', 0), | |
| last=price_data.get('last', 0), | |
| volume=0, | |
| time_msc=price_data.get('time_msc', 0) | |
| ) | |
| return self._last_tick | |
| except Exception as e: | |
| logging.error(f"symbol_info_tick error: {e}") | |
| return None | |
| async def get_balance(self) -> float: | |
| """Get current balance""" | |
| try: | |
| if not self.is_connected: | |
| return self.balance | |
| await self.ws.send(json.dumps({"balance": 1})) | |
| # Wait for balance response (with timeout for this specific call) | |
| for _ in range(10): | |
| try: | |
| response = await asyncio.wait_for(self.ws.recv(), timeout=1) | |
| data = json.loads(response) | |
| if 'balance' in data: | |
| self.balance = float(data['balance']['balance']) | |
| return self.balance | |
| except asyncio.TimeoutError: | |
| continue | |
| except Exception as e: | |
| logging.error(f"Balance error: {e}") | |
| return self.balance | |
| def symbol_info(self, symbol: str) -> Optional[dict]: | |
| """MT5-compatible symbol_info - returns symbol information""" | |
| deriv_symbol = SYMBOL_MAP.get(symbol, symbol) | |
| return { | |
| 'name': symbol, | |
| 'deriv_symbol': deriv_symbol, | |
| 'visible': True, | |
| 'point': 0.00001, | |
| 'digits': 5 | |
| } | |
| def symbol_select(self, symbol: str, enable: bool = True) -> bool: | |
| """MT5-compatible symbol_select - always returns True for Deriv""" | |
| return True | |
| async def initialize(self, symbol: str = None): | |
| """Initialize connection and optionally start streaming a symbol""" | |
| try: | |
| print("🔄 Initializing Deriv...") | |
| result = await self._connect_and_authorize() | |
| if result: | |
| if symbol: | |
| deriv_symbol = SYMBOL_MAP.get(symbol, symbol) | |
| await self._ensure_stream(deriv_symbol) | |
| # Wait for first tick | |
| await asyncio.sleep(1) | |
| print("✅ Deriv initialized") | |
| return True | |
| else: | |
| print("❌ Deriv init failed") | |
| return False | |
| except Exception as e: | |
| logging.error(f"Initialize error: {e}") | |
| return False | |
| async def shutdown(self): | |
| """Shutdown gracefully""" | |
| try: | |
| # Cancel all streaming tasks | |
| for task in self._stream_tasks.values(): | |
| task.cancel() | |
| # Wait for cancellation | |
| if self._stream_tasks: | |
| await asyncio.gather(*self._stream_tasks.values(), return_exceptions=True) | |
| # Close WebSocket | |
| if self.ws: | |
| await self.ws.close() | |
| self.is_connected = False | |
| self.is_authorized = False | |
| except Exception as e: | |
| logging.error(f"Shutdown error: {e}") | |
# Global bridge instance — constructed at import time; no connection is opened
# until initialize() or a price request runs.
deriv_bridge = DerivBridge()
# ============================================================================
# CONFIGURATION
# ============================================================================
# Redis URL already imported above in inline Redis client
SYMBOL = "Volatility 75 Index"  # ✅ V75
DERIV_SYMBOL = "R_75"  # ✅ V75: Volatility 75 Index Deriv symbol
FEATURE_WINDOW = 10  # base unit
# Timeframe label -> window length.
# NOTE(review): values look like seconds ('xl': 60 is labelled "1min") —
# confirm against the code that consumes TIMEFRAMES.
TIMEFRAMES = {
    # === High-Frequency Zone (Volatility Capture) ===
    'xs': 5,  # tick
    's': 10,  # ultra
    'm': 20,  # fast
    # === Critical Trading Zones ===
    'l': 30,  # scalp
    'xl': 60,  # 1min
    'xxl': 120,  # 2min
    # === Structure & Regime Detection ===
    '5m': 300,  # 5min
    '10m': 600,  # 10min
}
| # ============================================================================ | |
| # FEATURE CONTRACT — SINGLE SOURCE OF TRUTH (60 FEATURES) | |
| # ---------------------------------------------------------------------------- | |
| # | |
| # ENGINEERING NOTE — why this looks the way it does: | |
| # | |
| # Previously the contract was spread across three top-level sets | |
| # (REQUIRED_FEATURES, METADATA_FIELDS, BINARY_FEATURES, ...), with no | |
| # runtime check that they were mutually consistent. A drift where a | |
| # single key ('price') landed in BOTH the "required features" list AND | |
| # the "metadata to strip before validating" set caused the validator to | |
| # report {'price'} missing on every tick, which silently shut down | |
| # publishing for the entire pipeline. | |
| # | |
| # The fix is structural: ONE FeatureContract object owns the full schema | |
| # and checks its own invariants at import time. Any future drift crashes | |
| # the module on load with a named offender, instead of corrupting the | |
| # wire format at 60Hz for hours. | |
| # | |
| # The old module-level names (REQUIRED_FEATURES, METADATA_FIELDS, etc.) | |
| # are kept as PROJECTIONS of the contract for call-site back-compat — the | |
| # rest of Features.py can import them exactly as before. | |
| # ============================================================================ | |
| from dataclasses import dataclass, field | |
| from typing import FrozenSet, Mapping, Any | |
# Version string stamped into every payload and checked by consumers.
CONTRACT_VERSION = "feat-v1.0.0"
# Exact size the feature set must have; enforced by FeatureContract.
EXPECTED_FEATURE_COUNT = 60
# ---- Feature keys (60) — values fed into model inference ------------------
_FEATURES: FrozenSet[str] = frozenset({
    # Core Technical (19)
    'log_return', 'rolling_mean_5', 'rolling_std_5', 'zscore_5',
    'rsi_14', 'macd', 'macd_signal', 'macd_hist', 'atr',
    'cdf_value', 'cdf_slope', 'cdf_diff',
    'volatility_quantile_90', 'volatility_ratio', 'entropy_50',
    'autocorr_3', 'momentum_10', 'volume_change_rate', 'volume_zscore',
    # Derivatives (15)
    'price_vel', 'price_acc', 'price_jrk',
    'price_vel_mean', 'price_vel_std', 'price_vel_skew', 'price_vel_kurtosis',
    'price_acc_mean', 'price_acc_std', 'price_acc_skew', 'price_acc_kurtosis',
    'price_jrk_mean', 'price_jrk_std', 'price_jrk_skew', 'price_jrk_kurtosis',
    # Additional Technical (7)
    'ma10', 'ma20', 'std20',
    'bollinger_upper', 'bollinger_lower', 'bollinger_width', 'bollinger_position',
    # Candlestick (9)
    'gravestone_doji', 'four_price_doji', 'doji', 'spinning_top',
    'bullish_candle', 'bearish_candle', 'dragonfly_candle',
    'spinning_top_bearish_followup', 'bullish_then_dragonfly',
    # Support / Resistance (7)
    'distance_to_nearest_support', 'distance_to_nearest_resistance',
    'near_support', 'near_resistance', 'distance_to_stop_loss',
    'support_strength', 'resistance_strength',
    # Price Variants (3) — models consume these for absolute-scale context
    'price', 'close_scaled', 'close_price',
})
# ---- Envelope keys — wire metadata, NEVER fed to a model -------------------
# Disjoint from _FEATURES by invariant (checked below in __post_init__).
_ENVELOPE: FrozenSet[str] = frozenset({
    'agent',  # routing
    'timeframe',  # routing
    'timestamp',  # wall-clock ISO-8601 at publish
    'tick_index',  # monotonic producer tick counter
    'tick_count',  # legacy alias, kept for back-compat
    'feature_count',  # integrity check: len(features)
    'contract_version',  # schema version string
    'features',  # nested payload key
})
# ---- Typed subsets of _FEATURES (validated as subsets at import time) ------
# 0/1 flags: the two proximity flags plus the nine candlestick patterns.
_BINARY: FrozenSet[str] = frozenset({
    'near_support', 'near_resistance',
    'gravestone_doji', 'four_price_doji', 'doji', 'spinning_top',
    'bullish_candle', 'bearish_candle', 'dragonfly_candle',
    'spinning_top_bearish_followup', 'bullish_then_dragonfly',
})
# Features grouped as price-scale values.
_PRICE_SCALE: FrozenSet[str] = frozenset({
    'price', 'close_scaled', 'close_price',
    'ma10', 'ma20', 'bollinger_upper', 'bollinger_lower',
})
# Union of the two subsets above plus the three raw derivative features;
# exported further down as NORMALIZATION_EXCLUSIONS.
_NON_NORMALISED: FrozenSet[str] = _BINARY | _PRICE_SCALE | frozenset({
    'price_vel', 'price_acc', 'price_jrk',
})
@dataclass(frozen=True)
class ValidationResult:
    """Structured validation outcome with three distinct failure modes.

    Declared as a (frozen) dataclass so it can be built with keyword
    arguments, which is how ``FeatureContract.validate`` constructs it.
    """
    ok: bool
    missing: FrozenSet[str]  # required features absent from dict
    leaked_envelope: FrozenSet[str]  # envelope keys found inside features dict
    unexpected: FrozenSet[str]  # keys that belong to neither set

    def as_error_lines(self):
        """Return one human-readable line per non-empty failure category.

        The list is empty when nothing is wrong; key names are sorted so the
        output is stable between runs.
        """
        lines = []
        if self.missing:
            lines.append(f"missing features: {sorted(self.missing)}")
        if self.leaked_envelope:
            lines.append(f"envelope keys inside features dict: "
                         f"{sorted(self.leaked_envelope)}")
        if self.unexpected:
            lines.append(f"unknown keys: {sorted(self.unexpected)}")
        return lines
@dataclass(frozen=True)
class FeatureContract:
    """
    The schema for a single timeframe's feature payload.

    Must be a dataclass: the ``field(default_factory=...)`` defaults are only
    resolved, and ``__post_init__`` only runs, when the dataclass machinery
    generates ``__init__``. Frozen because the contract is a shared singleton
    made of immutable sets.

    Invariants (all checked in __post_init__ — module fails to import if
    any are violated):
    (1) features ∩ envelope = ∅
        No key is allowed to be "both a feature and envelope". This
        was the original bug — 'price' was in both sets, and the
        validator silently rejected every tick.
    (2) |features| == EXPECTED_FEATURE_COUNT
        The contract declares an exact 60-feature shape. Drift here
        would corrupt downstream tensor shapes.
    (3) binary, price_scale, non_normalised are all ⊆ features
        A typed subset cannot contain a key that isn't a feature at
        all. This catches stale references after a feature rename.
    """
    version: str = CONTRACT_VERSION
    features: FrozenSet[str] = field(default_factory=lambda: _FEATURES)
    envelope: FrozenSet[str] = field(default_factory=lambda: _ENVELOPE)
    binary: FrozenSet[str] = field(default_factory=lambda: _BINARY)
    price_scale: FrozenSet[str] = field(default_factory=lambda: _PRICE_SCALE)
    non_normalised: FrozenSet[str] = field(default_factory=lambda: _NON_NORMALISED)

    def __post_init__(self):
        """Check the three schema invariants; raise RuntimeError naming the offenders."""
        # (1) disjointness
        overlap = self.features & self.envelope
        if overlap:
            raise RuntimeError(
                f"[FeatureContract] BROKEN INVARIANT: keys in BOTH features "
                f"and envelope: {sorted(overlap)}. Remove from one set — the "
                f"validator cannot distinguish feature-vs-envelope for these "
                f"keys, so every tick will be rejected."
            )
        # (2) cardinality
        if len(self.features) != EXPECTED_FEATURE_COUNT:
            raise RuntimeError(
                f"[FeatureContract] BROKEN INVARIANT: expected "
                f"{EXPECTED_FEATURE_COUNT} features, got {len(self.features)}. "
                f"Update EXPECTED_FEATURE_COUNT or fix the feature list."
            )
        # (3) subsets
        for name, subset in (
            ('binary', self.binary),
            ('price_scale', self.price_scale),
            ('non_normalised', self.non_normalised),
        ):
            stray = subset - self.features
            if stray:
                raise RuntimeError(
                    f"[FeatureContract] BROKEN INVARIANT: '{name}' contains "
                    f"non-feature keys: {sorted(stray)}"
                )

    # ---- public API -------------------------------------------------------
    def validate(self, features_dict: Mapping[str, Any]) -> ValidationResult:
        """
        Validate the INNER features dict only — envelope keys should NOT
        be present here; if they are, they're reported as leaked_envelope,
        not stripped and hidden.

        ``ok`` is True only when the key set equals the contract's feature
        set exactly.
        """
        actual = set(features_dict.keys())
        return ValidationResult(
            ok=(actual == self.features),
            missing=frozenset(self.features - actual),
            leaked_envelope=frozenset(actual & self.envelope),
            unexpected=frozenset(actual - self.features - self.envelope),
        )

    def build_payload(
        self,
        agent_name: str,
        features_dict: Mapping[str, float],
        tick_index,
        timestamp_iso: str,
    ) -> dict:
        """
        Construct the wire payload with envelope / feature separation
        enforced structurally. Envelope fields live at the top level;
        features live ONLY inside payload['features'].

        ``features_dict`` is shallow-copied, so later changes to the caller's
        mapping do not alter the payload. No validation happens here.
        """
        return {
            'agent': agent_name,
            'timestamp': timestamp_iso,
            'tick_index': tick_index,
            'feature_count': len(features_dict),
            'contract_version': self.version,
            'features': dict(features_dict),
        }

    def extract_features(self, payload: Mapping[str, Any]) -> dict:
        """
        Consumer-side: pull the inner features dict and verify envelope
        version. Raises ValueError on schema drift so the consumer can
        log-and-drop rather than silently accept malformed payloads.

        A payload without a 'contract_version' key is accepted without a
        version check.
        """
        got_ver = payload.get('contract_version')
        if got_ver is not None and got_ver != self.version:
            raise ValueError(
                f"contract version mismatch: payload={got_ver!r} "
                f"expected={self.version!r}"
            )
        feats = payload.get('features')
        if not isinstance(feats, dict):
            raise ValueError(
                f"payload.features missing or wrong type: {type(feats).__name__}"
            )
        return feats
# Singleton — import this, don't construct your own.
# Module import will FAIL LOUDLY here if any invariant is violated.
FEATURE_CONTRACT = FeatureContract()
# ---------------------------------------------------------------------------
# Back-compat aliases — projections of FEATURE_CONTRACT. Existing call sites
# keep working unchanged; only the source of truth moved. Deleting any of
# these will break older code paths that haven't been migrated to use
# FEATURE_CONTRACT directly.
# ---------------------------------------------------------------------------
# Sorted so the tuple is identical in every process: iterating a frozenset of
# strings follows hash order, which changes between interpreter runs unless
# PYTHONHASHSEED is pinned. Callers that treat it as order-agnostic are
# unaffected.
REQUIRED_FEATURES = tuple(sorted(FEATURE_CONTRACT.features))
METADATA_FIELDS = FEATURE_CONTRACT.envelope
BINARY_FEATURES = FEATURE_CONTRACT.binary
PRICE_FEATURES = FEATURE_CONTRACT.price_scale
NORMALIZATION_EXCLUSIONS = FEATURE_CONTRACT.non_normalised
| # ============================================================================ | |
| # REGIME DETECTION PARAMETERS | |
| # ============================================================================ | |
# Tunables for regime detection. The code that reads them is outside this
# section.
# NOTE(review): 0.33 / 0.67 look like quantile cut-offs for low/high
# volatility — confirm against the consumer.
REGIME_CONFIG = {
    'volatility_lookback': 100,
    'vol_low_threshold': 0.33,
    'vol_high_threshold': 0.67,
    'trend_threshold': 0.6,
    'entropy_threshold': 1.5,
    'regime_memory': 20,
}
# ============================================================================
# LOGGING SETUP
# ============================================================================
# Configure the root logger at import time: INFO level, time-only timestamps.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)
logger = logging.getLogger(__name__)
| # ============================================================================ | |
| # HELPER FUNCTIONS | |
| # ============================================================================ | |
def safe_skew(x):
    """Return the sample skewness of ``x`` ignoring NaNs, or 0.0 if undefined.

    Args:
        x: array-like of numbers (ndarray, Series, or plain list).

    Returns:
        A Python float. 0.0 when fewer than three non-NaN values remain or
        when the statistic is not finite (e.g. a constant window).
    """
    values = np.asarray(x, dtype=float)
    values = values[~np.isnan(values)]
    if values.size < 3:
        return 0.0
    result = float(skew(values))
    # A non-finite result would poison every downstream feature, so it is
    # mapped to the neutral value.
    return result if np.isfinite(result) else 0.0
def safe_kurtosis(x):
    """Return the excess kurtosis of ``x`` ignoring NaNs, or 0.0 if undefined.

    Args:
        x: array-like of numbers (ndarray, Series, or plain list).

    Returns:
        A Python float from ``scipy.stats.kurtosis`` with its defaults.
        0.0 when fewer than three non-NaN values remain or when the statistic
        is not finite.
    """
    values = np.asarray(x, dtype=float)
    values = values[~np.isnan(values)]
    if values.size < 3:
        return 0.0
    result = float(kurtosis(values))
    # A non-finite result would poison every downstream feature, so it is
    # mapped to the neutral value.
    return result if np.isfinite(result) else 0.0
def min_max_scale(series):
    """Linearly rescale ``series`` to the range [0, 1].

    Args:
        series: a pandas Series of numbers.

    Returns:
        A Series. Empty input yields an empty float Series; a constant input
        yields zeros on the original index, avoiding a division by zero.
    """
    if len(series) == 0:
        # Explicit dtype: an untyped empty Series has a version-dependent
        # default dtype.
        return pd.Series([], dtype=float)
    low, high = series.min(), series.max()
    span = high - low
    if span == 0:
        return pd.Series(np.zeros(len(series)), index=series.index)
    return (series - low) / span
def safe_entropy(series):
    """Return an entropy-style dispersion score for ``series``, or 0.0.

    Computes ``-sum(d * log(d))`` over the non-empty bins of a 10-bin density
    histogram of the non-NaN values.

    NOTE(review): ``d`` is a density, not a probability, so the score depends
    on the bin width and can be negative. Left unchanged because downstream
    thresholds may be tuned to this scale.

    Returns:
        0.0 for fewer than five values, a constant series, or any error.
    """
    try:
        clean_series = series.dropna()
        if len(clean_series) < 5 or clean_series.nunique() == 1:
            return 0.0
        hist, _ = np.histogram(clean_series, bins=10, density=True)
        hist = hist[hist > 0]
        if len(hist) == 0:
            return 0.0
        return float(-np.sum(hist * np.log(hist)))
    except Exception:
        # Best-effort feature: bad input scores 0.0, but KeyboardInterrupt
        # and SystemExit are no longer swallowed.
        return 0.0
| # ============================================================================ | |
| # INSTITUTIONAL-GRADE CANDLESTICK PATTERN DETECTION | |
| # ============================================================================ | |
def gravestone_doji(o, h, l, c):
    """
    Gravestone Doji: Death at the top
    Institutional criteria:
    - Body <= 2% of range
    - Upper shadow >= 66% of range
    - Lower shadow <= 10% of range

    Args are the candle's open, high, low and close.
    Returns 1 when all criteria hold, otherwise 0 (also 0 for a range below
    1e-6 or for inputs that cannot be compared numerically).
    """
    try:
        total_range = h - l
        if total_range < 1e-6:
            return 0
        body_ratio = abs(c - o) / total_range
        upper_ratio = (h - max(o, c)) / total_range
        lower_ratio = (min(o, c) - l) / total_range
        return int(
            body_ratio <= 0.02 and
            upper_ratio >= 0.66 and
            lower_ratio <= 0.10
        )
    except Exception:
        # Best-effort: bad input means "no pattern"; interpreter-exit signals
        # are no longer swallowed.
        return 0
def four_price_doji(o, h, l, c):
    """
    Four Price Doji: Extreme indecision
    All prices equal within 0.1% tolerance

    Tolerance is measured as the largest relative deviation of any of the
    four prices from their mean.
    Returns 1 when within tolerance, otherwise 0 (also 0 when the mean price
    is below 1e-6 or the inputs are not numeric).
    """
    try:
        prices = [o, h, l, c]
        avg_price = np.mean(prices)
        if avg_price < 1e-6:
            return 0
        max_deviation = max(abs(p - avg_price) / avg_price for p in prices)
        return int(max_deviation <= 0.001)
    except Exception:
        # Best-effort: bad input means "no pattern"; interpreter-exit signals
        # are no longer swallowed.
        return 0
def doji(o, h, l, c):
    """
    Standard Doji: Indecision
    Institutional criteria:
    - Body <= 5% of range
    - Both shadows >= 20% of range

    Args are the candle's open, high, low and close.
    Returns 1 when all criteria hold, otherwise 0 (also 0 for a range below
    1e-6 or for inputs that cannot be compared numerically).
    """
    try:
        total_range = h - l
        if total_range < 1e-6:
            return 0
        body_ratio = abs(c - o) / total_range
        upper_ratio = (h - max(o, c)) / total_range
        lower_ratio = (min(o, c) - l) / total_range
        return int(
            body_ratio <= 0.05 and
            upper_ratio >= 0.20 and
            lower_ratio >= 0.20
        )
    except Exception:
        # Best-effort: bad input means "no pattern"; interpreter-exit signals
        # are no longer swallowed.
        return 0
def spinning_top(o, h, l, c):
    """
    Spinning Top: Market confusion
    Institutional criteria:
    - Body <= 33% of range
    - Both shadows >= 25% of range each

    Args are the candle's open, high, low and close.
    Returns 1 when all criteria hold, otherwise 0 (also 0 for a range below
    1e-6 or for inputs that cannot be compared numerically).
    """
    try:
        total_range = h - l
        if total_range < 1e-6:
            return 0
        body_ratio = abs(c - o) / total_range
        upper_ratio = (h - max(o, c)) / total_range
        lower_ratio = (min(o, c) - l) / total_range
        return int(
            body_ratio <= 0.33 and
            upper_ratio >= 0.25 and
            lower_ratio >= 0.25
        )
    except Exception:
        # Best-effort: bad input means "no pattern"; interpreter-exit signals
        # are no longer swallowed.
        return 0
def bullish_candle(o, h, l, c):
    """
    Bullish Candle: Strong buying
    Institutional criteria:
    - Body >= 60% of range
    - Close > Open
    - Upper shadow <= 15% of range

    Args are the candle's open, high, low and close.
    Returns 1 when all criteria hold, otherwise 0 (also 0 for a range below
    1e-6 or for inputs that cannot be compared numerically).
    """
    try:
        if c <= o:
            return 0
        total_range = h - l
        if total_range < 1e-6:
            return 0
        body_ratio = (c - o) / total_range
        upper_ratio = (h - c) / total_range
        return int(body_ratio >= 0.60 and upper_ratio <= 0.15)
    except Exception:
        # Best-effort: bad input means "no pattern"; interpreter-exit signals
        # are no longer swallowed.
        return 0
def bearish_candle(o, h, l, c):
    """
    Bearish Candle: Strong selling
    Institutional criteria:
    - Body >= 60% of range
    - Close < Open
    - Lower shadow <= 15% of range

    Args are the candle's open, high, low and close.
    Returns 1 when all criteria hold, otherwise 0 (also 0 for a range below
    1e-6 or for inputs that cannot be compared numerically).
    """
    try:
        if c >= o:
            return 0
        total_range = h - l
        if total_range < 1e-6:
            return 0
        body_ratio = (o - c) / total_range
        lower_ratio = (c - l) / total_range
        return int(body_ratio >= 0.60 and lower_ratio <= 0.15)
    except Exception:
        # Best-effort: bad input means "no pattern"; interpreter-exit signals
        # are no longer swallowed.
        return 0
def dragonfly_candle(o, h, l, c):
    """
    Dragonfly Doji: Bullish reversal
    Institutional criteria:
    - Body <= 5% of range
    - Lower shadow >= 66% of range
    - Upper shadow <= 10% of range

    Args are the candle's open, high, low and close.
    Returns 1 when all criteria hold, otherwise 0 (also 0 for a range below
    1e-6 or for inputs that cannot be compared numerically).
    """
    try:
        total_range = h - l
        if total_range < 1e-6:
            return 0
        body_ratio = abs(c - o) / total_range
        upper_ratio = (h - max(o, c)) / total_range
        lower_ratio = (min(o, c) - l) / total_range
        return int(
            body_ratio <= 0.05 and
            lower_ratio >= 0.66 and
            upper_ratio <= 0.10
        )
    except Exception:
        # Best-effort: bad input means "no pattern"; interpreter-exit signals
        # are no longer swallowed.
        return 0
def spinning_top_bearish_followup(c1, c2):
    """
    Spinning top followed by a bearish candle.
    Indicates weakness after indecision.

    Args:
        c1: (open, high, low, close) of the earlier bar.
        c2: (open, high, low, close) of the later bar.

    Returns:
        1 when `spinning_top(*c1)` and `bearish_candle(*c2)` both flag,
        otherwise 0 (including when either tuple cannot be unpacked).
    """
    try:
        return int(spinning_top(*c1) == 1 and bearish_candle(*c2) == 1)
    except Exception:
        # Malformed bars count as "no pattern"; interpreter-level signals
        # (KeyboardInterrupt/SystemExit) are no longer swallowed.
        return 0
def bullish_candle_followed_by_dragonfly(c1, c2):
    """
    Bullish candle + dragonfly = strong support.
    Institutional continuation pattern.

    Args:
        c1: (open, high, low, close) of the earlier bar.
        c2: (open, high, low, close) of the later bar.

    Returns:
        1 when `bullish_candle(*c1)` and `dragonfly_candle(*c2)` both flag
        and the second close is >= the first close, otherwise 0 (including
        when either tuple cannot be unpacked or indexed).
    """
    try:
        return int(
            bullish_candle(*c1) == 1
            and dragonfly_candle(*c2) == 1
            and c2[3] >= c1[3]  # Second close >= first close
        )
    except Exception:
        # Malformed bars count as "no pattern"; interpreter-level signals
        # (KeyboardInterrupt/SystemExit) are no longer swallowed.
        return 0
| # Support/Resistance functions (unchanged) | |
def find_supports(p, df):
    """
    Collect local minima of df['Low'] that lie strictly below price `p`.

    A row is a local minimum when both its previous and next 'Low' values
    are strictly greater; the first and last rows therefore never qualify
    (their shifted neighbour is NaN).

    Args:
        p: Reference price.
        df: Frame with a 'Low' column.

    Returns:
        List of qualifying low values in row order; [] on any failure
        (e.g. missing column or non-DataFrame input).
    """
    try:
        lows = df['Low']
        is_local_min = (lows.shift(1) > lows) & (lows.shift(-1) > lows)
        return list(lows[is_local_min & (lows < p)])
    except Exception:
        # Best-effort: unusable input means "no levels found".
        return []
def find_resistances(p, df):
    """
    Collect local maxima of df['High'] that lie strictly above price `p`.

    A row is a local maximum when both its previous and next 'High' values
    are strictly smaller; the first and last rows therefore never qualify
    (their shifted neighbour is NaN).

    Args:
        p: Reference price.
        df: Frame with a 'High' column.

    Returns:
        List of qualifying high values in row order; [] on any failure
        (e.g. missing column or non-DataFrame input).
    """
    try:
        highs = df['High']
        is_local_max = (highs.shift(1) < highs) & (highs.shift(-1) < highs)
        return list(highs[is_local_max & (highs > p)])
    except Exception:
        # Best-effort: unusable input means "no levels found".
        return []
def find_stop_level(p, df):
    """
    Find the highest recent local low that sits strictly below price `p`.

    Only the last 10 rows of df['Low'] are considered; a row is a local
    minimum when both neighbours inside that window are strictly greater.

    Args:
        p: Reference price.
        df: Frame with a 'Low' column.

    Returns:
        The stop level as a float, or None when no local minimum below `p`
        exists in the window or the input is unusable.
    """
    try:
        # .iloc makes the "last 10 rows" intent explicit and independent of
        # the index type.
        recent_lows = df['Low'].iloc[-10:]
        is_local_min = (
            (recent_lows.shift(1) > recent_lows)
            & (recent_lows.shift(-1) > recent_lows)
        )
        below = recent_lows[is_local_min & (recent_lows < p)]
        if below.empty:
            return None
        return float(below.max())
    except Exception:
        # Best-effort: unusable input means "no stop level".
        return None
def dist_to_nearest(p, levels):
    """
    Absolute distance from price `p` to the closest value in `levels`.

    Args:
        p: Reference price.
        levels: Sequence of price levels.

    Returns:
        The smallest |p - level| as a float, or the sentinel -1.0 when
        `levels` is empty/None or the distance cannot be computed.
    """
    try:
        if not levels:
            return -1.0
        return float(min(abs(p - level) for level in levels))
    except Exception:
        # Same sentinel as "no levels" so callers have one failure value.
        return -1.0
def cluster_strength(levels, tolerance=0.1):
    """
    Count how many levels belong to a cluster of two or more.

    Levels are sorted, then grouped greedily: each group starts at the
    first unassigned level and absorbs every following level within
    `tolerance` of that starting level. Groups of size one contribute
    nothing; larger groups contribute their size.

    Args:
        levels: Sequence of numeric levels.
        tolerance: Maximum distance from a group's first level for a level
            to join it. Defaults to 0.1, the previously hard-coded value.

    Returns:
        Total number of clustered levels as a float; 0.0 for empty input
        or input that cannot be sorted/compared.
    """
    try:
        if not levels:
            return 0.0
        ordered = sorted(levels)
        total = len(ordered)
        clustered = 0
        start = 0
        while start < total:
            end = start + 1
            while end < total and abs(ordered[end] - ordered[start]) <= tolerance:
                end += 1
            group_size = end - start
            if group_size > 1:
                clustered += group_size
            start = end
        return float(clustered)
    except Exception:
        # Best-effort: unusable input means "no clusters".
        return 0.0
| # ============================================================================ | |
| # REGIME DETECTOR (INTERNAL ONLY) | |
| # ============================================================================ | |
class RegimeDetector:
    """Latent regime detection for adaptive normalization.

    Produces soft weights for five regime labels ('low_vol', 'medium_vol',
    'high_vol', 'trending', 'mean_reverting') from a price frame and keeps
    a bounded history of past weights for smoothing.
    """

    _REGIME_KEYS = ('low_vol', 'medium_vol', 'high_vol', 'trending', 'mean_reverting')

    def __init__(self, config=REGIME_CONFIG):
        """Store the config and size the history from config['regime_memory']."""
        self.config = config
        self.regime_history = deque(maxlen=config['regime_memory'])

    def detect_regime(self, df):
        """Return smoothed regime weights for the given price frame.

        Args:
            df: Frame with a 'Close' column.

        Returns:
            Dict with the five regime keys. The default regime is returned
            when fewer than 30 rows are available, when any computed weight
            is NaN/inf, or when the computation raises.
        """
        if len(df) < 30:
            return self._default_regime()
        try:
            close = df['Close']
            returns = close.pct_change().dropna()

            # Percentile rank of the latest 20-period volatility within its
            # own history (rolling std is computed once and reused).
            rolling_vol = returns.rolling(20).std()
            current_vol = rolling_vol.iloc[-1]
            vol_history = rolling_vol.dropna()
            vol_percentile = percentileofscore(vol_history, current_vol) / 100

            low_vol_weight = self._sigmoid(self.config['vol_low_threshold'] - vol_percentile, 10)
            high_vol_weight = self._sigmoid(vol_percentile - self.config['vol_high_threshold'], 10)
            medium_vol_weight = max(0, 1 - low_vol_weight - high_vol_weight)

            # The 30-row guard above guarantees iloc[-20] exists.
            momentum = close.iloc[-1] / close.iloc[-20] - 1
            trend_strength = abs(momentum)
            trending_weight = self._sigmoid(trend_strength - self.config['trend_threshold'], 5)

            price_entropy = safe_entropy(returns.tail(50))
            mean_rev_weight = self._sigmoid(price_entropy - self.config['entropy_threshold'], 2)

            regime_weights = {
                'low_vol': float(low_vol_weight),
                'medium_vol': float(medium_vol_weight),
                'high_vol': float(high_vol_weight),
                'trending': float(trending_weight),
                'mean_reverting': float(mean_rev_weight),
            }

            # A zero/NaN close makes the returns inf/NaN, which would turn
            # every weight into NaN and poison both the history and the
            # caller's normalization. Fall back instead of recording it.
            if not all(np.isfinite(w) for w in regime_weights.values()):
                logger.debug("Regime detection produced non-finite weights; using default regime")
                return self._default_regime()

            self.regime_history.append(regime_weights)
            return self._smooth_regime(regime_weights)
        except Exception as e:
            logger.debug(f"Regime detection failed: {e}")
            return self._default_regime()

    def _sigmoid(self, x, steepness=1):
        """Numerically stable sigmoid of `steepness * x`."""
        z = np.clip(-steepness * x, -500, 500)  # Prevent overflow in exp
        return 1 / (1 + np.exp(z))

    def _smooth_regime(self, current_regime):
        """Blend the current weights with the mean of the stored history.

        Each key becomes 0.3 * current + 0.7 * mean(history). The history
        already contains `current_regime` when this is called from
        detect_regime. With fewer than two history entries the current
        weights are returned unchanged.
        """
        if len(self.regime_history) < 2:
            return current_regime
        alpha = 0.3
        smoothed = current_regime.copy()
        for key in self._REGIME_KEYS:
            historical = [r[key] for r in self.regime_history if key in r]
            historical = [v for v in historical if not (np.isnan(v) or np.isinf(v))]
            if historical:
                hist_mean = float(np.mean(historical))
                smoothed[key] = alpha * current_regime[key] + (1 - alpha) * hist_mean
            else:
                smoothed[key] = current_regime[key]
        return smoothed

    def _default_regime(self):
        """Neutral weights used when no regime can be estimated."""
        return {
            'low_vol': 0.33,
            'medium_vol': 0.34,
            'high_vol': 0.33,
            'trending': 0.5,
            'mean_reverting': 0.5,
        }
| # ============================================================================ | |
| # ADAPTIVE NORMALIZER | |
| # ============================================================================ | |
class AdaptiveNormalizer:
    """Regime-aware normalization.

    Blends a standard z-score with a median/IQR ("robust") score, weighted
    by the 'high_vol' regime weight.
    """

    def normalize(self, feature_series, regime_weights):
        """Normalize a feature series according to the current regime.

        Args:
            feature_series: pandas Series of raw feature values.
            regime_weights: Mapping that provides a 'high_vol' weight.

        Returns:
            For series of 20+ values: (1 - w) * z-score + w * robust score,
            clipped to [-5, 5], where w = regime_weights['high_vol'].
            Shorter series, and any failure in the blend (e.g. a missing
            'high_vol' key), fall back to the plain, unclipped z-score.
        """
        if len(feature_series) < 20:
            return self._zscore_normalize(feature_series)
        try:
            z_standard = self._zscore_normalize(feature_series)
            z_robust = self._robust_normalize(feature_series)
            vol_weight = regime_weights['high_vol']
            z_adaptive = (1 - vol_weight) * z_standard + vol_weight * z_robust
            return np.clip(z_adaptive, -5, 5)
        except Exception:
            # Deliberate best-effort fallback; `Exception` (not a bare
            # except) lets KeyboardInterrupt/SystemExit propagate.
            return self._zscore_normalize(feature_series)

    def _zscore_normalize(self, series):
        """(x - mean) / std, or all zeros when std is <= 1e-8 (or NaN)."""
        mu = series.mean()
        sigma = series.std()
        if sigma > 1e-8:
            return (series - mu) / (sigma + 1e-10)
        return series * 0

    def _robust_normalize(self, series):
        """(x - median) / IQR, or all zeros when the IQR is <= 1e-8 (or NaN)."""
        iqr = series.quantile(0.75) - series.quantile(0.25)
        median = series.median()
        if iqr > 1e-8:
            return (series - median) / (iqr + 1e-10)
        return series * 0
| # ============================================================================ | |
| # INTEGRATED FEATURE ENHANCER (60 FEATURES STRICT) | |
| # ============================================================================ | |
class IntegratedFeatureEnhancer:
    """Compute, validate, cache and publish per-agent feature vectors.

    Raw ticks are buffered per agent, expanded into technical, derivative,
    candlestick and support/resistance columns, regime-normalized, checked
    against FEATURE_CONTRACT and cached under a lock for readers.
    """

    # (frame column, published key) pairs read by extract_meta_features.
    _META_FEATURE_MAP = (
        ('price_vel', 'price_vel_scaled'),
        ('price_acc', 'price_acc_scaled'),
        ('price_jrk', 'price_jrk_scaled'),
        ('price_vel_mean', 'price_vel_mean_scaled'),
        ('price_acc_mean', 'price_acc_mean_scaled'),
        ('price_jrk_mean', 'price_jrk_mean_scaled'),
        ('ma10', 'ma10_scaled'),
        ('ma20', 'ma20_scaled'),
        ('bollinger_upper', 'bollinger_upper_scaled'),
        ('bollinger_lower', 'bollinger_lower_scaled'),
        ('macd', 'macd_scaled'),
        ('macd_signal', 'macd_signal_scaled'),
        ('macd_hist', 'macd_hist_scaled'),
        ('rsi_14', 'rsi_scaled'),
        ('std20', 'std20_scaled'),
    )

    def __init__(self, ably_client, agent_names, window_size=100):
        """Set up per-agent buffers, regime components and Ably channels.

        Args:
            ably_client: Client exposing `channels.get(name)`.
            agent_names: Iterable of agent identifiers; one price buffer
                and one meta-feature channel is created per agent.
            window_size: Maximum number of ticks kept per agent.

        Raises:
            AssertionError: If the contract's feature count differs from
                EXPECTED_FEATURE_COUNT.
        """
        self.ably = ably_client
        self.agent_names = agent_names
        self.window_size = window_size
        self.price_buffers = {name: deque(maxlen=window_size) for name in agent_names}
        # Internal regime components
        self.regime_detector = RegimeDetector()
        self.adaptive_normalizer = AdaptiveNormalizer()
        # Channels
        self.features_channel = ably_client.channels.get("integrated_features_all")
        self.meta_channels = {
            name: ably_client.channels.get(f"meta_features-{name}")
            for name in agent_names
        }
        self.latest_computed_features = {}
        self.features_lock = threading.Lock()
        # Per-agent count of contract violations seen by process_raw_tick.
        self._contract_violation_counts = {}
        logger.info("Regime-Adaptive Feature Enhancer initialized")
        logger.info(
            f"Contract: version={FEATURE_CONTRACT.version} "
            f"features={len(FEATURE_CONTRACT.features)} "
            f"envelope={len(FEATURE_CONTRACT.envelope)} "
            f"(invariants enforced at import time)"
        )
        # Defensive re-check at instantiation. The contract's __post_init__
        # already verified this at import, but a runtime check catches
        # anyone monkey-patching FEATURE_CONTRACT.features before first use.
        # Raised explicitly (same exception type as before) so the check is
        # not stripped when Python runs with -O.
        if len(FEATURE_CONTRACT.features) != EXPECTED_FEATURE_COUNT:
            raise AssertionError(
                f"Feature count mismatch at runtime: "
                f"{len(FEATURE_CONTRACT.features)} != {EXPECTED_FEATURE_COUNT}"
            )

    def compute_core_technical_features(self, df):
        """Compute 19 core technical indicators with robust edge case handling.

        Args:
            df: Frame with 'Close', 'High', 'Low' and 'Volume' columns.

        Returns:
            A copy of `df` with the indicator columns added.
        """
        df = df.copy()
        eps = 1e-10
        # Suppress warnings during computation
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", RuntimeWarning)
            df['log_return'] = np.log(df['Close'] / df['Close'].shift(1)).replace([np.inf, -np.inf], 0).fillna(0)
            df['rolling_mean_5'] = df['Close'].rolling(5, min_periods=1).mean().fillna(df['Close'])
            df['rolling_std_5'] = df['Close'].rolling(5, min_periods=1).std().fillna(eps)
            df['rolling_std_5'] = df['rolling_std_5'].replace(0, eps)
            df['zscore_5'] = (df['Close'] - df['rolling_mean_5']) / df['rolling_std_5']

            # RSI. The gain/loss arrays are re-wrapped with df's own index;
            # a default RangeIndex would misalign (and silently become the
            # fill value) on any frame that is not 0..n-1 indexed.
            delta = df['Close'].diff().fillna(0)
            gain = pd.Series(np.where(delta > 0, delta, 0), index=df.index)
            loss = pd.Series(np.where(delta < 0, -delta, 0), index=df.index)
            avg_gain = gain.rolling(14, min_periods=1).mean().fillna(0)
            avg_loss = loss.rolling(14, min_periods=1).mean().fillna(0)
            rs = avg_gain / (avg_loss + eps)
            df['rsi_14'] = 100 - (100 / (1 + rs))
            df['rsi_14'] = df['rsi_14'].ewm(span=5, adjust=False).mean().fillna(50)

            # MACD
            ema12 = df['Close'].ewm(span=12, adjust=False).mean()
            ema26 = df['Close'].ewm(span=26, adjust=False).mean()
            df['macd'] = ema12 - ema26
            df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()
            df['macd_hist'] = df['macd'] - df['macd_signal']

            # ATR (true range is an ndarray, so it is re-indexed like RSI)
            high_low = df['High'] - df['Low']
            high_close = np.abs(df['High'] - df['Close'].shift(1))
            low_close = np.abs(df['Low'] - df['Close'].shift(1))
            tr = np.maximum.reduce([high_low, high_close, low_close])
            df['atr'] = pd.Series(tr, index=df.index).rolling(14, min_periods=1).mean().fillna(0)

            # CDF features
            window = min(100, len(df))
            if window >= 20:
                df['cdf_value'] = df['log_return'].rolling(window, min_periods=10).apply(
                    lambda x: percentileofscore(x.dropna(), x.iloc[-1]) / 100 if len(x.dropna()) > 10 else 0.5
                ).fillna(0.5)
            else:
                df['cdf_value'] = 0.5
            df['cdf_value'] = df['cdf_value'].ffill().bfill().fillna(0.5)
            df['cdf_slope'] = df['cdf_value'].diff().ewm(span=5, adjust=False).mean().fillna(0)
            df['cdf_diff'] = (df['cdf_value'] - df['cdf_value'].shift(10)).fillna(0)
            df['cdf_diff'] = df['cdf_diff'].ewm(span=5, adjust=False).mean().fillna(0)

            # Volatility. min_periods is clamped to the window: pandas
            # raises ValueError when min_periods > window, which used to
            # happen for frames shorter than 20 rows.
            vol_window = min(100, len(df))
            df['volatility_quantile_90'] = df['rolling_std_5'].rolling(
                vol_window, min_periods=min(20, vol_window)
            ).quantile(0.9).fillna(df['rolling_std_5'])
            df['volatility_ratio'] = df['rolling_std_5'] / (df['volatility_quantile_90'] + eps)
            df['volatility_ratio'] = df['volatility_ratio'].clip(0, 3).fillna(1.0)

            # Entropy (same min_periods clamp as above)
            entropy_window = min(50, len(df))
            df['entropy_50'] = df['log_return'].rolling(
                entropy_window, min_periods=min(20, entropy_window)
            ).apply(safe_entropy).fillna(0)

            # Autocorrelation
            df['autocorr_3'] = df['log_return'].rolling(20, min_periods=5).apply(
                lambda x: x.autocorr(lag=3) if len(x) > 3 else 0
            ).fillna(0)

            # Momentum
            df['momentum_10'] = (df['Close'] / df['Close'].shift(10) - 1).fillna(0)

            # Volume
            df['volume_change_rate'] = df['Volume'].pct_change().replace([np.inf, -np.inf], 0).fillna(0)
            vol_mean = df['Volume'].rolling(20, min_periods=1).mean()
            vol_std = df['Volume'].rolling(20, min_periods=1).std().fillna(eps)
            df['volume_zscore'] = ((df['Volume'] - vol_mean) / (vol_std + eps)).clip(-3, 3).fillna(0)
        return df

    def compute_derivative_features(self, df, window=10):
        """Compute 15 derivative features with robust handling.

        Adds first/second/third differences of 'Close' (price_vel,
        price_acc, price_jrk) plus rolling mean, std, skew and kurtosis of
        each over `window` rows. If the rolling statistics for a column
        fail, its four statistic columns are set to 0.

        Returns:
            A copy of `df` with the derivative columns added.
        """
        df = df.copy()
        df['price_vel'] = df['Close'].diff()
        df['price_acc'] = df['price_vel'].diff()
        df['price_jrk'] = df['price_acc'].diff()
        for col in ['price_vel', 'price_acc', 'price_jrk']:
            try:
                # Use fillna(0) to handle edge cases
                df[f'{col}_mean'] = df[col].rolling(window, min_periods=1).mean().fillna(0)
                df[f'{col}_std'] = df[col].rolling(window, min_periods=1).std().fillna(0)
                df[f'{col}_skew'] = df[col].rolling(window, min_periods=3).apply(
                    safe_skew, raw=True
                ).fillna(0)
                df[f'{col}_kurtosis'] = df[col].rolling(window, min_periods=3).apply(
                    safe_kurtosis, raw=True
                ).fillna(0)
            except Exception as e:
                logger.debug(f"Derivative feature {col} computation failed: {e}")
                df[f'{col}_mean'] = 0
                df[f'{col}_std'] = 0
                df[f'{col}_skew'] = 0
                df[f'{col}_kurtosis'] = 0
        return df

    def compute_additional_technical(self, df):
        """Compute 7 additional technical features.

        Adds 10/20-row moving averages, the 20-row standard deviation and
        Bollinger upper/lower/width/position (position clipped to [0, 1]).

        Returns:
            A copy of `df` with the columns added.
        """
        df = df.copy()
        eps = 1e-10
        df['ma10'] = df['Close'].rolling(10, min_periods=1).mean()
        df['ma20'] = df['Close'].rolling(20, min_periods=1).mean()
        df['std20'] = df['Close'].rolling(20, min_periods=1).std()
        df['bollinger_upper'] = df['ma20'] + 2 * df['std20']
        df['bollinger_lower'] = df['ma20'] - 2 * df['std20']
        band_span = df['bollinger_upper'] - df['bollinger_lower']
        df['bollinger_width'] = band_span / (df['ma20'] + eps)
        df['bollinger_position'] = (df['Close'] - df['bollinger_lower']) / (band_span + eps)
        df['bollinger_position'] = df['bollinger_position'].clip(0, 1)
        return df

    def compute_candlestick_patterns(self, df):
        """Compute 9 institutional-grade candlestick patterns.

        Seven single-bar flags plus two two-bar flags are added as 0/1
        columns. A missing 'Open' column is filled from 'Close'.

        Returns:
            A copy of `df` with the pattern columns added.
        """
        df = df.copy()
        if 'Open' not in df.columns:
            df['Open'] = df['Close']
        patterns = [
            ('gravestone_doji', gravestone_doji),
            ('four_price_doji', four_price_doji),
            ('doji', doji),
            ('spinning_top', spinning_top),
            ('bullish_candle', bullish_candle),
            ('bearish_candle', bearish_candle),
            ('dragonfly_candle', dragonfly_candle)
        ]
        # One array extraction replaces per-row Series construction
        # (df.apply / df.iloc[i]) and per-cell df.at writes.
        ohlc = df[['Open', 'High', 'Low', 'Close']].to_numpy()
        for name, func in patterns:
            df[name] = [func(*bar) for bar in ohlc]

        followup = [0] * len(df)
        then_dragonfly = [0] * len(df)
        for i in range(1, len(ohlc)):
            c1 = tuple(ohlc[i - 1])
            c2 = tuple(ohlc[i])
            followup[i] = spinning_top_bearish_followup(c1, c2)
            then_dragonfly[i] = bullish_candle_followed_by_dragonfly(c1, c2)
        df['spinning_top_bearish_followup'] = followup
        df['bullish_then_dragonfly'] = then_dragonfly
        return df

    def compute_support_resistance_features(self, df):
        """Compute 7 support/resistance features.

        Every value is derived from the last close and written as a
        constant to the whole column. Frames shorter than 10 rows get
        fixed defaults.

        Returns:
            A copy of `df` with the columns added.
        """
        df = df.copy()
        if len(df) < 10:
            df['distance_to_nearest_support'] = 0.0
            df['distance_to_nearest_resistance'] = 0.0
            df['near_support'] = 0
            df['near_resistance'] = 0
            df['distance_to_stop_loss'] = 0.5
            df['support_strength'] = 0.0
            df['resistance_strength'] = 0.0
            return df
        current_price = df['Close'].iloc[-1]
        supports = find_supports(current_price, df)
        resistances = find_resistances(current_price, df)
        stop_level = find_stop_level(current_price, df)
        min_p, max_p = df['Low'].min(), df['High'].max()
        rng = max_p - min_p if max_p > min_p else 1
        df['distance_to_nearest_support'] = dist_to_nearest(current_price, supports)
        df['distance_to_nearest_resistance'] = dist_to_nearest(current_price, resistances)
        df['near_support'] = int(any(abs(current_price - s) < 0.3 for s in supports)) if supports else 0
        df['near_resistance'] = int(any(abs(current_price - r) < 0.3 for r in resistances)) if resistances else 0
        # `is not None`: a stop level of exactly 0.0 is a real level, not
        # "no stop found".
        df['distance_to_stop_loss'] = (current_price - stop_level) / rng if stop_level is not None else 0.5
        df['support_strength'] = cluster_strength([s / rng for s in supports])
        df['resistance_strength'] = cluster_strength([r / rng for r in resistances])
        return df

    def _validate_feature_contract(self, features_dict):
        """
        Delegate to FEATURE_CONTRACT.validate() and return a legacy
        3-tuple (is_valid, missing, extra) for call-site back-compat.

        `extra` in the legacy contract conflated two distinct failure
        modes — envelope leakage and unknown keys. We preserve the
        3-tuple shape but keep them merged; richer diagnostics are
        available by calling FEATURE_CONTRACT.validate() directly.
        """
        result = FEATURE_CONTRACT.validate(features_dict)
        extra = result.leaked_envelope | result.unexpected
        return result.ok, result.missing, extra

    def compute_all_features(self, df):
        """
        Compute exactly 60 features with regime-adaptive normalization.
        Regime detection is internal - NOT published.

        Returns:
            The enriched, normalized frame with inf/NaN cleaned, or an
            empty DataFrame when `df` has fewer than 10 rows or any step
            raises (the error is logged).
        """
        try:
            if len(df) < 10:
                return pd.DataFrame()
            # Step 1: Compute raw features
            df = self.compute_core_technical_features(df)
            df = self.compute_derivative_features(df)
            df = self.compute_additional_technical(df)
            df = self.compute_candlestick_patterns(df)
            df = self.compute_support_resistance_features(df)
            # Step 2: Internal regime detection
            regime_weights = self.regime_detector.detect_regime(df)
            # Step 3: Apply adaptive normalization ONLY to continuous features
            continuous_features = [
                'log_return', 'rolling_std_5', 'zscore_5', 'rsi_14',
                'macd', 'macd_signal', 'macd_hist', 'atr',
                'cdf_value', 'cdf_slope', 'cdf_diff',
                'volatility_ratio', 'entropy_50', 'autocorr_3', 'momentum_10',
                'volume_change_rate', 'volume_zscore',
                'price_vel_mean', 'price_acc_mean', 'price_jrk_mean',
                'price_vel_std', 'price_acc_std', 'price_jrk_std',
                'price_vel_skew', 'price_acc_skew', 'price_jrk_skew',
                'price_vel_kurtosis', 'price_acc_kurtosis', 'price_jrk_kurtosis',
                'bollinger_width', 'bollinger_position',
                'distance_to_nearest_support', 'distance_to_nearest_resistance',
                'distance_to_stop_loss', 'support_strength', 'resistance_strength'
            ]
            for feature in continuous_features:
                if feature in df.columns and feature not in NORMALIZATION_EXCLUSIONS:
                    df[feature] = self.adaptive_normalizer.normalize(
                        df[feature], regime_weights
                    )
            # Clean infinities and NaNs
            df = df.replace([np.inf, -np.inf], np.nan)
            df = df.ffill().bfill().fillna(0)
            return df
        except Exception as e:
            logger.error(f"Feature computation failed: {e}")
            return pd.DataFrame()

    def extract_meta_features(self, df, current_price):
        """Extract exactly 24 meta features (23 + timestamp).

        Builds 8 "voting" values from support/resistance analysis of `df`
        and 15 technical values read from the last row of `df`; any mapped
        column that is absent from `df` is reported as 0.0. The timestamp
        is not added here.

        NOTE(review): the caller visible in this file (process_tick) passes
        the raw OHLCV buffer frame, which contains none of the 15 mapped
        columns, so all of them come out as 0.0 on that path. Confirm
        whether an enriched frame was meant to be passed.

        Returns:
            Dict of 23 values, or {} when `df` has fewer than 10 rows, the
            count check fails, or an exception is raised (logged).
        """
        try:
            if len(df) < 10:
                return {}
            supports = find_supports(current_price, df)
            resistances = find_resistances(current_price, df)
            stop_level = find_stop_level(current_price, df)
            min_p, max_p = df['Low'].min(), df['High'].max()
            rng = max_p - min_p if max_p > min_p else 1
            # Voting features (8)
            voting = {
                'distance_to_nearest_support_scaled': dist_to_nearest(current_price, supports) / rng if rng > 0 else 0.0,
                'distance_to_nearest_resistance_scaled': dist_to_nearest(current_price, resistances) / rng if rng > 0 else 0.0,
                'near_support': int(any(abs(current_price - s) < 0.3 for s in supports)) if supports else 0,
                'near_resistance': int(any(abs(current_price - r) < 0.3 for r in resistances)) if resistances else 0,
                # `is not None`: a stop level of exactly 0.0 is a real level.
                'distance_to_stop_loss_scaled': (current_price - stop_level) / rng if stop_level is not None and rng > 0 else 0.5,
                'support_strength_scaled': cluster_strength([s / rng for s in supports]) if rng > 0 else 0.0,
                'resistance_strength_scaled': cluster_strength([r / rng for r in resistances]) if rng > 0 else 0.0,
                'close_price': float(current_price)
            }
            # Filtered technical (15)
            latest = df.iloc[-1]
            filtered = {
                meta_col: float(latest[df_col]) if df_col in latest.index else 0.0
                for df_col, meta_col in self._META_FEATURE_MAP
            }
            meta_features = {**filtered, **voting}
            # Validate count (23 features, timestamp added later)
            if len(meta_features) != 23:
                logger.error(f"Meta feature count violation: {len(meta_features)} != 23")
                return {}
            return meta_features
        except Exception as e:
            logger.error(f"Meta feature extraction failed: {e}")
            return {}

    def process_raw_tick(self, agent_name, price_data):
        """Process tick and enforce 60-feature contract.

        Appends the tick to the agent's buffer and, once 30 ticks are
        buffered, recomputes features, validates the latest row against
        FEATURE_CONTRACT and caches it. Violations are logged and counted
        per agent; nothing is cached for that tick. All exceptions are
        logged, not raised.

        Args:
            agent_name: Key into the per-agent price buffers.
            price_data: Mapping with 'close' and optionally 'high', 'low',
                'open' (each defaulting to the close) and 'volume'
                (defaulting to 0).
        """
        try:
            close_price = price_data.get('close', 0)
            self.price_buffers[agent_name].append({
                'Close': close_price,
                'High': price_data.get('high', close_price),
                'Low': price_data.get('low', close_price),
                'Volume': price_data.get('volume', 0),
                'Open': price_data.get('open', close_price)
            })
            if len(self.price_buffers[agent_name]) < 30:
                return
            df = pd.DataFrame(list(self.price_buffers[agent_name]))
            enhanced_df = self.compute_all_features(df)
            if enhanced_df.empty:
                return
            # CRITICAL FIX: Only extract computed features, not raw OHLCV
            latest_row = enhanced_df.iloc[-1]
            # Extract only REQUIRED_FEATURES (excluding raw OHLCV columns)
            latest_features = {}
            for feature in REQUIRED_FEATURES:
                if feature in ['price', 'close_scaled', 'close_price']:
                    # These are price variants we add manually
                    latest_features[feature] = float(close_price)
                elif feature in latest_row.index:
                    latest_features[feature] = float(latest_row[feature])
                else:
                    logger.warning(f"[{agent_name}] Missing feature: {feature}")
                    latest_features[feature] = 0.0
            # ENFORCE CONTRACT — use the rich ValidationResult directly so we
            # log three distinct failure modes separately instead of collapsing
            # them into a single ambiguous "Missing / Extra" pair.
            validation = FEATURE_CONTRACT.validate(latest_features)
            if not validation.ok:
                logger.error("=" * 80)
                logger.error(
                    f"❌ [{agent_name}] FEATURE CONTRACT VIOLATION "
                    f"(contract={FEATURE_CONTRACT.version})"
                )
                for line in validation.as_error_lines():
                    logger.error(f" {line}")
                logger.error("=" * 80)
                # Bookkeeping counter — lets ops tell the difference between
                # "feed is dry" and "feed is arriving but contract is broken".
                # setdefault keeps this working for instances created
                # without __init__.
                counts = self.__dict__.setdefault('_contract_violation_counts', {})
                counts[agent_name] = counts.get(agent_name, 0) + 1
                return
            with self.features_lock:
                self.latest_computed_features[agent_name] = latest_features.copy()
        except Exception as e:
            logger.error(f"[{agent_name}] Feature enhancement failed: {e}")

    async def publish_features(self, agent_name, features_dict, tick_index=None):
        """
        Publish 60 features on the wire. Payload shape is enforced by
        FEATURE_CONTRACT.build_payload() — envelope keys live at the
        top level, feature keys live ONLY inside payload['features'],
        and a contract_version string accompanies every message so the
        consumer can detect schema drift.

        Nothing is published when validation fails; all exceptions are
        logged, not raised.
        """
        try:
            # Defensive re-validation at the publish boundary. Zero cost on
            # the happy path; catches any mutation between compute and
            # publish (e.g. a caller accidentally injecting envelope keys
            # into the features dict).
            validation = FEATURE_CONTRACT.validate(features_dict)
            if not validation.ok:
                logger.error(
                    f"[{agent_name}] publish BLOCKED — contract violation at "
                    f"publish boundary: {validation.as_error_lines()}"
                )
                return
            # Coerce numpy scalars to native floats so the JSON serialiser
            # doesn't choke. Done on the features-only dict, inside the
            # contract shape.
            clean_features = {
                k: float(v) if isinstance(v, (np.floating, np.integer)) else v
                for k, v in features_dict.items()
            }
            # Resolve tick_index: caller may pass it explicitly, or it may
            # be embedded in the dict (legacy path). Envelope keys should
            # NOT be inside features_dict after the validation above, so
            # these .get() calls will normally return None — kept for
            # defensive back-compat. Explicit None checks (instead of `or`)
            # keep a legitimate tick index of 0.
            resolved_tick = tick_index
            if resolved_tick is None:
                resolved_tick = features_dict.get('tick_count')
            if resolved_tick is None:
                resolved_tick = features_dict.get('tick_index')
            payload = FEATURE_CONTRACT.build_payload(
                agent_name=agent_name,
                features_dict=clean_features,
                tick_index=resolved_tick,
                timestamp_iso=datetime.now(UTC).isoformat(),
            )
            await self.features_channel.publish("integrated-features", payload)
        except Exception as e:
            logger.error(f"[{agent_name}] Feature publish failed: {e}")

    async def publish_meta_features(self, agent_name, meta_features):
        """Publish 24 meta features.

        Adds 'agent' and an ISO 'timestamp' to the given values and
        publishes them on the agent's meta channel. Exceptions are logged,
        not raised.
        """
        try:
            channel = self.meta_channels[agent_name]
            clean_meta = {
                k: float(v) if isinstance(v, (np.floating, np.integer)) else v
                for k, v in meta_features.items()
            }
            clean_meta['agent'] = agent_name
            clean_meta['timestamp'] = datetime.now(UTC).isoformat()
            await channel.publish("meta_features", clean_meta)
        except Exception as e:
            logger.error(f"[{agent_name}] Meta feature publish failed: {e}")

    def get_latest_state_features(self, agent_name=None):
        """Get latest features with type-aware aggregation.

        Returns:
            The cached dict for `agent_name` ({} if none) when a name is
            given; otherwise the aggregate across all cached agents ({}
            when nothing is cached).
        """
        with self.features_lock:
            if agent_name:
                return self.latest_computed_features.get(agent_name, {})
            all_features = list(self.latest_computed_features.values())
            if not all_features:
                return {}
            return self._safe_aggregate_features(all_features)

    def _safe_aggregate_features(self, all_features):
        """Type-aware feature aggregation across agents.

        Keys are taken from the first dict. Binary features use a strict
        majority vote, price features the median and everything else the
        mean; NaN values are ignored and an all-NaN key becomes 0.0.
        """
        avg_features = {}
        for key in all_features[0].keys():
            values = [f[key] for f in all_features if key in f]
            if not values:
                continue
            if key in BINARY_FEATURES:
                # Voting for binary features
                avg_features[key] = int(np.sum(values) > len(values) / 2)
                continue
            clean_values = [v for v in values if not np.isnan(v)]
            if not clean_values:
                avg_features[key] = 0.0
            elif key in PRICE_FEATURES:
                # Median for price features (robust to outliers)
                avg_features[key] = float(np.median(clean_values))
            else:
                # Mean for continuous features
                avg_features[key] = float(np.mean(clean_values))
        return avg_features

    def get_feature_summary(self):
        """Get detailed feature summary.

        Returns:
            A multi-line human-readable description, or
            "No features computed yet" when nothing is cached.
        """
        with self.features_lock:
            if not self.latest_computed_features:
                return "No features computed yet"
            sample_agent = list(self.latest_computed_features.keys())[0]
            features = self.latest_computed_features[sample_agent]
            # Count only keys that are actually declared features in the
            # contract. This is set-intersection, not set-difference — so
            # it's correct regardless of whether envelope keys have leaked
            # into the features dict or not.
            actual_count = len(set(features.keys()) & FEATURE_CONTRACT.features)
            parts = [
                "REGIME-ADAPTIVE FEATURE ENHANCER\n",
                "=" * 60 + "\n\n",
                f"Total Features: {actual_count} (Expected: 60)\n\n",
                "Feature Categories:\n",
                " • Core Technical: 19 features\n",
                " • Derivatives: 15 features\n",
                " • Additional Technical: 7 features\n",
                " • Candlestick Patterns: 9 features (institutional-grade)\n",
                " • Support/Resistance: 7 features\n",
                " • Price Variants: 3 features\n",
                " • TOTAL: 60 features\n\n",
                "Meta Features (24 total, published separately):\n",
                " • Voting: 8 features\n",
                " • Technical: 15 features\n",
                " • Timestamp: 1 metadata\n\n",
                "Regime Detection: INTERNAL (adaptive normalization)\n",
                " • Volatility regimes: low/medium/high\n",
                " • Trend detection: momentum-based\n",
                " • Mean-reversion: entropy-based\n\n",
                "Normalization: Regime-adaptive\n",
                " • High vol → Robust scaling (IQR)\n",
                " • Low vol → Standard z-score\n",
                f" • Excluded: {len(NORMALIZATION_EXCLUSIONS)} features\n\n",
                "Aggregation: Type-aware\n",
                " • Binary: Voting (majority rule)\n",
                " • Price: Median (outlier-resistant)\n",
                " • Continuous: Mean\n",
            ]
            return "".join(parts)
| # ============================================================================ | |
| # ASYNC WRAPPER | |
| # ============================================================================ | |
| class AsyncIntegratedFeatureEnhancer: | |
def __init__(self, ably_client, agent_names, window_size=100):
    """Wrap a synchronous IntegratedFeatureEnhancer for asyncio use.

    Args:
        ably_client: Client passed through to the wrapped enhancer and
            kept for channel access.
        agent_names: Agent identifiers to listen and publish for.
        window_size: Per-agent buffer length forwarded to the enhancer.
    """
    wrapped = IntegratedFeatureEnhancer(ably_client, agent_names, window_size)
    self.enhancer = wrapped
    self.ably, self.agents = ably_client, agent_names
    # Listener state: not started yet, no channels resolved yet.
    self.running = False
    self.channels = {}
def get_latest_state_features(self, agent_name=None):
    """Return whatever the wrapped enhancer reports for `agent_name`."""
    wrapped = self.enhancer
    return wrapped.get_latest_state_features(agent_name)
async def start(self):
    """Mark the wrapper as running, log the feature summary and attach listeners."""
    self.running = True
    logger.info("AsyncIntegratedFeatureEnhancer started")
    summary_text = self.enhancer.get_feature_summary()
    logger.info("\n" + summary_text)
    await self._start_ably_listeners()
async def _start_ably_listeners(self):
    """Ensure the Ably connection is up, then subscribe per agent.

    If the client exposes a `connection` that is not in the 'connected'
    state, a connect is requested and the state is polled every 0.5 s for
    up to 20 attempts; the method gives up (with an error log) if the
    connection does not come up or the connect call raises. For every
    agent, the feature channel and the "meta_features-" prefixed channel
    are subscribed via _subscribe_with_retry; successes are logged.
    """
    if not self.ably:
        logger.error("No Ably client available")
        return
    if hasattr(self.ably, 'connection') and self.ably.connection.state != 'connected':
        try:
            self.ably.connection.connect()
            for _ in range(20):
                await asyncio.sleep(0.5)
                if self.ably.connection.state == 'connected':
                    break
            else:
                # Loop finished without ever seeing 'connected'.
                logger.error("Failed to connect to Ably")
                return
        except Exception as e:
            # The message previously said "Redis", but this is the Ably
            # connection; corrected so operators are not misdirected.
            logger.error(f"Ably connection failed: {e}")
            return
    logger.info("Starting Ably listeners")
    for agent in self.agents:
        agent_str = agent.decode('utf-8') if isinstance(agent, bytes) else str(agent)
        # `name=agent_str` binds the current agent into each callback;
        # a plain closure would see only the last loop value.
        feature_ok = await self._subscribe_with_retry(
            agent_str, "integrated-features",
            lambda msg, name=agent_str: self._handle_feature_message(name, msg)
        )
        meta_ok = await self._subscribe_with_retry(
            agent_str, "meta_features",
            lambda msg, name=agent_str: self._handle_meta_features_message(name, msg),
            channel_suffix="meta_features-"
        )
        if feature_ok:
            logger.info(f"✓ [{agent_str}] Feature channel attached")
        if meta_ok:
            logger.info(f"✓ [{agent_str}] Meta features channel attached")
async def _subscribe_with_retry(self, agent_name, event_name, callback, max_retries=3, timeout=10, channel_suffix=""):
    """Attach to a channel and subscribe to an event, retrying on failure.

    Args:
        agent_name: Agent identifier; used as the channel name.
        event_name: Event to subscribe to.
        callback: Message handler passed to `channel.subscribe`.
        max_retries: Maximum number of attempts.
        timeout: Seconds allowed for each attach and each subscribe call.
        channel_suffix: Text placed in FRONT of the agent name to build
            the channel name (a prefix, despite the parameter name).

    Returns:
        True once attach and subscribe both succeed; False after the last
        attempt fails. Between attempts the coroutine sleeps 2**attempt
        seconds.
    """
    channel_name = f"{channel_suffix}{agent_name}" if channel_suffix else agent_name
    for attempt in range(max_retries):
        try:
            channel = self.ably.channels.get(channel_name)
            self.channels[channel_name] = channel
            # wait_for schedules the awaitable itself, so no explicit
            # create_task is needed; it cancels the call on timeout.
            await asyncio.wait_for(channel.attach(), timeout=timeout)
            await asyncio.wait_for(channel.subscribe(event_name, callback), timeout=timeout)
            return True
        except Exception as e:
            # Timeouts and SDK errors get the same treatment: back off and
            # retry, or give up on the final attempt. Failures used to be
            # swallowed without any log line.
            if attempt >= max_retries - 1:
                logger.warning(
                    f"[{channel_name}] subscribe to '{event_name}' failed "
                    f"after {max_retries} attempts: {e!r}"
                )
                return False
            logger.debug(
                f"[{channel_name}] subscribe attempt {attempt + 1} failed: {e!r}; retrying"
            )
            await asyncio.sleep(2 ** attempt)
    return False
async def process_tick(self, agent_name, price_data):
    """Feed one tick to the enhancer and publish the resulting features.

    ``self.enhancer.process_raw_tick`` runs in the loop's default executor so
    it does not block the event loop. Afterwards the latest state features
    are published (if any), and once the agent's price buffer holds at least
    10 rows, meta features are extracted and published as well.

    Args:
        agent_name: Key identifying the agent / price buffer.
        price_data: Mapping for the tick; ``price_data.get('close', 0)`` is
            used as the current price for meta-feature extraction.
    """
    # get_running_loop() is the documented way to obtain the loop from inside
    # a coroutine; get_event_loop() has policy-dependent behaviour.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, self.enhancer.process_raw_tick, agent_name, price_data)

    features = self.enhancer.get_latest_state_features(agent_name)
    if features:
        await self._publish_with_retry(agent_name, features, meta=False)

    df = pd.DataFrame(list(self.enhancer.price_buffers[agent_name]))
    if len(df) >= 10:
        current_price = price_data.get('close', 0)
        meta_features = self.enhancer.extract_meta_features(df, current_price)
        if meta_features:
            await self._publish_with_retry(agent_name, meta_features, meta=True)
async def _publish_with_retry(self, agent_name, features_dict, meta=False, tick_index=None):
    """Publish a features payload to the agent's channel, retrying on failure.

    Args:
        agent_name: Agent whose channel receives the message. Meta features
            go to ``"meta_features-<agent_name>"``, regular features to
            ``agent_name`` itself.
        features_dict: Payload stored under ``'meta_features'`` or
            ``'features'`` depending on ``meta``.
        meta: Selects the meta-features channel/event when true.
        tick_index: Explicit tick index. When ``None`` it is looked up in
            ``features_dict`` under ``'tick_count'`` and then ``'tick_index'``.

    Up to three publish attempts are made with exponential backoff between
    them. Failures are logged; nothing is raised to the caller.
    """
    channel_name = f"meta_features-{agent_name}" if meta else agent_name
    event_name = "meta_features" if meta else "feature"

    if channel_name not in self.channels:
        self.channels[channel_name] = self.ably.channels.get(channel_name)
    channel = self.channels[channel_name]

    # Resolve tick_index from the features dict if not supplied. Explicit
    # None checks are used so that a legitimate index of 0 is not discarded
    # (the previous `a or b` form treated 0 as missing).
    resolved_tick = tick_index
    if resolved_tick is None and isinstance(features_dict, dict):
        resolved_tick = features_dict.get('tick_count')
        if resolved_tick is None:
            resolved_tick = features_dict.get('tick_index')

    payload = {
        'agent': agent_name,
        'features' if not meta else 'meta_features': features_dict,
        'timestamp': datetime.now(UTC).isoformat(),
        'tick_index': resolved_tick
    }

    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            await channel.publish(event_name, payload)
            return
        except Exception as exc:
            if attempt == max_attempts - 1:
                # Give up without a trailing sleep; the message is dropped.
                logger.error(
                    f"[{channel_name}] publish '{event_name}' failed after "
                    f"{max_attempts} attempts: {exc!r}"
                )
                return
            logger.warning(
                f"[{channel_name}] publish '{event_name}' failed "
                f"(attempt {attempt + 1}/{max_attempts}): {exc!r}"
            )
            await asyncio.sleep(2 ** attempt)
def _handle_feature_message(self, agent_name, msg):
    """Subscription callback for feature messages; only emits a debug log line.

    ``msg`` is accepted to satisfy the callback signature but is not inspected.
    """
    note = f"[{agent_name}] Feature message received"
    logger.debug(note)
def _handle_meta_features_message(self, agent_name, msg):
    """Subscription callback for meta-feature messages; only emits a debug log line.

    ``msg`` is accepted to satisfy the callback signature but is not inspected.
    """
    note = f"[{agent_name}] Meta feature message received"
    logger.debug(note)
| # ============================================================================ | |
| # MAIN EXECUTION - DERIV WEBSOCKET VERSION | |
| # ============================================================================ | |
async def main():
    """Run the Deriv tick -> feature extraction -> Redis publishing loop.

    Steps, in order:

    1. Initialise the Deriv bridge for ``SYMBOL`` and verify the symbol.
    2. Build the ``RedisAblyClient`` and start an
       ``AsyncIntegratedFeatureEnhancer`` with one agent per ``TIMEFRAMES`` key.
    3. Loop until interrupted: read the latest tick, process every timeframe
       concurrently, publish each agent's features on its own channel, then
       sleep for whatever remains of ``MIN_TICK_INTERVAL``.

    Raises:
        RuntimeError: If Deriv initialisation, symbol verification or Redis
            client construction fails. The Deriv bridge is shut down before
            the latter two errors are raised.
    """
    nest_asyncio.apply()

    logger.info("=" * 80)
    logger.info("🚀 REGIME-ADAPTIVE FEATURE ENHANCER - DERIV WEBSOCKET EDITION")
    logger.info("=" * 80)

    # Initialize Deriv WebSocket instead of MT5
    if not await deriv_bridge.initialize(SYMBOL):
        raise RuntimeError("❌ Deriv initialization failed")
    logger.info("✅ Deriv WebSocket initialized")
    logger.info(f" Symbol: {SYMBOL} -> {DERIV_SYMBOL}")

    # Verify symbol
    symbol_info = deriv_bridge.symbol_info(SYMBOL)
    if symbol_info is None:
        await deriv_bridge.shutdown()
        raise RuntimeError(f"❌ Symbol {SYMBOL} not found")
    logger.info("✅ Symbol verified")

    logger.info("\n📡 Connecting to Redis (V75 namespace)...")
    try:
        ably_client = RedisAblyClient(redis_url=REDIS_URL, use_streams=True)  # V75
        await asyncio.sleep(1)
        logger.info("✅ Redis connected (V75 — channels prefixed with '%s')" % CHANNEL_PREFIX)
    except Exception as e:
        await deriv_bridge.shutdown()
        # Chain the cause explicitly so the original traceback is preserved.
        raise RuntimeError(f"❌ Redis connection failed: {e}") from e

    logger.info("\n🔧 Initializing feature enhancers...")
    agent_names = list(TIMEFRAMES.keys())
    enhancer = AsyncIntegratedFeatureEnhancer(
        ably_client=ably_client,
        agent_names=agent_names,
        window_size=100
    )
    await enhancer.start()
    agent_channels = {tf: ably_client.channels.get(tf) for tf in TIMEFRAMES}

    # =========================================================================
    # BATCH SYNCHRONISATION — now handled by FeatureBatchGateway in Redis
    # =========================================================================
    # Features.py's responsibility is ONLY to publish each agent's features to
    # its own per-agent Redis channel as soon as they are computed.
    #
    # The FeatureBatchGateway (in redis_connection_manager.py) subscribes to
    # all 8 per-agent channels on the Quasar side and acts as the gating layer:
    #   • Accumulates per-agent contributions for each tick
    #   • DISCARDS any partial batch when a new tick_index arrives (waitlist discard)
    #   • Only fires on_batch_ready() when ALL 8 agents share the same tick/price
    #
    # This keeps Features.py simple (just publish, no coordination) and moves
    # the synchronisation concern to the Redis transport layer where it belongs.
    # =========================================================================
    logger.info("\n✅ All systems initialized - Starting tick processing...\n")

    tick_count = 0
    # Monotonic clock, matching the tick gate below, so wall-clock adjustments
    # cannot suppress or spam the periodic summary.
    last_summary_time = time.monotonic()
    SUMMARY_INTERVAL = 60.0  # seconds between summary log blocks
    feature_counts = {tf: 0 for tf in TIMEFRAMES}

    # ── Rate-limit gate ───────────────────────────────────────────────────────
    # Background from the QSAP health report (observed p95 latencies):
    #   • Per-agent inference p95 ≈ 1552 ms
    #   • Dispatch latency p95   ≈ 1292 ms
    # With all 8 agents running concurrently (asyncio.gather) the bottleneck
    # is max(p95_inference) ≈ 1552 ms.
    # NOTE(review): this comment previously justified a 3 000 ms interval, but
    # the constant below is 60 s. The code is authoritative here; confirm
    # which value is actually intended.
    MIN_TICK_INTERVAL = 60.0  # seconds — never dispatch faster than this
    _processing = asyncio.Semaphore(1)  # only one tick in-flight at a time

    async def _process_one_agent(tf_name, price_data, timestamp):
        """Process and publish a single timeframe agent concurrently."""
        try:
            await enhancer.process_tick(tf_name, price_data)
            features = enhancer.get_latest_state_features(tf_name)
            if features:
                feature_counts[tf_name] += 1
                features_with_meta = {
                    **features,
                    'timestamp': timestamp.isoformat(),
                    'tick_count': tick_count,
                    'timeframe': tf_name,
                }
                # Publish to per-agent channel.
                # The FeatureBatchGateway in redis_connection_manager.py
                # subscribes to all 8 per-agent channels and fires a
                # complete batch only when all agents share the same
                # tick_index — discarding any partial/stale waitlist.
                await agent_channels[tf_name].publish(
                    "integrated-features",
                    {
                        "agent": tf_name,
                        "features": features_with_meta,
                        "feature_count": len(features),
                        "tick_index": tick_count,
                        "price": price_data['close'],  # raw Deriv tick — §0c
                    },
                )
                if feature_counts[tf_name] % 10 == 0:
                    logger.info(
                        f"✅ [{tf_name}] Tick #{tick_count}: "
                        f"60 features + meta | Price: {price_data['close']:.5f}"
                    )
        except Exception as e:
            # Errors are contained per agent so one failure never stops the rest.
            logger.error(f"❌ [{tf_name}] Error: {e}")

    try:
        while True:
            tick_start = time.monotonic()
            try:
                # Get tick from Deriv WebSocket instead of MT5
                tick = deriv_bridge.symbol_info_tick(SYMBOL)
                if tick is None:
                    await asyncio.sleep(0.5)
                    continue

                tick_count += 1
                mid_price = (tick.bid + tick.ask) / 2.0
                timestamp = datetime.now(UTC)
                price_data = {
                    'close': mid_price,
                    'high': tick.ask,
                    'low': tick.bid,
                    'open': mid_price,
                    'volume': getattr(tick, 'volume', 0),
                }

                # ── All 8 agents run CONCURRENTLY; next tick cannot start until
                #    every agent has finished computing and publishing. ─────────
                async with _processing:
                    await asyncio.gather(
                        *[_process_one_agent(tf, price_data, timestamp)
                          for tf in TIMEFRAMES],
                        return_exceptions=True,  # one agent error never kills others
                    )

                if time.monotonic() - last_summary_time > SUMMARY_INTERVAL:
                    logger.info("\n" + "=" * 80)
                    logger.info(f"📊 SUMMARY (Tick #{tick_count})")
                    logger.info("=" * 80)
                    logger.info(f"Price: {mid_price:.5f}")
                    logger.info("Data Source: Deriv WebSocket (Streaming)")
                    for tf in TIMEFRAMES:
                        logger.info(f" {tf}: {feature_counts[tf]} updates")
                    logger.info("=" * 80 + "\n")
                    last_summary_time = time.monotonic()
            except KeyboardInterrupt:
                break
            except Exception as e:
                logger.error(f"❌ Tick error: {e}")

            # ── Completion-based gate ─────────────────────────────────────────
            # Sleep only the time remaining to reach MIN_TICK_INTERVAL.
            # If processing already took longer, sleep_for = 0 (no extra wait).
            elapsed = time.monotonic() - tick_start
            sleep_for = max(0.0, MIN_TICK_INTERVAL - elapsed)
            logger.debug(
                f"Tick #{tick_count} | processed in {elapsed*1000:.0f} ms "
                f"| sleeping {sleep_for*1000:.0f} ms"
            )
            await asyncio.sleep(sleep_for)
    finally:
        logger.info("\n🛑 SHUTTING DOWN")
        await deriv_bridge.shutdown()
        logger.info(f"Total Ticks: {tick_count}")
        logger.info("✅ Shutdown complete")
def _run_entrypoint():
    """Run ``main()`` to completion, logging interrupts and fatal errors."""
    try:
        # Patch asyncio so the loop can be re-entered, then drive main().
        nest_asyncio.apply()
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("\n⚠️ Interrupted by user")
    except Exception as fatal:
        logger.error(f"\n❌ Fatal error: {fatal}")
        traceback.print_exc()


if __name__ == "__main__":
    _run_entrypoint()
| #+263780563561 ENG Karl Muzunze Masvingo Zimbabwe |