# alphaforge-quant-system / alpha_mining.py
# Author: Premchan369
# Commit 094073d (verified): Add automated alpha mining with genetic
# programming + LLM-driven factor discovery
"""Automated Alpha Factor Mining with Genetic Programming + LLM-Driven Discovery
Based on:
- Lopez de Prado: Genetic programming for alpha factor discovery
- QuantaAlpha (Han et al. 2026): LLM + MCTS evolutionary framework
- gplearn: Symbolic regression for finance
This replaces hand-coded RSI/MACD with DISCOVERED factors.
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Optional, Callable, Tuple
import warnings
warnings.filterwarnings('ignore')
try:
from gplearn.genetic import SymbolicTransformer
from gplearn.functions import make_function
GPLEARN_AVAILABLE = True
except ImportError:
GPLEARN_AVAILABLE = False
print("WARNING: gplearn not available. Install with: pip install gplearn")
class FinancialFunctionLibrary:
    """
    Financial operators for genetic programming alpha mining.

    Key principle: Standard math operators (+, -, *, /) are not enough.
    Financial alpha requires TIME-SERIES and CROSS-SECTIONAL operators.

    Operators:
    - ts_*: Time-series (operate within one asset over time)
    - cs_*: Cross-sectional (operate across assets at one time)

    All ts_* operators use an expanding window until d observations are
    available (window start = max(0, i - d + 1)), so no leading NaNs are
    produced.

    FIX: outputs are now allocated as float arrays. The previous
    `np.empty_like(x)` inherited the input dtype, so integer inputs silently
    truncated fractional results (rolling mean/std/rank/decay were all wrong
    for int arrays).
    """

    @staticmethod
    def _rolling(x, d, fn):
        """Apply `fn` to each trailing window of up to d elements.

        Shared helper for the single-array rolling operators below;
        always returns a float array.
        """
        out = np.empty_like(x, dtype=float)
        for i in range(len(x)):
            out[i] = fn(x[max(0, i - d + 1):i + 1])
        return out

    @staticmethod
    def ts_delta(x):
        """First difference; position 0 is defined as 0 (no prior value)."""
        result = np.empty_like(x, dtype=float)
        result[0] = 0
        result[1:] = np.diff(x)
        return result

    @staticmethod
    def ts_delay(x, d=1):
        """Lag operator: shift back by d, padding the head with x[0]."""
        result = np.empty_like(x, dtype=float)
        result[:d] = x[0]
        result[d:] = x[:-d]
        return result

    @staticmethod
    def ts_mean(x, d=5):
        """Rolling mean"""
        return FinancialFunctionLibrary._rolling(x, d, np.mean)

    @staticmethod
    def ts_std(x, d=5):
        """Rolling standard deviation (epsilon added to avoid zero divisors
        when this factor appears as a denominator in evolved formulas)."""
        return FinancialFunctionLibrary._rolling(x, d, lambda w: np.std(w) + 1e-10)

    @staticmethod
    def ts_rank(x, d=5):
        """Rolling rank: fraction of the window strictly below the current
        value. Constant (zero-variance) windows map to the neutral rank 0.5."""
        result = np.empty_like(x, dtype=float)
        for i in range(len(x)):
            window = x[max(0, i - d + 1):i + 1]
            if len(window) > 0 and np.std(window) > 0:
                result[i] = np.sum(window < x[i]) / len(window)
            else:
                result[i] = 0.5
        return result

    @staticmethod
    def ts_corr(x, y, d=5):
        """Rolling correlation; 0 where the window is degenerate
        (fewer than 2 points, or zero variance in either series)."""
        result = np.empty_like(x, dtype=float)
        for i in range(len(x)):
            start = max(0, i - d + 1)
            wx, wy = x[start:i + 1], y[start:i + 1]
            if len(wx) > 1 and np.std(wx) > 0 and np.std(wy) > 0:
                result[i] = np.corrcoef(wx, wy)[0, 1]
            else:
                result[i] = 0
        return result

    @staticmethod
    def ts_cov(x, y, d=5):
        """Rolling covariance; 0 where the window has fewer than 2 points."""
        result = np.empty_like(x, dtype=float)
        for i in range(len(x)):
            start = max(0, i - d + 1)
            wx, wy = x[start:i + 1], y[start:i + 1]
            if len(wx) > 1:
                result[i] = np.cov(wx, wy)[0, 1]
            else:
                result[i] = 0
        return result

    @staticmethod
    def ts_max(x, d=5):
        """Rolling max"""
        return FinancialFunctionLibrary._rolling(x, d, np.max)

    @staticmethod
    def ts_min(x, d=5):
        """Rolling min"""
        return FinancialFunctionLibrary._rolling(x, d, np.min)

    @staticmethod
    def ts_sum(x, d=5):
        """Rolling sum"""
        return FinancialFunctionLibrary._rolling(x, d, np.sum)

    @staticmethod
    def ts_product(x, d=5):
        """Rolling compounded product: prod(1 + x) - 1 over the window
        (treats x as simple returns)."""
        return FinancialFunctionLibrary._rolling(x, d, lambda w: np.prod(w + 1) - 1)

    @staticmethod
    def ts_decay_linear(x, d=5):
        """Linearly weighted moving average (recent gets more weight)."""
        weights = np.arange(1, d + 1)
        # The trailing slice of `weights` keeps the heaviest weight on the
        # most recent observation even while the window is still filling.
        return FinancialFunctionLibrary._rolling(
            x, d, lambda w: np.average(w, weights=weights[-len(w):]))

    @staticmethod
    def sign(x):
        """Sign function"""
        return np.sign(x)

    @staticmethod
    def signed_power(x, p=2):
        """Signed power: sign(x) * |x|^p (preserves direction of the signal)."""
        return np.sign(x) * np.power(np.abs(x), p)

    @classmethod
    def get_function_set(cls):
        """Get gplearn-compatible function set.

        Falls back to plain operator names when gplearn is unavailable.
        Only arity-1 operators are wrapped; the d parameters stay at their
        defaults (gplearn calls each function with exactly `arity` arrays).
        """
        if not GPLEARN_AVAILABLE:
            return ['add', 'sub', 'mul', 'div', 'sqrt', 'log', 'abs', 'neg', 'inv']
        functions = [
            make_function(function=cls.ts_delta, name='ts_delta', arity=1),
            make_function(function=cls.ts_mean, name='ts_mean5', arity=1),
            make_function(function=cls.ts_std, name='ts_std5', arity=1),
            make_function(function=cls.ts_rank, name='ts_rank5', arity=1),
            make_function(function=cls.ts_max, name='ts_max5', arity=1),
            make_function(function=cls.ts_min, name='ts_min5', arity=1),
            make_function(function=cls.ts_sum, name='ts_sum5', arity=1),
            make_function(function=cls.ts_decay_linear, name='ts_decay5', arity=1),
            make_function(function=cls.sign, name='sign', arity=1),
            make_function(function=cls.signed_power, name='signed_power', arity=1),
        ]
        # Standard operators (gplearn built-ins) plus the custom ones above.
        std_ops = ['add', 'sub', 'mul', 'div', 'sqrt', 'log', 'abs', 'neg', 'inv']
        return std_ops + functions
class AlphaMiner:
    """
    Genetic Programming Alpha Factor Mining Engine.

    Rather than hand-coding rules such as "RSI > 70 means sell," this class
    EVOLVES predictive factors directly from raw data. The discovered
    formulas are:
    1. Non-linear (can capture complex patterns)
    2. Interpretable (symbolic formulas, not black boxes)
    3. Novel (not in any textbook)

    Pipeline:
    1. Feed raw features (OHLCV-derived)
    2. GP evolves formulas that predict returns
    3. Select top formulas by IC (Information Coefficient)
    4. Use as additional features for downstream ML models

    Based on WorldQuant's 101 Formulaic Alphas and QuantaAlpha.
    """

    def __init__(self,
                 n_factors: int = 50,
                 population_size: int = 1000,
                 generations: int = 20,
                 hall_of_fame: int = 100,
                 parsimony_coefficient: float = 0.01,
                 random_state: int = 42):
        # Evolution hyperparameters, forwarded verbatim to SymbolicTransformer.
        self.n_factors = n_factors
        self.population_size = population_size
        self.generations = generations
        self.hall_of_fame = hall_of_fame
        self.parsimony_coefficient = parsimony_coefficient
        self.random_state = random_state
        # Set by fit(): the fitted transformer and its factor matrix.
        self.gp = None
        self.discovered_factors = None

    def fit(self, X: np.ndarray, y: np.ndarray) -> 'AlphaMiner':
        """
        Mine alpha factors from features X predicting target y.

        Args:
            X: Features array (n_samples, n_features) - FLAT, not sequences
            y: Target returns (n_samples,)
        Returns:
            self
        """
        if not GPLEARN_AVAILABLE:
            # Degrade gracefully: pass the raw features straight through.
            print("WARNING: gplearn not available. Returning identity features.")
            self.discovered_factors = X
            return self
        print(f"Mining {self.n_factors} alpha factors with GP...")
        print(f" Population: {self.population_size}, Generations: {self.generations}")
        print(f" Input features: {X.shape[1]}")
        # Symbolic transformer searches formula space with the financial
        # operator set defined above.
        self.gp = SymbolicTransformer(
            generations=self.generations,
            population_size=self.population_size,
            hall_of_fame=self.hall_of_fame,
            n_components=self.n_factors,
            function_set=FinancialFunctionLibrary.get_function_set(),
            parsimony_coefficient=self.parsimony_coefficient,
            max_samples=0.9,
            verbose=1,
            random_state=self.random_state,
            n_jobs=-1
        )
        self.gp.fit(X, y)
        self.discovered_factors = self.gp.transform(X)
        print(f" Discovered {self.discovered_factors.shape[1]} alpha factors")
        # Report factor quality ranked by information coefficient.
        self._rank_factors(y)
        return self

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Transform features using discovered alpha factors
        (identity when no GP model was fitted)."""
        return X if self.gp is None else self.gp.transform(X)

    def _rank_factors(self, y: np.ndarray):
        """Rank discovered factors by Information Coefficient
        (Spearman rank correlation with the target) and print the top 10."""
        from scipy.stats import spearmanr
        if self.discovered_factors is None:
            return
        scored = []
        for col in range(self.discovered_factors.shape[1]):
            coef, _ = spearmanr(self.discovered_factors[:, col], y)
            if not np.isnan(coef):
                scored.append((col, abs(coef), coef))
        # Highest absolute IC first.
        scored.sort(key=lambda item: item[1], reverse=True)
        print("\n Top 10 Discovered Alpha Factors (by |IC|):")
        for rank, (col, _abs_ic, coef) in enumerate(scored[:10], 1):
            print(f" {rank}. Factor {col}: IC = {coef:+.4f}")

    def get_factor_expressions(self) -> List[str]:
        """Get human-readable formulas for discovered factors
        (empty list before a successful GP fit)."""
        if self.gp is None:
            return []
        return [str(program) for program in self.gp._best_programs]
