# -*- coding: utf-8 -*-
"""metlearn.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/13Na6TJXmV0xL-sdlEmPHFM6EBj4U7u9a
"""
import pandas as pd
# Upload a local file through the Colab file picker.
from google.colab import files
uploaded = files.upload()
# Load the training file (adjust the filename if necessary).
df = pd.read_csv("TSAC_Train.csv")
print(df.head())
import os
# Let the CUDA caching allocator grow segments instead of failing on fragmentation.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
import torch
print(torch.cuda.is_available())  # Should print True on a GPU runtime
#print(torch.cuda.get_device_name(0))
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

# Load the TunBERT tokenizer and classification model.
tokenizer = AutoTokenizer.from_pretrained("tunis-ai/TunBERT")
model = AutoModelForSequenceClassification.from_pretrained("tunis-ai/TunBERT", trust_remote_code=True,
                                                           output_attentions=False,
                                                           output_hidden_states=False)
# Move the model to GPU when available.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Smoke test: classify a single Tunisian-dialect sentence.
text = "أنا فرحان برشا اليوم"
inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
inputs = {k: v.to(device) for k, v in inputs.items()}
with torch.no_grad():
    outputs = model(**inputs)
    logits = outputs.logits
    predicted_class = torch.argmax(logits, dim=1).item()
print("Classe prédite :", predicted_class)

# Mount Google Drive and point at the training CSV.
from google.colab import drive
drive.mount('/content/drive')
file_path = "/content/TSAC_Train.csv"

