# NOTE: lines removed here were Colab export artifacts ("Spaces:" and duplicated
# "Runtime error" status lines), not part of the notebook's code.
# -*- coding: utf-8 -*-
"""Untitled7.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1MWc3B3JSbW5VvEuftDi2WoCjUWN1CtVj
"""
| pip install transformers datasets evaluate accelerate | |
| data_files = { | |
| "train": "./train.json", # If saved in current working directory | |
| "validation": "./validation.json" | |
| } | |
| from google.colab import files | |
| uploaded = files.upload() # Select and upload your train.json and validation.json files | |
| from google.colab import files | |
| uploaded = files.upload() # Select and upload your train.json and validation.json files | |
import json

import pandas as pd
from datasets import Dataset, DatasetDict


def _load_split(path):
    """Read one SQuAD-style JSON file and return the list under its "data" key.

    Returns an empty list when the key is missing, matching the original
    `.get("data", [])` behavior. Explicit UTF-8 avoids platform-default
    encoding surprises with non-ASCII contexts.
    """
    with open(path, "r", encoding="utf-8") as f:
        payload = json.load(f)
    return payload.get("data", [])


# Build the two splits into a single DatasetDict consumed by the rest of the
# notebook (the intermediate DataFrames of the original were never reused).
dataset = DatasetDict(
    {
        "train": Dataset.from_pandas(pd.DataFrame(_load_split("train.json"))),
        "validation": Dataset.from_pandas(pd.DataFrame(_load_split("validation.json"))),
    }
)
print(dataset)
# Pull down the SQuAD2-tuned RoBERTa checkpoint: its tokenizer plus the
# span-prediction (question-answering) head.
from transformers import AutoTokenizer, AutoModelForQuestionAnswering

model_checkpoint = "deepset/roberta-base-squad2"

tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)
model = AutoModelForQuestionAnswering.from_pretrained(model_checkpoint)
def prepare_features(examples):
    """Tokenize a batch of SQuAD-style examples into training features.

    Each example carries "paragraphs"; each paragraph has a "context" and a
    list of "qas"; each QA pair has a "question" and a list of "answers"
    (dicts with "text" and "answer_start"). Long contexts are split into
    overlapping 384-token chunks (stride 128). Each chunk gets start/end token
    positions for the first answer, or the CLS index when the question is
    unanswerable or the answer falls outside the chunk.

    Bug fixed vs. the original: the function built `tokenized_examples` but
    never returned it, so `dataset.map(batched=True)` received None.
    """
    tokenized_examples = {
        "input_ids": [],
        "attention_mask": [],
        "offset_mapping": [],
        "overflow_to_sample_mapping": [],
        "start_positions": [],
        "end_positions": [],
        "example_id": [],  # links each feature back to its original QA pair
    }

    for example_index, paragraphs in enumerate(examples["paragraphs"]):
        for para in paragraphs:
            context = para["context"]
            for qa in para["qas"]:
                question = qa["question"]
                answers = qa["answers"]  # list of {"text", "answer_start"} dicts

                tokenized = tokenizer(
                    question,
                    context,
                    truncation="only_second",
                    max_length=384,
                    stride=128,
                    return_overflowing_tokens=True,
                    return_offsets_mapping=True,
                    padding="max_length",
                )
                # Per-call overflow mapping is useless here (one QA per call);
                # the batch-level example index is recorded below instead.
                tokenized.pop("overflow_to_sample_mapping")
                offset_mapping = tokenized.pop("offset_mapping")

                for i, offsets in enumerate(offset_mapping):
                    input_ids = tokenized["input_ids"][i]
                    cls_index = input_ids.index(tokenizer.cls_token_id)
                    sequence_ids = tokenized.sequence_ids(i)

                    # Default labels point at CLS: unanswerable, or answer
                    # not contained in this chunk.
                    start_position = cls_index
                    end_position = cls_index

                    if len(answers) > 0:
                        first_answer = answers[0]
                        start_char = first_answer["answer_start"]
                        end_char = start_char + len(first_answer["text"])

                        # Locate the first/last tokens of the context segment
                        # (segment id 1 for fast tokenizers, 0 otherwise).
                        context_segment = 1 if tokenizer.is_fast else 0
                        token_start_index = 0
                        while sequence_ids[token_start_index] != context_segment:
                            token_start_index += 1
                        token_end_index = len(input_ids) - 1
                        while sequence_ids[token_end_index] != context_segment:
                            token_end_index -= 1

                        # Label only when the answer lies fully inside this chunk.
                        if (offsets[token_start_index][0] <= start_char
                                and offsets[token_end_index][1] >= end_char):
                            # Walk the two indices inward to the answer span.
                            while (token_start_index < len(offsets)
                                   and offsets[token_start_index][0] <= start_char):
                                token_start_index += 1
                            start_position = token_start_index - 1
                            while token_end_index >= 0 and offsets[token_end_index][1] >= end_char:
                                token_end_index -= 1
                            end_position = token_end_index + 1

                    tokenized_examples["input_ids"].append(input_ids)
                    tokenized_examples["attention_mask"].append(tokenized["attention_mask"][i])
                    tokenized_examples["offset_mapping"].append(offsets)
                    # Map back to the original example index in the batch.
                    tokenized_examples["overflow_to_sample_mapping"].append(example_index)
                    tokenized_examples["start_positions"].append(start_position)
                    tokenized_examples["end_positions"].append(end_position)

                    # Prefer the dataset-provided id; otherwise synthesize one.
                    # (Original fallback indexed a one-element default list and
                    # would IndexError for example_index > 0.)
                    titles = examples.get("title")
                    if titles is not None and example_index < len(titles):
                        fallback_title = titles[example_index]
                    else:
                        fallback_title = "no_title"
                    tokenized_examples["example_id"].append(
                        qa.get("id", f"{fallback_title}_{len(tokenized_examples['input_ids'])}")
                    )

    # The original omitted this return, so dataset.map received None.
    return tokenized_examples
