Spaces:
Running
Running
| """ | |
| Advanced Mathematical Features for Pattern Recognition. | |
| Institutional-grade feature engineering beyond standard technical indicators. | |
| These features capture deep market microstructure that traditional indicators miss. | |
| Features: | |
| - Fourier Transform (spectral analysis, dominant frequencies) | |
| - Fractal Dimension (market roughness via Higuchi method) | |
| - Hurst Exponent (trending vs mean-reverting via R/S analysis) | |
| - Shannon Entropy (market randomness/uncertainty) | |
| - Autocorrelation Decay (momentum persistence) | |
| - Volume-Price Correlation (smart money detection) | |
| - Trend Strength Index (custom composite) | |
| - Market Microstructure Features (tick patterns, gap analysis) | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from typing import Any, Dict, List | |
| import numpy as np | |
| import pandas as pd | |
logger = logging.getLogger(__name__)


class AdvancedFeatureEngine:
    """Compute advanced mathematical features from OHLCV data.

    The engine is stateless. ``compute_all`` produces a single snapshot of
    features for a whole price history, while ``compute_feature_series``
    produces rolling per-bar columns. Every ``_``-prefixed method implements
    one feature family on plain NumPy arrays.
    """

    # -- Public API ---------------------------------------------------------

    def compute_all(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Compute every advanced feature and return them as one flat dict.

        Args:
            df: Frame with ``Close``, ``High`` and ``Low`` columns. ``Volume``
                is optional (a constant volume of 1 is substituted) and
                ``Open`` is only needed for the gap features.

        Returns:
            A flat ``{feature_name: value}`` dict. When ``df`` is empty or has
            fewer than 30 rows, the dict
            ``{"error": "Insufficient data (need 30+ bars)", "features": {}}``
            is returned instead.

        Each feature family is computed best-effort: a failure is logged at
        DEBUG level and the corresponding keys are simply absent.
        """
        if df.empty or len(df) < 30:
            return {"error": "Insufficient data (need 30+ bars)", "features": {}}

        close = df["Close"].values.astype(float)
        high = df["High"].values.astype(float)
        low = df["Low"].values.astype(float)
        volume = (
            df["Volume"].values.astype(float)
            if "Volume" in df.columns
            else np.ones(len(close))
        )
        # Shared by the return-entropy and return-moment features.
        log_returns = np.diff(np.log(close + 1e-10))

        features: Dict[str, Any] = {}

        def run(label: str, producer: Any) -> None:
            # One failing feature family must not sink the others.
            try:
                features.update(producer())
            except Exception as exc:
                logger.debug("%s failed: %s", label, exc)

        def hurst_features() -> Dict[str, Any]:
            hurst = self._hurst_exponent(close)
            if hurst > 0.55:
                regime = "trending"
            elif hurst < 0.45:
                regime = "mean_reverting"
            else:
                regime = "random_walk"
            return {"hurst_exponent": hurst, "hurst_regime": regime}

        def return_moments() -> Dict[str, Any]:
            if len(log_returns) < 10:
                return {}
            series = pd.Series(log_returns)
            return {
                "return_skewness": float(series.skew()),
                "return_kurtosis": float(series.kurtosis()),
            }

        # 1. Fourier Transform Analysis
        run("Fourier analysis", lambda: self._fourier_analysis(close))
        # 2. Fractal Dimension (Higuchi)
        run(
            "Fractal dimension",
            lambda: {"fractal_dimension": self._higuchi_fractal_dimension(close)},
        )
        # 3. Hurst Exponent
        run("Hurst exponent", hurst_features)
        # 4. Shannon Entropy (of log returns, as the key name states, and of volume)
        run(
            "Entropy",
            lambda: {"entropy_returns": self._shannon_entropy(log_returns, bins=20)},
        )
        run(
            "Entropy",
            lambda: {"entropy_volume": self._shannon_entropy(volume, bins=20)},
        )
        # 5. Autocorrelation Decay
        run("Autocorrelation", lambda: self._autocorrelation_profile(close))
        # 6. Volume-Price Correlation
        run("Volume-price analysis", lambda: self._volume_price_analysis(close, volume))
        # 7. Trend Strength Index
        run(
            "Trend strength",
            lambda: {"trend_strength": self._trend_strength_index(close, high, low)},
        )
        # 8. Gap Analysis
        run("Gap analysis", lambda: self._gap_analysis(df))
        # 9. Price Efficiency Ratio
        run(
            "Price efficiency",
            lambda: {"price_efficiency": self._price_efficiency_ratio(close)},
        )
        # 10. Kurtosis & Skewness of returns
        run("Moments", return_moments)

        return features

    def compute_feature_series(self, df: pd.DataFrame, window: int = 20) -> pd.DataFrame:
        """Compute rolling advanced features, one row per bar, for ML training.

        Args:
            df: Frame with a ``Close`` column. ``High`` and ``Low`` are needed
                for the ``trend_strength`` column, which stays NaN without them.
            window: Look-back length. Each value at row ``i`` is computed from
                the ``window + 1`` closes ending at ``i`` (current bar included).

        Returns:
            A copy of ``df`` with the extra columns ``hurst_exponent``,
            ``fractal_dimension``, ``entropy``, ``price_efficiency``,
            ``trend_strength``, ``return_skew_20`` and ``return_kurtosis_20``.
            The first ``window`` rows of the windowed columns are NaN.

        Notes:
            * The two return-moment columns keep their ``_20`` suffix for any
              ``window`` so existing consumers keep working.
            * ``_hurst_exponent`` needs at least 24 returns to fit a slope, so
              for ``window`` below 24 the Hurst column is the neutral 0.5.
            * ``entropy`` is the entropy of the windowed close prices.
        """
        result = df.copy()
        close = df["Close"].values.astype(float)
        n = len(df)

        # Extract High/Low once instead of on every iteration.
        try:
            high = df["High"].values.astype(float)
            low = df["Low"].values.astype(float)
        except (KeyError, TypeError, ValueError) as exc:
            logger.debug("High/Low unavailable, trend_strength left as NaN: %s", exc)
            high = low = None

        hurst_vals = np.full(n, np.nan)
        fractal_vals = np.full(n, np.nan)
        entropy_vals = np.full(n, np.nan)
        efficiency_vals = np.full(n, np.nan)
        trend_vals = np.full(n, np.nan)

        guarded = self._nan_on_error
        for i in range(window, n):
            start, stop = i - window, i + 1
            segment = close[start:stop]
            hurst_vals[i] = guarded("Rolling Hurst exponent", self._hurst_exponent, segment)
            fractal_vals[i] = guarded(
                "Rolling fractal dimension", self._higuchi_fractal_dimension, segment
            )
            entropy_vals[i] = guarded(
                "Rolling entropy", self._shannon_entropy, segment, bins=10
            )
            efficiency_vals[i] = guarded(
                "Rolling price efficiency", self._price_efficiency_ratio, segment
            )
            if high is not None:
                trend_vals[i] = guarded(
                    "Rolling trend strength",
                    self._trend_strength_index,
                    segment,
                    high[start:stop],
                    low[start:stop],
                )

        result["hurst_exponent"] = hurst_vals
        result["fractal_dimension"] = fractal_vals
        result["entropy"] = entropy_vals
        result["price_efficiency"] = efficiency_vals
        result["trend_strength"] = trend_vals

        # Return-based distribution features (rolling)
        returns = pd.Series(np.log(df["Close"] / df["Close"].shift(1)))
        rolling_returns = returns.rolling(window)
        result["return_skew_20"] = rolling_returns.skew()
        # Built-in rolling kurtosis (Fisher, bias-corrected) replaces a slow
        # per-window Python callback computing the same statistic.
        result["return_kurtosis_20"] = rolling_returns.kurt()
        return result

    # -- Internal helpers ---------------------------------------------------

    @staticmethod
    def _nan_on_error(label: str, func: Any, *args: Any, **kwargs: Any) -> float:
        """Call ``func`` and return its result, or NaN (with a DEBUG log) on error."""
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            logger.debug("%s failed: %s", label, exc)
            return float("nan")

    @staticmethod
    def _safe_corr(a: np.ndarray, b: np.ndarray) -> float:
        """Pearson correlation of ``a`` and ``b``; 0.0 when it is undefined.

        Constant or too-short inputs are short-circuited so ``np.corrcoef``
        never emits divide-by-zero warnings for them.
        """
        a = np.asarray(a, dtype=float)
        b = np.asarray(b, dtype=float)
        if len(a) < 2 or np.std(a) == 0 or np.std(b) == 0:
            return 0.0
        with np.errstate(invalid="ignore", divide="ignore"):
            corr = np.corrcoef(a, b)[0, 1]
        return float(corr) if np.isfinite(corr) else 0.0

    # -- Fourier Analysis ---------------------------------------------------

    def _fourier_analysis(self, prices: np.ndarray, top_k: int = 5) -> Dict[str, Any]:
        """FFT spectral analysis of a price series.

        The log prices are detrended by subtracting the straight line joining
        the first and last point, then transformed with a real FFT. The DC bin
        is discarded.

        Args:
            prices: Price series.
            top_k: Number of strongest bins to inspect. At most three periods
                are reported regardless of this value.

        Returns:
            ``fft_dominant_periods`` (list of ``{"period_bars", "energy_pct"}``
            dicts, strongest first), ``fft_spectral_ratio`` (energy in the
            lowest quarter of bins divided by the energy in the rest) and
            ``fft_total_energy``. An empty dict when fewer than two prices are
            given.
        """
        prices = np.asarray(prices, dtype=float)
        if len(prices) < 2:
            return {}

        log_prices = np.log(prices + 1e-10)
        trend_line = np.linspace(log_prices[0], log_prices[-1], len(log_prices))
        detrended = log_prices - trend_line

        # Skip DC component (index 0)
        magnitudes = np.abs(np.fft.rfft(detrended))[1:]
        freqs = np.fft.rfftfreq(len(detrended))[1:]
        if len(magnitudes) == 0:
            return {}

        power = magnitudes ** 2
        total_energy = np.sum(power)

        # Strongest bins first. max(top_k, 0) avoids the [-0:] slice pitfall
        # that would select every bin when top_k is 0.
        strongest = np.argsort(magnitudes)[::-1][:max(top_k, 0)]
        dominant_periods: List[Dict[str, float]] = []
        for idx in strongest:
            if freqs[idx] > 0:
                energy_pct = power[idx] / total_energy * 100 if total_energy > 0 else 0
                dominant_periods.append({
                    "period_bars": round(float(1.0 / freqs[idx]), 1),
                    "energy_pct": round(float(energy_pct), 2),
                })

        # Spectral energy ratios: one split index keeps the bands disjoint
        # even when there are fewer than four bins.
        split = max(1, len(magnitudes) // 4)
        low_energy = np.sum(power[:split])
        high_energy = np.sum(power[split:])

        return {
            "fft_dominant_periods": dominant_periods[:3],
            "fft_spectral_ratio": round(float(low_energy / (high_energy + 1e-10)), 4),
            "fft_total_energy": round(float(total_energy), 4),
        }

    # -- Fractal Dimension (Higuchi) ----------------------------------------

    def _higuchi_fractal_dimension(self, x: np.ndarray, k_max: int = 10) -> float:
        """Higuchi fractal dimension - measures market roughness.

        For every delay ``k`` and offset ``m`` the normalised curve length is

            L_m(k) = (sum_i |x[m + i*k] - x[m + (i-1)*k]|) * (N - 1) / (J * k) / k

        with ``J = floor((N - m) / k)`` increments. ``L(k)`` is the mean over
        offsets and the dimension is the slope of ``log L(k)`` against
        ``log(1 / k)``. A straight line yields 1.0.

        Args:
            x: Series to analyse.
            k_max: Largest delay; reduced to ``max(2, len(x) // 2)`` when the
                series has fewer than ``k_max + 1`` points.

        Returns:
            The fitted slope rounded to 4 decimals, or 1.5 when fewer than two
            delays produce a positive curve length (e.g. a constant series).
        """
        x = np.asarray(x, dtype=float)
        n = len(x)
        if n < k_max + 1:
            k_max = max(2, n // 2)

        delays: List[int] = []
        mean_lengths: List[float] = []
        for k in range(1, k_max + 1):
            offset_lengths = []
            for m in range(1, k + 1):
                n_steps = (n - m) // k
                if n_steps < 1:
                    continue
                # n_steps increments need n_steps + 1 samples.
                idx = (m - 1) + np.arange(n_steps + 1) * k
                curve = np.sum(np.abs(np.diff(x[idx])))
                offset_lengths.append(curve * (n - 1) / (n_steps * k) / k)
            if not offset_lengths:
                continue
            mean_len = float(np.mean(offset_lengths))
            if mean_len > 0:  # log() below needs strictly positive lengths
                delays.append(k)
                mean_lengths.append(mean_len)

        if len(delays) < 2:
            return 1.5  # default

        # Fit log-log regression
        log_inv_k = np.log(1.0 / np.asarray(delays, dtype=float))
        log_len = np.log(np.asarray(mean_lengths, dtype=float))
        slope = np.polyfit(log_inv_k, log_len, 1)[0]
        return round(float(slope), 4)

    # -- Hurst Exponent (R/S Analysis) --------------------------------------

    def _hurst_exponent(self, prices: np.ndarray) -> float:
        """Rescaled range (R/S) Hurst exponent of the log returns.

        H > 0.5: trending, H < 0.5: mean-reverting, H ~ 0.5: random walk.

        Chunk sizes run from 10 up to ``min(len(returns) // 2, 100)``. The
        exponent is the slope of ``log(mean R/S)`` against ``log(chunk size)``,
        clipped to [0, 1] and rounded to 4 decimals.

        Returns:
            The exponent, or the neutral 0.5 when there are fewer than 20
            returns or fewer than three usable chunk sizes.
        """
        returns = np.diff(np.log(np.asarray(prices, dtype=float) + 1e-10))
        n = len(returns)
        if n < 20:
            return 0.5

        max_k = min(n // 2, 100)
        step = max(1, max_k // 20)
        chunk_sizes = list(range(10, max_k + 1, step))
        if len(chunk_sizes) < 3:
            return 0.5

        sizes: List[int] = []
        rs_means: List[float] = []
        for size in chunk_sizes:
            ratios = []
            for start in range(0, (n // size) * size, size):
                chunk = returns[start:start + size]
                cum_dev = np.cumsum(chunk - np.mean(chunk))
                value_range = np.max(cum_dev) - np.min(cum_dev)
                spread = np.std(chunk, ddof=1)
                if spread > 0:  # also skips NaN chunks
                    ratios.append(value_range / spread)
            if ratios:
                rs_means.append(np.mean(ratios))
                sizes.append(size)

        if len(sizes) < 3:
            return 0.5

        log_sizes = np.log(np.array(sizes, dtype=float))
        log_rs = np.log(np.array(rs_means, dtype=float))
        slope = np.polyfit(log_sizes, log_rs, 1)[0]
        return round(float(np.clip(slope, 0, 1)), 4)

    # -- Shannon Entropy ----------------------------------------------------

    def _shannon_entropy(self, data: np.ndarray, bins: int = 20) -> float:
        """Shannon entropy (bits) of the histogram of ``data``. Higher = more random.

        Non-finite values are dropped first because ``np.histogram`` cannot
        derive a bin range from NaN or inf.

        Returns:
            The entropy rounded to 4 decimals, or 0.0 when fewer than five
            finite values remain.
        """
        values = np.asarray(data, dtype=float)
        values = values[np.isfinite(values)]
        if len(values) < 5:
            return 0.0

        counts, _ = np.histogram(values, bins=bins)
        counts = counts[counts > 0]
        if len(counts) == 0:
            return 0.0
        probs = counts / counts.sum()  # normalize
        return round(float(-np.sum(probs * np.log2(probs))), 4)

    # -- Autocorrelation Profile --------------------------------------------

    def _autocorrelation_profile(self, prices: np.ndarray) -> Dict[str, float]:
        """Autocorrelation of log returns at lags 1, 3, 5, 10 and 20.

        Returns:
            ``acf_lag_<n>`` for every lag shorter than the return series
            (undefined correlations are reported as 0.0) plus
            ``acf_decay_rate``. Empty dict when there are fewer than 20 returns.

        ``acf_decay_rate`` is the slope of a line fitted through the lag-1, 5,
        10 and 20 values against their position 0..3 (not the lag itself); a
        missing lag counts as 0.
        """
        returns = np.diff(np.log(np.asarray(prices, dtype=float) + 1e-10))
        if len(returns) < 20:
            return {}

        result: Dict[str, float] = {}
        for lag in (1, 3, 5, 10, 20):
            if lag < len(returns):
                acf = self._safe_corr(returns[lag:], returns[:-lag])
                result[f"acf_lag_{lag}"] = round(acf, 4)

        # Decay rate: how fast autocorrelation drops
        decay_points = [result.get(f"acf_lag_{lag}", 0) for lag in (1, 5, 10, 20)]
        slope = np.polyfit(np.arange(len(decay_points)), decay_points, 1)[0]
        result["acf_decay_rate"] = round(float(slope), 6)
        return result

    # -- Volume-Price Analysis ----------------------------------------------

    def _volume_price_analysis(
        self, prices: np.ndarray, volume: np.ndarray
    ) -> Dict[str, float]:
        """Analyze the volume-price relationship for smart money detection.

        Returns:
            ``volume_return_corr`` (correlation of volume with log returns),
            ``volume_abs_return_corr`` (with absolute log returns) and
            ``volume_asymmetry`` ((up-bar volume - down-bar volume) / their sum,
            using mean volumes). Undefined correlations are reported as 0.0.
            Empty dict when there are fewer than 10 returns.
        """
        returns = np.diff(np.log(np.asarray(prices, dtype=float) + 1e-10))
        vol = np.asarray(volume, dtype=float)[1:]  # align with returns
        if len(returns) < 10:
            return {}

        # Volume-return correlation
        corr = self._safe_corr(returns, vol)
        abs_corr = self._safe_corr(np.abs(returns), vol)

        # Volume on up vs down days
        up_mask = returns > 0
        down_mask = returns < 0
        avg_up_vol = np.mean(vol[up_mask]) if up_mask.any() else 0
        avg_down_vol = np.mean(vol[down_mask]) if down_mask.any() else 0
        vol_asymmetry = (avg_up_vol - avg_down_vol) / (avg_up_vol + avg_down_vol + 1e-10)

        return {
            "volume_return_corr": round(corr, 4),
            "volume_abs_return_corr": round(abs_corr, 4),
            "volume_asymmetry": round(float(vol_asymmetry), 4),
        }

    # -- Trend Strength Index -----------------------------------------------

    def _trend_strength_index(
        self, close: np.ndarray, high: np.ndarray, low: np.ndarray
    ) -> float:
        """Custom composite trend strength (0 = no trend, 1 = strong trend).

        Averages an ADX-like directional index, built from the last (up to) 14
        bar-to-bar moves of ``high`` and ``low``, with the price efficiency
        ratio of ``close``.

        Returns:
            The composite rounded to 4 decimals; 0.5 when fewer than 14 closes
            are given and 0.0 when there was no directional movement at all.
        """
        if len(close) < 14:
            return 0.5

        # Directional movement
        up_move = np.maximum(np.diff(np.asarray(high, dtype=float)), 0)
        down_move = np.maximum(-np.diff(np.asarray(low, dtype=float)), 0)

        # Nullify weaker direction (ties count as a down move)
        up_wins = up_move > down_move
        dm_plus = np.where(up_wins, up_move, 0.0)
        dm_minus = np.where(up_wins, 0.0, down_move)

        # Smoothed (14-period average)
        period = min(14, len(dm_plus))
        avg_dm_plus = np.mean(dm_plus[-period:])
        avg_dm_minus = np.mean(dm_minus[-period:])

        total_dm = avg_dm_plus + avg_dm_minus
        if total_dm == 0:
            return 0.0
        dx = abs(avg_dm_plus - avg_dm_minus) / total_dm

        # Combine with efficiency
        efficiency = self._price_efficiency_ratio(close)
        return round(float((dx + efficiency) / 2), 4)

    # -- Price Efficiency Ratio ---------------------------------------------

    def _price_efficiency_ratio(self, prices: np.ndarray) -> float:
        """Kaufman Efficiency Ratio: net movement / total path length.

        1.0 = perfect trend, 0.0 = pure noise. Returns 0.5 when fewer than
        five prices are given; the result is rounded to 4 decimals.
        """
        if len(prices) < 5:
            return 0.5
        net_change = abs(prices[-1] - prices[0])
        total_path = np.sum(np.abs(np.diff(prices)))
        # The epsilon keeps a perfectly flat series from dividing by zero.
        return round(float(net_change / (total_path + 1e-10)), 4)

    # -- Gap Analysis -------------------------------------------------------

    def _gap_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Analyze price gaps for institutional activity detection.

        A gap is ``(open - previous close) / previous close``; gaps beyond
        +/-0.5% are counted. All statistics cover the most recent 20 gaps.

        Returns:
            ``gap_up_count_20``, ``gap_down_count_20``, ``avg_gap_size`` and
            ``max_gap_pct`` (the last two in percent). All values are zero when
            fewer than 20 gaps are available; the dict is empty when ``df``
            has fewer than 10 rows.
        """
        if len(df) < 10:
            return {}

        opens = df["Open"].values.astype(float)
        prev_closes = df["Close"].shift(1).values.astype(float)
        # Row 0 has no previous close, so gaps start at row 1.
        gaps = (opens[1:] - prev_closes[1:]) / (prev_closes[1:] + 1e-10)

        if len(gaps) < 20:
            return {
                "gap_up_count_20": 0,
                "gap_down_count_20": 0,
                "avg_gap_size": 0,
                "max_gap_pct": 0,
            }

        recent = gaps[-20:]
        magnitude = np.abs(recent)
        return {
            "gap_up_count_20": int(np.sum(recent > 0.005)),
            "gap_down_count_20": int(np.sum(recent < -0.005)),
            "avg_gap_size": round(float(np.mean(magnitude) * 100), 4),
            "max_gap_pct": round(float(np.max(magnitude) * 100), 4),
        }


# Module singleton
advanced_feature_engine = AdvancedFeatureEngine()