import torch
print("GPU disponible :", torch.cuda.is_available())
print("Nom du GPU :", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU")
# Imports for the episodic (few-shot) meta-learning pipeline below.
import random
from collections import defaultdict
from typing import List, Dict, Tuple
from transformers import AutoTokenizer, AutoModel
import torch.nn as nn
from transformers import pipeline
import torch
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import csv

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def load_csv_file(filepath):
    """Load a CSV file that has 'text' and 'label' columns.

    Args:
        filepath: Full path to the .csv file.

    Returns:
        List[Tuple[str, str]]: list of (text, label) pairs, in file order.
    """
    with open(filepath, 'r', encoding='utf-8') as handle:
        return [(row['text'], row['label']) for row in csv.DictReader(handle)]
# Example usage: load the TSAC training set from the path set above.
tsac_dataa = load_csv_file(file_path)
def create_episode(data, tokenizer, device, n_ways=2, k_shots=3, q_queries=3, num_episodes=10):
    """Build `num_episodes` few-shot episodes from labelled texts.

    Args:
        data: iterable of (text, label) pairs.
        tokenizer: HuggingFace-style tokenizer callable (batched, return_tensors="pt").
        device: torch device the episode tensors are moved to.
        n_ways: number of classes sampled per episode.
        k_shots: support examples per class.
        q_queries: query examples per class.
        num_episodes: number of episodes to generate.

    Returns:
        List of dicts with keys 'support_inputs', 'support_labels',
        'query_inputs', 'query_labels'. Labels are episode-local ids
        0..n_ways-1 in the order the classes were sampled.

    Raises:
        ValueError: if fewer than `n_ways` distinct labels exist, or a sampled
            class has fewer than `k_shots + q_queries` examples.
    """
    label_to_texts = defaultdict(list)
    for text, label in data:
        label_to_texts[label].append(text)

    labels = list(label_to_texts.keys())
    # Fail fast with clear messages instead of an opaque random.sample error.
    if len(labels) < n_ways:
        raise ValueError(f"Need at least {n_ways} distinct labels, got {len(labels)}")
    needed = k_shots + q_queries

    episodes = []
    for _ in range(num_episodes):
        selected_labels = random.sample(labels, n_ways)
        support_texts, support_labels = [], []
        query_texts, query_labels = [], []
        for i, label in enumerate(selected_labels):
            pool = label_to_texts[label]
            if len(pool) < needed:
                raise ValueError(
                    f"Label {label!r} has {len(pool)} examples; "
                    f"needs k_shots + q_queries = {needed}"
                )
            texts = random.sample(pool, needed)
            support_texts += texts[:k_shots]
            support_labels += [i] * k_shots
            query_texts += texts[k_shots:]
            query_labels += [i] * q_queries

        support_inputs = tokenizer(support_texts, padding=True, truncation=True, return_tensors="pt")
        query_inputs = tokenizer(query_texts, padding=True, truncation=True, return_tensors="pt")
        # Drop token_type_ids — presumably the encoder's forward() does not
        # accept them (see the co_varnames inspection elsewhere in this file).
        if 'token_type_ids' in support_inputs:
            del support_inputs['token_type_ids']
        if 'token_type_ids' in query_inputs:
            del query_inputs['token_type_ids']

        episodes.append({
            'support_inputs': {k: v.to(device) for k, v in support_inputs.items()},
            'support_labels': torch.tensor(support_labels, device=device),
            'query_inputs': {k: v.to(device) for k, v in query_inputs.items()},
            'query_labels': torch.tensor(query_labels, device=device),
        })
    return episodes
tokenizer = AutoTokenizer.from_pretrained("tunis-ai/TunBERT")
encoder = AutoModelForSequenceClassification.from_pretrained("tunis-ai/TunBERT", trust_remote_code=True)
# Inspect forward()'s argument names to see which inputs this model accepts
# (e.g. whether it takes token_type_ids).
print(encoder.forward.__code__.co_varnames)
class EpisodeDataset(Dataset):
    """Thin Dataset wrapper over a pre-built list of episode dicts."""

    def __init__(self, episodes: List[Dict]):
        self.episodes = episodes

    def __len__(self):
        return len(self.episodes)

    def __getitem__(self, idx):
        return self.episodes[idx]
def flatten_inputs(inputs):
    """Collapse batched episode tensors for the encoder.

    Each value of shape [batch_size, num_examples, seq_len] becomes
    [batch_size * num_examples, seq_len].
    """
    flattened = {}
    for key, tensor in inputs.items():
        flattened[key] = tensor.view(-1, tensor.size(-1))
    return flattened
def collate_fn(batch):
    """Merge a list of episode dicts into a single batch.

    Tokenized inputs are stacked along a new leading batch dimension;
    label tensors are concatenated flat.
    """
    def _stack(key):
        dicts = [item[key] for item in batch]
        return {k: torch.stack([d[k] for d in dicts]) for k in dicts[0]}

    return {
        'support_inputs': _stack('support_inputs'),
        'support_labels': torch.cat([item['support_labels'] for item in batch], dim=0),
        'query_inputs': _stack('query_inputs'),
        'query_labels': torch.cat([item['query_labels'] for item in batch], dim=0),
    }
def train_meta(model, dataloader, optimizer, device):
    """Run one epoch of episodic (prototypical-network) meta-training.

    Args:
        model: ProtoNet-style module exposing compute_loss(...),
            forward(...) and compute_prototypes(...).
        dataloader: yields batches with 'support_inputs'/'support_labels'/
            'query_inputs'/'query_labels'.
        optimizer: torch optimizer over model.parameters().
        device: torch device for the batch tensors.

    Returns:
        (avg_loss, accuracy) over the epoch.
    """
    model.train()
    total_loss = 0
    total_correct = 0
    total_examples = 0
    for batch in dataloader:
        support_inputs = {k: v.to(device) for k, v in batch['support_inputs'].items()}
        query_inputs = {k: v.to(device) for k, v in batch['query_inputs'].items()}
        support_labels = batch['support_labels'].to(device)
        query_labels = batch['query_labels'].to(device)

        optimizer.zero_grad()
        loss = model.compute_loss(support_inputs, support_labels, query_inputs, query_labels)

        # Accuracy pass. Fix: run it under no_grad — compute_loss above already
        # built the training graph, and this second forward was previously
        # tracked by autograd too, doubling activation memory for no benefit.
        with torch.no_grad():
            flat_support = flatten_inputs(support_inputs)
            flat_query = flatten_inputs(query_inputs)
            support_emb = model.forward(**flat_support)
            query_emb = model.forward(**flat_query)
            prototypes, classes = model.compute_prototypes(support_emb, support_labels)
            dists = torch.cdist(query_emb, prototypes)
            preds = torch.argmin(dists, dim=1)
            # Map episode labels onto prototype row indices before comparing.
            label_map = {cls.item(): i for i, cls in enumerate(classes)}
            query_indices = torch.tensor([label_map[label.item()] for label in query_labels],
                                         device=query_labels.device)
            total_correct += (preds == query_indices).sum().item()
            total_examples += len(query_labels)

        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_loss = total_loss / len(dataloader)
    accuracy = total_correct / total_examples
    return avg_loss, accuracy
# Imports for the ProtoNet module definition below.
import torch
import torch.nn as nn
import torch.nn.functional as F
class ProtoNet(nn.Module):
    """Prototypical network head over a HuggingFace encoder (e.g. TunBERT)."""

    def __init__(self, encoder):
        super().__init__()
        self.encoder = encoder  # TunBERT or another HuggingFace model

    def forward(self, input_ids, attention_mask):
        """Encode a batch and return one embedding vector per example.

        NOTE(review): this takes the last element of the encoder output and
        its first token position — presumably the CLS embedding; confirm
        against this encoder's actual output layout.
        """
        encoder_out = self.encoder(input_ids=input_ids, attention_mask=attention_mask)
        hidden = encoder_out[-1]
        return hidden[:, 0, :]

    def compute_prototypes(self, support_embeddings, support_labels):
        """Return (prototypes, classes): the mean embedding per unique label."""
        classes = torch.unique(support_labels)
        prototypes = torch.stack(
            [support_embeddings[support_labels == cls].mean(dim=0) for cls in classes]
        )
        return prototypes, classes

    def compute_loss(self, support_inputs, support_labels, query_inputs, query_labels):
        """Episode loss: cross-entropy over negative query-to-prototype distances."""
        support_emb = self.forward(**flatten_inputs(support_inputs))
        query_emb = self.forward(**flatten_inputs(query_inputs))
        prototypes, classes = self.compute_prototypes(support_emb, support_labels)
        logits = -torch.cdist(query_emb, prototypes)  # closer prototype => higher score
        # Map episode labels onto prototype row indices.
        label_map = {cls.item(): i for i, cls in enumerate(classes)}
        targets = torch.tensor(
            [label_map[lbl.item()] for lbl in query_labels],
            device=query_labels.device,
        )
        return F.cross_entropy(logits, targets)
def prepare_batch(texts, tokenizer, device):
    """Tokenize a list of texts and move the tensors to `device`.

    token_type_ids are dropped because the encoder's forward() does not
    accept them.
    """
    encoded = tokenizer(texts, padding=True, truncation=True, return_tensors="pt")
    encoded.pop('token_type_ids', None)
    return {key: tensor.to(device) for key, tensor in encoded.items()}
# Wrap the previously loaded classifier in the prototypical-network head.
model = ProtoNet(model).to(device)

# Tiny hand-written sanity-check dataset (Tunisian dialect, sentiment labels).
sample_data = [
    ("نحب القهوة", "positive"),
    ("ما نحبش الجو", "negative"),
    ("يعجبني الفيلم", "positive"),
    ("كرهت الخدمة", "negative"),
    ("ممتازة التجربة", "positive"),
    ("تعبت برشة", "negative"),
]
def split_episode(data, tokenizer, device):
    """Build one fixed episode from labelled texts.

    The first two texts of each class become support examples and the third
    (when present) becomes the query example.

    Args:
        data: iterable of (text, label) pairs.
        tokenizer: HuggingFace-style tokenizer callable.
        device: torch device for the resulting tensors.

    Returns:
        (support_inputs, support_labels, query_inputs, query_labels) with
        labels encoded as integer ids 0..n_classes-1.
    """
    class_map = defaultdict(list)
    for text, label in data:
        class_map[label].append(text)

    support_texts, support_raw = [], []
    query_texts, query_raw = [], []
    for label, texts in class_map.items():
        support_chunk = texts[:2]
        support_texts += support_chunk
        # Fix: was `[label] * 2`, which miscounted classes with < 2 texts.
        support_raw += [label] * len(support_chunk)
        query_chunk = texts[2:3]
        query_texts += query_chunk
        # Fix: the label was appended even when the class had no third text,
        # leaving query_texts and query_labels misaligned.
        query_raw += [label] * len(query_chunk)

    support_inputs = prepare_batch(support_texts, tokenizer, device)
    query_inputs = prepare_batch(query_texts, tokenizer, device)
    label_to_id = {label: i for i, label in enumerate(class_map)}
    support_labels = torch.tensor([label_to_id[l] for l in support_raw], device=device)
    query_labels = torch.tensor([label_to_id[l] for l in query_raw], device=device)
    return support_inputs, support_labels, query_inputs, query_labels
# Sanity check: push one hand-built episode through the loss.
support_inputs, support_labels, query_inputs, query_labels = split_episode(sample_data, tokenizer, device)
loss = model.compute_loss(support_inputs, support_labels, query_inputs, query_labels)
print("Loss:", loss.item())

# Build training episodes from TSAC and meta-train.
episodes = create_episode(tsac_dataa, tokenizer, device)
dataset = EpisodeDataset(episodes)
dataloader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=collate_fn)
optimizer = torch.optim.Adam(model.parameters(), lr=2e-5)
num_epochs = 20  # start with 3 for a quick test
learning_rate = 2e-5  # NOTE(review): unused — the optimizer above hard-codes lr=2e-5
losses = []
accuracies = []
import gc
for epoch in range(num_epochs):
    avg_loss, accuracy = train_meta(model, dataloader, optimizer, device)
    print(f"Epoch {epoch+1} - Loss: {avg_loss:.4f} - Accuracy: {accuracy:.2%}")
    losses.append(avg_loss)
    accuracies.append(accuracy)
    # Release cached GPU memory between epochs.
    torch.cuda.empty_cache()
    gc.collect()

