# network-anomaly-detector / src/train_anomaly_detector.py
# (uploaded by rajuamburu via huggingface_hub, commit b4c66bc verified)
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler
import joblib
def train_anomaly_detector(features_file='output/features.csv', model_type='isolation_forest', contamination=0.01):
    """
    Train an unsupervised anomaly detection model on traffic features.

    Parameters:
    - features_file: Path to extracted features CSV. Must contain a
      'window_id' column; every other column is treated as a feature.
    - model_type: 'isolation_forest', 'one_class_svm', or 'lof'
    - contamination: Expected proportion of outliers (default: 0.01 = 1%)

    Returns:
    - (model, scaler, df): the fitted estimator, the fitted StandardScaler,
      and the input dataframe augmented with 'prediction' (1/-1),
      'is_outlier' (0/1) and 'outlier_score' columns.

    Raises:
    - ValueError: if the feature file contains NaN values, or model_type
      is not one of the supported options.

    Side effects: writes the model, scaler and feature-column order under
    models/, and a results CSV plus score-distribution plot under output/.
    """
    import os

    print(f"\n{'='*60}")
    print("UNSUPERVISED OUTLIER DETECTION TRAINING")
    print(f"Model: {model_type.upper()}")
    print(f"{'='*60}\n")

    # Load features
    print(f"Loading features from {features_file}...")
    df = pd.read_csv(features_file)
    print(f"Total windows: {len(df)}")

    # Feature sanity check: NaNs would silently distort scaling and scores.
    if df.isnull().sum().sum() > 0:
        raise ValueError("Feature file contains NaN values. Clean data first.")

    # Select feature columns (exclude window_id)
    feature_cols = [col for col in df.columns if col != 'window_id']
    X = df[feature_cols].values
    print(f"Feature dimensions: {X.shape}")
    print(f"Features used: {len(feature_cols)} features")
    print(f"Contamination: {contamination}\n")

    # FIX: ensure the artifact directories exist so this function also works
    # when called directly, not only via the __main__ driver.
    os.makedirs('models', exist_ok=True)
    os.makedirs('output', exist_ok=True)

    # Save feature order (critical for real-time detection: live features
    # must be fed to the model in exactly this column order)
    print("Saving feature column order...")
    joblib.dump(feature_cols, 'models/feature_columns.pkl')

    # Standardize features (important for anomaly detection)
    print("Standardizing features...")
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Train model based on type
    print(f"Training {model_type} model...\n")
    model = _build_model(model_type, contamination)
    model.fit(X_scaled)
    predictions = model.predict(X_scaled)        # 1 = normal, -1 = outlier
    scores = model.decision_function(X_scaled)   # lower = more anomalous

    outlier_count = (predictions == -1).sum()
    normal_count = (predictions == 1).sum()
    print(f"{'='*60}")
    print("TRAINING RESULTS")
    print(f"{'='*60}")
    print(f"Normal windows: {normal_count} ({normal_count/len(predictions)*100:.1f}%)")
    print(f"Outlier windows: {outlier_count} ({outlier_count/len(predictions)*100:.1f}%)")
    print(f"Note: This is on training data - not validation")
    print(f"{'='*60}\n")

    # Add predictions and scores to dataframe
    df['prediction'] = predictions
    df['is_outlier'] = (predictions == -1).astype(int)
    df['outlier_score'] = scores

    # Show detected outliers
    outliers = df[df['is_outlier'] == 1].copy()
    if len(outliers) > 0:
        print("DETECTED OUTLIERS (Unusual Traffic Patterns):")
        print(f"{'='*60}")
        # NOTE(review): assumes these summary columns exist in features.csv —
        # confirm against the feature-extraction step.
        print(outliers[['window_id', 'packet_count', 'packets_per_second',
                        'bytes_per_second', 'unique_src_ips', 'outlier_score']].to_string(index=False))
        print(f"{'='*60}\n")
    else:
        print("No outliers detected (all windows within normal range)\n")

    # Save model and scaler
    model_filename = f'models/{model_type}_model.pkl'
    scaler_filename = 'models/scaler.pkl'
    print("Saving model and scaler...")
    joblib.dump(model, model_filename)
    joblib.dump(scaler, scaler_filename)
    print(f"Model saved to: {model_filename}")
    print(f"Scaler saved to: {scaler_filename}\n")

    # Save results
    results_file = f'output/outlier_results_{model_type}.csv'
    df.to_csv(results_file, index=False)
    print(f"Results saved to: {results_file}\n")

    _plot_score_distribution(scores, predictions, model_type)

    return model, scaler, df


