Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| BERT Fine-tuning Script for Complaint Classification | |
| Supports training on local datasets and Hugging Face datasets and pushing to Hub | |
| """ | |
| import argparse | |
| import os | |
| import sys | |
| import json | |
| import logging | |
| from datetime import datetime | |
| from pathlib import Path | |
| import torch | |
| import numpy as np | |
| from sklearn.metrics import accuracy_score, classification_report, confusion_matrix | |
| import pandas as pd | |
| from datasets import load_dataset, Dataset, DatasetDict, load_from_disk | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForSequenceClassification, | |
| TrainingArguments, | |
| Trainer, | |
| DataCollatorWithPadding, | |
| EarlyStoppingCallback | |
| ) | |
| from huggingface_hub import login | |
| # Set up logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.StreamHandler(sys.stdout) | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
def parse_args():
    """Build and parse the command-line interface for this training script."""
    cli = argparse.ArgumentParser(description="Fine-tune BERT for complaint classification")

    # Dataset source: one of the two is expected (validated in main()).
    cli.add_argument("--dataset_name", type=str,
                     help="Name of the dataset on Hugging Face Hub (for Hub datasets)")
    cli.add_argument("--dataset_path", type=str,
                     help="Path to local dataset directory (for local datasets)")

    # Model / output locations.
    cli.add_argument("--model_id", type=str, default="bert-base-uncased",
                     help="Pre-trained model identifier")
    cli.add_argument("--output_dir", type=str, required=True,
                     help="Directory to save the trained model")

    # Column names and label space of the dataset.
    cli.add_argument("--feature_column", type=str, default="complaint",
                     help="Name of the text feature column")
    cli.add_argument("--label_column", type=str, default="label_idx",
                     help="Name of the label column")
    cli.add_argument("--num_labels", type=int, default=3,
                     help="Number of classification labels")

    # Optimization hyperparameters.
    cli.add_argument("--num_train_epochs", type=int, default=3,
                     help="Number of training epochs")
    cli.add_argument("--batch_size", type=int, default=8,
                     help="Training batch size")
    cli.add_argument("--learning_rate", type=float, default=2e-5,
                     help="Learning rate")
    cli.add_argument("--max_length", type=int, default=512,
                     help="Maximum sequence length")
    cli.add_argument("--warmup_steps", type=int, default=500,
                     help="Number of warmup steps")
    cli.add_argument("--weight_decay", type=float, default=0.01,
                     help="Weight decay")

    # Hugging Face Hub publishing.
    cli.add_argument("--push_to_hub", action="store_true",
                     help="Push model to Hugging Face Hub after training")
    cli.add_argument("--hub_model_id", type=str,
                     help="Model ID for Hugging Face Hub")
    cli.add_argument("--hf_token", type=str,
                     help="Hugging Face authentication token")

    # Run bookkeeping: seeding, evaluation/checkpoint/log cadence.
    cli.add_argument("--seed", type=int, default=42,
                     help="Random seed for reproducibility")
    cli.add_argument("--eval_steps", type=int, default=100,
                     help="Evaluation steps during training")
    cli.add_argument("--save_steps", type=int, default=500,
                     help="Save checkpoint every N steps")
    cli.add_argument("--logging_steps", type=int, default=50,
                     help="Log training progress every N steps")
    cli.add_argument("--early_stopping_patience", type=int, default=3,
                     help="Early stopping patience")

    return cli.parse_args()
def set_seed(seed):
    """Seed every relevant RNG for reproducible runs.

    Covers Python's stdlib ``random`` module, NumPy, and PyTorch (CPU and
    all CUDA devices). The original version skipped ``random.seed``, so any
    stdlib-random-based shuffling was not reproducible.

    Args:
        seed: Integer seed applied to all generators.
    """
    import random  # local import keeps the fix self-contained

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # No-op when CUDA is unavailable, so it is always safe to call.
    torch.cuda.manual_seed_all(seed)
def load_and_prepare_dataset(dataset_name=None, dataset_path=None, feature_column="complaint", label_column="label_idx"):
    """Load a dataset from a local directory or the Hugging Face Hub.

    The local path takes precedence when both sources are given (main()
    warns about this). The returned DatasetDict is guaranteed to have
    'train' and 'validation' splits and to contain the requested feature
    and label columns; any failure logs an error and exits with status 1.

    Args:
        dataset_name: Hub dataset identifier (used when no local path given).
        dataset_path: Directory holding a saved dataset or CSV files.
        feature_column: Name of the text column that must be present.
        label_column: Name of the label column that must be present.

    Returns:
        datasets.DatasetDict with 'train' and 'validation' splits.
    """
    if dataset_path:
        # Local dataset (e.g. produced by app.py via save_to_disk).
        logger.info(f"Loading dataset from local path: {dataset_path}")
        try:
            if os.path.exists(os.path.join(dataset_path, "dataset_info.json")):
                # Saved-dataset format.
                dataset = load_from_disk(dataset_path)
                logger.info("Dataset loaded from saved format")
            else:
                # Fallback: load the first CSV found and split it ourselves.
                logger.info("Attempting to load CSV files from directory")
                csv_files = [f for f in os.listdir(dataset_path) if f.endswith('.csv')]
                if not csv_files:
                    raise FileNotFoundError("No CSV files found in the specified directory")
                df = pd.read_csv(os.path.join(dataset_path, csv_files[0]))
                from sklearn.model_selection import train_test_split
                # Stratify so both splits keep the label distribution.
                train_df, val_df = train_test_split(df, test_size=0.2, random_state=42, stratify=df[label_column])
                dataset = DatasetDict({
                    'train': Dataset.from_pandas(train_df),
                    'validation': Dataset.from_pandas(val_df)
                })
        except Exception as e:
            logger.error(f"Error loading local dataset: {str(e)}")
            sys.exit(1)
    elif dataset_name:
        logger.info(f"Loading dataset from Hugging Face Hub: {dataset_name}")
        try:
            dataset = load_dataset(dataset_name)
            logger.info(f"Dataset loaded successfully from Hub")
        except Exception as e:
            logger.error(f"Error loading dataset from Hub: {str(e)}")
            sys.exit(1)
    else:
        logger.error("Either dataset_name or dataset_path must be provided")
        sys.exit(1)

    logger.info(f"Dataset structure: {dataset}")
    if 'train' not in dataset:
        logger.error("Dataset must have a 'train' split")
        sys.exit(1)
    # BUG FIX: the validation split used to be created only for Hub datasets,
    # so a local saved dataset containing just 'train' crashed later with a
    # KeyError. Create the split here so every loading path is covered.
    if 'validation' not in dataset:
        logger.info("No validation split found, creating one from train split")
        split = dataset['train'].train_test_split(test_size=0.2, seed=42)
        dataset = DatasetDict({
            'train': split['train'],
            'validation': split['test']
        })

    # Verify required columns exist (error strings match the originals).
    train_features = dataset['train'].features
    for column, kind in ((feature_column, "Feature"), (label_column, "Label")):
        if column not in train_features:
            logger.error(f"{kind} column '{column}' not found in dataset")
            logger.info(f"Available columns: {list(train_features.keys())}")
            sys.exit(1)

    # Log dataset statistics.
    logger.info(f"Train samples: {len(dataset['train'])}")
    logger.info(f"Validation samples: {len(dataset['validation'])}")

    # Label distribution: Counter is a single O(n) pass, unlike the previous
    # list.count call per unique label.
    from collections import Counter
    train_labels = dataset['train'][label_column]
    counts = Counter(train_labels)
    total = len(train_labels)
    logger.info(f"Unique labels: {sorted(counts)}")
    for label in sorted(counts):
        count = counts[label]
        logger.info(f"Label {label}: {count} samples ({count/total*100:.1f}%)")
    return dataset
def tokenize_function(examples, tokenizer, feature_column, max_length):
    """Tokenize one batch of examples.

    Padding is deliberately deferred (padding=False) so the data collator
    can pad each batch only to its own longest sequence.
    """
    texts = examples[feature_column]
    return tokenizer(texts, truncation=True, padding=False, max_length=max_length)
def compute_metrics(eval_pred):
    """Turn raw Trainer predictions into scalar evaluation metrics.

    Receives (logits, labels); reports accuracy plus macro/weighted F1 and
    macro precision/recall derived from sklearn's classification report.
    """
    logits, labels = eval_pred
    y_pred = np.argmax(logits, axis=1)
    report = classification_report(labels, y_pred, output_dict=True)
    macro = report['macro avg']
    return {
        'accuracy': accuracy_score(labels, y_pred),
        'f1_macro': macro['f1-score'],
        'f1_weighted': report['weighted avg']['f1-score'],
        'precision_macro': macro['precision'],
        'recall_macro': macro['recall'],
    }
def main():
    """Run the full fine-tuning pipeline.

    Steps: validate CLI args, seed RNGs, optionally log in to the Hub, load
    and tokenize the dataset, train with early stopping, save the model,
    evaluate, write reports and the run configuration to the output
    directory, and optionally push the model to the Hugging Face Hub.
    Fatal setup/training errors exit with status 1; post-training reporting
    errors are logged but do not abort the run.
    """
    args = parse_args()

    # One dataset source is required; the local path wins when both are given
    # (load_and_prepare_dataset checks dataset_path first).
    if not args.dataset_name and not args.dataset_path:
        logger.error("Either --dataset_name or --dataset_path must be provided")
        sys.exit(1)
    if args.dataset_name and args.dataset_path:
        logger.warning("Both dataset_name and dataset_path provided, using dataset_path (local dataset)")

    set_seed(args.seed)
    logger.info("Starting BERT fine-tuning process")
    logger.info(f"Arguments: {vars(args)}")

    # Hub login failure is only fatal when this run actually needs to push.
    if args.hf_token:
        logger.info("Logging in to Hugging Face Hub")
        try:
            login(token=args.hf_token)
            logger.info("Successfully logged in to Hugging Face Hub")
        except Exception as e:
            logger.error(f"Failed to login to Hugging Face: {str(e)}")
            if args.push_to_hub:
                sys.exit(1)

    # Load dataset (exits the process on failure).
    dataset = load_and_prepare_dataset(
        dataset_name=args.dataset_name,
        dataset_path=args.dataset_path,
        feature_column=args.feature_column,
        label_column=args.label_column
    )

    # Load tokenizer and a model with a fresh num_labels-way classification head.
    logger.info(f"Loading tokenizer and model: {args.model_id}")
    try:
        tokenizer = AutoTokenizer.from_pretrained(args.model_id)
        model = AutoModelForSequenceClassification.from_pretrained(
            args.model_id,
            num_labels=args.num_labels
        )
        logger.info("Model and tokenizer loaded successfully")
    except Exception as e:
        logger.error(f"Error loading model/tokenizer: {str(e)}")
        sys.exit(1)

    # Tokenize; drop every original column except the label so the Trainer
    # only sees tokenized features plus labels.
    logger.info("Tokenizing datasets")
    columns_to_remove = [col for col in dataset['train'].column_names if col != args.label_column]
    try:
        tokenized_datasets = dataset.map(
            lambda examples: tokenize_function(examples, tokenizer, args.feature_column, args.max_length),
            batched=True,
            remove_columns=columns_to_remove
        )
        logger.info("Tokenization completed successfully")
    except Exception as e:
        logger.error(f"Error during tokenization: {str(e)}")
        sys.exit(1)

    # The Trainer expects the label column to be called 'labels'.
    tokenized_datasets = tokenized_datasets.rename_column(args.label_column, 'labels')

    # Dynamic per-batch padding (tokenize_function leaves sequences unpadded).
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Output directory created: {output_dir}")

    training_args = TrainingArguments(
        output_dir=str(output_dir),
        num_train_epochs=args.num_train_epochs,
        per_device_train_batch_size=args.batch_size,
        per_device_eval_batch_size=args.batch_size,
        warmup_steps=args.warmup_steps,
        weight_decay=args.weight_decay,
        learning_rate=args.learning_rate,
        logging_dir=str(output_dir / "logs"),
        logging_steps=args.logging_steps,
        eval_strategy="steps",
        eval_steps=args.eval_steps,
        save_steps=args.save_steps,
        save_total_limit=2,
        # Best-checkpoint tracking keyed on the accuracy from compute_metrics.
        load_best_model_at_end=True,
        metric_for_best_model="eval_accuracy",
        greater_is_better=True,
        push_to_hub=args.push_to_hub,
        hub_model_id=args.hub_model_id if args.push_to_hub else None,
        # BUG FIX: report_to=None did NOT disable reporting — transformers
        # treats it as "all installed integrations". The string "none" is the
        # documented way to turn wandb/tensorboard reporting off.
        report_to="none",
        dataloader_num_workers=2,
        fp16=torch.cuda.is_available(),  # mixed precision only on GPU
        seed=args.seed,
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_datasets['train'],
        eval_dataset=tokenized_datasets['validation'],
        tokenizer=tokenizer,
        data_collator=data_collator,
        compute_metrics=compute_metrics,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=args.early_stopping_patience)]
    )

    # Train the model; a training failure is fatal.
    logger.info("Starting training...")
    try:
        trainer.train()
        logger.info("Training completed successfully")
    except Exception as e:
        logger.error(f"Error during training: {str(e)}")
        sys.exit(1)

    # Persist the (best) model and tokenizer.
    logger.info(f"Saving model to {output_dir}")
    try:
        trainer.save_model()
        tokenizer.save_pretrained(output_dir)
        logger.info("Model saved successfully")
    except Exception as e:
        logger.error(f"Error saving model: {str(e)}")
        sys.exit(1)

    # Final evaluation — best-effort: log and continue on failure.
    logger.info("Running final evaluation...")
    try:
        eval_results = trainer.evaluate()
        logger.info("Final Evaluation Results:")
        for key, value in eval_results.items():
            logger.info(f"  {key}: {value:.4f}")
        with open(output_dir / "eval_results.json", "w") as f:
            json.dump(eval_results, f, indent=2)
    except Exception as e:
        logger.error(f"Error during evaluation: {str(e)}")
        eval_results = {}

    # Detailed per-class report on the validation set — also best-effort.
    logger.info("Generating detailed classification report...")
    try:
        predictions = trainer.predict(tokenized_datasets['validation'])
        y_pred = np.argmax(predictions.predictions, axis=1)
        y_true = predictions.label_ids
        report = classification_report(y_true, y_pred, output_dict=True)
        with open(output_dir / "classification_report.json", "w") as f:
            json.dump(report, f, indent=2)
        logger.info("Classification report generated successfully")
    except Exception as e:
        logger.error(f"Error generating classification report: {str(e)}")

    # Push to Hub if requested; failure here is non-fatal.
    if args.push_to_hub and args.hub_model_id:
        logger.info(f"Pushing model to Hugging Face Hub: {args.hub_model_id}")
        try:
            trainer.push_to_hub()
            logger.info(f"Model successfully pushed to Hub: {args.hub_model_id}")
        except Exception as e:
            logger.error(f"Error pushing to Hub: {str(e)}")

    # Human-readable run summary.
    logger.info("\n" + "="*50)
    logger.info("TRAINING COMPLETED SUCCESSFULLY!")
    logger.info("="*50)
    logger.info(f"Model saved to: {output_dir}")
    if eval_results:
        logger.info(f"Final Accuracy: {eval_results.get('eval_accuracy', 'N/A')}")
        logger.info(f"Final F1 (Macro): {eval_results.get('eval_f1_macro', 'N/A')}")
    if args.push_to_hub and args.hub_model_id:
        logger.info(f"Model pushed to Hub: {args.hub_model_id}")

    # Record the full run configuration for reproducibility.
    config = {
        'model_id': args.model_id,
        'dataset_name': args.dataset_name,
        'dataset_path': args.dataset_path,
        'feature_column': args.feature_column,
        'label_column': args.label_column,
        'num_labels': args.num_labels,
        'training_args': training_args.to_dict(),
        'final_results': eval_results,
        'timestamp': datetime.now().isoformat()
    }
    try:
        with open(output_dir / "training_config.json", "w") as f:
            json.dump(config, f, indent=2)
        logger.info("Training configuration saved to training_config.json")
    except Exception as e:
        logger.error(f"Error saving training configuration: {str(e)}")

    logger.info("Training process completed!")
| if __name__ == "__main__": | |
| main() |