# Persist the trained weights to Google Drive.
import os
save_path = "/content/drive/MyDrive/protonet_models"
os.makedirs(save_path, exist_ok=True)
model_path = os.path.join(save_path, "protonet_400.pt")
torch.save(model.state_dict(), model_path)
print(f"✅ Modèle sauvegardé à : {model_path}")
print(os.listdir(save_path))
# Rebuild the model and reload the saved weights.
# NOTE(review): this load omits trust_remote_code=True, unlike the
# training-time load — confirm both resolve to the same architecture,
# otherwise strict=False may silently skip mismatched keys.
encoder = AutoModelForSequenceClassification.from_pretrained("tunis-ai/TunBERT")
model = ProtoNet(encoder).to(device)
# Load the weights (non-strict: tolerate missing/unexpected keys).
model.load_state_dict(torch.load("/content/drive/MyDrive/protonet_models/protonet_400.pt"), strict=False)
model.eval()
print("✅ Modèle chargé depuis Drive.")
# Strict reload to surface any mismatched keys explicitly.
try:
    model.load_state_dict(torch.load("/content/drive/MyDrive/protonet_models/protonet_400.pt"))
except RuntimeError as e:
    print("❌ Erreur de chargement :", e)

# Plot the meta-training curves collected above.
import matplotlib.pyplot as plt
plt.plot(losses, label='Loss')
plt.plot(accuracies, label='Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Value')
plt.title('Meta-learning Performance')
plt.legend()
plt.grid(True)
plt.show()
from sklearn.metrics import precision_score, recall_score, f1_score


