wunder-rnn-gru-ensemble / src /data /causal_features.py
msrishav's picture
Add inference code, config, and technical report
e68eb1c verified
Raw
History Blame Contribute Delete
3.95 kB
"""Causal online feature builders shared by Phase 2 tabular baselines."""
from __future__ import annotations
import hashlib
import json
import numpy as np
# Trailing-window lengths (in history rows) over which the compact_v1 schema
# computes mean, std, and "current minus mean" features.
COMPACT_V1_WINDOWS = (3, 5, 10, 20, 50, 100)
# Lookback offsets (rows before the latest row) sampled by the expanded_v2
# schema; the spacing grows roughly geometrically so long lookbacks stay cheap.
EXPANDED_V2_LAGS = (1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610)
# Smoothing factors for the expanded_v2 exponentially weighted moving averages;
# a larger alpha weights the newest row more heavily.
EXPANDED_V2_EWMA_ALPHAS = (0.02, 0.05, 0.10, 0.20, 0.40)
# Trailing-window lengths (in history rows) over which the expanded_v2 schema
# computes mean, std, min, max, and "current minus mean" features.
EXPANDED_V2_WINDOWS = (3, 5, 10, 20, 50, 100, 200, 500)
def _validate_history(history: np.ndarray) -> np.ndarray:
    """Coerce ``history`` to float32 and verify it is a usable state matrix.

    Args:
        history: Array-like of states, one row per time step.

    Returns:
        The input converted to a ``float32`` array of shape ``(t, 32)`` with
        at least one row.

    Raises:
        ValueError: If the array is not two-dimensional with exactly 32
            columns, or if it contains no rows.
    """
    history = np.asarray(history, dtype=np.float32)
    is_state_matrix = history.ndim == 2 and history.shape[1] == 32
    if not is_state_matrix:
        raise ValueError(f"Expected history shape (t, 32), got {history.shape}")
    # A (0, 32) array passes the layout check but has nothing to featurize.
    if history.shape[0] < 1:
        raise ValueError("history must contain at least one state")
    return history
def _ewma(history: np.ndarray, alpha: float) -> np.ndarray:
    """Return the exponentially weighted moving average after the last row.

    The average is seeded with a copy of the first row and then updated once
    per subsequent row as ``alpha * row + (1 - alpha) * previous``.

    Args:
        history: Non-empty array of rows, oldest first.
        alpha: Weight applied to each incoming row.

    Returns:
        The smoothed value after the final row, as ``float32``.
    """
    retain = 1.0 - alpha
    # Copy so the running value never aliases the caller's first row.
    smoothed = history[0].astype(np.float32, copy=True)
    for step in range(1, len(history)):
        smoothed = alpha * history[step] + retain * smoothed
    return smoothed.astype(np.float32)
def build_compact_v1_features(history: np.ndarray) -> np.ndarray:
    """Build the compact_v1 feature vector from a history of states.

    The output concatenates, in order: the latest row, its difference from
    the previous row, and then for every window in ``COMPACT_V1_WINDOWS`` the
    trailing-window mean, the trailing-window std, and the latest row minus
    that mean. Only rows already present in ``history`` are used.

    Args:
        history: Array-like of shape ``(t, 32)`` with ``t >= 1``, oldest first.

    Returns:
        A one-dimensional ``float32`` feature vector.

    Raises:
        ValueError: If ``history`` fails validation.
    """
    history = _validate_history(history)
    latest = history[-1]
    if len(history) >= 2:
        previous = history[-2]
    else:
        # With a single row there is no predecessor; the schema uses zeros,
        # so the first-step delta equals the latest row itself.
        previous = np.zeros_like(latest)
    parts = [latest, latest - previous]
    for span in COMPACT_V1_WINDOWS:
        # Slicing clamps to the available rows when history is shorter than span.
        recent = history[-span:]
        center = recent.mean(axis=0)
        parts += [center, recent.std(axis=0), latest - center]
    return np.concatenate(parts).astype(np.float32)
