# Source: uploaded by KarlQuant ("Upload 13 files"), revision 9f94e11 (verified).
"""
╔══════════════════════════════════════════════════════════════════════════════╗
║ ║
║ ██╗ ██╗ ██╗██████╗ ██╗ ██████╗ ██╗ ██╗ █████╗ ███╗ ██╗████████╗║
║ ██║ ██╔╝███║██╔══██╗██║ ██╔═══██╗██║ ██║██╔══██╗████╗ ██║╚══██╔══╝║
║ █████╔╝ ╚██║██████╔╝██║ ██║ ██║██║ ██║███████║██╔██╗ ██║ ██║ ║
║ ██╔═██╗ ██║██╔══██╗██║ ██║▄▄ ██║██║ ██║██╔══██║██║╚██╗██║ ██║ ║
║ ██║ ██╗ ██║██║ ██║███████╗ ╚██████╔╝╚██████╔╝██║ ██║██║ ╚████║ ██║ ║
║ ╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝ ╚══▀▀═╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝ ╚═╝ ║
║ ║
║ ──────────────────────────────────────────────────────────────────────────── ║
║ ║
║ REGIME-ADAPTIVE FEATURE ENGINEERING SYSTEM ║
║ ║
║ Multi-Resolution Analysis • Institutional Patterns • AI ║
║ ║
║ ──────────────────────────────────────────────────────────────────────────── ║
║ ║
║ ASSET: Volatility 25 Index TIMEFRAMES: 8 (5s - 10m) ║
║ FEATURES: 60 per timeframe TOTAL DIMS: 480 features ║
║ REGIME: Adaptive (Volatility/Trend) INFO GAIN: +83% vs baseline ║
║ PATTERNS: Institutional-Grade COMPUTE: <7ms/tick ║
║ ║
║ "Latent Regime Detection for Non-Stationary Markets" ║
║ ║
║ [ FEATURE EXTRACTION ONLINE ] v3.0.0-V25 | DERIV WEBSOCKET EDITION ║
║ ║
╚══════════════════════════════════════════════════════════════════════════════╝
THEORETICAL FOUNDATION:
P(Y_{t+Δ}|Φ(X_t)) = Σ_r P(Y_{t+Δ}|Φ,R_t=r)P(R_t=r)
References:
- Ang & Timmermann (2012): Regime Changes and Financial Markets
- Hamilton (1989): Markov Regime-Switching Models
- Nison (1991): Japanese Candlestick Charting Techniques
"""
import asyncio
import json
import logging
import os
import ssl
import threading
import time
import traceback
import warnings
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, Dict

import nest_asyncio
import numpy as np
import pandas as pd
import websockets
from scipy.stats import percentileofscore, skew, kurtosis

UTC = timezone.utc
# ============================================================================
# REDIS CLIENT FOR HUGGINGFACE SPACES (V25 — NAMESPACED CHANNELS)
# ============================================================================
try:
    from redis_config_v25 import REDIS_URL, REDIS_DB_FEATURES, CHANNEL_PREFIX, prefixed_channel
    import redis

    class RedisAblyClient:
        """Simple Redis client for HuggingFace Spaces compatibility (V25 namespaced).

        Exposes an Ably-like surface:
        ``client.channels.get(name).publish(event, data)``. A failed connection
        is reported on stdout and leaves ``self.client`` as ``None``; ``publish``
        then does nothing.
        """

        def __init__(self, redis_url=None, use_streams=True):
            # ``use_streams`` is accepted but not read anywhere in this class.
            self.redis_url = redis_url or REDIS_URL
            self.client = None
            self.channels = SimpleChannelManager(self)
            self._connect()

        def _connect(self):
            """Open the Redis connection and confirm it with a PING."""
            try:
                # V25: Use DB 0 (features) — isolated per Space container
                self.client = redis.from_url(self.redis_url, db=REDIS_DB_FEATURES)
                self.client.ping()
                print(f"✅ Redis connected for features (V25 — DB {REDIS_DB_FEATURES})")
            except Exception as e:
                print(f"⚠️ Redis connection failed: {e}")
                self.client = None

        async def publish(self, channel, data):
            """JSON-encode ``data`` and publish it on the namespaced ``channel``.

            Publish failures are printed and swallowed (best effort).
            """
            if self.client:
                try:
                    # V25: Auto-prefix channel name for namespace isolation.
                    # NOTE(review): SimpleChannel already stores a prefixed name,
                    # so this relies on prefixed_channel() being idempotent —
                    # confirm in redis_config_v25.
                    self.client.publish(prefixed_channel(channel), json.dumps(data))
                except Exception as e:
                    print(f"⚠️ Redis publish failed: {e}")

    class SimpleChannel:
        """A named channel bound to a RedisAblyClient."""

        def __init__(self, name, client):
            self.name = prefixed_channel(name)  # V25: auto-prefix
            self.client = client

        async def publish(self, event, data):
            """Publish ``{"event": event, "data": data}`` on this channel."""
            await self.client.publish(self.name, {
                "event": event,
                "data": data
            })

    class SimpleChannelManager:
        """Lazily creates and caches one SimpleChannel per requested name."""

        def __init__(self, client):
            self.client = client
            self._channels = {}

        def get(self, name):
            """Return the cached channel for ``name``, creating it on first use."""
            if name not in self._channels:
                self._channels[name] = SimpleChannel(name, self.client)
            return self._channels[name]

except ImportError:
    print("⚠️ Redis not available - using mock mode")
    CHANNEL_PREFIX = "V25:"

    def prefixed_channel(name):
        """Return ``name`` carrying the V25 namespace prefix exactly once."""
        # Fix: the mock previously prepended "V75:" while testing for "V25:",
        # so mock-mode names landed in the wrong namespace and were never
        # recognised as already prefixed.
        return name if name.startswith(CHANNEL_PREFIX) else f"{CHANNEL_PREFIX}{name}"

    REDIS_DB_FEATURES = 0

    class RedisAblyClient:
        """No-op stand-in used when redis / redis_config_v25 cannot be imported."""

        def __init__(self, *args, **kwargs):
            self.channels = SimpleChannelManager(self)

        async def publish(self, channel, data):
            pass

    class SimpleChannel:
        """No-op channel; only records its namespaced name."""

        def __init__(self, name, client):
            self.name = prefixed_channel(name)

        async def publish(self, event, data):
            pass

    class SimpleChannelManager:
        """Caches one mock SimpleChannel per requested name."""

        def __init__(self, client):
            self._channels = {}

        def get(self, name):
            if name not in self._channels:
                self._channels[name] = SimpleChannel(name, None)
            return self._channels[name]
# Import-time side effects: silence warning categories/messages that the
# numeric code in this module is expected to trigger.
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=RuntimeWarning, message='Mean of empty slice')
warnings.filterwarnings('ignore', category=RuntimeWarning, message='overflow encountered')
# Patch asyncio via nest_asyncio — presumably so coroutines can be run from an
# already-running event loop (e.g. a notebook or Space runtime); confirm.
nest_asyncio.apply()
# ============================================================================
# DERIV WEBSOCKET CONFIGURATION
# ============================================================================
# NOTE(security): an API token committed to source should be treated as leaked
# and rotated. The DERIV_API_KEY environment variable now takes precedence; the
# literal remains only as a backward-compatible fallback.
DERIV_API_KEY = os.environ.get("DERIV_API_KEY", "1KJKxIJKR8LCyKB")
DERIV_WS_URL = "wss://ws.binaryws.com/websockets/v3?app_id=1089"
# Display name -> Deriv symbol code. The duplicate "Volatility 25 Index" entry
# was removed; a repeated key in a dict literal silently overrides the first.
SYMBOL_MAP = {
    "Volatility 25 Index": "R_25",  # ✅ V25: Volatility 25 Index symbol
    "Crash 500 Index": "CRASH500",
    "Volatility 100 Index": "R_100",
    "Volatility 50 Index": "R_50",
}
# ============================================================================
# DERIV DATA STRUCTURES
# ============================================================================
@dataclass
class DerivTick:
    """Tick snapshot with MT5-style field names, filled from the Deriv stream.

    DerivBridge populates ``time``, ``bid``, ``ask``, ``last``, ``volume`` and
    ``time_msc``; ``flags`` and ``volume_real`` keep their defaults in the
    visible source.
    """
    time: int = 0  # tick 'epoch' from Deriv (presumably Unix seconds — confirm)
    bid: float = 0.0  # synthetic: quote minus a fixed half-spread
    ask: float = 0.0  # synthetic: quote plus a fixed half-spread
    last: float = 0.0  # the quoted price itself
    volume: int = 0  # always written as 0 by the bridge
    time_msc: int = 0  # ``time`` multiplied by 1000
    flags: int = 0  # never set by the visible code
    volume_real: float = 0.0  # never set by the visible code
@dataclass
class DerivAccountInfo:
    """Account snapshot container with MT5-style field names.

    NOTE(review): nothing in the visible source constructs or fills this class;
    field meanings below are inferred from their names — confirm before use.
    """
    login: int = 0
    balance: float = 0.0
    equity: float = 0.0
    profit: float = 0.0
    margin: float = 0.0
    margin_free: float = 0.0
    margin_level: float = 0.0
    currency: str = "USD"  # default account currency code
