"""Automated Alpha Factor Mining with Genetic Programming + LLM-Driven Discovery

Based on:
- Lopez de Prado: Genetic programming for alpha factor discovery
- QuantaAlpha (Han et al. 2026): LLM + MCTS evolutionary framework
- gplearn: Symbolic regression for finance

This replaces hand-coded RSI/MACD with DISCOVERED factors.
"""

import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Callable, Tuple
import warnings

warnings.filterwarnings('ignore')

try:
    from gplearn.genetic import SymbolicTransformer
    from gplearn.functions import make_function
    GPLEARN_AVAILABLE = True
except ImportError:
    GPLEARN_AVAILABLE = False
    print("WARNING: gplearn not available. Install with: pip install gplearn")


def _as_float_array(x):
    """Return ``x`` as a float ndarray.

    The operators below write fractional results (means, ranks, ...) into an
    output buffer; allocating that buffer with the *input* dtype would
    silently truncate them for integer input.
    """
    return np.asarray(x, dtype=float)


def _rolling_apply(x, d, reducer):
    """Apply ``reducer`` to each trailing window of (at most) ``d`` values.

    The window ending at position ``i`` is ``x[max(0, i - d + 1):i + 1]``, so
    the first ``d - 1`` outputs are computed on shorter, partial windows.
    """
    values = _as_float_array(x)
    out = np.empty_like(values)
    for end in range(1, len(values) + 1):
        out[end - 1] = reducer(values[max(0, end - d):end])
    return out


def _rolling_apply_pair(x, y, d, reducer):
    """Two-series variant of ``_rolling_apply``: ``reducer(window_x, window_y)``."""
    xs = _as_float_array(x)
    ys = _as_float_array(y)
    out = np.empty_like(xs)
    for end in range(1, len(xs) + 1):
        start = max(0, end - d)
        out[end - 1] = reducer(xs[start:end], ys[start:end])
    return out


def _bind_params(func, **params):
    """Wrap ``func`` as a strictly one-argument callable with ``params`` bound.

    gplearn's ``make_function`` compares ``function.__code__.co_argcount``
    with the declared arity, so an operator declared as ``f(x, d=5)`` cannot
    be registered with ``arity=1`` directly. ``functools.partial`` is not an
    option either because partial objects have no ``__code__`` attribute.
    """
    def unary(x):
        return func(x, **params)

    unary.__name__ = getattr(func, '__name__', 'unary')
    return unary


