Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer, TrainerCallback, DataCollatorWithPadding, DefaultDataCollator | |
| from openai import OpenAI | |
| from huggingface_hub import login | |
| import datasets | |
| from datasets import Dataset | |
| import json | |
| import pandas as pd | |
| import numpy as np | |
| import torch | |
| import wandb | |
| import copy | |
| import os | |
| import sys | |
| import re | |
| from peft import LoraConfig, TaskType, get_peft_model, AutoPeftModelForCausalLM | |
| from sklearn.model_selection import train_test_split | |
| import nltk | |
| from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction | |
# True when the Colab runtime module is loaded (or a "google.colab" key is
# present in the process environment).
IS_COLAB = any("google.colab" in scope for scope in (sys.modules, os.environ))

# Pick the secret store: Colab's per-user secrets, otherwise environment variables.
if IS_COLAB:
    from google.colab import userdata
    _read_secret = userdata.get
else:
    _read_secret = os.environ.get

OPENAI_API_KEY = _read_secret('OPENAI_API_KEY')
WANDB_API_KEY = _read_secret('WANDB_API_KEY')

# Authenticate with Weights & Biases using the key loaded above.
wandb.login(key=WANDB_API_KEY)
| # Custom callback to capture logs | |
class LoggingCallback(TrainerCallback):
    """Trainer callback that keeps every non-empty log payload it is handed.

    The collected payloads are available, in arrival order, on ``self.logs``.
    """

    def __init__(self):
        # One entry per ``on_log`` call that carried a non-empty ``logs`` value.
        self.logs = []

    def on_log(self, args, state, control, logs=None, **kwargs):
        """Record ``logs`` when it is truthy; ignore missing or empty payloads."""
        if not logs:
            return
        self.logs.append(logs)
