Update content with the text model from Thomas's repository: https://huggingface.co/spaces/tombou/frugal-ai-challenge
42b7ac6
| import json | |
| import random | |
| from abc import ABC, abstractmethod | |
| from datetime import datetime | |
| from pathlib import Path | |
| import joblib | |
| import numpy as np | |
| import tensorflow as tf | |
| from sklearn.feature_extraction.text import TfidfVectorizer | |
| from sklearn.linear_model import LogisticRegression | |
| from transformers import AutoTokenizer, DataCollatorWithPadding, create_optimizer, TFAutoModelForSequenceClassification, \ | |
| KerasMetricCallback | |
| import evaluate | |
| from tasks.data.data_loaders import TextDataLoader | |
class PredictionModel(ABC):
    """
    Common interface shared by every quote classifier in this module.

    Subclasses override ``predict``, ``train`` and ``save_to_directory``.
    The implementations here are placeholders that do nothing and return
    ``None``. ``save`` builds a timestamped directory name from
    ``description`` and delegates the actual persistence to
    ``save_to_directory``.
    """

    # Rewrites applied to ``description`` when it becomes a directory name:
    # spaces turn into underscores, parentheses are dropped.
    _DESCRIPTION_TRANSLATION = str.maketrans({" ": "_", "(": None, ")": None})

    def __init__(self, data_loader: "TextDataLoader | None" = None):
        """
        Initialise the attributes shared by all models.
        Parameters:
        -----------
        data_loader: TextDataLoader | None
            Accepted for signature compatibility with subclasses; the base
            class does not use it. It defaults to ``None`` rather than to a
            ``TextDataLoader()`` call so that no loader is constructed as a
            side effect of defining the class.
        """
        self.description = ""
        self.model = None

    def predict(self, quote: str) -> int:
        """
        Predict the label for a given quote.
        Parameters:
        -----------
        quote: str
            The quote to classify.
        Returns:
        --------
        int
            The predicted label (0-7).
        """
        # NOTE(review): placeholder - returns None until a subclass overrides it.

    def train(self, dataset) -> None:
        """
        Train the model on a given dataset.
        Parameters:
        -----------
        dataset:
            The dataset to train on.
        Returns:
        --------
        None
        """

    def save_to_directory(self, directory: Path) -> None:
        """
        Persist the model inside ``directory``.
        Parameters:
        -----------
        directory: Path
            Destination directory chosen by ``save``.
        """

    def save(self) -> None:
        """
        Save the model under ``pretrained_models/<timestamp>_<description>``.

        The ``pretrained_models`` directory is resolved next to this source
        file. The timestamp uses the local clock with the format
        ``%Y-%m-%d_%H-%M-%S``.
        """
        save_directory = Path(__file__).parent / "pretrained_models"
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        sanitized_description = self.description.translate(self._DESCRIPTION_TRANSLATION)
        self.save_to_directory(save_directory / f"{timestamp}_{sanitized_description}")
class BaselineModel(PredictionModel):
    """
    Random baseline: ignores the quote and returns a random label.

    Useful as a lower bound when comparing real models; it has nothing to
    train and nothing to persist.
    """

    def __init__(self, data_loader: "TextDataLoader | None" = None):
        """
        Parameters:
        -----------
        data_loader: TextDataLoader | None
            Unused by the baseline. It defaults to ``None`` rather than to a
            ``TextDataLoader()`` call so that no loader is constructed as a
            side effect of defining the class.
        """
        super().__init__()
        self.description = "Random Baseline (with Strategy Pattern, from another module)"

    def predict(self, quote: str) -> int:
        """Return a random integer label between 0 and 7 (both included)."""
        return random.randint(0, 7)

    def train(self, dataset):
        """No-op: the baseline has no parameters to learn."""

    def save_to_directory(self, directory: Path) -> None:
        """No-op: the baseline has no state to persist."""
