| """Automated Alpha Factor Mining with Genetic Programming + LLM-Driven Discovery |
| |
| Based on: |
| - Lopez de Prado: Genetic programming for alpha factor discovery |
| - QuantaAlpha (Han et al. 2026): LLM + MCTS evolutionary framework |
| - gplearn: Symbolic regression for finance |
| |
| This replaces hand-coded RSI/MACD with DISCOVERED factors. |
| """ |
| import numpy as np |
| import pandas as pd |
| from typing import Dict, List, Optional, Callable, Tuple |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
| try: |
| from gplearn.genetic import SymbolicTransformer |
| from gplearn.functions import make_function |
| GPLEARN_AVAILABLE = True |
| except ImportError: |
| GPLEARN_AVAILABLE = False |
| print("WARNING: gplearn not available. Install with: pip install gplearn") |
|
|
|
|
class FinancialFunctionLibrary:
    """
    Financial operators for genetic programming alpha mining.

    Key principle: Standard math operators (+, -, *, /) are not enough.
    Financial alpha requires TIME-SERIES and CROSS-SECTIONAL operators.

    Operators:
    - ts_*: Time-series (operate within one asset over time)
    - cs_*: Cross-sectional (operate across assets at one time)
      (NOTE: no cs_* operators are implemented in this class yet.)

    All operators return float arrays regardless of the input dtype.
    The previous implementation used ``np.empty_like(x)``, which keeps the
    input dtype: for integer inputs, fractional results were silently
    truncated (e.g. ts_rank wrote 0.5 into an int buffer and got 0).
    """

    @staticmethod
    def ts_delta(x):
        """First difference; position 0 is defined as 0 (no prior value)."""
        result = np.empty_like(x, dtype=float)
        if result.size == 0:  # guard: np.diff/indexing would fail on empty input
            return result
        result[0] = 0
        result[1:] = np.diff(x)
        return result

    @staticmethod
    def ts_delay(x, d=1):
        """Lag operator: shift the series by d steps, back-filling with x[0]."""
        result = np.empty_like(x, dtype=float)
        if result.size == 0:  # guard: x[0] would raise on empty input
            return result
        result[:d] = x[0]
        result[d:] = x[:-d]
        return result

    @staticmethod
    def ts_mean(x, d=5):
        """Rolling mean over a trailing window of up to d observations."""
        result = np.empty_like(x, dtype=float)
        for i in range(len(x)):
            start = max(0, i - d + 1)  # window shrinks near the start of the series
            result[i] = np.mean(x[start:i+1])
        return result

    @staticmethod
    def ts_std(x, d=5):
        """Rolling standard deviation (+1e-10 so it is safe as a divisor)."""
        result = np.empty_like(x, dtype=float)
        for i in range(len(x)):
            start = max(0, i - d + 1)
            result[i] = np.std(x[start:i+1]) + 1e-10
        return result

    @staticmethod
    def ts_rank(x, d=5):
        """Rolling rank: fraction of the trailing window strictly below x[i].

        Degenerate windows (constant values) return the neutral rank 0.5.
        """
        result = np.empty_like(x, dtype=float)
        for i in range(len(x)):
            start = max(0, i - d + 1)
            window = x[start:i+1]
            if len(window) > 0 and np.std(window) > 0:
                result[i] = np.sum(window < x[i]) / len(window)
            else:
                result[i] = 0.5
        return result

    @staticmethod
    def ts_corr(x, y, d=5):
        """Rolling Pearson correlation; 0 where the window is degenerate."""
        result = np.empty_like(x, dtype=float)
        for i in range(len(x)):
            start = max(0, i - d + 1)
            wx, wy = x[start:i+1], y[start:i+1]
            # corrcoef is undefined (NaN) for constant or length-1 windows
            if len(wx) > 1 and np.std(wx) > 0 and np.std(wy) > 0:
                result[i] = np.corrcoef(wx, wy)[0, 1]
            else:
                result[i] = 0
        return result

    @staticmethod
    def ts_cov(x, y, d=5):
        """Rolling sample covariance (np.cov default ddof=1); 0 for length-1 windows."""
        result = np.empty_like(x, dtype=float)
        for i in range(len(x)):
            start = max(0, i - d + 1)
            wx, wy = x[start:i+1], y[start:i+1]
            if len(wx) > 1:
                result[i] = np.cov(wx, wy)[0, 1]
            else:
                result[i] = 0
        return result

    @staticmethod
    def ts_max(x, d=5):
        """Rolling max over a trailing window of up to d observations."""
        result = np.empty_like(x, dtype=float)
        for i in range(len(x)):
            start = max(0, i - d + 1)
            result[i] = np.max(x[start:i+1])
        return result

    @staticmethod
    def ts_min(x, d=5):
        """Rolling min over a trailing window of up to d observations."""
        result = np.empty_like(x, dtype=float)
        for i in range(len(x)):
            start = max(0, i - d + 1)
            result[i] = np.min(x[start:i+1])
        return result

    @staticmethod
    def ts_sum(x, d=5):
        """Rolling sum over a trailing window of up to d observations."""
        result = np.empty_like(x, dtype=float)
        for i in range(len(x)):
            start = max(0, i - d + 1)
            result[i] = np.sum(x[start:i+1])
        return result

    @staticmethod
    def ts_product(x, d=5):
        """Rolling compounded value: prod(1 + x) - 1 (treats x as returns)."""
        result = np.empty_like(x, dtype=float)
        for i in range(len(x)):
            start = max(0, i - d + 1)
            result[i] = np.prod(x[start:i+1] + 1) - 1
        return result

    @staticmethod
    def ts_decay_linear(x, d=5):
        """Linearly weighted moving average (recent gets more weight)."""
        result = np.empty_like(x, dtype=float)
        weights = np.arange(1, d + 1)
        for i in range(len(x)):
            start = max(0, i - d + 1)
            window = x[start:i+1]
            # shorter early windows use the tail of the weight vector,
            # so the most recent point always gets the largest weight
            w = weights[-len(window):]
            result[i] = np.average(window, weights=w)
        return result

    @staticmethod
    def sign(x):
        """Sign function: -1, 0, or +1 element-wise."""
        return np.sign(x)

    @staticmethod
    def signed_power(x, p=2):
        """Signed power: sign(x) * |x|^p (preserves direction, reshapes magnitude)."""
        return np.sign(x) * np.power(np.abs(x), p)

    @classmethod
    def get_function_set(cls):
        """Get a gplearn-compatible function set.

        Falls back to gplearn's built-in operator names (strings) when
        gplearn is not installed, so callers always get a usable list.
        """
        if not GPLEARN_AVAILABLE:
            return ['add', 'sub', 'mul', 'div', 'sqrt', 'log', 'abs', 'neg', 'inv']

        # Custom time-series operators wrapped for gplearn; window sizes are
        # fixed at their defaults because gplearn functions are unary here.
        functions = [
            make_function(function=cls.ts_delta, name='ts_delta', arity=1),
            make_function(function=cls.ts_mean, name='ts_mean5', arity=1),
            make_function(function=cls.ts_std, name='ts_std5', arity=1),
            make_function(function=cls.ts_rank, name='ts_rank5', arity=1),
            make_function(function=cls.ts_max, name='ts_max5', arity=1),
            make_function(function=cls.ts_min, name='ts_min5', arity=1),
            make_function(function=cls.ts_sum, name='ts_sum5', arity=1),
            make_function(function=cls.ts_decay_linear, name='ts_decay5', arity=1),
            make_function(function=cls.sign, name='sign', arity=1),
            make_function(function=cls.signed_power, name='signed_power', arity=1),
        ]

        std_ops = ['add', 'sub', 'mul', 'div', 'sqrt', 'log', 'abs', 'neg', 'inv']

        return std_ops + functions