def evaluate_meta(model, dataloader, device):
    """Evaluate a ProtoNet on episodic data (no gradient updates).

    Args:
        model: ProtoNet-style module exposing forward(...) and
            compute_prototypes(...).
        dataloader: yields batches with support/query inputs and labels.
        device: torch device for the batch tensors.

    Returns:
        (accuracy, precision, recall, f1) — precision/recall/f1 macro-averaged.
    """
    model.eval()
    total_correct = 0
    total_examples = 0
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for batch in dataloader:
            support_inputs = {k: v.to(device) for k, v in batch['support_inputs'].items()}
            query_inputs = {k: v.to(device) for k, v in batch['query_inputs'].items()}
            support_labels = batch['support_labels'].to(device)
            query_labels = batch['query_labels'].to(device)

            # Encode support/query and classify queries by nearest prototype.
            support_emb = model.forward(**flatten_inputs(support_inputs))
            query_emb = model.forward(**flatten_inputs(query_inputs))
            prototypes, classes = model.compute_prototypes(support_emb, support_labels)
            dists = torch.cdist(query_emb, prototypes)
            preds = torch.argmin(dists, dim=1)

            # Map episode labels onto prototype row indices before comparing.
            label_map = {cls.item(): i for i, cls in enumerate(classes)}
            query_indices = torch.tensor([label_map[label.item()] for label in query_labels],
                                         device=query_labels.device)
            total_correct += (preds == query_indices).sum().item()
            total_examples += len(query_labels)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(query_indices.cpu().numpy())

    accuracy = total_correct / total_examples
    precision = precision_score(all_labels, all_preds, average='macro')
    recall = recall_score(all_labels, all_preds, average='macro')
    f1 = f1_score(all_labels, all_preds, average='macro')
    # Fix: the accuracy line was printed twice; print it once.
    print(f"Evaluation Accuracy: {accuracy:.2%}")
    print(f"Precision: {precision:.2f}, Recall: {recall:.2f}, F1-score: {f1:.2f}")
    return accuracy, precision, recall, f1
