""" train_anomaly_detection.py Trains an anomaly detection model (Isolation Forest, One-Class SVM, etc.) on a dataset. Allows dropping or selecting columns, label-encoding for non-numeric data, saves predictions (0 = normal, 1 = outlier) and optionally visualizes in 2D. Usage Example: -------------- python scripts/train_anomaly_detection.py \ --model_module isolation_forest \ --data_path data/raw/my_dataset.csv \ --drop_columns "unwanted_col" \ --select_columns "feat1,feat2,feat3" \ --visualize """ import os import sys import argparse import importlib import pandas as pd import numpy as np import joblib from sklearn.preprocessing import LabelEncoder import matplotlib.pyplot as plt from timeit import default_timer as timer def main(args): # Change to the project root if needed project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) os.chdir(project_root) sys.path.insert(0, project_root) # Dynamically import the chosen anomaly model module model_module_path = f"models.unsupervised.anomaly.{args.model_module}" model_module = importlib.import_module(model_module_path) # Retrieve the estimator from the model file estimator = model_module.estimator # Prepare results directory if args.results_path is None: # e.g., 'results/IsolationForest_Anomaly' args.results_path = os.path.join("results", f"{estimator.__class__.__name__}_Anomaly") os.makedirs(args.results_path, exist_ok=True) # Prepare model directory if args.model_path is None: # e.g., 'saved_model/IsolationForest_Anomaly' args.model_path = os.path.join('saved_models', f"{estimator.__class__.__name__}_Anomaly") os.makedirs(args.model_path, exist_ok=True) # Load data df = pd.read_csv(args.data_path) print(f"Data loaded from {args.data_path}, initial shape: {df.shape}") # Drop empty columns df = df.dropna(axis='columns', how='all') print("After dropping empty columns:", df.shape) # Drop specified columns if any if args.drop_columns: drop_cols = [c.strip() for c in args.drop_columns.split(',') if c.strip()] df.drop(columns=drop_cols, inplace=True, errors='ignore') print(f"Dropped columns: {drop_cols} | New shape: {df.shape}") # Select specified columns if any if args.select_columns: keep_cols = [c.strip() for c in args.select_columns.split(',') if c.strip()] df = df[keep_cols] print(f"Selected columns: {keep_cols} | New shape: {df.shape}") # Label-encode non-numeric columns for col in df.columns: if df[col].dtype == 'object': le = LabelEncoder() df[col] = le.fit_transform(df[col]) # Convert DataFrame to numpy array X = df.values print(f"Final data shape after dropping/selecting columns and encoding: {X.shape}") # Fit the anomaly model start_time = timer() estimator.fit(X) end_time = timer() train_time = end_time - start_time print(f"Anomaly detection training with {args.model_module} completed in {train_time:.2f} seconds.") # Save the model model_output_path = os.path.join(args.model_path, "anomaly_model.pkl") joblib.dump(estimator, model_output_path) print(f"Model saved to {model_output_path}") # Predict outliers: Typically returns 1 for inliers, -1 for outliers (or vice versa) # We'll unify them to 0 = normal, 1 = outlier raw_preds = estimator.predict(X) # Some anomaly detectors do the opposite: IsolationForest => +1 inlier, -1 outlier # Convert to 0/1: preds_binary = np.where(raw_preds == 1, 0, 1) outlier_count = np.sum(preds_binary) inlier_count = len(preds_binary) - outlier_count print(f"Detected {outlier_count} outliers out of {len(X)} samples. ({inlier_count} normal)") # Save predictions pred_df = pd.DataFrame({ 'OutlierPrediction': preds_binary }) pred_path = os.path.join(args.results_path, "predictions.csv") pred_df.to_csv(pred_path, index=False) print(f"Predictions saved to {pred_path}") # Visualization if 2D or 3D if args.visualize: print("Creating anomaly detection visualization...") # We'll do PCA => 2D if dimension > 2 if X.shape[1] > 2: from sklearn.decomposition import PCA pca = PCA(n_components=2) X_2d = pca.fit_transform(X) x_label = "PC1" y_label = "PC2" elif X.shape[1] == 2: X_2d = X x_label = df.columns[0] if df.shape[1] == 2 else "Feature 1" y_label = df.columns[1] if df.shape[1] == 2 else "Feature 2" else: # 1D or 0D => skip print("Only 1 feature or none; can't create 2D scatter. Skipping.") return # Plot plt.figure(figsize=(6,5)) # color outliers differently colors = np.where(preds_binary == 1, 'r', 'b') plt.scatter(X_2d[:,0], X_2d[:,1], c=colors, s=30, alpha=0.7) plt.title(f"{estimator.__class__.__name__} Anomaly Detection") plt.xlabel(x_label) plt.ylabel(y_label) # Save plot_path = os.path.join(args.results_path, "anomaly_plot.png") plt.savefig(plot_path) plt.show() print(f"Anomaly plot saved to {plot_path}") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Train an anomaly detection model.") parser.add_argument('--model_module', type=str, required=True, help='Name of the anomaly detection model (e.g. isolation_forest, one_class_svm).') parser.add_argument('--data_path', type=str, required=True, help='Path to the CSV dataset file.') parser.add_argument('--model_path', type=str, default=None, help='Path to save the trained model.') parser.add_argument('--results_path', type=str, default=None, help='Directory to save results (predictions, plots).') parser.add_argument('--drop_columns', type=str, default='', help='Comma-separated column names to drop.') parser.add_argument('--select_columns', type=str, default='', help='Comma-separated column names to keep (ignore the rest).') parser.add_argument('--visualize', action='store_true', help='If set, reduce to 2D (via PCA if needed) and color outliers vs. normal points.') args = parser.parse_args() main(args)