def _build_model(model_type, contamination):
    """Return an unfitted estimator for the given model_type.

    Raises ValueError for an unsupported model_type.
    """
    if model_type == 'isolation_forest':
        # Isolation Forest: good for high-dimensional data
        return IsolationForest(
            contamination=contamination,
            random_state=42,
            n_estimators=100
        )
    if model_type == 'one_class_svm':
        # One-Class SVM: learns a boundary around the normal data;
        # nu bounds the fraction of training points treated as outliers.
        return OneClassSVM(
            nu=contamination,
            kernel='rbf',
            gamma='auto'
        )
    if model_type == 'lof':
        # Local Outlier Factor: density-based; novelty=True is required
        # so predict()/decision_function() work on new data.
        return LocalOutlierFactor(
            n_neighbors=20,
            contamination=contamination,
            novelty=True
        )
    raise ValueError(f"Unknown model_type: {model_type}")


def _plot_score_distribution(scores, predictions, model_type):
    """Save a histogram of outlier scores under output/ (best effort).

    Plotting failures (e.g. matplotlib missing) are reported, not raised.
    """
    try:
        import matplotlib.pyplot as plt
        plt.figure(figsize=(10, 6))
        plt.hist(scores, bins=50, edgecolor='black', alpha=0.7)
        # FIX: the original always evaluated scores[predictions == -1].max(),
        # which raises on an empty array when no outliers were flagged — the
        # broad except then silently dropped the whole plot.
        flagged = scores[predictions == -1]
        if flagged.size > 0:
            plt.axvline(flagged.max(), color='red', linestyle='--',
                        label=f'Outlier threshold')
            plt.legend()
        plt.xlabel('Outlier Score')
        plt.ylabel('Frequency')
        plt.title(f'{model_type.upper()} - Score Distribution')
        plt.grid(True, alpha=0.3)
        plot_file = f'output/score_distribution_{model_type}.png'
        plt.savefig(plot_file)
        print(f"Score distribution plot saved to: {plot_file}\n")
        plt.close()
    except Exception as e:
        # Best-effort: a missing backend or headless env must not kill training.
        print(f"Could not create plot: {e}\n")
if __name__ == "__main__":
    import os

    # Create every directory the pipeline below writes into.
    # FIX: the original created 'pkts' (not referenced anywhere in this file)
    # but NOT 'output', even though results CSVs and plots go under output/,
    # so a first run on a clean checkout failed.
    os.makedirs('models', exist_ok=True)
    os.makedirs('output', exist_ok=True)
    # 'pkts' kept — presumably used by a sibling capture script; verify.
    os.makedirs('pkts', exist_ok=True)

    print("\n" + "="*60)
    print("BASELINE TRAFFIC MODELING (UNSUPERVISED)")
    print("="*60)
    print("\nNote: Training on normal traffic only.")
    print("Outliers = statistically unusual patterns (not attacks).\n")

    # Train each detector on the same feature file so results are comparable.
    models_to_train = ['isolation_forest', 'one_class_svm', 'lof']
    contamination = 0.05  # Expect 5% outliers

    for model_type in models_to_train:
        print(f"\n{'#'*60}")
        print(f"# MODEL: {model_type.upper()}")
        print(f"{'#'*60}\n")
        try:
            model, scaler, results = train_anomaly_detector(
                features_file='output/features.csv',
                model_type=model_type,
                contamination=contamination
            )
            print(f"✓ {model_type} training complete!\n")
        except Exception as e:
            # Keep going so one failing detector doesn't abort the sweep.
            print(f"✗ Error training {model_type}: {e}\n")

    print("\n" + "="*60)
    print("ALL MODELS TRAINED")
    print("="*60)
    print("\nNext steps:")
    print("1. Review outlier_results_*.csv files")
    print("2. Check score_distribution_*.png plots")
    print("3. Compare which model best identifies burst traffic")
    print("4. Use isolation_forest for real-time detection (recommended)")
    print("\nReminder: These are statistical outliers, not validated attacks.")