# Content adapted from the text model in Thomas's repository:
# https://huggingface.co/spaces/tombou/frugal-ai-challenge (commit 42b7ac6)
import json
import random
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
import joblib
import numpy as np
import tensorflow as tf
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from transformers import AutoTokenizer, DataCollatorWithPadding, create_optimizer, TFAutoModelForSequenceClassification, \
KerasMetricCallback
import evaluate
from tasks.data.data_loaders import TextDataLoader
class PredictionModel(ABC):
    """Abstract base class for quote-classification models.

    Subclasses implement ``predict``, ``train`` and ``save_to_directory``;
    ``save`` builds a timestamped, filesystem-friendly directory name from
    ``self.description`` and delegates persistence to ``save_to_directory``.
    """

    def __init__(self, data_loader: "TextDataLoader | None" = None):
        # NOTE(review): data_loader is accepted for subclass-signature
        # compatibility but is not used here. The previous default
        # (TextDataLoader()) was instantiated eagerly at class-definition
        # time; a None default avoids that import-time side effect.
        self.description = ""
        self.model = None  # set by subclasses once trained or loaded

    @abstractmethod
    def predict(self, quote: str) -> int:
        """
        Predict the label for a given quote.

        Parameters:
        -----------
        quote: str
            The quote to classify.

        Returns:
        --------
        int
            The predicted label (0-7).
        """
        pass

    @abstractmethod
    def train(self, dataset) -> None:
        """
        Train the model on a given dataset.

        Parameters:
        -----------
        dataset:
            The dataset to train on.

        Returns:
        --------
        None
        """
        pass

    @abstractmethod
    def save_to_directory(self, directory: Path) -> None:
        """Persist the trained model into the given directory."""
        pass

    def save(self) -> None:
        """Save the model under ``pretrained_models/<timestamp>_<description>``."""
        save_directory = Path(__file__).parent / "pretrained_models"
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        # Make the description filesystem-friendly in a single pass:
        # spaces -> underscores, parentheses dropped.
        sanitized_description = self.description.translate(
            str.maketrans({" ": "_", "(": None, ")": None})
        )
        save_filename = f"{timestamp}_{sanitized_description}"
        self.save_to_directory(save_directory / save_filename)
class BaselineModel(PredictionModel):
    """Baseline that predicts a uniformly random label in [0, 7]."""

    def __init__(self, data_loader: "TextDataLoader | None" = None):
        # data_loader kept for signature compatibility with the other
        # models; the random baseline needs no data. The previous default
        # (TextDataLoader()) was built eagerly at class-definition time.
        super().__init__()
        self.description = "Random Baseline (with Strategy Pattern, from another module)"

    def predict(self, quote: str) -> int:
        """Return a random label between 0 and 7 (inclusive); ignores the quote."""
        return random.randint(0, 7)

    def train(self, dataset):
        """No-op: the random baseline needs no training."""
        pass

    def save_to_directory(self, directory: Path) -> None:
        """No-op: there is nothing to persist for the baseline."""
        pass
