# GPT-2 fine-tuning demo: a Streamlit app that searches training
# hyperparameters with a simple genetic algorithm.
| import streamlit as st | |
| import numpy as np | |
| import random | |
| import torch | |
| import transformers | |
| from transformers import GPT2LMHeadModel, GPT2Tokenizer, Trainer, TrainingArguments, DataCollatorForLanguageModeling | |
| from datasets import Dataset | |
| import os | |
# Seed every RNG source up front so demo runs are reproducible.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
def generate_demo_data(num_samples=60):
    """Generate synthetic subject-verb-object sentences for the demo.

    Args:
        num_samples: number of sentences to produce.

    Returns:
        list[str]: sentences of the form "<subject> <verb> <object>."
    """
    topics = [
        'Artificial intelligence', 'Climate change', 'Renewable energy',
        'Space exploration', 'Quantum computing', 'Genetic engineering',
        'Blockchain technology', 'Virtual reality', 'Cybersecurity',
        'Biotechnology', 'Nanotechnology', 'Astrophysics'
    ]
    actions = [
        'is transforming', 'is influencing', 'is revolutionizing',
        'is challenging', 'is advancing', 'is reshaping', 'is impacting',
        'is enhancing', 'is disrupting', 'is redefining'
    ]
    targets = [
        'modern science', 'global economies', 'healthcare systems',
        'communication methods', 'educational approaches',
        'environmental policies', 'social interactions', 'the job market',
        'data security', 'the entertainment industry'
    ]
    # One random draw per component, in the same order as before, so the
    # generated corpus is unchanged for a given RNG state.
    return [
        f"{random.choice(topics)} {random.choice(actions)} {random.choice(targets)}."
        for _ in range(num_samples)
    ]
def load_data(uploaded_file):
    """Decode an uploaded UTF-8 text file into a list of lines."""
    text = uploaded_file.read().decode("utf-8")
    return text.splitlines()
def prepare_dataset(data, tokenizer, block_size=128):
    """Tokenize raw text lines into a PyTorch-formatted LM dataset.

    Args:
        data: list of raw text strings.
        tokenizer: tokenizer with pad_token set (GPT-2 reuses EOS here).
        block_size: max sequence length; shorter texts are padded to it.

    Returns:
        datasets.Dataset with torch-formatted 'input_ids',
        'attention_mask', and 'labels' columns.
    """
    # Tokenize the texts
    def tokenize_function(examples):
        return tokenizer(examples['text'], truncation=True, max_length=block_size, padding='max_length')
    raw_dataset = Dataset.from_dict({'text': data})
    tokenized_dataset = raw_dataset.map(tokenize_function, batched=True, remove_columns=['text'])
    # Create labels for language modeling. Padding positions are masked to
    # -100 so cross-entropy ignores them — copying input_ids verbatim would
    # train the model to predict pad tokens and skew the loss.
    pad_id = tokenizer.pad_token_id

    def add_labels(examples):
        labels = [
            [(tok if tok != pad_id else -100) for tok in ids]
            for ids in examples['input_ids']
        ]
        return {'labels': labels}

    tokenized_dataset = tokenized_dataset.map(add_labels, batched=True)
    # Set the format for PyTorch
    tokenized_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
    return tokenized_dataset
