File size: 6,293 Bytes
da8dab8 e9c79f8 da8dab8 e9c79f8 da8dab8 e9c79f8 da8dab8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 | import pandas as pd
import matplotlib.pyplot as plt
import joblib
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import json
import numpy as np
import random
# Shared encoder for the 'Status' target column; load_and_preprocess_data()
# re-fits it on every call.
label_encoder = LabelEncoder()
# Shared feature scaler; get_scaled_data() fits it on the training split and
# pickles it to "scaler.pkl".
scaler = StandardScaler()
def load_and_preprocess_data(csv_path="dataset/QOS VOIP.csv"):
    """Load the VoIP QoS dataset and label-encode its text columns.

    The 'Status' target is encoded with the module-level ``label_encoder``
    (re-fitted on every call). Every other object-dtype column is encoded
    with its own throw-away ``LabelEncoder``.

    Args:
        csv_path: Location of the CSV file. Defaults to the path that used
            to be hard-coded, so existing zero-argument callers behave
            exactly as before.

    Returns:
        The loaded DataFrame with 'Status' and all object-dtype columns
        replaced by integer codes.
    """
    df = pd.read_csv(csv_path)
    df['Status'] = label_encoder.fit_transform(df['Status'])
    # 'Status' now holds integer codes, so only the remaining text columns
    # are picked up by the object-dtype selection below.
    for col in df.select_dtypes(include='object').columns:
        df[col] = LabelEncoder().fit_transform(df[col])
    return df
def get_top_features():
    """Return the hand-picked features that also rank in the forest's top five.

    A random forest (100 trees, random_state=42) is fitted on every column
    except 'Status'. The five columns with the highest importance are then
    intersected with a fixed candidate list; the result keeps the candidate
    list's order.
    """
    data = load_and_preprocess_data()
    target = data['Status']
    predictors = data.drop('Status', axis=1)

    forest = RandomForestClassifier(n_estimators=100, random_state=42)
    forest.fit(predictors, target)

    # Rank (column, importance) pairs from most to least important.
    ranked = sorted(
        zip(predictors.columns, forest.feature_importances_),
        key=lambda pair: pair[1],
        reverse=True,
    )
    top_five = {name for name, _ in ranked[:5]}

    candidates = ('Lost percentage', 'Max Delta (ms)', 'Max Jitter')
    return [name for name in candidates if name in top_five]
def get_splits(top_features):
    """Split the preprocessed data into train and test parts (80/20, seed 42).

    Args:
        top_features: Column names to use as model inputs.

    Returns:
        A pair ``(split, top_features)`` where ``split`` is the
        ``train_test_split`` result for the selected columns and 'Status'.
    """
    frame = load_and_preprocess_data()
    inputs = frame[top_features]
    target = frame['Status']
    # train_test_split returns X_train, X_test, y_train, y_test (in that order).
    split = train_test_split(inputs, target, test_size=0.2, random_state=42)
    return split, top_features
def get_scaled_data(top_features):
    """Scale the train/test features and convert everything to float tensors.

    The module-level ``scaler`` is fitted on the training rows, applied to
    the test rows, and pickled to "scaler.pkl". Targets are reshaped into a
    single column.

    Args:
        top_features: Column names to use as model inputs.

    Returns:
        ``(X_train, X_test, y_train, y_test, n_features)`` where the first
        four are ``torch.FloatTensor`` objects and ``n_features`` is the
        number of input columns.
    """
    split, _ = get_splits(top_features)
    X_train, X_test, y_train, y_test = split

    # Fit on the training rows only; the test rows reuse those statistics.
    train_matrix = scaler.fit_transform(X_train)
    test_matrix = scaler.transform(X_test)

    joblib.dump(scaler, "scaler.pkl")
    print("Scaler saved to scaler.pkl")

    def as_column(series):
        # One target value per row, shaped (n_rows, 1).
        return torch.FloatTensor(series.values).reshape(-1, 1)

    train_inputs = torch.FloatTensor(train_matrix)
    test_inputs = torch.FloatTensor(test_matrix)
    train_targets = as_column(y_train)
    test_targets = as_column(y_test)
    n_features = X_train.shape[1]
    return train_inputs, test_inputs, train_targets, test_targets, n_features
