import os
from collections import Counter

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.amp import autocast, GradScaler
from torch.nn.utils import clip_grad_norm_
from torch.utils.data import TensorDataset, DataLoader

from sklearn.preprocessing import LabelEncoder, StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from sklearn.utils.class_weight import compute_class_weight


# Define the ImprovedTagClassifier class for tag prediction
class ImprovedTagClassifier(nn.Module):
    def __init__(self, input_size, output_size, dropout_rate=0.4):
        super(ImprovedTagClassifier, self).__init__()
        # First hidden layer: transforms input features to 512 dimensions
        self.fc1 = nn.Linear(input_size, 512)
        self.bn1 = nn.BatchNorm1d(512)  # Normalizes the output
        # Second hidden layer: reduces from 512 to 256 dimensions
        self.fc2 = nn.Linear(512, 256)
        self.bn2 = nn.BatchNorm1d(256)  # Normalizes again
        # Third hidden layer: further reduces to 128 dimensions
        self.fc3 = nn.Linear(256, 128)
        self.bn3 = nn.BatchNorm1d(128)  # Another normalization
        # Output layer: maps 128 dimensions to the number of classes
        self.fc4 = nn.Linear(128, output_size)
        # Tools to prevent overfitting and improve learning
        self.dropout = nn.Dropout(dropout_rate)  # Randomly zeroes activations during training
        self.leaky_relu = nn.LeakyReLU(0.1)      # Activation with a small negative slope
        # Skip connection: connects the first block directly to the third block
        self.skip1_3 = nn.Linear(512, 128)
        # Set up the initial weights for more stable training
        self._initialize_weights()

    def _initialize_weights(self):
        # Loop through all modules of the model
        for m in self.modules():
            if isinstance(m, nn.Linear):
                # Kaiming (He) initialization, matched to the LeakyReLU activation
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='leaky_relu')
                if m.bias is not None:
                    # Set biases to zero
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm1d):
                # Set batch norm weights to 1 and biases to 0
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        # First block: process input through the first layer
        x1 = self.fc1(x)
        x1 = self.bn1(x1)         # Normalize
        x1 = self.leaky_relu(x1)  # Activate
        x1 = self.dropout(x1)     # Apply dropout to prevent overfitting
        # Second block: process through the second layer
        x2 = self.fc2(x1)
        x2 = self.bn2(x2)         # Normalize
        x2 = self.leaky_relu(x2)  # Activate
        x2 = self.dropout(x2)     # Apply dropout
        # Third block: process with a skip connection
        x3 = self.fc3(x2)
        skip_x1 = self.skip1_3(x1)  # Skip connection from the first block
        x3 = x3 + skip_x1           # Add the skip connection
        x3 = self.bn3(x3)           # Normalize
        x3 = self.leaky_relu(x3)    # Activate
        x3 = self.dropout(x3)       # Apply dropout
        # Final output: class logits
        output = self.fc4(x3)
        return output


class FocalLoss(nn.Module):
    """Focal Loss for handling class imbalance"""
    def __init__(self, weight=None, gamma=2.0, reduction='mean'):
        super(FocalLoss, self).__init__()
        self.weight = weight  # Weights for each class
        self.gamma = gamma    # Focus on hard examples
        self.reduction = reduction
        self.ce_loss = nn.CrossEntropyLoss(weight=weight, reduction='none')
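
    # Focal loss rescales the per-sample cross-entropy:
    #   FL(p_t) = (1 - p_t) ** gamma * CE(p_t),
    # where p_t is the model's probability for the true class. With gamma > 0,
    # easy examples (p_t near 1) are down-weighted, so training concentrates on
    # hard and minority-class examples.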
    def forward(self, inputs, targets):
        # Calculate basic cross-entropy loss
        ce_loss = self.ce_loss(inputs, targets)
        pt = torch.exp(-ce_loss)  # Probability of correct class
        focal_loss = ((1 - pt) ** self.gamma) * ce_loss  # Adjust loss
        # Combine losses based on reduction type
        if self.reduction == 'mean':
            return focal_loss.mean()
        elif self.reduction == 'sum':
            return focal_loss.sum()
        else:
            return focal_loss


class MultiLevelTagClassifier:
    def __init__(self, device='cuda'):
        # Use the GPU if available, otherwise fall back to CPU
        self.device = torch.device(device if torch.cuda.is_available() else 'cpu')
        self.models = {}          # Store models for each parent tag
        self.preprocessors = {}   # Store preprocessing tools
        self.label_encoders = {}  # Store label encoders
        # Define tag groups: each parent tag maps to the subtags its sub-classifier distinguishes
        self.tag_hierarchy = {
            'DIV': ['DIV', 'LIST', 'CARD'],
            'P': ['P', 'LI'],
            'INPUT': ['INPUT', 'DROPDOWN'],
            'ICON': ['ICON', 'CHECKBOX', 'RADIO'],
        }
        print(f"Using device: {self.device}")

    def prepare_data_for_subtask(self, df, parent_tag, subtags):
        # Get only the data for this parent tag's subtags
        filtered_df = df[df['tag'].isin(subtags)].copy()
        print(f"\n=== Preparing data for {parent_tag} sub-classification ===")
        print(f"Subtags: {subtags}")
        print(f"Total samples: {len(filtered_df)}")
        print(f"Distribution: \n{filtered_df['tag'].value_counts()}")
        if len(filtered_df) == 0:
            print(f"No data found for {parent_tag} subtags!")
            return None, None, None, None, None, None
        y = filtered_df["tag"]                 # Target tags
        X = filtered_df.drop(columns=["tag"])  # Features
        # Define which columns are categorical and which are continuous
        categorical_cols = ['type', 'prev_sibling_html_tag', 'child_1_html_tag', 'child_2_html_tag', 'parent_tag_html']
        continuous_cols = [col for col in X.columns if col not in categorical_cols]
        # Add any missing columns with default values
        missing_cols = [col for col in categorical_cols + continuous_cols if col not in X.columns]
        if missing_cols:
            print(f"Warning: Missing columns {missing_cols} in data for {parent_tag}")
            for col in missing_cols:
                X[col] = 'unknown' if col in categorical_cols else 0
        # Encode categorical features
        X[categorical_cols] = X[categorical_cols].astype(str).fillna('unknown')
        ohe = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
        X_cat_encoded = ohe.fit_transform(X[categorical_cols])
        # Process continuous features: impute missing values, then standardize
        imputer = SimpleImputer(strategy='median')
        X_continuous_imputed = imputer.fit_transform(X[continuous_cols])
        scaler = StandardScaler()
        X_continuous_scaled = scaler.fit_transform(X_continuous_imputed)
        X_processed = np.concatenate([X_cat_encoded, X_continuous_scaled], axis=1)
        # Encode target tags as integer labels
        label_encoder = LabelEncoder()
        y_encoded = label_encoder.fit_transform(y)
        # Boost rare classes by copying them until each reaches a minimum sample count
        class_counts = Counter(y_encoded)
        min_samples_threshold = max(10, len(subtags) * 3)
        rare_classes = [cls for cls, count in class_counts.items() if count < min_samples_threshold]
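        # For each rare class, random existing samples are duplicated and only their
        # continuous (scaled) features are jittered with small Gaussian noise; the
        # one-hot encoded columns are left untouched.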
        for cls in rare_classes:
            idx = np.where(y_encoded == cls)[0]
            original_class_name = label_encoder.inverse_transform([cls])[0]
            samples_needed = min_samples_threshold - len(idx)
            print(f"Adding {samples_needed} copies to class '{original_class_name}'")
            for _ in range(samples_needed):
                sample_idx = np.random.choice(idx)
                new_sample = X_processed[sample_idx].copy()
                continuous_start = X_cat_encoded.shape[1]
                noise = np.random.normal(0, 0.05, size=X_continuous_scaled.shape[1])
                new_sample[continuous_start:] += noise
                X_processed = np.vstack([X_processed, new_sample])
                y_encoded = np.append(y_encoded, cls)
        # Bundle up preprocessing models
        preprocessors = {
            'ohe': ohe,
            'imputer': imputer,
            'scaler': scaler,
            'label_encoder': label_encoder,
            'categorical_cols': categorical_cols,
            'continuous_cols': continuous_cols
        }
        return X_processed, y_encoded, preprocessors, categorical_cols, continuous_cols, label_encoder

    def train_subtask_model(self, X, y, preprocessors, parent_tag, epochs=100):
        # Split data into train, validation, and test sets
        print(f"\n=== Training {parent_tag} sub-classifier ===")
        X_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=0.15, random_state=42, stratify=y)
        X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.15, random_state=42, stratify=y_temp)
        print(f"Training set size: {X_train.shape[0]}")
        print(f"Validation set size: {X_val.shape[0]}")
        print(f"Test set size: {X_test.shape[0]}")
        # Balance classes
        class_weights = compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
        # Turn data into tensors
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
        y_train_tensor = torch.tensor(y_train, dtype=torch.long)
        X_val_tensor = torch.tensor(X_val, dtype=torch.float32)
        y_val_tensor = torch.tensor(y_val, dtype=torch.long)
        X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
        y_test_tensor = torch.tensor(y_test, dtype=torch.long)
        class_weights_tensor = torch.tensor(class_weights, dtype=torch.float32).to(self.device)
        # Set up datasets and loaders
        train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
        val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
        test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
        train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True, num_workers=2)
        val_loader = DataLoader(val_dataset, batch_size=256, shuffle=False, num_workers=2)
        test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False, num_workers=2)
        # Create and set up the model
        input_size = X_train.shape[1]
        output_size = len(np.unique(y))
        model = ImprovedTagClassifier(input_size, output_size).to(self.device)
        criterion = FocalLoss(weight=class_weights_tensor, gamma=2.0)
        optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, verbose=True)
        scaler = GradScaler()
        # Training loop
        best_val_loss = float('inf')
        patience = 15
        counter = 0
        train_losses = []
        val_losses = []
        val_accuracies = []
        for epoch in range(epochs):
            model.train()
            running_loss = 0.0
            for batch_X, batch_y in train_loader:
                batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
                optimizer.zero_grad()
                with autocast(device_type=self.device.type):
                    outputs = model(batch_X)
                    loss = criterion(outputs, batch_y)
                scaler.scale(loss).backward()
                # Unscale the gradients before clipping so the norm is computed on true values
                scaler.unscale_(optimizer)
                clip_grad_norm_(model.parameters(), max_norm=1.0)
                scaler.step(optimizer)
                scaler.update()
                running_loss += loss.item()
            train_loss = running_loss / len(train_loader)
            model.eval()
            val_running_loss = 0.0
            all_preds = []
            all_labels = []
            with torch.no_grad():
                for batch_X, batch_y in val_loader:
                    batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
                    with autocast(device_type=self.device.type):
                        outputs = model(batch_X)
                        loss = criterion(outputs, batch_y)
                    val_running_loss += loss.item()
                    _, preds = torch.max(outputs, 1)
                    all_preds.extend(preds.cpu().numpy())
                    all_labels.extend(batch_y.cpu().numpy())
            val_loss = val_running_loss / len(val_loader)
            val_accuracy = accuracy_score(all_labels, all_preds)
            scheduler.step(val_loss)
            # Track progress
            train_losses.append(train_loss)
            val_losses.append(val_loss)
            val_accuracies.append(val_accuracy)
            print(f"Epoch [{epoch+1}/{epochs}] - Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")
            # Early stopping: keep the weights with the best validation loss
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                counter = 0
                best_model_state = model.state_dict().copy()
            else:
                counter += 1
                if counter >= patience:
                    print(f"Early stopping triggered after {epoch+1} epochs")
                    break
        # Restore the best checkpoint and evaluate on the held-out test set
        model.load_state_dict(best_model_state)
        model.eval()
        test_preds = []
        test_labels = []
        with torch.no_grad():
            for batch_X, batch_y in test_loader:
                batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
                outputs = model(batch_X)
                _, preds = torch.max(outputs, 1)
                test_preds.extend(preds.cpu().numpy())
                test_labels.extend(batch_y.cpu().numpy())
        test_accuracy = accuracy_score(test_labels, test_preds)
        print(f"\n{parent_tag} Model Test Accuracy: {test_accuracy:.4f}")
        print(f"\n{parent_tag} Classification Report:")
        print(classification_report(test_labels, test_preds, target_names=preprocessors['label_encoder'].classes_, zero_division=0))
        return model, (train_losses, val_losses, val_accuracies), test_accuracy

    def train_all_models(self, df_path, epochs=100):
        # Load and clean the main dataset
        print("Loading and cleaning data...")
        df = pd.read_csv(df_path)
        df.loc[(df["tag"] == "SPAN") & ((df["type"] == "RECTANGLE") | (df["type"] == "GROUP")), "tag"] = "DIV"
        children_cols = ['child_1_html_tag', 'child_2_html_tag']
        for col in children_cols:
            df[col] = df[col].apply(lambda x: "DIV" if isinstance(x, str) and '-' in x else x)
        for col in ['tag', 'prev_sibling_html_tag', 'child_1_html_tag', 'child_2_html_tag']:
            df[col] = df[col].str.upper()
        # Make a folder for models
        os.makedirs('../models/sub_classifiers', exist_ok=True)
        # Train a model for each parent tag
        for parent_tag, subtags in self.tag_hierarchy.items():
            print(f"\n{'='*60}")
            print(f"Training {parent_tag} sub-classifier")
            print(f"{'='*60}")
            result = self.prepare_data_for_subtask(df, parent_tag, subtags)
            if result[0] is None:
                print(f"Skipping {parent_tag} due to insufficient data")
                continue
            X, y, preprocessors, cat_cols, cont_cols, label_encoder = result
            model, training_history, test_accuracy = self.train_subtask_model(X, y, preprocessors, parent_tag, epochs)
            self.models[parent_tag] = model
            self.preprocessors[parent_tag] = preprocessors
            self.label_encoders[parent_tag] = label_encoder
            model_path = f'../models/sub_classifiers/{parent_tag.lower()}_classifier.pth'
            torch.save({
                'model_state_dict': model.state_dict(),
                'input_size': X.shape[1],
                'output_size': len(np.unique(y)),
                'preprocessors': preprocessors,
                'test_accuracy': test_accuracy
            }, model_path)
            print(f"Saved {parent_tag} model to {model_path}")
            self.plot_training_history(training_history, parent_tag)

    def plot_training_history(self, history, parent_tag):
        # Plot and save the loss/accuracy curves for this sub-classifier
        train_losses, val_losses, val_accuracies = history
        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        plt.plot(train_losses, label='Training Loss')
        plt.plot(val_losses, label='Validation Loss')
        plt.title(f'{parent_tag} Model: Loss over epochs')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.legend()
        plt.subplot(1, 2, 2)
        plt.plot(val_accuracies, label='Validation Accuracy')
        plt.title(f'{parent_tag} Model: Accuracy over epochs')
        plt.xlabel('Epoch')
        plt.ylabel('Accuracy')
        plt.legend()
        plt.tight_layout()
        plt.savefig(f'../models/sub_classifiers/{parent_tag.lower()}_training_history.png')
        plt.close()

    def load_models(self, model_dir='../models/sub_classifiers'):
        # Load saved models
        for parent_tag in self.tag_hierarchy.keys():
            model_path = f'{model_dir}/{parent_tag.lower()}_classifier.pth'
            if os.path.exists(model_path):
                print(f"Loading {parent_tag} model from {model_path}")
                checkpoint = torch.load(model_path, map_location=self.device, weights_only=False)
                model = ImprovedTagClassifier(checkpoint['input_size'], checkpoint['output_size']).to(self.device)
                model.load_state_dict(checkpoint['model_state_dict'])
                model.eval()
                self.models[parent_tag] = model
                self.preprocessors[parent_tag] = checkpoint['preprocessors']
                self.label_encoders[parent_tag] = checkpoint['preprocessors']['label_encoder']
                print(f"Loaded {parent_tag} model (Test Accuracy: {checkpoint['test_accuracy']:.4f})")
            else:
                print(f"Model file {model_path} not found!")

    def predict_hierarchical(self, sample_data, base_prediction):
        # Predict a tag using the right sub-classifier
        if base_prediction not in self.tag_hierarchy:
            return base_prediction, 1.0
        if base_prediction not in self.models:
            print(f"No sub-classifier found for {base_prediction}")
            return base_prediction, 1.0
        preprocessors = self.preprocessors[base_prediction]
        sample_df = pd.DataFrame([sample_data])
        cat_cols = preprocessors['categorical_cols']
        cont_cols = preprocessors['continuous_cols']
        # Add missing columns
        for col in cat_cols + cont_cols:
            if col not in sample_df.columns:
                sample_df[col] = 'unknown' if col in cat_cols else 0
        sample_df[cat_cols] = sample_df[cat_cols].astype(str).fillna('unknown')
        X_cat = preprocessors['ohe'].transform(sample_df[cat_cols])
        X_cont = preprocessors['imputer'].transform(sample_df[cont_cols])
        X_cont = preprocessors['scaler'].transform(X_cont)
        X_processed = np.concatenate([X_cat, X_cont], axis=1)
        X_tensor = torch.tensor(X_processed, dtype=torch.float32).to(self.device)
        model = self.models[base_prediction]
        with torch.no_grad():
            outputs = model(X_tensor)
            probabilities = torch.softmax(outputs, dim=1)
            _, predicted = torch.max(outputs, 1)
        predicted_label = preprocessors['label_encoder'].inverse_transform([predicted.cpu().numpy()[0]])[0]
        confidence = probabilities.max().item()
        return predicted_label, confidence
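

# --- Usage sketch (illustrative addition, not part of the original script) ---
# A minimal example of how the pieces above might be driven end to end. The CSV
# path and the sample_data values below are placeholders; the real file would
# need a 'tag' column plus the categorical and continuous feature columns
# referenced in prepare_data_for_subtask.
if __name__ == "__main__":
    classifier = MultiLevelTagClassifier(device='cuda')
    # Train one sub-classifier per parent tag; checkpoints and training plots are
    # written under ../models/sub_classifiers.
    classifier.train_all_models('../data/nodes.csv', epochs=100)  # placeholder path
    # Reload the saved checkpoints and refine a coarse base prediction.
    classifier.load_models('../models/sub_classifiers')
    sample_data = {  # placeholder feature values
        'type': 'RECTANGLE',
        'prev_sibling_html_tag': 'DIV',
        'child_1_html_tag': 'P',
        'child_2_html_tag': 'unknown',
        'parent_tag_html': 'DIV',
    }
    label, confidence = classifier.predict_hierarchical(sample_data, base_prediction='DIV')
    print(f"Refined prediction: {label} (confidence: {confidence:.2f})")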