VOIP / model /model.py
Shymaa2611
update api
e9c79f8
import pandas as pd
import matplotlib.pyplot as plt
import joblib
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import json
import numpy as np
import random
# Shared encoder for the 'Status' target column; it is re-fit every time
# load_and_preprocess_data() runs.
label_encoder = LabelEncoder()
# Shared feature scaler; get_scaled_data() fits it on the training split and
# dumps it to "scaler.pkl".
scaler = StandardScaler()
def load_and_preprocess_data(csv_path="dataset/QOS VOIP.csv"):
    """Load the VoIP QoS CSV and label-encode its non-numeric columns.

    Args:
        csv_path: Location of the dataset. Defaults to the path that was
            previously hard-coded, so existing zero-argument callers are
            unaffected.

    Returns:
        A pandas DataFrame in which 'Status' has been encoded with the
        module-level ``label_encoder`` and every remaining object-dtype
        column has been encoded with its own throwaway ``LabelEncoder``.
    """
    df = pd.read_csv(csv_path)

    # The target goes through the shared module-level encoder (re-fit on each call).
    df['Status'] = label_encoder.fit_transform(df['Status'])

    # Remaining text columns get independent encoders that are not kept around.
    for col in df.select_dtypes(include='object').columns:
        df[col] = LabelEncoder().fit_transform(df[col])

    return df
def get_top_features():
    """Return the hand-picked features that also rank in the forest's top five.

    A random forest (100 trees, random_state=42) is fit on every column except
    'Status'. The five columns with the highest importance are collected, and
    the fixed candidate list is filtered down to the names found among them.
    The candidate list's own order is preserved in the result.
    """
    frame = load_and_preprocess_data()
    predictors = frame.drop('Status', axis=1)
    target = frame['Status']

    forest = RandomForestClassifier(n_estimators=100, random_state=42)
    forest.fit(predictors, target)

    # Rank (column, importance) pairs from most to least important.
    ranked = sorted(
        zip(predictors.columns, forest.feature_importances_),
        key=lambda pair: pair[1],
        reverse=True,
    )
    top_5 = [column for column, _score in ranked[:5]]

    selected_features = ['Lost percentage', 'Max Delta (ms)', 'Max Jitter']
    return [name for name in selected_features if name in top_5]
def get_splits(top_features):
    """Split the preprocessed dataset into train and test partitions.

    Args:
        top_features: Column names to keep as model inputs.

    Returns:
        A pair ``(splits, top_features)`` where ``splits`` is the result of
        ``train_test_split`` (ordered X_train, X_test, y_train, y_test) using
        a 20% test share and random_state=42.
    """
    frame = load_and_preprocess_data()
    inputs = frame[top_features]
    target = frame['Status']
    splits = train_test_split(inputs, target, test_size=0.2, random_state=42)
    return splits, top_features
def get_scaled_data(top_features):
    """Produce standardized train/test tensors for the given feature columns.

    The module-level ``scaler`` is fit on the training inputs only, applied to
    the test inputs, and then written to "scaler.pkl".

    Args:
        top_features: Column names forwarded to ``get_splits``.

    Returns:
        ``(X_train, X_test, y_train, y_test, n_features)``. All four tensors
        are float tensors, the targets are reshaped to a single column, and
        ``n_features`` is the number of input columns.
    """
    split, _ = get_splits(top_features)
    X_train, X_test, y_train, y_test = split

    # Fit statistics come from the training split; the test split is only transformed.
    train_matrix = scaler.fit_transform(X_train)
    test_matrix = scaler.transform(X_test)

    joblib.dump(scaler, "scaler.pkl")
    print("Scaler saved to scaler.pkl")

    n_features = X_train.shape[1]
    return (
        torch.FloatTensor(train_matrix),
        torch.FloatTensor(test_matrix),
        torch.FloatTensor(y_train.values).reshape(-1, 1),
        torch.FloatTensor(y_test.values).reshape(-1, 1),
        n_features,
    )
