"""
Advanced Mathematical Features for Pattern Recognition.

Institutional-grade feature engineering beyond standard technical indicators.
These features capture deep market microstructure that traditional indicators miss.

Features:
- Fourier Transform (spectral analysis, dominant frequencies)
- Fractal Dimension (market roughness via Higuchi method)
- Hurst Exponent (trending vs mean-reverting via R/S analysis)
- Shannon Entropy (market randomness/uncertainty)
- Autocorrelation Decay (momentum persistence)
- Volume-Price Correlation (smart money detection)
- Trend Strength Index (custom composite)
- Market Microstructure Features (tick patterns, gap analysis)
"""

from __future__ import annotations

import logging
from typing import Any, Dict, List

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


class AdvancedFeatureEngine:
    """Compute advanced mathematical features from OHLCV data.

    The engine is stateless: every method derives its result purely from its
    arguments, so a single shared instance can be reused.
    """

    def compute_all(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Compute every advanced feature for *df* and return a flat dict.

        Args:
            df: OHLCV frame with ``Close``, ``High`` and ``Low`` columns.
                ``Volume`` is optional (a constant volume of 1 is assumed when
                it is missing); ``Open`` is only needed for the gap features.

        Returns:
            A flat ``{feature_name: value}`` dict.  Each feature group is
            computed best-effort: a group that raises is logged at DEBUG level
            and simply left out of the result.  When fewer than 30 bars are
            supplied the dict ``{"error": ..., "features": {}}`` is returned
            instead.
        """
        if df.empty or len(df) < 30:
            return {"error": "Insufficient data (need 30+ bars)", "features": {}}

        close = df["Close"].values.astype(float)
        high = df["High"].values.astype(float)
        low = df["Low"].values.astype(float)
        volume = (
            df["Volume"].values.astype(float)
            if "Volume" in df.columns
            else np.ones(len(close))
        )
        # Shared by the entropy and the distribution-moment features below.
        log_returns = np.diff(np.log(close + 1e-10))

        features: Dict[str, Any] = {}

        # 1. Fourier Transform Analysis
        try:
            features.update(self._fourier_analysis(close))
        except Exception as e:
            logger.debug("Fourier analysis failed: %s", e)

        # 2. Fractal Dimension (Higuchi)
        try:
            features["fractal_dimension"] = self._higuchi_fractal_dimension(close)
        except Exception as e:
            logger.debug("Fractal dimension failed: %s", e)

        # 3. Hurst Exponent
        try:
            hurst = self._hurst_exponent(close)
            features["hurst_exponent"] = hurst
            if hurst > 0.55:
                features["hurst_regime"] = "trending"
            elif hurst < 0.45:
                features["hurst_regime"] = "mean_reverting"
            else:
                features["hurst_regime"] = "random_walk"
        except Exception as e:
            logger.debug("Hurst exponent failed: %s", e)

        # 4. Shannon Entropy
        try:
            # "entropy_returns" is the entropy of the return distribution, so
            # it must be fed log returns rather than raw price levels.
            features["entropy_returns"] = self._shannon_entropy(log_returns, bins=20)
            features["entropy_volume"] = self._shannon_entropy(volume, bins=20)
        except Exception as e:
            logger.debug("Entropy failed: %s", e)

        # 5. Autocorrelation Decay
        try:
            features.update(self._autocorrelation_profile(close))
        except Exception as e:
            logger.debug("Autocorrelation failed: %s", e)

        # 6. Volume-Price Correlation
        try:
            features.update(self._volume_price_analysis(close, volume))
        except Exception as e:
            logger.debug("Volume-price analysis failed: %s", e)

        # 7. Trend Strength Index
        try:
            features["trend_strength"] = self._trend_strength_index(close, high, low)
        except Exception as e:
            logger.debug("Trend strength failed: %s", e)

        # 8. Gap Analysis
        try:
            features.update(self._gap_analysis(df))
        except Exception as e:
            logger.debug("Gap analysis failed: %s", e)

        # 9. Price Efficiency Ratio
        try:
            features["price_efficiency"] = self._price_efficiency_ratio(close)
        except Exception as e:
            logger.debug("Price efficiency failed: %s", e)

        # 10. Kurtosis & Skewness of returns
        try:
            if len(log_returns) >= 10:
                return_series = pd.Series(log_returns)
                features["return_skewness"] = float(return_series.skew())
                features["return_kurtosis"] = float(return_series.kurtosis())
        except Exception as e:
            logger.debug("Moments failed: %s", e)

        return features

    def compute_feature_series(self, df: pd.DataFrame, window: int = 20) -> pd.DataFrame:
        """Compute rolling advanced features as a DataFrame for ML training.

        For every bar ``i >= window`` the features are evaluated on the
        ``window + 1`` bars ending at ``i``; earlier rows are NaN.

        Note:
            ``_hurst_exponent`` needs at least 24 returns to produce three
            segment sizes, so with ``window < 24`` (including the default of
            20) the ``hurst_exponent`` column is always its 0.5 fallback.

        Args:
            df: OHLCV frame with a ``Close`` column.  ``High``/``Low`` are
                needed for ``trend_strength`` (left NaN when unavailable).
            window: Look-back length in bars.

        Returns:
            A copy of *df* with the columns ``hurst_exponent``,
            ``fractal_dimension``, ``entropy``, ``price_efficiency``,
            ``trend_strength``, ``return_skew_20`` and ``return_kurtosis_20``
            appended (the last two keep their names whatever *window* is).
        """
        result = df.copy()
        close = df["Close"].values.astype(float)
        n = len(df)

        # Hoisted out of the loop; a missing or non-numeric High/Low column
        # only disables the trend feature instead of failing on every bar.
        try:
            high = df["High"].values.astype(float)
            low = df["Low"].values.astype(float)
        except (KeyError, TypeError, ValueError) as exc:
            logger.debug("Rolling trend strength disabled: %s", exc)
            high = low = None

        hurst_vals = np.full(n, np.nan)
        fractal_vals = np.full(n, np.nan)
        entropy_vals = np.full(n, np.nan)
        efficiency_vals = np.full(n, np.nan)
        trend_vals = np.full(n, np.nan)

        for i in range(window, n):
            lo, hi = i - window, i + 1
            segment = close[lo:hi]

            hurst_vals[i] = self._safe_feature(
                "hurst exponent", self._hurst_exponent, segment
            )
            fractal_vals[i] = self._safe_feature(
                "fractal dimension", self._higuchi_fractal_dimension, segment
            )
            entropy_vals[i] = self._safe_feature(
                "entropy", self._shannon_entropy, segment, bins=10
            )
            efficiency_vals[i] = self._safe_feature(
                "price efficiency", self._price_efficiency_ratio, segment
            )
            if high is not None:
                trend_vals[i] = self._safe_feature(
                    "trend strength",
                    self._trend_strength_index,
                    segment,
                    high[lo:hi],
                    low[lo:hi],
                )

        result["hurst_exponent"] = hurst_vals
        result["fractal_dimension"] = fractal_vals
        result["entropy"] = entropy_vals
        result["price_efficiency"] = efficiency_vals
        result["trend_strength"] = trend_vals

        # Return-based distribution features (rolling)
        returns = pd.Series(np.log(df["Close"] / df["Close"].shift(1)))
        result["return_skew_20"] = returns.rolling(window).skew()
        result["return_kurtosis_20"] = returns.rolling(window).apply(
            lambda x: float(pd.Series(x).kurtosis()), raw=False
        )

        return result

    @staticmethod
    def _safe_feature(name: str, func: Any, *args: Any, **kwargs: Any) -> float:
        """Call ``func(*args, **kwargs)``; log and return NaN if it raises."""
        try:
            return func(*args, **kwargs)
        except Exception as exc:  # best-effort: one bad window must not abort the series
            logger.debug("Rolling %s failed: %s", name, exc)
            return float("nan")

    # ── Fourier Analysis ─────────────────────────────────────────────────

    def _fourier_analysis(self, prices: np.ndarray, top_k: int = 5) -> Dict[str, Any]:
        """FFT spectral analysis of the (log, linearly detrended) price series.

        Args:
            prices: Price array.
            top_k: Number of strongest frequency bins to inspect.

        Returns:
            ``fft_dominant_periods`` (up to three ``{"period_bars",
            "energy_pct"}`` dicts, strongest first), ``fft_spectral_ratio``
            (energy of the lowest quarter of the spectrum over the rest) and
            ``fft_total_energy``.  Empty dict if there is no spectrum.
        """
        log_prices = np.log(prices + 1e-10)
        # Remove the straight line joining first and last point so the
        # spectrum is not dominated by the overall drift.
        detrended = log_prices - np.linspace(
            log_prices[0], log_prices[-1], len(log_prices)
        )

        magnitudes = np.abs(np.fft.rfft(detrended))
        freqs = np.fft.rfftfreq(len(detrended))

        # Skip DC component (index 0)
        if len(magnitudes) > 1:
            magnitudes = magnitudes[1:]
            freqs = freqs[1:]

        if len(magnitudes) == 0:
            return {}

        power = magnitudes ** 2
        total_energy = float(np.sum(power))

        # Top dominant frequencies, strongest first
        top_indices = np.argsort(magnitudes)[-top_k:][::-1]
        dominant_periods = []
        for idx in top_indices:
            if freqs[idx] > 0:
                energy_pct = power[idx] / total_energy * 100 if total_energy > 0 else 0
                dominant_periods.append({
                    "period_bars": round(float(1.0 / freqs[idx]), 1),
                    "energy_pct": round(float(energy_pct), 2),
                })

        # Spectral energy ratios.  A single split index keeps the two bands
        # disjoint even when there are fewer than four frequency bins.
        split = max(1, len(power) // 4)
        low_energy = float(np.sum(power[:split]))
        high_energy = float(np.sum(power[split:]))

        return {
            "fft_dominant_periods": dominant_periods[:3],
            "fft_spectral_ratio": round(low_energy / (high_energy + 1e-10), 4),
            "fft_total_energy": round(total_energy, 4),
        }

    # ── Fractal Dimension (Higuchi) ──────────────────────────────────────

    def _higuchi_fractal_dimension(self, x: np.ndarray, k_max: int = 10) -> float:
        """Higuchi fractal dimension — measures market roughness.

        For every delay ``k`` and start offset ``m`` the normalised curve
        length ``L_m(k) = sum|x[m+ik] - x[m+(i-1)k]| * (N-1) / (n_int*k) / k``
        is computed, averaged over ``m`` and regressed in log-log space
        against ``1/k``; the slope is the dimension (1 for a straight line,
        about 2 for white noise).

        Args:
            x: Input series.
            k_max: Largest delay; reduced automatically for short series.

        Returns:
            The estimated dimension rounded to 4 decimals, or 1.5 when fewer
            than two delays yield a positive curve length.
        """
        n = len(x)
        if n < k_max + 1:
            k_max = max(2, n // 2)

        lk = np.zeros(k_max)
        for k in range(1, k_max + 1):
            curve_lengths = []
            for m in range(k):  # zero-based start offset
                n_intervals = (n - 1 - m) // k
                if n_intervals < 1:
                    continue
                sub_series = x[m:m + n_intervals * k + 1:k]
                norm = (n - 1) / (n_intervals * k)
                # The trailing "/ k" is part of Higuchi's definition; without
                # it the fitted slope is about D - 1 instead of D.
                curve_lengths.append(
                    np.sum(np.abs(np.diff(sub_series))) * norm / k
                )
            if curve_lengths:
                # Average only over the offsets that actually contributed.
                lk[k - 1] = np.mean(curve_lengths)

        # Fit log-log regression
        valid = lk > 0
        if np.sum(valid) < 2:
            return 1.5  # default

        ks = np.arange(1, k_max + 1)[valid]
        slope = np.polyfit(np.log(1.0 / ks), np.log(lk[valid]), 1)[0]
        return round(float(slope), 4)

    # ── Hurst Exponent (R/S Analysis) ────────────────────────────────────

    def _hurst_exponent(self, prices: np.ndarray) -> float:
        """Rescaled range (R/S) Hurst exponent of the log returns.

        H > 0.5: trending, H < 0.5: mean-reverting, H ≈ 0.5: random walk.

        Args:
            prices: Price array.

        Returns:
            The exponent clipped to ``[0, 1]`` and rounded to 4 decimals, or
            0.5 when there are too few returns / segment sizes to fit a slope.
        """
        returns = np.diff(np.log(prices + 1e-10))
        n = len(returns)
        if n < 20:
            return 0.5

        max_k = min(n // 2, 100)
        divisions = list(range(10, max_k + 1, max(1, max_k // 20)))
        if len(divisions) < 3:
            return 0.5

        rs_values = []
        sizes = []
        for d in divisions:
            n_segments = n // d
            if n_segments < 1:
                continue
            # One row per non-overlapping segment of length d.
            chunks = returns[:n_segments * d].reshape(n_segments, d)
            cumdev = np.cumsum(chunks - chunks.mean(axis=1, keepdims=True), axis=1)
            ranges = cumdev.max(axis=1) - cumdev.min(axis=1)
            stds = chunks.std(axis=1, ddof=1)
            usable = stds > 0  # also drops segments containing NaN
            if usable.any():
                rs_values.append(float(np.mean(ranges[usable] / stds[usable])))
                sizes.append(d)

        if len(sizes) < 3:
            return 0.5

        log_sizes = np.log(np.array(sizes, dtype=float))
        log_rs = np.log(np.array(rs_values, dtype=float))
        slope = np.polyfit(log_sizes, log_rs, 1)[0]
        return round(float(np.clip(slope, 0, 1)), 4)

    # ── Shannon Entropy ──────────────────────────────────────────────────

    def _shannon_entropy(self, data: np.ndarray, bins: int = 20) -> float:
        """Shannon entropy (bits) of the histogram of *data*. Higher = more random.

        Args:
            data: Sample values.
            bins: Number of equal-width histogram bins.

        Returns:
            The entropy rounded to 4 decimals; 0.0 for fewer than 5 samples.
        """
        if len(data) < 5:
            return 0.0
        counts, _ = np.histogram(data, bins=bins)
        counts = counts[counts > 0]
        if len(counts) == 0:
            return 0.0
        probs = counts / counts.sum()
        return round(float(-np.sum(probs * np.log2(probs + 1e-12))), 4)

    # ── Autocorrelation Profile ──────────────────────────────────────────

    def _autocorrelation_profile(self, prices: np.ndarray) -> Dict[str, float]:
        """Compute log-return autocorrelation at lags 1, 3, 5, 10 and 20.

        Returns:
            ``acf_lag_<n>`` for every lag shorter than the return series (0.0
            when the correlation is undefined) and ``acf_decay_rate``, the
            slope of a line fitted through the lag-1/5/10/20 values taken at
            positions 0..3 (missing lags count as 0).  Empty dict for fewer
            than 20 returns.
        """
        returns = np.diff(np.log(prices + 1e-10))
        if len(returns) < 20:
            return {}

        result: Dict[str, float] = {}
        # Constant series make corrcoef divide by zero; that case is mapped
        # to 0.0 below, so the warning is just noise.
        with np.errstate(divide="ignore", invalid="ignore"):
            for lag in (1, 3, 5, 10, 20):
                if lag < len(returns):
                    acf = np.corrcoef(returns[lag:], returns[:-lag])[0, 1]
                    result[f"acf_lag_{lag}"] = (
                        round(float(acf), 4) if np.isfinite(acf) else 0.0
                    )

        # Decay rate: how fast autocorrelation drops
        acf_values = [result.get(f"acf_lag_{lag}", 0) for lag in (1, 5, 10, 20)]
        result["acf_decay_rate"] = round(
            float(np.polyfit(range(len(acf_values)), acf_values, 1)[0]), 6
        )
        return result

    # ── Volume-Price Analysis ────────────────────────────────────────────

    def _volume_price_analysis(
        self, prices: np.ndarray, volume: np.ndarray
    ) -> Dict[str, float]:
        """Analyze volume-price relationship for smart money detection.

        Args:
            prices: Price array.
            volume: Volume array aligned bar-for-bar with *prices*.

        Returns:
            ``volume_return_corr``, ``volume_abs_return_corr`` (0.0 when
            undefined) and ``volume_asymmetry`` (relative difference between
            mean volume on up and down bars).  Empty dict for fewer than 10
            returns.
        """
        returns = np.diff(np.log(prices + 1e-10))
        vol = volume[1:]  # align with returns

        if len(returns) < 10:
            return {}

        # Volume-return correlation (undefined for constant inputs -> 0.0)
        with np.errstate(divide="ignore", invalid="ignore"):
            corr = np.corrcoef(returns, vol)[0, 1]
            abs_corr = np.corrcoef(np.abs(returns), vol)[0, 1]

        # Volume on up vs down days
        up_mask = returns > 0
        down_mask = returns < 0
        avg_up_vol = np.mean(vol[up_mask]) if up_mask.any() else 0.0
        avg_down_vol = np.mean(vol[down_mask]) if down_mask.any() else 0.0
        vol_asymmetry = (avg_up_vol - avg_down_vol) / (
            avg_up_vol + avg_down_vol + 1e-10
        )

        return {
            "volume_return_corr": round(float(corr), 4) if np.isfinite(corr) else 0.0,
            "volume_abs_return_corr": (
                round(float(abs_corr), 4) if np.isfinite(abs_corr) else 0.0
            ),
            "volume_asymmetry": round(float(vol_asymmetry), 4),
        }

    # ── Trend Strength Index ─────────────────────────────────────────────

    def _trend_strength_index(
        self, close: np.ndarray, high: np.ndarray, low: np.ndarray
    ) -> float:
        """Custom composite trend strength (0 = no trend, 1 = strong trend).

        Averages an ADX-like directional index over the last (up to) 14 bars
        with the Kaufman efficiency ratio of *close*.

        Returns:
            The composite rounded to 4 decimals; 0.5 for fewer than 14 bars
            and 0.0 when there is no directional movement at all.
        """
        if len(close) < 14:
            return 0.5

        # Directional movement
        dm_plus = np.maximum(np.diff(high), 0)
        dm_minus = np.maximum(-np.diff(low), 0)

        # Nullify weaker direction
        up_dominates = dm_plus > dm_minus
        dm_plus[~up_dominates] = 0
        dm_minus[up_dominates] = 0

        # Smoothed (14-period average)
        period = min(14, len(dm_plus))
        avg_dm_plus = np.mean(dm_plus[-period:])
        avg_dm_minus = np.mean(dm_minus[-period:])

        total_dm = avg_dm_plus + avg_dm_minus
        if total_dm == 0:
            return 0.0

        dx = abs(avg_dm_plus - avg_dm_minus) / total_dm

        # Combine with efficiency
        efficiency = self._price_efficiency_ratio(close)
        return round(float((dx + efficiency) / 2), 4)

    # ── Price Efficiency Ratio ───────────────────────────────────────────

    def _price_efficiency_ratio(self, prices: np.ndarray) -> float:
        """Kaufman Efficiency Ratio: net movement / total path length.

        1.0 = perfect trend, 0.0 = pure noise.  Returns 0.5 for fewer than
        5 prices.
        """
        if len(prices) < 5:
            return 0.5
        net_change = abs(prices[-1] - prices[0])
        total_path = np.sum(np.abs(np.diff(prices)))
        return round(float(net_change / (total_path + 1e-10)), 4)

    # ── Gap Analysis ─────────────────────────────────────────────────────

    def _gap_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Analyze price gaps for institutional activity detection.

        A gap is ``(open - previous close) / previous close``; gaps beyond
        ±0.5% are counted.

        Returns:
            ``gap_up_count_20``, ``gap_down_count_20``, ``avg_gap_size`` and
            ``max_gap_pct`` (the last two in percent) over the latest 20 gaps.
            All four are 0 when fewer than 20 gaps exist; an empty dict is
            returned for fewer than 10 bars.
        """
        if len(df) < 10:
            return {}

        opens = df["Open"].values
        prev_closes = df["Close"].values[:-1]
        gaps = (opens[1:] - prev_closes) / (prev_closes + 1e-10)

        if len(gaps) < 20:
            return {
                "gap_up_count_20": 0,
                "gap_down_count_20": 0,
                "avg_gap_size": 0,
                "max_gap_pct": 0,
            }

        recent = gaps[-20:]
        abs_recent = np.abs(recent)
        return {
            "gap_up_count_20": int(np.sum(recent > 0.005)),
            "gap_down_count_20": int(np.sum(recent < -0.005)),
            "avg_gap_size": round(float(np.mean(abs_recent) * 100), 4),
            "max_gap_pct": round(float(np.max(abs_recent) * 100), 4),
        }


# Module singleton
advanced_feature_engine = AdvancedFeatureEngine()