# Hybrid interest classifier: TF-IDF + Logistic Regression blended with
# BERT (BART-MNLI) zero-shot classification over a fixed label set.
| import pandas as pd | |
| import numpy as np | |
| import re | |
| import os | |
| import pickle | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.preprocessing import MultiLabelBinarizer | |
| from sklearn.multioutput import MultiOutputClassifier | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.pipeline import Pipeline | |
| from sklearn.model_selection import train_test_split | |
| from transformers import pipeline | |
| import torch | |
| import logging | |
| import time | |
| from typing import List, Dict, Tuple, Union, Optional | |
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger named after this module, per stdlib convention.
logger = logging.getLogger(__name__)
# Define interest categories.
# Closed label set shared by the TF-IDF MultiLabelBinarizer and the BERT
# zero-shot candidate labels; the order fixes the binarizer's column order.
INTEREST_CATEGORIES = ["Music", "Food", "Sports", "Technology", "Arts", "Travel", "Education"]
class InterestClassifier:
    """
    Hybrid interest classification model that combines TF-IDF with BERT
    zero-shot classification.

    A trained TF-IDF + Logistic Regression pipeline supplies supervised
    per-label probabilities, a BERT (BART-MNLI) zero-shot pipeline supplies
    unsupervised scores, and predict() blends them as::

        alpha * tfidf_score + (1 - alpha) * bert_score
    """

    # Domain indicator terms used by preprocessing. Hoisted to a class
    # constant so the dict is built once, not on every call; insertion
    # order matches the original so the emphasized output is identical.
    _DOMAIN_TERMS = {
        'music': ['music', 'guitar', 'band', 'concert', 'gig', 'sing', 'song', 'play music', 'musician'],
        'food': ['food', 'cook', 'cuisine', 'recipe', 'restaurant', 'eat', 'culinary', 'bake', 'chef'],
        'sports': ['sport', 'run', 'gym', 'fitness', 'workout', 'exercise', 'athletic', 'training'],
        'arts': ['art', 'paint', 'draw', 'museum', 'gallery', 'exhibit', 'creative', 'design'],
        'technology': ['tech', 'code', 'program', 'software', 'developer', 'computer', 'app', 'digital'],
        'education': ['education', 'learn', 'course', 'class', 'study', 'book', 'read', 'academic'],
        'travel': ['travel', 'trip', 'hike', 'explore', 'tour', 'visit', 'journey', 'destination']
    }

    # Focused stopword list applied after domain-term emphasis.
    _CORE_STOPWORDS = {'i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'the', 'a', 'an', 'and', 'but',
                       'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with',
                       'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after',
                       'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over',
                       'under', 'this', 'that', 'these', 'those', 'am', 'is', 'are', 'was', 'were'}

    def __init__(self,
                 model_path: Optional[str] = None,
                 alpha: float = 0.6,
                 threshold: float = 0.5,
                 bert_model_name: str = 'facebook/bart-large-mnli',
                 use_gpu: Optional[bool] = None):
        """
        Initialize the hybrid classifier.

        Args:
            model_path: Path to a saved model (if None, a new model will be
                created). NOTE: loading a saved model overwrites alpha and
                threshold with the persisted values.
            alpha: Weight for TF-IDF model (1-alpha for BERT)
            threshold: Classification threshold for final predictions
            bert_model_name: Name of the BERT model to use
            use_gpu: Whether to use GPU for BERT inference. None (the
                default) auto-detects CUDA at instantiation time. (The
                previous default called torch.cuda.is_available() once at
                class-definition time, freezing the decision for every
                later instance.)
        """
        self.alpha = alpha
        self.threshold = threshold
        self.bert_model_name = bert_model_name
        # Resolve GPU availability now, not at class-definition time.
        self.use_gpu = torch.cuda.is_available() if use_gpu is None else use_gpu
        # Models are None until trained or loaded.
        self.tfidf_pipeline = None
        self.mlb = None
        self.bert_classifier = None
        if model_path and os.path.exists(model_path):
            # load_model() re-initializes the BERT classifier itself, so
            # do not initialize it a second time (each init loads the full
            # transformer model).
            self.load_model(model_path)
        else:
            self._init_bert_classifier()

    def _improved_preprocess_text(self, text: str) -> str:
        """
        Enhanced text preprocessing that better preserves domain-specific
        indicators.

        Lowercases, strips punctuation (keeping '|' and '-' separators),
        appends "<category> <category> <term> <term>" for every domain term
        found (explicit emphasis for the TF-IDF model), drops standalone
        numbers, and removes a small stopword list.

        Args:
            text: Input text to preprocess

        Returns:
            Preprocessed text ("" for NaN/None input)
        """
        # Handle potential NaN values
        if pd.isna(text):
            return ""
        # Convert to lowercase
        text = text.lower()
        # Remove special characters while preserving important separators
        text = re.sub(r'[^\w\s|-]', ' ', text)
        # Replace multiple spaces with a single space
        text = re.sub(r'\s+', ' ', text)
        # Check for domain terms and emphasize them.
        # NOTE(review): this is a substring match, so e.g. 'art' also fires
        # inside 'party' — inherited heuristic, kept as-is.
        modified_text = text
        for category, terms in self._DOMAIN_TERMS.items():
            for term in terms:
                if term in text:
                    # Add the category name explicitly if a related term is found
                    modified_text += f" {category} {category} {term} {term}"
        # Split on '|' separators; remove standalone numbers (but keep
        # words containing digits like "web3").
        parts = []
        for part in re.split(r'\s*\|\s*', modified_text):
            part = re.sub(r'\b\d+\b', '', part)
            parts.append(part)
        # Process each part and filter stopwords
        processed_parts = []
        for part in parts:
            words = part.split()
            filtered_words = [word for word in words if word not in self._CORE_STOPWORDS]
            if filtered_words:
                processed_parts.append(' '.join(filtered_words))
        # Join the processed parts back
        processed_text = ' '.join(processed_parts)
        return processed_text.strip()

    def _init_bert_classifier(self):
        """Initialize the BERT zero-shot classifier.

        On any failure the classifier is left as None and the model
        degrades to TF-IDF-only predictions (see predict()).
        """
        try:
            logger.info(f"Initializing BERT zero-shot classifier with model: {self.bert_model_name}")
            # transformers convention: device=0 selects the first GPU, -1 is CPU.
            device = 0 if self.use_gpu and torch.cuda.is_available() else -1
            self.bert_classifier = pipeline('zero-shot-classification',
                                            model=self.bert_model_name,
                                            device=device)
            logger.info("BERT classifier successfully initialized")
        except Exception as e:
            logger.error(f"Failed to initialize BERT classifier: {e}")
            logger.warning("Proceeding without BERT - will use TF-IDF only")
            self.bert_classifier = None

    def train(self,
              df: pd.DataFrame,
              text_column: str = 'survey_answer',
              labels_column: str = 'labels_list',
              test_size: float = 0.2):
        """
        Train the TF-IDF + Logistic Regression model.

        Args:
            df: DataFrame containing survey responses and labels (not
                modified — the method works on a copy)
            text_column: Column name containing the survey responses
            labels_column: Column name containing the labels
            test_size: Proportion of data to use for testing

        Returns:
            dict with 'hamming_loss', 'micro_f1' and 'macro_f1' on the
            held-out test split
        """
        logger.info("Starting model training...")
        # Work on a copy: the original mutated the caller's DataFrame
        # (label parsing + a new 'processed_text' column) as a side effect.
        df = df.copy()
        # Prepare labels
        if isinstance(df[labels_column].iloc[0], str):
            logger.info("Converting labels from string to list...")
            # Convert string representation of lists to actual lists
            df[labels_column] = df[labels_column].str.strip('[]').str.split(',')
            # Clean up any extra quotes or spaces
            df[labels_column] = df[labels_column].apply(lambda x: [item.strip().strip("'\"") for item in x])
        # Preprocess text
        logger.info("Preprocessing text data...")
        df['processed_text'] = df[text_column].apply(self._improved_preprocess_text)
        # Fit the binarizer with a fixed class order so output columns are stable.
        self.mlb = MultiLabelBinarizer(classes=INTEREST_CATEGORIES)
        y = self.mlb.fit_transform(df[labels_column])
        logger.info(f"Target shape: {y.shape}")
        # Split data (fixed seed for reproducibility)
        X_train, X_test, y_train, y_test = train_test_split(
            df['processed_text'], y, test_size=test_size, random_state=42, shuffle=True
        )
        logger.info(f"Training set: {X_train.shape[0]} samples, Test set: {X_test.shape[0]} samples")
        # Create TF-IDF pipeline
        logger.info("Creating and training TF-IDF pipeline...")
        tfidf_vectorizer = TfidfVectorizer(
            max_features=3000,
            min_df=2,
            max_df=0.9,
            ngram_range=(1, 3),
            sublinear_tf=True
        )
        lr_clf = LogisticRegression(
            C=0.5,
            max_iter=1000,
            class_weight='balanced',
            solver='liblinear',
            penalty='l2'
        )
        # One binary logistic regression per interest label.
        multi_lr = MultiOutputClassifier(lr_clf)
        self.tfidf_pipeline = Pipeline([
            ('tfidf', tfidf_vectorizer),
            ('classifier', multi_lr)
        ])
        # Train the pipeline
        self.tfidf_pipeline.fit(X_train, y_train)
        logger.info("TF-IDF pipeline trained successfully")
        # Evaluate on test set
        logger.info("Evaluating model on test set...")
        y_pred = self.tfidf_pipeline.predict(X_test)
        # Calculate metrics
        from sklearn.metrics import hamming_loss, f1_score, precision_score, recall_score
        h_loss = hamming_loss(y_test, y_pred)
        micro_f1 = f1_score(y_test, y_pred, average='micro')
        macro_f1 = f1_score(y_test, y_pred, average='macro')
        logger.info(f"Hamming Loss: {h_loss:.4f}")
        logger.info(f"Micro F1 Score: {micro_f1:.4f}")
        logger.info(f"Macro F1 Score: {macro_f1:.4f}")
        return {
            'hamming_loss': h_loss,
            'micro_f1': micro_f1,
            'macro_f1': macro_f1
        }

    def get_tfidf_predictions(self, text: str) -> Dict[str, float]:
        """
        Get predictions from the TF-IDF model with confidence scores.

        Args:
            text: The input text to classify

        Returns:
            Dictionary of label -> probability of the positive class

        Raises:
            ValueError: If the TF-IDF pipeline has not been trained/loaded.
        """
        if self.tfidf_pipeline is None:
            raise ValueError("TF-IDF model is not trained yet. Call train() first.")
        # Preprocess text
        processed_text = self._improved_preprocess_text(text)
        # For MultiOutputClassifier, predict_proba returns one array per
        # label, each shaped (n_samples, n_classes_seen_for_that_label).
        y_proba = self.tfidf_pipeline.predict_proba([processed_text])
        estimators = self.tfidf_pipeline.named_steps['classifier'].estimators_
        scores = {}
        for i, label in enumerate(self.mlb.classes_):
            proba = y_proba[i][0]
            if len(proba) == 2:
                # Usual case: [prob_for_0, prob_for_1].
                scores[label] = float(proba[1])
            else:
                # Only one class was seen in training for this label, so the
                # array is (n_samples, 1) and proba[1] would IndexError.
                # The positive-class probability is then 1.0 or 0.0 depending
                # on which class that estimator saw.
                scores[label] = float(estimators[i].classes_[0])
        return scores

    def get_bert_predictions(self, text: str) -> Dict[str, float]:
        """
        Get predictions from the BERT zero-shot model.

        Args:
            text: The input text to classify

        Returns:
            Dictionary of label -> score (all zeros if BERT is unavailable
            or the pipeline raises)
        """
        if self.bert_classifier is None:
            logger.warning("BERT classifier is not available, returning empty scores")
            return {label: 0.0 for label in INTEREST_CATEGORIES}
        try:
            # multi_label=True scores each category independently.
            result = self.bert_classifier(text, INTEREST_CATEGORIES, multi_label=True)
            # Convert to dictionary of label -> score
            scores = dict(zip(result['labels'], result['scores']))
            # Ensure all categories are present (BERT may return in different order)
            for category in INTEREST_CATEGORIES:
                if category not in scores:
                    scores[category] = 0.0
            return scores
        except Exception as e:
            logger.error(f"Error in BERT prediction: {e}")
            return {label: 0.0 for label in INTEREST_CATEGORIES}

    def predict(self,
                text: str,
                alpha: Optional[float] = None,
                threshold: Optional[float] = None,
                return_scores: bool = False) -> Union[List[str], Dict]:
        """
        Combine TF-IDF and BERT predictions using a weighted average.

        Args:
            text: The input text to classify
            alpha: Weight for TF-IDF predictions (1-alpha for BERT), uses
                self.alpha if None
            threshold: Threshold for classification, uses self.threshold if None
            return_scores: Whether to return scores along with labels

        Returns:
            A list of predicted labels, or (when return_scores is True) a
            dict with labels, per-model and combined scores, timings, and
            the parameters actually used.

        Raises:
            ValueError: If the model has not been trained/loaded.
        """
        if self.tfidf_pipeline is None:
            raise ValueError("Model is not trained yet. Call train() first.")
        # Use instance values if not provided
        alpha = alpha if alpha is not None else self.alpha
        threshold = threshold if threshold is not None else self.threshold
        # Time the predictions
        start_time = time.time()
        # Get TF-IDF predictions
        tfidf_scores = self.get_tfidf_predictions(text)
        tfidf_time = time.time() - start_time
        # Get BERT predictions if available; otherwise fall back to
        # TF-IDF-only scoring (alpha weighting is skipped entirely).
        bert_time_start = time.time()
        if self.bert_classifier is not None:
            bert_scores = self.get_bert_predictions(text)
            use_bert = True
        else:
            bert_scores = {category: 0.0 for category in INTEREST_CATEGORIES}
            use_bert = False
            logger.warning("BERT classifier not available, using TF-IDF only")
        bert_time = time.time() - bert_time_start
        # Combine predictions
        combined_scores = {}
        final_labels = []
        for category in INTEREST_CATEGORIES:
            tfidf_score = tfidf_scores.get(category, 0.0)
            bert_score = bert_scores.get(category, 0.0)
            # Weighted average (if using BERT)
            if use_bert:
                final_score = (alpha * tfidf_score) + ((1 - alpha) * bert_score)
            else:
                final_score = tfidf_score
            combined_scores[category] = final_score
            # Apply threshold
            if final_score >= threshold:
                final_labels.append(category)
        total_time = time.time() - start_time
        if return_scores:
            # Sort scores for easier interpretation
            sorted_scores = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
            return {
                'labels': final_labels,
                'scores': combined_scores,
                'sorted_scores': sorted_scores,
                'tfidf_scores': tfidf_scores,
                'bert_scores': bert_scores,
                'timing': {
                    'tfidf': tfidf_time,
                    'bert': bert_time,
                    'total': total_time
                },
                'alpha': alpha,
                'threshold': threshold,
                'using_bert': use_bert
            }
        return final_labels

    def save_model(self, path: str = "hybrid_interest_classifier.pkl"):
        """
        Save the model to disk.

        Only the TF-IDF pipeline, the binarizer and scalar settings are
        pickled; the BERT pipeline is re-initialized on load.

        Args:
            path: Path to save the model

        Raises:
            ValueError: If the model has not been trained yet.
        """
        if self.tfidf_pipeline is None:
            raise ValueError("Model is not trained yet. Call train() first.")
        components = {
            'tfidf_pipeline': self.tfidf_pipeline,
            'mlb': self.mlb,
            'alpha': self.alpha,
            'threshold': self.threshold,
            'bert_model_name': self.bert_model_name,
            'interest_categories': INTEREST_CATEGORIES,
            'version': '1.0'
        }
        with open(path, 'wb') as f:
            pickle.dump(components, f)
        logger.info(f"Model saved to {path}")

    def load_model(self, path: str):
        """
        Load a saved model from disk.

        NOTE(review): pickle.load executes arbitrary code — only load
        model files from trusted sources.

        Args:
            path: Path to the saved model

        Raises:
            Exception: Re-raises whatever the load failed with, after logging.
        """
        try:
            with open(path, 'rb') as f:
                components = pickle.load(f)
            self.tfidf_pipeline = components['tfidf_pipeline']
            self.mlb = components['mlb']
            # Fall back to the documented defaults for older model files.
            self.alpha = components.get('alpha', 0.6)
            self.threshold = components.get('threshold', 0.5)
            self.bert_model_name = components.get('bert_model_name', 'facebook/bart-large-mnli')
            logger.info(f"Model loaded from {path}")
            # Re-initialize BERT classifier (it is never pickled).
            self._init_bert_classifier()
        except Exception as e:
            logger.error(f"Failed to load model: {e}")
            raise
