| | |
| |
|
| | import pandas as pd |
| | import numpy as np |
| | import torch |
| | import torch.nn as nn |
| | import torch.optim as optim |
| | from torch.utils.data import Dataset, DataLoader, TensorDataset |
| | from sklearn.model_selection import train_test_split |
| | from sklearn.preprocessing import StandardScaler, LabelEncoder |
| | from sklearn.metrics import accuracy_score, classification_report, confusion_matrix |
| | import matplotlib.pyplot as plt |
| | import seaborn as sns |
| | import warnings |
| | warnings.filterwarnings('ignore') |
| |
|
| | |
| | class TabularModel(nn.Module): |
| | def __init__(self, input_size, hidden_sizes, output_size, dropout_rate=0.2): |
| | super(TabularModel, self).__init__() |
| | |
| | layers = [] |
| | prev_size = input_size |
| | |
| | |
| | for hidden_size in hidden_sizes: |
| | layers.extend([ |
| | nn.Linear(prev_size, hidden_size), |
| | nn.BatchNorm1d(hidden_size), |
| | nn.ReLU(), |
| | nn.Dropout(dropout_rate) |
| | ]) |
| | prev_size = hidden_size |
| | |
| | |
| | layers.append(nn.Linear(prev_size, output_size)) |
| | |
| | self.model = nn.Sequential(*layers) |
| | |
| | def forward(self, x): |
| | return self.model(x) |
| |
|
| | |
| | def preprocess_data(df, target_column, test_size=0.2): |
| | """ |
| | Preprocess tabular data for neural network training |
| | """ |
| | |
| | X = df.drop(columns=[target_column]) |
| | y = df[target_column] |
| | |
| | |
| | categorical_columns = X.select_dtypes(include=['object']).columns |
| | numerical_columns = X.select_dtypes(include=['int64', 'float64']).columns |
| | |
| | |
| | label_encoders = {} |
| | for col in categorical_columns: |
| | le = LabelEncoder() |
| | X[col] = le.fit_transform(X[col].astype(str)) |
| | label_encoders[col] = le |
| | |
| | |
| | scaler = StandardScaler() |
| | X[numerical_columns] = scaler.fit_transform(X[numerical_columns]) |
| | |
| | |
| | target_encoder = None |
| | if y.dtype == 'object': |
| | target_encoder = LabelEncoder() |
| | y = target_encoder.fit_transform(y) |
| | |
| | |
| | X_train, X_test, y_train, y_test = train_test_split( |
| | X.values, y.values, test_size=test_size, random_state=42, stratify=y |
| | ) |
| | |
| | return (X_train, X_test, y_train, y_test, scaler, label_encoders, target_encoder) |
| |
|
| | |
| | def train_model(model, train_loader, val_loader, epochs=100, lr=0.001): |
| | """ |
| | Train the tabular model |
| | """ |
| | device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') |
| | model.to(device) |
| | |
| | criterion = nn.CrossEntropyLoss() |
| | optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5) |
| | scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=10) |
| | |
| | train_losses = [] |
| | val_losses = [] |
| | |
| | for epoch in range(epochs): |
| | |
| | model.train() |
| | train_loss = 0.0 |
| | for batch_idx, (data, target) in enumerate(train_loader): |
| | data, target = data.to(device), target.to(device) |
| | |
| | optimizer.zero_grad() |
| | output = model(data) |
| | loss = criterion(output, target) |
| | loss.backward() |
| | optimizer.step() |
| | |
| | train_loss += loss.item() |
| | |
| | |
| | model.eval() |
| | val_loss = 0.0 |
| | with torch.no_grad(): |
| | for data, target in val_loader: |
| | data, target = data.to(device), target.to(device) |
| | output = model(data) |
| | val_loss += criterion(output, target).item() |
| | |
| | avg_train_loss = train_loss / len(train_loader) |
| | avg_val_loss = val_loss / len(val_loader) |
| | |
| | train_losses.append(avg_train_loss) |
| | val_losses.append(avg_val_loss) |
| | |
| | scheduler.step(avg_val_loss) |
| | |
| | if (epoch + 1) % 20 == 0: |
| | print(f'Epoch [{epoch+1}/{epochs}], Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}') |
| | |
| | return train_losses, val_losses |
| |
|
| | |
| | def evaluate_model(model, test_loader, target_encoder=None): |
| | """ |
| | Evaluate the trained model |
| | """ |
| | device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') |
| | model.eval() |
| | |
| | all_predictions = [] |
| | all_targets = [] |
| | |
| | with torch.no_grad(): |
| | for data, target in test_loader: |
| | data, target = data.to(device), target.to(device) |
| | output = model(data) |
| | predictions = torch.argmax(output, dim=1) |
| | |
| | all_predictions.extend(predictions.cpu().numpy()) |
| | all_targets.extend(target.cpu().numpy()) |
| | |
| | |
| | if target_encoder: |
| | all_predictions = target_encoder.inverse_transform(all_predictions) |
| | all_targets = target_encoder.inverse_transform(all_targets) |
| | |
| | accuracy = accuracy_score(all_targets, all_predictions) |
| | report = classification_report(all_targets, all_predictions) |
| | |
| | return accuracy, report, all_predictions, all_targets |
| |
|
| | |
| | def plot_training_history(train_losses, val_losses): |
| | """ |
| | Plot training and validation losses |
| | """ |
| | plt.figure(figsize=(12, 5)) |
| | |
| | plt.subplot(1, 2, 1) |
| | plt.plot(train_losses, label='Training Loss', color='blue') |
| | plt.plot(val_losses, label='Validation Loss', color='red') |
| | plt.xlabel('Epoch') |
| | plt.ylabel('Loss') |
| | plt.title('Training and Validation Loss') |
| | plt.legend() |
| | plt.grid(True) |
| | |
| | plt.subplot(1, 2, 2) |
| | plt.plot(train_losses, label='Training Loss', color='blue') |
| | plt.plot(val_losses, label='Validation Loss', color='red') |
| | plt.xlabel('Epoch') |
| | plt.ylabel('Loss (Log Scale)') |
| | plt.title('Training and Validation Loss (Log Scale)') |
| | plt.yscale('log') |
| | plt.legend() |
| | plt.grid(True) |
| | |
| | plt.tight_layout() |
| | plt.show() |
| |
|
| | |
| | def plot_confusion_matrix(y_true, y_pred, labels=None): |
| | """ |
| | Plot confusion matrix |
| | """ |
| | cm = confusion_matrix(y_true, y_pred) |
| | plt.figure(figsize=(8, 6)) |
| | sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', |
| | xticklabels=labels, yticklabels=labels) |
| | plt.xlabel('Predicted') |
| | plt.ylabel('Actual') |
| | plt.title('Confusion Matrix') |
| | plt.show() |
| |
|
| | |
| | def save_model(model, filepath, scaler, label_encoders, target_encoder=None): |
| | """ |
| | Save the trained model and preprocessing objects |
| | """ |
| | torch.save({ |
| | 'model_state_dict': model.state_dict(), |
| | 'scaler': scaler, |
| | 'label_encoders': label_encoders, |
| | 'target_encoder': target_encoder |
| | }, filepath) |
| | print(f"Model saved to {filepath}") |
| |
|
| | |
| | def load_model(filepath, input_size, hidden_sizes, output_size, dropout_rate=0.2): |
| | """ |
| | Load the trained model and preprocessing objects |
| | """ |
| | checkpoint = torch.load(filepath) |
| | |
| | model = TabularModel(input_size, hidden_sizes, output_size, dropout_rate) |
| | model.load_state_dict(checkpoint['model_state_dict']) |
| | |
| | return model, checkpoint['scaler'], checkpoint['label_encoders'], checkpoint['target_encoder'] |
| |
|
| | |
| | def main(): |
| | |
| | |
| | |
| | df = pd.read_csv("Electric_Vehicle_Population.csv") |
| | |
| | |
| | print(f"Original dataset shape: {df.shape}") |
| | print(f"Columns: {list(df.columns)}") |
| | |
| | |
| | |
| | df = df.dropna(subset=['Make', 'Model', 'Electric Vehicle Type', 'Model Year']) |
| | |
| | |
| | |
| | df_clean = df.copy() |
| | |
| | |
| | df_clean['Model Year'] = pd.to_numeric(df_clean['Model Year'], errors='coerce') |
| | df_clean['Electric Range'] = pd.to_numeric(df_clean['Electric Range'], errors='coerce') |
| | df_clean['Base MSRP'] = pd.to_numeric(df_clean['Base MSRP'], errors='coerce') |
| | df_clean['Legislative District'] = pd.to_numeric(df_clean['Legislative District'], errors='coerce') |
| | |
| | |
| | df_clean['Electric Range'] = df_clean['Electric Range'].fillna(df_clean['Electric Range'].median()) |
| | df_clean['Base MSRP'] = df_clean['Base MSRP'].fillna(df_clean['Base MSRP'].median()) |
| | df_clean['Legislative District'] = df_clean['Legislative District'].fillna(0) |
| | |
| | |
| | df_clean['target'] = (df_clean['Electric Vehicle Type'] == 'Battery Electric Vehicle (BEV)').astype(int) |
| | |
| | |
| | feature_columns = [ |
| | 'Model Year', 'Make', 'Model', 'Electric Range', 'Base MSRP', |
| | 'Legislative District', 'County', 'State', 'Clean Alternative Fuel Vehicle (CAFV) Eligibility' |
| | ] |
| | |
| | |
| | df_final = df_clean[feature_columns + ['target']].copy() |
| | |
| | |
| | df_final.columns = [ |
| | 'model_year', 'make', 'model', 'electric_range', 'base_msrp', |
| | 'legislative_district', 'county', 'state', 'cafv_eligibility', 'target' |
| | ] |
| | |
| | |
| | |
| | top_makes = df_final['make'].value_counts().head(10).index |
| | df_final['make'] = df_final['make'].apply(lambda x: x if x in top_makes else 'OTHER') |
| | |
| | top_models = df_final['model'].value_counts().head(15).index |
| | df_final['model'] = df_final['model'].apply(lambda x: x if x in top_models else 'OTHER') |
| | |
| | top_counties = df_final['county'].value_counts().head(20).index |
| | df_final['county'] = df_final['county'].apply(lambda x: x if x in top_counties else 'OTHER') |
| | |
| | |
| | df_final = df_final.dropna() |
| | |
| | df = df_final |
| | print(f"Processed dataset shape: {df.shape}") |
| | print(f"Target distribution:") |
| | print(f"BEV (1): {(df['target'] == 1).sum()}") |
| | print(f"PHEV (0): {(df['target'] == 0).sum()}") |
| | |
| | |
| | target_column = 'target' |
| | |
| | |
| | X_train, X_test, y_train, y_test, scaler, label_encoders, target_encoder = preprocess_data( |
| | df, target_column |
| | ) |
| | |
| | |
| | X_train_tensor = torch.FloatTensor(X_train) |
| | y_train_tensor = torch.LongTensor(y_train) |
| | X_test_tensor = torch.FloatTensor(X_test) |
| | y_test_tensor = torch.LongTensor(y_test) |
| | |
| | |
| | X_train_split, X_val_split, y_train_split, y_val_split = train_test_split( |
| | X_train_tensor, y_train_tensor, test_size=0.2, random_state=42, stratify=y_train_tensor |
| | ) |
| | |
| | |
| | batch_size = 64 |
| | train_dataset = TensorDataset(X_train_split, y_train_split) |
| | val_dataset = TensorDataset(X_val_split, y_val_split) |
| | test_dataset = TensorDataset(X_test_tensor, y_test_tensor) |
| | |
| | train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) |
| | val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False) |
| | test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False) |
| | |
| | |
| | input_size = X_train.shape[1] |
| | hidden_sizes = [128, 64, 32] |
| | output_size = len(np.unique(y_train)) |
| | |
| | |
| | model = TabularModel( |
| | input_size=input_size, |
| | hidden_sizes=hidden_sizes, |
| | output_size=output_size, |
| | dropout_rate=0.3 |
| | ) |
| | |
| | print(f"\nModel architecture:") |
| | print(f"Input size: {input_size}") |
| | print(f"Hidden layers: {hidden_sizes}") |
| | print(f"Output size: {output_size}") |
| | print(f"Total parameters: {sum(p.numel() for p in model.parameters())}") |
| | |
| | |
| | print("\nStarting training...") |
| | epochs = 100 |
| | learning_rate = 0.001 |
| | |
| | train_losses, val_losses = train_model( |
| | model, train_loader, val_loader, epochs=epochs, lr=learning_rate |
| | ) |
| | |
| | |
| | plot_training_history(train_losses, val_losses) |
| | |
| | |
| | print("\nEvaluating model on test set...") |
| | accuracy, report, predictions, targets = evaluate_model(model, test_loader, target_encoder) |
| | |
| | print(f"Test Accuracy: {accuracy:.4f}") |
| | print("\nClassification Report:") |
| | print(report) |
| | |
| | |
| | labels = ['PHEV', 'BEV'] if target_encoder is None else None |
| | plot_confusion_matrix(targets, predictions, labels) |
| | |
| | |
| | model_filepath = 'ev_classifier_model.pth' |
| | save_model(model, model_filepath, scaler, label_encoders, target_encoder) |
| | |
| | print(f"\nTraining completed successfully!") |
| | print(f"Final test accuracy: {accuracy:.4f}") |
| | |
| | return model, scaler, label_encoders, target_encoder |
| |
|
| | |
| | def predict_new_data(model, new_data, scaler, label_encoders, target_encoder=None): |
| | """ |
| | Make predictions on new data |
| | """ |
| | device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') |
| | model.to(device) |
| | model.eval() |
| | |
| | |
| | new_data_processed = new_data.copy() |
| | |
| | |
| | for col, encoder in label_encoders.items(): |
| | if col in new_data_processed.columns: |
| | |
| | new_data_processed[col] = new_data_processed[col].apply( |
| | lambda x: x if x in encoder.classes_ else 'OTHER' |
| | ) |
| | new_data_processed[col] = encoder.transform(new_data_processed[col].astype(str)) |
| | |
| | |
| | numerical_columns = new_data_processed.select_dtypes(include=['int64', 'float64']).columns |
| | new_data_processed[numerical_columns] = scaler.transform(new_data_processed[numerical_columns]) |
| | |
| | |
| | X_new = torch.FloatTensor(new_data_processed.values) |
| | X_new = X_new.to(device) |
| | |
| | |
| | with torch.no_grad(): |
| | outputs = model(X_new) |
| | probabilities = torch.softmax(outputs, dim=1) |
| | predictions = torch.argmax(outputs, dim=1) |
| | |
| | |
| | if target_encoder: |
| | predictions = target_encoder.inverse_transform(predictions.cpu().numpy()) |
| | else: |
| | predictions = predictions.cpu().numpy() |
| | |
| | return predictions, probabilities.cpu().numpy() |
| |
|
| | if __name__ == "__main__": |
| | |
| | model, scaler, label_encoders, target_encoder = main() |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |