import random

from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache

import numpy as np
import streamlit as st

# import all functions from src.backend.chatbot
from src.backend.chatbot import *


def genetic_algorithm_plants(model, demo_lite):
    """Evolve an assignment of the user's plants into companion-planting beds.

    All configuration is read from ``st.session_state`` (compatibility
    matrix, plant list, bed count, min/max species per bed, and the GA
    hyper-parameters).  The best layout found is written back to
    ``st.session_state.best_grouping`` / ``st.session_state.best_fitness``.

    Args:
        model: language-model handle forwarded to the LLM seeding helper
            (may be ``None``, in which case seeding is skipped).
        demo_lite: when truthy, the slow LLM-seeded initialization is skipped.

    Returns:
        The best grouping found: a list of beds, each a list of plant names.
    """
    # Pairwise compatibility scores, indexed by position in plant_list.
    compatibility_matrix = st.session_state.full_mat
    plant_list = st.session_state.plant_list

    # User selections and layout constraints.
    user_plants = st.session_state.input_plants_raw
    num_plant_beds = st.session_state.n_plant_beds
    # Expected invariant: 1 <= min_species_per_bed <= max_species_per_bed
    # <= len(user_plants) -- enforced by the UI, not re-checked here.
    min_species_per_bed = st.session_state.min_species
    max_species_per_bed = st.session_state.max_species

    # Genetic Algorithm hyper-parameters.
    population_size = st.session_state.population_size
    num_generations = st.session_state.num_generations
    tournament_size = st.session_state.tournament_size
    crossover_rate = st.session_state.crossover_rate
    mutation_rate = st.session_state.mutation_rate
    seed_population_rate = st.session_state.seed_population_rate

    # O(1) plant-name -> matrix-index lookups, and the compatibility matrix
    # converted to a numpy array once (not per fitness call).
    plant_to_index = {plant: idx for idx, plant in enumerate(plant_list)}
    compat_array = np.array(compatibility_matrix)

    # Memoized fitness scores keyed by an order-insensitive grouping key.
    fitness_cache = {}

    def generate_initial_population(model, demo_lite):
        """Build the starting population, optionally seeded by the LLM."""
        population = []
        # Only touch the LLM when seeding is enabled -- model init and
        # inference were a major bottleneck in population setup.
        # (Uses the local seed_population_rate captured above; the original
        # re-read st.session_state here, inconsistently.)
        num_seeds = int(population_size * seed_population_rate)
        if num_seeds > 0 and not demo_lite and model is not None:
            # Beta feature: a single LLM-suggested seed grouping.
            # NOTE(review): the helper's own comment says it returns a *list*
            # of groupings, but it is consumed here as one grouping -- the
            # original behaved the same way; confirm intent upstream.
            seed_grouping = get_language_model_suggestions(model, demo_lite)
            if seed_grouping != "no response yet":
                population.append(validate_and_replace(seed_grouping))
                print(" Added 1 LLM-generated seed to population")
        # Fill the remainder with validated random groupings; a healthy
        # initial population is critical for finding optimal solutions.
        while len(population) < population_size:
            population.append(validate_and_replace(generate_random_grouping()))
        return population

    def generate_random_grouping():
        """Return a random grouping spreading the plants evenly over beds."""
        # BUGFIX: draw a shuffled *copy* -- the original random.shuffle()d
        # user_plants in place, silently reordering
        # st.session_state.input_plants_raw as a side effect.
        remaining_plants = random.sample(user_plants, len(user_plants))
        grouping = []
        total_plants = len(user_plants)
        plants_per_bed = total_plants // num_plant_beds
        extra_plants = total_plants % num_plant_beds
        for bed_index in range(num_plant_beds):
            # The first `extra_plants` beds each absorb one leftover plant.
            num_species_in_bed = plants_per_bed + (1 if bed_index < extra_plants else 0)
            # Clamp the bed size to the configured bounds.  Note this can
            # drop trailing plants when max_species_per_bed is tight; the
            # caller repairs that via validate_and_replace().
            num_species_in_bed = max(
                min_species_per_bed, min(num_species_in_bed, max_species_per_bed)
            )
            grouping.append(remaining_plants[:num_species_in_bed])
            remaining_plants = remaining_plants[num_species_in_bed:]
        return grouping

    def crossover(parent1, parent2):
        """Single-point crossover; every plant is kept present in each child.

        BUGFIX: always returns fresh bed lists.  The original returned the
        parent objects themselves (and children whose beds were shared list
        objects with the parents), so the in-place edits done by mutate()
        corrupted other individuals in the population.
        """
        if random.random() >= crossover_rate:
            # No crossover: pass the parents through, but as copies.
            return [bed[:] for bed in parent1], [bed[:] for bed in parent2]
        crossover_point = random.randint(1, num_plant_beds - 1)
        child1 = [bed[:] for bed in parent1[:crossover_point] + parent2[crossover_point:]]
        child2 = [bed[:] for bed in parent2[:crossover_point] + parent1[crossover_point:]]
        # Re-insert any plant that got lost, into the currently smallest bed.
        for child in (child1, child2):
            for plant in user_plants:
                if all(plant not in bed for bed in child):
                    min_bed_index = min(range(len(child)), key=lambda i: len(child[i]))
                    child[min_bed_index].append(plant)
        return child1, child2

    def mutate(individual):
        """With probability mutation_rate, swap some species in one random bed."""
        if random.random() < mutation_rate:
            mutated_bed = random.randint(0, num_plant_beds - 1)
            # Work on a copy so we never edit a list shared with another
            # individual (defensive; crossover() already copies beds).
            species_in_bed = list(individual[mutated_bed])
            # Trim beds that exceed the maximum size constraint.
            if len(species_in_bed) > max_species_per_bed:
                species_in_bed = random.sample(species_in_bed, max_species_per_bed)
            # Swap missing plants in: each iteration appends one missing
            # plant and evicts one random occupant, so bed size is stable.
            missing_plants = [
                plant for plant in user_plants if plant not in species_in_bed
            ]
            num_missing_plants = min(
                len(missing_plants), max_species_per_bed - len(species_in_bed)
            )
            for _ in range(num_missing_plants):
                swap_species = random.choice(missing_plants)
                missing_plants.remove(swap_species)
                species_in_bed.append(swap_species)
                species_in_bed.remove(random.choice(species_in_bed))
            individual[mutated_bed] = species_in_bed
        return individual

    def calculate_fitness(grouping):
        """Score a grouping: compatibility rewards minus constraint penalties."""
        # Order-insensitive cache key so equivalent groupings share a score.
        grouping_key = tuple(tuple(sorted(bed)) for bed in grouping)
        if grouping_key in fitness_cache:
            return fitness_cache[grouping_key]

        positive_reward_factor = 1000
        negative_penalty_factor = 2000
        # Penalties for violating the bed-size / coverage constraints.
        penalty_for_exceeding_max = 500
        penalty_for_not_meeting_min = 500
        penalty_for_not_having_all_plants = 1000

        score = 0
        # Vectorized pairwise scoring: numpy advanced indexing replaces the
        # nested per-pair Python loops.
        for bed in grouping:
            if len(bed) < 2:
                continue
            bed_indices = np.array([plant_to_index[plant] for plant in bed])
            n = len(bed_indices)
            i_indices, j_indices = np.triu_indices(n, k=1)
            compat_scores = compat_array[bed_indices[i_indices], bed_indices[j_indices]]
            positive_scores = compat_scores[compat_scores > 0].sum() * positive_reward_factor
            # negative_scores is a sum of negatives, so adding it penalizes.
            negative_scores = compat_scores[compat_scores < 0].sum() * negative_penalty_factor
            score += positive_scores + negative_scores

        # Constraint penalties, vectorized over bed sizes.
        bed_sizes = np.array([len(bed) for bed in grouping])
        score -= np.sum(bed_sizes > max_species_per_bed) * penalty_for_exceeding_max
        score -= np.sum(bed_sizes < min_species_per_bed) * penalty_for_not_meeting_min
        if len(set(plant for bed in grouping for plant in bed)) < len(user_plants):
            score -= penalty_for_not_having_all_plants

        fitness_cache[grouping_key] = score
        return score

    def tournament_selection(population, population_fitness):
        """Select population_size individuals via k-way tournaments."""
        # Uses the pre-calculated fitness list -- no re-scoring here.
        selected = []
        for _ in range(population_size):
            participants_idx = random.sample(range(len(population)), tournament_size)
            winner_idx = max(participants_idx, key=lambda idx: population_fitness[idx])
            selected.append(population[winner_idx])
        return selected

    def calculate_fitness_parallel(individuals):
        """Calculate fitness for multiple individuals in parallel."""
        if len(individuals) <= 10:
            # For small populations the thread-pool overhead isn't worth it.
            return [calculate_fitness(ind) for ind in individuals]
        # Threads help here because the heavy lifting is numpy (which
        # releases the GIL); fitness_cache writes are GIL-atomic in CPython.
        with ThreadPoolExecutor(max_workers=4) as executor:
            return list(executor.map(calculate_fitness, individuals))

    def replacement(population, offspring, population_fitness):
        """Elitist replacement: keep the best population_size of parents+offspring."""
        # BUGFIX: enforce the max-species constraint *before* scoring.  The
        # original scored first and then truncated over-full beds, leaving
        # stale fitness values attached to the adjusted offspring.
        adjusted_offspring = []
        for individual in offspring:
            for bed_idx in range(num_plant_beds):
                species_in_bed = individual[bed_idx]
                if len(species_in_bed) > max_species_per_bed:
                    individual[bed_idx] = random.sample(
                        species_in_bed, max_species_per_bed
                    )
            adjusted_offspring.append(individual)
        adjusted_fitness = calculate_fitness_parallel(adjusted_offspring)

        # Merge, rank by fitness, and keep the top population_size.
        combined = list(
            zip(population + adjusted_offspring, population_fitness + adjusted_fitness)
        )
        combined.sort(key=lambda x: x[1], reverse=True)
        new_population = [ind for ind, _ in combined[:population_size]]
        new_fitness = [fit for _, fit in combined[:population_size]]
        return new_population, new_fitness

    def genetic_algorithm(model, demo_lite):
        """Main GA loop: select, cross, mutate, replace, periodically repair."""
        population = generate_initial_population(model, demo_lite)
        # Score the initial population once, in parallel.
        population_fitness = calculate_fitness_parallel(population)

        for generation in range(num_generations):
            print(f"Generation {generation + 1}")
            selected_population = tournament_selection(population, population_fitness)
            offspring = []
            for _ in range(population_size // 2):
                parent1 = random.choice(selected_population)
                parent2 = random.choice(selected_population)
                child1, child2 = crossover(parent1, parent2)
                offspring.extend([mutate(child1), mutate(child2)])
            population, population_fitness = replacement(
                population, offspring, population_fitness
            )

            # Repair invalid individuals every 10 generations (and on the
            # last one): frequent repair is slow, rare repair hurts quality.
            if generation % 10 == 0 or generation == num_generations - 1:
                invalid_indices = []
                for i in range(len(population)):
                    plants_in_grouping = set(
                        plant for bed in population[i] for plant in bed
                    )
                    if len(plants_in_grouping) != len(user_plants):
                        invalid_indices.append(i)
                if invalid_indices:
                    for idx in invalid_indices:
                        validated_ind = validate_and_replace(population[idx])
                        population[idx] = validated_ind
                        population_fitness[idx] = calculate_fitness(validated_ind)
                    print(f" Validated {len(invalid_indices)} individuals")

        # Pick the best individual, repair it once more, and report it.
        best_idx = max(range(len(population)), key=lambda i: population_fitness[i])
        best_grouping = validate_and_replace(population[best_idx])
        best_fitness = calculate_fitness(best_grouping)
        print(f"Best Grouping: {best_grouping}")
        print(f"Fitness Score: {best_fitness}")
        st.session_state.best_grouping = best_grouping
        st.session_state.best_fitness = best_fitness
        return best_grouping

    def adjust_grouping(grouping):
        """Repair a grouping in place: full plant coverage + bed-size bounds."""
        # Add any plant missing from the grouping.
        plants_in_grouping = set(plant for bed in grouping for plant in bed)
        missing_plants = set(user_plants) - plants_in_grouping
        for missing_plant in missing_plants:
            # Prefer a bed with spare capacity...
            suitable_bed = next(
                (bed for bed in grouping if len(bed) < max_species_per_bed), None
            )
            if suitable_bed is not None:
                suitable_bed.append(missing_plant)
            else:
                # ...otherwise overwrite a random plant in a random bed.
                random_bed = random.choice(grouping)
                random_bed[random.randint(0, len(random_bed) - 1)] = missing_plant

        # Enforce min/max species per bed.
        for bed in grouping:
            while len(bed) < min_species_per_bed:
                # NOTE(review): assumes some plant is absent from the bed
                # (guaranteed when min_species_per_bed <= len(user_plants)).
                additional_plant = random.choice(
                    [plant for plant in user_plants if plant not in bed]
                )
                bed.append(additional_plant)
            while len(bed) > max_species_per_bed:
                bed.remove(random.choice(bed))
        return grouping

    def validate_and_replace(grouping):
        """Return the best of several repaired variants of `grouping`.

        Trying a few randomized repairs (3: a speed/quality balance) and
        keeping the fittest noticeably improves solution quality.
        """
        best_grouping = None
        best_fitness = float("-inf")
        for _ in range(3):
            temp_grouping = adjust_grouping([bed.copy() for bed in grouping])
            current_fitness = calculate_fitness(temp_grouping)
            if current_fitness > best_fitness:
                best_fitness = current_fitness
                best_grouping = temp_grouping
        return best_grouping

    def get_language_model_suggestions(model, demo_lite):
        """Fetch LLM seed groupings (cached in session state) for the GA."""
        # NOTE(review): documented as returning a list of seed groupings,
        # but generate_initial_population consumes it as a single grouping.
        st.session_state.seed_groupings = get_seed_groupings_from_LLM(model, demo_lite)
        return st.session_state.seed_groupings

    # Run the genetic algorithm.
    best_grouping = genetic_algorithm(model, demo_lite)
    return best_grouping