"""Prediction models for quote classification (labels 0-7).

Provides a random baseline, a fine-tuned DistilBERT classifier, and a
pluggable text-embedding + classical-ML pipeline, plus a factory that
builds a model from a configuration dict.
"""

import json
import random
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import Optional

import evaluate
import joblib
import numpy as np
import tensorflow as tf
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from transformers import (
    AutoTokenizer,
    DataCollatorWithPadding,
    KerasMetricCallback,
    TFAutoModelForSequenceClassification,
    create_optimizer,
)

from tasks.data.data_loaders import TextDataLoader


class PredictionModel(ABC):
    """Abstract base class for quote classifiers."""

    def __init__(self, data_loader: Optional[TextDataLoader] = None):
        # NOTE(review): the base class itself does not use data_loader;
        # subclasses that need label mappings read them in their own __init__.
        # The default is None rather than TextDataLoader() so that no shared
        # instance is created at import time (mutable-default pitfall).
        self.description = ""
        self.model = None

    @abstractmethod
    def predict(self, quote: str) -> int:
        """
        Predict the label for a given quote.

        Parameters:
        -----------
        quote: str
            The quote to classify.

        Returns:
        --------
        int
            The predicted label (0-7).
        """
        pass

    @abstractmethod
    def train(self, dataset) -> None:
        """
        Train the model on a given dataset.

        Parameters:
        -----------
        dataset:
            The dataset to train on.

        Returns:
        --------
        None
        """
        pass

    @abstractmethod
    def save_to_directory(self, directory: Path) -> None:
        """Persist the model's artifacts into the given directory."""
        pass

    def save(self) -> None:
        """Save the model under pretrained_models/<timestamp>_<description>."""
        save_directory = Path(__file__).parent / "pretrained_models"
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        # Make the human-readable description filesystem-friendly.
        sanitized_description = (
            self.description
            .replace(" ", "_")
            .replace("(", "")
            .replace(")", "")
        )
        save_filename = f"{timestamp}_{sanitized_description}"
        self.save_to_directory(save_directory / save_filename)


class BaselineModel(PredictionModel):
    """Trivial baseline that predicts a uniformly random label."""

    def __init__(self, data_loader: Optional[TextDataLoader] = None):
        super().__init__()
        self.description = "Random Baseline (with Strategy Pattern, from another module)"

    def predict(self, quote: str) -> int:
        # Uniform over the 8 labels; the quote is intentionally ignored.
        return random.randint(0, 7)

    def train(self, dataset):
        # Nothing to learn for a random baseline.
        pass

    def save_to_directory(self, directory: Path) -> None:
        # Nothing to persist for a random baseline.
        pass


