# alphaforge-quant-system / anomaly_detector.py
"""Anomaly Detection for Alpha Generation."""
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from typing import Dict, List, Tuple
import warnings
warnings.filterwarnings('ignore')
class AnomalyDetector:
    """Detect market anomalies using Isolation Forest and, optionally, an autoencoder.

    The Isolation Forest is always used; when ``use_autoencoder`` is True a small
    bottleneck autoencoder is trained as a second detector and the final label is
    the union of both detectors (a sample is anomalous if either model flags it).

    Attributes:
        contamination: Expected fraction of anomalies (drives both the forest and
            the autoencoder's reconstruction-error threshold).
        is_fitted: True once ``fit`` has completed.
        anomaly_history: List of pd.Series results from past ``detect`` calls.
    """

    def __init__(self, contamination: float = 0.05, use_autoencoder: bool = False):
        self.contamination = contamination
        self.use_autoencoder = use_autoencoder
        # random_state fixed for reproducible forests across runs.
        self.isolation_forest = IsolationForest(
            contamination=contamination, random_state=42, n_estimators=200
        )
        self.scaler = StandardScaler()
        self.is_fitted = False
        self.anomaly_history: List[pd.Series] = []

    def fit(self, features: pd.DataFrame):
        """Fit the scaler and anomaly model(s) on historical feature data.

        Args:
            features: Feature matrix; NaNs are zero-filled before scaling.
        """
        scaled = self.scaler.fit_transform(features.fillna(0))
        self.isolation_forest.fit(scaled)
        if self.use_autoencoder:
            self._build_autoencoder(scaled.shape[1])
            self._train_autoencoder(scaled)
        self.is_fitted = True

    def _build_autoencoder(self, input_dim: int):
        """Build a bottleneck autoencoder (input -> 32 -> 16 -> 8 -> 16 -> 32 -> input)."""
        # torch imported lazily so the module works without it when
        # use_autoencoder is False.
        import torch
        import torch.nn as nn

        class Autoencoder(nn.Module):
            def __init__(self, input_dim):
                super().__init__()
                self.encoder = nn.Sequential(
                    nn.Linear(input_dim, 32), nn.ReLU(),
                    nn.Linear(32, 16), nn.ReLU(),
                    nn.Linear(16, 8),
                )
                self.decoder = nn.Sequential(
                    nn.Linear(8, 16), nn.ReLU(),
                    nn.Linear(16, 32), nn.ReLU(),
                    nn.Linear(32, input_dim),
                )

            def forward(self, x):
                encoded = self.encoder(x)
                return self.decoder(encoded), encoded

        self.autoencoder = Autoencoder(input_dim)

    def _train_autoencoder(self, X: np.ndarray, epochs: int = 30):
        """Train the autoencoder with full-batch MSE reconstruction loss.

        Best-effort: any failure (e.g. torch unavailable) is reported and
        skipped, so Isolation Forest detection still works on its own.
        """
        try:
            import torch
            X_t = torch.FloatTensor(X)
            optimizer = torch.optim.Adam(self.autoencoder.parameters(), lr=1e-3)
            criterion = torch.nn.MSELoss()
            self.autoencoder.train()  # was redundantly re-set every epoch
            for _ in range(epochs):
                optimizer.zero_grad()
                recon, _ = self.autoencoder(X_t)
                loss = criterion(recon, X_t)
                loss.backward()
                optimizer.step()
        except Exception as e:
            print(f"Autoencoder training skipped: {e}")

    def detect(self, features: pd.DataFrame) -> pd.Series:
        """Detect anomalies. Returns -1 for anomaly, 1 for normal.

        Args:
            features: Feature matrix with the same columns used in ``fit``.

        Raises:
            RuntimeError: If called before ``fit`` (fail fast with a clear
                message instead of an opaque sklearn NotFittedError).
        """
        if not self.is_fitted:
            raise RuntimeError("AnomalyDetector.detect() called before fit()")
        scaled = self.scaler.transform(features.fillna(0))
        if_scores = self.isolation_forest.predict(scaled)
        final_scores = if_scores
        if self.use_autoencoder:
            try:
                import torch
                X_t = torch.FloatTensor(scaled)
                self.autoencoder.eval()
                with torch.no_grad():
                    recon, _ = self.autoencoder(X_t)
                recon_errors = torch.mean((recon - X_t) ** 2, dim=1).numpy()
                # Flag the top `contamination` fraction of reconstruction errors.
                threshold = np.percentile(recon_errors, (1 - self.contamination) * 100)
                ae_scores = np.where(recon_errors > threshold, -1, 1)
                # Union of detectors: anomalous if either model flags it.
                final_scores = np.where((if_scores == -1) | (ae_scores == -1), -1, 1)
            except Exception as e:
                # Was a bare `except:` that silently swallowed everything
                # (including SystemExit); keep the best-effort fallback but
                # report why the autoencoder path was skipped.
                print(f"Autoencoder scoring skipped: {e}")
                final_scores = if_scores
        result = pd.Series(final_scores, index=features.index)
        self.anomaly_history.append(result)
        return result

    def get_anomaly_stats(self) -> Dict:
        """Statistics about the most recent ``detect`` call (empty dict if none)."""
        if not self.anomaly_history:
            return {}
        last = self.anomaly_history[-1]
        is_anomaly = last == -1
        # Cast numpy scalars to plain Python types so the dict is
        # JSON-serializable.
        return {
            'n_anomalies': int(is_anomaly.sum()),
            'anomaly_rate': float(is_anomaly.mean()),
            'total_samples': len(last),
        }