Spaces:
Sleeping
Sleeping
Prepare project for Hugging Face Space deployment - Add app.py with Gradio interface - Update requirements.txt with torch dependencies - Configure LFS for large files (models, data) - Update README with comprehensive documentation
d2173d1
| """ | |
| Anomaly Detection Model using LSTM Neural Network | |
| """ | |
| import torch | |
| import torch.nn as nn | |
| import numpy as np | |
| from pathlib import Path | |
| import pickle | |
| class LSTMAnomalyDetector(nn.Module): | |
| """ | |
| LSTM-based anomaly detection model for time-series sensor data | |
| """ | |
| def __init__(self, input_size, hidden_size=64, num_layers=2, dropout=0.2): | |
| super(LSTMAnomalyDetector, self).__init__() | |
| self.hidden_size = hidden_size | |
| self.num_layers = num_layers | |
| # LSTM layers | |
| self.lstm = nn.LSTM( | |
| input_size=input_size, | |
| hidden_size=hidden_size, | |
| num_layers=num_layers, | |
| batch_first=True, | |
| dropout=dropout if num_layers > 1 else 0 | |
| ) | |
| # Fully connected layers | |
| self.fc1 = nn.Linear(hidden_size, 32) | |
| self.relu = nn.ReLU() | |
| self.dropout = nn.Dropout(dropout) | |
| self.fc2 = nn.Linear(32, 1) | |
| self.sigmoid = nn.Sigmoid() | |
| def forward(self, x): | |
| # LSTM forward pass | |
| lstm_out, _ = self.lstm(x) | |
| # Take the last output | |
| last_output = lstm_out[:, -1, :] | |
| # Fully connected layers | |
| out = self.fc1(last_output) | |
| out = self.relu(out) | |
| out = self.dropout(out) | |
| out = self.fc2(out) | |
| out = self.sigmoid(out) | |
| return out | |
| class AnomalyDetectionModel: | |
| """ | |
| Wrapper class for anomaly detection model with training and inference | |
| """ | |
| def __init__(self, input_size, sequence_length=50, device=None): | |
| self.input_size = input_size | |
| self.sequence_length = sequence_length | |
| self.device = device or torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| self.model = LSTMAnomalyDetector(input_size).to(self.device) | |
| self.criterion = nn.BCELoss() | |
| self.optimizer = torch.optim.Adam(self.model.parameters(), lr=0.001) | |
| print(f"Initialized Anomaly Detection Model on {self.device}") | |
| def create_sequences(self, data, labels=None): | |
| """ | |
| Create sequences for LSTM input | |
| Args: | |
| data: numpy array of shape (n_samples, n_features) | |
| labels: optional numpy array of labels | |
| Returns: | |
| Sequences and labels (if provided) | |
| """ | |
| sequences = [] | |
| seq_labels = [] | |
| for i in range(len(data) - self.sequence_length + 1): | |
| seq = data[i:i + self.sequence_length] | |
| sequences.append(seq) | |
| if labels is not None: | |
| # Label is 1 if any point in sequence is anomalous | |
| label = labels[i + self.sequence_length - 1] | |
| seq_labels.append(label) | |
| sequences = np.array(sequences) | |
| if labels is not None: | |
| seq_labels = np.array(seq_labels) | |
| return sequences, seq_labels | |
| return sequences | |
| def train_epoch(self, train_loader): | |
| """Train for one epoch""" | |
| self.model.train() | |
| total_loss = 0 | |
| for batch_x, batch_y in train_loader: | |
| batch_x = batch_x.to(self.device) | |
| batch_y = batch_y.to(self.device) | |
| # Forward pass | |
| outputs = self.model(batch_x) | |
| loss = self.criterion(outputs.squeeze(), batch_y.float()) | |
| # Backward pass | |
| self.optimizer.zero_grad() | |
| loss.backward() | |
| self.optimizer.step() | |
| total_loss += loss.item() | |
| return total_loss / len(train_loader) | |
| def evaluate(self, val_loader): | |
| """Evaluate on validation set""" | |
| self.model.eval() | |
| total_loss = 0 | |
| all_preds = [] | |
| all_labels = [] | |
| with torch.no_grad(): | |
| for batch_x, batch_y in val_loader: | |
| batch_x = batch_x.to(self.device) | |
| batch_y = batch_y.to(self.device) | |
| outputs = self.model(batch_x) | |
| loss = self.criterion(outputs.squeeze(), batch_y.float()) | |
| total_loss += loss.item() | |
| preds = (outputs.squeeze() > 0.5).cpu().numpy() | |
| all_preds.extend(preds) | |
| all_labels.extend(batch_y.cpu().numpy()) | |
| avg_loss = total_loss / len(val_loader) | |
| # Calculate metrics | |
| all_preds = np.array(all_preds) | |
| all_labels = np.array(all_labels) | |
| accuracy = (all_preds == all_labels).mean() | |
| return avg_loss, accuracy | |
| def predict(self, data): | |
| """ | |
| Predict anomalies for given data | |
| Args: | |
| data: numpy array of shape (n_samples, n_features) | |
| Returns: | |
| Anomaly scores and binary predictions | |
| """ | |
| self.model.eval() | |
| # Create sequences | |
| sequences = self.create_sequences(data) | |
| # Convert to tensor | |
| sequences_tensor = torch.FloatTensor(sequences).to(self.device) | |
| # Predict | |
| with torch.no_grad(): | |
| scores = self.model(sequences_tensor).squeeze().cpu().numpy() | |
| # Binary predictions | |
| predictions = (scores > 0.5).astype(int) | |
| return scores, predictions | |
| def save(self, path): | |
| """Save model""" | |
| path = Path(path) | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| torch.save({ | |
| 'model_state_dict': self.model.state_dict(), | |
| 'optimizer_state_dict': self.optimizer.state_dict(), | |
| 'input_size': self.input_size, | |
| 'sequence_length': self.sequence_length, | |
| }, path) | |
| print(f"✓ Model saved to {path}") | |
| def load(self, path): | |
| """Load model""" | |
| checkpoint = torch.load(path, map_location=self.device) | |
| self.model.load_state_dict(checkpoint['model_state_dict']) | |
| self.optimizer.load_state_dict(checkpoint['optimizer_state_dict']) | |
| self.input_size = checkpoint['input_size'] | |
| self.sequence_length = checkpoint['sequence_length'] | |
| print(f"✓ Model loaded from {path}") | |