# genetic_aesthetics_evolver / genetic_backend.py
# echoboi's picture
# Add yuruyurau_5 (#3) equation and merge feature
# b5633bb
from flask import Flask, render_template, jsonify, request
import json
import random
import math
from typing import List, Dict, Any, Optional
# Flask application object; the route and error-handler decorators in this
# file attach to it.
app = Flask(__name__)
# --- Constants (hot-path) ---
# Function names a trig site may hold. Mutation code picks a replacement from
# this tuple that differs from the site's current value.
TRIG_FUNCS = ('sin', 'cos', 'tan', 'atan')
class EquationGene:
    """One equation variant: a parameter dict plus evolution bookkeeping.

    Attributes:
        params: Mapping of parameter name to value. Keys starting with
            ``trig`` (or ``_2_trig`` for a merged secondary equation) hold a
            trig function name; other numeric entries are treated as constants.
        fitness: Score set by the evolver from user feedback (starts at 0.0).
        id: Identifier assigned by the caller (starts at 0).
        mutation_log: ``key:old->new`` entries describing what the most recent
            :meth:`mutate` call changed.
    """

    # Key prefixes that mark a parameter as a trig-function site.
    _TRIG_PREFIXES = ('trig', '_2_trig')

    def __init__(self, params: Dict[str, Any]):
        self.params = params
        self.fitness = 0.0
        self.id = 0
        self.mutation_log: List[str] = []

    def mutate(
        self,
        mutation_rate: float = 0.3,
        trig_keys: Optional[List[str]] = None,
        site_mutation_rates: Optional[Dict[str, float]] = None,
        trig_mutation_prob: float = 1.0,
        constant_mutation_delta: float = math.pi / 8.0,
    ):
        """Mutate trig sites and/or numeric constants in place.

        One upfront draw decides whether this call is trig-focused (probability
        ``trig_mutation_prob``) or constant-focused. Each mode mainly mutates
        its own kind of parameter and has a small chance of one mutation of the
        other kind. ``mutation_log`` is reset at the start of every call.

        Args:
            mutation_rate: Default per-site probability of a trig mutation and
                the base probability of a constant mutation.
            trig_keys: Trig-site keys to consider. When ``None``, every key
                with a ``trig`` or ``_2_trig`` prefix is used.
            site_mutation_rates: Optional per-site override of ``mutation_rate``.
            trig_mutation_prob: Probability that the call is trig-focused.
            constant_mutation_delta: Step added to or subtracted from a
                constant; halved when any considered trig site is ``'atan'``.
        """
        if trig_keys is not None:
            keys = trig_keys
        else:
            # Include merged-secondary sites too, matching the constant filter
            # below which already treats "_2_trig*" keys as trig sites.
            keys = [k for k in self.params if k.startswith(self._TRIG_PREFIXES)]

        # Numeric parameters only; trig sites hold function names.
        constant_keys = [
            k for k, v in self.params.items()
            if not k.startswith(self._TRIG_PREFIXES) and isinstance(v, (int, float))
        ]
        self.mutation_log = []

        # Use a smaller constant step when any trig site is atan.
        has_atan = any(self.params.get(k) == 'atan' for k in keys)
        const_delta = constant_mutation_delta * 0.5 if has_atan else constant_mutation_delta

        # Decide the mode once so trig_mutation_prob is respected per call.
        if random.random() < trig_mutation_prob:
            # Trig-focused: every site gets its own chance to mutate.
            for key in keys:
                if site_mutation_rates:
                    site_rate = site_mutation_rates.get(key, mutation_rate)
                else:
                    site_rate = mutation_rate
                if random.random() < site_rate:
                    self._mutate_trig_site(key)
            # Small chance (10% of mutation_rate) of one constant mutation.
            if constant_keys and random.random() < 0.1 * mutation_rate:
                self._mutate_constant(random.choice(constant_keys), const_delta)
        else:
            # Constant-focused: at most one constant mutation...
            if constant_keys and random.random() < mutation_rate:
                self._mutate_constant(random.choice(constant_keys), const_delta)
            # ...plus a very small chance of one trig mutation.
            if keys and random.random() < 0.05 * trig_mutation_prob * mutation_rate:
                self._mutate_trig_site(random.choice(keys))

    def _mutate_trig_site(self, key: str) -> None:
        """Swap the function at ``key`` for a different one and log the change."""
        old = self.params.get(key)
        if not isinstance(old, str):
            return
        # Exclude the current function so the mutation is never a no-op.
        choices = [f for f in TRIG_FUNCS if f != old]
        if choices:
            new = random.choice(choices)
            self.params[key] = new
            self.mutation_log.append(f"{key}:{old}->{new}")

    def _mutate_constant(self, key: str, delta: float) -> None:
        """Shift the constant at ``key`` by ``+/-delta`` and log the change."""
        old = self.params.get(key)
        if not isinstance(old, (int, float)):
            return
        direction = random.choice([-1, 1])
        new = old + direction * delta
        # Keep divisors and multipliers strictly positive.
        lowered = key.lower()
        if 'div' in lowered:
            new = max(0.1, new)
        elif 'mult' in lowered:
            new = max(0.01, new)
        self.params[key] = new
        self.mutation_log.append(f"{key}:{old:.3f}->{new:.3f}")
class GeneticEquationEvolver:
def __init__(self, population_size: int = 16, elite_size: int = 4):
    """Set up evolver state, register the built-in equation sets and build generation 0."""
    # Population shape.
    self.population_size = population_size  # default 16 fills a 4x4 grid
    self.elite_size = elite_size
    self.generation = 0
    self.population: List[EquationGene] = []
    self.elites: List[EquationGene] = []

    # Feedback bookkeeping.
    self.feedback_history = []
    self.liked_variants: List[EquationGene] = []  # user-liked variants across generations
    self.select_none_history: List[int] = []  # generations where "select none" was chosen
    self.parameter_preferences: Dict[str, float] = {}  # per-parameter preference in [-1, 1]
    self.mutation_penalties: Dict[str, float] = {}  # penalty factor per mutation category
    self.approved_trig_sites: set = set()  # trig sites the user has approved (selected)

    # Exploration graph.
    self.next_node_id = 1
    self.graph: Dict[str, Any] = {'nodes': [], 'edges': [], 'feedback': []}
    self.max_graph_size = 500  # node count above which the graph is pruned
    self.save_interval = 5  # save the graph every 5 generations

    # Evolution schedule and user-adjustable knobs.
    self.systematic_phase_generations = 3  # the first 3 generations are systematic
    self.current_base_set = 'always_finding_yourself'  # name of the active base equation set
    self.merged_secondary_set = None  # optional secondary equation for merge mode
    self.jump_size_multiplier = 1.0  # multiplier applied to the constant mutation delta
    self.trig_mutation_prob_base = 1.0  # base value the trig probability decays from
    self.trig_mutation_prob_base_generation = 0  # generation at which the base was last set

    self.base_equation_sets = self._builtin_equation_sets()
    self.initialize_population()

