| """Anomaly Detection for Alpha Generation.""" |
| import numpy as np |
| import pandas as pd |
| from sklearn.ensemble import IsolationForest |
| from sklearn.preprocessing import StandardScaler |
| from typing import Dict, List, Tuple |
import warnings
# NOTE(review): process-wide, blanket warning suppression — this also hides
# sklearn/pandas deprecation warnings for every module imported after this
# file. Consider scoping with warnings.catch_warnings() or filtering by
# category instead.
warnings.filterwarnings('ignore')
|
|
|
|
class AnomalyDetector:
    """Detect market anomalies using Isolation Forest and an optional autoencoder.

    A sample is flagged anomalous (score -1) when the Isolation Forest marks
    it, or — if ``use_autoencoder`` is enabled — when its reconstruction error
    exceeds the contamination-percentile threshold (union of the two flags).
    """

    def __init__(self, contamination: float = 0.05, use_autoencoder: bool = False):
        """Configure the detector.

        Args:
            contamination: Expected fraction of anomalies in the data; also
                used as the autoencoder reconstruction-error cutoff quantile.
            use_autoencoder: If True, additionally train a torch autoencoder
                during ``fit`` and union its flags with the forest's.
        """
        self.contamination = contamination
        self.use_autoencoder = use_autoencoder
        # random_state pinned for reproducible tree construction.
        self.isolation_forest = IsolationForest(
            contamination=contamination, random_state=42, n_estimators=200
        )
        self.scaler = StandardScaler()
        self.is_fitted = False
        # One pd.Series per detect() call. Grows without bound — long-running
        # callers may want to trim it periodically.
        self.anomaly_history: List[pd.Series] = []

    def fit(self, features: pd.DataFrame) -> None:
        """Fit the scaler and detection models on a feature matrix.

        NaNs are replaced with 0 before scaling — the same policy ``detect``
        applies, so train/score preprocessing stays consistent.
        """
        scaled = self.scaler.fit_transform(features.fillna(0))
        self.isolation_forest.fit(scaled)

        if self.use_autoencoder:
            self._build_autoencoder(scaled.shape[1])
            self._train_autoencoder(scaled)

        self.is_fitted = True

    def _build_autoencoder(self, input_dim: int) -> None:
        """Build a small symmetric autoencoder (input -> 32 -> 16 -> 8 -> ... -> input).

        torch is imported lazily so the rest of the class works without it.
        """
        import torch
        import torch.nn as nn

        class Autoencoder(nn.Module):
            def __init__(self, input_dim):
                super().__init__()
                self.encoder = nn.Sequential(
                    nn.Linear(input_dim, 32), nn.ReLU(),
                    nn.Linear(32, 16), nn.ReLU(),
                    nn.Linear(16, 8)
                )
                self.decoder = nn.Sequential(
                    nn.Linear(8, 16), nn.ReLU(),
                    nn.Linear(16, 32), nn.ReLU(),
                    nn.Linear(32, input_dim)
                )

            def forward(self, x):
                encoded = self.encoder(x)
                return self.decoder(encoded), encoded

        self.autoencoder = Autoencoder(input_dim)

    def _train_autoencoder(self, X: np.ndarray, epochs: int = 30) -> None:
        """Train the autoencoder with full-batch MSE reconstruction loss.

        Best-effort by design: any failure (torch missing, OOM, ...) is
        reported and training is skipped — detection falls back to the
        Isolation Forest alone.
        """
        try:
            import torch
            X_t = torch.FloatTensor(X)
            optimizer = torch.optim.Adam(self.autoencoder.parameters(), lr=1e-3)
            criterion = torch.nn.MSELoss()

            # Full-batch gradient steps; data sets here are small enough
            # that mini-batching isn't needed.
            for _ in range(epochs):
                self.autoencoder.train()
                optimizer.zero_grad()
                recon, _ = self.autoencoder(X_t)
                loss = criterion(recon, X_t)
                loss.backward()
                optimizer.step()
        except Exception as e:
            print(f"Autoencoder training skipped: {e}")

    def detect(self, features: pd.DataFrame) -> pd.Series:
        """Detect anomalies. Returns -1 for anomaly, 1 for normal.

        Raises:
            RuntimeError: If called before ``fit``.
        """
        if not self.is_fitted:
            raise RuntimeError("AnomalyDetector.detect() called before fit()")

        scaled = self.scaler.transform(features.fillna(0))
        if_scores = self.isolation_forest.predict(scaled)

        final_scores = if_scores
        if self.use_autoencoder:
            # Narrowed from a bare `except:` — a bare clause also swallowed
            # KeyboardInterrupt/SystemExit and gave no hint of the failure.
            try:
                import torch
                X_t = torch.FloatTensor(scaled)
                self.autoencoder.eval()
                with torch.no_grad():
                    recon, _ = self.autoencoder(X_t)
                recon_errors = torch.mean((recon - X_t) ** 2, dim=1).numpy()

                # Flag the top `contamination` fraction by reconstruction error.
                threshold = np.percentile(recon_errors, (1 - self.contamination) * 100)
                ae_scores = np.where(recon_errors > threshold, -1, 1)

                # Union of the two detectors: anomalous if either flags it.
                final_scores = np.where((if_scores == -1) | (ae_scores == -1), -1, 1)
            except Exception as e:
                print(f"Autoencoder scoring skipped: {e}")
                final_scores = if_scores

        result = pd.Series(final_scores, index=features.index)
        self.anomaly_history.append(result)
        return result

    def get_anomaly_stats(self) -> Dict:
        """Summary of the most recent ``detect`` call.

        Returns {} before any detection; otherwise native-typed counts so the
        result is JSON-serializable (numpy scalars are not).
        """
        if not self.anomaly_history:
            return {}
        last = self.anomaly_history[-1]
        return {
            'n_anomalies': int((last == -1).sum()),
            'anomaly_rate': float((last == -1).mean()),
            'total_samples': len(last)
        }
|
|