File size: 4,171 Bytes
55ccb64
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""Anomaly Detection for Alpha Generation."""
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from typing import Dict, List, Tuple
import warnings
warnings.filterwarnings('ignore')


class AnomalyDetector:
    """Detect market anomalies using an Isolation Forest and an optional autoencoder.

    Features are NaN-filled with 0 and standardised before being scored.
    When ``use_autoencoder`` is true, a small PyTorch autoencoder is trained
    alongside the forest and a row is reported as anomalous if *either*
    model flags it.

    Attributes:
        contamination: Expected fraction of anomalies; passed to the
            Isolation Forest and used to pick the autoencoder threshold.
        use_autoencoder: Whether the autoencoder path is enabled.
        isolation_forest: The underlying ``IsolationForest`` instance.
        scaler: ``StandardScaler`` fitted in :meth:`fit`.
        is_fitted: Set to ``True`` once :meth:`fit` completes.
        anomaly_history: One result ``Series`` per :meth:`detect` call.
        autoencoder: The autoencoder module, or ``None`` until built.
        ae_threshold: Reconstruction-error cut-off calibrated on the
            training data, or ``None`` when training did not complete.
    """

    def __init__(self, contamination: float = 0.05, use_autoencoder: bool = False):
        """Create an unfitted detector.

        Args:
            contamination: Expected fraction of anomalous rows.
            use_autoencoder: Also train and consult an autoencoder.
        """
        self.contamination = contamination
        self.use_autoencoder = use_autoencoder
        self.isolation_forest = IsolationForest(
            contamination=contamination, random_state=42, n_estimators=200
        )
        self.scaler = StandardScaler()
        self.is_fitted = False
        # NOTE: grows by one Series per detect() call and is never trimmed.
        self.anomaly_history = []
        self.autoencoder = None
        self.ae_threshold = None  # float once the autoencoder has been trained

    def fit(self, features: pd.DataFrame):
        """Fit the scaler, the Isolation Forest and, if enabled, the autoencoder.

        Args:
            features: Training feature matrix; NaNs are replaced with 0.
        """
        scaled = self.scaler.fit_transform(features.fillna(0))
        self.isolation_forest.fit(scaled)

        if self.use_autoencoder:
            self._build_autoencoder(scaled.shape[1])
            self._train_autoencoder(scaled)

        self.is_fitted = True

    def _build_autoencoder(self, input_dim: int):
        """Build the ``input_dim -> 32 -> 16 -> 8 -> 16 -> 32 -> input_dim`` autoencoder.

        Args:
            input_dim: Number of feature columns.
        """
        # torch is imported lazily so the module loads without it installed.
        import torch.nn as nn

        class Autoencoder(nn.Module):
            def __init__(self, input_dim):
                super().__init__()
                self.encoder = nn.Sequential(
                    nn.Linear(input_dim, 32), nn.ReLU(),
                    nn.Linear(32, 16), nn.ReLU(),
                    nn.Linear(16, 8),
                )
                self.decoder = nn.Sequential(
                    nn.Linear(8, 16), nn.ReLU(),
                    nn.Linear(16, 32), nn.ReLU(),
                    nn.Linear(32, input_dim),
                )

            def forward(self, x):
                # Returns (reconstruction, latent code).
                encoded = self.encoder(x)
                return self.decoder(encoded), encoded

        self.autoencoder = Autoencoder(input_dim)

    def _reconstruction_errors(self, X: np.ndarray) -> np.ndarray:
        """Return the per-row mean squared reconstruction error of ``X``."""
        import torch

        X_t = torch.as_tensor(X, dtype=torch.float32)
        self.autoencoder.eval()
        with torch.no_grad():
            recon, _ = self.autoencoder(X_t)
            return torch.mean((recon - X_t) ** 2, dim=1).numpy()

    def _train_autoencoder(self, X: np.ndarray, epochs: int = 30):
        """Train the autoencoder full-batch and calibrate ``ae_threshold``.

        Training is best-effort: any failure is reported on stdout and
        ``ae_threshold`` is left as ``None``.

        Args:
            X: Scaled training matrix.
            epochs: Number of full-batch Adam steps.
        """
        # Reset first so a failed re-fit never leaves a stale threshold behind.
        self.ae_threshold = None
        try:
            import torch

            X_t = torch.as_tensor(X, dtype=torch.float32)
            optimizer = torch.optim.Adam(self.autoencoder.parameters(), lr=1e-3)
            criterion = torch.nn.MSELoss()

            self.autoencoder.train()
            for _ in range(epochs):
                optimizer.zero_grad()
                recon, _latent = self.autoencoder(X_t)
                loss = criterion(recon, X_t)
                loss.backward()
                optimizer.step()

            # Calibrate the cut-off on the training data so detect() judges new
            # rows against what was learned rather than against each other.
            train_errors = self._reconstruction_errors(X)
            self.ae_threshold = float(
                np.percentile(train_errors, (1 - self.contamination) * 100)
            )
        except Exception as e:
            print(f"Autoencoder training skipped: {e}")

    def detect(self, features: pd.DataFrame) -> pd.Series:
        """Detect anomalies. Returns -1 for anomaly, 1 for normal.

        A row is an anomaly when the Isolation Forest flags it or, with the
        autoencoder enabled, when its reconstruction error exceeds the
        threshold calibrated during training. If the autoencoder path fails,
        the Isolation Forest labels are returned on their own.

        Args:
            features: Feature matrix with the same columns used in
                :meth:`fit`; NaNs are replaced with 0.

        Returns:
            Series of -1/1 labels indexed like ``features``. The result is
            also appended to ``anomaly_history``.
        """
        scaled = self.scaler.transform(features.fillna(0))
        final_scores = self.isolation_forest.predict(scaled)

        if self.use_autoencoder:
            try:
                recon_errors = self._reconstruction_errors(scaled)

                threshold = getattr(self, 'ae_threshold', None)
                if threshold is None:
                    # No calibrated threshold (training was skipped, or the
                    # object predates calibration): keep the legacy
                    # batch-relative percentile rule.
                    threshold = np.percentile(
                        recon_errors, (1 - self.contamination) * 100
                    )

                ae_flags = recon_errors > threshold
                final_scores = np.where((final_scores == -1) | ae_flags, -1, 1)
            except Exception as e:
                # Best-effort: keep the Isolation Forest labels, but say why.
                print(f"Autoencoder scoring skipped: {e}")

        result = pd.Series(final_scores, index=features.index)
        self.anomaly_history.append(result)
        return result

    def get_anomaly_stats(self) -> Dict:
        """Summarise the most recent :meth:`detect` result.

        Returns:
            An empty dict if :meth:`detect` has not been called; otherwise a
            dict with ``n_anomalies`` (int), ``anomaly_rate`` (float) and
            ``total_samples`` (int), all as built-in Python numbers.
        """
        if not self.anomaly_history:
            return {}

        last = self.anomaly_history[-1]
        flagged = last == -1
        # Cast away NumPy scalar types so the stats serialise cleanly.
        return {
            'n_anomalies': int(flagged.sum()),
            'anomaly_rate': float(flagged.mean()),
            'total_samples': len(last)
        }