class DistilBERTModel(PredictionModel):
    """Sequence classifier fine-tuned from a pretrained DistilBERT checkpoint."""

    def __init__(self,
                 data_loader: Optional[TextDataLoader] = None,
                 batch_size: int = 4,
                 num_epochs: int = 5,
                 initial_learning_rate: float = 2e-5,
                 start_model_name: str = "distilbert-base-uncased"):
        super().__init__()
        # Build a fresh loader when none is supplied (avoids a shared
        # default instance across DistilBERTModel objects).
        if data_loader is None:
            data_loader = TextDataLoader()
        self.start_model_name = start_model_name
        self.description = f"DistilBERT Model (fined-tuned from {self.start_model_name})"
        self.label_to_id_mapping = data_loader.get_label_to_id_mapping()
        self.id_to_label_mapping = data_loader.get_id_to_label_mapping()
        # tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(self.start_model_name)
        # data collator with dynamic padding
        self.data_collator = DataCollatorWithPadding(tokenizer=self.tokenizer, return_tensors="tf")
        # load accuracy metric
        self.accuracy = evaluate.load("accuracy")
        # training parameters
        self.batch_size = batch_size
        self.num_epochs = num_epochs
        self.initial_learning_rate = initial_learning_rate

    def predict(self, quote: str) -> int:
        """Tokenize the quote, run the fine-tuned model, return the label id."""
        if self.model is None:
            raise ValueError("Model has not been trained yet. Please train the model before making predictions.")
        inputs = self.tokenizer(quote, return_tensors="tf", truncation=True, max_length=128)
        outputs = self.model(**inputs)
        logits = outputs.logits
        probabilities = tf.nn.softmax(logits)
        # argmax -> model's id2label name -> project label id.
        predicted_label = self.model.config.id2label[tf.argmax(probabilities, axis=1).numpy()[0]]
        return self.label_to_id_mapping[predicted_label]

    def train(self, dataset):
        """Fine-tune DistilBERT on `dataset` with an 80/20 train/test split."""
        # Pre-process data
        tokenized_data = self.pre_process_data(dataset)

        # Training setup
        batch_size = self.batch_size
        num_epochs = self.num_epochs
        batches_per_epoch = len(tokenized_data["train"]) // batch_size
        total_train_steps = int(batches_per_epoch * num_epochs)

        # Optimizer with a built-in linear-decay learning-rate schedule.
        # (A separate PolynomialDecay schedule previously built here was
        # never used — create_optimizer already supplies the schedule.)
        optimizer, _ = create_optimizer(
            init_lr=self.initial_learning_rate,
            num_warmup_steps=0,
            num_train_steps=total_train_steps,
        )

        # Load model
        self.model = TFAutoModelForSequenceClassification.from_pretrained(
            self.start_model_name,
            num_labels=8,
            id2label=self.id_to_label_mapping,
            label2id=self.label_to_id_mapping,
        )

        # Convert datasets to tf.data.Dataset format
        tf_train_set = self.model.prepare_tf_dataset(
            tokenized_data["train"],
            shuffle=True,
            batch_size=batch_size,
            collate_fn=self.data_collator,
        )
        tf_validation_set = self.model.prepare_tf_dataset(
            tokenized_data["test"],
            shuffle=False,
            batch_size=batch_size,
            collate_fn=self.data_collator,
        )

        # Compile model (loss is computed internally by the HF model).
        self.model.compile(optimizer=optimizer)

        # Keras metric callback reports accuracy on the validation set.
        metric_callback = KerasMetricCallback(metric_fn=self.compute_metrics, eval_dataset=tf_validation_set)

        # Train model
        self.model.fit(
            tf_train_set,
            validation_data=tf_validation_set,
            epochs=num_epochs,
            callbacks=[metric_callback],
        )

    def pre_process_data(self, dataset):
        """Split 80/20, keep only 'quote' and 'label' columns, then tokenize."""
        split = dataset.train_test_split(test_size=0.2, seed=42)
        extra_columns = [col for col in dataset.column_names if col not in ["quote", "label"]]
        return split.remove_columns(extra_columns).map(self.tokenize)

    def tokenize(self, example):
        """Tokenize one example's quote, truncating to 128 tokens."""
        return self.tokenizer(example["quote"], truncation=True, max_length=128)

    def compute_metrics(self, eval_pred):
        """Compute accuracy from (logits, labels) as given by the callback."""
        predictions, labels = eval_pred
        predictions = np.argmax(predictions, axis=1)
        return self.accuracy.compute(predictions=predictions, references=labels)

    def save_to_directory(self, directory: Path) -> None:
        self.model.save_pretrained(str(directory))


class TextEmbedder(ABC):
    """Interface for turning raw text into numeric feature vectors."""

    @abstractmethod
    def encode(self, text: list[str]) -> np.ndarray:
        """
        Encode a list of text inputs into a numpy array.

        Parameters:
        -----------
        text: list[str]
            The text inputs to encode.

        Returns:
        --------
        np.ndarray
            The encoded text inputs.
        """
        pass

    def fit(self, param):
        # Optional hook: stateless embedders need no fitting.
        pass

    @abstractmethod
    def save_to_directory(self, directory: Path) -> None:
        pass


class TfIdfEmbedder(TextEmbedder):
    """
    A simple TF-IDF text embedder.

    TF-IDF stands for Term Frequency-Inverse Document Frequency. It can be defined as the calculation
    of how relevant a word in a series or corpus is to a text. The meaning increases proportionally
    to the number of times in the text a word appears but is compensated by the word frequency in the
    corpus (data-set).
    Source: https://www.geeksforgeeks.org/understanding-tf-idf-term-frequency-inverse-document-frequency/

    The TfidfVectorizer class from scikit-learn is used to encode
    """

    def __init__(self):
        self.vectorizer = TfidfVectorizer()
        self._is_fitted = False  # Tracks whether fit() has been called

    def fit(self, text: list[str]):
        """Fit the embedder to the given text."""
        self.vectorizer.fit(text)
        self._is_fitted = True

    def encode(self, text: list[str]) -> np.ndarray:
        if not self._is_fitted:
            raise RuntimeError("TfIdfEmbedder should be fitted before encoding text.")
        return self.vectorizer.transform(text).toarray()

    def save_to_directory(self, directory: Path) -> None:
        directory.mkdir(parents=True, exist_ok=True)
        joblib.dump(self.vectorizer, directory / "tfidf_vectorizer.joblib")


class MLModel(ABC):
    """Interface for classical classifiers operating on embedded quotes."""

    @abstractmethod
    def fit(self, embedded_quotes: np.ndarray, y: list[int]) -> None:
        """
        Fit the model to the data.

        Parameters:
        -----------
        embedded_quotes: np.ndarray
            The embedded quotes, given by TextEmbedder.encode().
        y: list[int]
            The labels (ranging from 0 to 7).
        """
        pass

    @abstractmethod
    def predict(self, embedded_quotes: np.ndarray) -> np.ndarray:
        """
        Predict the labels for the given embedded quotes.

        Parameters:
        -----------
        embedded_quotes: np.ndarray
            The embedded quotes, given by TextEmbedder.encode().

        Returns:
        --------
        np.ndarray
            The predicted labels (ranging from 0 to 7), one per input row.
        """
        pass

    @abstractmethod
    def save_to_directory(self, directory: Path) -> None:
        pass