class FinancialFunctionLibrary:
    """
    Financial operators for genetic programming alpha mining.

    Key principle: Standard math operators (+, -, *, /) are not enough.
    Financial alpha requires TIME-SERIES and CROSS-SECTIONAL operators.

    Operators:
    - ts_*: Time-series (operate within one asset over time)
    - cs_*: Cross-sectional (operate across assets at one time)

    All ``ts_*`` operators return a float array with the same length as the
    input. Rolling operators use trailing windows and evaluate the first
    ``d - 1`` positions on partial windows instead of emitting NaN.
    """

    @staticmethod
    def ts_delta(x):
        """First difference; the first element is defined as 0."""
        values = _as_float_array(x)
        result = np.empty_like(values)
        if len(values) == 0:
            return result
        result[0] = 0
        result[1:] = np.diff(values)
        return result

    @staticmethod
    def ts_delay(x, d=1):
        """Lag operator: shift ``x`` by ``d`` steps, padding the head with ``x[0]``.

        ``d <= 0`` returns an unshifted copy and empty input returns an empty
        array.
        """
        values = _as_float_array(x)
        if len(values) == 0 or d <= 0:
            # ``values[:-0]`` is empty, so a zero lag needs its own path.
            return values.copy()
        result = np.empty_like(values)
        result[:d] = values[0]
        result[d:] = values[:-d]
        return result

    @staticmethod
    def ts_mean(x, d=5):
        """Rolling mean over the trailing ``d`` values."""
        return _rolling_apply(x, d, np.mean)

    @staticmethod
    def ts_std(x, d=5):
        """Rolling (population) standard deviation plus a 1e-10 floor.

        The floor keeps the result strictly positive so it can be used as a
        divisor.
        """
        return _rolling_apply(x, d, np.std) + 1e-10

    @staticmethod
    def ts_rank(x, d=5):
        """Rolling rank (percentile within window).

        Fraction of window values strictly below the current value; 0.5 when
        the window is constant.
        """
        def rank_of_last(window):
            if np.std(window) > 0:
                return np.sum(window < window[-1]) / len(window)
            return 0.5

        return _rolling_apply(x, d, rank_of_last)

    @staticmethod
    def ts_corr(x, y, d=5):
        """Rolling correlation; 0 where it is undefined.

        It is undefined for windows of a single point and for windows where
        either series is constant.
        """
        def corr(wx, wy):
            if len(wx) > 1 and np.std(wx) > 0 and np.std(wy) > 0:
                return np.corrcoef(wx, wy)[0, 1]
            return 0

        return _rolling_apply_pair(x, y, d, corr)

    @staticmethod
    def ts_cov(x, y, d=5):
        """Rolling sample covariance; 0 for single-point windows."""
        def cov(wx, wy):
            if len(wx) > 1:
                return np.cov(wx, wy)[0, 1]
            return 0

        return _rolling_apply_pair(x, y, d, cov)

    @staticmethod
    def ts_max(x, d=5):
        """Rolling max"""
        return _rolling_apply(x, d, np.max)

    @staticmethod
    def ts_min(x, d=5):
        """Rolling min"""
        return _rolling_apply(x, d, np.min)

    @staticmethod
    def ts_sum(x, d=5):
        """Rolling sum"""
        return _rolling_apply(x, d, np.sum)

    @staticmethod
    def ts_product(x, d=5):
        """Rolling compounded value: ``prod(1 + x) - 1`` over the window.

        Note this is *not* the plain product of the window values.
        """
        return _rolling_apply(x, d, lambda window: np.prod(window + 1) - 1)

    @staticmethod
    def ts_decay_linear(x, d=5):
        """Linearly weighted moving average (recent gets more weight)"""
        weights = np.arange(1, d + 1)

        def weighted_mean(window):
            # Partial windows use the *last* len(window) weights.
            return np.average(window, weights=weights[-len(window):])

        return _rolling_apply(x, d, weighted_mean)

    @staticmethod
    def sign(x):
        """Sign function"""
        return np.sign(x)

    @staticmethod
    def signed_power(x, p=2):
        """Signed power: sign(x) * |x|^p"""
        return np.sign(x) * np.power(np.abs(x), p)

    @classmethod
    def get_function_set(cls):
        """Get gplearn-compatible function set.

        Returns:
            A list of the standard gplearn operator names followed, when
            gplearn is installed, by the custom financial operators. Without
            gplearn only the operator names are returned.
        """
        std_ops = ['add', 'sub', 'mul', 'div', 'sqrt', 'log', 'abs', 'neg', 'inv']
        if not GPLEARN_AVAILABLE:
            return std_ops

        # (gplearn name, one-argument callable). Operators with an extra
        # defaulted parameter are wrapped so their argument count matches the
        # declared arity of 1.
        unary_ops = [
            ('ts_delta', cls.ts_delta),
            ('ts_mean5', _bind_params(cls.ts_mean, d=5)),
            ('ts_std5', _bind_params(cls.ts_std, d=5)),
            ('ts_rank5', _bind_params(cls.ts_rank, d=5)),
            ('ts_max5', _bind_params(cls.ts_max, d=5)),
            ('ts_min5', _bind_params(cls.ts_min, d=5)),
            ('ts_sum5', _bind_params(cls.ts_sum, d=5)),
            ('ts_decay5', _bind_params(cls.ts_decay_linear, d=5)),
            ('sign', cls.sign),
            ('signed_power', _bind_params(cls.signed_power, p=2)),
        ]
        functions = [
            make_function(function=func, name=name, arity=1)
            for name, func in unary_ops
        ]
        return std_ops + functions