@staticmethod
def _builtin_equation_sets() -> Dict[str, Dict[str, Any]]:
    """Return the built-in base equation sets, keyed by set name.

    Within each set, ``trig*`` keys are trig sites and the remaining keys are
    numeric constants. Key order is significant because trig-site order is
    derived from it.
    """
    return {
        'always_finding_yourself': {
            'k_div': 4, 'k_sub': 10.5, 'e_div': 9, 'e_add': 9, 'o_div': 9,
            'x_offset': 60, 'y_trig_div': 8, 'q_mult': 0.7,
            'px_add': 200, 'py_add': 200, 'py_q_div': 3, 'time_mult': 1/12,
            'trig_y': 'cos', 'trig_e': 'sin', 'trig_o': 'sin',
            'trig_c_sin': 'sin', 'trig_c_cos': 'cos', 'trig_c4_cos': 'cos',
        },
        'a_constant_state_of_bliss': {
            'k_div': 8, 'k_sub': 11.5, 'e_div': 8, 'e_sub': 12.5, 'o_div': 139,
            'd_mult': 10, 'px_div': 2, 'px_add1': 150,
            'py_y_div': 9, 'py_d_mult': 15, 'py_cos_mult': 2, 'py_add': 220,
            'stroke_base': 99, 'stroke_mult': 99, 'stroke_pow': 30,
            # Trig sites identified from equation structure:
            'trig_d_cos': 'cos',        # cos(o) in d calculation
            'trig_px_sin1': 'sin',      # sin(d) in px calculation
            'trig_px_sin2': 'sin',      # sin(t + d*o) in px calculation
            'trig_py_cos': 'cos',       # cos(d*py_cos_mult) in py calculation
            'trig_py_sin': 'sin',       # sin(d - t) in py calculation
            'trig_stroke_sin1': 'sin',  # sin(k) in stroke_r calculation
            'trig_stroke_sin2': 'sin',  # sin(t + e) in stroke_r calculation
        },
        'yuruyurau': {
            # Parameters from the equation:
            # a=(x,y,d=mag(k=(4+sin(y*2-t)*3)*cos(x/29),e=y/8-13))=>point((q=3*sin(k*2)+.3/k+sin(y/25)*k*(9+4*sin(e*9-d*3+t*2)))+30*cos(c=d-t)+200,q*sin(c)+d*39-220)
            'k_base': 4, 'k_sin_mult': 3, 'k_cos_div': 29,
            'e_div': 8, 'e_sub': 13,
            'q_sin1_mult': 3, 'q_sin1_mult2': 2, 'q_div': 0.3,
            'q_sin2_div': 25, 'q_base': 9, 'q_sin3_mult': 4,
            'q_sin3_e_mult': 9, 'q_sin3_d_mult': 3, 'q_sin3_t_mult': 2,
            'px_cos_mult': 30, 'px_add': 200,
            'py_d_mult': 39, 'py_sub': 220,
            # Trig sites:
            'trig_k_sin': 'sin',   # sin(y*2-t) in k calculation
            'trig_k_cos': 'cos',   # cos(x/29) in k calculation
            'trig_q_sin1': 'sin',  # sin(k*2) in q calculation
            'trig_q_sin2': 'sin',  # sin(y/25) in q calculation
            'trig_q_sin3': 'sin',  # sin(e*9-d*3+t*2) in q calculation
            'trig_px_cos': 'cos',  # cos(c) in px calculation
            'trig_py_sin': 'sin',  # sin(c) in py calculation
        },
        'yuruyurau_2': {
            # Equation: for(t+=PI/30,i=3e4;i--;) d=mag(k=i<2e4?sin(i/9)*9:4*cos(i/49)*cos(i/3690),e=i/984-12)**2/99+1,
            # point((q=k*(4+sin(d*16-t+k))-5*sin(atan2(k,e)*9))+60*sin(c=d*1.1-t/18+i%2*3)+200,(q+40)*sin(c-d)+d*79)
            'k_cond_threshold': 20000, 'k_sin_div': 9, 'k_sin_mult': 9,
            'k_cos_mult': 4, 'k_cos_div1': 49, 'k_cos_div2': 3690,
            'e_div': 984, 'e_sub': 12,
            'd_pow': 2, 'd_div': 99, 'd_add': 1,
            'q_k_mult': 4, 'q_sin_d_mult': 16, 'q_atan2_mult': 5, 'q_atan2_mult2': 9,
            'px_sin_mult': 60, 'px_add': 200,
            'py_q_add': 40, 'py_d_mult': 79,
            'c_d_mult': 1.1, 'c_t_div': 18,
            # Trig sites:
            'trig_k_sin': 'sin',     # sin(i/9) in k (conditional)
            'trig_k_cos1': 'cos',    # cos(i/49) in k (conditional)
            'trig_k_cos2': 'cos',    # cos(i/3690) in k (conditional)
            'trig_q_sin': 'sin',     # sin(d*16-t+k) in q
            'trig_q_atan2': 'atan',  # atan2(k,e) in q
            'trig_px_sin': 'sin',    # sin(c) in px
            'trig_py_sin': 'sin',    # sin(c-d) in py
        },
        'yuruyurau_3': {
            # Equation: d=mag(k=5*cos(i/49)*cos(i/3690),e=i/984-12)**2/99+1,
            # point(k*(4+sin(d*18-t*2+i%3*2))-5*sin(atan2(k,e)*9)+30*sin(c=d-t/18+i%3*4)+200,80*sin(c-d)+d*79)
            'k_mult': 5, 'k_cos_div1': 49, 'k_cos_div2': 3690,
            'e_div': 984, 'e_sub': 12,
            'd_pow': 2, 'd_div': 99, 'd_add': 1,
            'q_k_mult': 4, 'q_sin_d_mult': 18, 'q_sin_t_mult': 2,
            'q_sin_mod': 3, 'q_sin_mod_mult': 2,
            'q_atan2_mult': 5, 'q_atan2_mult2': 9,
            'px_sin_mult': 30, 'px_add': 200,
            'py_sin_mult': 80, 'py_d_mult': 79,
            'c_t_div': 18, 'c_mod': 3, 'c_mod_mult': 4,
            # Trig sites:
            'trig_k_cos1': 'cos',      # cos(i/49) in k
            'trig_k_cos2': 'cos',      # cos(i/3690) in k
            'trig_q_sin': 'sin',       # sin(d*18-t*2+i%3*2) in q
            'trig_q_atan2': 'atan',    # atan2(k,e) in q
            'trig_px_sin': 'sin',      # sin(c) in px
            'trig_py_sin': 'sin',      # sin(c-d) in py
            'trig_stroke_cos': 'cos',  # cos(t+e) in stroke
        },
        'yuruyurau_4': {
            # Equation: a=(y,d=(mag(k=5*cos(i/8),e=5*cos(y/9))/(6+i%5))**4+4)=>
            # point((q=k*(3+e/2*sin(d*d-t))-3*sin(k*d/3)-~(i&1)*70)*sin(c=d-t/9+i%5*5+i%2)+200,q*cos(c-i%2+i%5*3+7)+200)
            'k_mult': 5, 'k_cos_div': 8,
            'e_mult': 5, 'e_cos_div': 9,
            'd_div_base': 6, 'd_pow': 4, 'd_add': 4,
            'q_k_mult': 3, 'q_e_div': 2, 'q_sin_mult': 3,
            'q_sin_kd_div': 3, 'q_bitwise_mult': 70,
            'px_sin_mult': 1, 'px_add': 200,
            'py_cos_add1': 7, 'py_add': 200,
            'c_t_div': 9, 'c_mod1': 5, 'c_mod1_mult': 5, 'c_mod2': 2,
            # Trig sites:
            'trig_k_cos': 'cos',   # cos(i/8) in k
            'trig_e_cos': 'cos',   # cos(y/9) in e
            'trig_q_sin': 'sin',   # sin(d*d-t) in q
            'trig_q_sin2': 'sin',  # sin(k*d/3) in q
            'trig_px_sin': 'sin',  # sin(c) in px
            'trig_py_cos': 'cos',  # cos(c-i%2+i%5*3+7) in py
        },
        'yuruyurau_5': {
            # Equation: a=(m,d=mag(k=9*cos(i/61),e=i/692-13)**2/99+1)=>point(
            #   (q=79-e/2*sin(k/d*4)+k/d*(8+5*sin(sin(d*d+e/9-t+m))))*cos(c=d/2+cos(t-d*2+m)/9-t/16+m)+200,
            #   (q+40)*sin(c)+200)
            # t=0,draw=$=>{t||createCanvas(w=400,w);background(9).stroke(w,96);for(t+=PI/45,i=2e4;i--;)a(i%2*9)}
            # k = k_mult * cos(i / k_cos_div)
            'k_mult': 9, 'k_cos_div': 61,
            # e = i / e_div - e_sub
            'e_div': 692, 'e_sub': 13,
            # d = mag(k,e)**d_pow / d_div + d_add
            'd_pow': 2, 'd_div': 99, 'd_add': 1,
            # m = (i%2) * m_mult (alternates 0 or m_mult per point)
            'm_mult': 9,
            # q = q_base - e/q_e_div*sin1(k/d*q_sin1_mult)
            #     + k/d*(q_k_mult + q_sin2_mult*sin2(sin3(d*d+e/q_sin_e_div-t+m)))
            'q_base': 79, 'q_e_div': 2, 'q_sin1_mult': 4,
            'q_k_mult': 8, 'q_sin2_mult': 5, 'q_sin_e_div': 9,
            # c = d/c_d_div + cosc(t - d*c_d2_mult + m)/c_cos_div - t/c_t_div + m
            'c_d_div': 2, 'c_d2_mult': 2, 'c_cos_div': 9, 'c_t_div': 16,
            # px = q * cosp(c) + px_add
            'px_add': 200,
            # py = (q + py_q_add) * sinp(c) + py_add
            'py_q_add': 40, 'py_add': 200,
            # Trig sites:
            'trig_k_cos': 'cos',   # cos(i/k_cos_div) in k
            'trig_q_sin1': 'sin',  # outer sin in e term: -e/q_e_div*sin(k/d*q_sin1_mult)
            'trig_q_sin2': 'sin',  # outer sin in nested double-sin: q_sin2_mult*sin(sin(...))
            'trig_q_sin3': 'sin',  # inner sin in nested double-sin: sin(d*d+e/q_sin_e_div-t+m)
            'trig_c_cos': 'cos',   # cos(t-d*c_d2_mult+m) in c
            'trig_px_cos': 'cos',  # cos(c) in px
            'trig_py_sin': 'sin',  # sin(c) in py
        },
    }