class MultivariateLogisticRegression(MLModel):
    """Thin wrapper around scikit-learn's LogisticRegression."""

    def __init__(self):
        self.model = LogisticRegression()

    def fit(self, embedded_quotes: np.ndarray, y: list[int]) -> None:
        self.model.fit(embedded_quotes, y)

    def predict(self, embedded_quotes: np.ndarray) -> np.ndarray:
        return self.model.predict(embedded_quotes)

    def save_to_directory(self, directory: Path) -> None:
        directory.mkdir(parents=True, exist_ok=True)
        joblib.dump(self.model, directory / "logistic_regression.joblib")


class EmbeddingMLModel(PredictionModel):
    """Pipeline model: TextEmbedder features fed into a classical classifier."""

    def __init__(self,
                 data_loader: Optional[TextDataLoader] = None,
                 embedder: Optional[TextEmbedder] = None,
                 ml_model: Optional[MLModel] = None):
        super().__init__()
        # None sentinels instead of instance defaults: a default
        # TfIdfEmbedder() in the signature would be a single stateful object
        # shared by every EmbeddingMLModel, so fitting one model would
        # silently refit the embedder of all the others.
        self.embedder = embedder if embedder is not None else TfIdfEmbedder()
        self.ml_model = ml_model if ml_model is not None else MultivariateLogisticRegression()
        self.description = f"EmbeddingMLModel ({self.embedder.__class__.__name__} + {self.ml_model.__class__.__name__})"

    def predict(self, quote: str) -> int:
        embedded_quote = self.embedder.encode([quote])
        # The classifier returns an array of one prediction; extract it so
        # the declared `-> int` contract of PredictionModel holds.
        return int(self.ml_model.predict(embedded_quote)[0])

    def train(self, dataset):
        """Fit the embedder on the quotes, then the classifier on the embeddings."""
        self.embedder.fit(dataset["quote"])
        embedded_quotes = self.embedder.encode(dataset["quote"])
        labels = dataset["label"]
        self.ml_model.fit(embedded_quotes, labels)

    def save_to_directory(self, directory: Path) -> None:
        directory.mkdir(parents=True, exist_ok=True)
        # save embedder and ml_model
        self.embedder.save_to_directory(directory)
        self.ml_model.save_to_directory(directory)
        # Metadata for reloading the right component classes later.
        metadata = {
            "embedder_type": self.embedder.__class__.__name__,
            "ml_model_type": self.ml_model.__class__.__name__,
        }
        with open(directory / "metadata.json", "w") as f:
            json.dump(metadata, f)


class ModelFactory:
    """Builds PredictionModel instances from a configuration dict."""

    @staticmethod
    def create_model(config) -> PredictionModel:
        """
        Factory method to create a model based on the configuration.

        Parameters:
        -----------
        config: dict
            Must contain "model_type" — one of "baseline", "distilbert",
            "distilbert-pretrained", "embeddingML". "distilbert" additionally
            requires "batch_size", "num_epochs" and "initial_learning_rate";
            "distilbert-pretrained" requires "model_name".

        Returns:
        --------
        PredictionModel
            The model instance.

        Raises:
        -------
        ValueError
            If the model type is not recognized or a required parameter is missing.
        FileNotFoundError
            If a requested pretrained model directory does not exist.
        """
        model_type = config["model_type"]
        if model_type == "baseline":
            return BaselineModel()
        elif model_type == "distilbert":
            try:
                batch_size = config["batch_size"]
                num_epochs = config["num_epochs"]
                initial_learning_rate = config["initial_learning_rate"]
            except KeyError as e:
                # Chain the KeyError so the missing key stays visible.
                raise ValueError(f"Missing configuration parameter: {e}") from e
            return DistilBERTModel(batch_size=batch_size,
                                   num_epochs=num_epochs,
                                   initial_learning_rate=initial_learning_rate)
        elif model_type == "distilbert-pretrained":
            model = DistilBERTModel()
            model_name = config["model_name"]
            model_path = Path(__file__).parent / "pretrained_models" / model_name
            if model_path.exists():
                model.model = TFAutoModelForSequenceClassification.from_pretrained(model_path)
                return model
            else:
                raise FileNotFoundError(f"Pretrained model not found at {model_path}")
        elif model_type == "embeddingML":
            # NOTE(review): this branch trains the model at creation time,
            # unlike the other branches — presumably intentional; confirm.
            embedding_ml_model = EmbeddingMLModel()
            embedding_ml_model.train(TextDataLoader().get_train_dataset())
            return embedding_ml_model
        else:
            raise ValueError(f"Unknown model type: {model_type}")