class AlphaMiner:
    """
    Genetic Programming Alpha Factor Mining Engine.

    Instead of hand-coding "RSI > 70 means sell," this EVOLVES factors from
    raw data. The discovered formulas are:
    1. Non-linear (can capture complex patterns)
    2. Interpretable (symbolic formulas, not black boxes)
    3. Novel (not in any textbook)

    Pipeline:
    1. Feed raw features (OHLCV-derived)
    2. GP evolves formulas that predict returns
    3. Select top formulas by IC (Information Coefficient)
    4. Use as additional features for downstream ML models

    Based on WorldQuant's 101 Formulaic Alphas and QuantaAlpha.
    """

    def __init__(self,
                 n_factors: int = 50,
                 population_size: int = 1000,
                 generations: int = 20,
                 hall_of_fame: int = 100,
                 parsimony_coefficient: float = 0.01,
                 random_state: int = 42):
        self.n_factors = n_factors
        self.population_size = population_size
        self.generations = generations
        self.hall_of_fame = hall_of_fame
        self.parsimony_coefficient = parsimony_coefficient
        self.random_state = random_state
        self.gp = None                  # fitted SymbolicTransformer, if any
        self.discovered_factors = None  # factor matrix computed during fit()

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'AlphaMiner':
        """
        Mine alpha factors from features X predicting target y.

        Args:
            X: Features array (n_samples, n_features) - FLAT, not sequences
            y: Target returns (n_samples,)

        Returns:
            self
        """
        if not GPLEARN_AVAILABLE:
            # Without gplearn the miner degrades to a pass-through.
            print("WARNING: gplearn not available. Returning identity features.")
            self.discovered_factors = X
            return self

        print(f"Mining {self.n_factors} alpha factors with GP...")
        print(f" Population: {self.population_size}, Generations: {self.generations}")
        print(f" Input features: {X.shape[1]}")

        function_set = FinancialFunctionLibrary.get_function_set()

        # Genetic programming symbolic transformer
        self.gp = SymbolicTransformer(
            generations=self.generations,
            population_size=self.population_size,
            hall_of_fame=self.hall_of_fame,
            n_components=self.n_factors,
            function_set=function_set,
            parsimony_coefficient=self.parsimony_coefficient,
            max_samples=0.9,
            verbose=1,
            random_state=self.random_state,
            n_jobs=-1
        )

        # Fit GP to discover symbolic expressions
        self.gp.fit(X, y)

        # Transform to get discovered factors
        self.discovered_factors = self.gp.transform(X)
        print(f" Discovered {self.discovered_factors.shape[1]} alpha factors")

        # Evaluate and rank factors by IC
        self._rank_factors(y)
        return self

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Transform features using discovered alpha factors.

        Returns ``X`` unchanged when no GP model has been fitted.
        """
        if self.gp is None:
            return X
        return self.gp.transform(X)

    def _rank_factors(self, y: np.ndarray):
        """Print the discovered factors with the largest |Spearman IC| against y."""
        from scipy.stats import spearmanr

        if self.discovered_factors is None:
            return

        ics = []
        for i in range(self.discovered_factors.shape[1]):
            factor = self.discovered_factors[:, i]
            ic, _ = spearmanr(factor, y)
            # Constant factors yield NaN correlation and are skipped.
            if not np.isnan(ic):
                ics.append((i, abs(ic), ic))

        ics.sort(key=lambda x: x[1], reverse=True)

        print("\n Top 10 Discovered Alpha Factors (by |IC|):")
        for i, (idx, abs_ic, ic) in enumerate(ics[:10], 1):
            print(f" {i}. Factor {idx}: IC = {ic:+.4f}")

    def get_factor_expressions(self) -> List[str]:
        """Get human-readable formulas for discovered factors"""
        if self.gp is None:
            return []
        # NOTE(review): ``_best_programs`` is a private gplearn attribute;
        # confirm it is still present when upgrading gplearn.
        return [str(program) for program in self.gp._best_programs]


class LLMAlphaMiner:
    """
    LLM-Driven Alpha Factor Discovery (Simplified Version).

    Full implementation would use MCTS (Monte Carlo Tree Search) + LLM to
    explore the space of possible formulas, using the LLM as a "policy" to
    suggest promising formula modifications.

    This simplified version uses LLM embeddings to cluster and suggest factor
    combinations.
    """

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.model_name = model_name
        self.embedder = None

    def _load_embedder(self):
        """Lazy load sentence transformer"""
        if self.embedder is None:
            try:
                from sentence_transformers import SentenceTransformer
                self.embedder = SentenceTransformer(self.model_name)
            except ImportError:
                print("sentence-transformers not available. Using random projections.")
                self.embedder = None

    @staticmethod
    def _least_similar_pairs(sim_matrix, n_pairs: int) -> List[Tuple[int, int, float]]:
        """Return up to ``n_pairs`` distinct index pairs with the lowest similarity.

        Args:
            sim_matrix: Square pairwise-similarity matrix.
            n_pairs: Maximum number of pairs to return.

        Returns:
            ``(i, j, similarity)`` tuples with ``i < j``, sorted by ascending
            similarity. Ties keep row-major order.
        """
        sim = np.asarray(sim_matrix, dtype=float)
        n_items = sim.shape[0]
        if n_items < 2 or n_pairs <= 0:
            return []
        rows, cols = np.triu_indices(n_items, k=1)
        order = np.argsort(sim[rows, cols], kind='stable')[:n_pairs]
        return [
            (int(rows[k]), int(cols[k]), float(sim[rows[k], cols[k]]))
            for k in order
        ]

    def suggest_factors(self, descriptions: List[str], n_suggestions: int = 10) -> List[Dict]:
        """
        Use LLM embeddings to suggest new factor combinations.

        Args:
            descriptions: List of existing factor descriptions/formulas
            n_suggestions: Number of new factor ideas to generate

        Returns:
            List of suggested factor descriptions. Each entry is a dict with
            keys 'type', 'factors', 'similarity' and 'description'. On the
            embedding path the entries are distinct pairs ordered from least
            to most similar, so fewer than ``n_suggestions`` are returned
            when there are not enough pairs. Empty when fewer than two
            descriptions are given.
        """
        if len(descriptions) < 2:
            # No pair can be formed.
            return []

        self._load_embedder()
        if self.embedder is None:
            # Fallback: random combinations
            return self._random_suggestions(descriptions, n_suggestions)

        # Get embeddings
        embeddings = self.embedder.encode(descriptions)

        # Find "gaps" in embedding space (regions with low density)
        # Suggest combinations of distant factors
        from sklearn.metrics.pairwise import cosine_similarity
        sim_matrix = cosine_similarity(embeddings)

        suggestions = []
        for i, j, similarity in self._least_similar_pairs(sim_matrix, n_suggestions):
            desc1, desc2 = descriptions[i], descriptions[j]
            suggestions.append({
                'type': 'combination',
                'factors': [desc1, desc2],
                'similarity': similarity,
                'description': f"Combine ({desc1}) with ({desc2})"
            })
        return suggestions

    def _random_suggestions(self, descriptions: List[str], n_suggestions: int) -> List[Dict]:
        """Fallback random suggestions.

        Draws ``n_suggestions`` random pairs (pairs may repeat across draws).
        Returns an empty list when fewer than two descriptions are given.
        """
        import random

        if len(descriptions) < 2:
            return []

        suggestions = []
        for _ in range(n_suggestions):
            first, second = random.sample(range(len(descriptions)), 2)
            suggestions.append({
                'type': 'combination',
                'factors': [descriptions[first], descriptions[second]],
                'similarity': 0.0,
                'description': f"Combine ({descriptions[first]}) with ({descriptions[second]})"
            })
        return suggestions


class AlphaMiningPipeline:
    """
    Complete pipeline: Raw data -> GP-discovered factors -> Enhanced features.

    Usage:
        pipeline = AlphaMiningPipeline(n_factors=50)
        enhanced_features = pipeline.fit_transform(raw_features, returns)

    The enhanced features combine:
    - Original technical indicators
    - GP-discovered nonlinear factors

    LLM-suggested factor combinations are generated as ideas only and stored
    on ``llm_suggestions``; they are not added to the feature matrix.
    """

    def __init__(self,
                 n_gp_factors: int = 50,
                 gp_generations: int = 20,
                 use_llm: bool = True):
        self.n_gp_factors = n_gp_factors
        self.gp_generations = gp_generations
        self.use_llm = use_llm
        self.gp_miner = None
        self.llm_miner = None
        self.feature_names = []
        self.llm_suggestions = []  # ideas from the last fit_transform() call

    def fit_transform(self, X: np.ndarray, y: np.ndarray,
                      feature_names: Optional[List[str]] = None) -> np.ndarray:
        """
        Fit and transform in one call.

        Args:
            X: Raw features (n_samples, n_features)
            y: Target returns (n_samples,)
            feature_names: Names of original features (for LLM suggestions)

        Returns:
            Enhanced features: the columns of X followed by the columns
            produced by the GP miner.
        """
        print("=" * 60)
        print("ALPHA MINING PIPELINE")
        print("=" * 60)

        names = list(feature_names) if feature_names is not None else None

        # Step 1: GP Alpha Mining
        print("\n[1/3] Genetic Programming Alpha Mining...")
        self.gp_miner = AlphaMiner(
            n_factors=self.n_gp_factors,
            generations=self.gp_generations
        )
        # NOTE: without gplearn the miner is a pass-through, so the GP block
        # below is a copy of X.
        gp_features = self.gp_miner.fit(X, y).transform(X)

        # Step 2: LLM Suggestions (optional)
        self.llm_suggestions = []
        if self.use_llm and names is not None:
            print("\n[2/3] LLM Factor Suggestions...")
            self.llm_miner = LLMAlphaMiner()
            suggestions = self.llm_miner.suggest_factors(names, n_suggestions=10)
            # Keep the ideas so callers can inspect them.
            self.llm_suggestions = suggestions
            print(f" Generated {len(suggestions)} factor ideas")

        # Step 3: Combine
        print("\n[3/3] Combining original + discovered features...")
        enhanced = np.column_stack([X, gp_features])

        self.feature_names = (names or [f'f{i}' for i in range(X.shape[1])]) + \
                             [f'gp_alpha_{i}' for i in range(gp_features.shape[1])]

        print(f"\nEnhanced features: {enhanced.shape[1]} (original: {X.shape[1]}, GP: {gp_features.shape[1]})")
        return enhanced

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Transform new data using fitted miners.

        Returns ``X`` unchanged when the pipeline has not been fitted.
        """
        if self.gp_miner is None:
            return X
        gp_features = self.gp_miner.transform(X)
        return np.column_stack([X, gp_features])

    def get_discovered_expressions(self) -> List[str]:
        """Get human-readable discovered factor formulas"""
        if self.gp_miner is None:
            return []
        return self.gp_miner.get_factor_expressions()


