|
|
""" |
|
|
Unified HuggingFace-compatible quality classifier model. |
|
|
|
|
|
Merges mmBERT encoder with trained MLP classifier head into a single |
|
|
PreTrainedModel that can be saved/loaded using standard HuggingFace methods |
|
|
and used with vLLM for efficient inference. |
|
|
|
|
|
Example: |
|
|
# Merge trained classifier into unified model |
|
|
from src.hq.merged_model import merge_and_save |
|
|
merge_and_save( |
|
|
base_model_name="jhu-clsp/mmBERT-small", |
|
|
classifier_weights_path="./output/models/ara_Arab.pt", |
|
|
output_dir="./release/arabic-quality-classifier" |
|
|
) |
|
|
|
|
|
# Load and use |
|
|
model = QualityClassifierModel.from_pretrained("./release/arabic-quality-classifier") |
|
|
tokenizer = AutoTokenizer.from_pretrained("./release/arabic-quality-classifier") |
|
|
""" |
|
|
import os |
|
|
from pathlib import Path |
|
|
from typing import Optional, Union |
|
|
|
|
|
import torch |
|
|
import torch.nn as nn |
|
|
from transformers import AutoModel, AutoTokenizer, PreTrainedModel, PretrainedConfig |
|
|
from transformers.modeling_outputs import SequenceClassifierOutput |
|
|
|
|
|
from .config import EMBEDDING_CONFIG, TRAINING_CONFIG |
|
|
|
|
|
|
|
|
class QualityClassifierConfig(PretrainedConfig):
    """Configuration for the unified quality classifier model.

    Holds the name of the encoder checkpoint and the hyperparameters of the
    MLP head that is stacked on top of it. Values that are not supplied are
    taken from the project-level ``EMBEDDING_CONFIG`` / ``TRAINING_CONFIG``
    dictionaries.
    """

    model_type = "quality_classifier"

    def __init__(
        self,
        base_model_name: Optional[str] = None,
        hidden_dim: Optional[int] = None,
        dropout: Optional[float] = None,
        num_labels: int = 1,
        **kwargs
    ):
        """
        Initialize configuration.

        Args:
            base_model_name: HuggingFace model ID for the encoder. Falls back
                to ``EMBEDDING_CONFIG["model_name"]`` when not given.
            hidden_dim: Hidden dimension of the MLP classifier. Falls back to
                ``TRAINING_CONFIG["hidden_dim"]`` when not given.
            dropout: Dropout probability. Falls back to
                ``TRAINING_CONFIG["dropout"]`` only when ``None``; an explicit
                ``0.0`` is kept.
            num_labels: Number of output labels (1 for binary)
            **kwargs: Forwarded unchanged to ``PretrainedConfig.__init__``.
        """
        super().__init__(**kwargs)
        self.base_model_name = base_model_name or EMBEDDING_CONFIG["model_name"]
        self.hidden_dim = hidden_dim or TRAINING_CONFIG["hidden_dim"]
        # 0.0 is a legitimate dropout value ("no dropout") but is falsy, so
        # the fallback must test for None explicitly. Using ``or`` here would
        # silently replace 0.0 with the training default, including when a
        # saved config containing 0.0 is loaded again.
        if dropout is None:
            dropout = TRAINING_CONFIG["dropout"]
        self.dropout = dropout
        self.num_labels = num_labels