def fitness_function(individual, train_dataset, model, tokenizer):
    """Fine-tune `model` with the hyperparameters in `individual`; return loss.

    Args:
        individual: dict with 'learning_rate', 'epochs', 'batch_size'.
        train_dataset: tokenized dataset ready for the Trainer.
        model: model to fine-tune (trained in place).
        tokenizer: tokenizer used by the data collator.

    Returns:
        float: last logged training loss (lower = fitter), or inf if no
        loss entry was logged.
    """
    # Define the training arguments
    training_args = TrainingArguments(
        output_dir='./results',
        overwrite_output_dir=True,
        num_train_epochs=individual['epochs'],
        per_device_train_batch_size=individual['batch_size'],
        learning_rate=individual['learning_rate'],
        logging_steps=10,
        # GA fitness evaluation is throwaway training: skip checkpointing
        # (previously a checkpoint was written every 10 steps for every
        # individual, which is pure disk churn).
        save_strategy='no',
        report_to='none',  # Disable logging to Wandb or other services
    )
    data_collator = DataCollatorForLanguageModeling(
        tokenizer=tokenizer, mlm=False
    )
    # Train the model
    trainer = Trainer(
        model=model,
        args=training_args,
        data_collator=data_collator,
        train_dataset=train_dataset,
        eval_dataset=None,
    )
    trainer.train()
    # For simplicity, use final training loss as fitness score
    losses = [log['loss'] for log in trainer.state.log_history if 'loss' in log]
    return losses[-1] if losses else float('inf')
| # Genetic Algorithm Functions | |
def create_population(size, param_bounds):
    """Sample `size` random hyperparameter dicts within `param_bounds`.

    Args:
        size: number of individuals to create.
        param_bounds: dict with 'learning_rate' (lo, hi) floats,
            'epochs' (lo, hi) ints, and 'batch_size' list of choices.

    Returns:
        list[dict]: one hyperparameter dict per individual.
    """
    return [
        {
            'learning_rate': random.uniform(*param_bounds['learning_rate']),
            'epochs': random.randint(*param_bounds['epochs']),
            'batch_size': random.choice(param_bounds['batch_size']),
        }
        for _ in range(size)
    ]
def select_mating_pool(population, fitnesses, num_parents):
    """Return the `num_parents` individuals with the lowest fitness (loss)."""
    ranked = np.argsort(fitnesses)
    return [population[idx] for idx in ranked[:num_parents]]
def crossover(parents, offspring_size):
    """Breed `offspring_size` children; each gene is taken from one of two
    randomly chosen parents (uniform crossover)."""
    children = []
    for _ in range(offspring_size):
        # Same draw order as before: two parent picks, then one pick per gene.
        mom, dad = random.choice(parents), random.choice(parents)
        children.append({
            'learning_rate': random.choice([mom['learning_rate'], dad['learning_rate']]),
            'epochs': random.choice([mom['epochs'], dad['epochs']]),
            'batch_size': random.choice([mom['batch_size'], dad['batch_size']]),
        })
    return children
def mutation(offspring, param_bounds, mutation_rate=0.1):
    """Independently re-sample each gene of each child with probability
    `mutation_rate`. Mutates the dicts in place and returns the same list."""
    resample = {
        'learning_rate': lambda: random.uniform(*param_bounds['learning_rate']),
        'epochs': lambda: random.randint(*param_bounds['epochs']),
        'batch_size': lambda: random.choice(param_bounds['batch_size']),
    }
    for child in offspring:
        # Gene order matches the original so the RNG trace is identical.
        for gene in ('learning_rate', 'epochs', 'batch_size'):
            if random.random() < mutation_rate:
                child[gene] = resample[gene]()
    return offspring