def mine_alphas_from_sequences(sequences: np.ndarray, targets: np.ndarray,
                               n_factors: int = 50) -> Tuple[np.ndarray, AlphaMiningPipeline]:
    """
    Convenience function: Flatten sequences and mine alphas.

    Args:
        sequences: (n_samples, seq_len, n_features)
        targets: (n_samples,)
        n_factors: Number of GP factors to mine

    Returns:
        enhanced_features: flattened inputs followed by the mined factors,
            one row per sample
        pipeline: Fitted AlphaMiningPipeline
    """
    # Flatten sequences for GP (GP doesn't handle sequences natively)
    n_samples, seq_len, n_features = sequences.shape
    X_flat = sequences.reshape(n_samples, seq_len * n_features)

    # Create feature names (time step major, matching the reshape order)
    feature_names = [f'f{t}_{f}' for t in range(seq_len) for f in range(n_features)]

    pipeline = AlphaMiningPipeline(n_gp_factors=n_factors)
    enhanced = pipeline.fit_transform(X_flat, targets, feature_names)
    return enhanced, pipeline


if __name__ == '__main__':
    # Test alpha mining on synthetic data
    np.random.seed(42)
    n_samples = 5000
    n_features = 20
    X = np.random.randn(n_samples, n_features)

    # True relationship: y = x0 * x1 + sin(x2) + noise
    y = X[:, 0] * X[:, 1] + np.sin(X[:, 2] * 2) + np.random.randn(n_samples) * 0.1

    miner = AlphaMiner(n_factors=20, generations=5, population_size=500)
    miner.fit(X, y)

    print("\nDiscovered expressions (top 5):")
    for expr in miner.get_factor_expressions()[:5]:
        print(f" {expr}")