class VOIPClassifier(nn.Module):
    """Small feed-forward binary classifier.

    Architecture: Linear(input_dim, 64) -> ReLU -> Dropout(0.2) ->
    Linear(64, 32) -> ReLU -> Linear(32, 1) -> Sigmoid.
    """

    def __init__(self, input_dim):
        """Build the layer stack for inputs with ``input_dim`` features."""
        super().__init__()
        layers = [
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid(),
        ]
        # The attribute name prefixes every state_dict key, so it stays 'model'.
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the stack and return the sigmoid output."""
        return self.model(x)
def set_seed(seed=42):
    """Seed every random number generator this module relies on.

    Seeds Python's ``random``, NumPy, and PyTorch (including the CUDA seeding
    calls), then switches cuDNN to deterministic mode with benchmarking off.

    Args:
        seed: Integer seed applied to all generators.
    """
    # Standard-library and NumPy generators.
    random.seed(seed)
    np.random.seed(seed)

    # PyTorch generators.
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

    # cuDNN flags.
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True
def build_and_train_nn(top_features, epochs=3, batch_size=32):
    """Train a ``VOIPClassifier`` on the selected features and record metrics.

    Args:
        top_features: Column names forwarded to ``get_scaled_data``.
        epochs: Number of passes over the training split.
        batch_size: Mini-batch size used by the training ``DataLoader``.

    Returns:
        ``(model, history)``: the trained model (left on the device it was
        trained on) and a dict with the keys 'loss', 'accuracy', 'val_loss'
        and 'val_accuracy', each a list of floats with one entry per epoch.
        The "validation" numbers are computed on the test split returned by
        ``get_scaled_data``.
    """
    set_seed(seed=42)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    X_train, X_test, y_train, y_test, input_dim = get_scaled_data(top_features)
    train_dataset = TensorDataset(X_train, y_train)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # The evaluation tensors never change, so move them to the device once.
    X_test_device, y_test_device = X_test.to(device), y_test.to(device)

    model = VOIPClassifier(input_dim).to(device)
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.01)
    # `verbose` is deliberately not passed: it is deprecated since PyTorch 2.2
    # and no longer accepted by later releases. Learning-rate reductions are
    # reported manually after each scheduler step instead.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', patience=3, factor=0.5
    )

    train_losses, train_accs, val_losses, val_accs = [], [], [], []

    for epoch in range(epochs):
        # ---- training pass ----
        model.train()
        running_loss, correct, total = 0.0, 0, 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            # Sigmoid outputs are thresholded at 0.5 to get hard predictions.
            predicted = (outputs > 0.5).float()
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Loss is averaged over batches, accuracy over samples.
        train_loss = running_loss / len(train_loader)
        train_acc = correct / total
        train_losses.append(train_loss)
        train_accs.append(train_acc)

        # ---- evaluation pass (dropout disabled, no gradients) ----
        model.eval()
        with torch.no_grad():
            val_outputs = model(X_test_device)
            # Stored as a plain float so the history stays JSON-serializable.
            val_loss = criterion(val_outputs, y_test_device).item()
            val_predicted = (val_outputs > 0.5).float()
            val_correct = (val_predicted == y_test_device).sum().item()
            val_acc = val_correct / y_test_device.size(0)
        val_losses.append(val_loss)
        val_accs.append(val_acc)

        lr_before = optimizer.param_groups[0]['lr']
        scheduler.step(val_loss)
        lr_after = optimizer.param_groups[0]['lr']
        if lr_after < lr_before:
            print(f"Epoch {epoch+1}: reducing learning rate to {lr_after:.4e}")

        print(f"Epoch {epoch+1}/{epochs} - "
              f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    history = {
        'loss': train_losses,
        'accuracy': train_accs,
        'val_loss': val_losses,
        'val_accuracy': val_accs
    }
    return model, history
def plot_accuracy_loss(history):
    """Show side-by-side accuracy and loss curves for a training run.

    Args:
        history: Mapping with the keys 'accuracy', 'val_accuracy', 'loss' and
            'val_loss', each holding one value per epoch.
    """
    # (title, y-axis label, train key, train legend, validation key, validation legend)
    panels = (
        ("Model Accuracy", "Accuracy", 'accuracy', 'Train Acc', 'val_accuracy', 'Val Acc'),
        ("Model Loss", "Loss", 'loss', 'Train Loss', 'val_loss', 'Val Loss'),
    )

    plt.figure(figsize=(12, 5))
    for position, panel in enumerate(panels, start=1):
        title, y_label, train_key, train_label, val_key, val_label = panel
        plt.subplot(1, 2, position)
        plt.plot(history[train_key], label=train_label)
        plt.plot(history[val_key], label=val_label)
        plt.title(title)
        plt.xlabel("Epoch")
        plt.ylabel(y_label)
        plt.legend()

    plt.tight_layout()
    plt.show()
def save_trained_model(model, filename="VOIP_Classifier.pth"):
    """Write the model's ``state_dict`` to ``filename`` and report the path.

    Args:
        model: Module whose parameters should be saved.
        filename: Destination file for ``torch.save``.
    """
    weights = model.state_dict()
    torch.save(weights, filename)
    print(f"Model saved as {filename}")
def display_top_features():
    """Print the selected features as a numbered list and return them.

    Returns:
        The list produced by ``get_top_features``.
    """
    top_features = get_top_features()
    print("Selected Features from Top 5 by Importance (if present):")
    for rank, name in enumerate(top_features, start=1):
        print(f"{rank}. {name}")
    return top_features
def save_history_as_json(history, filename="training_history.json"):
    """Serialize the training history to a JSON file and report the path.

    Args:
        history: JSON-serializable object, e.g. the metrics dict returned by
            ``build_and_train_nn``.
        filename: Destination file; it is overwritten if it already exists.
    """
    with open(filename, mode='w') as handle:
        json.dump(history, handle)
    print(f"Training history saved to {filename}")