GRDN.AI.3 / src /backend /optimization_algo.py
danidanidani's picture
fix: Color scheme (teal+magenta), LLM prompt, algorithm quality
fa0da9f
import random
import numpy as np
import streamlit as st
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
# import all functions from src.backend.chatbot
from src.backend.chatbot import *
def genetic_algorithm_plants(model, demo_lite):
# Define the compatibility matrix
compatibility_matrix = st.session_state.full_mat
# Define the list of plants
plant_list = st.session_state.plant_list
# Define the user-selected plants, number of plant beds, and constraints
user_plants = st.session_state.input_plants_raw
num_plant_beds = st.session_state.n_plant_beds
# 1 <= min_species_per_bed <= max_species_per_bed <= len(user_plants)
min_species_per_bed = st.session_state.min_species
# max_species_per_bed >= floor(length(user_plants)-(min_species_per_bed*num_plant_beds-1) & max_species_per_bed <= len(user_plants)
max_species_per_bed = st.session_state.max_species
# Genetic Algorithm parameters
population_size = st.session_state.population_size
num_generations = st.session_state.num_generations
tournament_size = st.session_state.tournament_size
crossover_rate = st.session_state.crossover_rate
mutation_rate = st.session_state.mutation_rate
seed_population_rate = st.session_state.seed_population_rate
# OPTIMIZATION: Create plant name to index mapping for O(1) lookups
plant_to_index = {plant: idx for idx, plant in enumerate(plant_list)}
# OPTIMIZATION: Convert compatibility matrix to numpy array ONCE (not in loop)
compat_array = np.array(compatibility_matrix)
# OPTIMIZATION: Fitness cache to avoid recalculating fitness for the same grouping
fitness_cache = {}
def generate_initial_population(model, demo_lite):
population = []
# OPTIMIZATION: Only call LLM if seed_population_rate > 0 and not demo_lite
# This was a MAJOR bottleneck - LLM initialization and inference is very slow
num_seeds = int(population_size * st.session_state.seed_population_rate)
if num_seeds > 0 and not demo_lite and model is not None:
# we generate just one seed grouping for this beta language model suggestion feature
seed_grouping = get_language_model_suggestions(model, demo_lite)
if seed_grouping != "no response yet":
valid_seed_grouping = validate_and_replace(seed_grouping)
population.append(valid_seed_grouping)
print(f" Added 1 LLM-generated seed to population")
# Fill the rest of the population with random groupings
# IMPORTANT: Validate all initial groupings for best solution quality
# A good initial population is critical for finding optimal solutions
while len(population) < population_size:
random_grouping = generate_random_grouping()
# Always validate to ensure we start with high-quality individuals
valid_grouping = validate_and_replace(random_grouping)
population.append(valid_grouping)
return population
def generate_random_grouping():
random.shuffle(user_plants)
remaining_plants = user_plants.copy()
grouping = []
total_plants = len(user_plants)
plants_per_bed = total_plants // num_plant_beds
extra_plants = total_plants % num_plant_beds
for bed_index in range(num_plant_beds):
if bed_index < extra_plants:
# Distribute extra plants among the first few beds
num_species_in_bed = plants_per_bed + 1
else:
num_species_in_bed = plants_per_bed
# Ensure the bed size is within the min and max constraints
num_species_in_bed = max(
min_species_per_bed, min(num_species_in_bed, max_species_per_bed)
)
bed = remaining_plants[:num_species_in_bed]
remaining_plants = remaining_plants[num_species_in_bed:]
grouping.append(bed)
return grouping
# Perform crossover between two parents, preserving at least one occurrence of each plant
def crossover(parent1, parent2):
if random.random() < crossover_rate:
crossover_point = random.randint(1, num_plant_beds - 1)
child1 = parent1[:crossover_point] + parent2[crossover_point:]
child2 = parent2[:crossover_point] + parent1[crossover_point:]
# Ensure each plant appears at least once in the offspring
for plant in user_plants:
if all(plant not in bed for bed in child1):
# Find a bed with fewer species and add the missing plant
min_bed_index = min(
range(len(child1)), key=lambda i: len(child1[i])
)
child1[min_bed_index].append(plant)
if all(plant not in bed for bed in child2):
# Find a bed with fewer species and add the missing plant
min_bed_index = min(
range(len(child2)), key=lambda i: len(child2[i])
)
child2[min_bed_index].append(plant)
return child1, child2
else:
return parent1, parent2
# Perform mutation on an individual, ensuring no bed exceeds the maximum species constraint
def mutate(individual):
if random.random() < mutation_rate:
mutated_bed = random.randint(0, num_plant_beds - 1)
species_in_bed = individual[mutated_bed]
# Remove excess species if there are more than the maximum constraint
if len(species_in_bed) > max_species_per_bed:
species_in_bed = random.sample(species_in_bed, max_species_per_bed)
# Add missing plants by performing swaps between current species and missing plants
missing_plants = [
plant for plant in user_plants if plant not in species_in_bed
]
num_missing_plants = min(
len(missing_plants), max_species_per_bed - len(species_in_bed)
)
for _ in range(num_missing_plants):
swap_species = random.choice(missing_plants)
missing_plants.remove(swap_species)
species_in_bed.append(swap_species)
species_in_bed.remove(random.choice(species_in_bed))
individual[mutated_bed] = species_in_bed
return individual
# calculate the fitness score of the grouping
def calculate_fitness(grouping):
# OPTIMIZATION: Create a hashable key for caching
grouping_key = tuple(tuple(sorted(bed)) for bed in grouping)
if grouping_key in fitness_cache:
return fitness_cache[grouping_key]
positive_reward_factor = 1000
negative_penalty_factor = 2000
# define penalties for not meeting constraints
penalty_for_exceeding_max = 500
penalty_for_not_meeting_min = 500
penalty_for_not_having_all_plants = 1000
score = 0
# VECTORIZED FITNESS CALCULATION - Much faster with numpy
for bed in grouping:
if len(bed) < 2:
continue
# Convert plant names to indices in bulk
bed_indices = np.array([plant_to_index[plant] for plant in bed])
# Get all pairwise compatibility scores using numpy advanced indexing
# This avoids nested loops and is 10-100x faster
n = len(bed_indices)
i_indices, j_indices = np.triu_indices(n, k=1)
# Vectorized compatibility score extraction (using pre-converted array)
compat_scores = compat_array[bed_indices[i_indices], bed_indices[j_indices]]
# Vectorized reward/penalty calculation
positive_scores = compat_scores[compat_scores > 0].sum() * positive_reward_factor
negative_scores = compat_scores[compat_scores < 0].sum() * negative_penalty_factor
score += positive_scores + negative_scores
# apply penalties for not meeting constraints (vectorized)
bed_sizes = np.array([len(bed) for bed in grouping])
score -= np.sum(bed_sizes > max_species_per_bed) * penalty_for_exceeding_max
score -= np.sum(bed_sizes < min_species_per_bed) * penalty_for_not_meeting_min
if len(set(plant for bed in grouping for plant in bed)) < len(user_plants):
score -= penalty_for_not_having_all_plants
# OPTIMIZATION: Cache the result
fitness_cache[grouping_key] = score
return score
# Perform tournament selection
def tournament_selection(population, population_fitness):
# OPTIMIZATION: Use pre-calculated fitness scores
selected = []
for _ in range(population_size):
participants_idx = random.sample(range(len(population)), tournament_size)
winner_idx = max(participants_idx, key=lambda idx: population_fitness[idx])
selected.append(population[winner_idx])
return selected
# OPTIMIZATION: Parallel fitness calculation for speed
def calculate_fitness_parallel(individuals):
"""Calculate fitness for multiple individuals in parallel"""
if len(individuals) <= 10:
# For small populations, parallel overhead isn't worth it
return [calculate_fitness(ind) for ind in individuals]
# Use ThreadPoolExecutor for parallel computation
with ThreadPoolExecutor(max_workers=4) as executor:
return list(executor.map(calculate_fitness, individuals))
# Perform replacement of the population with the offspring, ensuring maximum species constraint is met
def replacement(population, offspring, population_fitness):
# OPTIMIZATION: Use pre-calculated fitness and avoid re-sorting
# Calculate fitness for offspring in parallel
offspring_fitness = calculate_fitness_parallel(offspring)
# Adjust the offspring to meet the maximum species constraint
adjusted_offspring = []
adjusted_fitness = []
for idx, individual in enumerate(offspring):
for bed_idx in range(num_plant_beds):
species_in_bed = individual[bed_idx]
if len(species_in_bed) > max_species_per_bed:
species_in_bed = random.sample(species_in_bed, max_species_per_bed)
individual[bed_idx] = species_in_bed
adjusted_offspring.append(individual)
adjusted_fitness.append(offspring_fitness[idx])
# Combine population and offspring with their fitness scores
combined = list(zip(population + adjusted_offspring, population_fitness + adjusted_fitness))
# Sort by fitness and take top population_size individuals
combined.sort(key=lambda x: x[1], reverse=True)
new_population = [ind for ind, _ in combined[:population_size]]
new_fitness = [fit for _, fit in combined[:population_size]]
return new_population, new_fitness
# Genetic Algorithm main function
def genetic_algorithm(model, demo_lite):
population = generate_initial_population(model, demo_lite)
# OPTIMIZATION: Calculate fitness once for initial population (in parallel)
population_fitness = calculate_fitness_parallel(population)
for generation in range(num_generations):
print(f"Generation {generation + 1}")
selected_population = tournament_selection(population, population_fitness)
offspring = []
for _ in range(population_size // 2):
parent1 = random.choice(selected_population)
parent2 = random.choice(selected_population)
child1, child2 = crossover(parent1, parent2)
child1 = mutate(child1)
child2 = mutate(child2)
offspring.extend([child1, child2])
# OPTIMIZATION: Pass fitness and get updated fitness back
population, population_fitness = replacement(population, offspring, population_fitness)
# OPTIMIZATION: Validate periodically to ensure quality (every 10 generations)
# Too frequent = slow, too rare = poor quality. Every 10 is a good balance.
if generation % 10 == 0 or generation == num_generations - 1:
# Only validate if needed - most individuals are valid
validated_count = 0
invalid_indices = []
# Quick check which individuals need validation
for i in range(len(population)):
plants_in_grouping = set(plant for bed in population[i] for plant in bed)
if len(plants_in_grouping) != len(user_plants):
invalid_indices.append(i)
# Parallel validation for invalid individuals
if invalid_indices:
invalid_individuals = [population[i] for i in invalid_indices]
validated_individuals = [validate_and_replace(ind) for ind in invalid_individuals]
# Update population and recalculate fitness
for idx, validated_ind in zip(invalid_indices, validated_individuals):
population[idx] = validated_ind
population_fitness[idx] = calculate_fitness(validated_ind)
validated_count = len(invalid_indices)
if validated_count > 0:
print(f" Validated {validated_count} individuals")
# Find best solution
best_idx = max(range(len(population)), key=lambda i: population_fitness[i])
best_grouping = population[best_idx]
# Final validation of best solution only
best_grouping = validate_and_replace(best_grouping)
best_fitness = calculate_fitness(best_grouping)
print(f"Best Grouping: {best_grouping}")
print(f"Fitness Score: {best_fitness}")
st.session_state.best_grouping = best_grouping
st.session_state.best_fitness = best_fitness
# st.write(f"Best Grouping: {best_grouping}")
# st.write(f"Fitness Score: {best_fitness}")
return best_grouping
# def validate_and_replace(grouping):
# print("Grouping structure before validation:", grouping)
# all_plants = set(user_plants)
# for bed in grouping:
# all_plants -= set(bed)
# # Replace missing plants
# for missing_plant in all_plants:
# replaced = False
# for bed in grouping:
# if len(set(bed)) != len(bed): # Check for duplicates
# for i, plant in enumerate(bed):
# if bed.count(plant) > 1: # Found a duplicate
# bed[i] = missing_plant
# replaced = True
# break
# if replaced:
# break
# # If no duplicates were found, replace a random plant
# if not replaced:
# random_bed = random.choice(grouping)
# random_bed[random.randint(0, len(random_bed) - 1)] = missing_plant
# return grouping
############
############ experimental
def adjust_grouping(grouping):
# Determine the plants that are missing in the grouping
plants_in_grouping = set(plant for bed in grouping for plant in bed)
missing_plants = set(user_plants) - plants_in_grouping
for missing_plant in missing_plants:
# Find a bed that can accommodate the missing plant without exceeding max_species_per_bed
suitable_bed = next(
(bed for bed in grouping if len(bed) < max_species_per_bed), None
)
if suitable_bed is not None:
suitable_bed.append(missing_plant)
else:
# If no suitable bed is found, replace a random plant in a random bed
random_bed = random.choice(grouping)
random_bed[random.randint(0, len(random_bed) - 1)] = missing_plant
# Ensure min_species_per_bed and max_species_per_bed constraints
for bed in grouping:
while len(bed) < min_species_per_bed:
additional_plant = random.choice(
[plant for plant in user_plants if plant not in bed]
)
bed.append(additional_plant)
while len(bed) > max_species_per_bed:
bed.remove(random.choice(bed))
return grouping
def validate_and_replace(grouping):
# Try multiple configurations to find the best valid grouping
# This is important for solution quality - don't skip this!
best_grouping = None
best_fitness = float("-inf")
# Try 3 different configurations (balanced between speed and quality)
for _ in range(3):
temp_grouping = [bed.copy() for bed in grouping]
temp_grouping = adjust_grouping(temp_grouping)
current_fitness = calculate_fitness(temp_grouping)
if current_fitness > best_fitness:
best_fitness = current_fitness
best_grouping = temp_grouping
return best_grouping
############
def get_language_model_suggestions(model, demo_lite):
# This returns a list of seed groupings based on the compatibility matrix
st.session_state.seed_groupings = get_seed_groupings_from_LLM(model, demo_lite)
return st.session_state.seed_groupings
# Run the genetic algorithm
best_grouping = genetic_algorithm(model, demo_lite)
return best_grouping