# NOTE: "Spaces / Sleeping" status-banner text from the Hugging Face Spaces
# page was captured during export; it is not part of the module source.
# Standard library
import random
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache

# Third-party
import numpy as np
import streamlit as st

# Project-local: LLM helpers (get_seed_groupings_from_LLM, etc.)
from src.backend.chatbot import *
def genetic_algorithm_plants(model, demo_lite):
    """Arrange the user's plants into companion-planting beds with a GA.

    All inputs (compatibility matrix, plant list, user selection and GA
    hyper-parameters) are read from ``st.session_state``.  The best layout
    found is written back to ``st.session_state.best_grouping`` /
    ``st.session_state.best_fitness`` and also returned.

    Args:
        model: language-model handle used to seed the initial population;
            only consulted when ``demo_lite`` is False and seeding is enabled.
        demo_lite: when True, all LLM calls are skipped.

    Returns:
        The best grouping found: a list of beds, each a list of plant names.
    """
    # Inputs from the Streamlit session.
    compatibility_matrix = st.session_state.full_mat
    plant_list = st.session_state.plant_list
    user_plants = st.session_state.input_plants_raw
    num_plant_beds = st.session_state.n_plant_beds
    # Constraint: 1 <= min_species_per_bed <= max_species_per_bed <= len(user_plants)
    min_species_per_bed = st.session_state.min_species
    max_species_per_bed = st.session_state.max_species
    # Genetic Algorithm parameters.
    population_size = st.session_state.population_size
    num_generations = st.session_state.num_generations
    tournament_size = st.session_state.tournament_size
    crossover_rate = st.session_state.crossover_rate
    mutation_rate = st.session_state.mutation_rate
    seed_population_rate = st.session_state.seed_population_rate

    # Plant name -> matrix index mapping for O(1) lookups.
    plant_to_index = {plant: idx for idx, plant in enumerate(plant_list)}
    # Convert the compatibility matrix to a numpy array once, outside any loop.
    compat_array = np.array(compatibility_matrix)
    # Cache of fitness scores keyed by a canonical form of the grouping.
    fitness_cache = {}

    def generate_initial_population(model, demo_lite):
        """Build the starting population: at most one LLM-seeded grouping
        plus validated random groupings up to ``population_size``."""
        population = []
        # Only call the LLM when seeding is enabled and a model is available;
        # LLM initialization/inference is by far the slowest step here.
        # (Uses the local ``seed_population_rate`` bound above instead of
        # re-reading session state, for consistency with the other params.)
        num_seeds = int(population_size * seed_population_rate)
        if num_seeds > 0 and not demo_lite and model is not None:
            # Only one seed grouping is generated for this beta feature.
            seed_grouping = get_language_model_suggestions(model, demo_lite)
            if seed_grouping != "no response yet":
                population.append(validate_and_replace(seed_grouping))
                print(f" Added 1 LLM-generated seed to population")
        # Fill the remainder with validated random groupings -- a healthy
        # initial population is critical for finding good solutions.
        while len(population) < population_size:
            population.append(validate_and_replace(generate_random_grouping()))
        return population

    def generate_random_grouping():
        """Randomly partition the user's plants over ``num_plant_beds`` beds.

        BUGFIX: shuffles a *copy*; the original shuffled ``user_plants`` in
        place, mutating the list held in ``st.session_state``.
        """
        remaining_plants = user_plants.copy()
        random.shuffle(remaining_plants)
        grouping = []
        total_plants = len(user_plants)
        plants_per_bed = total_plants // num_plant_beds
        extra_plants = total_plants % num_plant_beds
        for bed_index in range(num_plant_beds):
            # The first ``extra_plants`` beds absorb the division remainder.
            num_species_in_bed = plants_per_bed + (1 if bed_index < extra_plants else 0)
            # Clamp the bed size to the min/max species constraints.
            num_species_in_bed = max(
                min_species_per_bed, min(num_species_in_bed, max_species_per_bed)
            )
            grouping.append(remaining_plants[:num_species_in_bed])
            remaining_plants = remaining_plants[num_species_in_bed:]
        return grouping

    def crossover(parent1, parent2):
        """Single-point crossover that keeps every plant present in both
        children.

        BUGFIX: children are built from *copies* of the parents' beds.  The
        original spliced the parents' bed lists directly, so the missing-plant
        repair below (and later mutation) silently mutated individuals that
        were still in the population.
        """
        if random.random() >= crossover_rate:
            # No crossover: still hand back independent copies so later
            # in-place adjustments cannot corrupt the parents.
            return [bed[:] for bed in parent1], [bed[:] for bed in parent2]
        crossover_point = random.randint(1, num_plant_beds - 1)
        child1 = [bed[:] for bed in parent1[:crossover_point] + parent2[crossover_point:]]
        child2 = [bed[:] for bed in parent2[:crossover_point] + parent1[crossover_point:]]
        # Re-insert any plant that fell out of a child entirely.
        for plant in user_plants:
            for child in (child1, child2):
                if all(plant not in bed for bed in child):
                    # Add the missing plant to the smallest bed.
                    min_bed_index = min(range(len(child)), key=lambda i: len(child[i]))
                    child[min_bed_index].append(plant)
        return child1, child2

    def mutate(individual):
        """With probability ``mutation_rate``, rework one randomly chosen bed
        by swapping in plants it is missing.

        BUGFIX: operates on a copy so the input individual (possibly shared
        with the current population) is never mutated in place.
        """
        if random.random() >= mutation_rate:
            return individual
        mutated = [bed[:] for bed in individual]
        mutated_bed = random.randint(0, num_plant_beds - 1)
        species_in_bed = mutated[mutated_bed]
        # Trim the bed if it exceeds the maximum-species constraint.
        if len(species_in_bed) > max_species_per_bed:
            species_in_bed = random.sample(species_in_bed, max_species_per_bed)
        # Swap missing plants into the bed without growing past the maximum.
        missing_plants = [plant for plant in user_plants if plant not in species_in_bed]
        num_missing_plants = min(
            len(missing_plants), max_species_per_bed - len(species_in_bed)
        )
        for _ in range(num_missing_plants):
            swap_species = random.choice(missing_plants)
            missing_plants.remove(swap_species)
            species_in_bed.append(swap_species)
            # Remove a random occupant to keep the bed size constant.
            species_in_bed.remove(random.choice(species_in_bed))
        mutated[mutated_bed] = species_in_bed
        return mutated

    def calculate_fitness(grouping):
        """Score a grouping: reward positive pairwise compatibility, penalize
        negative pairs and constraint violations.  Results are memoized in
        ``fitness_cache``."""
        # Canonical, hashable cache key (order within a bed is irrelevant).
        grouping_key = tuple(tuple(sorted(bed)) for bed in grouping)
        if grouping_key in fitness_cache:
            return fitness_cache[grouping_key]
        positive_reward_factor = 1000
        negative_penalty_factor = 2000
        # Penalties for violating bed-size / coverage constraints.
        penalty_for_exceeding_max = 500
        penalty_for_not_meeting_min = 500
        penalty_for_not_having_all_plants = 1000
        score = 0
        # Vectorized pairwise scoring -- much faster than nested Python loops.
        for bed in grouping:
            if len(bed) < 2:
                continue
            bed_indices = np.array([plant_to_index[plant] for plant in bed])
            n = len(bed_indices)
            # Upper-triangle index pairs yield every unordered plant pair once.
            i_indices, j_indices = np.triu_indices(n, k=1)
            compat_scores = compat_array[bed_indices[i_indices], bed_indices[j_indices]]
            positive_scores = compat_scores[compat_scores > 0].sum() * positive_reward_factor
            negative_scores = compat_scores[compat_scores < 0].sum() * negative_penalty_factor
            score += positive_scores + negative_scores
        # Constraint penalties (vectorized over bed sizes).
        bed_sizes = np.array([len(bed) for bed in grouping])
        score -= np.sum(bed_sizes > max_species_per_bed) * penalty_for_exceeding_max
        score -= np.sum(bed_sizes < min_species_per_bed) * penalty_for_not_meeting_min
        if len(set(plant for bed in grouping for plant in bed)) < len(user_plants):
            score -= penalty_for_not_having_all_plants
        fitness_cache[grouping_key] = score
        return score

    def tournament_selection(population, population_fitness):
        """Pick ``population_size`` parents; each pick is the fittest of
        ``tournament_size`` random entrants (uses pre-computed fitness)."""
        selected = []
        for _ in range(population_size):
            participants_idx = random.sample(range(len(population)), tournament_size)
            winner_idx = max(participants_idx, key=lambda idx: population_fitness[idx])
            selected.append(population[winner_idx])
        return selected

    def calculate_fitness_parallel(individuals):
        """Calculate fitness for multiple individuals in parallel."""
        if len(individuals) <= 10:
            # For small populations, thread-pool overhead isn't worth it.
            return [calculate_fitness(ind) for ind in individuals]
        with ThreadPoolExecutor(max_workers=4) as executor:
            return list(executor.map(calculate_fitness, individuals))

    def replacement(population, offspring, population_fitness):
        """Merge population and offspring, enforce the max-species constraint
        on offspring, and keep the fittest ``population_size`` individuals.

        BUGFIX: when a bed has to be trimmed the individual's fitness is
        recomputed; the original kept the stale pre-trim score.
        """
        offspring_fitness = calculate_fitness_parallel(offspring)
        adjusted_offspring = []
        adjusted_fitness = []
        for idx, individual in enumerate(offspring):
            trimmed = False
            for bed_idx in range(num_plant_beds):
                species_in_bed = individual[bed_idx]
                if len(species_in_bed) > max_species_per_bed:
                    individual[bed_idx] = random.sample(species_in_bed, max_species_per_bed)
                    trimmed = True
            adjusted_offspring.append(individual)
            adjusted_fitness.append(
                calculate_fitness(individual) if trimmed else offspring_fitness[idx]
            )
        # Elitist replacement: top population_size of parents + children.
        combined = list(
            zip(population + adjusted_offspring, population_fitness + adjusted_fitness)
        )
        combined.sort(key=lambda x: x[1], reverse=True)
        new_population = [ind for ind, _ in combined[:population_size]]
        new_fitness = [fit for _, fit in combined[:population_size]]
        return new_population, new_fitness

    def genetic_algorithm(model, demo_lite):
        """Main GA loop: selection, crossover, mutation, replacement, and
        periodic validation; returns the best validated grouping."""
        population = generate_initial_population(model, demo_lite)
        # Fitness of the initial population, computed once (in parallel).
        population_fitness = calculate_fitness_parallel(population)
        for generation in range(num_generations):
            print(f"Generation {generation + 1}")
            selected_population = tournament_selection(population, population_fitness)
            offspring = []
            for _ in range(population_size // 2):
                parent1 = random.choice(selected_population)
                parent2 = random.choice(selected_population)
                child1, child2 = crossover(parent1, parent2)
                offspring.append(mutate(child1))
                offspring.append(mutate(child2))
            population, population_fitness = replacement(
                population, offspring, population_fitness
            )
            # Validate every 10 generations (and on the final one): frequent
            # enough for quality, rare enough to stay fast.
            if generation % 10 == 0 or generation == num_generations - 1:
                # Cheap check: an individual is invalid if any plant is missing.
                invalid_indices = []
                for i in range(len(population)):
                    plants_in_grouping = set(
                        plant for bed in population[i] for plant in bed
                    )
                    if len(plants_in_grouping) != len(user_plants):
                        invalid_indices.append(i)
                if invalid_indices:
                    # Repair only the invalid individuals and refresh their scores.
                    for i in invalid_indices:
                        population[i] = validate_and_replace(population[i])
                        population_fitness[i] = calculate_fitness(population[i])
                    print(f" Validated {len(invalid_indices)} individuals")
        # Final validation of the single best individual only.
        best_idx = max(range(len(population)), key=lambda i: population_fitness[i])
        best_grouping = validate_and_replace(population[best_idx])
        best_fitness = calculate_fitness(best_grouping)
        print(f"Best Grouping: {best_grouping}")
        print(f"Fitness Score: {best_fitness}")
        st.session_state.best_grouping = best_grouping
        st.session_state.best_fitness = best_fitness
        return best_grouping

    ############ experimental
    def adjust_grouping(grouping):
        """Repair a grouping in place: re-insert missing plants, then enforce
        the min/max bed-size constraints."""
        plants_in_grouping = set(plant for bed in grouping for plant in bed)
        missing_plants = set(user_plants) - plants_in_grouping
        for missing_plant in missing_plants:
            # Prefer a bed that still has spare capacity.
            suitable_bed = next(
                (bed for bed in grouping if len(bed) < max_species_per_bed), None
            )
            if suitable_bed is not None:
                suitable_bed.append(missing_plant)
            else:
                # Otherwise overwrite a random plant in a random bed.
                random_bed = random.choice(grouping)
                random_bed[random.randint(0, len(random_bed) - 1)] = missing_plant
        # Enforce the bed-size constraints.
        for bed in grouping:
            while len(bed) < min_species_per_bed:
                additional_plant = random.choice(
                    [plant for plant in user_plants if plant not in bed]
                )
                bed.append(additional_plant)
            while len(bed) > max_species_per_bed:
                bed.remove(random.choice(bed))
        return grouping

    def validate_and_replace(grouping):
        """Return the fittest of three independently repaired copies of
        ``grouping`` -- the retries materially improve solution quality."""
        best_grouping = None
        best_fitness = float("-inf")
        for _ in range(3):
            temp_grouping = adjust_grouping([bed.copy() for bed in grouping])
            current_fitness = calculate_fitness(temp_grouping)
            if current_fitness > best_fitness:
                best_fitness = current_fitness
                best_grouping = temp_grouping
        return best_grouping

    def get_language_model_suggestions(model, demo_lite):
        """Fetch seed groupings from the LLM and stash them in session state."""
        st.session_state.seed_groupings = get_seed_groupings_from_LLM(model, demo_lite)
        return st.session_state.seed_groupings

    # Run the genetic algorithm.
    best_grouping = genetic_algorithm(model, demo_lite)
    return best_grouping