| from flask import Flask, render_template, jsonify, request |
| import json |
| import random |
| import math |
| from typing import List, Dict, Any, Optional |
|
|
| app = Flask(__name__) |
|
|
| |
| TRIG_FUNCS = ('sin', 'cos', 'tan', 'atan') |
|
|
| class EquationGene: |
| """Represents a single equation variant with its parameters """ |
| def __init__(self, params: Dict[str, Any]): |
| self.params = params |
| self.fitness = 0.0 |
| self.id = 0 |
| self.mutation_log: List[str] = [] |
| |
| def mutate( |
| self, |
| mutation_rate: float = 0.3, |
| trig_keys: Optional[List[str]] = None, |
| site_mutation_rates: Optional[Dict[str, float]] = None, |
| trig_mutation_prob: float = 1.0, |
| constant_mutation_delta: float = math.pi / 8.0, |
| ): |
| """Apply mutations to the equation parameters (trigonometric sites and constants). |
| |
| For each mutation opportunity, decides whether to mutate a trig function (prob=trig_mutation_prob) |
| or a constant (prob=1-trig_mutation_prob). |
| """ |
| keys = trig_keys if trig_keys is not None else [k for k in self.params.keys() if k.startswith('trig')] |
| |
| constant_keys = [k for k in self.params.keys() if not k.startswith('trig') and not k.startswith('_2_trig') and isinstance(self.params.get(k), (int, float))] |
|
|
| self.mutation_log = [] |
| |
| |
| has_atan = any(self.params.get(k) == 'atan' for k in keys) |
| const_delta = constant_mutation_delta * 0.5 if has_atan else constant_mutation_delta |
| |
| |
| |
| use_trig_mutation = random.random() < trig_mutation_prob |
| |
| if use_trig_mutation: |
| |
| for key in keys: |
| |
| site_rate = site_mutation_rates.get(key, mutation_rate) if site_mutation_rates else mutation_rate |
| if random.random() < site_rate: |
| |
| old = self.params.get(key) |
| if isinstance(old, str): |
| |
| choices = [f for f in TRIG_FUNCS if f != old] |
| if choices: |
| new = random.choice(choices) |
| self.params[key] = new |
| self.mutation_log.append(f"{key}:{old}->{new}") |
| |
| |
| if constant_keys and random.random() < 0.1 * mutation_rate: |
| const_key = random.choice(constant_keys) |
| old = self.params.get(const_key) |
| if isinstance(old, (int, float)): |
| direction = random.choice([-1, 1]) |
| new = old + direction * const_delta |
| if 'div' in const_key.lower(): |
| new = max(0.1, new) |
| elif 'mult' in const_key.lower(): |
| new = max(0.01, new) |
| self.params[const_key] = new |
| self.mutation_log.append(f"{const_key}:{old:.3f}->{new:.3f}") |
| else: |
| |
| |
| if constant_keys and random.random() < mutation_rate: |
| key = random.choice(constant_keys) |
| old = self.params.get(key) |
| if isinstance(old, (int, float)): |
| direction = random.choice([-1, 1]) |
| new = old + direction * const_delta |
| if 'div' in key.lower(): |
| new = max(0.1, new) |
| elif 'mult' in key.lower(): |
| new = max(0.01, new) |
| self.params[key] = new |
| self.mutation_log.append(f"{key}:{old:.3f}->{new:.3f}") |
| |
| |
| if keys and random.random() < 0.05 * trig_mutation_prob * mutation_rate: |
| key = random.choice(keys) |
| old = self.params.get(key) |
| if isinstance(old, str): |
| choices = [f for f in TRIG_FUNCS if f != old] |
| if choices: |
| new = random.choice(choices) |
| self.params[key] = new |
| self.mutation_log.append(f"{key}:{old}->{new}") |
|
|
| class GeneticEquationEvolver: |
| def __init__(self, population_size: int = 16, elite_size: int = 4): |
| self.population_size = population_size |
| self.elite_size = elite_size |
| self.generation = 0 |
| self.population: List[EquationGene] = [] |
| self.feedback_history = [] |
| self.elites: List[EquationGene] = [] |
| self.next_node_id = 1 |
| self.graph: Dict[str, Any] = {'nodes': [], 'edges': [], 'feedback': []} |
| self.mutation_penalties: Dict[str, float] = {} |
| self.max_graph_size = 500 |
| self.save_interval = 5 |
| self.liked_variants: List[EquationGene] = [] |
| self.select_none_history: List[int] = [] |
| self.parameter_preferences: Dict[str, float] = {} |
| self.systematic_phase_generations = 3 |
| self.approved_trig_sites: set = set() |
| self.current_base_set = 'always_finding_yourself' |
| self.merged_secondary_set = None |
| self.jump_size_multiplier = 1.0 |
| self.trig_mutation_prob_base = 1.0 |
| self.trig_mutation_prob_base_generation = 0 |
| |
| |
| self.base_equation_sets = { |
| 'always_finding_yourself': { |
| 'k_div': 4, 'k_sub': 10.5, 'e_div': 9, 'e_add': 9, 'o_div': 9, |
| 'x_offset': 60, 'y_trig_div': 8, 'q_mult': 0.7, |
| 'px_add': 200, 'py_add': 200, 'py_q_div': 3, 'time_mult': 1/12, |
| 'trig_y': 'cos', 'trig_e': 'sin', 'trig_o': 'sin', |
| 'trig_c_sin': 'sin', 'trig_c_cos': 'cos', 'trig_c4_cos': 'cos' |
| }, |
| 'a_constant_state_of_bliss': { |
| 'k_div': 8, 'k_sub': 11.5, 'e_div': 8, 'e_sub': 12.5, 'o_div': 139, |
| 'd_mult': 10, 'px_div': 2, 'px_add1': 150, |
| 'py_y_div': 9, 'py_d_mult': 15, 'py_cos_mult': 2, 'py_add': 220, |
| 'stroke_base': 99, 'stroke_mult': 99, 'stroke_pow': 30, |
| |
| 'trig_d_cos': 'cos', |
| 'trig_px_sin1': 'sin', |
| 'trig_px_sin2': 'sin', |
| 'trig_py_cos': 'cos', |
| 'trig_py_sin': 'sin', |
| 'trig_stroke_sin1': 'sin', |
| 'trig_stroke_sin2': 'sin' |
| }, |
| 'yuruyurau': { |
| |
| 'k_base': 4, 'k_sin_mult': 3, 'k_cos_div': 29, |
| 'e_div': 8, 'e_sub': 13, |
| 'q_sin1_mult': 3, 'q_sin1_mult2': 2, 'q_div': 0.3, |
| 'q_sin2_div': 25, 'q_base': 9, 'q_sin3_mult': 4, 'q_sin3_e_mult': 9, 'q_sin3_d_mult': 3, 'q_sin3_t_mult': 2, |
| 'px_cos_mult': 30, 'px_add': 200, |
| 'py_d_mult': 39, 'py_sub': 220, |
| |
| 'trig_k_sin': 'sin', |
| 'trig_k_cos': 'cos', |
| 'trig_q_sin1': 'sin', |
| 'trig_q_sin2': 'sin', |
| 'trig_q_sin3': 'sin', |
| 'trig_px_cos': 'cos', |
| 'trig_py_sin': 'sin' |
| }, |
| 'yuruyurau_2': { |
| |
| 'k_cond_threshold': 20000, 'k_sin_div': 9, 'k_sin_mult': 9, 'k_cos_mult': 4, 'k_cos_div1': 49, 'k_cos_div2': 3690, |
| 'e_div': 984, 'e_sub': 12, |
| 'd_pow': 2, 'd_div': 99, 'd_add': 1, |
| 'q_k_mult': 4, 'q_sin_d_mult': 16, 'q_atan2_mult': 5, 'q_atan2_mult2': 9, |
| 'px_sin_mult': 60, 'px_add': 200, |
| 'py_q_add': 40, 'py_d_mult': 79, |
| 'c_d_mult': 1.1, 'c_t_div': 18, |
| |
| 'trig_k_sin': 'sin', |
| 'trig_k_cos1': 'cos', |
| 'trig_k_cos2': 'cos', |
| 'trig_q_sin': 'sin', |
| 'trig_q_atan2': 'atan', |
| 'trig_px_sin': 'sin', |
| 'trig_py_sin': 'sin' |
| }, |
| 'yuruyurau_3': { |
| |
| 'k_mult': 5, 'k_cos_div1': 49, 'k_cos_div2': 3690, |
| 'e_div': 984, 'e_sub': 12, |
| 'd_pow': 2, 'd_div': 99, 'd_add': 1, |
| 'q_k_mult': 4, 'q_sin_d_mult': 18, 'q_sin_t_mult': 2, 'q_sin_mod': 3, 'q_sin_mod_mult': 2, |
| 'q_atan2_mult': 5, 'q_atan2_mult2': 9, |
| 'px_sin_mult': 30, 'px_add': 200, |
| 'py_sin_mult': 80, 'py_d_mult': 79, |
| 'c_t_div': 18, 'c_mod': 3, 'c_mod_mult': 4, |
| |
| 'trig_k_cos1': 'cos', |
| 'trig_k_cos2': 'cos', |
| 'trig_q_sin': 'sin', |
| 'trig_q_atan2': 'atan', |
| 'trig_px_sin': 'sin', |
| 'trig_py_sin': 'sin', |
| 'trig_stroke_cos': 'cos' |
| }, |
| 'yuruyurau_4': { |
| |
| 'k_mult': 5, 'k_cos_div': 8, |
| 'e_mult': 5, 'e_cos_div': 9, |
| 'd_div_base': 6, 'd_pow': 4, 'd_add': 4, |
| 'q_k_mult': 3, 'q_e_div': 2, 'q_sin_mult': 3, 'q_sin_kd_div': 3, 'q_bitwise_mult': 70, |
| 'px_sin_mult': 1, 'px_add': 200, |
| 'py_cos_add1': 7, 'py_add': 200, |
| 'c_t_div': 9, 'c_mod1': 5, 'c_mod1_mult': 5, 'c_mod2': 2, |
| |
| 'trig_k_cos': 'cos', |
| 'trig_e_cos': 'cos', |
| 'trig_q_sin': 'sin', |
| 'trig_q_sin2': 'sin', |
| 'trig_px_sin': 'sin', |
| 'trig_py_cos': 'cos' |
| }, |
| 'yuruyurau_5': { |
| |
| |
| |
| |
| |
| 'k_mult': 9, 'k_cos_div': 61, |
| |
| 'e_div': 692, 'e_sub': 13, |
| |
| 'd_pow': 2, 'd_div': 99, 'd_add': 1, |
| |
| 'm_mult': 9, |
| |
| 'q_base': 79, 'q_e_div': 2, 'q_sin1_mult': 4, |
| 'q_k_mult': 8, 'q_sin2_mult': 5, 'q_sin_e_div': 9, |
| |
| 'c_d_div': 2, 'c_d2_mult': 2, 'c_cos_div': 9, 'c_t_div': 16, |
| |
| 'px_add': 200, |
| |
| 'py_q_add': 40, 'py_add': 200, |
| |
| 'trig_k_cos': 'cos', |
| 'trig_q_sin1': 'sin', |
| 'trig_q_sin2': 'sin', |
| 'trig_q_sin3': 'sin', |
| 'trig_c_cos': 'cos', |
| 'trig_px_cos': 'cos', |
| 'trig_py_sin': 'sin', |
| } |
| } |
| |
| self.initialize_population() |
| |
| @property |
| def base_params(self): |
| """Get current base params based on selected equation set (and merged secondary if active)""" |
| primary = self.base_equation_sets.get(self.current_base_set, self.base_equation_sets['always_finding_yourself']) |
| if self.merged_secondary_set and self.merged_secondary_set in self.base_equation_sets: |
| secondary = self.base_equation_sets[self.merged_secondary_set] |
| combined = dict(primary) |
| for k, v in secondary.items(): |
| combined['_2_' + k] = v |
| return combined |
| return primary |
|
|
| @property |
| def trig_keys(self): |
| """Get trig keys for current base equation set (including merged secondary)""" |
| return [k for k in self.base_params.keys() if k.startswith('trig') or k.startswith('_2_trig')] |
| |
| def get_trig_mutation_probability(self) -> float: |
| """Calculate probability of trig mutations vs constant mutations. |
| Always decays from base value using square root, based on generations since base was set. |
| Minimum probability is 0.02.""" |
| if self.generation == self.trig_mutation_prob_base_generation: |
| return max(0.02, self.trig_mutation_prob_base) |
| |
| |
| generations_since_base = max(0, self.generation - self.trig_mutation_prob_base_generation) |
| prob = self.trig_mutation_prob_base / math.sqrt(generations_since_base + 1) |
| return max(0.02, prob) |
| |
| def get_constant_mutation_delta(self) -> float: |
| """Calculate mutation delta for constants. |
| Starts at pi/8 and decays as initial_delta / log(generation+2) |
| Multiplied by jump_size_multiplier for user control""" |
| initial_delta = math.pi / 8.0 |
| if self.generation == 0: |
| return initial_delta * self.jump_size_multiplier |
| return (initial_delta / math.log(self.generation + 2)) * self.jump_size_multiplier |
| |
| def initialize_population(self): |
| """Create initial population with base equation + mutations""" |
| self.population = [] |
| |
| if not self.trig_keys: |
| for i in range(self.population_size): |
| params = dict(self.base_params) |
| gene = EquationGene(params) |
| gene.id = i + 1 |
| gene.global_id = self.next_node_id; self.next_node_id += 1 |
| self.log_node(gene, parent_id=0) |
| self.population.append(gene) |
| self.elites = [g for g in self.population[:self.elite_size]] |
| return |
| |
| |
| if self.generation < self.systematic_phase_generations: |
| |
| all_mutations = [] |
| for trig_site in self.trig_keys: |
| old_func = self.base_params[trig_site] |
| available_funcs = [f for f in TRIG_FUNCS if f != old_func] |
| for new_func in available_funcs: |
| all_mutations.append((trig_site, old_func, new_func)) |
| |
| |
| random.shuffle(all_mutations) |
| selected_mutations = all_mutations[:self.population_size] |
| |
| |
| for i, (trig_site, old_func, new_func) in enumerate(selected_mutations): |
| params = dict(self.base_params) |
| gene = EquationGene(params) |
| gene.id = i + 1 |
| gene.global_id = self.next_node_id; self.next_node_id += 1 |
| |
| |
| gene.params[trig_site] = new_func |
| gene.mutation_log.append(f"{trig_site}:{old_func}->{new_func}") |
| |
| self.log_node(gene, parent_id=0) |
| self.population.append(gene) |
| |
| |
| while len(self.population) < self.population_size: |
| params = dict(self.base_params) |
| gene = EquationGene(params) |
| gene.id = len(self.population) + 1 |
| gene.global_id = self.next_node_id; self.next_node_id += 1 |
| self.log_node(gene, parent_id=0) |
| self.population.append(gene) |
| else: |
| |
| for i in range(self.population_size): |
| params = dict(self.base_params) |
| gene = EquationGene(params) |
| gene.id = i + 1 |
| gene.global_id = self.next_node_id; self.next_node_id += 1 |
| gene.mutate(mutation_rate=0.4, trig_keys=self.trig_keys) |
| self.log_node(gene, parent_id=0) |
| self.population.append(gene) |
| |
| self.elites = [g for g in self.population[:self.elite_size]] |
| |
| def evaluate_population(self, feedback_ids: List[int]): |
| """Update fitness based on user feedback""" |
| for gene in self.population: |
| gene.fitness = 0.0 |
|
|
| |
| self.update_parameter_preferences(feedback_ids) |
|
|
| |
| if not feedback_ids: |
| self.select_none_history.append(self.generation) |
| |
| self.penalize_recent_mutations() |
| else: |
| |
| for gene_id in feedback_ids: |
| if 1 <= gene_id <= len(self.population): |
| liked_gene = EquationGene(dict(self.population[gene_id - 1].params)) |
| liked_gene.fitness = 1.0 |
| liked_gene.liked_generation = self.generation |
| self.liked_variants.append(liked_gene) |
| self.population[gene_id - 1].fitness = 1.0 |
| |
| |
| if self.generation < self.systematic_phase_generations: |
| |
| for mutation in liked_gene.mutation_log: |
| if ':' in mutation and '->' in mutation: |
| trig_site = mutation.split(':')[0] |
| if trig_site.startswith('trig'): |
| self.approved_trig_sites.add(trig_site) |
|
|
| self.population.sort(key=lambda x: x.fitness, reverse=True) |
| |
| def evolve_generation(self, feedback_ids: List[int]): |
| """Evolve to next generation based on feedback""" |
| |
| prev_population = list(self.population) |
| self.evaluate_population(feedback_ids) |
| |
| self.feedback_history.append({ |
| 'generation': self.generation, |
| 'selected_ids': feedback_ids, |
| 'elite_params': [gene.params for gene in self.population[:self.elite_size]] |
| }) |
| |
| |
| if not feedback_ids: |
| self.penalize_current_mutations() |
| |
| if self.generation > 0: |
| |
| prev_elite = self.elites if self.elites else [EquationGene(dict(self.base_params))] |
| else: |
| prev_elite = [EquationGene(dict(self.base_params))] |
| else: |
| |
| self.elites = [self.population[i] for i in range(self.elite_size)] |
| prev_elite = self.elites |
|
|
| |
| selected_global = [] |
| for x in feedback_ids: |
| if 1 <= x <= len(prev_population): |
| selected_global.append(getattr(prev_population[x-1], 'global_id', None)) |
| self.graph['feedback'].append({'generation': self.generation, 'selected': selected_global}) |
|
|
| |
| new_population: List[EquationGene] = [] |
|
|
| |
| if not self.trig_keys: |
| for i in range(self.population_size): |
| if prev_elite: |
| parent = random.choice(prev_elite) |
| else: |
| parent = EquationGene(dict(self.base_params)) |
| child = EquationGene(dict(parent.params)) |
| child.id = i + 1 |
| child.global_id = self.next_node_id; self.next_node_id += 1 |
| self.log_node(child, parent_id=getattr(parent, 'global_id', 0)) |
| self.graph['edges'].append({'from': getattr(parent, 'global_id', 0), 'to': child.global_id, 'gen': self.generation}) |
| new_population.append(child) |
| self.population = new_population |
| self.generation += 1 |
| if self.generation % self.save_interval == 0: |
| self.save_graph() |
| return |
|
|
| |
| if self.generation < self.systematic_phase_generations: |
| |
| if prev_elite: |
| base_parent = prev_elite[0] |
| else: |
| base_parent = EquationGene(dict(self.base_params)) |
| |
| |
| all_mutations = [] |
| for trig_site in self.trig_keys: |
| old_func = base_parent.params[trig_site] |
| available_funcs = [f for f in TRIG_FUNCS if f != old_func] |
| for new_func in available_funcs: |
| all_mutations.append((trig_site, old_func, new_func)) |
| |
| |
| random.shuffle(all_mutations) |
| selected_mutations = all_mutations[:self.population_size] |
| |
| |
| for i, (trig_site, old_func, new_func) in enumerate(selected_mutations): |
| child = EquationGene(dict(base_parent.params)) |
| child.id = i + 1 |
| child.global_id = self.next_node_id; self.next_node_id += 1 |
| |
| |
| child.params[trig_site] = new_func |
| child.mutation_log.append(f"{trig_site}:{old_func}->{new_func}") |
| |
| self.log_node(child, parent_id=getattr(base_parent, 'global_id', 0)) |
| self.graph['edges'].append({'from': getattr(base_parent, 'global_id', 0), 'to': child.global_id, 'gen': self.generation}) |
| new_population.append(child) |
| |
| |
| while len(new_population) < self.population_size: |
| child = EquationGene(dict(base_parent.params)) |
| child.id = len(new_population) + 1 |
| child.global_id = self.next_node_id; self.next_node_id += 1 |
| self.log_node(child, parent_id=getattr(base_parent, 'global_id', 0)) |
| self.graph['edges'].append({'from': getattr(base_parent, 'global_id', 0), 'to': child.global_id, 'gen': self.generation}) |
| new_population.append(child) |
| else: |
| |
| |
| selected_parents = [] |
| if feedback_ids: |
| for gene_id in feedback_ids: |
| if 1 <= gene_id <= len(prev_population): |
| selected_parents.append(prev_population[gene_id - 1]) |
| |
| |
| recombination_pool = selected_parents[:] if selected_parents else prev_elite[:] |
| if not selected_parents: |
| recombination_pool = prev_elite[:] |
| if self.liked_variants: |
| |
| num_liked_to_add = min(len(self.liked_variants), self.population_size // 4) |
| recombination_pool.extend(random.sample(self.liked_variants, num_liked_to_add)) |
|
|
| |
| base_mutation_rate = 0.25 |
| site_mutation_rates = {} |
| for trig_site in self.trig_keys: |
| if trig_site in self.approved_trig_sites: |
| |
| site_mutation_rates[trig_site] = base_mutation_rate |
| else: |
| |
| site_mutation_rates[trig_site] = base_mutation_rate / 5.0 |
|
|
| |
| trig_mutation_prob = self.get_trig_mutation_probability() |
| constant_mutation_delta = self.get_constant_mutation_delta() |
| |
| for i in range(self.population_size): |
| |
| if len(selected_parents) > 1: |
| |
| child = self.crossover(selected_parents) |
| parent_for_logging = selected_parents[0] |
| else: |
| |
| parent = random.choice(recombination_pool) |
| child = EquationGene(dict(parent.params)) |
| parent_for_logging = parent |
| |
| child.id = i + 1 |
| child.mutate( |
| mutation_rate=base_mutation_rate, |
| trig_keys=self.trig_keys, |
| site_mutation_rates=site_mutation_rates, |
| trig_mutation_prob=trig_mutation_prob, |
| constant_mutation_delta=constant_mutation_delta |
| ) |
| |
| if not child.mutation_log: |
| self.force_single_mutation(child) |
| child.global_id = self.next_node_id; self.next_node_id += 1 |
| |
| if len(selected_parents) > 1: |
| |
| parent_ids = [getattr(p, 'global_id', 0) for p in selected_parents] |
| self.log_node(child, parent_id=parent_ids[0] if parent_ids else 0) |
| for parent_id in parent_ids: |
| self.graph['edges'].append({'from': parent_id, 'to': child.global_id, 'gen': self.generation}) |
| else: |
| |
| self.log_node(child, parent_id=getattr(parent_for_logging, 'global_id', 0)) |
| self.graph['edges'].append({'from': getattr(parent_for_logging, 'global_id', 0), 'to': child.global_id, 'gen': self.generation}) |
| new_population.append(child) |
|
|
| self.population = new_population |
| self.generation += 1 |
| |
| self.prune_graph() |
| |
| if self.generation % self.save_interval == 0: |
| self.save_graph() |
| |
| def force_single_mutation(self, gene: EquationGene): |
| """Guarantee at least one mutation is recorded for a child - trigonometric functions or constants.""" |
| trig_mutation_prob = self.get_trig_mutation_probability() |
| constant_mutation_delta = self.get_constant_mutation_delta() |
| |
| if random.random() < trig_mutation_prob: |
| |
| trig_sites = self.trig_keys |
| if trig_sites: |
| site = random.choice(trig_sites) |
| oldf = gene.params[site] |
| |
| available_funcs = [f for f in TRIG_FUNCS if f != oldf] |
| if available_funcs: |
| newf = random.choice(available_funcs) |
| gene.params[site] = newf |
| gene.mutation_log.append(f"{site}:{oldf}->{newf}") |
| else: |
| |
| constant_keys = [k for k in gene.params.keys() if not k.startswith('trig') and isinstance(gene.params.get(k), (int, float))] |
| if constant_keys: |
| key = random.choice(constant_keys) |
| old = gene.params[key] |
| is_inverse_trig = any(gene.params.get(k) == 'atan' for k in self.trig_keys if k.startswith('trig')) |
| delta = constant_mutation_delta * 0.5 if is_inverse_trig else constant_mutation_delta |
| direction = random.choice([-1, 1]) |
| new = old + direction * delta |
| if 'div' in key.lower(): |
| new = max(0.1, new) |
| elif 'mult' in key.lower(): |
| new = max(0.01, new) |
| gene.params[key] = new |
| gene.mutation_log.append(f"{key}:{old:.3f}->{new:.3f}") |
| |
| def crossover(self, parents: List[EquationGene]) -> EquationGene: |
| """Create a child by crossing over parameters from multiple parents. |
| For each parameter, randomly select from one of the parents.""" |
| if not parents: |
| return EquationGene(dict(self.base_params)) |
| |
| if len(parents) == 1: |
| return EquationGene(dict(parents[0].params)) |
| |
| |
| child_params = {} |
| for key in parents[0].params.keys(): |
| |
| selected_parent = random.choice(parents) |
| child_params[key] = selected_parent.params[key] |
| |
| child = EquationGene(child_params) |
| |
| parent_ids = [getattr(p, 'global_id', 0) for p in parents] |
| child.mutation_log.append(f"crossover:parents={parent_ids}") |
| return child |
|
|
| def log_node(self, gene: EquationGene, parent_id: int): |
| """Log node with minimal data for performance""" |
| self.graph['nodes'].append({ |
| 'id': getattr(gene, 'global_id', None), |
| 'gen': self.generation, |
| 'parent': parent_id, |
| 'params': dict(gene.params), |
| 'mutations': gene.mutation_log[:5] |
| }) |
|
|
| def penalize_recent_mutations(self): |
| """Penalize mutations from recent generations when 'select none' is chosen""" |
| |
| recent_generations = set() |
| for gen in self.select_none_history[-3:]: |
| recent_generations.add(gen) |
|
|
| |
| for gene in self.population: |
| for mutation in gene.mutation_log: |
| if '->' in mutation: |
| key = mutation.split(':')[0] |
| if key.startswith('trig'): |
| cat = 'trig' |
| |
| current_penalty = self.mutation_penalties.get(cat, 1.0) |
| self.mutation_penalties[cat] = current_penalty * 0.8 |
|
|
| def penalize_current_mutations(self): |
| """Penalize mutations from current generation by 30% - only trigonometric functions now""" |
| for gene in self.population: |
| for mutation in gene.mutation_log: |
| |
| if '->' in mutation: |
| key = mutation.split(':')[0] |
| if key.startswith('trig'): |
| cat = 'trig' |
| |
| current_penalty = self.mutation_penalties.get(cat, 1.0) |
| self.mutation_penalties[cat] = current_penalty * 0.7 |
|
|
| def prune_graph(self): |
| """Prune graph to keep only recent nodes for performance.""" |
| if len(self.graph['nodes']) > self.max_graph_size: |
| |
| min_gen = max(0, self.generation - 50) |
| kept_nodes = [n for n in self.graph['nodes'] if n.get('gen', 0) >= min_gen] |
| kept_ids = {n.get('id') for n in kept_nodes} |
|
|
| self.graph['nodes'] = kept_nodes |
| self.graph['edges'] = [ |
| e for e in self.graph['edges'] |
| if e.get('from') in kept_ids and e.get('to') in kept_ids |
| ] |
| self.graph['feedback'] = [ |
| f for f in self.graph['feedback'] |
| if f.get('generation', 0) >= min_gen |
| ] |
|
|
| |
| max_liked_variants = 100 |
| if len(self.liked_variants) > max_liked_variants: |
| self.liked_variants = self.liked_variants[-max_liked_variants:] |
|
|
| |
| max_select_none = 20 |
| if len(self.select_none_history) > max_select_none: |
| self.select_none_history = self.select_none_history[-max_select_none:] |
|
|
| |
| max_feedback_history = 200 |
| if len(self.feedback_history) > max_feedback_history: |
| self.feedback_history = self.feedback_history[-max_feedback_history:] |
|
|
| def update_parameter_preferences(self, feedback_ids: List[int]): |
| """Update user preference scores for parameters based on feedback""" |
| if not feedback_ids: |
| |
| for gene in self.population: |
| for mutation in gene.mutation_log: |
| if '->' in mutation: |
| param = mutation.split(':')[0] |
| |
| current_pref = self.parameter_preferences.get(param, 0.0) |
| self.parameter_preferences[param] = max(-1.0, current_pref - 0.1) |
| else: |
| |
| selected_params = set() |
| for gene_id in feedback_ids: |
| if 1 <= gene_id <= len(self.population): |
| gene = self.population[gene_id - 1] |
| for mutation in gene.mutation_log: |
| if '->' in mutation: |
| param = mutation.split(':')[0] |
| selected_params.add(param) |
|
|
| |
| for param in selected_params: |
| current_pref = self.parameter_preferences.get(param, 0.0) |
| self.parameter_preferences[param] = min(1.0, current_pref + 0.05) |
|
|
| |
| all_mutated_params = set() |
| for gene in self.population: |
| for mutation in gene.mutation_log: |
| if '->' in mutation: |
| param = mutation.split(':')[0] |
| all_mutated_params.add(param) |
|
|
| unselected_params = all_mutated_params - selected_params |
| for param in unselected_params: |
| current_pref = self.parameter_preferences.get(param, 0.0) |
| self.parameter_preferences[param] = max(-1.0, current_pref - 0.02) |
|
|
| def get_parameter_color(self, param: str) -> str: |
| """Get RGB color string based on parameter preference (-1 to 1)""" |
| preference = self.parameter_preferences.get(param, 0.0) |
| |
| |
| if preference < 0: |
| |
| red = 255 |
| green = int(255 * (preference + 1)) |
| blue = 0 |
| else: |
| |
| red = int(255 * (1 - preference)) |
| green = 255 |
| blue = 0 |
|
|
| return f"rgb({red}, {green}, {blue})" |
|
|
| def save_graph(self): |
| """Save graph to file - optimized with compact JSON""" |
| try: |
| with open('exploration_log.json', 'w', encoding='utf-8') as f: |
| json.dump(self.graph, f, separators=(',', ':')) |
| except Exception: |
| pass |
| |
| def get_population_data(self, lite: bool = False): |
| """Get current population data for frontend. |
| |
| If lite=True, omit large/static fields (base_params, parameter_preferences). |
| """ |
| data = { |
| 'generation': self.generation, |
| 'variants': [ |
| { |
| 'id': gene.id, |
| 'params': gene.params, |
| 'mutations': gene.mutation_log[:3], |
| } |
| for gene in self.population |
| ], |
| 'systematic_phase': self.generation < self.systematic_phase_generations, |
| 'approved_trig_sites': list(self.approved_trig_sites), |
| 'current_base_set': self.current_base_set, |
| 'trig_mutation_prob': self.get_trig_mutation_probability(), |
| 'trig_mutation_prob_base': self.trig_mutation_prob_base, |
| 'trig_mutation_prob_base_generation': self.trig_mutation_prob_base_generation, |
| 'jump_size_multiplier': self.jump_size_multiplier, |
| } |
| data['merged_secondary_set'] = self.merged_secondary_set |
| if not lite: |
| data['base_params'] = self.base_params |
| data['parameter_preferences'] = self.parameter_preferences |
| return data |
| |
| def set_base_equation_set(self, set_name: str): |
| """Change the base equation set and reset population""" |
| if set_name in self.base_equation_sets: |
| self.current_base_set = set_name |
| self.merged_secondary_set = None |
| |
| self.generation = 0 |
| self.trig_mutation_prob_base = 1.0 |
| self.trig_mutation_prob_base_generation = 0 |
| self.jump_size_multiplier = 1.0 |
| self.approved_trig_sites = set() |
| self.parameter_preferences = {} |
| self.mutation_penalties = {} |
| self.liked_variants = [] |
| self.select_none_history = [] |
| self.initialize_population() |
| return True |
| return False |
|
|
| def set_merged_secondary(self, set_name: Optional[str]): |
| """Set (or clear) the secondary equation for merge mode and reinitialize population""" |
| if set_name is None or set_name in self.base_equation_sets: |
| self.merged_secondary_set = set_name |
| |
| self.generation = 0 |
| self.trig_mutation_prob_base = 1.0 |
| self.trig_mutation_prob_base_generation = 0 |
| self.approved_trig_sites = set() |
| self.parameter_preferences = {} |
| self.mutation_penalties = {} |
| self.liked_variants = [] |
| self.select_none_history = [] |
| self.initialize_population() |
| return True |
| return False |
|
|
| |
| evolver = GeneticEquationEvolver() |
|
|
| @app.errorhandler(404) |
| def not_found(e): |
| return jsonify({'error': 'Not found', 'message': str(e)}), 404 |
|
|
| @app.errorhandler(500) |
| def server_error(e): |
| return jsonify({'error': 'Internal server error', 'message': str(e)}), 500 |
|
|
| @app.route('/') |
| def index(): |
| return render_template('genetic_evolver.html') |
|
|
| @app.route('/api/population') |
| def get_population(): |
| try: |
| lite = request.args.get('lite', '0') in ('1', 'true', 'True') |
| return jsonify(evolver.get_population_data(lite=lite)) |
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| @app.route('/api/evolve', methods=['POST']) |
| def evolve(): |
| try: |
| data = request.get_json() |
| feedback_ids = data.get('selected_ids', []) |
| |
| evolver.evolve_generation(feedback_ids) |
| return jsonify({'success': True, 'generation': evolver.generation}) |
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| @app.route('/api/set_base_equation', methods=['POST']) |
| def set_base_equation(): |
| try: |
| data = request.get_json() |
| set_name = data.get('set_name') |
| if evolver.set_base_equation_set(set_name): |
| return jsonify({'success': True, 'base_set': evolver.current_base_set}) |
| return jsonify({'success': False, 'error': 'Invalid equation set name'}), 400 |
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| @app.route('/api/set_jump_size', methods=['POST']) |
| def set_jump_size(): |
| try: |
| data = request.get_json() |
| jump_size = data.get('jump_size') |
| if jump_size is not None and isinstance(jump_size, (int, float)) and 0.01 <= jump_size <= 10: |
| evolver.jump_size_multiplier = float(jump_size) |
| return jsonify({'success': True, 'jump_size': evolver.jump_size_multiplier}) |
| return jsonify({'success': False, 'error': 'Invalid jump size'}), 400 |
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| @app.route('/api/reset_preferences', methods=['POST']) |
| def reset_preferences(): |
| """Reset all parameter preferences (color history)""" |
| try: |
| evolver.parameter_preferences = {} |
| return jsonify({'success': True}) |
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| @app.route('/api/reset_all', methods=['POST']) |
| def reset_all(): |
| """Reset the entire evolution process from the start""" |
| try: |
| |
| evolver.generation = 0 |
| evolver.parameter_preferences = {} |
| evolver.approved_trig_sites = set() |
| evolver.liked_variants = [] |
| evolver.select_none_history = [] |
| evolver.mutation_penalties = {} |
| evolver.feedback_history = [] |
| evolver.elites = [] |
| evolver.next_node_id = 1 |
| evolver.graph = {'nodes': [], 'edges': [], 'feedback': []} |
| evolver.trig_mutation_prob_base = 1.0 |
| evolver.trig_mutation_prob_base_generation = 0 |
| evolver.jump_size_multiplier = 1.0 |
|
|
| evolver.merged_secondary_set = None |
|
|
| |
| evolver.initialize_population() |
|
|
| return jsonify({'success': True, 'generation': evolver.generation}) |
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| @app.route('/api/set_trig_mutation_prob', methods=['POST']) |
| def set_trig_mutation_prob(): |
| """Set trigonometric mutation probability. Sets base value and generation for auto-decay.""" |
| try: |
| data = request.get_json() |
| prob = data.get('trig_mutation_prob') |
| if prob is None: |
| |
| current_prob = evolver.get_trig_mutation_probability() |
| evolver.trig_mutation_prob_base = current_prob |
| evolver.trig_mutation_prob_base_generation = evolver.generation |
| return jsonify({'success': True, 'trig_mutation_prob': current_prob, 'auto': True}) |
| elif isinstance(prob, (int, float)) and 0.0 <= prob <= 1.0: |
| |
| evolver.trig_mutation_prob_base = float(prob) |
| evolver.trig_mutation_prob_base_generation = evolver.generation |
| return jsonify({'success': True, 'trig_mutation_prob': float(prob), 'auto': False}) |
| return jsonify({'success': False, 'error': 'Invalid probability (must be 0.0-1.0 or null for auto)'}), 400 |
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| @app.route('/api/set_merged_secondary', methods=['POST']) |
| def set_merged_secondary(): |
| """Set or clear the secondary equation for merge mode.""" |
| try: |
| data = request.get_json() |
| set_name = data.get('set_name') |
| if evolver.set_merged_secondary(set_name): |
| return jsonify({'success': True, 'merged_secondary_set': evolver.merged_secondary_set}) |
| return jsonify({'success': False, 'error': 'Invalid equation set name'}), 400 |
| except Exception as e: |
| return jsonify({'error': str(e)}), 500 |
|
|
| if __name__ == '__main__': |
| app.run(debug=True, host='0.0.0.0', port=5000) |
|
|