@property
def base_params(self):
    """Return the active base parameters.

    Without a merge this is the registered dict for ``current_base_set``
    (falling back to ``'always_finding_yourself'`` for an unknown name). With
    a valid ``merged_secondary_set``, a new dict is returned that holds the
    primary entries plus every secondary entry under a ``'_2_'``-prefixed key.
    """
    sets = self.base_equation_sets
    primary = sets.get(self.current_base_set, sets['always_finding_yourself'])

    secondary_name = self.merged_secondary_set
    if not (secondary_name and secondary_name in sets):
        return primary

    combined = dict(primary)
    combined.update(('_2_' + key, value) for key, value in sets[secondary_name].items())
    return combined
@property
def trig_keys(self):
    """List the trig-site keys of the active base params, in dict order.

    Both primary (``trig*``) and merged-secondary (``_2_trig*``) sites count.
    """
    site_prefixes = ('trig', '_2_trig')
    return [name for name in self.base_params if name.startswith(site_prefixes)]
def get_trig_mutation_probability(self) -> float:
    """Return the current probability of a trig (vs. constant) mutation.

    The probability is ``trig_mutation_prob_base / sqrt(g + 1)`` where ``g`` is
    the number of generations since the base was set, and it is never lower
    than 0.02.
    """
    floor = 0.02
    base = self.trig_mutation_prob_base
    elapsed = self.generation - self.trig_mutation_prob_base_generation
    if elapsed == 0:
        return max(floor, base)
    # A generation reset without resetting the base generation would make
    # this negative; clamp so the sqrt argument stays positive.
    elapsed = max(0, elapsed)
    return max(floor, base / math.sqrt(elapsed + 1))
