"""Feature selection, preprocessing and neural-network training for the VoIP QoS CSV."""

import json
import random

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from torch.utils.data import DataLoader, TensorDataset

# Module-level transformers. They are re-fitted by the functions below
# (``label_encoder`` in ``load_and_preprocess_data``, ``scaler`` in
# ``get_scaled_data``) and kept global so other code can reach the fitted
# objects after a run.
label_encoder = LabelEncoder()
scaler = StandardScaler()


def load_and_preprocess_data(csv_path="dataset/QOS VOIP.csv"):
    """Read the dataset CSV and label-encode its non-numeric columns.

    ``Status`` is encoded with the module-level ``label_encoder`` (re-fitted
    on every call). Every remaining object-dtype column is encoded with its
    own throwaway ``LabelEncoder``.

    Args:
        csv_path: Path of the CSV file to read. Defaults to the location
            this module has always used.

    Returns:
        The loaded DataFrame with ``Status`` and all object-dtype columns
        replaced by integer codes.
    """
    df = pd.read_csv(csv_path)
    df['Status'] = label_encoder.fit_transform(df['Status'])
    for col in df.select_dtypes(include='object').columns:
        df[col] = LabelEncoder().fit_transform(df[col])
    return df


def get_top_features():
    """Return the hand-picked features that rank in the forest's top five.

    A ``RandomForestClassifier`` is fitted on every column except ``Status``
    and the five columns with the highest importance are determined. The
    fixed candidate list is then filtered down to the candidates that are
    among those five, preserving the candidate order.

    Returns:
        A list of column names; it is empty when none of the candidates
        made the top five.
    """
    df = load_and_preprocess_data()
    X = df.drop('Status', axis=1)
    y = df['Status']

    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X, y)

    ranked = sorted(
        zip(X.columns, model.feature_importances_),
        key=lambda pair: pair[1],
        reverse=True,
    )
    top_5 = {name for name, _ in ranked[:5]}

    selected_features = ['Lost percentage', 'Max Delta (ms)', 'Max Jitter']
    return [feat for feat in selected_features if feat in top_5]


def get_splits(top_features):
    """Split the dataset, restricted to ``top_features``, into train/test.

    Args:
        top_features: Column names to use as model inputs.

    Returns:
        A 2-tuple ``(splits, top_features)`` where ``splits`` is the
        ``train_test_split`` result in the order
        ``X_train, X_test, y_train, y_test`` (20% test, ``random_state=42``).
    """
    df = load_and_preprocess_data()
    X = df[top_features]
    y = df['Status']
    return train_test_split(X, y, test_size=0.2, random_state=42), top_features


def get_scaled_data(top_features):
    """Standardise the split data and convert it to float32 tensors.

    The module-level ``scaler`` is fitted on the training inputs only, then
    applied to the test inputs, and finally written to ``scaler.pkl`` in the
    current working directory.

    Args:
        top_features: Column names to use as model inputs; must be non-empty.

    Returns:
        ``(X_train, X_test, y_train, y_test, input_dim)`` where the targets
        are column vectors of shape ``(n, 1)`` and ``input_dim`` is the
        number of input columns.

    Raises:
        ValueError: If ``top_features`` is empty.
    """
    # Fail early with a readable message; otherwise the scaler rejects the
    # zero-column frame with a much less obvious ValueError of its own.
    if len(top_features) == 0:
        raise ValueError(
            "top_features is empty: at least one feature is required to "
            "build the training data"
        )

    (X_train, X_test, y_train, y_test), _ = get_splits(top_features)

    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    joblib.dump(scaler, "scaler.pkl")
    print("Scaler saved to scaler.pkl")

    X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
    X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train.values, dtype=torch.float32).reshape(-1, 1)
    y_test_tensor = torch.tensor(y_test.values, dtype=torch.float32).reshape(-1, 1)

    return (
        X_train_tensor,
        X_test_tensor,
        y_train_tensor,
        y_test_tensor,
        X_train.shape[1],
    )