class LLMTrainingApp:
    """Gradio app that walks through LoRA fine-tuning of a small causal LM.

    The UI exposes six steps: log in to Hugging Face, select a base model,
    enter a hand-written ("golden") question/answer set, extend it with
    synthetic pairs from an OpenAI chat model, train a LoRA adapter, and run
    inference with the saved adapter.

    Every UI handler returns a human-readable status string instead of raising,
    so failures are shown in the corresponding status textbox.
    """

    # NOTE(review): the leading "β" in the status strings looks like a
    # mis-encoded emoji; the strings are kept as they are - confirm the
    # intended characters before changing them.

    # Label value that the loss function ignores (PyTorch cross-entropy default).
    _IGNORE_INDEX = -100

    # System prompt sent to the teacher model when generating synthetic pairs.
    _SYNTHETIC_DATA_INSTRUCTIONS = (
        "Given the following question-answer pairs, generate 20 similar pairs in the following json format below. "
        "Do not respond with anything other than the json.\n"
        "```json\n"
        "[\n"
        "{\n"
        "\"question\": \"question 1\",\n"
        "\"answer\": \"answer 1\"\n"
        "},\n"
        "{\n"
        "\"question\": \"question 2\",\n"
        "\"answer\": \"answer 2\"\n"
        "}\n"
        "]\n"
    )

    def __init__(self):
        # Prefer the GPU when torch can see one.
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # Training rows: {"question": <templated prompt>, "answer": <target text>}.
        self.finetuning_dataset = []
        self.prompt_template = """### Question: {question} ### Answer: """
        # Trainer checkpoints and the final adapter live in separate directories.
        self.training_output = "/content/peft-model" if IS_COLAB else "./peft-model"
        self.localpath = "/content/finetuned-model" if IS_COLAB else "./finetuned-model"
        # Populated by select_model().
        self.tokenizer = None
        self.model = None
        self.model_name = None
        # Populated by run_inference().
        self.fine_tuned_model = None
        # OpenAI client used to generate synthetic question/answer pairs.
        self.teacher_model = OpenAI(api_key=OPENAI_API_KEY)
        # Base models offered in the UI, keyed by display name.
        self.base_models = {
            "SmolLM": {"hf_name": "HuggingFaceTB/SmolLM2-135M",
                       "model_size": "135M",
                       "training_size": "2T",
                       "context_window": "8192"},
            "GPT2": {"hf_name": "openai-community/gpt2",
                     "model_size": "137M",
                     "training_size": "2T",
                     "context_window": "1024"}
        }
        self.peft_config = LoraConfig(task_type=TaskType.CAUSAL_LM, inference_mode=False, r=8, lora_alpha=32, lora_dropout=0.1)
        self.logging_callback = LoggingCallback()

    def login_into_hf(self, token):
        """Log in to the Hugging Face Hub with ``token`` and return a status string."""
        if not token:
            return "β Please enter a valid token."
        try:
            login(token)
            return "β Logged in successfully!"
        except Exception as e:
            return f"β Login failed: {str(e)}"

    def select_model(self, model_name):
        """Load the tokenizer and base model for ``model_name`` and wrap the model with LoRA.

        ``model_name`` must be a key of ``self.base_models``. On success the
        tokenizer, PEFT model and model name are stored on the instance; on any
        failure the previously stored values are left untouched.

        Returns a status string (success message with the model card and the
        percentage of trainable parameters, or a failure message).
        """
        try:
            # An empty dropdown yields None; report it instead of raising KeyError.
            if model_name not in self.base_models:
                raise ValueError(f"unknown model {model_name!r}; choose one of {sorted(self.base_models)}")
            model_hf_name = self.base_models[model_name]["hf_name"]
            tokenizer = AutoTokenizer.from_pretrained(model_hf_name)
            # The tokenizer needs a pad token for batching; reuse end-of-sequence.
            tokenizer.pad_token = tokenizer.eos_token
            base_model = AutoModelForCausalLM.from_pretrained(
                model_hf_name,
                torch_dtype="auto",
                device_map="auto"
            )
            # Hand PEFT a private copy so anything it records on the config for
            # this architecture cannot leak into a later selection.
            peft_model = get_peft_model(base_model, copy.deepcopy(self.peft_config))
            # Commit state only once everything has loaded.
            self.tokenizer = tokenizer
            self.model = peft_model
            self.model_name = model_name
            params = self.model.get_nb_trainable_parameters()
            percent_trainable = round(100 * (params[0] / params[1]), 2)
            return f"β Loaded model into memory! Base Model card: {json.dumps(self.base_models[model_name])} - % of trainable parameters for PEFT model: {percent_trainable}%"
        except Exception as e:
            return f"β Failed to load model and/or tokenizer: {str(e)}"

    def create_golden_dataset(self, dataset):
        """Append the hand-written question/answer rows to the fine-tuning dataset.

        ``dataset`` is anything ``pandas.DataFrame`` accepts that has
        ``"Question"`` and ``"Answer"`` columns. Rows whose question or answer
        is missing or blank are skipped (the UI table starts with empty
        placeholder rows). Questions are wrapped in ``self.prompt_template``.

        Returns a status string; it reports failure when no usable row is found.
        """
        try:
            frame = pd.DataFrame(dataset)
            golden_rows = []
            for _, row in frame.iterrows():
                question, answer = row["Question"], row["Answer"]
                if pd.isna(question) or pd.isna(answer):
                    continue
                question, answer = str(question).strip(), str(answer).strip()
                if not question or not answer:
                    continue
                golden_rows.append({"question": self.prompt_template.format(question=question), "answer": answer})
            if not golden_rows:
                raise ValueError("no rows with both a question and an answer")
            self.finetuning_dataset.extend(golden_rows)
            return "β Golden dataset created!"
        except Exception as e:
            return f"β Failed to create dataset: {str(e)}"

    def extend_dataset(self):
        """Ask the teacher model for synthetic pairs and add them to the dataset.

        The current dataset is sent to the OpenAI chat model, whose reply is
        expected to be a JSON list of ``{"question", "answer"}`` objects
        (optionally wrapped in a Markdown code fence). The dataset is only
        extended once the whole reply has been parsed and cleaned.

        Returns ``(status, dataframe)`` where ``dataframe`` holds the newly
        generated pairs, or an empty frame on failure.
        """
        try:
            if not self.finetuning_dataset:
                raise ValueError("golden dataset is empty; create it first")
            completion = self.teacher_model.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {
                        "role": "system",
                        "content": self._SYNTHETIC_DATA_INSTRUCTIONS
                    },
                    {
                        "role": "user",
                        "content": f"Here are the question-answer pairs: {json.dumps(self.finetuning_dataset)}\n"
                    }
                ]
            )
            response = completion.choices[0].message.content
            print(f"raw response: {response}")
            # Drop the Markdown fence the model tends to wrap around the JSON.
            clean_response = response.replace("```json", "").replace("```", "").strip()
            print(f"clean response: {clean_response}")
            new_data = json.loads(clean_response)
            if not isinstance(new_data, list):
                raise ValueError("expected a JSON list of question/answer objects")
            synthetic_rows = []
            for row in new_data:
                # The examples sent to the model carry the prompt template, so
                # the model may echo its markers; remove them before re-templating.
                row["question"] = row["question"].replace("### Question:", "").replace("### Answer:", "").strip()
                row["answer"] = row["answer"].replace("### Answer:", "").strip()
                synthetic_rows.append({"question": self.prompt_template.format(question=row["question"]), "answer": row["answer"]})
            # Extend only after every row parsed, so a bad reply adds nothing.
            self.finetuning_dataset.extend(synthetic_rows)
            # Frame shown in the UI: the cleaned synthetic pairs.
            df = pd.DataFrame(new_data)
            return "β Synthetic dataset generated!", df
        except Exception as e:
            return f"β Failed to generate synthetic dataset: {str(e)}", pd.DataFrame()

    def tokenize_function(self, examples):
        """Tokenize a batch of question/answer pairs and build causal-LM labels.

        ``examples`` is a batch mapping with parallel ``"question"`` and
        ``"answer"`` lists. Each pair is encoded as one padded sequence. The
        returned encoding gains a ``"labels"`` entry equal to ``input_ids``
        with the question tokens and all padding positions set to ``-100``, so
        only answer tokens contribute to the loss.

        Raises whatever the tokenizer raises; a batch-mapping function must
        return a mapping, so errors are not converted to strings here.
        """
        encoding = self.tokenizer(
            examples["question"],
            examples["answer"],
            padding=True,
            return_attention_mask=True,
        )
        labels = []
        for row, question in enumerate(examples["question"]):
            # NOTE(review): assumes the pair encoding adds no special token
            # before the question - confirm for tokenizers that prepend BOS.
            question_length = len(self.tokenizer(question, add_special_tokens=False)["input_ids"])
            attention = list(encoding["attention_mask"][row])
            # With left padding the question starts after the pad tokens.
            first_real = attention.index(1) if 1 in attention else len(attention)
            answer_start = first_real + question_length
            labels.append([
                token if (attended and position >= answer_start) else self._IGNORE_INDEX
                for position, (token, attended) in enumerate(zip(encoding["input_ids"][row], attention))
            ])
        encoding["labels"] = labels
        return encoding

    def prepare_data_for_training(self):
        """Tokenize the accumulated dataset and split it 80/20 into train/test.

        Returns ``{"train": Dataset, "test": Dataset}``.

        Raises ``ValueError`` when the dataset is empty; tokenization and split
        errors propagate so the caller can report the real cause.
        """
        if not self.finetuning_dataset:
            raise ValueError("fine-tuning dataset is empty; create a dataset first")
        dataset = Dataset.from_dict({
            "question": [entry["question"] for entry in self.finetuning_dataset],
            "answer": [entry["answer"] for entry in self.finetuning_dataset],
        })
        dataset = dataset.map(self.tokenize_function, batched=True)
        split = dataset.train_test_split(test_size=0.2)
        return {"train": split["train"], "test": split["test"]}

    def compute_bleu(self, eval_pred):
        """Return ``{"bleu": score}``: mean sentence-BLEU over the answer tokens.

        ``eval_pred`` unpacks to ``(logits, labels)``. Logits are reduced with
        argmax. Because a causal LM's output at position ``t`` predicts token
        ``t + 1``, predictions are shifted by one before being compared, and
        only positions whose label is not ``-100`` are scored. The score is
        ``0.0`` when no row has a scorable position.

        The raw inputs are also stored on ``self.predictions`` / ``self.labels``.
        """
        predictions, labels = eval_pred
        self.predictions = predictions
        self.labels = labels
        predicted_ids = np.argmax(predictions, axis=-1)
        labels = np.asarray(labels)
        # Align "prediction made at t" with "token at t + 1".
        shifted_predictions = predicted_ids[:, :-1]
        shifted_labels = labels[:, 1:]
        smoothing = SmoothingFunction().method1
        scores = []
        for prediction, label in zip(shifted_predictions, shifted_labels):
            scored = label != self._IGNORE_INDEX
            if not scored.any():
                continue
            decoded_preds = self.tokenizer.decode(prediction[scored], skip_special_tokens=True).split()
            decoded_labels = self.tokenizer.decode(label[scored], skip_special_tokens=True).split()
            scores.append(sentence_bleu([decoded_labels], decoded_preds, smoothing_function=smoothing))
        average_score = sum(scores) / len(scores) if scores else 0.0
        print(f"Average BLEU score: {average_score}")
        return {"bleu": average_score}

    def train_model(self):
        """Fine-tune the selected model, save the adapter locally and push it to the Hub.

        Returns a status string; on success it includes the JSON-encoded
        training logs collected during this run.
        """
        try:
            if self.model is None or self.tokenizer is None:
                raise ValueError("no model selected; select a model first")
            # Report only this run's logs.
            self.logging_callback.logs.clear()
            tokenized_datasets = self.prepare_data_for_training()
            print('finished preparing data for training')
            training_args = TrainingArguments(
                output_dir=self.training_output,
                learning_rate=1e-3,
                per_device_train_batch_size=32,
                per_device_eval_batch_size=32,
                num_train_epochs=5,
                weight_decay=0.01,
                eval_strategy="epoch",
                save_strategy="epoch",
                load_best_model_at_end=True,
            )
            print('training arguments set...')
            # Sequences are already padded during tokenization, so the default
            # collator only needs to stack them.
            trainer = Trainer(
                model=self.model,
                args=training_args,
                train_dataset=tokenized_datasets["train"],
                eval_dataset=tokenized_datasets["test"],
                tokenizer=self.tokenizer,
                data_collator=DefaultDataCollator(),
                compute_metrics=self.compute_bleu,
                callbacks=[self.logging_callback],
            )
            print('trainer set...')
            trainer.train()
            # Persist the adapter locally (used by run_inference), then publish it.
            self.model.save_pretrained(self.localpath)
            self.model.push_to_hub(f"{self.model_name}-lora")
            return f"β Training complete!\n {json.dumps(self.logging_callback.logs)}"
        except Exception as e:
            return f"β Training failed: {str(e)}"

    def run_inference(self, prompt):
        """Generate a completion for ``prompt`` with the locally saved adapter.

        The fine-tuned model is loaded from ``self.localpath`` on every call.
        Returns the decoded text (prompt included), or a failure status string.
        """
        try:
            self.fine_tuned_model = AutoPeftModelForCausalLM.from_pretrained(self.localpath)
            self.fine_tuned_model = self.fine_tuned_model.to(self.device)
            self.fine_tuned_model.eval()
            inputs = self.tokenizer(prompt, return_tensors="pt", padding=True).to(self.device)
            output = self.fine_tuned_model.generate(
                **inputs,
                max_new_tokens=50,       # Limit response length, independent of prompt length
                num_return_sequences=1,  # Single response
                do_sample=True,          # Required for temperature / top_p to take effect
                temperature=0.7,         # Sampling randomness
                top_p=0.9                # Nucleus sampling
            )
            response = self.tokenizer.batch_decode(output.detach().cpu().numpy(), skip_special_tokens=True)[0]
            return response
        except Exception as e:
            return f"β Inference failed: {str(e)}"

    def build_ui(self):
        """Build and return the Gradio ``Blocks`` UI wiring each step to its handler."""
        with gr.Blocks() as demo:
            gr.Markdown("# LLM Fine-tuning")

            # Step 1: Hugging Face login
            with gr.Group():
                gr.Markdown("### 1. Login into Hugging Face")
                with gr.Column():
                    token = gr.Textbox(label="Enter Hugging Face Access Token (w/ write permissions)", type="password")
                    login_btn = gr.Button("Login", variant="primary")
                    status = gr.Textbox(label="Status")
                    login_btn.click(self.login_into_hf, inputs=token, outputs=status)

            # Step 2: base model selection
            with gr.Group():
                gr.Markdown("### 2. Select Model")
                with gr.Column():
                    model_dropdown = gr.Dropdown(list(self.base_models), label="Small Models")
                    select_model_btn = gr.Button("Select", variant="primary")
                    selected_model_text = gr.Textbox(label="Model Status")
                    select_model_btn.click(self.select_model, inputs=model_dropdown, outputs=[selected_model_text])

            # Step 3: hand-written golden dataset
            with gr.Group():
                gr.Markdown("### 3. Create Golden Dataset")
                with gr.Column():
                    golden_table = gr.Dataframe(
                        headers=["Question", "Answer"],
                        value=[["", ""] for _ in range(3)],
                        label="Golden Dataset"
                    )
                    create_data_btn = gr.Button("Create Dataset", variant="primary")
                    dataset_status = gr.Textbox(label="Dataset Status")
                    create_data_btn.click(self.create_golden_dataset, inputs=golden_table, outputs=[dataset_status])

            # Step 4: synthetic extension
            with gr.Group():
                gr.Markdown("### 4. Extend Dataset with Synthetic Data")
                with gr.Column():
                    synthetic_table = gr.Dataframe(
                        headers=["Question", "Answer"],
                        label="Golden + Synthetic Dataset"
                    )
                    generate_status = gr.Textbox(label="Dataset Generation Status")
                    generate_data_btn = gr.Button("Extend Dataset", variant="primary")
                    generate_data_btn.click(self.extend_dataset, outputs=[generate_status, synthetic_table])

            # Step 5: training
            with gr.Group():
                gr.Markdown("### 5. Train Model")
                with gr.Column():
                    train_status = gr.Textbox(label="Training Status")
                    train_btn = gr.Button("Train", variant="primary")
                    train_btn.click(self.train_model, outputs=[train_status])

            # Step 6: inference
            with gr.Group():
                gr.Markdown("### 6. Run Inference")
                with gr.Column():
                    user_prompt = gr.Textbox(label="Enter Prompt")
                    inference_btn = gr.Button("Run Inference", variant="primary")
                    inference_output = gr.Textbox(label="Inference Output")
                    inference_btn.click(self.run_inference, inputs=user_prompt, outputs=inference_output)
        return demo
# Instantiate the application (this also constructs the OpenAI client).
app = LLMTrainingApp()

# Assemble the Gradio interface from the instance, then start serving it.
demo = app.build_ui()
demo.launch()