# Example usage
def main():
    """End-to-end demo: load data, train, save, and exercise the classifier."""
    try:
        # Load the survey dataset.
        logger.info("Loading dataset: survey_interest_dataset_enhanced.csv")
        df = pd.read_csv('survey_interest_dataset_enhanced.csv')

        # Parse labels_list if it arrived as a string representation.
        if 'labels_list' in df.columns and isinstance(df['labels_list'].iloc[0], str):
            logger.info("Converting labels_list from string to list...")
            df['labels_list'] = df['labels_list'].str.strip('[]').str.split(',')
            df['labels_list'] = df['labels_list'].apply(
                lambda items: [entry.strip().strip("'\"") for entry in items]
            )

        # Build the hybrid classifier.
        logger.info("Initializing classifier with alpha=0.6, threshold=0.5")
        classifier = InterestClassifier(alpha=0.6, threshold=0.5)

        # Fit the TF-IDF side and report held-out metrics.
        logger.info("Training the model...")
        train_metrics = classifier.train(df)
        logger.info(f"Training metrics: {train_metrics}")

        # Persist the trained components.
        model_path = "hybrid_interest_classifier.pkl"
        logger.info(f"Saving model to {model_path}")
        classifier.save_model(model_path)

        # Sanity-check predictions on a few hand-written inputs.
        demo_inputs = [
            "I love hiking in the mountains and trying local foods wherever I travel.",
            "I'm a software developer who plays guitar in a band on weekends.",
            "I spend most of my time reading books and attending online courses.",
            "I enjoy painting landscapes and visiting art museums when I travel."
        ]
        logger.info("Testing model on example inputs...")
        for sample in demo_inputs:
            outcome = classifier.predict(sample, return_scores=True)
            logger.info(f"\nExample: '{sample}'")
            logger.info(f"Predicted interests: {outcome['labels']}")
            logger.info("Top interests by score:")
            for category, score in outcome['sorted_scores'][:3]:
                logger.info(f" {category}: {score:.4f}")

        # Show how the TF-IDF/BERT blend shifts with different alphas.
        logger.info("\nFine-tuning alpha parameter:")
        tuning_text = "I work as a software developer and enjoy hiking on weekends"
        for weight in (0.3, 0.5, 0.7, 0.9):
            outcome = classifier.predict(tuning_text, alpha=weight, return_scores=True)
            logger.info(f"\nAlpha = {weight} (TF-IDF weight: {weight}, BERT weight: {1-weight})")
            logger.info(f"Predicted interests: {outcome['labels']}")
            logger.info("Top 3 scores:")
            for category, score in outcome['sorted_scores'][:3]:
                logger.info(f" {category}: {score:.4f}")

        logger.info("Model training and evaluation completed successfully")
    except Exception as e:
        logger.error(f"Error in main function: {e}", exc_info=True)
        raise


if __name__ == "__main__":
    main()