# ============================================================================
# DERIV WEBSOCKET BRIDGE - STREAMING VERSION
# ============================================================================
class DerivBridge:
    """Deriv WebSocket bridge - STREAMING VERSION.

    Connects to ``DERIV_WS_URL``, authorizes with ``DERIV_API_KEY`` and runs one
    background task per subscribed symbol that caches the latest tick. The
    ``symbol_*`` methods expose that cache through MT5-style synchronous calls.

    NOTE(review): every stream task and ``get_balance`` call ``recv()`` on the
    same socket, so a reply can be consumed by a reader that ignores it.
    Confirm whether more than one reader is ever active at the same time.
    """

    # Half of the synthetic spread placed around each quote to derive bid/ask.
    _HALF_SPREAD = 0.0005

    def __init__(self, verify_tls: bool = False):
        """Create an unconnected bridge.

        Args:
            verify_tls: When True, the server certificate and hostname are
                verified. Defaults to False to keep the previous behaviour
                (verification disabled).
        """
        self.verify_tls = verify_tls
        self.ws = None
        self.is_connected = False
        self.is_authorized = False
        self.balance = 0.0
        self._prices = {}  # Current prices for each symbol
        self._price_lock = asyncio.Lock()
        self._stream_tasks = {}
        self._subscribed_symbols = set()
        self._last_tick = None

    async def _connect_and_authorize(self):
        """Connect and authorize to Deriv.

        Returns:
            True when the server answered with an ``authorize`` payload, False
            on an error reply, an unexpected reply, or any exception.
        """
        try:
            print("🔄 Connecting to Deriv WebSocket...")
            ssl_context = ssl.create_default_context()
            if not self.verify_tls:
                # NOTE(security): certificate and hostname checks are disabled,
                # which exposes the API token to man-in-the-middle attacks.
                # Construct the bridge with verify_tls=True where possible.
                ssl_context.check_hostname = False
                ssl_context.verify_mode = ssl.CERT_NONE
            self.ws = await websockets.connect(
                DERIV_WS_URL,
                ssl=ssl_context,
                ping_interval=30,
                ping_timeout=10,
                close_timeout=5,
                max_size=2**20
            )
            print("✅ WebSocket connected")
            # Authorize
            await self.ws.send(json.dumps({"authorize": DERIV_API_KEY}))
            # Wait for auth response
            data = json.loads(await self.ws.recv())
            if 'authorize' in data:
                self.is_connected = True
                self.is_authorized = True
                self.balance = float(data['authorize'].get('balance', 0))
                print(f"✅ Authorized | Balance: ${self.balance:.2f}")
                return True
            if 'error' in data:
                print(f"❌ Auth error: {data['error']}")
            # An unexpected reply used to fall through and return None; make the
            # failure explicit (still falsy for existing callers).
            return False
        except Exception as e:
            print(f"❌ Connection error: {e}")
            return False

    async def _cache_tick(self, deriv_symbol: str, tick_data: dict):
        """Store one Deriv tick in the price cache and as ``_last_tick``."""
        price = float(tick_data['quote'])
        epoch = int(tick_data['epoch'])
        bid = price - self._HALF_SPREAD
        ask = price + self._HALF_SPREAD
        time_msc = epoch * 1000
        async with self._price_lock:
            self._prices[deriv_symbol] = {
                'bid': bid,
                'ask': ask,
                'last': price,
                'time': epoch,
                'time_msc': time_msc
            }
            self._last_tick = DerivTick(
                time=epoch,
                bid=bid,
                ask=ask,
                last=price,
                volume=0,
                time_msc=time_msc
            )

    async def _stream_prices(self, deriv_symbol: str):
        """Continuous price streaming for a symbol with auto-reconnect.

        Runs until cancelled. Failed reconnects back off exponentially from 5s
        up to 60s; the delay resets after a successful reconnect.
        """
        retry_delay = 5
        while True:
            try:
                # Re-connect/re-authorize if needed
                if not self.is_connected or self.ws is None:
                    print(f"🔄 Reconnecting WebSocket for {deriv_symbol}...")
                    connected = await self._connect_and_authorize()
                    if not connected:
                        print(f"⚠️ Reconnect failed for {deriv_symbol}, retrying in {retry_delay}s...")
                        await asyncio.sleep(retry_delay)
                        retry_delay = min(retry_delay * 2, 60)
                        continue
                    retry_delay = 5  # reset on success
                # Subscribe to ticks
                await self.ws.send(json.dumps({"ticks": deriv_symbol}))
                print(f"📡 Streaming {deriv_symbol}...")
                # Continuous receive loop
                while self.is_connected:
                    try:
                        json_data = json.loads(await self.ws.recv())
                        if 'tick' in json_data:
                            tick_data = json_data['tick']
                            # Ticks for other symbols share the socket; skip them.
                            if tick_data.get('symbol') == deriv_symbol:
                                await self._cache_tick(deriv_symbol, tick_data)
                        elif 'error' in json_data:
                            logging.error(f"Stream error for {deriv_symbol}: {json_data['error']}")
                    except asyncio.CancelledError:
                        print(f"Stream cancelled for {deriv_symbol}")
                        return
                    except websockets.exceptions.ConnectionClosed:
                        print(f"❌ WebSocket closed for {deriv_symbol} — will reconnect")
                        self.is_connected = False
                        break
                    except json.JSONDecodeError:
                        # Malformed frame: drop it and keep reading.
                        continue
                    except Exception as e:
                        logging.error(f"Stream error: {e}")
                        self.is_connected = False
                        break
            except asyncio.CancelledError:
                print(f"Stream cancelled for {deriv_symbol}")
                return
            except Exception as e:
                logging.error(f"Fatal stream error for {deriv_symbol}: {e}")
                self.is_connected = False
            # Brief pause before reconnect attempt
            await asyncio.sleep(retry_delay)
            retry_delay = min(retry_delay * 2, 60)

    async def _ensure_stream(self, deriv_symbol: str):
        """Ensure streaming is active for a symbol; restart dead tasks."""
        existing = self._stream_tasks.get(deriv_symbol)
        if existing is None or existing.done():
            # Start (or restart) streaming task
            task = asyncio.create_task(self._stream_prices(deriv_symbol))
            self._stream_tasks[deriv_symbol] = task
            self._subscribed_symbols.add(deriv_symbol)

    async def get_current_price(self, deriv_symbol: str) -> Optional[Dict]:
        """Get current price (from streaming cache).

        Returns:
            The cached price dict for ``deriv_symbol``, or None when no tick has
            arrived yet or an error occurred.
        """
        try:
            # Ensure we're streaming this symbol
            await self._ensure_stream(deriv_symbol)
            # Give it a moment to receive first price if new
            if deriv_symbol not in self._prices:
                await asyncio.sleep(0.5)
            # Return cached price
            async with self._price_lock:
                return self._prices.get(deriv_symbol)
        except Exception as e:
            logging.error(f"Price fetch error: {e}")
            return None

    def symbol_info_tick(self, symbol: str) -> Optional[DerivTick]:
        """MT5-compatible tick info getter (synchronous wrapper).

        Falls back to the most recent tick of any symbol when ``symbol`` has no
        cached price; returns None on error.
        """
        try:
            deriv_symbol = SYMBOL_MAP.get(symbol, symbol)
            # Check if we have cached price
            price_data = self._prices.get(deriv_symbol)
            if price_data is not None:
                return DerivTick(
                    time=price_data.get('time', 0),
                    bid=price_data.get('bid', 0),
                    ask=price_data.get('ask', 0),
                    last=price_data.get('last', 0),
                    volume=0,
                    time_msc=price_data.get('time_msc', 0)
                )
            return self._last_tick
        except Exception as e:
            logging.error(f"symbol_info_tick error: {e}")
            return None

    async def get_balance(self) -> float:
        """Get current balance.

        Requests a fresh balance and waits up to ten 1-second reads for the
        reply; returns the last known balance when disconnected, on timeout, or
        on error.
        """
        try:
            if not self.is_connected or self.ws is None:
                return self.balance
            await self.ws.send(json.dumps({"balance": 1}))
            # Wait for balance response (with timeout for this specific call)
            for _ in range(10):
                try:
                    response = await asyncio.wait_for(self.ws.recv(), timeout=1)
                except asyncio.TimeoutError:
                    continue
                data = json.loads(response)
                if 'balance' in data:
                    self.balance = float(data['balance']['balance'])
                    return self.balance
        except Exception as e:
            logging.error(f"Balance error: {e}")
        return self.balance

    def symbol_info(self, symbol: str) -> Optional[dict]:
        """MT5-compatible symbol_info - returns symbol information."""
        deriv_symbol = SYMBOL_MAP.get(symbol, symbol)
        return {
            'name': symbol,
            'deriv_symbol': deriv_symbol,
            'visible': True,
            'point': 0.00001,
            'digits': 5
        }

    def symbol_select(self, symbol: str, enable: bool = True) -> bool:
        """MT5-compatible symbol_select - always returns True for Deriv."""
        return True

    async def initialize(self, symbol: str = None):
        """Initialize connection and optionally start streaming a symbol.

        Returns:
            True when connected and authorized, False otherwise.
        """
        try:
            print("🔄 Initializing Deriv...")
            result = await self._connect_and_authorize()
            if not result:
                print("❌ Deriv init failed")
                return False
            if symbol:
                deriv_symbol = SYMBOL_MAP.get(symbol, symbol)
                await self._ensure_stream(deriv_symbol)
                # Wait for first tick
                await asyncio.sleep(1)
            print("✅ Deriv initialized")
            return True
        except Exception as e:
            logging.error(f"Initialize error: {e}")
            return False

    async def shutdown(self):
        """Shutdown gracefully: cancel stream tasks, then close the socket."""
        try:
            # Cancel all streaming tasks
            for task in self._stream_tasks.values():
                task.cancel()
            # Wait for cancellation
            if self._stream_tasks:
                await asyncio.gather(*self._stream_tasks.values(), return_exceptions=True)
            # Close WebSocket
            if self.ws:
                await self.ws.close()
            self.is_connected = False
            self.is_authorized = False
        except Exception as e:
            logging.error(f"Shutdown error: {e}")
# Global bridge instance shared by the module. It is constructed at import
# time, so DerivBridge.__init__ runs on module load; no connection is opened
# until initialize() or a streaming call is awaited.
deriv_bridge = DerivBridge()
# ============================================================================
# CONFIGURATION
# ============================================================================
# Redis URL already imported above in inline Redis client
# Instrument this module is pinned to: SYMBOL is the display name (a key of
# SYMBOL_MAP) and DERIV_SYMBOL is the matching Deriv code.
SYMBOL = "Volatility 25 Index" # ✅ V25
DERIV_SYMBOL = "R_25" # ✅ V25: Volatility 25 Index Deriv symbol
FEATURE_WINDOW = 10 # base unit
# Timeframe label -> window length. The module header advertises
# "8 (5s - 10m)", so the values are presumably seconds — confirm against the
# code that consumes TIMEFRAMES.
TIMEFRAMES = {
    # === High-Frequency Zone (Volatility Capture) ===
    'xs': 5, # tick
    's': 10, # ultra
    'm': 20, # fast
    # === Critical Trading Zones ===
    'l': 30, # scalp
    'xl': 60, # 1min
    'xxl': 120, # 2min
    # === Structure & Regime Detection ===
    '5m': 300, # 5min
    '10m': 600, # 10min
}
# ============================================================================
# FEATURE CONTRACT — SINGLE SOURCE OF TRUTH (60 FEATURES)
# ----------------------------------------------------------------------------
#
# ENGINEERING NOTE — why this looks the way it does:
#
# Previously the contract was spread across three top-level sets
# (REQUIRED_FEATURES, METADATA_FIELDS, BINARY_FEATURES, ...), with no
# runtime check that they were mutually consistent. A drift where a
# single key ('price') landed in BOTH the "required features" list AND
# the "metadata to strip before validating" set caused the validator to
# report {'price'} missing on every tick, which silently shut down
# publishing for the entire pipeline.
#
# The fix is structural: ONE FeatureContract object owns the full schema
# and checks its own invariants at import time. Any future drift crashes
# the module on load with a named offender, instead of corrupting the
# wire format at 60Hz for hours.
#
# The old module-level names (REQUIRED_FEATURES, METADATA_FIELDS, etc.)
# are kept as PROJECTIONS of the contract for call-site back-compat — the
# rest of Features.py can import them exactly as before.
# ============================================================================
from dataclasses import dataclass, field
from typing import FrozenSet, Mapping, Any
CONTRACT_VERSION = "feat-v1.0.0"
EXPECTED_FEATURE_COUNT = 60

# ---- Feature keys (60) — the values a model actually consumes -------------
# Assembled group by group so each group's advertised size is easy to audit.
_FEATURES: FrozenSet[str] = frozenset().union(
    # Core Technical (19)
    (
        'log_return', 'rolling_mean_5', 'rolling_std_5', 'zscore_5',
        'rsi_14', 'macd', 'macd_signal', 'macd_hist', 'atr',
        'cdf_value', 'cdf_slope', 'cdf_diff',
        'volatility_quantile_90', 'volatility_ratio', 'entropy_50',
        'autocorr_3', 'momentum_10', 'volume_change_rate', 'volume_zscore',
    ),
    # Derivatives (15)
    (
        'price_vel', 'price_acc', 'price_jrk',
        'price_vel_mean', 'price_vel_std', 'price_vel_skew', 'price_vel_kurtosis',
        'price_acc_mean', 'price_acc_std', 'price_acc_skew', 'price_acc_kurtosis',
        'price_jrk_mean', 'price_jrk_std', 'price_jrk_skew', 'price_jrk_kurtosis',
    ),
    # Additional Technical (7)
    (
        'ma10', 'ma20', 'std20',
        'bollinger_upper', 'bollinger_lower', 'bollinger_width', 'bollinger_position',
    ),
    # Candlestick (9)
    (
        'gravestone_doji', 'four_price_doji', 'doji', 'spinning_top',
        'bullish_candle', 'bearish_candle', 'dragonfly_candle',
        'spinning_top_bearish_followup', 'bullish_then_dragonfly',
    ),
    # Support / Resistance (7)
    (
        'distance_to_nearest_support', 'distance_to_nearest_resistance',
        'near_support', 'near_resistance', 'distance_to_stop_loss',
        'support_strength', 'resistance_strength',
    ),
    # Price Variants (3) — absolute-scale context for the models
    ('price', 'close_scaled', 'close_price'),
)