class DistilBERTModel(PredictionModel):
    """Quote classifier fine-tuned from a DistilBERT checkpoint (TensorFlow)."""

    def __init__(self,
                 data_loader: "TextDataLoader | None" = None,
                 batch_size: int = 4,
                 num_epochs: int = 5,
                 initial_learning_rate: float = 2e-5,
                 start_model_name: str = "distilbert-base-uncased"):
        """
        Parameters:
        -----------
        data_loader: TextDataLoader | None
            Source of the label<->id mappings; a fresh TextDataLoader is
            created when None. (The previous default instantiated one
            eagerly at class-definition time.)
        batch_size: int
            Mini-batch size used for fine-tuning.
        num_epochs: int
            Number of training epochs.
        initial_learning_rate: float
            Starting learning rate for the linear-decay schedule.
        start_model_name: str
            Hugging Face checkpoint to fine-tune from.
        """
        super().__init__()
        if data_loader is None:
            data_loader = TextDataLoader()
        self.start_model_name = start_model_name
        # Typo fixed: "fined-tuned" -> "fine-tuned" (this string also feeds
        # the saved-model directory name built by PredictionModel.save).
        self.description = f"DistilBERT Model (fine-tuned from {self.start_model_name})"
        self.label_to_id_mapping = data_loader.get_label_to_id_mapping()
        self.id_to_label_mapping = data_loader.get_id_to_label_mapping()
        # Tokenizer matching the base checkpoint.
        self.tokenizer = AutoTokenizer.from_pretrained(self.start_model_name)
        # Data collator with dynamic padding (each batch padded to its longest sequence).
        self.data_collator = DataCollatorWithPadding(tokenizer=self.tokenizer, return_tensors="tf")
        # Accuracy metric reported by the Keras callback during training.
        self.accuracy = evaluate.load("accuracy")
        # Training hyper-parameters.
        self.batch_size = batch_size
        self.num_epochs = num_epochs
        self.initial_learning_rate = initial_learning_rate

    def predict(self, quote: str) -> int:
        """
        Predict the label id (0-7) for a single quote.

        Raises:
        -------
        ValueError
            If called before the model has been trained or loaded.
        """
        if self.model is None:
            raise ValueError("Model has not been trained yet. Please train the model before making predictions.")
        inputs = self.tokenizer(quote, return_tensors="tf", truncation=True, max_length=128)
        outputs = self.model(**inputs)
        probabilities = tf.nn.softmax(outputs.logits)
        # config.id2label yields the label string; map it back to our numeric id.
        predicted_label = self.model.config.id2label[tf.argmax(probabilities, axis=1).numpy()[0]]
        return self.label_to_id_mapping[predicted_label]

    def train(self, dataset):
        """Fine-tune the base checkpoint on ``dataset`` ("quote"/"label" columns)."""
        tokenized_data = self.pre_process_data(dataset)
        # Total optimizer steps, used for the linear learning-rate decay.
        batches_per_epoch = len(tokenized_data["train"]) // self.batch_size
        total_train_steps = int(batches_per_epoch * self.num_epochs)
        # create_optimizer builds its own linear-decay schedule internally;
        # the hand-built PolynomialDecay that used to sit here was dead code
        # (never passed to anything) and has been removed.
        optimizer, _ = create_optimizer(init_lr=self.initial_learning_rate,
                                        num_warmup_steps=0,
                                        num_train_steps=total_train_steps)
        # Fresh classification head with our 8 labels.
        self.model = TFAutoModelForSequenceClassification.from_pretrained(
            self.start_model_name,
            num_labels=8,
            id2label=self.id_to_label_mapping,
            label2id=self.label_to_id_mapping
        )
        # Convert datasets to tf.data.Dataset format.
        tf_train_set = self.model.prepare_tf_dataset(
            tokenized_data["train"],
            shuffle=True,
            batch_size=self.batch_size,
            collate_fn=self.data_collator,
        )
        tf_validation_set = self.model.prepare_tf_dataset(
            tokenized_data["test"],
            shuffle=False,
            batch_size=self.batch_size,
            collate_fn=self.data_collator,
        )
        # No explicit loss: transformers models pick a suitable one themselves.
        self.model.compile(optimizer=optimizer)
        # Report validation accuracy at the end of each epoch.
        metric_callback = KerasMetricCallback(metric_fn=self.compute_metrics, eval_dataset=tf_validation_set)
        self.model.fit(tf_train_set, validation_data=tf_validation_set,
                       epochs=self.num_epochs, callbacks=[metric_callback])

    def pre_process_data(self, dataset):
        """Split 80/20 (fixed seed), keep only "quote"/"label", tokenize both splits."""
        split = dataset.train_test_split(test_size=0.2, seed=42)
        extra_columns = [col for col in dataset.column_names if col not in ["quote", "label"]]
        return split.remove_columns(extra_columns).map(self.tokenize)

    def tokenize(self, example):
        """Tokenize one example's quote, truncating to 128 tokens."""
        return self.tokenizer(example["quote"], truncation=True, max_length=128)

    def compute_metrics(self, eval_pred):
        """Accuracy over a batch of (logits, labels) pairs from the callback."""
        predictions, labels = eval_pred
        predictions = np.argmax(predictions, axis=1)
        return self.accuracy.compute(predictions=predictions, references=labels)

    def save_to_directory(self, directory: Path) -> None:
        """Persist the fine-tuned model in Hugging Face ``save_pretrained`` format."""
        self.model.save_pretrained(str(directory))