|
|
|
|
|
|
|
|
class QualityClassifierModel(PreTrainedModel):
    """
    Unified quality classifier combining an encoder with an MLP head.

    This model can be saved and loaded using standard HuggingFace methods:
        model.save_pretrained("path/to/model")
        model = QualityClassifierModel.from_pretrained("path/to/model")

    Architecture (sizes come from the config, not from constants):
        - Encoder: ``AutoModel.from_pretrained(config.base_model_name)``
          (mmBERT small or base in this project)
        - Pooling: Mean pooling over the sequence, ignoring masked positions
        - Classifier: Linear(encoder hidden_size -> config.hidden_dim) -> ReLU
          -> Dropout(config.dropout)
          -> Linear(config.hidden_dim -> config.num_labels) -> Sigmoid

    Note:
        Because the head ends in ``Sigmoid``, the tensor exposed as
        ``logits`` in the outputs already contains probabilities in [0, 1],
        not raw pre-activation scores.
    """

    config_class = QualityClassifierConfig

    def __init__(self, config: QualityClassifierConfig):
        """
        Initialize the unified model.

        Args:
            config: QualityClassifierConfig instance
        """
        super().__init__(config)

        # The encoder is instantiated from the pretrained checkpoint named in
        # the config; merge_and_save() relies on this to obtain encoder
        # weights before saving the merged model.
        self.encoder = AutoModel.from_pretrained(
            config.base_model_name,
            attn_implementation="eager",
        )
        hidden_size = self.encoder.config.hidden_size

        # MLP head applied to the pooled sentence embedding.
        self.classifier = nn.Sequential(
            nn.Linear(hidden_size, config.hidden_dim),
            nn.ReLU(),
            nn.Dropout(config.dropout),
            nn.Linear(config.hidden_dim, config.num_labels),
            nn.Sigmoid(),
        )

        self.post_init()

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        return_dict: bool = True,
    ) -> Union[SequenceClassifierOutput, tuple]:
        """
        Forward pass with optional loss computation.

        Args:
            input_ids: Token IDs of shape (batch_size, seq_length)
            attention_mask: Attention mask of shape (batch_size, seq_length)
            token_type_ids: Accepted for tokenizer compatibility but not
                passed to the encoder.
            labels: Ground truth labels for loss computation. Must contain
                one value per classifier output (batch_size * num_labels
                elements in total).
            return_dict: Whether to return a SequenceClassifierOutput

        Returns:
            SequenceClassifierOutput with loss, logits (sigmoid outputs of
            shape (batch_size, num_labels)), hidden states and attentions;
            or, when ``return_dict`` is False, the tuple
            ``([loss,] logits, [hidden_states,] [attentions])``.
        """
        outputs = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
        )

        token_embeddings = outputs.last_hidden_state
        if attention_mask is not None:
            # Masked mean: zero out padded positions, then divide by the
            # number of real tokens (clamped to avoid division by zero).
            mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
            sum_embeddings = torch.sum(token_embeddings * mask_expanded, dim=1)
            sum_mask = torch.clamp(mask_expanded.sum(dim=1), min=1e-9)
            pooled = sum_embeddings / sum_mask
        else:
            pooled = token_embeddings.mean(dim=1)
        # The float32 mask promotes the pooled tensor to float32; cast back so
        # a half-precision model does not hit a dtype mismatch in the head.
        pooled = pooled.to(token_embeddings.dtype)

        logits = self.classifier(pooled)

        loss = None
        if labels is not None:
            loss_fn = nn.BCELoss()
            # Flatten both sides instead of calling squeeze(): a bare
            # squeeze() turns a batch of one into a 0-d tensor, which BCELoss
            # rejects because it no longer matches the shape of the labels.
            loss = loss_fn(
                logits.reshape(-1),
                labels.to(logits.dtype).reshape(-1),
            )

        if not return_dict:
            # Select the optional encoder outputs by name. The encoder output
            # is read here as (last_hidden_state, hidden_states, attentions),
            # so a positional slice starting at index 2 would drop
            # hidden_states.
            extras = tuple(
                item
                for item in (outputs.hidden_states, outputs.attentions)
                if item is not None
            )
            output = (logits,) + extras
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

    def predict(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor
    ) -> torch.Tensor:
        """
        Convenience method for inference.

        Switches the model to eval mode (it is not switched back) and runs a
        forward pass without gradient tracking.

        Args:
            input_ids: Token IDs
            attention_mask: Attention mask

        Returns:
            Quality scores in range [0, 1] with all size-1 dimensions
            removed; for a single input and ``num_labels == 1`` this is a
            0-d tensor.
        """
        self.eval()
        with torch.no_grad():
            outputs = self(input_ids=input_ids, attention_mask=attention_mask)
            return outputs.logits.squeeze()

    def score_texts(
        self,
        texts: list,
        tokenizer: AutoTokenizer,
        batch_size: int = 32,
        max_length: int = 512,
        device: Optional[str] = None,
    ) -> list:
        """
        Score a list of texts.

        Moves the model to ``device`` and puts it in eval mode as a side
        effect.

        Args:
            texts: List of text strings to score
            tokenizer: Tokenizer for the model
            batch_size: Batch size for processing
            max_length: Maximum sequence length (longer inputs are truncated)
            device: Device to use for inference; defaults to "cuda" when
                available, otherwise "cpu"

        Returns:
            List of quality scores in range [0, 1], one per input text, in
            input order. An empty ``texts`` yields an empty list.
        """
        device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.to(device)
        self.eval()

        scores = []
        for start in range(0, len(texts), batch_size):
            batch = texts[start:start + batch_size]
            inputs = tokenizer(
                batch,
                return_tensors="pt",
                max_length=max_length,
                truncation=True,
                padding=True,
            ).to(device)

            with torch.no_grad():
                outputs = self(**inputs)

            # Drop only the trailing label dimension so that a batch of one
            # still produces a list instead of a bare float.
            scores.extend(outputs.logits.squeeze(-1).cpu().tolist())

        return scores