# Tiny hand-written hate-speech samples for a zero-shot transfer check.
hate_data = [
    ("تفوه عليك يا كلب", "hate"),
    ("ربي يهديك إن شاء الله", "neutral"),
    ("أنت وصخ وما تستاهلش تعيش", "hate"),
    ("كلامك فيه احترام", "neutral"),
    ("ما تستاهلش حتى كلمة طيبة", "hate"),
    ("كلامك موزون وراقي", "neutral"),
]
# Build episodes from the T-HSAB-style samples.
episodes_thsab = create_episode(hate_data, tokenizer, device, n_ways=2, k_shots=1, q_queries=2, num_episodes=30)
dataloader_thsab = DataLoader(EpisodeDataset(episodes_thsab), batch_size=1, shuffle=False, collate_fn=collate_fn)
# Evaluate without any re-training (zero-shot transfer from TSAC).
evaluate_meta(model, dataloader_thsab, device)
# Upload and load the full T-HSAB dataset from Excel.
from google.colab import files
uploaded = files.upload()
import pandas as pd
df = pd.read_excel("T-HSAB.xlsx", engine="openpyxl")
print(df.head())
print(df.columns)
# Convert to a list of (text, label) tuples for episode construction.
thsab_data = list(zip(df["text"], df["label"]))
episodes_test = create_episode(
    thsab_data,
    tokenizer,
    device,
    n_ways=2,
    k_shots=5,
    q_queries=5,
    num_episodes=100
)
dataloader_test = DataLoader(
    EpisodeDataset(episodes_test),
    batch_size=1,
    shuffle=False,
    collate_fn=collate_fn
)
# Episodic fine-tuning on T-HSAB with a lower learning rate.
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
from tqdm import tqdm
model.train()
for epoch in range(num_epochs):
    total_loss = 0
    for batch in tqdm(dataloader_test):
        support_inputs = {k: v.to(device) for k, v in batch['support_inputs'].items()}
        query_inputs = {k: v.to(device) for k, v in batch['query_inputs'].items()}
        support_labels = batch['support_labels'].to(device)
        query_labels = batch['query_labels'].to(device)
        # Flatten [batch, examples, seq] inputs for the encoder.
        support_inputs = flatten_inputs(support_inputs)
        query_inputs = flatten_inputs(query_inputs)
        # Forward pass: embed support/query and classify by nearest prototype.
        support_emb = model(**support_inputs)
        query_emb = model(**query_inputs)
        prototypes, classes = model.compute_prototypes(support_emb, support_labels)
        dists = torch.cdist(query_emb, prototypes)
        preds = torch.argmin(dists, dim=1)
        # Map episode labels onto prototype row indices.
        label_map = {cls.item(): i for i, cls in enumerate(classes)}
        query_indices = torch.tensor([label_map[label.item()] for label in query_labels], device=query_labels.device)
        # Negative distance acts as the logit; backprop through the episode.
        loss = torch.nn.functional.cross_entropy(-dists, query_indices)
        total_loss += loss.item()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f"Epoch {epoch+1} - Loss: {total_loss:.4f}")
# NOTE(review): evaluating on the same episodes used for fine-tuning —
# results are optimistic; consider a held-out episode split.
evaluate_meta(model, dataloader_test, device)
# Minimal Gradio front-end (prediction logic not wired up yet).
import gradio as gr

def predict(text):
    # Placeholder: the few-shot prediction logic goes here.
    return "Analyse en cours (nécessite GPU pour l'inférence MAML)"

demo = gr.Interface(fn=predict, inputs="text", outputs="text", title="Tunisian Dialect Meta-Learning")
demo.launch()