class DistilBERTModel(PredictionModel):
    """
    Sequence classifier fine-tuned with TensorFlow/Keras from a Hugging Face
    checkpoint (``distilbert-base-uncased`` by default).
    """

    # Number of target classes given to the classification head.
    NUM_LABELS = 8
    # Token limit applied, with truncation, both when training and predicting.
    MAX_LENGTH = 128

    def __init__(self,
                 data_loader: "TextDataLoader | None" = None,
                 batch_size: int = 4,
                 num_epochs: int = 5,
                 initial_learning_rate: float = 2e-5,
                 start_model_name: str = "distilbert-base-uncased"):
        """
        Parameters:
        -----------
        data_loader: TextDataLoader | None
            Provides the label <-> id mappings. When ``None`` a
            ``TextDataLoader()`` is created here, at call time, instead of
            once when the class is defined.
        batch_size: int
            Batch size for both the training and the validation set.
        num_epochs: int
            Number of training epochs.
        initial_learning_rate: float
            Learning rate at step 0 of the optimizer schedule.
        start_model_name: str
            Checkpoint name used for the tokenizer and the initial weights.
        """
        super().__init__()
        if data_loader is None:
            data_loader = TextDataLoader()
        self.start_model_name = start_model_name
        self.description = f"DistilBERT Model (fined-tuned from {self.start_model_name})"
        self.label_to_id_mapping = data_loader.get_label_to_id_mapping()
        self.id_to_label_mapping = data_loader.get_id_to_label_mapping()
        # tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(self.start_model_name)
        # data collator with dynamic padding
        self.data_collator = DataCollatorWithPadding(tokenizer=self.tokenizer, return_tensors="tf")
        # load accuracy metric
        self.accuracy = evaluate.load("accuracy")
        # training parameters
        self.batch_size = batch_size
        self.num_epochs = num_epochs
        self.initial_learning_rate = initial_learning_rate

    def predict(self, quote: str) -> int:
        """
        Classify a single quote.
        Parameters:
        -----------
        quote: str
            The quote to classify.
        Returns:
        --------
        int
            The label id looked up in ``label_to_id_mapping``.
        Raises:
        -------
        ValueError
            If no model has been trained or loaded yet.
        """
        if self.model is None:
            raise ValueError("Model has not been trained yet. Please train the model before making predictions.")
        inputs = self.tokenizer(quote, return_tensors="tf", truncation=True, max_length=self.MAX_LENGTH)
        logits = self.model(**inputs).logits
        # Softmax is monotonic, so the argmax of the raw logits is the same
        # class as the argmax of the probabilities.
        predicted_index = int(tf.argmax(logits, axis=1).numpy()[0])
        predicted_label = self.model.config.id2label[predicted_index]
        return self.label_to_id_mapping[predicted_label]

    def train(self, dataset):
        """
        Fine-tune the start checkpoint on ``dataset``.

        The dataset is split 80/20 (seed 42) into train and validation parts,
        tokenized, converted to ``tf.data`` datasets and fitted for
        ``num_epochs`` epochs. Accuracy on the validation part is computed
        through a ``KerasMetricCallback``.
        Parameters:
        -----------
        dataset:
            The dataset to train on; it must expose ``train_test_split``,
            ``column_names`` and the ``quote`` / ``label`` columns.
        """
        # Pre-process data
        tokenized_data = self.pre_process_data(dataset)
        # Training setup
        batch_size = self.batch_size
        num_epochs = self.num_epochs
        batches_per_epoch = len(tokenized_data["train"]) // batch_size
        total_train_steps = int(batches_per_epoch * num_epochs)
        # Optimizer; create_optimizer builds its own learning-rate schedule,
        # so no separate schedule object is needed here.
        optimizer, _ = create_optimizer(init_lr=self.initial_learning_rate, num_warmup_steps=0,
                                        num_train_steps=total_train_steps)
        # Load model
        self.model = TFAutoModelForSequenceClassification.from_pretrained(
            self.start_model_name,
            num_labels=self.NUM_LABELS,
            id2label=self.id_to_label_mapping,
            label2id=self.label_to_id_mapping
        )
        # Convert datasets to tf.data.Dataset format
        tf_train_set = self.model.prepare_tf_dataset(
            tokenized_data["train"],
            shuffle=True,
            batch_size=batch_size,
            collate_fn=self.data_collator,
        )
        tf_validation_set = self.model.prepare_tf_dataset(
            tokenized_data["test"],
            shuffle=False,
            batch_size=batch_size,
            collate_fn=self.data_collator,
        )
        # Compile model
        self.model.compile(optimizer=optimizer)
        # Keras metric callback
        metric_callback = KerasMetricCallback(metric_fn=self.compute_metrics, eval_dataset=tf_validation_set)
        # Train model
        self.model.fit(tf_train_set, validation_data=tf_validation_set, epochs=num_epochs, callbacks=[metric_callback])

    def pre_process_data(self, dataset):
        """
        Split ``dataset`` 80/20 (seed 42), keep only the ``quote`` and
        ``label`` columns, and tokenize every example.
        """
        return ((dataset.
                 train_test_split(test_size=0.2, seed=42).
                 remove_columns([col for col in dataset.column_names if col not in ["quote", "label"]])).
                map(self.tokenize))

    def tokenize(self, example):
        """Tokenize the ``quote`` field of one example, truncated to ``MAX_LENGTH`` tokens."""
        return self.tokenizer(example["quote"], truncation=True, max_length=self.MAX_LENGTH)

    def compute_metrics(self, eval_pred):
        """Compute accuracy from a ``(predictions, labels)`` pair; class = argmax over axis 1."""
        predictions, labels = eval_pred
        predictions = np.argmax(predictions, axis=1)
        return self.accuracy.compute(predictions=predictions, references=labels)

    def save_to_directory(self, directory: Path) -> None:
        """
        Write the fine-tuned model to ``directory`` with ``save_pretrained``.
        Raises:
        -------
        ValueError
            If no model has been trained or loaded yet.
        """
        if self.model is None:
            raise ValueError("Model has not been trained yet. Please train the model before saving it.")
        self.model.save_pretrained(str(directory))