class VOIPClassifier(nn.Module):
    """Feed-forward binary classifier: input_dim -> 64 -> 32 -> 1.

    Hidden layers use ReLU, with dropout (p=0.2) after the first one; the
    output passes through a sigmoid.
    """

    def __init__(self, input_dim):
        super().__init__()
        layers = [
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid(),
        ]
        # Stored under `model` so state_dict keys remain "model.<index>.*".
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the sigmoid output."""
        network = self.model
        return network(x)
def set_seed(seed=42):
    """Seed torch (CPU and CUDA), NumPy and ``random`` with the same value.

    Also sets the cuDNN ``deterministic`` flag to True and the ``benchmark``
    flag to False.

    Args:
        seed: Integer seed passed to every generator.
    """
    seeders = (
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
        np.random.seed,
        random.seed,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False
def build_and_train_nn(top_features, epochs=3, batch_size=32):
    """Train a VOIPClassifier on the given features and record epoch metrics.

    Seeds all RNGs with 42, fetches the scaled tensors from
    ``get_scaled_data``, and trains with Adam (lr=0.01) and binary
    cross-entropy. After every epoch the model is evaluated on the held-out
    split returned by ``get_scaled_data``; that loss drives a
    ``ReduceLROnPlateau`` scheduler (factor 0.5, patience 3).

    Args:
        top_features: Column names to use as model inputs.
        epochs: Number of passes over the training data.
        batch_size: Mini-batch size for the training loader.

    Returns:
        ``(model, history)`` where ``history`` maps 'loss', 'accuracy',
        'val_loss' and 'val_accuracy' to lists with one float per epoch.
    """
    set_seed(seed=42)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    X_train, X_test, y_train, y_test, input_dim = get_scaled_data(top_features)
    train_dataset = TensorDataset(X_train, y_train)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # The held-out tensors never change, so move them to the device once
    # instead of once per epoch.
    X_test_device, y_test_device = X_test.to(device), y_test.to(device)

    model = VOIPClassifier(input_dim).to(device)
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.01)
    # `verbose` is deliberately not passed: the keyword is deprecated since
    # PyTorch 2.2, so learning-rate reductions are reported manually below.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', patience=3, factor=0.5
    )

    train_losses, train_accs, val_losses, val_accs = [], [], [], []
    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        running_loss, correct, total = 0.0, 0, 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            # Sigmoid outputs above 0.5 count as the positive class.
            predicted = (outputs > 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Mean of the per-batch mean losses.
        train_loss = running_loss / len(train_loader)
        train_acc = correct / total
        train_losses.append(train_loss)
        train_accs.append(train_acc)

        # ---- evaluation pass on the held-out split ----
        model.eval()
        with torch.no_grad():
            val_outputs = model(X_test_device)
            # Convert to a plain float once; it is reused for the history,
            # the scheduler and the log line.
            val_loss = criterion(val_outputs, y_test_device).item()
            val_predicted = (val_outputs > 0.5).float()
            val_correct = (val_predicted == y_test_device).sum().item()
            val_acc = val_correct / y_test_device.size(0)
        val_losses.append(val_loss)
        val_accs.append(val_acc)

        lr_before = optimizer.param_groups[0]['lr']
        scheduler.step(val_loss)
        lr_after = optimizer.param_groups[0]['lr']
        if lr_after < lr_before:
            print(f"Epoch {epoch+1}: reducing learning rate to {lr_after:.4e}")

        print(f"Epoch {epoch+1}/{epochs} - "
              f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    history = {
        'loss': train_losses,
        'accuracy': train_accs,
        'val_loss': val_losses,
        'val_accuracy': val_accs
    }
    return model, history
def plot_accuracy_loss(history):
    """Show training and validation accuracy and loss curves side by side.

    Args:
        history: Mapping with the keys 'accuracy', 'val_accuracy', 'loss'
            and 'val_loss', each holding a sequence of per-epoch values.
    """
    # (title, y-axis label, ((history key, legend label), ...)) per panel.
    panels = (
        ("Model Accuracy", "Accuracy",
         (('accuracy', 'Train Acc'), ('val_accuracy', 'Val Acc'))),
        ("Model Loss", "Loss",
         (('loss', 'Train Loss'), ('val_loss', 'Val Loss'))),
    )

    plt.figure(figsize=(12, 5))
    for position, (title, y_label, curves) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        for key, legend_label in curves:
            plt.plot(history[key], label=legend_label)
        plt.title(title)
        plt.xlabel("Epoch")
        plt.ylabel(y_label)
        plt.legend()

    plt.tight_layout()
    plt.show()
def save_trained_model(model, filename="VOIP_Classifier.pth"):
    """Write the model's ``state_dict`` to ``filename`` and report the path.

    Args:
        model: Object exposing ``state_dict()``.
        filename: Destination path for the serialized weights.
    """
    weights = model.state_dict()
    torch.save(weights, filename)
    print(f"Model saved as {filename}")
def display_top_features():
    """Print the features chosen by ``get_top_features`` as a numbered list.

    Returns:
        The same list of feature names that was printed.
    """
    chosen = get_top_features()
    print("Selected Features from Top 5 by Importance (if present):")
    for rank, name in enumerate(chosen, start=1):
        print(f"{rank}. {name}")
    return chosen
def save_history_as_json(history, filename="training_history.json"):
    """Serialize the training history to ``filename`` as JSON.

    Args:
        history: JSON-serializable object (the per-epoch metrics mapping).
        filename: Destination path; an existing file is overwritten.
    """
    with open(filename, mode='w') as handle:
        json.dump(history, fp=handle)
    print(f"Training history saved to {filename}")
|