Premchan369 committed on
Commit
55ccb64
·
verified ·
1 Parent(s): 8d72d8e

Upload anomaly_detector.py

Browse files
Files changed (1) hide show
  1. anomaly_detector.py +110 -0
anomaly_detector.py ADDED
@@ -0,0 +1,110 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Anomaly Detection for Alpha Generation."""
2
+ import numpy as np
3
+ import pandas as pd
4
+ from sklearn.ensemble import IsolationForest
5
+ from sklearn.preprocessing import StandardScaler
6
+ from typing import Dict, List, Tuple
7
+ import warnings
8
+ warnings.filterwarnings('ignore')
9
+
10
+
11
class AnomalyDetector:
    """Detect market anomalies with an Isolation Forest and an optional autoencoder.

    Features are NaN-filled with 0 and standardised before being scored. A row
    is labelled ``-1`` (anomaly) when the Isolation Forest flags it or, if
    ``use_autoencoder`` is enabled, when its autoencoder reconstruction error
    exceeds the threshold calibrated during :meth:`fit`. All other rows are
    labelled ``1``.

    Attributes:
        contamination: Expected fraction of anomalies; passed to the Isolation
            Forest and used as the percentile for the autoencoder threshold.
        use_autoencoder: Whether the autoencoder is combined with the forest.
        isolation_forest: The underlying ``IsolationForest`` estimator.
        scaler: ``StandardScaler`` fitted in :meth:`fit`.
        is_fitted: Set to ``True`` once :meth:`fit` has completed.
        anomaly_history: One label Series per :meth:`detect` call, in call order.
        autoencoder: The torch model, or ``None`` until it has been built.
    """

    def __init__(self, contamination: float = 0.05, use_autoencoder: bool = False):
        """Create an unfitted detector.

        Args:
            contamination: Expected proportion of anomalous rows.
            use_autoencoder: If true, also train and use a torch autoencoder.
        """
        self.contamination = contamination
        self.use_autoencoder = use_autoencoder
        self.isolation_forest = IsolationForest(
            contamination=contamination, random_state=42, n_estimators=200
        )
        self.scaler = StandardScaler()
        self.is_fitted = False
        self.anomaly_history = []
        # Populated by fit() when use_autoencoder is enabled.
        self.autoencoder = None
        # Reconstruction-error cut-off calibrated on the training data.
        self._ae_threshold = None

    def fit(self, features: pd.DataFrame):
        """Fit the scaler, the Isolation Forest and (optionally) the autoencoder.

        Args:
            features: Training feature matrix; NaNs are replaced with 0.
        """
        scaled = self.scaler.fit_transform(features.fillna(0))
        self.isolation_forest.fit(scaled)

        # Drop any threshold left over from a previous fit so a failed
        # re-training cannot leave a stale cut-off behind.
        self._ae_threshold = None
        if self.use_autoencoder:
            self._build_autoencoder(scaled.shape[1])
            self._train_autoencoder(scaled)

        self.is_fitted = True

    def _build_autoencoder(self, input_dim: int):
        """Build the autoencoder (input_dim -> 32 -> 16 -> 8 -> 16 -> 32 -> input_dim).

        Args:
            input_dim: Number of feature columns.
        """
        import torch  # noqa: F401  (kept local: torch is an optional dependency)
        import torch.nn as nn

        class Autoencoder(nn.Module):
            def __init__(self, input_dim):
                super().__init__()
                self.encoder = nn.Sequential(
                    nn.Linear(input_dim, 32), nn.ReLU(),
                    nn.Linear(32, 16), nn.ReLU(),
                    nn.Linear(16, 8),
                )
                self.decoder = nn.Sequential(
                    nn.Linear(8, 16), nn.ReLU(),
                    nn.Linear(16, 32), nn.ReLU(),
                    nn.Linear(32, input_dim),
                )

            def forward(self, x):
                # Returns (reconstruction, latent code).
                encoded = self.encoder(x)
                return self.decoder(encoded), encoded

        self.autoencoder = Autoencoder(input_dim)

    def _reconstruction_errors(self, scaled: np.ndarray) -> np.ndarray:
        """Return the per-row mean squared reconstruction error of ``scaled``."""
        import torch

        X_t = torch.FloatTensor(scaled)
        self.autoencoder.eval()
        with torch.no_grad():
            recon, _ = self.autoencoder(X_t)
            return torch.mean((recon - X_t) ** 2, dim=1).numpy()

    def _train_autoencoder(self, X: np.ndarray, epochs: int = 30):
        """Train the autoencoder full-batch and calibrate its anomaly threshold.

        Training is best-effort: any failure is reported on stdout and leaves
        the threshold unset rather than aborting :meth:`fit`.

        Args:
            X: Scaled training matrix.
            epochs: Number of full-batch Adam steps.
        """
        try:
            import torch

            X_t = torch.FloatTensor(X)
            optimizer = torch.optim.Adam(self.autoencoder.parameters(), lr=1e-3)
            criterion = torch.nn.MSELoss()

            self.autoencoder.train()
            for _epoch in range(epochs):
                optimizer.zero_grad()
                recon, _ = self.autoencoder(X_t)
                loss = criterion(recon, X_t)
                loss.backward()
                optimizer.step()

            # Calibrate the cut-off on the training data so that detect() does
            # not depend on the composition or size of the batch being scored.
            train_errors = self._reconstruction_errors(X)
            self._ae_threshold = float(
                np.percentile(train_errors, (1 - self.contamination) * 100)
            )
        except Exception as e:
            print(f"Autoencoder training skipped: {e}")

    def detect(self, features: pd.DataFrame) -> pd.Series:
        """Detect anomalies. Returns -1 for anomaly, 1 for normal.

        Args:
            features: Feature matrix with the same columns used in :meth:`fit`;
                NaNs are replaced with 0.

        Returns:
            A Series of labels aligned to ``features.index``. The same Series is
            appended to ``anomaly_history``.
        """
        scaled = self.scaler.transform(features.fillna(0))
        if_scores = self.isolation_forest.predict(scaled)
        final_scores = if_scores

        # getattr keeps instances created without __init__ working.
        if self.use_autoencoder and getattr(self, "autoencoder", None) is not None:
            try:
                recon_errors = self._reconstruction_errors(scaled)

                threshold = getattr(self, "_ae_threshold", None)
                if threshold is None:
                    # No calibrated cut-off (training was skipped): fall back
                    # to a percentile of the current batch.
                    threshold = np.percentile(
                        recon_errors, (1 - self.contamination) * 100
                    )

                ae_flags = recon_errors > threshold
                final_scores = np.where((if_scores == -1) | ae_flags, -1, 1)
            except Exception as e:
                # Best-effort: report the problem and use the forest alone.
                print(f"Autoencoder detection skipped: {e}")
                final_scores = if_scores

        result = pd.Series(final_scores, index=features.index)
        self.anomaly_history.append(result)
        return result

    def get_anomaly_stats(self) -> Dict:
        """Return statistics about the most recent :meth:`detect` call.

        Returns:
            An empty dict if :meth:`detect` has not been called yet; otherwise a
            dict with ``n_anomalies`` (int), ``anomaly_rate`` (float) and
            ``total_samples`` (int) as plain Python numbers.
        """
        if not self.anomaly_history:
            return {}

        last = self.anomaly_history[-1]
        flagged = last == -1
        return {
            'n_anomalies': int(flagged.sum()),
            'anomaly_rate': float(flagged.mean()),
            'total_samples': len(last),
        }