class TextEmbedder(ABC):
    """
    Interface for components that turn raw text into numeric vectors.

    Every method here is a placeholder that does nothing and returns
    ``None``; concrete embedders override them.
    """

    def encode(self, text: list[str]) -> np.ndarray[float]:
        """
        Turn a batch of strings into a numpy array.
        Parameters:
        -----------
        text: list[str]
            The strings to embed.
        Returns:
        --------
        np.ndarray
            The embedded strings (``None`` in this placeholder).
        """
        return None

    def fit(self, param):
        """Placeholder for learning from data; does nothing here."""
        return None

    def save_to_directory(self, directory: Path) -> None:
        """Placeholder for persisting the embedder into ``directory``; does nothing here."""
        return None
class TfIdfEmbedder(TextEmbedder):
    """
    Text embedder backed by scikit-learn's ``TfidfVectorizer``.

    TF-IDF (Term Frequency-Inverse Document Frequency) scores how relevant a
    word is to a text within a corpus: the score grows with the number of
    times the word appears in the text and is offset by how frequent the
    word is across the whole corpus.
    Source: https://www.geeksforgeeks.org/understanding-tf-idf-term-frequency-inverse-document-frequency/

    The vectorizer must be fitted with ``fit`` before ``encode`` is called.
    """

    def __init__(self):
        self.vectorizer = TfidfVectorizer()
        # Flag flipped by fit(); encode() refuses to run while it is False.
        self._is_fitted = False

    def fit(self, text: list[str]):
        """Fit the underlying vectorizer on ``text`` and mark the embedder as fitted."""
        self.vectorizer.fit(text)
        self._is_fitted = True

    def encode(self, text: list[str]) -> np.ndarray[float]:
        """
        Transform ``text`` with the fitted vectorizer and return a dense array.
        Raises:
        -------
        RuntimeError
            If ``fit`` has not been called yet.
        """
        if self._is_fitted:
            transformed = self.vectorizer.transform(text)
            return transformed.toarray()
        raise RuntimeError("TfIdfEmbedder should be fitted before encoding text.")

    def save_to_directory(self, directory: Path) -> None:
        """Dump the vectorizer to ``directory/tfidf_vectorizer.joblib``, creating the directory if needed."""
        directory.mkdir(parents=True, exist_ok=True)
        target_file = directory / "tfidf_vectorizer.joblib"
        joblib.dump(self.vectorizer, target_file)