class LLMAlphaMiner:
    """
    LLM-Driven Alpha Factor Discovery (Simplified Version).

    Full implementation would use MCTS (Monte Carlo Tree Search) + LLM
    to explore the space of possible formulas, using the LLM as a "policy"
    to suggest promising formula modifications.

    This simplified version uses LLM embeddings to cluster and suggest
    factor combinations.
    """

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.model_name = model_name
        # Lazily loaded SentenceTransformer; stays None when unavailable.
        self.embedder = None

    def _load_embedder(self):
        """Lazy load sentence transformer"""
        if self.embedder is None:
            try:
                from sentence_transformers import SentenceTransformer
                self.embedder = SentenceTransformer(self.model_name)
            except ImportError:
                print("sentence-transformers not available. Using random projections.")
                self.embedder = None

    def suggest_factors(self, descriptions: List[str],
                        n_suggestions: int = 10) -> List[Dict]:
        """
        Use LLM embeddings to suggest new factor combinations.

        Pairs of *least similar* factors are proposed as combinations, on
        the heuristic that distant factors are most likely complementary.

        BUG FIX: the previous implementation re-scanned the full similarity
        matrix on every iteration without excluding already-chosen pairs, so
        all n_suggestions were the same (globally least-similar) pair. We now
        rank all distinct pairs once and take the n_suggestions least similar.

        Args:
            descriptions: List of existing factor descriptions/formulas
            n_suggestions: Number of new factor ideas to generate
        Returns:
            List of suggested factor descriptions (may be shorter than
            n_suggestions when there are fewer distinct pairs)
        """
        self._load_embedder()
        if self.embedder is None:
            # Fallback: random combinations
            return self._random_suggestions(descriptions, n_suggestions)
        embeddings = self.embedder.encode(descriptions)
        from sklearn.metrics.pairwise import cosine_similarity
        sim_matrix = cosine_similarity(embeddings)
        n = len(descriptions)
        # All distinct (i, j) pairs, most dissimilar first.
        pairs = sorted(
            (float(sim_matrix[i, j]), i, j)
            for i in range(n) for j in range(i + 1, n)
        )
        suggestions = []
        for sim, i, j in pairs[:n_suggestions]:
            desc1, desc2 = descriptions[i], descriptions[j]
            suggestions.append({
                'type': 'combination',
                'factors': [desc1, desc2],
                'similarity': sim,
                'description': f"Combine ({desc1}) with ({desc2})"
            })
        return suggestions

    def _random_suggestions(self, descriptions: List[str],
                            n_suggestions: int) -> List[Dict]:
        """Fallback random suggestions (no embedder available).

        Robustness fix: with fewer than two descriptions there is no pair to
        combine, so return an empty list instead of letting random.sample
        raise ValueError.
        """
        import random
        if len(descriptions) < 2:
            return []
        suggestions = []
        for _ in range(n_suggestions):
            i, j = random.sample(range(len(descriptions)), 2)
            suggestions.append({
                'type': 'combination',
                'factors': [descriptions[i], descriptions[j]],
                'similarity': 0.0,
                'description': f"Combine ({descriptions[i]}) with ({descriptions[j]})"
            })
        return suggestions