def get_constant_mutation_delta(self) -> float:
    """Return the step size used when mutating constants.

    Generation 0 uses ``pi/8``; later generations use
    ``(pi/8) / log(generation + 2)``. The result is scaled by
    ``jump_size_multiplier``.
    """
    step = math.pi / 8.0
    if self.generation != 0:
        step = step / math.log(self.generation + 2)
    return step * self.jump_size_multiplier
def initialize_population(self):
    """Rebuild the population from the active base params.

    * No trig sites: every variant is an unmodified copy of the base params.
    * Systematic phase (``generation < systematic_phase_generations``): each
      variant carries exactly one unique (site, new function) change, drawn
      from a shuffled list of all such changes; any remaining slots are
      unmodified copies.
    * Otherwise: every variant is a randomly mutated copy.

    Each variant is logged as a graph node with parent 0, and ``elites`` is
    seeded with the first ``elite_size`` variants.
    """
    self.population = []
    sites = self.trig_keys
    template = self.base_params

    def spawn():
        # Fresh copy of the base params with the next slot id and node id.
        gene = EquationGene(dict(template))
        gene.id = len(self.population) + 1
        gene.global_id = self.next_node_id
        self.next_node_id += 1
        return gene

    def commit(gene):
        self.log_node(gene, parent_id=0)
        self.population.append(gene)

    def fill_with_base_copies():
        while len(self.population) < self.population_size:
            commit(spawn())

    if not sites:
        fill_with_base_copies()
    elif self.generation < self.systematic_phase_generations:
        # Every possible single-site change, in site order then TRIG_FUNCS order.
        candidates = [
            (site, template[site], func)
            for site in sites
            for func in TRIG_FUNCS
            if func != template[site]
        ]
        random.shuffle(candidates)
        for site, before, after in candidates[:self.population_size]:
            gene = spawn()
            gene.params[site] = after  # exactly one change per variant
            gene.mutation_log.append(f"{site}:{before}->{after}")
            commit(gene)
        fill_with_base_copies()
    else:
        for _ in range(self.population_size):
            gene = spawn()
            gene.mutate(mutation_rate=0.4, trig_keys=sites)
            commit(gene)

    # Seed elites with the first few variants (replaced after first feedback).
    self.elites = list(self.population[:self.elite_size])
def evaluate_population(self, feedback_ids: List[int]):
    """Apply user feedback to the current population.

    Resets every fitness to 0.0, updates parameter preferences, then either
    records a "select none" event (empty ``feedback_ids``) or marks the
    selected variants as liked. Finally the population is sorted by fitness,
    highest first.

    Args:
        feedback_ids: 1-based positions of the selected variants in
            ``self.population``; out-of-range ids are ignored.
    """
    for gene in self.population:
        gene.fitness = 0.0

    # Update parameter preferences based on user feedback.
    self.update_parameter_preferences(feedback_ids)

    if not feedback_ids:
        # "Select none": remember the generation and penalize its mutations.
        self.select_none_history.append(self.generation)
        self.penalize_recent_mutations()
    else:
        in_systematic_phase = self.generation < self.systematic_phase_generations
        for gene_id in feedback_ids:
            if not 1 <= gene_id <= len(self.population):
                continue
            source = self.population[gene_id - 1]

            # Keep a detached copy for future recombination.
            liked_gene = EquationGene(dict(source.params))
            liked_gene.fitness = 1.0
            liked_gene.liked_generation = self.generation
            self.liked_variants.append(liked_gene)
            source.fitness = 1.0

            if in_systematic_phase:
                # Read the mutations from the selected variant itself: the
                # fresh copy above starts with an empty mutation_log, so
                # scanning it would never approve any site.
                for mutation in source.mutation_log:
                    if ':' in mutation and '->' in mutation:
                        trig_site = mutation.split(':')[0]
                        # Merged-secondary sites ("_2_trig*") count as well.
                        if trig_site.startswith(('trig', '_2_trig')):
                            self.approved_trig_sites.add(trig_site)

    self.population.sort(key=lambda g: g.fitness, reverse=True)