# ---- Envelope keys — wire metadata that is never model input --------------
# Must stay disjoint from _FEATURES (enforced in FeatureContract.__post_init__).
_ENVELOPE: FrozenSet[str] = frozenset([
    'agent',             # routing
    'timeframe',         # routing
    'timestamp',         # wall-clock ISO-8601 at publish
    'tick_index',        # monotonic producer tick counter
    'tick_count',        # legacy alias, kept for back-compat
    'feature_count',     # integrity check: len(features)
    'contract_version',  # schema version string
    'features',          # nested payload key
])

# ---- Typed subsets of _FEATURES (subset-ness is validated at import) ------
_BINARY: FrozenSet[str] = frozenset([
    'near_support', 'near_resistance',
    'gravestone_doji', 'four_price_doji', 'doji', 'spinning_top',
    'bullish_candle', 'bearish_candle', 'dragonfly_candle',
    'spinning_top_bearish_followup', 'bullish_then_dragonfly',
])
_PRICE_SCALE: FrozenSet[str] = frozenset([
    'price', 'close_scaled', 'close_price',
    'ma10', 'ma20', 'bollinger_upper', 'bollinger_lower',
])
_NON_NORMALISED: FrozenSet[str] = frozenset().union(
    _BINARY,
    _PRICE_SCALE,
    ('price_vel', 'price_acc', 'price_jrk'),
)
@dataclass(frozen=True)
class ValidationResult:
    """Immutable outcome of a contract check, split into three failure kinds."""
    ok: bool
    missing: FrozenSet[str]          # contract features not present in the dict
    leaked_envelope: FrozenSet[str]  # envelope keys that showed up among features
    unexpected: FrozenSet[str]       # keys known to neither features nor envelope

    def as_error_lines(self):
        """Return one message per non-empty failure category, in fixed order."""
        categories = (
            ("missing features: ", self.missing),
            ("envelope keys inside features dict: ", self.leaked_envelope),
            ("unknown keys: ", self.unexpected),
        )
        return [f"{prefix}{sorted(keys)}" for prefix, keys in categories if keys]
@dataclass(frozen=True)
class FeatureContract:
    """
    Schema of one timeframe's feature payload.

    Constructing an instance verifies three invariants and raises
    RuntimeError on the first violation:
      (1) ``features`` and ``envelope`` share no key.
      (2) ``features`` holds exactly EXPECTED_FEATURE_COUNT keys.
      (3) ``binary``, ``price_scale`` and ``non_normalised`` are each a
          subset of ``features``.
    """
    version: str = CONTRACT_VERSION
    features: FrozenSet[str] = field(default_factory=lambda: _FEATURES)
    envelope: FrozenSet[str] = field(default_factory=lambda: _ENVELOPE)
    binary: FrozenSet[str] = field(default_factory=lambda: _BINARY)
    price_scale: FrozenSet[str] = field(default_factory=lambda: _PRICE_SCALE)
    non_normalised: FrozenSet[str] = field(default_factory=lambda: _NON_NORMALISED)

    def __post_init__(self):
        # (1) a key may be a feature or an envelope field, never both
        shared = self.features & self.envelope
        if shared:
            raise RuntimeError(
                f"[FeatureContract] BROKEN INVARIANT: keys in BOTH features "
                f"and envelope: {sorted(shared)}. Remove from one set — the "
                f"validator cannot distinguish feature-vs-envelope for these "
                f"keys, so every tick will be rejected."
            )
        # (2) the feature set has a fixed, declared size
        actual_count = len(self.features)
        if actual_count != EXPECTED_FEATURE_COUNT:
            raise RuntimeError(
                f"[FeatureContract] BROKEN INVARIANT: expected "
                f"{EXPECTED_FEATURE_COUNT} features, got {actual_count}. "
                f"Update EXPECTED_FEATURE_COUNT or fix the feature list."
            )
        # (3) every typed subset must consist of real features
        typed_subsets = {
            'binary': self.binary,
            'price_scale': self.price_scale,
            'non_normalised': self.non_normalised,
        }
        for label, members in typed_subsets.items():
            extras = members - self.features
            if extras:
                raise RuntimeError(
                    f"[FeatureContract] BROKEN INVARIANT: '{label}' contains "
                    f"non-feature keys: {sorted(extras)}"
                )

    # ---- public API -------------------------------------------------------
    def validate(self, features_dict: Mapping[str, Any]) -> ValidationResult:
        """
        Check the INNER features dict against the contract.

        Envelope keys found inside the dict are reported as
        ``leaked_envelope`` rather than being stripped. ``ok`` is True only
        when the key set equals the contract's feature set exactly.
        """
        present = set(features_dict.keys())
        absent = frozenset(self.features - present)
        leaked = frozenset(present & self.envelope)
        unknown = frozenset(present - self.features - self.envelope)
        return ValidationResult(
            ok=(present == self.features),
            missing=absent,
            leaked_envelope=leaked,
            unexpected=unknown,
        )

    def build_payload(
        self,
        agent_name: str,
        features_dict: Mapping[str, float],
        tick_index,
        timestamp_iso: str,
    ) -> dict:
        """
        Assemble the wire payload: envelope fields at the top level and a
        copy of the features under ``payload['features']``.
        """
        payload = {}
        payload['agent'] = agent_name
        payload['timestamp'] = timestamp_iso
        payload['tick_index'] = tick_index
        payload['feature_count'] = len(features_dict)
        payload['contract_version'] = self.version
        payload['features'] = dict(features_dict)
        return payload

    def extract_features(self, payload: Mapping[str, Any]) -> dict:
        """
        Consumer side: return the inner features dict from ``payload``.

        Raises ValueError when the payload carries a different contract
        version, or when ``features`` is absent or not a dict. A payload with
        no ``contract_version`` at all is accepted.
        """
        payload_version = payload.get('contract_version')
        version_ok = payload_version is None or payload_version == self.version
        if not version_ok:
            raise ValueError(
                f"contract version mismatch: payload={payload_version!r} "
                f"expected={self.version!r}"
            )
        inner = payload.get('features')
        if isinstance(inner, dict):
            return inner
        raise ValueError(
            f"payload.features missing or wrong type: {type(inner).__name__}"
        )
# Singleton — import this, don't construct your own.
# Module import will FAIL LOUDLY here if any invariant is violated.
FEATURE_CONTRACT = FeatureContract()
# ---------------------------------------------------------------------------
# Back-compat aliases — projections of FEATURE_CONTRACT. Existing call sites
# keep working unchanged; only the source of truth moved. Deleting any of
# these will break older code paths that haven't been migrated to use
# FEATURE_CONTRACT directly.
# ---------------------------------------------------------------------------
# Sorted so the tuple is identical in every process: iterating a frozenset of
# strings follows hash order, which string-hash randomization changes between
# interpreter runs. Callers were already told not to rely on the order.
REQUIRED_FEATURES = tuple(sorted(FEATURE_CONTRACT.features))
METADATA_FIELDS = FEATURE_CONTRACT.envelope
BINARY_FEATURES = FEATURE_CONTRACT.binary
PRICE_FEATURES = FEATURE_CONTRACT.price_scale
NORMALIZATION_EXCLUSIONS = FEATURE_CONTRACT.non_normalised
# ============================================================================
# REGIME DETECTION PARAMETERS
# ============================================================================
# Tunables for RegimeDetector (its default ``config``).
REGIME_CONFIG = {
    # Not read by RegimeDetector in the visible source.
    'volatility_lookback': 100,
    # Compared with the rolling-volatility percentile (a 0-1 fraction) to
    # weight the low-vol and high-vol regimes.
    'vol_low_threshold': 0.33,
    'vol_high_threshold': 0.67,
    # Compared with |Close[-1] / Close[-20] - 1|.
    # NOTE(review): 0.6 is a 60% move over 20 rows — confirm the intended scale.
    'trend_threshold': 0.6,
    # Compared with safe_entropy() of the last 50 returns.
    'entropy_threshold': 1.5,
    # Maximum number of past regime dicts kept for smoothing (deque maxlen).
    'regime_memory': 20,
}
# ============================================================================
# LOGGING SETUP
# ============================================================================
# Configure the root logger at import: INFO level, time-of-day timestamps.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%H:%M:%S'
)
# Module-level logger used by the classes below.
logger = logging.getLogger(__name__)
# ============================================================================
# HELPER FUNCTIONS
# ============================================================================
def safe_skew(x):
    """Skewness of ``x`` with NaNs dropped; 0.0 if fewer than 3 values remain."""
    sample = x[np.logical_not(np.isnan(x))]
    if len(sample) < 3:
        return 0.0
    return skew(sample)
def safe_kurtosis(x):
    """Kurtosis of ``x`` (scipy default) with NaNs dropped; 0.0 if fewer than 3 values remain."""
    sample = x[np.logical_not(np.isnan(x))]
    if len(sample) < 3:
        return 0.0
    return kurtosis(sample)
def min_max_scale(series):
    """Rescale a pandas Series linearly onto [0, 1].

    Returns:
        An empty float Series for empty input, an all-zero Series (same index)
        when every value is equal, otherwise ``(x - min) / (max - min)``.
    """
    if len(series) == 0:
        # Explicit dtype: a bare pd.Series([]) leaves the dtype to pandas'
        # version-dependent default and warns on older releases.
        return pd.Series([], dtype=float)
    lo, hi = series.min(), series.max()
    span = hi - lo
    if span == 0:
        return pd.Series(np.zeros(len(series)), index=series.index)
    return (series - lo) / span
def safe_entropy(series):
    """Histogram-based entropy estimate of a pandas Series.

    NaNs are dropped, the rest is binned into 10 density-normalised bins and
    ``-sum(h * log(h))`` over the non-empty bins is returned.

    Returns:
        0.0 for fewer than 5 values, a constant series, or any failure.
    """
    try:
        values = series.dropna()
        if len(values) < 5 or values.nunique() == 1:
            return 0.0
        hist, _ = np.histogram(values, bins=10, density=True)
        hist = hist[hist > 0]  # log(0) is undefined; empty bins contribute nothing
        if len(hist) == 0:
            return 0.0
        return -np.sum(hist * np.log(hist))
    except Exception:
        # Best-effort feature: fall back to 0.0, but no longer swallow
        # KeyboardInterrupt/SystemExit as the bare ``except:`` did.
        return 0.0
# ============================================================================
# INSTITUTIONAL-GRADE CANDLESTICK PATTERN DETECTION
# ============================================================================
def gravestone_doji(o, h, l, c):
    """
    Gravestone Doji: Death at the top
    Institutional criteria:
    - Body <= 2% of range
    - Upper shadow >= 66% of range
    - Lower shadow <= 10% of range

    Returns 1 when the candle matches, else 0 (also 0 for a flat candle or
    unusable inputs).
    """
    try:
        total_range = h - l
        if total_range < 1e-6:
            return 0
        body_ratio = abs(c - o) / total_range
        upper_ratio = (h - max(o, c)) / total_range
        lower_ratio = (min(o, c) - l) / total_range
        return int(
            body_ratio <= 0.02 and
            upper_ratio >= 0.66 and
            lower_ratio <= 0.10
        )
    except Exception:
        # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return 0
def four_price_doji(o, h, l, c):
    """
    Four Price Doji: Extreme indecision
    All prices equal within 0.1% tolerance (relative to their mean).

    Returns 1 when the candle matches, else 0 (also 0 when the mean price is
    below 1e-6 or the inputs are unusable).
    """
    try:
        prices = [o, h, l, c]
        avg_price = np.mean(prices)
        if avg_price < 1e-6:
            return 0
        max_deviation = max(abs(p - avg_price) / avg_price for p in prices)
        return int(max_deviation <= 0.001)
    except Exception:
        # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return 0