class AlphaMiningPipeline:
    """
    Complete pipeline: Raw data -> GP-discovered factors -> Enhanced features.

    Usage:
        pipeline = AlphaMiningPipeline(n_factors=50)
        enhanced_features = pipeline.fit_transform(raw_features, returns)

    The enhanced features combine:
    - Original technical indicators
    - GP-discovered nonlinear factors
    - LLM-suggested factor combinations
    """

    def __init__(self, n_gp_factors: int = 50,
                 gp_generations: int = 20,
                 use_llm: bool = True):
        self.n_gp_factors = n_gp_factors
        self.gp_generations = gp_generations
        self.use_llm = use_llm
        # Fitted sub-miners (populated by fit_transform).
        self.gp_miner = None
        self.llm_miner = None
        self.feature_names = []

    def fit_transform(self, X: np.ndarray, y: np.ndarray,
                      feature_names: Optional[List[str]] = None) -> np.ndarray:
        """
        Fit and transform in one call.

        Args:
            X: Raw features (n_samples, n_features)
            y: Target returns (n_samples,)
            feature_names: Names of original features (for LLM suggestions)
        Returns:
            Enhanced features (n_samples, n_original + n_gp_factors)
        """
        banner = "=" * 60
        print(banner)
        print("ALPHA MINING PIPELINE")
        print(banner)
        # Step 1: evolve symbolic factors with genetic programming.
        print("\n[1/3] Genetic Programming Alpha Mining...")
        self.gp_miner = AlphaMiner(n_factors=self.n_gp_factors,
                                   generations=self.gp_generations)
        self.gp_miner.fit(X, y)
        mined = self.gp_miner.transform(X)
        # Step 2: optional LLM-based combination ideas (needs feature names).
        if self.use_llm and feature_names is not None:
            print("\n[2/3] LLM Factor Suggestions...")
            self.llm_miner = LLMAlphaMiner()
            ideas = self.llm_miner.suggest_factors(feature_names, n_suggestions=10)
            print(f" Generated {len(ideas)} factor ideas")
        # Step 3: stack original features with the mined ones.
        print("\n[3/3] Combining original + discovered features...")
        enhanced = np.column_stack([X, mined])
        base_names = feature_names or [f'f{i}' for i in range(X.shape[1])]
        self.feature_names = base_names + [f'gp_alpha_{i}' for i in range(mined.shape[1])]
        print(f"\nEnhanced features: {enhanced.shape[1]} (original: {X.shape[1]}, GP: {mined.shape[1]})")
        return enhanced

    def transform(self, X: np.ndarray) -> np.ndarray:
        """Transform new data using fitted miners
        (identity pass-through before fitting)."""
        if self.gp_miner is None:
            return X
        return np.column_stack([X, self.gp_miner.transform(X)])

    def get_discovered_expressions(self) -> List[str]:
        """Get human-readable discovered factor formulas."""
        return [] if self.gp_miner is None else self.gp_miner.get_factor_expressions()