class MLModel(ABC):
    """
    Interface for classifiers that work on already-embedded quotes.

    Every method here is a placeholder that does nothing and returns
    ``None``; concrete models override them.
    """

    def fit(self, embedded_quotes: np.ndarray[float], y: list[int]) -> None:
        """
        Learn from embedded quotes and their labels.
        Parameters:
        -----------
        embedded_quotes: np.ndarray
            Output of TextEmbedder.encode().
        y: list[int]
            One label per quote, ranging from 0 to 7.
        """
        return None

    def predict(self, embedded_quotes: np.ndarray[float]) -> int:
        """
        Predict labels for embedded quotes.
        Parameters:
        -----------
        embedded_quotes: np.ndarray
            Output of TextEmbedder.encode().
        Returns:
        --------
        int
            The predicted labels, ranging from 0 to 7 (``None`` in this
            placeholder).
        """
        return None

    def save_to_directory(self, directory: Path) -> None:
        """Placeholder for persisting the model into ``directory``; does nothing here."""
        return None
class MultivariateLogisticRegression(MLModel):
    """``MLModel`` that delegates to scikit-learn's ``LogisticRegression`` with default settings."""

    def __init__(self):
        self.model = LogisticRegression()

    def fit(self, embedded_quotes: np.ndarray[float], y: list[int]) -> None:
        """Fit the wrapped estimator on the embedded quotes and their labels."""
        features, targets = embedded_quotes, y
        self.model.fit(features, targets)

    def predict(self, embedded_quotes: np.ndarray[float]) -> int:
        """Return whatever the wrapped estimator's ``predict`` yields for ``embedded_quotes``."""
        predicted = self.model.predict(embedded_quotes)
        return predicted

    def save_to_directory(self, directory: Path) -> None:
        """Dump the estimator to ``directory/logistic_regression.joblib``, creating the directory if needed."""
        directory.mkdir(parents=True, exist_ok=True)
        target_file = directory / "logistic_regression.joblib"
        joblib.dump(self.model, target_file)