|
|
|
|
|
|
|
|
def merge_and_save(
    base_model_name: str,
    classifier_weights_path: Union[str, Path],
    output_dir: Union[str, Path],
    hidden_dim: Optional[int] = None,
    dropout: Optional[float] = None,
) -> QualityClassifierModel:
    """
    Merge encoder and trained classifier head, then save as unified model.

    The resulting model can be loaded with:
        model = QualityClassifierModel.from_pretrained(output_dir)

    Args:
        base_model_name: HuggingFace model ID for the encoder
        classifier_weights_path: Path to trained MLP weights (.pt file).
            The file may hold either a bare state dict or a dict with a
            "state_dict" entry; keys may carry a leading "classifier."
            prefix.
        output_dir: Directory to save the merged model (created if missing)
        hidden_dim: Hidden dimension of the MLP (must match training);
            defaults to ``TRAINING_CONFIG["hidden_dim"]``
        dropout: Dropout rate (must match training); defaults to
            ``TRAINING_CONFIG["dropout"]`` only when ``None``

    Returns:
        The merged QualityClassifierModel

    Raises:
        RuntimeError: from ``load_state_dict`` when the checkpoint keys or
            shapes do not match the classifier head.
    """
    hidden_dim = hidden_dim or TRAINING_CONFIG["hidden_dim"]
    # 0.0 is a valid dropout rate but falsy, so test for None explicitly.
    if dropout is None:
        dropout = TRAINING_CONFIG["dropout"]
    output_dir = Path(output_dir)

    print("Merging model...")
    print(f"  Encoder: {base_model_name}")
    print(f"  Classifier: {classifier_weights_path}")

    config = QualityClassifierConfig(
        base_model_name=base_model_name,
        hidden_dim=hidden_dim,
        dropout=dropout,
        num_labels=1,
    )

    # Constructing the model loads the pretrained encoder weights.
    model = QualityClassifierModel(config)

    # SECURITY: torch.load unpickles the file. Only pass checkpoints from a
    # trusted source here, or consider weights_only=True if the checkpoints
    # are known to contain nothing but tensors and plain containers.
    checkpoint = torch.load(classifier_weights_path, map_location="cpu")

    if isinstance(checkpoint, dict) and "state_dict" in checkpoint:
        trained_weights = checkpoint["state_dict"]
    else:
        trained_weights = checkpoint

    # Strip only a *leading* "classifier." so the keys line up with the
    # nn.Sequential head; str.replace would also remove later occurrences.
    prefix = "classifier."
    stripped_weights = {
        (key[len(prefix):] if key.startswith(prefix) else key): value
        for key, value in trained_weights.items()
    }

    model.classifier.load_state_dict(stripped_weights)

    output_dir.mkdir(parents=True, exist_ok=True)
    model.save_pretrained(output_dir)

    # Ship the tokenizer next to the weights so the directory is
    # self-contained.
    tokenizer = AutoTokenizer.from_pretrained(base_model_name)
    tokenizer.save_pretrained(output_dir)

    print(f"Model saved to {output_dir}")
    print(f"Contents: {list(output_dir.iterdir())}")

    return model
|
|
|
|
|
|
|
|
def merge_all_classifiers(
    models_dir: Union[str, Path],
    output_base_dir: Union[str, Path],
    base_model_name: str = None,
) -> dict:
    """
    Build one unified model per trained classifier checkpoint.

    Every ``*.pt`` file directly inside ``models_dir`` is merged via
    ``merge_and_save``; the file stem is used as the language code and the
    merged model is written to
    ``<output_base_dir>/<lang_code>-quality-classifier``.

    Args:
        models_dir: Directory containing trained .pt files
        output_base_dir: Base directory for output models
        base_model_name: HuggingFace model ID for the encoder; when falsy,
            ``EMBEDDING_CONFIG["model_name"]`` is used

    Returns:
        Dictionary mapping language codes to output directories (as strings)
    """
    if not base_model_name:
        base_model_name = EMBEDDING_CONFIG["model_name"]

    source_root = Path(models_dir)
    target_root = Path(output_base_dir)

    merged = {}
    for weights_file in source_root.glob("*.pt"):
        lang_code = weights_file.stem
        target_dir = target_root / f"{lang_code}-quality-classifier"

        # Visual separator between languages in the console output.
        print(f"\n{'=' * 50}")
        print(f"Processing: {lang_code}")
        print(f"{'=' * 50}")

        merge_and_save(
            base_model_name=base_model_name,
            classifier_weights_path=weights_file,
            output_dir=target_dir,
        )
        merged[lang_code] = str(target_dir)

    return merged
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Register the custom config and model with the transformers auto classes.
# NOTE(review): presumably this is what lets a saved checkpoint be resolved
# through AutoConfig / AutoModel - confirm against the transformers
# custom-model documentation.
QualityClassifierConfig.register_for_auto_class(auto_class="AutoConfig")
QualityClassifierModel.register_for_auto_class(auto_class="AutoModel")
|
|
|