class VOIPClassifier(nn.Module):
    """Feed-forward network: input -> 64 -> 32 -> 1 with a sigmoid output."""

    def __init__(self, input_dim):
        """Build the layer stack for inputs with ``input_dim`` features."""
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Return the sigmoid output of the layer stack for ``x``."""
        return self.model(x)


def set_seed(seed=42):
    """Seed the torch, NumPy and ``random`` RNGs and make cuDNN deterministic."""
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def _train_one_epoch(model, loader, criterion, optimizer, device):
    """Run one optimisation pass over ``loader``; return ``(loss, accuracy)``.

    The loss is the mean of the per-batch losses; the accuracy is computed
    over all samples using a 0.5 decision threshold.
    """
    model.train()
    running_loss, correct, total = 0.0, 0, 0
    for inputs, labels in loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        predicted = (outputs > 0.5).float()
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    return running_loss / len(loader), correct / total


def _evaluate(model, criterion, inputs, targets):
    """Score ``model`` on one full batch; return ``(loss, accuracy)`` floats."""
    model.eval()
    with torch.no_grad():
        outputs = model(inputs)
        loss = criterion(outputs, targets).item()
        predicted = (outputs > 0.5).float()
        correct = (predicted == targets).sum().item()
    return loss, correct / targets.size(0)


def build_and_train_nn(top_features, epochs=3, batch_size=32):
    """Train a ``VOIPClassifier`` on the selected features.

    Seeds all RNGs with 42, trains with Adam (lr 0.01) and binary
    cross-entropy, and halves the learning rate when the validation loss has
    not improved for more than three epochs. The held-out test split is used
    as the validation set.

    Args:
        top_features: Column names to use as model inputs.
        epochs: Number of passes over the training data.
        batch_size: Mini-batch size for the training loader.

    Returns:
        ``(model, history)`` where ``history`` maps ``'loss'``,
        ``'accuracy'``, ``'val_loss'`` and ``'val_accuracy'`` to per-epoch
        lists of floats.
    """
    set_seed(seed=42)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    X_train, X_test, y_train, y_test, input_dim = get_scaled_data(top_features)
    train_loader = DataLoader(
        TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True
    )
    # The validation set is scored in a single pass every epoch, so it is
    # moved to the device once instead of once per epoch.
    X_val, y_val = X_test.to(device), y_test.to(device)

    model = VOIPClassifier(input_dim).to(device)
    # NOTE(review): BCELoss needs targets in [0, 1]; this presumes `Status`
    # has exactly two classes - confirm against the dataset.
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.01)
    # `verbose=True` is deprecated in recent PyTorch releases; learning-rate
    # changes are reported explicitly in the loop below instead.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', patience=3, factor=0.5
    )

    train_losses, train_accs, val_losses, val_accs = [], [], [], []

    for epoch in range(epochs):
        train_loss, train_acc = _train_one_epoch(
            model, train_loader, criterion, optimizer, device
        )
        train_losses.append(train_loss)
        train_accs.append(train_acc)

        val_loss, val_acc = _evaluate(model, criterion, X_val, y_val)
        val_losses.append(val_loss)
        val_accs.append(val_acc)

        lr_before = optimizer.param_groups[0]['lr']
        scheduler.step(val_loss)
        lr_after = optimizer.param_groups[0]['lr']
        if lr_after != lr_before:
            print(f"Reducing learning rate from {lr_before:.4e} to {lr_after:.4e}.")

        print(f"Epoch {epoch+1}/{epochs} - "
              f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    history = {
        'loss': train_losses,
        'accuracy': train_accs,
        'val_loss': val_losses,
        'val_accuracy': val_accs,
    }
    return model, history


def plot_accuracy_loss(history):
    """Show side-by-side accuracy and loss curves for a training history.

    Args:
        history: Mapping with the keys ``'accuracy'``, ``'val_accuracy'``,
            ``'loss'`` and ``'val_loss'``, as returned by
            ``build_and_train_nn``.
    """
    plt.figure(figsize=(12, 5))

    plt.subplot(1, 2, 1)
    plt.plot(history['accuracy'], label='Train Acc')
    plt.plot(history['val_accuracy'], label='Val Acc')
    plt.title("Model Accuracy")
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(history['loss'], label='Train Loss')
    plt.plot(history['val_loss'], label='Val Loss')
    plt.title("Model Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()

    plt.tight_layout()
    plt.show()


def save_trained_model(model, filename="VOIP_Classifier.pth"):
    """Write the model's ``state_dict`` to ``filename``."""
    torch.save(model.state_dict(), filename)
    print(f"Model saved as {filename}")


def display_top_features():
    """Print the selected features as a numbered list and return them."""
    top_features = get_top_features()
    print("Selected Features from Top 5 by Importance (if present):")
    for i, feat in enumerate(top_features, 1):
        print(f"{i}. {feat}")
    return top_features


def save_history_as_json(history, filename="training_history.json"):
    """Serialise the training history to ``filename`` as JSON."""
    with open(filename, 'w', encoding="utf-8") as f:
        json.dump(history, f)
    print(f"Training history saved to {filename}")