def doji(o, h, l, c):
    """
    Standard Doji: Indecision
    Institutional criteria:
    - Body <= 5% of range
    - Both shadows >= 20% of range

    Returns 1 when the candle matches, else 0 (also 0 for a flat candle or
    unusable inputs).
    """
    try:
        total_range = h - l
        if total_range < 1e-6:
            return 0
        body_ratio = abs(c - o) / total_range
        upper_ratio = (h - max(o, c)) / total_range
        lower_ratio = (min(o, c) - l) / total_range
        return int(
            body_ratio <= 0.05 and
            upper_ratio >= 0.20 and
            lower_ratio >= 0.20
        )
    except Exception:
        # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return 0
def spinning_top(o, h, l, c):
    """
    Spinning Top: Market confusion
    Institutional criteria:
    - Body <= 33% of range
    - Both shadows >= 25% of range each

    Returns 1 when the candle matches, else 0 (also 0 for a flat candle or
    unusable inputs).
    """
    try:
        total_range = h - l
        if total_range < 1e-6:
            return 0
        body_ratio = abs(c - o) / total_range
        upper_ratio = (h - max(o, c)) / total_range
        lower_ratio = (min(o, c) - l) / total_range
        return int(
            body_ratio <= 0.33 and
            upper_ratio >= 0.25 and
            lower_ratio >= 0.25
        )
    except Exception:
        # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return 0
def bullish_candle(o, h, l, c):
    """
    Bullish Candle: Strong buying
    Institutional criteria:
    - Body >= 60% of range
    - Close > Open
    - Upper shadow <= 15% of range

    Returns 1 when the candle matches, else 0 (also 0 for a flat candle or
    unusable inputs).
    """
    try:
        if c <= o:
            return 0
        total_range = h - l
        if total_range < 1e-6:
            return 0
        body_ratio = (c - o) / total_range
        upper_ratio = (h - c) / total_range
        return int(body_ratio >= 0.60 and upper_ratio <= 0.15)
    except Exception:
        # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return 0
def bearish_candle(o, h, l, c):
    """
    Bearish Candle: Strong selling
    Institutional criteria:
    - Body >= 60% of range
    - Close < Open
    - Lower shadow <= 15% of range

    Returns 1 when the candle matches, else 0 (also 0 for a flat candle or
    unusable inputs).
    """
    try:
        if c >= o:
            return 0
        total_range = h - l
        if total_range < 1e-6:
            return 0
        body_ratio = (o - c) / total_range
        lower_ratio = (c - l) / total_range
        return int(body_ratio >= 0.60 and lower_ratio <= 0.15)
    except Exception:
        # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return 0
def dragonfly_candle(o, h, l, c):
    """
    Dragonfly Doji: Bullish reversal
    Institutional criteria:
    - Body <= 5% of range
    - Lower shadow >= 66% of range
    - Upper shadow <= 10% of range

    Returns 1 when the candle matches, else 0 (also 0 for a flat candle or
    unusable inputs).
    """
    try:
        total_range = h - l
        if total_range < 1e-6:
            return 0
        body_ratio = abs(c - o) / total_range
        upper_ratio = (h - max(o, c)) / total_range
        lower_ratio = (min(o, c) - l) / total_range
        return int(
            body_ratio <= 0.05 and
            lower_ratio >= 0.66 and
            upper_ratio <= 0.10
        )
    except Exception:
        # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return 0
def spinning_top_bearish_followup(c1, c2):
    """
    Spinning top followed by bearish candle
    Indicates weakness after indecision

    ``c1`` and ``c2`` are (open, high, low, close) sequences for the first and
    second candle. Returns 1 on a match, else 0 (also 0 on malformed input).
    """
    try:
        return int(spinning_top(*c1) == 1 and bearish_candle(*c2) == 1)
    except Exception:
        # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return 0
def bullish_candle_followed_by_dragonfly(c1, c2):
    """
    Bullish candle + dragonfly = strong support
    Institutional continuation pattern

    ``c1`` and ``c2`` are (open, high, low, close) sequences for the first and
    second candle. Returns 1 on a match, else 0 (also 0 on malformed input).
    """
    try:
        return int(
            bullish_candle(*c1) == 1 and
            dragonfly_candle(*c2) == 1 and
            c2[3] >= c1[3]  # Second close >= first close
        )
    except Exception:
        # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return 0
# Support/Resistance functions (unchanged)
def find_supports(p, df):
    """Return the local minima of ``df['Low']`` that lie strictly below ``p``.

    A local minimum is a value lower than both of its neighbours, so the first
    and last rows never qualify. Returns [] on any failure (e.g. missing
    column).
    """
    try:
        low = df['Low']
        is_local_min = (low.shift(1) > low) & (low.shift(-1) > low)
        return list(low[is_local_min & (low < p)])
    except Exception:
        # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return []
def find_resistances(p, df):
    """Return the local maxima of ``df['High']`` that lie strictly above ``p``.

    A local maximum is a value higher than both of its neighbours, so the
    first and last rows never qualify. Returns [] on any failure (e.g. missing
    column).
    """
    try:
        high = df['High']
        is_local_max = (high.shift(1) < high) & (high.shift(-1) < high)
        return list(high[is_local_max & (high > p)])
    except Exception:
        # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return []
def find_stop_level(p, df):
    """Return the highest local minimum below ``p`` among the last 10 lows.

    Returns:
        The level as a float, or None when no such minimum exists or on any
        failure (e.g. missing column).
    """
    try:
        # .iloc makes "last 10 rows" positional regardless of the index type.
        lows = df['Low'].iloc[-10:]
        mins = lows[(lows.shift(1) > lows) & (lows.shift(-1) > lows)]
        below = mins[mins < p]
        return float(below.max()) if not below.empty else None
    except Exception:
        # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return None
def dist_to_nearest(p, levels):
    """Return the smallest absolute distance from ``p`` to any level.

    Returns:
        The distance as a float, or the sentinel -1.0 when ``levels`` is empty
        or the computation fails.
    """
    try:
        if not levels:
            return -1.0
        return float(min(abs(p - level) for level in levels))
    except Exception:
        # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return -1.0
def cluster_strength(levels, tolerance=0.1):
    """Count how many levels belong to a cluster of two or more.

    Levels are sorted; a cluster starts at a level and absorbs every following
    level within ``tolerance`` of that starting level. Singletons do not count.

    Args:
        levels: Price levels (any order).
        tolerance: Maximum distance from a cluster's first level. Defaults to
            the previously hard-coded 0.1.

    Returns:
        The number of clustered levels as a float; 0.0 for empty input or on
        any failure.
    """
    try:
        if not levels:
            return 0.0
        ordered = sorted(levels)
        total = len(ordered)
        clustered = 0
        start = 0
        while start < total:
            end = start + 1
            while end < total and abs(ordered[end] - ordered[start]) <= tolerance:
                end += 1
            size = end - start
            if size > 1:
                clustered += size
            start = end
        return float(clustered)
    except Exception:
        # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
        return 0.0
# ============================================================================
# REGIME DETECTOR (INTERNAL ONLY)
# ============================================================================
class RegimeDetector:
    """Latent regime detection for adaptive normalization.

    Produces soft weights for five regimes (low/medium/high volatility,
    trending, mean-reverting) from a frame with a ``Close`` column and smooths
    them against a bounded history of earlier results.
    """

    def __init__(self, config=None):
        """
        Args:
            config: Mapping with the REGIME_CONFIG keys. ``None`` (the default)
                selects the module-level REGIME_CONFIG, resolved at call time
                instead of being bound as a default argument.
        """
        self.config = REGIME_CONFIG if config is None else config
        self.regime_history = deque(maxlen=self.config['regime_memory'])

    def detect_regime(self, df):
        """Return smoothed regime weights for ``df``.

        Falls back to ``_default_regime()`` when ``df`` has fewer than 30 rows,
        when any weight is NaN/inf, or when the computation fails.
        """
        if len(df) < 30:
            return self._default_regime()
        try:
            close = df['Close']
            returns = close.pct_change().dropna()
            # Compute the rolling volatility once and reuse it.
            rolling_vol = returns.rolling(20).std()
            current_vol = rolling_vol.iloc[-1]
            vol_history = rolling_vol.dropna()
            if vol_history.empty or not np.isfinite(current_vol):
                return self._default_regime()
            vol_percentile = percentileofscore(vol_history, current_vol) / 100
            low_vol_weight = self._sigmoid(self.config['vol_low_threshold'] - vol_percentile, 10)
            high_vol_weight = self._sigmoid(vol_percentile - self.config['vol_high_threshold'], 10)
            medium_vol_weight = max(0, 1 - low_vol_weight - high_vol_weight)
            # len(df) >= 30 is guaranteed above, so the 20-row lookback exists.
            momentum = close.iloc[-1] / close.iloc[-20] - 1
            trend_strength = abs(momentum)
            trending_weight = self._sigmoid(trend_strength - self.config['trend_threshold'], 5)
            price_entropy = safe_entropy(returns.tail(50))
            mean_rev_weight = self._sigmoid(price_entropy - self.config['entropy_threshold'], 2)
            regime_weights = {
                'low_vol': float(low_vol_weight),
                'medium_vol': float(medium_vol_weight),
                'high_vol': float(high_vol_weight),
                'trending': float(trending_weight),
                'mean_reverting': float(mean_rev_weight),
            }
            # Keep NaN/inf out of the history and out of the normaliser.
            if not all(np.isfinite(w) for w in regime_weights.values()):
                return self._default_regime()
            self.regime_history.append(regime_weights)
            return self._smooth_regime(regime_weights)
        except Exception as e:
            logger.debug(f"Regime detection failed: {e}")
            return self._default_regime()

    def _sigmoid(self, x, steepness=1):
        """Numerically stable sigmoid of ``steepness * x``."""
        z = np.clip(-steepness * x, -500, 500)  # Prevent overflow
        return 1 / (1 + np.exp(z))

    def _smooth_regime(self, current_regime):
        """Blend ``current_regime`` with the mean of the stored history.

        Each key becomes ``0.3 * current + 0.7 * mean(history)``, ignoring
        NaN/inf history entries. With fewer than two stored regimes the input
        is returned unchanged.
        """
        if len(self.regime_history) < 2:
            return current_regime
        alpha = 0.3
        smoothed = current_regime.copy()
        for key in ['low_vol', 'medium_vol', 'high_vol', 'trending', 'mean_reverting']:
            historical = [
                past[key] for past in self.regime_history
                if key in past and not (np.isnan(past[key]) or np.isinf(past[key]))
            ]
            if historical:
                hist_mean = float(np.mean(historical))
                smoothed[key] = alpha * current_regime[key] + (1 - alpha) * hist_mean
            else:
                smoothed[key] = current_regime[key]
        return smoothed

    def _default_regime(self):
        """Neutral weights used when no regime can be computed."""
        return {
            'low_vol': 0.33,
            'medium_vol': 0.34,
            'high_vol': 0.33,
            'trending': 0.5,
            'mean_reverting': 0.5,
        }