def mine_alphas_from_sequences(sequences: np.ndarray,
                               targets: np.ndarray,
                               n_factors: int = 50) -> Tuple[np.ndarray, 'AlphaMiningPipeline']:
    """
    Convenience function: Flatten sequences and mine alphas.

    GP operates on flat feature vectors, so each (seq_len, n_features)
    window is unrolled into one row of seq_len * n_features columns.

    Args:
        sequences: (n_samples, seq_len, n_features)
        targets: (n_samples,)
        n_factors: number of GP factors to mine
    Returns:
        enhanced_features: (n_samples, n_features + n_factors)
        pipeline: Fitted AlphaMiningPipeline
    """
    n_samples, seq_len, n_features = sequences.shape
    flat = sequences.reshape(n_samples, seq_len * n_features)
    # Name each column by (time step, feature index) for the LLM step.
    names = [f'f{t}_{f}' for t in range(seq_len) for f in range(n_features)]
    pipeline = AlphaMiningPipeline(n_gp_factors=n_factors)
    return pipeline.fit_transform(flat, targets, names), pipeline
if __name__ == '__main__':
    # Smoke test: mine alphas on synthetic data with known structure.
    np.random.seed(42)
    n_samples, n_features = 5000, 20
    features = np.random.randn(n_samples, n_features)
    # True relationship: y = x0 * x1 + sin(x2) + noise
    target = (features[:, 0] * features[:, 1]
              + np.sin(features[:, 2] * 2)
              + np.random.randn(n_samples) * 0.1)
    engine = AlphaMiner(n_factors=20, generations=5, population_size=500)
    engine.fit(features, target)
    print("\nDiscovered expressions (top 5):")
    for expr in engine.get_factor_expressions()[:5]:
        print(f" {expr}")