Spaces:
Sleeping
Sleeping
File size: 6,238 Bytes
639f871 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 | """
ga/operators.py — Operatori genetici: crossover, mutation, selection.
Tutti gli operatori lavorano su copie degli individui senza modificare
i genitori (operazioni pure).
"""
from __future__ import annotations
import random
from core.models import Individual, PoI
# ---------------------------------------------------------------------------
# SELECTION
# ---------------------------------------------------------------------------
def tournament_select(
population: list[Individual],
k: int = 3,
use_pareto: bool = True,
) -> Individual:
"""
Selezione torneo: estrae k individui casuali e restituisce il migliore.
Con use_pareto=True preferisce rango Pareto basso + crowding alto.
"""
contestants = random.sample(population, k)
if use_pareto:
return min(contestants, key=lambda x: (x.fitness.rank, -x.fitness.crowd))
return max(contestants, key=lambda x: x.fitness.scalar)
# ---------------------------------------------------------------------------
# CROSSOVER
# ---------------------------------------------------------------------------
def order_crossover(parent1: Individual, parent2: Individual) -> tuple[Individual, Individual]:
"""
Order Crossover (OX) adattato a tour a lunghezza variabile.
Preserva l'ordine relativo dei PoI condivisi tra i genitori.
"""
g1, g2 = parent1.genes, parent2.genes
if len(g1) < 2 or len(g2) < 2:
return parent1.clone(), parent2.clone()
def ox(donor: list[PoI], receiver: list[PoI]) -> list[PoI]:
if len(donor) < 2:
return list(donor)
cut1 = random.randint(0, len(donor) - 1)
cut2 = random.randint(cut1, len(donor) - 1)
segment = donor[cut1:cut2+1]
seg_ids = {p.id for p in segment}
# Riempi con i PoI del receiver non già nel segmento
filling = [p for p in receiver if p.id not in seg_ids]
child = filling[:cut1] + segment + filling[cut1:]
return child
return (
Individual(genes=ox(g1, g2)),
Individual(genes=ox(g2, g1)),
)
def poi_aware_crossover(
parent1: Individual,
parent2: Individual,
categories: list[str] | None = None,
) -> tuple[Individual, Individual]:
"""
PoI-aware Crossover: scambia interi sottoinsiemi per categoria.
Utile per preservare "nicchie tematiche" (es. tour musei vs tour gastronomico).
"""
from core.models import PoICategory
if categories is None:
categories = [c.value for c in PoICategory]
def by_category(genes: list[PoI]) -> dict[str, list[PoI]]:
d: dict[str, list[PoI]] = {c: [] for c in categories}
for p in genes:
d[p.category.value].append(p)
return d
cat1 = by_category(parent1.genes)
cat2 = by_category(parent2.genes)
child1_genes, child2_genes = [], []
for cat in categories:
# Con probabilità 0.5 scambia le categorie tra i figli
if random.random() < 0.5:
child1_genes.extend(cat1[cat])
child2_genes.extend(cat2[cat])
else:
child1_genes.extend(cat2[cat])
child2_genes.extend(cat1[cat])
# Rimuovi duplicati mantenendo l'ordine
def deduplicate(genes: list[PoI]) -> list[PoI]:
seen, result = set(), []
for p in genes:
if p.id not in seen:
seen.add(p.id)
result.append(p)
return result
return (
Individual(genes=deduplicate(child1_genes)),
Individual(genes=deduplicate(child2_genes)),
)
# ---------------------------------------------------------------------------
# MUTATION
# ---------------------------------------------------------------------------
def mutate(
individual: Individual,
poi_pool: list[PoI],
mut_prob: float = 0.15,
) -> Individual:
"""
Seleziona casualmente uno degli operatori di mutazione.
Applicato con probabilità mut_prob.
"""
if random.random() > mut_prob or not individual.genes:
return individual
operator = random.choice([
swap_mutation,
insert_mutation,
reverse_segment_mutation,
lambda ind, pool: add_remove_mutation(ind, pool),
])
result = operator(individual.clone(), poi_pool)
result.invalidate_cache()
return result
def swap_mutation(individual: Individual, _pool: list[PoI]) -> Individual:
"""Scambia due PoI scelti casualmente nella sequenza."""
g = individual.genes
if len(g) < 2:
return individual
i, j = random.sample(range(len(g)), 2)
g[i], g[j] = g[j], g[i]
return individual
def insert_mutation(individual: Individual, _pool: list[PoI]) -> Individual:
"""Rimuove un PoI da una posizione e lo inserisce altrove."""
g = individual.genes
if len(g) < 2:
return individual
i = random.randrange(len(g))
poi = g.pop(i)
j = random.randint(0, len(g))
g.insert(j, poi)
return individual
def reverse_segment_mutation(individual: Individual, _pool: list[PoI]) -> Individual:
"""Inverte un sottosegmento casuale del tour (elimina incroci geografici)."""
g = individual.genes
if len(g) < 3:
return individual
i = random.randint(0, len(g) - 2)
j = random.randint(i + 1, len(g) - 1)
g[i:j+1] = reversed(g[i:j+1])
return individual
def add_remove_mutation(individual: Individual, pool: list[PoI]) -> Individual:
"""
Con prob 0.70: aggiunge un PoI casuale non ancora nel tour (esplora).
Con prob 0.30: rimuove il PoI con il peggior score/durata (semplifica).
Il bias verso l'aggiunta contrasta la tendenza del GA a produrre
tour sempre più corti dopo molte generazioni di mutazione.
"""
g = individual.genes
visited = {p.id for p in g}
if random.random() < 0.70: # era 0.50: bias verso aggiunta
candidates = [p for p in pool if p.id not in visited]
if candidates:
new_poi = random.choice(candidates)
pos = random.randint(0, len(g))
g.insert(pos, new_poi)
else:
if g:
worst = min(g, key=lambda p: p.score / (p.visit_duration + 1e-9))
g.remove(worst)
return individual |