# ============================================================================
# ADAPTIVE NORMALIZER
# ============================================================================
class AdaptiveNormalizer:
    """Regime-aware normalization.

    Blends a mean/std z-score with a median/IQR score, weighting the robust
    score by the regime's ``high_vol`` weight.
    """

    def normalize(self, feature_series, regime_weights):
        """Normalize a pandas Series according to ``regime_weights``.

        Series shorter than 20 values get a plain (unclipped) z-score. Longer
        ones get ``(1 - w) * z_standard + w * z_robust`` with
        ``w = regime_weights['high_vol']``, clipped to [-5, 5]. Any failure
        (e.g. a missing ``high_vol`` key) falls back to the plain z-score.
        """
        if len(feature_series) < 20:
            return self._zscore_normalize(feature_series)
        try:
            z_standard = self._zscore_normalize(feature_series)
            z_robust = self._robust_normalize(feature_series)
            vol_weight = regime_weights['high_vol']
            z_adaptive = (1 - vol_weight) * z_standard + vol_weight * z_robust
            return np.clip(z_adaptive, -5, 5)
        except Exception:
            # Was a bare ``except:``, which also trapped KeyboardInterrupt/SystemExit.
            return self._zscore_normalize(feature_series)

    def _zscore_normalize(self, series):
        """(x - mean) / std, or all zeros when the std is not above 1e-8."""
        mu = series.mean()
        sigma = series.std()
        return (series - mu) / (sigma + 1e-10) if sigma > 1e-8 else series * 0

    def _robust_normalize(self, series):
        """(x - median) / IQR, or all zeros when the IQR is not above 1e-8."""
        q25 = series.quantile(0.25)
        q75 = series.quantile(0.75)
        iqr = q75 - q25
        median = series.median()
        return (series - median) / (iqr + 1e-10) if iqr > 1e-8 else series * 0
# ============================================================================
# INTEGRATED FEATURE ENHANCER (60 FEATURES STRICT)
# ============================================================================
class IntegratedFeatureEnhancer:
def __init__(self, ably_client, agent_names, window_size=100):
    """Set up per-agent buffers, regime components and publish channels.

    Args:
        ably_client: messaging client exposing ``channels.get(name)``.
        agent_names: agent identifiers; each gets a bounded price buffer
            and a ``meta_features-<name>`` channel.
        window_size: maximum number of ticks kept per agent buffer.

    Raises:
        AssertionError: if ``FEATURE_CONTRACT.features`` does not contain
            ``EXPECTED_FEATURE_COUNT`` entries at construction time.
    """
    self.ably = ably_client
    self.agent_names = agent_names
    self.window_size = window_size

    # One bounded rolling buffer per agent.
    buffers = {}
    for name in agent_names:
        buffers[name] = deque(maxlen=window_size)
    self.price_buffers = buffers

    # Regime components stay internal to this object.
    self.regime_detector = RegimeDetector()
    self.adaptive_normalizer = AdaptiveNormalizer()

    # Shared feature channel first, then one meta channel per agent.
    channels = ably_client.channels
    self.features_channel = channels.get("integrated_features_all")
    meta_channels = {}
    for name in agent_names:
        meta_channels[name] = ably_client.channels.get(f"meta_features-{name}")
    self.meta_channels = meta_channels

    self.latest_computed_features = {}
    self.features_lock = threading.Lock()

    logger.info(f"Regime-Adaptive Feature Enhancer initialized")
    logger.info(
        f"Contract: version={FEATURE_CONTRACT.version} "
        f"features={len(FEATURE_CONTRACT.features)} "
        f"envelope={len(FEATURE_CONTRACT.envelope)} "
        f"(invariants enforced at import time)"
    )

    # Runtime re-check of the declared feature count, in case the contract
    # was modified after import.
    feature_total = len(FEATURE_CONTRACT.features)
    assert feature_total == EXPECTED_FEATURE_COUNT, (
        f"Feature count mismatch at runtime: "
        f"{feature_total} != {EXPECTED_FEATURE_COUNT}"
    )
def compute_core_technical_features(self, df):
    """Compute the core technical indicator columns on a copy of ``df``.

    Requires ``Close``, ``High``, ``Low`` and ``Volume`` columns. Adds:
    log_return, rolling_mean_5, rolling_std_5, zscore_5, rsi_14, macd,
    macd_signal, macd_hist, atr, cdf_value, cdf_slope, cdf_diff,
    volatility_quantile_90, volatility_ratio, entropy_50, autocorr_3,
    momentum_10, volume_change_rate and volume_zscore.

    Args:
        df: price DataFrame; it is not modified.

    Returns:
        A new DataFrame with the indicator columns appended.

    Notes:
        * Intermediate series built from NumPy arrays (RSI gains/losses and
          the true range) are given ``df.index`` explicitly, so the result
          is correct for any index, not only the default RangeIndex.
        * ``min_periods`` of the adaptive rolling windows is capped at the
          window length, so frames shorter than 20 rows no longer raise
          ``ValueError``.
        * Uses the module-level ``safe_entropy`` helper.
    """
    df = df.copy()
    eps = 1e-10
    n_rows = len(df)

    # RuntimeWarnings (log of zero, empty slices, constant-window
    # correlations) are expected here and are cleaned up by the fills below.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", RuntimeWarning)

        df['log_return'] = (
            np.log(df['Close'] / df['Close'].shift(1))
            .replace([np.inf, -np.inf], 0)
            .fillna(0)
        )
        df['rolling_mean_5'] = df['Close'].rolling(5, min_periods=1).mean().fillna(df['Close'])
        df['rolling_std_5'] = df['Close'].rolling(5, min_periods=1).std().fillna(eps)
        df['rolling_std_5'] = df['rolling_std_5'].replace(0, eps)
        df['zscore_5'] = (df['Close'] - df['rolling_mean_5']) / df['rolling_std_5']

        # RSI
        delta = df['Close'].diff().fillna(0)
        gain = np.where(delta > 0, delta, 0)
        loss = np.where(delta < 0, -delta, 0)
        # Keep df's index so the assignment below aligns row-for-row.
        avg_gain = pd.Series(gain, index=df.index).rolling(14, min_periods=1).mean().fillna(0)
        avg_loss = pd.Series(loss, index=df.index).rolling(14, min_periods=1).mean().fillna(0)
        rs = avg_gain / (avg_loss + eps)
        df['rsi_14'] = 100 - (100 / (1 + rs))
        df['rsi_14'] = df['rsi_14'].ewm(span=5, adjust=False).mean().fillna(50)

        # MACD
        ema12 = df['Close'].ewm(span=12, adjust=False).mean()
        ema26 = df['Close'].ewm(span=26, adjust=False).mean()
        df['macd'] = ema12 - ema26
        df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()
        df['macd_hist'] = df['macd'] - df['macd_signal']

        # ATR
        high_low = df['High'] - df['Low']
        high_close = np.abs(df['High'] - df['Close'].shift(1))
        low_close = np.abs(df['Low'] - df['Close'].shift(1))
        tr = np.maximum.reduce([high_low, high_close, low_close])
        df['atr'] = (
            pd.Series(np.asarray(tr), index=df.index)
            .rolling(14, min_periods=1)
            .mean()
            .fillna(0)
        )

        # CDF features: percentile rank of the latest return in its window.
        window = min(100, n_rows)
        if window >= 20:
            df['cdf_value'] = df['log_return'].rolling(window, min_periods=10).apply(
                lambda x: percentileofscore(x.dropna(), x.iloc[-1]) / 100 if len(x.dropna()) > 10 else 0.5
            ).fillna(0.5)
        else:
            df['cdf_value'] = 0.5
        df['cdf_value'] = df['cdf_value'].ffill().bfill().fillna(0.5)
        df['cdf_slope'] = df['cdf_value'].diff().ewm(span=5, adjust=False).mean().fillna(0)
        df['cdf_diff'] = (df['cdf_value'] - df['cdf_value'].shift(10)).fillna(0)
        df['cdf_diff'] = df['cdf_diff'].ewm(span=5, adjust=False).mean().fillna(0)

        # Volatility (min_periods may not exceed the window length).
        vol_window = min(100, n_rows)
        df['volatility_quantile_90'] = df['rolling_std_5'].rolling(
            vol_window, min_periods=min(20, vol_window)
        ).quantile(0.9).fillna(df['rolling_std_5'])
        df['volatility_ratio'] = df['rolling_std_5'] / (df['volatility_quantile_90'] + eps)
        df['volatility_ratio'] = df['volatility_ratio'].clip(0, 3).fillna(1.0)

        # Entropy (same min_periods cap as above).
        entropy_window = min(50, n_rows)
        df['entropy_50'] = df['log_return'].rolling(
            entropy_window, min_periods=min(20, entropy_window)
        ).apply(safe_entropy).fillna(0)

        # Autocorrelation
        df['autocorr_3'] = df['log_return'].rolling(20, min_periods=5).apply(
            lambda x: x.autocorr(lag=3) if len(x) > 3 else 0
        ).fillna(0)

        # Momentum
        df['momentum_10'] = (df['Close'] / df['Close'].shift(10) - 1).fillna(0)

        # Volume
        df['volume_change_rate'] = df['Volume'].pct_change().replace([np.inf, -np.inf], 0).fillna(0)
        vol_mean = df['Volume'].rolling(20, min_periods=1).mean()
        vol_std = df['Volume'].rolling(20, min_periods=1).std().fillna(eps)
        df['volume_zscore'] = ((df['Volume'] - vol_mean) / (vol_std + eps)).clip(-3, 3).fillna(0)

    return df
def compute_derivative_features(self, df, window=10):
    """Add price velocity, acceleration, jerk and their rolling statistics.

    ``price_vel``, ``price_acc`` and ``price_jrk`` are successive first
    differences of ``Close``. For each of them the rolling mean, std, skew
    and kurtosis over ``window`` rows are added (15 columns in total). If
    the statistics for one series fail, its four statistic columns are set
    to 0 and the failure is logged at debug level.

    Args:
        df: DataFrame with a ``Close`` column; it is not modified.
        window: rolling window length for the statistics.

    Returns:
        A new DataFrame with the derivative columns appended.
    """
    out = df.copy()
    out['price_vel'] = out['Close'].diff()
    out['price_acc'] = out['price_vel'].diff()
    out['price_jrk'] = out['price_acc'].diff()

    for base in ('price_vel', 'price_acc', 'price_jrk'):
        mean_col, std_col = f'{base}_mean', f'{base}_std'
        skew_col, kurt_col = f'{base}_skew', f'{base}_kurtosis'
        try:
            series = out[base]
            # NaNs from short windows are replaced by 0.
            out[mean_col] = series.rolling(window, min_periods=1).mean().fillna(0)
            out[std_col] = series.rolling(window, min_periods=1).std().fillna(0)
            skewness = series.rolling(window, min_periods=3).apply(safe_skew, raw=True)
            out[skew_col] = skewness.fillna(0)
            excess = series.rolling(window, min_periods=3).apply(safe_kurtosis, raw=True)
            out[kurt_col] = excess.fillna(0)
        except Exception as e:
            logger.debug(f"Derivative feature {base} computation failed: {e}")
            for fallback_col in (mean_col, std_col, skew_col, kurt_col):
                out[fallback_col] = 0
    return out
