Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer, TrainerCallback, DataCollatorWithPadding, DefaultDataCollator | |
| from openai import OpenAI | |
| from huggingface_hub import login | |
| import datasets | |
| from datasets import Dataset | |
| import json | |
| import pandas as pd | |
| import torch | |
| import wandb | |
| import os | |
| import sys | |
| from peft import LoraConfig, TaskType, get_peft_model, AutoPeftModelForCausalLM | |
| from sklearn.model_selection import train_test_split | |
| IS_COLAB = False | |
| if "google.colab" in sys.modules or "google.colab" in os.environ: | |
| IS_COLAB = True | |
| # Load env secrets | |
| if IS_COLAB: | |
| from google.colab import userdata | |
| OPENAI_API_KEY=userdata.get('OPENAI_API_KEY') | |
| WANDB_API_KEY=userdata.get('WANDB_API_KEY') | |
| else: | |
| OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") | |
| WANDB_API_KEY = os.environ.get("WANDB_API_KEY") | |
| # Authenticate Weights and Biases | |
| wandb.login(key=WANDB_API_KEY) | |
| # Custom callback to capture logs | |
| class LoggingCallback(TrainerCallback): | |
| def __init__(self): | |
| self.logs = [] # Store logs | |
| def on_log(self, args, state, control, logs=None, **kwargs): | |
| if logs: | |
| self.logs.append(logs) # Append logs to list | |
| class LLMTrainingApp: | |
| def __init__(self): | |
| # self.metric = datasets.load_metric('sacrebleu') | |
| self.device = "cuda" if torch.cuda.is_available() else "cpu" | |
| self.finetuning_dataset = [] | |
| self.prompt_template = """### Question: {question} ### Answer: """ | |
| self.training_output = "/content/peft-model" if IS_COLAB else "./peft-model" | |
| self.localpath = "/content/finetuned-model" if IS_COLAB else "./finetuned-model" | |
| self.tokenizer = None | |
| self.model = None | |
| self.model_name = None | |
| self.fine_tuned_model = None | |
| self.teacher_model = OpenAI(api_key=OPENAI_API_KEY) | |
| self.base_models = { | |
| "SmolLM": {"hf_name":"HuggingFaceTB/SmolLM2-135M", | |
| "model_size": "135M", | |
| "training_size": "2T", | |
| "context_window": "8192"}, | |
| "GPT2": {"hf_name":"openai-community/gpt2", | |
| "model_size": "137M", | |
| "training_size": "2T", | |
| "context_window": "1024"} | |
| } | |
| self.peft_config = LoraConfig(task_type=TaskType.CAUSAL_LM, inference_mode=False, r=8, lora_alpha=32, lora_dropout=0.1) | |
| self.logging_callback = LoggingCallback() | |
| def login_into_hf(self, token): | |
| if not token: | |
| return "β Please enter a valid token." | |
| try: | |
| login(token) | |
| return f"β Logged in successfully!" | |
| except Exception as e: | |
| return f"β Login failed: {str(e)}" | |
| def select_model(self, model_name): | |
| self.model_name = model_name | |
| model_hf_name = self.base_models[model_name]["hf_name"] | |
| try: | |
| self.tokenizer = AutoTokenizer.from_pretrained(model_hf_name) | |
| self.tokenizer.pad_token = self.tokenizer.eos_token | |
| base_model = AutoModelForCausalLM.from_pretrained( | |
| model_hf_name, | |
| torch_dtype="auto", | |
| device_map="auto" | |
| ) | |
| self.model = get_peft_model(base_model, self.peft_config) | |
| params = self.model.get_nb_trainable_parameters() | |
| percent_trainable = round(100 * (params[0] / params[1]), 2) | |
| return f"β Loaded model into memory! Base Model card: {json.dumps(self.base_models[model_name])} - % of trainable parameters for PEFT model: {percent_trainable}" | |
| except Exception as e: | |
| return f"β Failed to load model and/or tokenizer: {str(e)}" | |
| def create_golden_dataset(self, dataset): | |
| try: | |
| dataset = pd.DataFrame(dataset) | |
| for i, row in dataset.iterrows(): | |
| self.finetuning_dataset.append({"question": self.prompt_template.format(question=row["Question"]), "answer": row["Answer"]}) | |
| return "β Golden dataset created!" | |
| except Exception as e: | |
| return f"β Failed to create dataset: {str(e)}" | |
| def extend_dataset(self): | |
| try: | |
| completion = self.teacher_model.chat.completions.create( | |
| model="gpt-4o", | |
| messages=[ | |
| { | |
| "role": "user", | |
| "content": """Given the following question-answer pairs, generate 10 similar pairs in the following json format below. Do not respond with anything other than the json. | |
| ```json | |
| [ | |
| { | |
| "question": "question 1", | |
| "answer": "answer 1" | |
| }, | |
| { | |
| "question": "question 2", | |
| "answer": "answer 2" | |
| } | |
| ] | |
| """ | |
| } | |
| ] | |
| ) | |
| response = completion.choices[0].message.content | |
| print(f"raw response: {response}") | |
| clean_response = response.replace("```json", "").replace("```", "").strip() | |
| print(f"clean response: {clean_response}") | |
| new_data = json.loads(clean_response) | |
| for i, row in enumerate(new_data): | |
| self.finetuning_dataset.append({"question": self.prompt_template.format(question=row["question"]), "answer": row["answer"]}) | |
| # create df to display | |
| df = pd.DataFrame(new_data) | |
| return "β Synthetic dataset generated!", df | |
| except Exception as e: | |
| return f"β Failed to generate synthetic dataset: {str(e)}", pd.DataFrame() | |
| def tokenize_function(self, examples): | |
| try: | |
| # Tokenize the question and answer as input and target (labels) for causal LM | |
| encoding = self.tokenizer(examples['question'], examples['answer'], padding=True) | |
| # Set the labels as the input_ids | |
| encoding['labels'] = encoding['input_ids'].copy() | |
| return encoding | |
| except Exception as e: | |
| return f"β Failed to tokenize input: {str(e)}" | |
| def prepare_data_for_training(self): | |
| try: | |
| dataset = Dataset.from_dict({ | |
| "question": [entry["question"] for entry in self.finetuning_dataset], | |
| "answer": [entry["answer"] for entry in self.finetuning_dataset], | |
| }) | |
| dataset = dataset.map(self.tokenize_function, batched=True) | |
| train_dataset, test_dataset = dataset.train_test_split(test_size=0.2).values() | |
| return {"train": train_dataset, "test": test_dataset} | |
| except Exception as e: | |
| return f"β Failed to prepare data for training: {str(e)}" | |
| def compute_bleu(self, eval_pred): | |
| predictions, labels = eval_pred | |
| # # Flatten predictions and labels if they are in nested lists | |
| # predictions = predictions.flatten() | |
| # labels = labels.flatten() | |
| # # Ensure that predictions and labels are integers | |
| # predictions = predictions.astype(int) # Convert to integer | |
| # labels = labels.astype(int) # Convert to integer | |
| # # Decode the predicted tokens | |
| # decoded_preds = self.tokenizer.decode(predictions, skip_special_tokens=True) | |
| # decoded_labels = self.tokenizer.decode(labels, skip_special_tokens=True) | |
| # result = self.metric.compute(predictions=[decoded_preds], references=[[decoded_labels]]) | |
| result = {"bleu": 1} | |
| return result | |
| def log_generator(self): | |
| """ Continuously send logs to frontend during training """ | |
| for log in self.logging_callback.logs: | |
| yield str(log) | |
| def train_model(self): | |
| try: | |
| tokenized_datasets = self.prepare_data_for_training() | |
| # Create training arguments | |
| training_args = TrainingArguments( | |
| output_dir=self.training_output, | |
| learning_rate=1e-3, | |
| per_device_train_batch_size=32, | |
| per_device_eval_batch_size=32, | |
| num_train_epochs=2, | |
| weight_decay=0.01, | |
| eval_strategy="epoch", | |
| save_strategy="epoch", | |
| load_best_model_at_end=True, | |
| ) | |
| # Create trainer & attach logging callback | |
| trainer = Trainer( | |
| model=self.model, | |
| args=training_args, | |
| train_dataset=tokenized_datasets["train"], | |
| eval_dataset=tokenized_datasets["test"], | |
| tokenizer=self.tokenizer, | |
| data_collator=DefaultDataCollator(), | |
| compute_metrics=self.compute_bleu, | |
| callbacks=[self.logging_callback], | |
| ) | |
| # Start training and yield logs in real-time | |
| trainer.train() | |
| # for log in logging_callback.logs: | |
| # yield str(log) | |
| # Save trained model to HF | |
| self.model.save_pretrained(self.localpath) # save to local | |
| self.model.push_to_hub(f"{self.model_name}-lora") | |
| return "β Training complete!" | |
| except Exception as e: | |
| return f"β Training failed: {str(e)}" | |
| def run_inference(self, prompt): | |
| try: | |
| # Load fine-tuned memory into memory and set mode to eval | |
| self.fine_tuned_model = AutoPeftModelForCausalLM.from_pretrained(self.localpath) | |
| self.fine_tuned_model = self.fine_tuned_model.to(self.device) | |
| self.fine_tuned_model.eval() | |
| # Tokenize input with padding and attention mask | |
| inputs = self.tokenizer(prompt, return_tensors="pt", padding=True).to(self.device) | |
| # Generate response | |
| output = self.fine_tuned_model.generate( | |
| **inputs, | |
| max_length=50, # Limit response length | |
| num_return_sequences=1, # Single response | |
| temperature=0.7, # Sampling randomness | |
| top_p=0.9 # Nucleus sampling | |
| ) | |
| response = self.tokenizer.batch_decode(output.detach().cpu().numpy(), skip_special_tokens=True)[0] | |
| return response | |
| except Exception as e: | |
| return f"β Inference failed: {str(e)}" | |
| def build_ui(self): | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# LLM Fine-tuning") | |
| # Model Selection | |
| with gr.Group(): | |
| gr.Markdown("### 1. Login into Hugging Face") | |
| with gr.Column(): | |
| token = gr.Textbox(label="Enter Hugging Face Access Token (w/ write permissions)", type="password") | |
| inference_btn = gr.Button("Login", variant="primary") | |
| status = gr.Textbox(label="Status") | |
| inference_btn.click(self.login_into_hf, inputs=token, outputs=status) | |
| # Model Selection | |
| with gr.Group(): | |
| gr.Markdown("### 2. Select Model") | |
| with gr.Column(): | |
| model_dropdown = gr.Dropdown([key for key in self.base_models.keys()], label="Small Models") | |
| select_model_btn = gr.Button("Select", variant="primary") | |
| selected_model_text = gr.Textbox(label="Model Status") | |
| select_model_btn.click(self.select_model, inputs=model_dropdown, outputs=[selected_model_text]) | |
| # Create Golden Dataset | |
| with gr.Group(): | |
| gr.Markdown("### 3. Create Golden Dataset") | |
| with gr.Column(): | |
| dataset_table = gr.Dataframe( | |
| headers=["Question", "Answer"], | |
| value=[["", ""] for _ in range(3)], | |
| label="Golden Dataset" | |
| ) | |
| create_data_btn = gr.Button("Create Dataset", variant="primary") | |
| dataset_status = gr.Textbox(label="Dataset Status") | |
| create_data_btn.click(self.create_golden_dataset, inputs=dataset_table, outputs=[dataset_status]) | |
| # Generate Full Dataset | |
| with gr.Group(): | |
| gr.Markdown("### 4. Extend Dataset with Synthetic Data") | |
| with gr.Column(): | |
| dataset_table = gr.Dataframe( | |
| headers=["Question", "Answer"], | |
| label="Golden + Synthetic Dataset" | |
| ) | |
| generate_status = gr.Textbox(label="Dataset Generation Status") | |
| generate_data_btn = gr.Button("Generate Dataset", variant="primary") | |
| generate_data_btn.click(self.extend_dataset, outputs=[generate_status, dataset_table]) | |
| # Train Model & Visualize Loss | |
| with gr.Group(): | |
| gr.Markdown("### 5. Start Logging") | |
| with gr.Column(): | |
| train_status = gr.Textbox(label="Training Status", lines=10) | |
| train_btn = gr.Button("Train", variant="primary") | |
| train_btn.click(self.log_generator, outputs=[train_status]) | |
| # Train Model & Visualize Loss | |
| with gr.Group(): | |
| gr.Markdown("### 6. Train Model") | |
| with gr.Column(): | |
| train_status = gr.Textbox(label="Training Status") | |
| train_btn = gr.Button("Train", variant="primary") | |
| train_btn.click(self.train_model, outputs=[train_status]) | |
| # Run Inference | |
| with gr.Group(): | |
| gr.Markdown("### 7. Run Inference") | |
| with gr.Column(): | |
| user_prompt = gr.Textbox(label="Enter Prompt") | |
| inference_btn = gr.Button("Run Inference", variant="primary") | |
| inference_output = gr.Textbox(label="Inference Output") | |
| inference_btn.click(self.run_inference, inputs=user_prompt, outputs=inference_output) | |
| return demo | |
| # Create an instance of the app | |
| app = LLMTrainingApp() | |
| # Launch the Gradio app using the class method | |
| app.build_ui().launch() |