def evolve_generation(self, feedback_ids: List[int]):
    """Replace the population with a full set of children based on feedback.

    Steps:
      1. Evaluate the current population and record the feedback.
      2. Pick the parents: the refreshed elites when something was selected;
         otherwise the previous elites (or a base-params gene).
      3. Build ``population_size`` children using one of three strategies:
         plain copies (no trig sites), one unique single-site change per child
         (systematic phase), or crossover/recombination plus random mutation.
      4. Advance the generation counter, prune history, periodically save.

    Args:
        feedback_ids: 1-based positions of the selected variants in the
            current population. Empty or ``None`` means "none selected".
    """
    # Tolerate None; it is handled the same as an empty selection.
    feedback_ids = feedback_ids or []

    # Keep the display order; evaluate_population() re-sorts self.population.
    prev_population = list(self.population)
    self.evaluate_population(feedback_ids)

    self.feedback_history.append({
        'generation': self.generation,
        'selected_ids': feedback_ids,
        'elite_params': [gene.params for gene in self.population[:self.elite_size]]
    })

    if not feedback_ids:
        # Nothing selected: penalize this generation's mutations and go back
        # to the previous elites (or the base params if there are none).
        self.penalize_current_mutations()
        if self.generation > 0 and self.elites:
            prev_elite = self.elites
        else:
            prev_elite = [EquationGene(dict(self.base_params))]
    else:
        # Slice instead of indexing so elite_size > population size is safe.
        self.elites = list(self.population[:self.elite_size])
        prev_elite = self.elites

    # Log feedback mapped to global node ids.
    selected_global = [
        getattr(prev_population[x - 1], 'global_id', None)
        for x in feedback_ids
        if 1 <= x <= len(prev_population)
    ]
    self.graph['feedback'].append({'generation': self.generation, 'selected': selected_global})

    new_population: List[EquationGene] = []
    trig_sites = self.trig_keys

    def adopt(child, parent_ids):
        # Assign a node id, log the node (first parent is the primary one),
        # add one edge per parent, and add the child to the new population.
        child.global_id = self.next_node_id
        self.next_node_id += 1
        self.log_node(child, parent_id=parent_ids[0] if parent_ids else 0)
        for parent_id in parent_ids:
            self.graph['edges'].append(
                {'from': parent_id, 'to': child.global_id, 'gen': self.generation}
            )
        new_population.append(child)

    if not trig_sites:
        # No trig sites available: children are plain copies of a parent.
        for i in range(self.population_size):
            if prev_elite:
                parent = random.choice(prev_elite)
            else:
                parent = EquationGene(dict(self.base_params))
            child = EquationGene(dict(parent.params))
            child.id = i + 1
            adopt(child, [getattr(parent, 'global_id', 0)])

    elif self.generation < self.systematic_phase_generations:
        # Systematic phase: each child differs from the parent at one site.
        if prev_elite:
            base_parent = prev_elite[0]
        else:
            base_parent = EquationGene(dict(self.base_params))
        parent_gid = getattr(base_parent, 'global_id', 0)

        # All unique (site, old, new) single mutations from the parent.
        all_mutations = [
            (site, base_parent.params[site], new_func)
            for site in trig_sites
            for new_func in TRIG_FUNCS
            if new_func != base_parent.params[site]
        ]
        random.shuffle(all_mutations)

        for i, (site, old_func, new_func) in enumerate(all_mutations[:self.population_size]):
            child = EquationGene(dict(base_parent.params))
            child.id = i + 1
            child.params[site] = new_func
            child.mutation_log.append(f"{site}:{old_func}->{new_func}")
            adopt(child, [parent_gid])

        # Not enough unique mutations: fill with unmodified copies.
        while len(new_population) < self.population_size:
            child = EquationGene(dict(base_parent.params))
            child.id = len(new_population) + 1
            adopt(child, [parent_gid])

    else:
        # Normal evolution: all trig sites may mutate, unapproved ones at a
        # 5x lower rate.
        selected_parents = [
            prev_population[gene_id - 1]
            for gene_id in feedback_ids
            if 1 <= gene_id <= len(prev_population)
        ]

        # Recombination pool: selected parents if any, otherwise the elites,
        # plus some liked variants from past generations for diversity.
        recombination_pool = list(selected_parents) if selected_parents else list(prev_elite)
        if self.liked_variants:
            # Up to 25% of the population size.
            num_liked_to_add = min(len(self.liked_variants), self.population_size // 4)
            recombination_pool.extend(random.sample(self.liked_variants, num_liked_to_add))
        if not recombination_pool:
            # e.g. elite_size == 0 and nothing liked yet; random.choice()
            # cannot draw from an empty pool.
            recombination_pool = [EquationGene(dict(self.base_params))]

        base_mutation_rate = 0.25
        site_mutation_rates = {
            site: (
                base_mutation_rate
                if site in self.approved_trig_sites
                else base_mutation_rate / 5.0
            )
            for site in trig_sites
        }

        trig_mutation_prob = self.get_trig_mutation_probability()
        constant_mutation_delta = self.get_constant_mutation_delta()
        use_crossover = len(selected_parents) > 1

        for i in range(self.population_size):
            if use_crossover:
                child = self.crossover(selected_parents)
                parent_ids = [getattr(p, 'global_id', 0) for p in selected_parents]
            else:
                parent = random.choice(recombination_pool)
                child = EquationGene(dict(parent.params))
                parent_ids = [getattr(parent, 'global_id', 0)]

            child.id = i + 1
            # NOTE: mutate() resets mutation_log, so the entry written by
            # crossover() is not retained on the child.
            child.mutate(
                mutation_rate=base_mutation_rate,
                trig_keys=trig_sites,
                site_mutation_rates=site_mutation_rates,
                trig_mutation_prob=trig_mutation_prob,
                constant_mutation_delta=constant_mutation_delta
            )
            # Ensure at least one visible mutation.
            if not child.mutation_log:
                self.force_single_mutation(child)
            adopt(child, parent_ids)

    self.population = new_population
    self.generation += 1

    # Prune on every path so history cannot grow without bound.
    self.prune_graph()

    # Save the graph periodically instead of every generation.
    if self.generation % self.save_interval == 0:
        self.save_graph()
def force_single_mutation(self, gene: "EquationGene"):
    """Apply and log one mutation on ``gene`` whenever any candidate exists.

    A random draw against the current trig-mutation probability picks the
    preferred kind (trig function vs. constant). If that kind has no
    candidates on this gene, the other kind is tried, so the call only leaves
    the gene unchanged when it has neither trig sites nor numeric constants.

    Args:
        gene: The variant to mutate in place; its ``mutation_log`` receives
            the ``key:old->new`` entry.
    """
    trig_mutation_prob = self.get_trig_mutation_probability()
    constant_mutation_delta = self.get_constant_mutation_delta()
    all_sites = self.trig_keys

    # Only sites actually present on the gene (avoids KeyError).
    trig_sites = [s for s in all_sites if isinstance(gene.params.get(s), str)]
    constant_keys = [
        k for k, v in gene.params.items()
        if not k.startswith(('trig', '_2_trig')) and isinstance(v, (int, float))
    ]

    def mutate_trig() -> bool:
        if not trig_sites:
            return False
        site = random.choice(trig_sites)
        oldf = gene.params[site]
        # Choose a different function directly.
        available_funcs = [f for f in TRIG_FUNCS if f != oldf]
        if not available_funcs:
            return False
        newf = random.choice(available_funcs)
        gene.params[site] = newf
        gene.mutation_log.append(f"{site}:{oldf}->{newf}")
        return True

    def mutate_constant() -> bool:
        if not constant_keys:
            return False
        key = random.choice(constant_keys)
        old = gene.params[key]
        # Halve the step when any trig site (primary or merged) is atan,
        # matching EquationGene.mutate().
        has_atan = any(gene.params.get(k) == 'atan' for k in all_sites)
        delta = constant_mutation_delta * 0.5 if has_atan else constant_mutation_delta
        direction = random.choice([-1, 1])
        new = old + direction * delta
        # Keep divisors and multipliers strictly positive.
        if 'div' in key.lower():
            new = max(0.1, new)
        elif 'mult' in key.lower():
            new = max(0.01, new)
        gene.params[key] = new
        gene.mutation_log.append(f"{key}:{old:.3f}->{new:.3f}")
        return True

    if random.random() < trig_mutation_prob:
        attempts = (mutate_trig, mutate_constant)
    else:
        attempts = (mutate_constant, mutate_trig)
    for attempt in attempts:
        if attempt():
            break
def crossover(self, parents: List[EquationGene]) -> EquationGene:
    """Build a child whose every parameter comes from a randomly chosen parent.

    With no parents the child is a copy of the base params; with a single
    parent it is a copy of that parent. Otherwise the key set is taken from
    the first parent and a ``crossover:parents=[...]`` entry listing the
    parents' global ids is written to the child's mutation log.
    """
    parent_count = len(parents) if parents else 0
    if parent_count == 0:
        return EquationGene(dict(self.base_params))
    if parent_count == 1:
        return EquationGene(dict(parents[0].params))

    # One independent draw per key; all parents are expected to share keys.
    inherited = {
        name: random.choice(parents).params[name]
        for name in parents[0].params
    }
    offspring = EquationGene(inherited)

    lineage = [getattr(p, 'global_id', 0) for p in parents]
    offspring.mutation_log.append(f"crossover:parents={lineage}")
    return offspring
def log_node(self, gene: EquationGene, parent_id: int):
    """Append a compact record of ``gene`` to the exploration graph's nodes."""
    node = {
        'id': getattr(gene, 'global_id', None),
        'gen': self.generation,
        'parent': parent_id,
        # Shallow copy so later parameter changes do not alter the record.
        'params': dict(gene.params),
        # Only the first five log entries are kept to bound record size.
        'mutations': gene.mutation_log[:5],
    }
    self.graph['nodes'].append(node)
def penalize_recent_mutations(self):
    """Penalize trig mutations of the current population after "select none".

    Every logged trig-site mutation (primary ``trig*`` or merged ``_2_trig*``)
    multiplies the ``'trig'`` entry of ``mutation_penalties`` by 0.8. Other
    log entries (constant mutations, crossover notes) carry no category and
    are skipped.
    """
    for gene in self.population:
        for mutation in gene.mutation_log:
            if '->' not in mutation:
                continue
            key = mutation.split(':')[0]
            # Only trig sites have a penalty category; skipping the rest
            # avoids using an unbound or stale category name.
            if not key.startswith(('trig', '_2_trig')):
                continue
            current_penalty = self.mutation_penalties.get('trig', 1.0)
            self.mutation_penalties['trig'] = current_penalty * 0.8  # 20% penalty
def penalize_current_mutations(self):
    """Penalize trig mutations of the current population by 30% each.

    Every logged trig-site mutation (primary ``trig*`` or merged ``_2_trig*``)
    multiplies the ``'trig'`` entry of ``mutation_penalties`` by 0.7. Other
    log entries (constant mutations, crossover notes) carry no category and
    are skipped.
    """
    for gene in self.population:
        for mutation in gene.mutation_log:
            if '->' not in mutation:
                continue
            key = mutation.split(':')[0]
            # Only trig sites have a penalty category; skipping the rest
            # avoids using an unbound or stale category name.
            if not key.startswith(('trig', '_2_trig')):
                continue
            current_penalty = self.mutation_penalties.get('trig', 1.0)
            self.mutation_penalties['trig'] = current_penalty * 0.7
def prune_graph(self):
    """Bound the size of the exploration graph and the history lists.

    When the graph holds more than ``max_graph_size`` nodes, only nodes and
    feedback from the last 50 generations are kept, together with edges whose
    two endpoints both survive. Independently, ``liked_variants``,
    ``select_none_history`` and ``feedback_history`` are trimmed to their most
    recent 100, 20 and 200 entries.
    """
    graph = self.graph
    if len(graph['nodes']) > self.max_graph_size:
        cutoff = max(0, self.generation - 50)

        survivors = []
        surviving_ids = set()
        for node in graph['nodes']:
            if node.get('gen', 0) >= cutoff:
                survivors.append(node)
                surviving_ids.add(node.get('id'))
        graph['nodes'] = survivors

        graph['edges'] = [
            edge for edge in graph['edges']
            if edge.get('from') in surviving_ids and edge.get('to') in surviving_ids
        ]
        graph['feedback'] = [
            entry for entry in graph['feedback']
            if entry.get('generation', 0) >= cutoff
        ]

    # (attribute name, maximum number of most-recent entries to keep)
    caps = (
        ('liked_variants', 100),
        ('select_none_history', 20),
        ('feedback_history', 200),
    )
    for attr, cap in caps:
        entries = getattr(self, attr)
        if len(entries) > cap:
            setattr(self, attr, entries[-cap:])
def update_parameter_preferences(self, feedback_ids: List[int]):
    """Adjust per-parameter preference scores from one round of feedback.

    * Empty ``feedback_ids`` ("select none"): each logged mutation in the
      population lowers its parameter's score by 0.1 (once per occurrence).
    * Otherwise: each parameter mutated in a selected variant gains 0.05, and
      each parameter mutated elsewhere but not in any selected variant loses
      0.02 (once per parameter).

    Scores are clamped to the range [-1.0, 1.0].
    """
    prefs = self.parameter_preferences

    def mutated_params(gene):
        # Parameter names from "name:old->new" log entries.
        return [entry.split(':')[0] for entry in gene.mutation_log if '->' in entry]

    if not feedback_ids:
        for gene in self.population:
            for name in mutated_params(gene):
                prefs[name] = max(-1.0, prefs.get(name, 0.0) - 0.1)
        return

    # Reward parameters mutated in the selected variants.
    chosen = set()
    for gene_id in feedback_ids:
        if 1 <= gene_id <= len(self.population):
            chosen.update(mutated_params(self.population[gene_id - 1]))
    for name in chosen:
        prefs[name] = min(1.0, prefs.get(name, 0.0) + 0.05)

    # Mildly penalize parameters that were mutated but never selected.
    seen = {name for gene in self.population for name in mutated_params(gene)}
    for name in seen - chosen:
        prefs[name] = max(-1.0, prefs.get(name, 0.0) - 0.02)
def get_parameter_color(self, param: str) -> str:
    """Return an ``rgb(r, g, b)`` string encoding the parameter's preference.

    The gradient runs red (-1) -> yellow (0) -> green (1); parameters without
    a recorded preference are treated as 0.
    """
    score = self.parameter_preferences.get(param, 0.0)
    if score < 0:
        # Red-to-yellow half: green ramps up from 0 at -1 to 255 at 0.
        channels = (255, int(255 * (score + 1)), 0)
    else:
        # Yellow-to-green half: red ramps down from 255 at 0 to 0 at 1.
        channels = (int(255 * (1 - score)), 255, 0)
    return "rgb({}, {}, {})".format(*channels)
def save_graph(self):
    """Write the exploration graph to ``exploration_log.json`` as compact JSON.

    Saving is best-effort: a failure never propagates to the caller, but it
    is logged as a warning instead of being silently discarded.
    """
    import logging

    try:
        with open('exploration_log.json', 'w', encoding='utf-8') as f:
            # Compact separators, no indentation, to keep the file small.
            json.dump(self.graph, f, separators=(',', ':'))
    except Exception:
        # Deliberately broad: a failed save must not interrupt evolution.
        logging.getLogger(__name__).warning(
            "Could not save exploration graph to exploration_log.json",
            exc_info=True,
        )
def get_population_data(self, lite: bool = False):
    """Build the population snapshot sent to the frontend.

    Args:
        lite: When true, the ``base_params`` and ``parameter_preferences``
            entries are left out of the result.

    Returns:
        A dict with the generation number, one entry per variant (id, params,
        first three mutation-log entries) and the current evolver settings.
    """
    variants = []
    for gene in self.population:
        variants.append({
            'id': gene.id,
            'params': gene.params,
            'mutations': gene.mutation_log[:3],  # first 3 log entries only
        })

    snapshot = {
        'generation': self.generation,
        'variants': variants,
        'systematic_phase': self.generation < self.systematic_phase_generations,
        'approved_trig_sites': list(self.approved_trig_sites),
        'current_base_set': self.current_base_set,
        'trig_mutation_prob': self.get_trig_mutation_probability(),
        'trig_mutation_prob_base': self.trig_mutation_prob_base,
        'trig_mutation_prob_base_generation': self.trig_mutation_prob_base_generation,
        'jump_size_multiplier': self.jump_size_multiplier,
        'merged_secondary_set': self.merged_secondary_set,
    }
    if lite:
        return snapshot

    snapshot['base_params'] = self.base_params
    snapshot['parameter_preferences'] = self.parameter_preferences
    return snapshot
def set_base_equation_set(self, set_name: str):
    """Switch to another base equation set and restart the evolution.

    Args:
        set_name: Key expected to be present in ``self.base_equation_sets``.

    Returns:
        True after switching and reinitializing the population; False,
        with no state touched, when ``set_name`` is not a known set.
    """
    if set_name not in self.base_equation_sets:
        return False

    self.current_base_set = set_name
    # Changing the base set also drops any active merge
    self.merged_secondary_set = None

    # Start the evolution state over for the new equation set
    self.generation = 0
    # Per the original note, the base generation has to go back to 0 too,
    # otherwise get_trig_mutation_probability gets a negative sqrt argument.
    self.trig_mutation_prob_base_generation = 0
    self.trig_mutation_prob_base = self.jump_size_multiplier = 1.0
    self.approved_trig_sites = set()
    self.parameter_preferences, self.mutation_penalties = {}, {}
    self.liked_variants, self.select_none_history = [], []

    self.initialize_population()
    return True
def set_merged_secondary(self, set_name: Optional[str]):
    """Choose, or clear, the secondary equation used in merge mode.

    Args:
        set_name: A key of ``self.base_equation_sets``, or None to clear
            the merge.

    Returns:
        True after the evolution state was reset and the population
        reinitialized; False, with no state touched, when ``set_name`` is
        neither None nor a known set.
    """
    is_known = set_name is None or set_name in self.base_equation_sets
    if not is_known:
        return False

    self.merged_secondary_set = set_name

    # The combined genome starts its evolution from scratch
    self.generation = 0
    self.trig_mutation_prob_base_generation = 0
    self.trig_mutation_prob_base = 1.0
    self.approved_trig_sites = set()
    self.parameter_preferences, self.mutation_penalties = {}, {}
    self.liked_variants, self.select_none_history = [], []

    self.initialize_population()
    return True
# Global evolver instance: one module-level GeneticEquationEvolver that the
# route handlers below read and mutate.
evolver = GeneticEquationEvolver()
@app.errorhandler(404)
def not_found(e):
    """Render a 404 error as a JSON body carrying the error's text."""
    payload = {'error': 'Not found', 'message': str(e)}
    return jsonify(payload), 404
@app.errorhandler(500)
def server_error(e):
    """Render a 500 error as a JSON body carrying the error's text."""
    payload = {'error': 'Internal server error', 'message': str(e)}
    return jsonify(payload), 500
@app.route('/')
def index():
    """Serve the page rendered from the ``genetic_evolver.html`` template."""
    page = render_template('genetic_evolver.html')
    return page
@app.route('/api/population')
def get_population():
    """Return the evolver's current population data as JSON.

    The ``lite`` query parameter selects the reduced payload when it is
    '1', 'true' or 'True'; any other value (or none) gives the full one.
    Any exception is reported as a 500 with the error text.
    """
    truthy_values = ('1', 'true', 'True')
    try:
        lite_arg = request.args.get('lite', '0')
        wants_lite = lite_arg in truthy_values
        population = evolver.get_population_data(lite=wants_lite)
        return jsonify(population)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@app.route('/api/evolve', methods=['POST'])
def evolve():
    """Advance the evolver by one generation using the client's selection.

    Expects a JSON object. Its optional ``selected_ids`` entry (default: an
    empty list) is passed to ``evolver.evolve_generation``; an empty
    selection is still evolved and stands for "Select None" feedback.

    Returns:
        200 with ``{'success': True, 'generation': ...}`` on success,
        400 when the body is missing, malformed or not a JSON object,
        500 with the error text when evolving raises.
    """
    try:
        # silent=True yields None for a missing or malformed body instead of
        # raising, so client mistakes are answered with 400 rather than
        # being caught below and reported as a server error.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400
        feedback_ids = data.get('selected_ids', [])
        # Always evolve: empty selection means "Select None" feedback
        evolver.evolve_generation(feedback_ids)
        return jsonify({'success': True, 'generation': evolver.generation})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/set_base_equation', methods=['POST'])
def set_base_equation():
    """Switch the evolver to the base equation set named in the request.

    Expects a JSON object with a string ``set_name``.

    Returns:
        200 with ``{'success': True, 'base_set': ...}`` when the switch
        succeeded, 400 when the body is not a JSON object or the name is
        not a known set, 500 with the error text on unexpected failures.
    """
    try:
        # silent=True yields None for a missing or malformed body instead of
        # raising, so client mistakes become 400 rather than 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400
        set_name = data.get('set_name')
        # Only strings can name a set; rejecting other JSON types here keeps
        # e.g. a list from raising inside the membership test.
        if isinstance(set_name, str) and evolver.set_base_equation_set(set_name):
            return jsonify({'success': True, 'base_set': evolver.current_base_set})
        return jsonify({'success': False, 'error': 'Invalid equation set name'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/set_jump_size', methods=['POST'])
def set_jump_size():
    """Set the evolver's jump size multiplier from the request.

    Expects a JSON object with a numeric ``jump_size`` between 0.01 and 10
    (inclusive).

    Returns:
        200 with ``{'success': True, 'jump_size': ...}`` when accepted,
        400 when the body is not a JSON object or the value is invalid,
        500 with the error text on unexpected failures.
    """
    try:
        # silent=True yields None for a missing or malformed body instead of
        # raising, so client mistakes become 400 rather than 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400
        jump_size = data.get('jump_size')
        # bool is a subclass of int, so JSON true would otherwise pass as 1.
        is_number = isinstance(jump_size, (int, float)) and not isinstance(jump_size, bool)
        if is_number and 0.01 <= jump_size <= 10:
            evolver.jump_size_multiplier = float(jump_size)
            return jsonify({'success': True, 'jump_size': evolver.jump_size_multiplier})
        return jsonify({'success': False, 'error': 'Invalid jump size'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/reset_preferences', methods=['POST'])
def reset_preferences():
    """Clear every stored parameter preference (the color history)."""
    try:
        evolver.parameter_preferences = dict()
        response = jsonify({'success': True})
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
    return response
@app.route('/api/reset_all', methods=['POST'])
def reset_all():
    """Restart the whole evolution process from generation 0.

    Every piece of evolution state listed below is put back to its initial
    value and the population is reinitialized. Any exception is reported
    as a 500 with the error text.
    """
    try:
        # Fresh values, applied in order; each container is a new object
        fresh_state = {
            'generation': 0,
            'parameter_preferences': {},
            'approved_trig_sites': set(),
            'liked_variants': [],
            'select_none_history': [],
            'mutation_penalties': {},
            'feedback_history': [],
            'elites': [],
            'next_node_id': 1,
            'graph': {'nodes': [], 'edges': [], 'feedback': []},
            'trig_mutation_prob_base': 1.0,  # base probability back to 1.0
            'trig_mutation_prob_base_generation': 0,  # generation offset back to 0
            'jump_size_multiplier': 1.0,  # default jump size
            'merged_secondary_set': None,  # a full reset also clears the merge
        }
        for attribute, value in fresh_state.items():
            setattr(evolver, attribute, value)

        # Build a brand-new population on top of the cleared state
        evolver.initialize_population()
        return jsonify({'success': True, 'generation': evolver.generation})
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500
@app.route('/api/set_trig_mutation_prob', methods=['POST'])
def set_trig_mutation_prob():
    """Set the trigonometric mutation probability.

    Expects a JSON object. ``trig_mutation_prob`` may be:

    * null or absent - the current probability becomes the new base, so
      auto-decay resumes from the current value;
    * a number in 0.0-1.0 - that value becomes the new base.

    In both cases the base generation is set to the current generation.

    Returns:
        200 with ``{'success': True, 'trig_mutation_prob': ..., 'auto': ...}``,
        400 when the body is not a JSON object or the value is invalid,
        500 with the error text on unexpected failures.
    """
    try:
        # silent=True yields None for a missing or malformed body instead of
        # raising, so client mistakes become 400 rather than 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400
        prob = data.get('trig_mutation_prob')
        if prob is None:
            # User wants to use current value as new base (resume auto-decay from current)
            current_prob = evolver.get_trig_mutation_probability()
            evolver.trig_mutation_prob_base = current_prob
            evolver.trig_mutation_prob_base_generation = evolver.generation
            return jsonify({'success': True, 'trig_mutation_prob': current_prob, 'auto': True})
        # bool is a subclass of int, so JSON true/false would otherwise be
        # accepted as 1.0/0.0.
        is_number = isinstance(prob, (int, float)) and not isinstance(prob, bool)
        if is_number and 0.0 <= prob <= 1.0:
            # User sets a value - this becomes the new base for auto-decay starting from current generation
            evolver.trig_mutation_prob_base = float(prob)
            evolver.trig_mutation_prob_base_generation = evolver.generation
            return jsonify({'success': True, 'trig_mutation_prob': float(prob), 'auto': False})
        return jsonify({'success': False, 'error': 'Invalid probability (must be 0.0-1.0 or null for auto)'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/api/set_merged_secondary', methods=['POST'])
def set_merged_secondary():
    """Set or clear the secondary equation for merge mode.

    Expects a JSON object. ``set_name`` is either the name of an equation
    set or null/absent to clear the merge.

    Returns:
        200 with ``{'success': True, 'merged_secondary_set': ...}`` when
        accepted, 400 when the body is not a JSON object or the name is
        invalid, 500 with the error text on unexpected failures.
    """
    try:
        # silent=True yields None for a missing or malformed body instead of
        # raising, so client mistakes become 400 rather than 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': 'Request body must be a JSON object'}), 400
        set_name = data.get('set_name')  # None/null to clear merge
        # Anything other than null or a string cannot name a set; rejecting
        # it here keeps e.g. a list from raising inside the membership test.
        name_is_valid_type = set_name is None or isinstance(set_name, str)
        if name_is_valid_type and evolver.set_merged_secondary(set_name):
            return jsonify({'success': True, 'merged_secondary_set': evolver.merged_secondary_set})
        return jsonify({'success': False, 'error': 'Invalid equation set name'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
    import os  # local import: only needed when run as a script

    # SECURITY: debug mode enables the interactive Werkzeug debugger, which
    # lets a client execute code on the server. Because the server listens
    # on all interfaces, debug is now opt-in: set FLASK_DEBUG=1 to enable it
    # for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)