def compute_additional_technical(self, df):
"""Compute 7 additional technical features"""
df = df.copy()
eps = 1e-10
df['ma10'] = df['Close'].rolling(10, min_periods=1).mean()
df['ma20'] = df['Close'].rolling(20, min_periods=1).mean()
df['std20'] = df['Close'].rolling(20, min_periods=1).std()
df['bollinger_upper'] = df['ma20'] + 2 * df['std20']
df['bollinger_lower'] = df['ma20'] - 2 * df['std20']
df['bollinger_width'] = (df['bollinger_upper'] - df['bollinger_lower']) / (df['ma20'] + eps)
df['bollinger_position'] = (df['Close'] - df['bollinger_lower']) / (df['bollinger_upper'] - df['bollinger_lower'] + eps)
df['bollinger_position'] = df['bollinger_position'].clip(0, 1)
return df
def compute_candlestick_patterns(self, df):
    """Add nine candlestick-pattern columns to a copy of ``df``.

    Seven single-candle detectors are applied row by row to
    (Open, High, Low, Close). Two two-candle detectors are applied to each
    consecutive pair of rows, and the first row keeps 0 for those. When
    ``Open`` is missing it is filled from ``Close``. The detector functions
    are module-level helpers.

    Args:
        df: DataFrame with High/Low/Close (and optionally Open) columns;
            it is not modified.

    Returns:
        A new DataFrame with the pattern columns appended.
    """
    out = df.copy()
    if 'Open' not in out.columns:
        out['Open'] = out['Close']

    ohlc = ['Open', 'High', 'Low', 'Close']
    single_candle = {
        'gravestone_doji': gravestone_doji,
        'four_price_doji': four_price_doji,
        'doji': doji,
        'spinning_top': spinning_top,
        'bullish_candle': bullish_candle,
        'bearish_candle': bearish_candle,
        'dragonfly_candle': dragonfly_candle,
    }
    for column, detector in single_candle.items():
        # The lambda is consumed immediately, so closing over the loop
        # variable is safe here.
        out[column] = out.apply(
            lambda row: detector(row['Open'], row['High'], row['Low'], row['Close']),
            axis=1,
        )

    out['spinning_top_bearish_followup'] = 0
    out['bullish_then_dragonfly'] = 0
    for pos in range(1, len(out)):
        previous = tuple(out.iloc[pos - 1][ohlc])
        current = tuple(out.iloc[pos][ohlc])
        label = out.index[pos]
        out.at[label, 'spinning_top_bearish_followup'] = spinning_top_bearish_followup(previous, current)
        out.at[label, 'bullish_then_dragonfly'] = bullish_candle_followed_by_dragonfly(previous, current)
    return out
def compute_support_resistance_features(self, df):
    """Add seven support/resistance columns to a copy of ``df``.

    With fewer than 10 rows, neutral constants are written. Otherwise the
    values are derived from the latest ``Close`` and the levels returned by
    the module-level ``find_supports``, ``find_resistances`` and
    ``find_stop_level`` helpers. Each value is a scalar broadcast to every
    row.

    Args:
        df: DataFrame with Close/High/Low columns; it is not modified.

    Returns:
        A new DataFrame with the seven columns appended.
    """
    out = df.copy()

    if len(out) < 10:
        neutral = {
            'distance_to_nearest_support': 0.0,
            'distance_to_nearest_resistance': 0.0,
            'near_support': 0,
            'near_resistance': 0,
            'distance_to_stop_loss': 0.5,
            'support_strength': 0.0,
            'resistance_strength': 0.0,
        }
        for column, value in neutral.items():
            out[column] = value
        return out

    price = out['Close'].iloc[-1]
    supports = find_supports(price, out)
    resistances = find_resistances(price, out)
    stop_level = find_stop_level(price, out)

    low, high = out['Low'].min(), out['High'].max()
    # Fall back to 1 so the divisions below are safe on a flat range.
    rng = high - low if high > low else 1

    out['distance_to_nearest_support'] = dist_to_nearest(price, supports)
    out['distance_to_nearest_resistance'] = dist_to_nearest(price, resistances)

    # "Near" means within 0.3 price units of any level.
    if supports:
        near_sup = int(any(abs(price - level) < 0.3 for level in supports))
    else:
        near_sup = 0
    out['near_support'] = near_sup

    if resistances:
        near_res = int(any(abs(price - level) < 0.3 for level in resistances))
    else:
        near_res = 0
    out['near_resistance'] = near_res

    if stop_level:
        out['distance_to_stop_loss'] = (price - stop_level) / rng
    else:
        out['distance_to_stop_loss'] = 0.5

    out['support_strength'] = cluster_strength([level / rng for level in supports])
    out['resistance_strength'] = cluster_strength([level / rng for level in resistances])
    return out
def _validate_feature_contract(self, features_dict):
    """Validate ``features_dict`` and return the legacy 3-tuple.

    Delegates to ``FEATURE_CONTRACT.validate()``.

    Returns:
        ``(is_valid, missing, extra)`` where ``extra`` is the union of the
        result's ``leaked_envelope`` and ``unexpected`` sets. Callers that
        need the two failure modes separately should call
        ``FEATURE_CONTRACT.validate()`` directly.
    """
    outcome = FEATURE_CONTRACT.validate(features_dict)
    merged_extra = outcome.leaked_envelope | outcome.unexpected
    return (outcome.ok, outcome.missing, merged_extra)
def compute_all_features(self, df):
    """Run the full feature pipeline and apply regime-adaptive normalization.

    Steps: the five ``compute_*`` stages, internal regime detection,
    normalization of the continuous columns (except those listed in
    ``NORMALIZATION_EXCLUSIONS``), then replacement of inf/NaN by
    forward-fill, back-fill and finally 0.

    Args:
        df: raw price DataFrame.

    Returns:
        The enriched DataFrame, or an empty DataFrame when ``df`` has fewer
        than 10 rows or any step raises (the error is logged).
    """
    try:
        if len(df) < 10:
            return pd.DataFrame()

        # Step 1: raw features, each stage building on the previous one.
        stages = (
            self.compute_core_technical_features,
            self.compute_derivative_features,
            self.compute_additional_technical,
            self.compute_candlestick_patterns,
            self.compute_support_resistance_features,
        )
        for stage in stages:
            df = stage(df)

        # Step 2: regime weights are used here only and not returned.
        regime_weights = self.regime_detector.detect_regime(df)

        # Step 3: only continuous columns are normalized; binary pattern
        # flags and similar columns are left as computed.
        continuous_features = [
            'log_return', 'rolling_std_5', 'zscore_5', 'rsi_14',
            'macd', 'macd_signal', 'macd_hist', 'atr',
            'cdf_value', 'cdf_slope', 'cdf_diff',
            'volatility_ratio', 'entropy_50', 'autocorr_3', 'momentum_10',
            'volume_change_rate', 'volume_zscore',
            'price_vel_mean', 'price_acc_mean', 'price_jrk_mean',
            'price_vel_std', 'price_acc_std', 'price_jrk_std',
            'price_vel_skew', 'price_acc_skew', 'price_jrk_skew',
            'price_vel_kurtosis', 'price_acc_kurtosis', 'price_jrk_kurtosis',
            'bollinger_width', 'bollinger_position',
            'distance_to_nearest_support', 'distance_to_nearest_resistance',
            'distance_to_stop_loss', 'support_strength', 'resistance_strength'
        ]
        for name in continuous_features:
            if name not in df.columns or name in NORMALIZATION_EXCLUSIONS:
                continue
            df[name] = self.adaptive_normalizer.normalize(df[name], regime_weights)

        # Clean infinities and NaNs.
        cleaned = df.replace([np.inf, -np.inf], np.nan)
        return cleaned.ffill().bfill().fillna(0)
    except Exception as e:
        logger.error(f"Feature computation failed: {e}")
        return pd.DataFrame()
def extract_meta_features(self, df, current_price):
    """Build the 23-entry meta-feature dict (15 technical + 8 voting).

    Technical values come from the last row of ``df``; a column that is not
    present contributes 0.0. Voting values are derived from the levels
    returned by the module-level support/resistance helpers. The timestamp
    is not added here.

    Args:
        df: DataFrame with at least ``Low`` and ``High`` columns.
        current_price: latest price used for the distance calculations.

    Returns:
        The meta-feature dict (technical keys first, then voting keys), or
        ``{}`` when ``df`` has fewer than 10 rows, the count is not 23, or
        an exception occurs (errors are logged).
    """
    try:
        if len(df) < 10:
            return {}

        supports = find_supports(current_price, df)
        resistances = find_resistances(current_price, df)
        stop_level = find_stop_level(current_price, df)

        min_p, max_p = df['Low'].min(), df['High'].max()
        rng = max_p - min_p if max_p > min_p else 1
        has_range = rng > 0

        # Voting features (8)
        if has_range:
            support_gap = dist_to_nearest(current_price, supports) / rng
            resistance_gap = dist_to_nearest(current_price, resistances) / rng
        else:
            support_gap = 0.0
            resistance_gap = 0.0

        if supports:
            near_support = int(any(abs(current_price - s) < 0.3 for s in supports))
        else:
            near_support = 0
        if resistances:
            near_resistance = int(any(abs(current_price - r) < 0.3 for r in resistances))
        else:
            near_resistance = 0

        if stop_level and has_range:
            stop_gap = (current_price - stop_level) / rng
        else:
            stop_gap = 0.5

        if has_range:
            support_power = cluster_strength([s / rng for s in supports])
            resistance_power = cluster_strength([r / rng for r in resistances])
        else:
            support_power = 0.0
            resistance_power = 0.0

        voting = {
            'distance_to_nearest_support_scaled': support_gap,
            'distance_to_nearest_resistance_scaled': resistance_gap,
            'near_support': near_support,
            'near_resistance': near_resistance,
            'distance_to_stop_loss_scaled': stop_gap,
            'support_strength_scaled': support_power,
            'resistance_strength_scaled': resistance_power,
            'close_price': float(current_price),
        }

        # Filtered technical (15): (source column, published key).
        latest = df.iloc[-1]
        feature_mappings = (
            ('price_vel', 'price_vel_scaled'),
            ('price_acc', 'price_acc_scaled'),
            ('price_jrk', 'price_jrk_scaled'),
            ('price_vel_mean', 'price_vel_mean_scaled'),
            ('price_acc_mean', 'price_acc_mean_scaled'),
            ('price_jrk_mean', 'price_jrk_mean_scaled'),
            ('ma10', 'ma10_scaled'),
            ('ma20', 'ma20_scaled'),
            ('bollinger_upper', 'bollinger_upper_scaled'),
            ('bollinger_lower', 'bollinger_lower_scaled'),
            ('macd', 'macd_scaled'),
            ('macd_signal', 'macd_signal_scaled'),
            ('macd_hist', 'macd_hist_scaled'),
            ('rsi_14', 'rsi_scaled'),
            ('std20', 'std20_scaled'),
        )
        filtered = {
            meta_col: float(latest[df_col]) if df_col in latest.index else 0.0
            for df_col, meta_col in feature_mappings
        }

        meta_features = dict(filtered)
        meta_features.update(voting)

        # Validate count (23 features, timestamp added later)
        if len(meta_features) != 23:
            logger.error(f"Meta feature count violation: {len(meta_features)} != 23")
            return {}
        return meta_features
    except Exception as e:
        logger.error(f"Meta feature extraction failed: {e}")
        return {}
def process_raw_tick(self, agent_name, price_data):
    """Buffer one tick and, once 30 are available, refresh the agent's features.

    The tick is appended to the agent's rolling buffer. With at least 30
    buffered ticks the full feature pipeline runs, the entries named in
    ``REQUIRED_FEATURES`` are taken from the last row, and the result is
    stored in ``latest_computed_features`` only if it passes
    ``FEATURE_CONTRACT.validate``. Contract violations are logged and
    counted per agent in ``_contract_violation_counts``. Any exception is
    logged and swallowed.

    Args:
        agent_name: key into ``self.price_buffers``.
        price_data: mapping with ``close`` and optional ``high``, ``low``,
            ``volume``, ``open`` (the price fields default to ``close``).

    Returns:
        None.
    """
    try:
        close_price = price_data.get('close', 0)
        buffer = self.price_buffers[agent_name]
        buffer.append({
            'Close': close_price,
            'High': price_data.get('high', close_price),
            'Low': price_data.get('low', close_price),
            'Volume': price_data.get('volume', 0),
            'Open': price_data.get('open', close_price)
        })
        if len(buffer) < 30:
            return

        enhanced_df = self.compute_all_features(pd.DataFrame(list(buffer)))
        if enhanced_df.empty:
            return

        # Only declared features are extracted; raw OHLCV columns are not.
        latest_row = enhanced_df.iloc[-1]
        price_aliases = ('price', 'close_scaled', 'close_price')
        latest_features = {}
        for feature in REQUIRED_FEATURES:
            if feature in price_aliases:
                # Price variants carry the raw close of this tick.
                value = float(close_price)
            elif feature in latest_row.index:
                value = float(latest_row[feature])
            else:
                logger.warning(f"[{agent_name}] Missing feature: {feature}")
                value = 0.0
            latest_features[feature] = value

        # Enforce the contract and log each failure line separately.
        validation = FEATURE_CONTRACT.validate(latest_features)
        if not validation.ok:
            logger.error("=" * 80)
            logger.error(
                f"❌ [{agent_name}] FEATURE CONTRACT VIOLATION "
                f"(contract={FEATURE_CONTRACT.version})"
            )
            for line in validation.as_error_lines():
                logger.error(f" {line}")
            logger.error("=" * 80)
            # Per-agent violation counter, created lazily on first use.
            if not hasattr(self, '_contract_violation_counts'):
                self._contract_violation_counts = {}
            counts = self._contract_violation_counts
            counts[agent_name] = counts.get(agent_name, 0) + 1
            return

        with self.features_lock:
            self.latest_computed_features[agent_name] = dict(latest_features)
    except Exception as e:
        logger.error(f"[{agent_name}] Feature enhancement failed: {e}")