class TextEmbedder(ABC):
    """Contract for components that map raw text to numeric feature vectors."""

    def fit(self, param):
        """Optional hook; stateless embedders simply inherit this no-op."""
        pass

    @abstractmethod
    def encode(self, text: list[str]) -> np.ndarray[float]:
        """
        Encode a list of text inputs into a numpy array.

        Parameters:
        -----------
        text: list[str]
            The text inputs to encode.

        Returns:
        --------
        np.ndarray
            The encoded text inputs.
        """
        pass

    @abstractmethod
    def save_to_directory(self, directory: Path) -> None:
        """Write the embedder's state under the given directory."""
        pass
class TfIdfEmbedder(TextEmbedder):
    """
    Text embedder based on TF-IDF (Term Frequency-Inverse Document Frequency).

    TF-IDF scores how relevant a word is to a document within a corpus:
    the score grows with how often the word appears in the document and
    shrinks with how common the word is across the whole corpus (data-set).
    Source: https://www.geeksforgeeks.org/understanding-tf-idf-term-frequency-inverse-document-frequency/

    Encoding is delegated to scikit-learn's TfidfVectorizer.
    """

    def __init__(self):
        self.vectorizer = TfidfVectorizer()
        # Guards encode(): transform() is only valid after fit().
        self._is_fitted = False

    def fit(self, text: list[str]):
        """Learn the vocabulary and IDF weights from the given corpus."""
        self.vectorizer.fit(text)
        self._is_fitted = True

    def encode(self, text: list[str]) -> np.ndarray[float]:
        """Return the dense TF-IDF matrix for the given texts."""
        if not self._is_fitted:
            raise RuntimeError("TfIdfEmbedder should be fitted before encoding text.")
        sparse_matrix = self.vectorizer.transform(text)
        return sparse_matrix.toarray()

    def save_to_directory(self, directory: Path) -> None:
        """Serialize the fitted vectorizer to ``directory/tfidf_vectorizer.joblib``."""
        directory.mkdir(parents=True, exist_ok=True)
        joblib.dump(self.vectorizer, directory / "tfidf_vectorizer.joblib")
class MLModel(ABC):
    """Contract for classifiers that operate on pre-embedded quotes."""

    @abstractmethod
    def fit(self, embedded_quotes: np.ndarray[float], y: list[int]) -> None:
        """
        Fit the model to the data.

        Parameters:
        -----------
        embedded_quotes: np.ndarray
            The embedded quotes, given by TextEmbedder.encode().
        y: list[int]
            The labels (ranging from 0 to 7).
        """
        pass

    @abstractmethod
    def predict(self, embedded_quotes: np.ndarray[float]) -> int:
        """
        Predict the labels for the given embedded quotes.

        Parameters:
        -----------
        embedded_quotes: np.ndarray
            The embedded quotes, given by TextEmbedder.encode().

        Returns:
        --------
        int
            The predicted labels (ranging from 0 to 7).
        """
        pass

    @abstractmethod
    def save_to_directory(self, directory: Path) -> None:
        """Persist the fitted classifier under the given directory."""
        pass
class MultivariateLogisticRegression(MLModel):
    """Logistic-regression classifier backed by scikit-learn."""

    def __init__(self):
        self.model = LogisticRegression()

    def fit(self, embedded_quotes: np.ndarray[float], y: list[int]) -> None:
        """Train the underlying scikit-learn estimator."""
        self.model.fit(embedded_quotes, y)

    def predict(self, embedded_quotes: np.ndarray[float]) -> int:
        """Return the estimator's predicted labels for the given embeddings."""
        return self.model.predict(embedded_quotes)

    def save_to_directory(self, directory: Path) -> None:
        """Serialize the estimator to ``directory/logistic_regression.joblib``."""
        directory.mkdir(parents=True, exist_ok=True)
        target_file = directory / "logistic_regression.joblib"
        joblib.dump(self.model, target_file)