|
|
|
|
class AlphaMiner:
    """
    Genetic Programming Alpha Factor Mining Engine.

    Rather than hand-coding rules like "RSI > 70 means sell," this class
    EVOLVES predictive factors directly from raw data. The discovered
    formulas are:
    1. Non-linear (can capture complex patterns)
    2. Interpretable (symbolic formulas, not black boxes)
    3. Novel (not in any textbook)

    Pipeline:
    1. Feed raw features (OHLCV-derived)
    2. GP evolves formulas that predict returns
    3. Select top formulas by IC (Information Coefficient)
    4. Use as additional features for downstream ML models

    Based on WorldQuant's 101 Formulaic Alphas and QuantaAlpha.
    """

    def __init__(self,
                 n_factors: int = 50,
                 population_size: int = 1000,
                 generations: int = 20,
                 hall_of_fame: int = 100,
                 parsimony_coefficient: float = 0.01,
                 random_state: int = 42):
        # GP search configuration
        self.n_factors = n_factors
        self.generations = generations
        self.population_size = population_size
        self.hall_of_fame = hall_of_fame
        self.parsimony_coefficient = parsimony_coefficient
        self.random_state = random_state
        # Fitted state: the SymbolicTransformer and its factor outputs
        self.gp = None
        self.discovered_factors = None

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'AlphaMiner':
        """
        Mine alpha factors from features X predicting target y.

        Args:
            X: Features array (n_samples, n_features) - FLAT, not sequences
            y: Target returns (n_samples,)

        Returns:
            self
        """
        # Graceful degradation: without gplearn, pass features through untouched.
        if not GPLEARN_AVAILABLE:
            print("WARNING: gplearn not available. Returning identity features.")
            self.discovered_factors = X
            return self

        print(f"Mining {self.n_factors} alpha factors with GP...")
        print(f" Population: {self.population_size}, Generations: {self.generations}")
        print(f" Input features: {X.shape[1]}")

        ops = FinancialFunctionLibrary.get_function_set()

        self.gp = SymbolicTransformer(
            generations=self.generations,
            population_size=self.population_size,
            hall_of_fame=self.hall_of_fame,
            n_components=self.n_factors,
            function_set=ops,
            parsimony_coefficient=self.parsimony_coefficient,
            max_samples=0.9,
            verbose=1,
            random_state=self.random_state,
            n_jobs=-1,
        )

        self.gp.fit(X, y)
        self.discovered_factors = self.gp.transform(X)

        print(f" Discovered {self.discovered_factors.shape[1]} alpha factors")

        # Report the most predictive factors by Spearman IC.
        self._rank_factors(y)

        return self

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Apply the fitted GP transformer; identity when nothing was mined."""
        return X if self.gp is None else self.gp.transform(X)

    def _rank_factors(self, y: np.ndarray):
        """Rank discovered factors by |Spearman IC| and print the top 10."""
        from scipy.stats import spearmanr

        if self.discovered_factors is None:
            return

        scored = []
        for col in range(self.discovered_factors.shape[1]):
            ic, _ = spearmanr(self.discovered_factors[:, col], y)
            if not np.isnan(ic):  # degenerate (constant) factors produce NaN
                scored.append((col, abs(ic), ic))

        scored.sort(key=lambda t: t[1], reverse=True)

        print("\n Top 10 Discovered Alpha Factors (by |IC|):")
        for rank, (idx, _abs_ic, ic) in enumerate(scored[:10], 1):
            print(f" {rank}. Factor {idx}: IC = {ic:+.4f}")

    def get_factor_expressions(self) -> List[str]:
        """Render the fitted GP programs as human-readable formula strings."""
        if self.gp is None:
            return []
        return [str(program) for program in self.gp._best_programs]
|
|
|
|
class LLMAlphaMiner:
    """
    LLM-Driven Alpha Factor Discovery (Simplified Version).

    Full implementation would use MCTS (Monte Carlo Tree Search) + LLM
    to explore the space of possible formulas, using the LLM as a "policy"
    to suggest promising formula modifications.

    This simplified version uses LLM embeddings to find DISSIMILAR factor
    pairs and suggests combining them (diverse factors are more likely to
    add orthogonal information when combined).
    """

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        # Name of the sentence-transformers model used for embeddings.
        self.model_name = model_name
        # Lazily loaded; stays None when sentence-transformers is missing,
        # which switches suggest_factors() to the random fallback.
        self.embedder = None

    def _load_embedder(self):
        """Lazy load sentence transformer (retried on each call until loaded)."""
        if self.embedder is None:
            try:
                from sentence_transformers import SentenceTransformer
                self.embedder = SentenceTransformer(self.model_name)
            except ImportError:
                print("sentence-transformers not available. Using random projections.")
                self.embedder = None

    @staticmethod
    def _least_similar_pairs(sim_matrix, n_pairs):
        """Return up to n_pairs DISTINCT index pairs in increasing-similarity order.

        Args:
            sim_matrix: Square (n, n) similarity matrix.
            n_pairs: Maximum number of pairs to return.

        Returns:
            List of ((i, j), similarity) with i < j, least similar first.
            Stops early when all distinct pairs are exhausted.
        """
        sim = np.array(sim_matrix, dtype=float, copy=True)
        n = sim.shape[0]
        pairs = []
        for _ in range(n_pairs):
            min_sim = np.inf
            min_pair = None
            for i in range(n):
                for j in range(i + 1, n):
                    if sim[i, j] < min_sim:
                        min_sim = sim[i, j]
                        min_pair = (i, j)
            if min_pair is None:
                break  # fewer than n_pairs distinct pairs exist
            sim[min_pair[0], min_pair[1]] = np.inf  # consume this pair
            pairs.append((min_pair, min_sim))
        return pairs

    def suggest_factors(self, descriptions: List[str],
                        n_suggestions: int = 10) -> List[Dict]:
        """
        Use LLM embeddings to suggest new factor combinations.

        Args:
            descriptions: List of existing factor descriptions/formulas
            n_suggestions: Number of new factor ideas to generate

        Returns:
            List of suggested factor dicts (type/factors/similarity/description)
        """
        self._load_embedder()

        if self.embedder is None:
            # No embedding model: fall back to random pairings.
            return self._random_suggestions(descriptions, n_suggestions)

        embeddings = self.embedder.encode(descriptions)

        from sklearn.metrics.pairwise import cosine_similarity

        sim_matrix = cosine_similarity(embeddings)

        # BUG FIX: the previous version re-scanned the whole matrix on every
        # iteration without marking the selected pair, so it returned
        # n_suggestions IDENTICAL suggestions. Each pair is now consumed
        # after being chosen, yielding distinct suggestions.
        suggestions = []
        for (i, j), sim in self._least_similar_pairs(sim_matrix, n_suggestions):
            desc1, desc2 = descriptions[i], descriptions[j]
            suggestions.append({
                'type': 'combination',
                'factors': [desc1, desc2],
                'similarity': sim,
                'description': f"Combine ({desc1}) with ({desc2})"
            })

        return suggestions

    def _random_suggestions(self, descriptions: List[str],
                            n_suggestions: int) -> List[Dict]:
        """Fallback: random pairings when no embedding model is available."""
        import random
        # Guard: random.sample raises ValueError with fewer than 2 items.
        if len(descriptions) < 2:
            return []
        suggestions = []
        for _ in range(n_suggestions):
            i, j = random.sample(range(len(descriptions)), 2)
            suggestions.append({
                'type': 'combination',
                'factors': [descriptions[i], descriptions[j]],
                'similarity': 0.0,
                'description': f"Combine ({descriptions[i]}) with ({descriptions[j]})"
            })
        return suggestions
|
|
|
|
class AlphaMiningPipeline:
    """
    Complete pipeline: Raw data -> GP-discovered factors -> Enhanced features.

    Usage:
        pipeline = AlphaMiningPipeline(n_factors=50)
        enhanced_features = pipeline.fit_transform(raw_features, returns)

    The enhanced feature matrix concatenates:
    - The original technical indicators (passed through unchanged)
    - GP-discovered nonlinear factors
    - (LLM factor-combination suggestions are printed, not materialized)
    """

    def __init__(self, n_gp_factors: int = 50,
                 gp_generations: int = 20,
                 use_llm: bool = True):
        # Configuration for the GP stage
        self.n_gp_factors = n_gp_factors
        self.gp_generations = gp_generations
        # Whether to run the (optional) LLM suggestion stage
        self.use_llm = use_llm

        # Fitted sub-miners and the resulting column names
        self.gp_miner = None
        self.llm_miner = None
        self.feature_names = []

    def fit_transform(self, X: np.ndarray, y: np.ndarray,
                      feature_names: Optional[List[str]] = None) -> np.ndarray:
        """
        Fit and transform in one call.

        Args:
            X: Raw features (n_samples, n_features)
            y: Target returns (n_samples,)
            feature_names: Names of original features (for LLM suggestions)

        Returns:
            Enhanced features (n_samples, n_original + n_gp_factors)
        """
        print("=" * 60)
        print("ALPHA MINING PIPELINE")
        print("=" * 60)

        # Stage 1: evolve symbolic alpha factors with genetic programming.
        print("\n[1/3] Genetic Programming Alpha Mining...")
        self.gp_miner = AlphaMiner(n_factors=self.n_gp_factors,
                                   generations=self.gp_generations)
        self.gp_miner.fit(X, y)
        gp_features = self.gp_miner.transform(X)

        # Stage 2 (optional): LLM-based combination ideas; side output only.
        if self.use_llm and feature_names is not None:
            print("\n[2/3] LLM Factor Suggestions...")
            self.llm_miner = LLMAlphaMiner()
            ideas = self.llm_miner.suggest_factors(feature_names, n_suggestions=10)
            print(f" Generated {len(ideas)} factor ideas")

        # Stage 3: concatenate original columns with the discovered factors.
        print("\n[3/3] Combining original + discovered features...")
        enhanced = np.column_stack([X, gp_features])

        base = list(feature_names or [f'f{i}' for i in range(X.shape[1])])
        self.feature_names = base + [f'gp_alpha_{i}' for i in range(gp_features.shape[1])]

        print(f"\nEnhanced features: {enhanced.shape[1]} (original: {X.shape[1]}, GP: {gp_features.shape[1]})")

        return enhanced

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Transform new data with the fitted miners; identity if unfitted."""
        if self.gp_miner is None:
            return X
        return np.column_stack([X, self.gp_miner.transform(X)])

    def get_discovered_expressions(self) -> List[str]:
        """Human-readable formulas for the GP-discovered factors ([] if unfitted)."""
        return [] if self.gp_miner is None else self.gp_miner.get_factor_expressions()
