import json
import random

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from torch.utils.data import DataLoader, TensorDataset

# Module-level preprocessing objects: the target encoder and the feature
# scaler (the fitted scaler is persisted for reuse at inference time).
label_encoder = LabelEncoder()
scaler = StandardScaler()


def load_and_preprocess_data():
    """Load the VOIP QoS dataset and label-encode every categorical column."""
    df = pd.read_csv("dataset/QOS VOIP.csv")
    # Encode the target with the shared encoder so its class mapping survives.
    df['Status'] = label_encoder.fit_transform(df['Status'])
    # Encode any remaining object-typed feature columns. These throwaway
    # encoders are not kept, so their mappings are lost after this call.
    for col in df.select_dtypes(include='object').columns:
        df[col] = LabelEncoder().fit_transform(df[col])
    return df


def get_top_features():
    """Rank features with a random forest, then keep only the pre-selected
    candidates that land in the top 5 by importance."""
    df = load_and_preprocess_data()
    X = df.drop('Status', axis=1)
    y = df['Status']
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X, y)
    importances = model.feature_importances_
    feature_importance = sorted(zip(X.columns, importances), key=lambda x: x[1], reverse=True)
    top_5 = [feat for feat, _ in feature_importance[:5]]
    # Candidate features chosen a priori; only those confirmed by the
    # importance ranking are returned.
    selected_features = ['Lost percentage', 'Max Delta (ms)', 'Max Jitter']
    filtered_features = [feat for feat in selected_features if feat in top_5]
    return filtered_features


def get_splits(top_features):
    """Return an 80/20 train/test split restricted to the selected features."""
    df = load_and_preprocess_data()
    X = df[top_features]
    y = df['Status']
    return train_test_split(X, y, test_size=0.2, random_state=42)


def get_scaled_data(top_features):
    """Standardize the splits, persist the fitted scaler, and return tensors."""
    X_train, X_test, y_train, y_test = get_splits(top_features)
    # Fit the scaler on the training split only, then apply it to the test split.
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    joblib.dump(scaler, "scaler.pkl")
    print("Scaler saved to scaler.pkl")
    X_train_tensor = torch.FloatTensor(X_train_scaled)
    X_test_tensor = torch.FloatTensor(X_test_scaled)
    y_train_tensor = torch.FloatTensor(y_train.values).reshape(-1, 1)
    y_test_tensor = torch.FloatTensor(y_test.values).reshape(-1, 1)
    return X_train_tensor, X_test_tensor, y_train_tensor, y_test_tensor, X_train.shape[1]
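
# Inference-time counterpart (a minimal sketch, not part of the pipeline
# above; `new_samples` is a hypothetical array with the same feature order
# used at training time):
#
#   scaler = joblib.load("scaler.pkl")
#   X_new = torch.FloatTensor(scaler.transform(new_samples))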


class VOIPClassifier(nn.Module):
    """Small MLP that outputs a probability for binary call-status labels."""

    def __init__(self, input_dim):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid()  # probability output, paired with BCELoss below
        )

    def forward(self, x):
        return self.model(x)
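
# Shape sanity check (illustrative only; `3` stands in for the number of
# selected features):
#
#   net = VOIPClassifier(input_dim=3)
#   probs = net(torch.randn(4, 3))  # -> shape (4, 1), values in (0, 1)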


def set_seed(seed=42):
    """Seed every RNG in play for reproducible training runs."""
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # seeds all GPUs; a no-op without CUDA
    np.random.seed(seed)
    random.seed(seed)
    # Trade cuDNN autotuning for deterministic kernels.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def build_and_train_nn(top_features, epochs=3, batch_size=32):
    """Train the classifier and return it with its per-epoch metric history."""
    set_seed(seed=42)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    X_train, X_test, y_train, y_test, input_dim = get_scaled_data(top_features)
    train_dataset = TensorDataset(X_train, y_train)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    model = VOIPClassifier(input_dim).to(device)
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.01)
    # Halve the learning rate once validation loss stalls for 3 epochs.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5)
    train_losses, train_accs, val_losses, val_accs = [], [], [], []
    for epoch in range(epochs):
        # Training pass
        model.train()
        running_loss, correct, total = 0.0, 0, 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            predicted = (outputs > 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        train_loss = running_loss / len(train_loader)
        train_acc = correct / total
        train_losses.append(train_loss)
        train_accs.append(train_acc)
        # Validation pass on the held-out split
        model.eval()
        with torch.no_grad():
            X_test_device, y_test_device = X_test.to(device), y_test.to(device)
            val_outputs = model(X_test_device)
            val_loss = criterion(val_outputs, y_test_device)
            val_predicted = (val_outputs > 0.5).float()
            val_correct = (val_predicted == y_test_device).sum().item()
            val_acc = val_correct / y_test_device.size(0)
        val_losses.append(val_loss.item())
        val_accs.append(val_acc)
        scheduler.step(val_loss)
        print(f"Epoch {epoch+1}/{epochs} - "
              f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
    history = {
        'loss': train_losses,
        'accuracy': train_accs,
        'val_loss': val_losses,
        'val_accuracy': val_accs
    }
    return model, history


def plot_accuracy_loss(history):
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(history['accuracy'], label='Train Acc')
    plt.plot(history['val_accuracy'], label='Val Acc')
    plt.title("Model Accuracy")
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(history['loss'], label='Train Loss')
    plt.plot(history['val_loss'], label='Val Loss')
    plt.title("Model Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.tight_layout()
    plt.show()


def save_trained_model(model, filename="VOIP_Classifier.pth"):
    """Persist only the trained weights (state_dict), not the full module."""
    torch.save(model.state_dict(), filename)
    print(f"Model saved as {filename}")
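
# Reloading for inference (a minimal sketch; `input_dim` is assumed to equal
# the number of features selected at training time):
#
#   model = VOIPClassifier(input_dim)
#   model.load_state_dict(torch.load("VOIP_Classifier.pth"))
#   model.eval()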


def display_top_features():
    top_features = get_top_features()
    print("Selected Features from Top 5 by Importance (if present):")
    for i, feat in enumerate(top_features, 1):
        print(f"{i}. {feat}")
    return top_features


def save_history_as_json(history, filename="training_history.json"):
    """Dump the per-epoch metrics to JSON for later inspection."""
    with open(filename, 'w') as f:
        json.dump(history, f)
    print(f"Training history saved to {filename}")
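

# Minimal end-to-end driver (an assumed workflow; the original module does not
# define an entry point):
if __name__ == "__main__":
    features = display_top_features()
    trained_model, history = build_and_train_nn(features, epochs=3, batch_size=32)
    plot_accuracy_loss(history)
    save_trained_model(trained_model)
    save_history_as_json(history)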