class EmbeddingMLModel(PredictionModel):
    """Pipeline model: a TextEmbedder feeds an MLModel classifier."""

    def __init__(self,
                 data_loader: "TextDataLoader | None" = None,
                 embedder: "TextEmbedder | None" = None,
                 ml_model: "MLModel | None" = None):
        """
        Parameters:
        -----------
        data_loader: TextDataLoader | None
            Unused here; kept for signature compatibility with other models.
        embedder: TextEmbedder | None
            Text-to-vector component (defaults to TfIdfEmbedder).
        ml_model: MLModel | None
            Classifier over the embeddings (defaults to
            MultivariateLogisticRegression).
        """
        super().__init__()
        # Defaults are built lazily: the previous signature instantiated
        # them once at class-definition time, so all instances created with
        # defaults shared the same embedder/classifier objects.
        self.embedder = embedder if embedder is not None else TfIdfEmbedder()
        self.ml_model = ml_model if ml_model is not None else MultivariateLogisticRegression()
        self.description = f"EmbeddingMLModel ({self.embedder.__class__.__name__} + {self.ml_model.__class__.__name__})"

    def predict(self, quote: str) -> int:
        """Embed a single quote and return its predicted label as a plain int."""
        embedded_quote = self.embedder.encode([quote])
        prediction = self.ml_model.predict(embedded_quote)
        # MLModel implementations (e.g. sklearn's predict) return an array of
        # labels; unwrap the single prediction so the declared `-> int`
        # contract actually holds for callers.
        return int(np.asarray(prediction).ravel()[0])

    def train(self, dataset):
        """Fit the embedder on the quotes, then the classifier on the embeddings."""
        self.embedder.fit(dataset["quote"])
        embedded_quotes = self.embedder.encode(dataset["quote"])
        labels = dataset["label"]
        self.ml_model.fit(embedded_quotes, labels)

    def save_to_directory(self, directory: Path) -> None:
        """Save embedder, classifier and reload metadata under ``directory``."""
        directory.mkdir(parents=True, exist_ok=True)
        # Save embedder and ml_model side by side.
        self.embedder.save_to_directory(directory)
        self.ml_model.save_to_directory(directory)
        # Metadata recorded so the right classes can be rebuilt on reload.
        metadata = {
            "embedder_type": self.embedder.__class__.__name__,
            "ml_model_type": self.ml_model.__class__.__name__
        }
        with open(directory / "metadata.json", "w") as f:
            json.dump(metadata, f)
class ModelFactory:
@staticmethod
def create_model(config) -> PredictionModel:
"""
Factory method to create a model based on the model type.
Parameters:
-----------
model_type: str
The type of model to create. Options: "baseline", "distilbert"
Returns:
--------
PredictionModel
The model instance.
Raises:
-------
ValueError
If the model type is not recognized.
"""
model_type = config["model_type"]
if model_type == "baseline":
return BaselineModel()
elif model_type == "distilbert":
try:
batch_size = config["batch_size"]
num_epochs = config["num_epochs"]
initial_learning_rate = config["initial_learning_rate"]
except KeyError as e:
raise ValueError(f"Missing configuration parameter: {e}")
return DistilBERTModel(batch_size=batch_size,
num_epochs=num_epochs,
initial_learning_rate=initial_learning_rate)
elif model_type == "distilbert-pretrained":
model = DistilBERTModel()
model_name = config["model_name"]
model_path = Path(__file__).parent / "pretrained_models" / model_name
if model_path.exists():
model.model = TFAutoModelForSequenceClassification.from_pretrained(model_path)
return model
else:
raise FileNotFoundError(f"Pretrained model not found at {model_path}")
elif model_type == "embeddingML":
embedding_ml_model = EmbeddingMLModel()
embedding_ml_model.train(TextDataLoader().get_train_dataset())
return embedding_ml_model
else:
raise ValueError(f"Unknown model type: {model_type}")