"""Anomaly Detection for Alpha Generation.""" import numpy as np import pandas as pd from sklearn.ensemble import IsolationForest from sklearn.preprocessing import StandardScaler from typing import Dict, List, Tuple import warnings warnings.filterwarnings('ignore') class AnomalyDetector: """Detect market anomalies using Isolation Forest and Autoencoder.""" def __init__(self, contamination: float = 0.05, use_autoencoder: bool = False): self.contamination = contamination self.use_autoencoder = use_autoencoder self.isolation_forest = IsolationForest(contamination=contamination, random_state=42, n_estimators=200) self.scaler = StandardScaler() self.is_fitted = False self.anomaly_history = [] def fit(self, features: pd.DataFrame): """Fit anomaly detection models.""" scaled = self.scaler.fit_transform(features.fillna(0)) self.isolation_forest.fit(scaled) if self.use_autoencoder: self._build_autoencoder(scaled.shape[1]) self._train_autoencoder(scaled) self.is_fitted = True def _build_autoencoder(self, input_dim: int): """Build autoencoder for anomaly detection.""" import torch import torch.nn as nn class Autoencoder(nn.Module): def __init__(self, input_dim): super().__init__() self.encoder = nn.Sequential( nn.Linear(input_dim, 32), nn.ReLU(), nn.Linear(32, 16), nn.ReLU(), nn.Linear(16, 8) ) self.decoder = nn.Sequential( nn.Linear(8, 16), nn.ReLU(), nn.Linear(16, 32), nn.ReLU(), nn.Linear(32, input_dim) ) def forward(self, x): encoded = self.encoder(x) return self.decoder(encoded), encoded self.autoencoder = Autoencoder(input_dim) def _train_autoencoder(self, X: np.ndarray, epochs: int = 30): try: import torch X_t = torch.FloatTensor(X) optimizer = torch.optim.Adam(self.autoencoder.parameters(), lr=1e-3) criterion = torch.nn.MSELoss() for epoch in range(epochs): self.autoencoder.train() optimizer.zero_grad() recon, _ = self.autoencoder(X_t) loss = criterion(recon, X_t) loss.backward() optimizer.step() except Exception as e: print(f"Autoencoder training skipped: {e}") def detect(self, features: pd.DataFrame) -> pd.Series: """Detect anomalies. Returns -1 for anomaly, 1 for normal.""" scaled = self.scaler.transform(features.fillna(0)) if_scores = self.isolation_forest.predict(scaled) if self.use_autoencoder: try: import torch X_t = torch.FloatTensor(scaled) self.autoencoder.eval() with torch.no_grad(): recon, _ = self.autoencoder(X_t) recon_errors = torch.mean((recon - X_t) ** 2, dim=1).numpy() threshold = np.percentile(recon_errors, (1 - self.contamination) * 100) ae_scores = np.where(recon_errors > threshold, -1, 1) final_scores = np.where((if_scores == -1) | (ae_scores == -1), -1, 1) except: final_scores = if_scores else: final_scores = if_scores result = pd.Series(final_scores, index=features.index) self.anomaly_history.append(result) return result def get_anomaly_stats(self) -> Dict: """Statistics about detected anomalies.""" if not self.anomaly_history: return {} last = self.anomaly_history[-1] return { 'n_anomalies': (last == -1).sum(), 'anomaly_rate': (last == -1).mean(), 'total_samples': len(last) }