def build_expanded_v2_features(history: np.ndarray) -> np.ndarray:
    """Build the expanded_v2 feature vector from a history of states.

    The output concatenates, in order:

    * the latest row;
    * for every lag in ``EXPANDED_V2_LAGS``: the lagged row and the latest row
      minus it;
    * for every alpha in ``EXPANDED_V2_EWMA_ALPHAS``: the EWMA of the full
      history and the latest row minus it;
    * for every window in ``EXPANDED_V2_WINDOWS``: trailing mean, std, min,
      max, and the latest row minus the mean.

    Args:
        history: Array-like of shape ``(t, 32)`` with ``t >= 1``, oldest first.

    Returns:
        A one-dimensional ``float32`` feature vector.

    Raises:
        ValueError: If ``history`` fails validation.
    """
    history = _validate_history(history)
    latest = history[-1]
    oldest = history[0]
    n_rows = len(history)
    parts = [latest]

    for lag in EXPANDED_V2_LAGS:
        # When the history is too short for this lag, fall back to the oldest row.
        reference = history[-1 - lag] if n_rows > lag else oldest
        parts += [reference, latest - reference]

    for alpha in EXPANDED_V2_EWMA_ALPHAS:
        trend = _ewma(history, alpha)
        parts += [trend, latest - trend]

    for span in EXPANDED_V2_WINDOWS:
        # Slicing clamps to the available rows when history is shorter than span.
        recent = history[-span:]
        center = recent.mean(axis=0)
        parts += [
            center,
            recent.std(axis=0),
            recent.min(axis=0),
            recent.max(axis=0),
            latest - center,
        ]

    return np.concatenate(parts).astype(np.float32)
def build_causal_tabular_features(history: np.ndarray, schema: str = "compact_v1") -> np.ndarray:
    """Build the feature vector for ``history`` under the named schema.

    Args:
        history: Array-like of states passed through to the schema's builder.
        schema: Either ``"compact_v1"`` or ``"expanded_v2"``.

    Returns:
        The feature vector produced by the selected builder.

    Raises:
        ValueError: If ``schema`` is not a known schema name.
    """
    builders = (
        ("compact_v1", build_compact_v1_features),
        ("expanded_v2", build_expanded_v2_features),
    )
    # Compare with == rather than a dict lookup so any non-matching value,
    # hashable or not, ends in the same ValueError.
    for name, builder in builders:
        if schema == name:
            return builder(history)
    raise ValueError(f"Unknown feature schema: {schema}")
def feature_width(schema: str) -> int:
    """Return the length of the feature vector produced for ``schema``.

    The width is measured by building features from a single all-zero row,
    so it always reflects the current builder implementation.

    Raises:
        ValueError: If ``schema`` is not a known schema name.
    """
    probe = np.zeros((1, 32), dtype=np.float32)
    vector = build_causal_tabular_features(probe, schema)
    return int(vector.shape[0])
def infer_feature_schema_from_width(width: int) -> str:
    """Return the name of the schema whose feature vector has length ``width``.

    Schemas are checked in the order ``compact_v1`` then ``expanded_v2`` and
    the first match wins.

    Raises:
        ValueError: If no known schema produces a vector of that length.
    """
    width = int(width)
    candidates = ("compact_v1", "expanded_v2")
    # Lazy generator: stops measuring widths as soon as one schema matches.
    found = next((name for name in candidates if feature_width(name) == width), None)
    if found is None:
        raise ValueError(f"No known causal feature schema has width {width}")
    return found
def feature_schema_payload(schema: str) -> dict:
    """Describe a feature schema as a JSON-serializable dict.

    The payload holds the schema name, the hyperparameter tuples that define
    it (as lists), and the resulting feature width.

    Args:
        schema: Either ``"compact_v1"`` or ``"expanded_v2"``.

    Returns:
        A dict with keys ``schema``, ``windows``, ``width`` and, for
        ``expanded_v2``, additionally ``lags`` and ``ewma_alphas``.

    Raises:
        ValueError: If ``schema`` is not a known schema name.
    """
    if schema not in ("compact_v1", "expanded_v2"):
        raise ValueError(f"Unknown feature schema: {schema}")

    # Keys are inserted in the same order the schema has always reported them.
    payload = {"schema": schema}
    if schema == "compact_v1":
        payload["windows"] = list(COMPACT_V1_WINDOWS)
    else:
        payload["lags"] = list(EXPANDED_V2_LAGS)
        payload["ewma_alphas"] = list(EXPANDED_V2_EWMA_ALPHAS)
        payload["windows"] = list(EXPANDED_V2_WINDOWS)
    payload["width"] = feature_width(schema)
    return payload
def feature_schema_hash(schema: str) -> str:
    """Return a SHA-256 hex digest identifying the schema's definition.

    The digest is taken over the schema payload serialized as JSON with
    sorted keys and encoded as UTF-8, so it changes whenever the payload
    contents change.

    Raises:
        ValueError: If ``schema`` is not a known schema name.
    """
    description = feature_schema_payload(schema)
    # sort_keys makes the serialization independent of dict insertion order.
    canonical = json.dumps(description, sort_keys=True)
    digest = hashlib.sha256(canonical.encode("utf-8"))
    return digest.hexdigest()