# hybrid_interest_classifier.py
# (Hugging Face viewer chrome removed; original upload metadata:
#  repo "New_Hybrid_Text_Classifier_Model", uploader "WinterJet2021",
#  commit "673d9a1 verified", "Upload 13 files")
import pandas as pd
import numpy as np
import re
import os
import pickle
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.multioutput import MultiOutputClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from transformers import pipeline
import torch
import logging
import time
from typing import List, Dict, Tuple, Union, Optional
# Configure logging for the whole module (INFO level, timestamped lines)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Define interest categories
# Fixed label set shared by the TF-IDF label binarizer and the BERT
# zero-shot candidate labels; order defines the binarized column order.
INTEREST_CATEGORIES = ["Music", "Food", "Sports", "Technology", "Arts", "Travel", "Education"]
class InterestClassifier:
    """
    Hybrid interest classification model.

    Combines a supervised TF-IDF + Logistic Regression pipeline with a
    zero-shot BERT/NLI classifier; per-category scores from the two models
    are blended with a weight ``alpha`` and thresholded into labels.
    """
def __init__(self,
model_path: Optional[str] = None,
alpha: float = 0.6,
threshold: float = 0.5,
bert_model_name: str = 'facebook/bart-large-mnli',
use_gpu: bool = torch.cuda.is_available()):
"""
Initialize the hybrid classifier
Args:
model_path: Path to a saved model (if None, a new model will be created)
alpha: Weight for TF-IDF model (1-alpha for BERT)
threshold: Classification threshold for final predictions
bert_model_name: Name of the BERT model to use
use_gpu: Whether to use GPU for BERT inference
"""
self.alpha = alpha
self.threshold = threshold
self.bert_model_name = bert_model_name
self.use_gpu = use_gpu
# Initialize models as None
self.tfidf_pipeline = None
self.mlb = None
self.bert_classifier = None
# Load the model if path is provided
if model_path and os.path.exists(model_path):
self.load_model(model_path)
# Initialize BERT model
self._init_bert_classifier()
def _improved_preprocess_text(self, text: str) -> str:
"""
Enhanced text preprocessing that better preserves domain-specific indicators
Args:
text: Input text to preprocess
Returns:
Preprocessed text
"""
# Handle potential NaN values
if pd.isna(text):
return ""
# Convert to lowercase
text = text.lower()
# Remove special characters while preserving important separators
text = re.sub(r'[^\w\s|-]', ' ', text)
# Replace multiple spaces with a single space
text = re.sub(r'\s+', ' ', text)
# Define domain terms dictionary
domain_terms = {
'music': ['music', 'guitar', 'band', 'concert', 'gig', 'sing', 'song', 'play music', 'musician'],
'food': ['food', 'cook', 'cuisine', 'recipe', 'restaurant', 'eat', 'culinary', 'bake', 'chef'],
'sports': ['sport', 'run', 'gym', 'fitness', 'workout', 'exercise', 'athletic', 'training'],
'arts': ['art', 'paint', 'draw', 'museum', 'gallery', 'exhibit', 'creative', 'design'],
'technology': ['tech', 'code', 'program', 'software', 'developer', 'computer', 'app', 'digital'],
'education': ['education', 'learn', 'course', 'class', 'study', 'book', 'read', 'academic'],
'travel': ['travel', 'trip', 'hike', 'explore', 'tour', 'visit', 'journey', 'destination']
}
# Check for domain terms and emphasize them
modified_text = text
for category, terms in domain_terms.items():
for term in terms:
if term in text:
# Add the category name explicitly if a related term is found
modified_text += f" {category} {category} {term} {term}"
# Split on common separators but preserve the important phrases
parts = []
for part in re.split(r'\s*\|\s*', modified_text):
# Remove numbers (but keep words with numbers like "web3")
part = re.sub(r'\b\d+\b', '', part)
parts.append(part)
# Define a more focused stopwords list
core_stopwords = {'i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'the', 'a', 'an', 'and', 'but',
'if', 'or', 'because', 'as', 'until', 'while', 'of', 'at', 'by', 'for', 'with',
'about', 'against', 'between', 'into', 'through', 'during', 'before', 'after',
'above', 'below', 'to', 'from', 'up', 'down', 'in', 'out', 'on', 'off', 'over',
'under', 'this', 'that', 'these', 'those', 'am', 'is', 'are', 'was', 'were'}
# Process each part and filter stopwords
processed_parts = []
for part in parts:
words = part.split()
filtered_words = [word for word in words if word not in core_stopwords]
if filtered_words:
processed_parts.append(' '.join(filtered_words))
# Join the processed parts back
processed_text = ' '.join(processed_parts)
return processed_text.strip()
def _init_bert_classifier(self):
"""Initialize the BERT zero-shot classifier"""
try:
logger.info(f"Initializing BERT zero-shot classifier with model: {self.bert_model_name}")
device = 0 if self.use_gpu and torch.cuda.is_available() else -1
self.bert_classifier = pipeline('zero-shot-classification',
model=self.bert_model_name,
device=device)
logger.info("BERT classifier successfully initialized")
except Exception as e:
logger.error(f"Failed to initialize BERT classifier: {e}")
logger.warning("Proceeding without BERT - will use TF-IDF only")
self.bert_classifier = None
def train(self,
df: pd.DataFrame,
text_column: str = 'survey_answer',
labels_column: str = 'labels_list',
test_size: float = 0.2):
"""
Train the TF-IDF + Logistic Regression model
Args:
df: DataFrame containing survey responses and labels
text_column: Column name containing the survey responses
labels_column: Column name containing the labels
test_size: Proportion of data to use for testing
Returns:
Evaluation metrics on test set
"""
logger.info("Starting model training...")
# Prepare labels
if isinstance(df[labels_column].iloc[0], str):
logger.info("Converting labels from string to list...")
# Convert string representation of lists to actual lists
df[labels_column] = df[labels_column].str.strip('[]').str.split(',')
# Clean up any extra quotes or spaces
df[labels_column] = df[labels_column].apply(lambda x: [item.strip().strip("'\"") for item in x])
# Preprocess text
logger.info("Preprocessing text data...")
df['processed_text'] = df[text_column].apply(self._improved_preprocess_text)
# Initialize MultiLabelBinarizer
self.mlb = MultiLabelBinarizer(classes=INTEREST_CATEGORIES)
y = self.mlb.fit_transform(df[labels_column])
logger.info(f"Target shape: {y.shape}")
# Split data
X_train, X_test, y_train, y_test = train_test_split(
df['processed_text'], y, test_size=test_size, random_state=42, shuffle=True
)
logger.info(f"Training set: {X_train.shape[0]} samples, Test set: {X_test.shape[0]} samples")
# Create TF-IDF pipeline
logger.info("Creating and training TF-IDF pipeline...")
tfidf_vectorizer = TfidfVectorizer(
max_features=3000,
min_df=2,
max_df=0.9,
ngram_range=(1, 3),
sublinear_tf=True
)
lr_clf = LogisticRegression(
C=0.5,
max_iter=1000,
class_weight='balanced',
solver='liblinear',
penalty='l2'
)
multi_lr = MultiOutputClassifier(lr_clf)
self.tfidf_pipeline = Pipeline([
('tfidf', tfidf_vectorizer),
('classifier', multi_lr)
])
# Train the pipeline
self.tfidf_pipeline.fit(X_train, y_train)
logger.info("TF-IDF pipeline trained successfully")
# Evaluate on test set
logger.info("Evaluating model on test set...")
y_pred = self.tfidf_pipeline.predict(X_test)
# Calculate metrics
from sklearn.metrics import hamming_loss, f1_score, precision_score, recall_score
h_loss = hamming_loss(y_test, y_pred)
micro_f1 = f1_score(y_test, y_pred, average='micro')
macro_f1 = f1_score(y_test, y_pred, average='macro')
logger.info(f"Hamming Loss: {h_loss:.4f}")
logger.info(f"Micro F1 Score: {micro_f1:.4f}")
logger.info(f"Macro F1 Score: {macro_f1:.4f}")
return {
'hamming_loss': h_loss,
'micro_f1': micro_f1,
'macro_f1': macro_f1
}
def get_tfidf_predictions(self, text: str) -> Dict[str, float]:
"""
Get predictions from TF-IDF model with confidence scores
Args:
text: The input text to classify
Returns:
Dictionary of label -> score
"""
if self.tfidf_pipeline is None:
raise ValueError("TF-IDF model is not trained yet. Call train() first.")
# Preprocess text
processed_text = self._improved_preprocess_text(text)
# Get raw prediction probabilities
y_proba = self.tfidf_pipeline.predict_proba([processed_text])
# Convert to dictionary of label -> score
scores = {}
for i, label in enumerate(self.mlb.classes_):
# For MultiOutputClassifier, each element of y_proba is a list of arrays
# Each array is for one label and has 2 values: [prob_for_0, prob_for_1]
scores[label] = y_proba[i][0][1] # Get probability of positive class
return scores
def get_bert_predictions(self, text: str) -> Dict[str, float]:
"""
Get predictions from BERT model
Args:
text: The input text to classify
Returns:
Dictionary of label -> score
"""
if self.bert_classifier is None:
logger.warning("BERT classifier is not available, returning empty scores")
return {label: 0.0 for label in INTEREST_CATEGORIES}
try:
# Use the BERT zero-shot classifier
result = self.bert_classifier(text, INTEREST_CATEGORIES, multi_label=True)
# Convert to dictionary of label -> score
scores = dict(zip(result['labels'], result['scores']))
# Ensure all categories are present (BERT may return in different order)
for category in INTEREST_CATEGORIES:
if category not in scores:
scores[category] = 0.0
return scores
except Exception as e:
logger.error(f"Error in BERT prediction: {e}")
return {label: 0.0 for label in INTEREST_CATEGORIES}
def predict(self,
text: str,
alpha: Optional[float] = None,
threshold: Optional[float] = None,
return_scores: bool = False) -> Union[List[str], Dict]:
"""
Combine TF-IDF and BERT predictions using weighted average
Args:
text: The input text to classify
alpha: Weight for TF-IDF predictions (1-alpha for BERT), uses self.alpha if None
threshold: Threshold for classification, uses self.threshold if None
return_scores: Whether to return scores along with labels
Returns:
Either a list of predicted labels or a dictionary with labels and scores
"""
if self.tfidf_pipeline is None:
raise ValueError("Model is not trained yet. Call train() first.")
# Use instance values if not provided
alpha = alpha if alpha is not None else self.alpha
threshold = threshold if threshold is not None else self.threshold
# Time the predictions
start_time = time.time()
# Get TF-IDF predictions
tfidf_scores = self.get_tfidf_predictions(text)
tfidf_time = time.time() - start_time
# Get BERT predictions if available
bert_time_start = time.time()
if self.bert_classifier is not None:
bert_scores = self.get_bert_predictions(text)
use_bert = True
else:
bert_scores = {category: 0.0 for category in INTEREST_CATEGORIES}
use_bert = False
logger.warning("BERT classifier not available, using TF-IDF only")
bert_time = time.time() - bert_time_start
# Combine predictions
combined_scores = {}
final_labels = []
for category in INTEREST_CATEGORIES:
# Get scores from both models
tfidf_score = tfidf_scores.get(category, 0.0)
bert_score = bert_scores.get(category, 0.0)
# Weighted average (if using BERT)
if use_bert:
final_score = (alpha * tfidf_score) + ((1 - alpha) * bert_score)
else:
final_score = tfidf_score
combined_scores[category] = final_score
# Apply threshold
if final_score >= threshold:
final_labels.append(category)
total_time = time.time() - start_time
if return_scores:
# Sort scores for easier interpretation
sorted_scores = sorted(combined_scores.items(), key=lambda x: x[1], reverse=True)
return {
'labels': final_labels,
'scores': combined_scores,
'sorted_scores': sorted_scores,
'tfidf_scores': tfidf_scores,
'bert_scores': bert_scores,
'timing': {
'tfidf': tfidf_time,
'bert': bert_time,
'total': total_time
},
'alpha': alpha,
'threshold': threshold,
'using_bert': use_bert
}
return final_labels
def save_model(self, path: str = "hybrid_interest_classifier.pkl"):
"""
Save the model to disk
Args:
path: Path to save the model
"""
if self.tfidf_pipeline is None:
raise ValueError("Model is not trained yet. Call train() first.")
# Note: We only save the TF-IDF pipeline and MLBinarizer
# BERT will be re-initialized on load
components = {
'tfidf_pipeline': self.tfidf_pipeline,
'mlb': self.mlb,
'alpha': self.alpha,
'threshold': self.threshold,
'bert_model_name': self.bert_model_name,
'interest_categories': INTEREST_CATEGORIES,
'version': '1.0'
}
with open(path, 'wb') as f:
pickle.dump(components, f)
logger.info(f"Model saved to {path}")
def load_model(self, path: str):
"""
Load a saved model from disk
Args:
path: Path to the saved model
"""
try:
with open(path, 'rb') as f:
components = pickle.load(f)
self.tfidf_pipeline = components['tfidf_pipeline']
self.mlb = components['mlb']
self.alpha = components.get('alpha', 0.6)
self.threshold = components.get('threshold', 0.5)
self.bert_model_name = components.get('bert_model_name', 'facebook/bart-large-mnli')
logger.info(f"Model loaded from {path}")
# Re-initialize BERT classifier
self._init_bert_classifier()
except Exception as e:
logger.error(f"Failed to load model: {e}")
raise
# Example usage
def main():
    """End-to-end demo: load data, train, save, and run sample predictions."""
    try:
        logger.info("Loading dataset: survey_interest_dataset_enhanced.csv")
        data = pd.read_csv('survey_interest_dataset_enhanced.csv')
        # Labels may be stored as the string form "['A', 'B']"; normalize to lists
        if 'labels_list' in data.columns and isinstance(data['labels_list'].iloc[0], str):
            logger.info("Converting labels_list from string to list...")
            data['labels_list'] = data['labels_list'].str.strip('[]').str.split(',')
            data['labels_list'] = data['labels_list'].apply(lambda x: [item.strip().strip("'\"") for item in x])
        logger.info("Initializing classifier with alpha=0.6, threshold=0.5")
        clf = InterestClassifier(alpha=0.6, threshold=0.5)
        logger.info("Training the model...")
        metrics = clf.train(data)
        logger.info(f"Training metrics: {metrics}")
        model_path = "hybrid_interest_classifier.pkl"
        logger.info(f"Saving model to {model_path}")
        clf.save_model(model_path)
        # Sanity-check predictions on a few hand-written inputs
        test_examples = [
            "I love hiking in the mountains and trying local foods wherever I travel.",
            "I'm a software developer who plays guitar in a band on weekends.",
            "I spend most of my time reading books and attending online courses.",
            "I enjoy painting landscapes and visiting art museums when I travel."
        ]
        logger.info("Testing model on example inputs...")
        for example in test_examples:
            outcome = clf.predict(example, return_scores=True)
            logger.info(f"\nExample: '{example}'")
            logger.info(f"Predicted interests: {outcome['labels']}")
            logger.info("Top interests by score:")
            for category, score in outcome['sorted_scores'][:3]:
                logger.info(f" {category}: {score:.4f}")
        # Show how the TF-IDF/BERT blend weight changes the output
        logger.info("\nFine-tuning alpha parameter:")
        example = "I work as a software developer and enjoy hiking on weekends"
        for alpha in [0.3, 0.5, 0.7, 0.9]:
            outcome = clf.predict(example, alpha=alpha, return_scores=True)
            logger.info(f"\nAlpha = {alpha} (TF-IDF weight: {alpha}, BERT weight: {1-alpha})")
            logger.info(f"Predicted interests: {outcome['labels']}")
            logger.info("Top 3 scores:")
            for category, score in outcome['sorted_scores'][:3]:
                logger.info(f" {category}: {score:.4f}")
        logger.info("Model training and evaluation completed successfully")
    except Exception as e:
        logger.error(f"Error in main function: {e}", exc_info=True)
        raise
# Script entry point: train, save, and demo the classifier
if __name__ == "__main__":
    main()