class EmbeddingMLModel(PredictionModel):
    """
    Two-stage classifier: a ``TextEmbedder`` turns quotes into vectors and an
    ``MLModel`` classifies those vectors.
    """

    def __init__(self,
                 data_loader: "TextDataLoader | None" = None,
                 embedder: "TextEmbedder | None" = None,
                 ml_model: "MLModel | None" = None):
        """
        Parameters:
        -----------
        data_loader: TextDataLoader | None
            Unused by this class; kept for signature compatibility.
        embedder: TextEmbedder | None
            The text embedder. A fresh ``TfIdfEmbedder`` is created when
            ``None``.
        ml_model: MLModel | None
            The classifier. A fresh ``MultivariateLogisticRegression`` is
            created when ``None``.

        The defaults are built per instance on purpose: default objects in
        the signature would be created once and shared, so training one
        model would silently change every other default-constructed model.
        """
        super().__init__()
        self.embedder = TfIdfEmbedder() if embedder is None else embedder
        self.ml_model = MultivariateLogisticRegression() if ml_model is None else ml_model
        self.description = (f"EmbeddingMLModel ({self.embedder.__class__.__name__} + "
                            f"{self.ml_model.__class__.__name__})")

    def predict(self, quote: str) -> int:
        """
        Classify a single quote.
        Parameters:
        -----------
        quote: str
            The quote to classify.
        Returns:
        --------
        int
            The predicted label as a plain Python ``int``.
        """
        embedded_quote = self.embedder.encode([quote])
        prediction = self.ml_model.predict(embedded_quote)
        # The classifier answers with one entry per input row; a single quote
        # was submitted, so unwrap that entry. asarray/ravel also accepts a
        # classifier that returns a bare scalar.
        return int(np.asarray(prediction).ravel()[0])

    def train(self, dataset):
        """
        Fit the embedder on the ``quote`` column, then fit the classifier on
        the embedded quotes and the ``label`` column.
        """
        quotes = dataset["quote"]
        self.embedder.fit(quotes)
        embedded_quotes = self.embedder.encode(quotes)
        self.ml_model.fit(embedded_quotes, dataset["label"])

    def save_to_directory(self, directory: Path) -> None:
        """
        Save the embedder, the classifier and a ``metadata.json`` file that
        records their class names, all inside ``directory``.
        """
        directory.mkdir(parents=True, exist_ok=True)
        # save embedder and ml_model
        self.embedder.save_to_directory(directory)
        self.ml_model.save_to_directory(directory)
        # Metadata needed to rebuild the right classes when reloading
        metadata = {
            "embedder_type": self.embedder.__class__.__name__,
            "ml_model_type": self.ml_model.__class__.__name__
        }
        with open(directory / "metadata.json", "w", encoding="utf-8") as f:
            json.dump(metadata, f)
class ModelFactory:
    """Builds ``PredictionModel`` instances from a configuration mapping."""

    @staticmethod
    def create_model(config) -> "PredictionModel":
        """
        Factory method to create a model based on the model type.
        Parameters:
        -----------
        config:
            Mapping that holds at least ``"model_type"``. Options:
            - "baseline": no further keys.
            - "distilbert": requires "batch_size", "num_epochs" and
              "initial_learning_rate"; the model is returned untrained.
            - "distilbert-pretrained": requires "model_name", the name of a
              directory inside ``pretrained_models`` next to this file.
            - "embeddingML": no further keys; the model is trained on
              ``TextDataLoader().get_train_dataset()`` before it is returned.
        Returns:
        --------
        PredictionModel
            The model instance.
        Raises:
        -------
        ValueError
            If the model type is not recognized, or if a "distilbert"
            training parameter is missing.
        KeyError
            If "model_type" is missing, or "model_name" is missing for
            "distilbert-pretrained".
        FileNotFoundError
            If the requested pretrained model directory does not exist.
        """
        model_type = config["model_type"]
        if model_type == "baseline":
            return BaselineModel()
        if model_type == "distilbert":
            try:
                batch_size = config["batch_size"]
                num_epochs = config["num_epochs"]
                initial_learning_rate = config["initial_learning_rate"]
            except KeyError as e:
                raise ValueError(f"Missing configuration parameter: {e}") from e
            return DistilBERTModel(batch_size=batch_size,
                                   num_epochs=num_epochs,
                                   initial_learning_rate=initial_learning_rate)
        if model_type == "distilbert-pretrained":
            model_name = config["model_name"]
            model_path = Path(__file__).parent / "pretrained_models" / model_name
            # Check the path before building the wrapper so a wrong name fails
            # without first loading a tokenizer.
            if not model_path.exists():
                raise FileNotFoundError(f"Pretrained model not found at {model_path}")
            model = DistilBERTModel()
            model.model = TFAutoModelForSequenceClassification.from_pretrained(model_path)
            return model
        if model_type == "embeddingML":
            embedding_ml_model = EmbeddingMLModel()
            embedding_ml_model.train(TextDataLoader().get_train_dataset())
            return embedding_ml_model
        raise ValueError(f"Unknown model type: {model_type}")