# quanthedge/backend/app/services/ml/pattern_recognition/advanced_features.py
# Repository page residue: "jashdoshi77's picture"; commit "whole lotta changes" (e6021a3).
"""
Advanced Mathematical Features for Pattern Recognition.
Institutional-grade feature engineering beyond standard technical indicators.
These features capture deep market microstructure that traditional indicators miss.
Features:
- Fourier Transform (spectral analysis, dominant frequencies)
- Fractal Dimension (market roughness via Higuchi method)
- Hurst Exponent (trending vs mean-reverting via R/S analysis)
- Shannon Entropy (market randomness/uncertainty)
- Autocorrelation Decay (momentum persistence)
- Volume-Price Correlation (smart money detection)
- Trend Strength Index (custom composite)
- Market Microstructure Features (tick patterns, gap analysis)
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List
import numpy as np
import pandas as pd
# Module-level logger named after this module; the feature engine reports
# failed feature computations through it at DEBUG level.
logger = logging.getLogger(__name__)
class AdvancedFeatureEngine:
    """Compute advanced mathematical features from OHLCV data.

    The engine holds no state: every method derives its result purely from
    the arrays / DataFrame passed in. Feature computation is best-effort: a
    failing feature is logged at DEBUG level and omitted (or left as NaN in
    the rolling variant) instead of aborting the whole computation.
    """

    def compute_all(self, df: pd.DataFrame) -> Dict[str, Any]:
        """
        Compute all advanced features and return them as a flat dict.

        Args:
            df: OHLCV frame with ``Close``, ``High`` and ``Low`` columns.
                ``Open`` is needed for the gap statistics and ``Volume`` is
                optional (a constant volume of 1 is assumed when missing).

        Returns:
            A flat ``{feature_name: value}`` dict. Features whose computation
            raised are left out. With fewer than 30 bars the dict
            ``{"error": ..., "features": {}}`` is returned instead.

        Raises:
            KeyError: if ``Close``, ``High`` or ``Low`` is missing.
        """
        if df.empty or len(df) < 30:
            return {"error": "Insufficient data (need 30+ bars)", "features": {}}

        close = df["Close"].values.astype(float)
        high = df["High"].values.astype(float)
        low = df["Low"].values.astype(float)
        volume = (
            df["Volume"].values.astype(float)
            if "Volume" in df.columns
            else np.ones(len(close))
        )
        # Shared by the entropy and moment features, which describe returns
        # rather than price levels.
        log_returns = np.diff(np.log(close + 1e-10))

        # (log label, thunk returning a dict of features). Order defines the
        # key order of the result; each step fails independently.
        steps = (
            # 1. Fourier Transform Analysis
            ("Fourier analysis", lambda: self._fourier_analysis(close)),
            # 2. Fractal Dimension (Higuchi)
            (
                "Fractal dimension",
                lambda: {"fractal_dimension": self._higuchi_fractal_dimension(close)},
            ),
            # 3. Hurst Exponent
            ("Hurst exponent", lambda: self._hurst_features(close)),
            # 4. Shannon Entropy (of the return distribution, not of prices)
            (
                "Entropy",
                lambda: {"entropy_returns": self._shannon_entropy(log_returns, bins=20)},
            ),
            (
                "Entropy",
                lambda: {"entropy_volume": self._shannon_entropy(volume, bins=20)},
            ),
            # 5. Autocorrelation Decay
            ("Autocorrelation", lambda: self._autocorrelation_profile(close)),
            # 6. Volume-Price Correlation
            ("Volume-price analysis", lambda: self._volume_price_analysis(close, volume)),
            # 7. Trend Strength Index
            (
                "Trend strength",
                lambda: {"trend_strength": self._trend_strength_index(close, high, low)},
            ),
            # 8. Gap Analysis
            ("Gap analysis", lambda: self._gap_analysis(df)),
            # 9. Price Efficiency Ratio
            (
                "Price efficiency",
                lambda: {"price_efficiency": self._price_efficiency_ratio(close)},
            ),
            # 10. Kurtosis & Skewness of returns
            ("Moments", lambda: self._return_moments(log_returns)),
        )

        features: Dict[str, Any] = {}
        for label, step in steps:
            try:
                features.update(step())
            except Exception as exc:  # best-effort: one bad feature must not sink the rest
                logger.debug("%s failed: %s", label, exc)
        return features

    def compute_feature_series(self, df: pd.DataFrame, window: int = 20) -> pd.DataFrame:
        """
        Compute rolling advanced features as a DataFrame for ML training.

        Each bar ``i >= window`` is described by the ``window + 1`` closes
        ending at ``i``; earlier bars stay NaN. Note that with the default
        ``window=20`` the Hurst estimator does not have enough returns for a
        regression and therefore yields its neutral fallback of 0.5.

        Args:
            df: OHLCV frame with a ``Close`` column. ``High``/``Low`` are
                needed for ``trend_strength`` (left NaN when unavailable).
            window: Look-back length in bars. The ``_20`` suffix of the
                return-moment columns is kept regardless of this value.

        Returns:
            A copy of ``df`` with the columns ``hurst_exponent``,
            ``fractal_dimension``, ``entropy``, ``price_efficiency``,
            ``trend_strength``, ``return_skew_20`` and ``return_kurtosis_20``
            added (one row per input bar).
        """
        result = df.copy()
        close = df["Close"].values.astype(float)
        n = len(df)

        # Extract High/Low once instead of on every bar; stay best-effort
        # when the columns are missing or not numeric.
        try:
            high = df["High"].values.astype(float)
            low = df["Low"].values.astype(float)
        except (KeyError, TypeError, ValueError) as exc:
            logger.debug("Trend strength unavailable: %s", exc)
            high = low = None

        hurst_vals = np.full(n, np.nan)
        fractal_vals = np.full(n, np.nan)
        entropy_vals = np.full(n, np.nan)
        efficiency_vals = np.full(n, np.nan)
        trend_vals = np.full(n, np.nan)

        for i in range(window, n):
            start, stop = i - window, i + 1
            segment = close[start:stop]
            hurst_vals[i] = self._nan_on_error(
                "Hurst exponent", self._hurst_exponent, segment
            )
            fractal_vals[i] = self._nan_on_error(
                "Fractal dimension", self._higuchi_fractal_dimension, segment
            )
            entropy_vals[i] = self._nan_on_error(
                "Entropy", self._shannon_entropy, segment, bins=10
            )
            efficiency_vals[i] = self._nan_on_error(
                "Price efficiency", self._price_efficiency_ratio, segment
            )
            if high is not None:
                trend_vals[i] = self._nan_on_error(
                    "Trend strength",
                    self._trend_strength_index,
                    segment,
                    high[start:stop],
                    low[start:stop],
                )

        result["hurst_exponent"] = hurst_vals
        result["fractal_dimension"] = fractal_vals
        result["entropy"] = entropy_vals
        result["price_efficiency"] = efficiency_vals
        result["trend_strength"] = trend_vals

        # Return-based distribution features (rolling)
        returns = pd.Series(np.log(df["Close"] / df["Close"].shift(1)))
        rolling_returns = returns.rolling(window)
        result["return_skew_20"] = rolling_returns.skew()
        # raw=True hands the lambda a plain ndarray, avoiding one Series
        # construction per window; the kurtosis value itself is unchanged.
        result["return_kurtosis_20"] = rolling_returns.apply(
            lambda x: float(pd.Series(x).kurtosis()), raw=True
        )
        return result

    # ── Internal helpers ─────────────────────────────────────────────────
    @staticmethod
    def _nan_on_error(label: str, func, *args, **kwargs) -> float:
        """Call ``func(*args, **kwargs)``; log and return NaN if it raises."""
        try:
            return func(*args, **kwargs)
        except Exception as exc:  # best-effort per-bar feature
            logger.debug("%s failed: %s", label, exc)
            return float("nan")

    def _hurst_features(self, close: np.ndarray) -> Dict[str, Any]:
        """Return the Hurst exponent of ``close`` plus its regime label."""
        hurst = self._hurst_exponent(close)
        if hurst > 0.55:
            regime = "trending"
        elif hurst < 0.45:
            regime = "mean_reverting"
        else:
            regime = "random_walk"
        return {"hurst_exponent": hurst, "hurst_regime": regime}

    @staticmethod
    def _return_moments(returns: np.ndarray) -> Dict[str, float]:
        """Return skewness and excess kurtosis of ``returns`` (needs 10+ values)."""
        if len(returns) < 10:
            return {}
        series = pd.Series(returns)
        return {
            "return_skewness": float(series.skew()),
            "return_kurtosis": float(series.kurtosis()),
        }

    # ── Fourier Analysis ─────────────────────────────────────────────────
    def _fourier_analysis(self, prices: np.ndarray, top_k: int = 5) -> Dict[str, Any]:
        """FFT spectral analysis of the log-price series.

        The log prices are detrended by subtracting the straight line joining
        the first and last value, then transformed with a real FFT. The DC
        bin is discarded.

        Args:
            prices: Price series (at least one element).
            top_k: Number of strongest bins considered as dominant
                candidates; at most three are reported.

        Returns:
            ``fft_dominant_periods`` (list of ``{"period_bars", "energy_pct"}``
            ordered by magnitude), ``fft_spectral_ratio`` (energy of the
            lowest quarter of bins over the rest) and ``fft_total_energy``.
            An empty dict if no non-DC bin exists.
        """
        log_prices = np.log(prices + 1e-10)
        trend = np.linspace(log_prices[0], log_prices[-1], len(log_prices))
        detrended = log_prices - trend

        # Skip DC component (index 0)
        magnitudes = np.abs(np.fft.rfft(detrended))[1:]
        freqs = np.fft.rfftfreq(len(detrended))[1:]
        if len(magnitudes) == 0:
            return {}

        energies = magnitudes ** 2
        total_energy = float(np.sum(energies))

        # Top dominant frequencies, strongest first
        strongest = np.argsort(magnitudes)[-top_k:][::-1]
        dominant_periods = []
        for idx in strongest:
            if freqs[idx] <= 0:
                continue
            share = float(energies[idx]) / total_energy * 100 if total_energy > 0 else 0
            dominant_periods.append({
                "period_bars": round(float(1.0 / freqs[idx]), 1),
                "energy_pct": round(share, 2),
            })

        # Spectral energy ratio. A single split index keeps the two bands
        # disjoint even when there are fewer than four bins.
        split = max(1, len(magnitudes) // 4)
        low_energy = float(np.sum(energies[:split]))
        high_energy = float(np.sum(energies[split:]))

        return {
            "fft_dominant_periods": dominant_periods[:3],
            "fft_spectral_ratio": round(low_energy / (high_energy + 1e-10), 4),
            "fft_total_energy": round(total_energy, 4),
        }

    # ── Fractal Dimension (Higuchi) ──────────────────────────────────────
    def _higuchi_fractal_dimension(self, x: np.ndarray, k_max: int = 10) -> float:
        """Higuchi fractal dimension: measures market roughness.

        For every delay ``k`` and offset ``m`` the normalised curve length

            L_m(k) = (sum_i |x[m + i*k] - x[m + (i-1)*k]|) * (N - 1) / (J * k) / k

        is computed, where ``J = (N - 1 - m) // k`` is the number of
        intervals. ``L(k)`` is the mean over the offsets and the dimension is
        the slope of ``log L(k)`` against ``log(1/k)``. A straight line
        therefore scores 1.0 and a random walk about 1.5.

        Args:
            x: Series to analyse.
            k_max: Largest delay. Reduced to ``max(2, len(x) // 2)`` when the
                series is shorter than ``k_max + 1``.

        Returns:
            The estimated dimension rounded to 4 decimals, or 1.5 when fewer
            than two delays produce a positive curve length (e.g. constant
            or very short input).
        """
        x = np.asarray(x, dtype=float)
        n = len(x)
        if n < k_max + 1:
            k_max = max(2, n // 2)

        mean_lengths = np.zeros(max(k_max, 0))
        for k in range(1, k_max + 1):
            curve_lengths = []
            for offset in range(k):
                n_intervals = (n - 1 - offset) // k
                if n_intervals < 1:
                    continue
                samples = x[offset + k * np.arange(n_intervals + 1)]
                path = np.sum(np.abs(np.diff(samples)))
                curve_lengths.append(path * (n - 1) / (n_intervals * k) / k)
            # Average only over offsets that actually produced a sub-series.
            if curve_lengths:
                mean_lengths[k - 1] = np.mean(curve_lengths)

        # Fit log-log regression
        valid = mean_lengths > 0
        if np.sum(valid) < 2:
            return 1.5  # default
        delays = np.arange(1, k_max + 1)[valid]
        slope = np.polyfit(np.log(1.0 / delays), np.log(mean_lengths[valid]), 1)[0]
        return round(float(slope), 4)

    # ── Hurst Exponent (R/S Analysis) ────────────────────────────────────
    def _hurst_exponent(self, prices: np.ndarray) -> float:
        """
        Rescaled range (R/S) Hurst exponent of the log returns.

        H > 0.5: trending, H < 0.5: mean-reverting, H ≈ 0.5: random walk.

        Chunk sizes run from 10 up to ``min(n // 2, 100)`` returns. The
        neutral value 0.5 is returned when there are fewer than 20 returns
        or fewer than three usable chunk sizes (which is the case for any
        series with fewer than 24 returns).

        Returns:
            The regression slope clipped to [0, 1], rounded to 4 decimals.
        """
        returns = np.diff(np.log(prices + 1e-10))
        n = len(returns)
        if n < 20:
            return 0.5

        max_k = min(n // 2, 100)
        chunk_sizes = list(range(10, max_k + 1, max(1, max_k // 20)))
        if len(chunk_sizes) < 3:
            return 0.5

        sizes = []
        rs_values = []
        for size in chunk_sizes:
            ratios = []
            # Non-overlapping chunks; a trailing partial chunk is ignored.
            for start in range(0, (n // size) * size, size):
                chunk = returns[start:start + size]
                cumdev = np.cumsum(chunk - np.mean(chunk))
                spread = np.max(cumdev) - np.min(cumdev)
                scale = np.std(chunk, ddof=1)
                # Skip degenerate chunks: they would yield log(0) or x/0.
                if scale > 0 and spread > 0:
                    ratios.append(spread / scale)
            if ratios:
                sizes.append(size)
                rs_values.append(np.mean(ratios))

        if len(sizes) < 3:
            return 0.5
        log_sizes = np.log(np.array(sizes, dtype=float))
        log_rs = np.log(np.array(rs_values, dtype=float))
        slope = np.polyfit(log_sizes, log_rs, 1)[0]
        return round(float(np.clip(slope, 0, 1)), 4)

    # ── Shannon Entropy ──────────────────────────────────────────────────
    def _shannon_entropy(self, data: np.ndarray, bins: int = 20) -> float:
        """Shannon entropy (bits) of the histogram of ``data``. Higher = more random.

        Args:
            data: Values to histogram (must be finite).
            bins: Number of equal-width bins.

        Returns:
            Entropy rounded to 4 decimals; 0.0 for fewer than 5 values.
        """
        if len(data) < 5:
            return 0.0
        counts, _ = np.histogram(data, bins=bins)
        occupied = counts[counts > 0]
        if len(occupied) == 0:
            return 0.0
        probs = occupied / occupied.sum()  # normalize
        return round(float(-np.sum(probs * np.log2(probs + 1e-12))), 4)

    # ── Autocorrelation Profile ──────────────────────────────────────────
    def _autocorrelation_profile(self, prices: np.ndarray) -> Dict[str, float]:
        """Compute autocorrelation of log returns at multiple lags.

        Returns:
            ``acf_lag_{1,3,5,10,20}`` for every lag shorter than the return
            series (non-finite correlations are reported as 0.0) and
            ``acf_decay_rate``, the slope of a line fitted through the lag
            1/5/10/20 values at positions 0..3 (a missing lag counts as 0).
            An empty dict when there are fewer than 20 returns.
        """
        returns = np.diff(np.log(prices + 1e-10))
        if len(returns) < 20:
            return {}

        profile: Dict[str, float] = {}
        for lag in (1, 3, 5, 10, 20):
            if lag >= len(returns):
                continue
            coef = np.corrcoef(returns[lag:], returns[:-lag])[0, 1]
            profile[f"acf_lag_{lag}"] = round(float(coef), 4) if np.isfinite(coef) else 0.0

        # Decay rate: how fast autocorrelation drops
        decay_points = [profile.get(f"acf_lag_{lag}", 0) for lag in (1, 5, 10, 20)]
        slope = np.polyfit(range(len(decay_points)), decay_points, 1)[0]
        profile["acf_decay_rate"] = round(float(slope), 6)
        return profile

    # ── Volume-Price Analysis ────────────────────────────────────────────
    def _volume_price_analysis(
        self, prices: np.ndarray, volume: np.ndarray
    ) -> Dict[str, float]:
        """Analyze volume-price relationship for smart money detection.

        Args:
            prices: Price series.
            volume: Volume series of the same length as ``prices``.

        Returns:
            ``volume_return_corr`` and ``volume_abs_return_corr`` (0 when the
            correlation is undefined) and ``volume_asymmetry`` in [-1, 1]
            (positive = more volume on up bars). Empty dict for fewer than
            10 returns.
        """
        returns = np.diff(np.log(prices + 1e-10))
        vol = volume[1:]  # align with returns
        if len(returns) < 10:
            return {}

        # Volume-return correlation
        signed_corr = np.corrcoef(returns, vol)[0, 1]
        abs_corr = np.corrcoef(np.abs(returns), vol)[0, 1]

        # Volume on up vs down days
        up_mask = returns > 0
        down_mask = returns < 0
        avg_up_vol = np.mean(vol[up_mask]) if up_mask.any() else 0.0
        avg_down_vol = np.mean(vol[down_mask]) if down_mask.any() else 0.0
        vol_asymmetry = (avg_up_vol - avg_down_vol) / (avg_up_vol + avg_down_vol + 1e-10)

        return {
            "volume_return_corr": round(float(signed_corr), 4) if np.isfinite(signed_corr) else 0.0,
            "volume_abs_return_corr": round(float(abs_corr), 4) if np.isfinite(abs_corr) else 0.0,
            "volume_asymmetry": round(float(vol_asymmetry), 4),
        }

    # ── Trend Strength Index ─────────────────────────────────────────────
    def _trend_strength_index(
        self, close: np.ndarray, high: np.ndarray, low: np.ndarray
    ) -> float:
        """
        Custom composite trend strength (0 = no trend, 1 = strong trend).

        Averages an ADX-like directional index, computed from the last 14
        directional-movement values, with the price efficiency ratio of
        ``close``.

        Returns:
            0.5 for fewer than 14 closes, 0.0 when there is no directional
            movement at all, otherwise the composite rounded to 4 decimals.
        """
        if len(close) < 14:
            return 0.5

        # Directional movement (np.maximum returns fresh arrays, so the
        # in-place masking below never touches the caller's data)
        dm_plus = np.maximum(np.diff(high), 0)
        dm_minus = np.maximum(-np.diff(low), 0)

        # Nullify weaker direction
        up_dominates = dm_plus > dm_minus
        dm_plus[~up_dominates] = 0
        dm_minus[up_dominates] = 0

        # Smoothed (14-period average)
        period = min(14, len(dm_plus))
        avg_dm_plus = np.mean(dm_plus[-period:])
        avg_dm_minus = np.mean(dm_minus[-period:])
        total_dm = avg_dm_plus + avg_dm_minus
        if total_dm == 0:
            return 0.0
        dx = abs(avg_dm_plus - avg_dm_minus) / total_dm

        # Combine with efficiency
        efficiency = self._price_efficiency_ratio(close)
        return round(float((dx + efficiency) / 2), 4)

    # ── Price Efficiency Ratio ───────────────────────────────────────────
    def _price_efficiency_ratio(self, prices: np.ndarray) -> float:
        """
        Kaufman Efficiency Ratio: net movement / total path length.

        1.0 = perfect trend, 0.0 = pure noise. Returns 0.5 for fewer than
        5 prices.
        """
        if len(prices) < 5:
            return 0.5
        net_change = abs(prices[-1] - prices[0])
        total_path = np.sum(np.abs(np.diff(prices)))
        return round(float(net_change / (total_path + 1e-10)), 4)

    # ── Gap Analysis ─────────────────────────────────────────────────────
    def _gap_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Analyze price gaps for institutional activity detection.

        A gap is the relative difference between a bar's ``Open`` and the
        previous bar's ``Close``; gaps beyond +/-0.5% are counted.

        Returns:
            Counts of up/down gaps, mean absolute gap and largest absolute
            gap (both in percent) over the last 20 gaps. All four values are
            0 when fewer than 20 gaps exist; an empty dict for fewer than 10
            bars.
        """
        if len(df) < 10:
            return {}

        opens = df["Open"].values
        prev_closes = df["Close"].shift(1).values
        # The first bar has no previous close, so it is dropped.
        gaps = (opens[1:] - prev_closes[1:]) / (prev_closes[1:] + 1e-10)

        if len(gaps) < 20:
            return {
                "gap_up_count_20": 0,
                "gap_down_count_20": 0,
                "avg_gap_size": 0,
                "max_gap_pct": 0,
            }

        recent = gaps[-20:]
        recent_abs = np.abs(recent)
        return {
            "gap_up_count_20": int(np.sum(recent > 0.005)),
            "gap_down_count_20": int(np.sum(recent < -0.005)),
            "avg_gap_size": round(float(np.mean(recent_abs) * 100), 4),
            "max_gap_pct": round(float(np.max(recent_abs) * 100), 4),
        }
# Module singleton: one shared AdvancedFeatureEngine instance, created when
# this module is imported.
advanced_feature_engine = AdvancedFeatureEngine()