async def publish_features(self, agent_name, features_dict, tick_index=None):
    """Validate and publish one agent's features on the shared channel.

    The dict is re-validated with ``FEATURE_CONTRACT.validate``; on failure
    the publish is skipped and an error is logged. Otherwise NumPy scalars
    are converted to ``float``, the payload is built by
    ``FEATURE_CONTRACT.build_payload`` and sent on ``self.features_channel``
    under the event name ``"integrated-features"``.

    Args:
        agent_name: agent the features belong to.
        features_dict: feature name -> value mapping.
        tick_index: optional tick index. When ``None``, the first truthy of
            ``features_dict['tick_count']`` / ``['tick_index']`` is used
            (legacy path).

    Returns:
        None. Exceptions are logged, not raised.
    """
    try:
        # Defensive re-validation at the publish boundary.
        validation = FEATURE_CONTRACT.validate(features_dict)
        if not validation.ok:
            logger.error(
                f"[{agent_name}] publish BLOCKED — contract violation at "
                f"publish boundary: {validation.as_error_lines()}"
            )
            return

        # NumPy scalars become native floats before serialisation.
        numpy_scalars = (np.floating, np.integer)
        clean_features = {}
        for key, value in features_dict.items():
            clean_features[key] = float(value) if isinstance(value, numpy_scalars) else value

        # An explicit tick_index wins; otherwise fall back to the dict.
        if tick_index is not None:
            resolved_tick = tick_index
        else:
            resolved_tick = (
                features_dict.get('tick_count')
                or features_dict.get('tick_index')
            )

        payload = FEATURE_CONTRACT.build_payload(
            agent_name=agent_name,
            features_dict=clean_features,
            tick_index=resolved_tick,
            timestamp_iso=datetime.now(UTC).isoformat(),
        )
        await self.features_channel.publish("integrated-features", payload)
    except Exception as e:
        logger.error(f"[{agent_name}] Feature publish failed: {e}")
async def publish_meta_features(self, agent_name, meta_features):
    """Publish an agent's meta features on its ``meta_features`` channel.

    NumPy scalars are converted to ``float``; ``agent`` and an ISO
    ``timestamp`` (UTC) are added before publishing under the event name
    ``"meta_features"``. Exceptions are logged, not raised.

    Args:
        agent_name: key into ``self.meta_channels``.
        meta_features: meta-feature name -> value mapping.
    """
    try:
        channel = self.meta_channels[agent_name]
        outgoing = {}
        for key, value in meta_features.items():
            if isinstance(value, (np.floating, np.integer)):
                value = float(value)
            outgoing[key] = value
        outgoing['agent'] = agent_name
        outgoing['timestamp'] = datetime.now(UTC).isoformat()
        await channel.publish("meta_features", outgoing)
    except Exception as e:
        logger.error(f"[{agent_name}] Meta feature publish failed: {e}")
def get_latest_state_features(self, agent_name=None):
    """Return the latest stored features for one agent or for all agents.

    Args:
        agent_name: when truthy, that agent's stored dict is returned
            (``{}`` if nothing is stored). Otherwise the features of all
            agents are combined by ``_safe_aggregate_features``.

    Returns:
        A feature dict; ``{}`` when nothing has been computed yet.
    """
    with self.features_lock:
        store = self.latest_computed_features
        if agent_name:
            return store.get(agent_name, {})
        per_agent = list(store.values()) if store else []
        if not per_agent:
            return {}
        return self._safe_aggregate_features(per_agent)
def _safe_aggregate_features(self, all_features):
"""Type-aware feature aggregation across agents"""
avg_features = {}
feature_keys = all_features[0].keys()
for key in feature_keys:
values = [f[key] for f in all_features if key in f]
if not values:
continue
if key in BINARY_FEATURES:
# Voting for binary features
avg_features[key] = int(np.sum(values) > len(values) / 2)
elif key in PRICE_FEATURES:
# Median for price features (robust to outliers)
clean_values = [v for v in values if not np.isnan(v)]
if clean_values:
avg_features[key] = float(np.median(clean_values))
else:
avg_features[key] = 0.0
else:
# Mean for continuous features
clean_values = [v for v in values if not np.isnan(v)]
if clean_values:
avg_features[key] = float(np.mean(clean_values))
else:
avg_features[key] = 0.0
return avg_features
def get_feature_summary(self):
    """Return a human-readable, multi-line summary of the feature set.

    Returns:
        ``"No features computed yet"`` when nothing is stored. Otherwise a
        report whose feature total is the number of keys, in the first
        stored agent's dict, that are declared in
        ``FEATURE_CONTRACT.features``.
    """
    with self.features_lock:
        if not self.latest_computed_features:
            return "No features computed yet"

        sample_agent = list(self.latest_computed_features.keys())[0]
        features = self.latest_computed_features[sample_agent]
        # Set intersection: counts only keys the contract declares.
        actual_count = len(set(features.keys()) & FEATURE_CONTRACT.features)

        pieces = [
            f"REGIME-ADAPTIVE FEATURE ENHANCER\n",
            "=" * 60 + "\n\n",
            f"Total Features: {actual_count} (Expected: 60)\n\n",
            "Feature Categories:\n",
            f" • Core Technical: 19 features\n",
            f" • Derivatives: 15 features\n",
            f" • Additional Technical: 7 features\n",
            f" • Candlestick Patterns: 9 features (institutional-grade)\n",
            f" • Support/Resistance: 7 features\n",
            f" • Price Variants: 3 features\n",
            f" • TOTAL: 60 features\n\n",
            f"Meta Features (24 total, published separately):\n",
            f" • Voting: 8 features\n",
            f" • Technical: 15 features\n",
            f" • Timestamp: 1 metadata\n\n",
            f"Regime Detection: INTERNAL (adaptive normalization)\n",
            f" • Volatility regimes: low/medium/high\n",
            f" • Trend detection: momentum-based\n",
            f" • Mean-reversion: entropy-based\n\n",
            f"Normalization: Regime-adaptive\n",
            f" • High vol → Robust scaling (IQR)\n",
            f" • Low vol → Standard z-score\n",
            f" • Excluded: {len(NORMALIZATION_EXCLUSIONS)} features\n\n",
            f"Aggregation: Type-aware\n",
            f" • Binary: Voting (majority rule)\n",
            f" • Price: Median (outlier-resistant)\n",
            f" • Continuous: Mean\n",
        ]
        return "".join(pieces)
# ============================================================================
# ASYNC WRAPPER
# ============================================================================
class AsyncIntegratedFeatureEnhancer:
def __init__(self, ably_client, agent_names, window_size=100):
    """Create the wrapped synchronous enhancer and the async bookkeeping.

    Args:
        ably_client: messaging client shared with the inner enhancer.
        agent_names: agent identifiers to process and subscribe for.
        window_size: per-agent buffer length passed to the inner enhancer.
    """
    self.enhancer = IntegratedFeatureEnhancer(ably_client, agent_names, window_size)
    self.ably, self.agents = ably_client, agent_names
    self.running = False
    # Channel objects looked up so far, keyed by channel name.
    self.channels = {}
def get_latest_state_features(self, agent_name=None):
    """Return the wrapped enhancer's latest features (one agent or all)."""
    inner = self.enhancer
    return inner.get_latest_state_features(agent_name)
async def start(self):
    """Mark the wrapper as running, log the feature summary, attach listeners."""
    self.running = True
    logger.info("AsyncIntegratedFeatureEnhancer started")
    summary = self.enhancer.get_feature_summary()
    logger.info("\n" + summary)
    await self._start_ably_listeners()
async def _start_ably_listeners(self):
    """Ensure the client is connected, then subscribe for every agent.

    If the client exposes a ``connection`` that is not yet ``'connected'``,
    ``connect()`` is called and the state is polled every 0.5 s, up to 20
    times. For each agent a feature subscription (channel ``<agent>``,
    event ``"integrated-features"``) and a meta subscription (channel
    ``meta_features-<agent>``, event ``"meta_features"``) are attempted.
    Failures are logged; nothing is raised.
    """
    client = self.ably
    if not client:
        logger.error("No Ably client available")
        return

    if hasattr(client, 'connection') and client.connection.state != 'connected':
        try:
            client.connection.connect()
            connected = False
            for _ in range(20):
                await asyncio.sleep(0.5)
                if self.ably.connection.state == 'connected':
                    connected = True
                    break
            if not connected:
                logger.error("Failed to connect to Ably")
                return
        except Exception as e:
            logger.error(f"Redis connection failed: {e}")
            return

    logger.info(f"Starting Ably listeners")
    for agent in self.agents:
        # Agent names may arrive as bytes; normalise to str.
        agent_str = agent.decode('utf-8') if isinstance(agent, bytes) else str(agent)

        # ``name=agent_str`` binds the current agent into each callback.
        feature_ok = await self._subscribe_with_retry(
            agent_str,
            "integrated-features",
            lambda msg, name=agent_str: self._handle_feature_message(name, msg),
        )
        meta_ok = await self._subscribe_with_retry(
            agent_str,
            "meta_features",
            lambda msg, name=agent_str: self._handle_meta_features_message(name, msg),
            channel_suffix="meta_features-",
        )

        if feature_ok:
            logger.info(f"✓ [{agent_str}] Feature channel attached")
        if meta_ok:
            logger.info(f"✓ [{agent_str}] Meta features channel attached")
async def _subscribe_with_retry(self, agent_name, event_name, callback, max_retries=3, timeout=10, channel_suffix=""):
channel_name = f"{channel_suffix}{agent_name}" if channel_suffix else agent_name
for attempt in range(max_retries):
try:
channel = self.ably.channels.get(channel_name)
self.channels[channel_name] = channel
attach_task = asyncio.create_task(channel.attach())
try:
await asyncio.wait_for(attach_task, timeout=timeout)
except asyncio.TimeoutError:
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt)
continue
return False
subscribe_task = asyncio.create_task(channel.subscribe(event_name, callback))
try:
await asyncio.wait_for(subscribe_task, timeout=timeout)
return True
except asyncio.TimeoutError:
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt)
continue
return False
except Exception as e:
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt)
continue
return False
return False
async def process_tick(self, agent_name, price_data):
    """Process one tick for an agent and publish its features and meta features.

    The synchronous feature computation runs in the default executor so it
    does not block the event loop. Whatever features are then stored for
    the agent are published, followed by the meta features when at least 10
    ticks are buffered.

    Args:
        agent_name: agent / timeframe identifier.
        price_data: mapping with ``close`` and optional OHLCV fields.
    """
    enhancer = self.enhancer
    running_loop = asyncio.get_running_loop()
    await running_loop.run_in_executor(
        None, enhancer.process_raw_tick, agent_name, price_data
    )

    # NOTE(review): if this tick produced no new features (e.g. a contract
    # violation), the previously stored dict is published again — confirm
    # this is intended.
    features = enhancer.get_latest_state_features(agent_name)
    if features:
        await self._publish_with_retry(agent_name, features, meta=False)

    # NOTE(review): the frame is built from the raw price buffer, so
    # indicator columns looked up by extract_meta_features are absent and
    # fall back to 0.0 — verify against the intended data flow.
    df = pd.DataFrame(list(enhancer.price_buffers[agent_name]))
    if len(df) < 10:
        return
    current_price = price_data.get('close', 0)
    meta_features = enhancer.extract_meta_features(df, current_price)
    if meta_features:
        await self._publish_with_retry(agent_name, meta_features, meta=True)