| # Streamlit App | |
def main():
    """Streamlit entry point: fine-tune GPT-2 via a genetic algorithm.

    Lets the user pick demo or uploaded text data, evolves hyperparameters
    (learning rate, epochs, batch size) over several generations using the
    final training loss as the fitness to minimize, then reports the best
    configuration and plots per-generation best loss.
    """
    st.title("GPT-2 Fine-Tuning with Genetic Algorithm")
    option = st.sidebar.selectbox(
        'Choose Data Source',
        ('DEMO', 'Upload Text File')
    )
    if option == 'DEMO':
        st.write("Using DEMO data...")
        data = generate_demo_data()
    else:
        st.write("Upload a text file for fine-tuning.")
        uploaded_file = st.file_uploader("Choose a text file", type="txt")
        if uploaded_file is not None:
            data = load_data(uploaded_file)
        else:
            st.warning("Please upload a text file.")
            # Halt this script run until the user provides a file.
            st.stop()
    # Load tokenizer and model
    st.write("Loading GPT-2 tokenizer and model...")
    tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
    model = GPT2LMHeadModel.from_pretrained('gpt2')
    # NOTE(review): this base `model` is never trained — every GA evaluation
    # below fine-tunes a fresh clone — so loading it here appears redundant
    # apart from setting its pad_token_id config.
    model.to('cuda' if torch.cuda.is_available() else 'cpu')
    # Set the pad token (GPT-2 ships without a dedicated pad token, so EOS
    # is reused for padding).
    tokenizer.pad_token = tokenizer.eos_token
    model.config.pad_token_id = model.config.eos_token_id
    # Prepare dataset
    st.write("Preparing dataset...")
    train_dataset = prepare_dataset(data, tokenizer)
    # GA Parameters
    st.sidebar.subheader("Genetic Algorithm Parameters")
    population_size = st.sidebar.number_input("Population Size", 4, 20, 6)
    num_generations = st.sidebar.number_input("Number of Generations", 1, 10, 3)
    num_parents = st.sidebar.number_input("Number of Parents", 2, population_size, 2)
    mutation_rate = st.sidebar.slider("Mutation Rate", 0.0, 1.0, 0.1)
    # Hyperparameter search space: ranges for continuous/int genes, an
    # explicit choice list for batch size.
    param_bounds = {
        'learning_rate': (1e-5, 5e-5),
        'epochs': (1, 3),
        'batch_size': [2, 4, 8]
    }
    if st.button("Start Training"):
        st.write("Initializing Genetic Algorithm...")
        population = create_population(population_size, param_bounds)
        best_individual = None
        best_fitness = float('inf')  # lower loss == fitter
        fitness_history = []
        progress_bar = st.progress(0)
        status_text = st.empty()
        # Every individual in every generation costs one full fine-tuning run.
        total_evaluations = num_generations * len(population)
        current_evaluation = 0
        for generation in range(num_generations):
            st.write(f"Generation {generation+1}/{num_generations}")
            fitnesses = []
            for idx, individual in enumerate(population):
                status_text.text(f"Evaluating individual {idx+1}/{len(population)} in generation {generation+1}")
                # Clone the model to avoid reusing the same model: each
                # individual starts from the same pretrained weights.
                model_clone = GPT2LMHeadModel.from_pretrained('gpt2')
                model_clone.to('cuda' if torch.cuda.is_available() else 'cpu')
                fitness = fitness_function(individual, train_dataset, model_clone, tokenizer)
                fitnesses.append(fitness)
                if fitness < best_fitness:
                    best_fitness = fitness
                    best_individual = individual
                current_evaluation += 1
                progress_bar.progress(current_evaluation / total_evaluations)
            # Track the generation's best loss for the chart below.
            fitness_history.append(min(fitnesses))
            # Elitism: lowest-loss parents survive unchanged alongside their
            # mutated offspring.
            parents = select_mating_pool(population, fitnesses, num_parents)
            offspring_size = population_size - num_parents
            offspring = crossover(parents, offspring_size)
            offspring = mutation(offspring, param_bounds, mutation_rate)
            population = parents + offspring
        st.write("Training completed!")
        st.write(f"Best Hyperparameters: {best_individual}")
        st.write(f"Best Fitness (Loss): {best_fitness}")
        # Plot fitness history
        st.line_chart(fitness_history)
        # Save the best model
        # NOTE(review): `model_clone` here is whichever individual was
        # evaluated LAST, not necessarily the best one — saving the true best
        # would require snapshotting weights whenever `best_fitness` improves.
        # Also, a st.button nested inside another button's block is unlikely
        # to ever fire in Streamlit (clicking it reruns the script and the
        # outer "Start Training" state is lost) — confirm against Streamlit's
        # rerun model.
        if st.button("Save Model"):
            model_clone.save_pretrained('./fine_tuned_model')
            tokenizer.save_pretrained('./fine_tuned_model')
            st.write("Model saved successfully!")
# Run the app when executed directly (e.g. `streamlit run app.py`).
if __name__ == "__main__":
    main()