import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
class AnomalyDetector:
    """Flag anomalous rows of a DataFrame using an Isolation Forest.

    The numeric columns of the input are standardised with a
    ``StandardScaler`` and fed to an ``IsolationForest``. Both estimators
    are stored on the instance and are re-fitted on every ``detect`` call.
    """

    def __init__(self, contamination=0.1, random_state=42):
        """Create the scaler and the (unfitted) isolation forest.

        Args:
            contamination: Forwarded to ``IsolationForest``; the expected
                proportion of outliers in the data. Defaults to ``0.1``.
            random_state: Forwarded to ``IsolationForest`` so repeated runs
                on the same data give the same result. Defaults to ``42``.
        """
        self.model = IsolationForest(
            contamination=contamination,
            random_state=random_state,
        )
        self.scaler = StandardScaler()

    def detect(self, data):
        """Fit on the numeric columns of ``data`` and report its anomalies.

        Args:
            data: A pandas DataFrame. Only columns whose dtype is a
                ``np.number`` subtype are used as features; every column is
                carried through to the returned frame.

        Returns:
            A ``(summary, anomalies)`` tuple. ``summary`` is a one-sentence
            string with the anomaly count. ``anomalies`` is a copy of the
            rows flagged as anomalous, with two extra columns
            (``is_anomaly`` and ``anomaly_score``), sorted by ascending
            ``anomaly_score`` so the most anomalous rows come first.

        Raises:
            ValueError: If ``data`` has no numeric columns or no rows.
        """
        numeric_columns = data.select_dtypes(include=[np.number]).columns

        # Validate before touching the estimators so a bad input neither
        # half-fits the scaler/model nor fails with an opaque array-shape
        # error from deep inside scikit-learn.
        if numeric_columns.empty:
            raise ValueError(
                "Cannot detect anomalies: data has no numeric columns."
            )
        if len(data) == 0:
            raise ValueError("Cannot detect anomalies: data has no rows.")

        features = data[numeric_columns]

        # Scaler and model are both re-fitted on this data set; any state
        # from a previous call is discarded.
        scaled = self.scaler.fit_transform(features)
        self.model.fit(scaled)

        # predict() marks outliers with -1; decision_function() gives the
        # continuous score, where lower means more anomalous.
        labels = self.model.predict(scaled)
        scores = self.model.decision_function(scaled)

        # Work on a copy so the caller's DataFrame is left untouched.
        anomaly_data = data.copy()
        anomaly_data['is_anomaly'] = labels == -1
        anomaly_data['anomaly_score'] = scores

        # Ascending order puts the lowest (most anomalous) scores first.
        anomaly_data = anomaly_data.sort_values('anomaly_score')

        n_anomalies = int(anomaly_data['is_anomaly'].sum())
        summary = (
            f"Detected {n_anomalies} anomalies out of {len(data)} data points."
        )
        return summary, anomaly_data[anomaly_data['is_anomaly']]