|
|
|
|
def mine_alphas_from_sequences(sequences: np.ndarray,
                               targets: np.ndarray,
                               n_factors: int = 50) -> Tuple[np.ndarray, AlphaMiningPipeline]:
    """
    Convenience wrapper: flatten time-series sequences and mine alphas.

    Args:
        sequences: (n_samples, seq_len, n_features)
        targets: (n_samples,)
        n_factors: Number of GP factors to discover

    Returns:
        enhanced_features: (n_samples, seq_len * n_features + n_factors)
        pipeline: Fitted AlphaMiningPipeline
    """
    n_samples, seq_len, n_features = sequences.shape

    # Collapse (time, feature) into a single flat axis per sample.
    flat = sequences.reshape(n_samples, seq_len * n_features)

    # Name each flattened column 'f<time>_<feature>' for LLM suggestions.
    names = [f'f{t}_{f}' for t in range(seq_len) for f in range(n_features)]

    pipeline = AlphaMiningPipeline(n_gp_factors=n_factors)
    enhanced = pipeline.fit_transform(flat, targets, names)

    return enhanced, pipeline
|
|
|
|
if __name__ == '__main__':
    # Smoke test on synthetic data with a known nonlinear structure:
    # y = X0*X1 + sin(2*X2) + small Gaussian noise.
    np.random.seed(42)
    n_samples, n_features = 5000, 20

    X = np.random.randn(n_samples, n_features)
    y = X[:, 0] * X[:, 1] + np.sin(X[:, 2] * 2) + np.random.randn(n_samples) * 0.1

    # Small GP run so the demo finishes quickly.
    miner = AlphaMiner(n_factors=20, generations=5, population_size=500)
    miner.fit(X, y)

    print("\nDiscovered expressions (top 5):")
    for expr in miner.get_factor_expressions()[:5]:
        print(f" {expr}")
|
|