async def _publish_with_retry(self, agent_name, features_dict, meta=False, tick_index=None):
    """Publish features (or meta features) for one agent, with up to 3 attempts.

    Regular features go to channel ``<agent_name>`` under event
    ``"feature"`` with the body key ``features``. Meta features go to
    channel ``meta_features-<agent_name>`` under event ``"meta_features"``
    with the body key ``meta_features``.

    Failed attempts are retried after 1 s and 2 s. When the final attempt
    fails the error is logged and the method returns without sleeping
    again, so a dead channel does not stall the caller for a pointless
    back-off period.

    Args:
        agent_name: agent the payload belongs to.
        features_dict: payload body.
        meta: selects the meta channel / event / body key.
        tick_index: optional tick index. When ``None`` and
            ``features_dict`` is a dict, the first truthy of its
            ``tick_count`` / ``tick_index`` entries is used.

    Returns:
        None. Publish errors are never raised to the caller.
    """
    if meta:
        channel_name = f"meta_features-{agent_name}"
        event_name = "meta_features"
        body_key = 'meta_features'
    else:
        channel_name = agent_name
        event_name = "feature"
        body_key = 'features'

    if channel_name not in self.channels:
        self.channels[channel_name] = self.ably.channels.get(channel_name)
    channel = self.channels[channel_name]

    # Resolve tick_index from the features dict if not supplied.
    resolved_tick = tick_index
    if resolved_tick is None and isinstance(features_dict, dict):
        resolved_tick = features_dict.get('tick_count') or features_dict.get('tick_index')

    payload = {
        'agent': agent_name,
        body_key: features_dict,
        'timestamp': datetime.now(UTC).isoformat(),
        'tick_index': resolved_tick
    }

    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            await channel.publish(event_name, payload)
            return
        except Exception as e:
            if attempt == max_attempts - 1:
                # Out of attempts: report the failure instead of dropping
                # it silently, and do not sleep again.
                logger.error(
                    f"[{agent_name}] Publish to '{channel_name}' failed "
                    f"after {max_attempts} attempts: {e}"
                )
                return
            await asyncio.sleep(2 ** attempt)
def _handle_feature_message(self, agent_name, msg):
    """Subscription callback for feature events.

    Only emits a debug log line; ``msg`` is not inspected.
    """
    note = f"[{agent_name}] Feature message received"
    logger.debug(note)
def _handle_meta_features_message(self, agent_name, msg):
    """Subscription callback for meta-feature events.

    Only emits a debug log line; ``msg`` is not inspected.
    """
    note = f"[{agent_name}] Meta feature message received"
    logger.debug(note)
# ============================================================================
# MAIN EXECUTION - DERIV WEBSOCKET VERSION
# ============================================================================
async def main():
"""
Entry point: initialise the Deriv bridge and the Redis-backed client,
start the feature enhancer, then loop forever polling the latest tick and
processing it for every timeframe in TIMEFRAMES.

Raises RuntimeError when Deriv initialisation, symbol verification or the
Redis client construction fails. The Deriv bridge is shut down in the
final `finally` block however the loop ends.
"""
nest_asyncio.apply()
logger.info("=" * 80)
logger.info("🚀 REGIME-ADAPTIVE FEATURE ENHANCER - DERIV WEBSOCKET EDITION")
logger.info("=" * 80)
# Initialize Deriv WebSocket instead of MT5
if not await deriv_bridge.initialize(SYMBOL):
raise RuntimeError(f"❌ Deriv initialization failed")
logger.info(f"✅ Deriv WebSocket initialized")
logger.info(f" Symbol: {SYMBOL} -> {DERIV_SYMBOL}")
# Verify symbol
symbol_info = deriv_bridge.symbol_info(SYMBOL)
if symbol_info is None:
await deriv_bridge.shutdown()
raise RuntimeError(f"❌ Symbol {SYMBOL} not found")
logger.info(f"✅ Symbol verified")
logger.info("\n📡 Connecting to Redis (V25 namespace)...")
try:
ably_client = RedisAblyClient(redis_url=REDIS_URL, use_streams=True) # V25
# Fixed 1 s pause after constructing the client.
# NOTE(review): presumably gives the connection time to come up — confirm.
await asyncio.sleep(1)
logger.info("✅ Redis connected (V25 — channels prefixed with '%s')" % CHANNEL_PREFIX)
except Exception as e:
await deriv_bridge.shutdown()
raise RuntimeError(f"❌ Redis connection failed: {e}")
logger.info("\n🔧 Initializing feature enhancers...")
agent_names = list(TIMEFRAMES.keys())
enhancer = AsyncIntegratedFeatureEnhancer(
ably_client=ably_client,
agent_names=agent_names,
window_size=100
)
await enhancer.start()
# One publish channel per timeframe, named after the timeframe key.
agent_channels = {tf: ably_client.channels.get(tf) for tf in TIMEFRAMES}
# =========================================================================
# BATCH SYNCHRONISATION — now handled by FeatureBatchGateway in Redis
# =========================================================================
# Features.py's responsibility is ONLY to publish each agent's features to
# its own per-agent Redis channel as soon as they are computed.
#
# The FeatureBatchGateway (in redis_connection_manager.py) subscribes to
# all 8 per-agent channels on the Quasar side and acts as the gating layer:
# • Accumulates per-agent contributions for each tick
# • DISCARDS any partial batch when a new tick_index arrives (waitlist discard)
# • Only fires on_batch_ready() when ALL 8 agents share the same tick/price
#
# This keeps Features.py simple (just publish, no coordination) and moves
# the synchronisation concern to the Redis transport layer where it belongs.
# =========================================================================
logger.info("\n✅ All systems initialized - Starting tick processing...\n")
tick_count = 0
last_summary_time = time.time()
feature_counts = {tf: 0 for tf in TIMEFRAMES}
# ── Rate-limit gate ───────────────────────────────────────────────────────
# Derived from observed p95 latencies in the QSAP health report:
# • Per-agent inference p95 ≈ 1552 ms
# • Dispatch latency p95 ≈ 1292 ms
# With all 8 agents running concurrently (asyncio.gather) the bottleneck
# is max(p95_inference) ≈ 1552 ms.
# NOTE(review): this comment previously justified a 3 000 ms gate (~93 %
# headroom), but the constant below is 60.0 SECONDS, i.e. at most one
# dispatch per minute. Confirm which value is intended.
MIN_TICK_INTERVAL = 60.0 # seconds — never dispatch faster than this
_processing = asyncio.Semaphore(1) # only one tick in-flight at a time
async def _process_one_agent(tf_name, price_data, timestamp):
"""
Process one timeframe agent for the current tick and publish its features
on that agent's channel. Reads `tick_count` and updates `feature_counts`
from the enclosing scope. Exceptions are logged, not raised.
"""
try:
await enhancer.process_tick(tf_name, price_data)
features = enhancer.get_latest_state_features(tf_name)
if features:
feature_counts[tf_name] += 1
# Copy of the features with timestamp / tick / timeframe keys added.
features_with_meta = {
**features,
'timestamp': timestamp.isoformat(),
'tick_count': tick_count,
'timeframe': tf_name,
}
# Publish to per-agent channel.
# The FeatureBatchGateway in redis_connection_manager.py
# subscribes to all 8 per-agent channels and fires a
# complete batch only when all agents share the same
# tick_index — discarding any partial/stale waitlist.
await agent_channels[tf_name].publish(
"integrated-features",
{
"agent": tf_name,
"features": features_with_meta,
"feature_count": len(features),
"tick_index": tick_count,
"price": price_data['close'], # raw Deriv tick — §0c
},
)
# Progress line every 10th successful update for this timeframe.
if feature_counts[tf_name] % 10 == 0:
logger.info(
f"✅ [{tf_name}] Tick #{tick_count}: "
f"60 features + meta | Price: {price_data['close']:.5f}"
)
except Exception as e:
logger.error(f"❌ [{tf_name}] Error: {e}")
try:
while True:
tick_start = time.monotonic()
try:
# Get tick from Deriv WebSocket instead of MT5
tick = deriv_bridge.symbol_info_tick(SYMBOL)
if tick is None:
# No tick yet: short pause, then retry (this skips the gate below).
await asyncio.sleep(0.5)
continue
tick_count += 1
mid_price = (tick.bid + tick.ask) / 2.0
timestamp = datetime.now(UTC)
# Candle-shaped record built from a single quote: mid as open/close,
# ask as high, bid as low.
price_data = {
'close': mid_price,
'high': tick.ask,
'low': tick.bid,
'open': mid_price,
'volume': getattr(tick, 'volume', 0),
}
# ── All 8 agents run CONCURRENTLY; next tick cannot start until
# every agent has finished computing and publishing. ─────────
async with _processing:
await asyncio.gather(
*[_process_one_agent(tf, price_data, timestamp)
for tf in TIMEFRAMES],
return_exceptions=True, # one agent error never kills others
)
# Periodic summary, at most once every 60 s of wall-clock time.
if time.time() - last_summary_time > 60:
logger.info("\n" + "=" * 80)
logger.info(f"📊 SUMMARY (Tick #{tick_count})")
logger.info("=" * 80)
logger.info(f"Price: {mid_price:.5f}")
logger.info(f"Data Source: Deriv WebSocket (Streaming)")
for tf in TIMEFRAMES:
logger.info(f" {tf}: {feature_counts[tf]} updates")
logger.info("=" * 80 + "\n")
last_summary_time = time.time()
except KeyboardInterrupt:
break
except Exception as e:
logger.error(f"❌ Tick error: {e}")
# ── Completion-based gate ─────────────────────────────────────────
# Sleep only the time remaining to reach MIN_TICK_INTERVAL.
# If processing already took longer, sleep_for = 0 (no extra wait).
elapsed = time.monotonic() - tick_start
sleep_for = max(0.0, MIN_TICK_INTERVAL - elapsed)
logger.debug(
f"Tick #{tick_count} | processed in {elapsed*1000:.0f} ms "
f"| sleeping {sleep_for*1000:.0f} ms"
)
await asyncio.sleep(sleep_for)
finally:
# Runs however the loop ends (interrupt, cancellation or error).
logger.info("\n🛑 SHUTTING DOWN")
await deriv_bridge.shutdown()
logger.info(f"Total Ticks: {tick_count}")
logger.info("✅ Shutdown complete")
# Script entry point: run main() and report how the process ended.
if __name__ == "__main__":
try:
# Patch asyncio for nested event-loop use before starting main().
nest_asyncio.apply()
asyncio.run(main())
except KeyboardInterrupt:
logger.info("\n⚠️ Interrupted by user")
except Exception as e:
# Last-resort handler: log the error and print the full traceback.
logger.error(f"\n❌ Fatal error: {e}")
traceback.print_exc()
#+263780563561 ENG Karl Muzunze Masvingo Zimbabwe