# Convert every split into model-ready features; the raw columns are dropped
# so only the tokenizer outputs remain.
raw_columns = dataset["train"].column_names
tokenized_dataset = dataset.map(prepare_features, batched=True, remove_columns=raw_columns)
print(tokenized_dataset)
from transformers import TrainingArguments, Trainer

# Evaluate and checkpoint once per epoch; load_best_model_at_end requires the
# two strategies to match. Only the single best checkpoint is kept on disk.
training_args = TrainingArguments(
    output_dir="./finetuned-roberta-squad2",
    eval_strategy="epoch",
    save_strategy="epoch",
    learning_rate=2e-5,
    num_train_epochs=3,
    weight_decay=0.01,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    save_total_limit=1,
    load_best_model_at_end=True,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["validation"],
    tokenizer=tokenizer,
)

trainer.train()

# Persist the fine-tuned weights and the tokenizer side by side so the
# directory can be reloaded as a single checkpoint.
save_dir = "./finetuned-roberta-squad2"
trainer.save_model(save_dir)
tokenizer.save_pretrained(save_dir)
| # EVALUATION | |
| !pip install bert-score -q | |
| from transformers import pipeline | |
| qa_pipeline = pipeline("question-answering", model="./finetuned-roberta-squad2", tokenizer=tokenizer) | |
| examples = dataset["validation"] | |
| predictions = [] | |
| references = [] | |
| for example in examples: | |
| for para in example["paragraphs"]: | |
| context = para["context"] | |
| for qa in para["qas"]: | |
| question = qa["question"] | |
| answers = qa["answers"] # This is a list of answer dictionaries | |
| result = qa_pipeline({ | |
| "context": context, | |
| "question": question | |
| }) | |
| predictions.append(result["answer"]) | |
| if len(answers) > 0: | |
| references.append(answers[0]["text"]) | |
| else: | |
| references.append("") # Append empty string for unanswerable questions | |
| from bert_score import score | |
| P, R, F1 = score(predictions, references, lang="en", model_type="roberta-base") | |
| print(f"🔹 BERTScore Precision: {P.mean().item():.4f}") | |
| print(f"🔹 BERTScore Recall: {R.mean().item():.4f}") | |
| print(f"🔹 BERTScore F1: {F1.mean().item():.4f}") | |
from transformers import AutoModel, AutoTokenizer
import torch
import torch.nn.functional as F

# Lightweight sentence encoder for semantic-similarity scoring between
# predicted and reference answers.
EMBED_CHECKPOINT = "sentence-transformers/all-MiniLM-L6-v2"
embed_model = AutoModel.from_pretrained(EMBED_CHECKPOINT)
embed_tokenizer = AutoTokenizer.from_pretrained(EMBED_CHECKPOINT)


def get_embedding(text):
    """Return a (1, hidden_size) mean-pooled embedding for `text`.

    NOTE(review): the mean is taken over all token positions, padding
    included — attention-mask-weighted pooling would be more faithful for
    batched inputs; left unchanged for single-string calls.
    """
    inputs = embed_tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        outputs = embed_model(**inputs)
    return outputs.last_hidden_state.mean(dim=1)


# Cosine similarity between each prediction/reference embedding pair.
cosine_scores = [
    F.cosine_similarity(get_embedding(pred), get_embedding(ref)).item()
    for pred, ref in zip(predictions, references)
]

# Guard the average: the original divided by len(cosine_scores) and raised
# ZeroDivisionError when the validation set produced no predictions.
if cosine_scores:
    avg_cosine = sum(cosine_scores) / len(cosine_scores)
    print(f"🔹 Average Cosine Similarity: {avg_cosine:.4f}")
else:
    print("🔹 Average Cosine Similarity: n/a (no predictions to score)")