File size: 6,238 Bytes
639f871
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
"""
ga/operators.py — Operatori genetici: crossover, mutation, selection.
Tutti gli operatori lavorano su copie degli individui senza modificare
i genitori (operazioni pure).
"""
from __future__ import annotations
import random
from core.models import Individual, PoI


# ---------------------------------------------------------------------------
# SELECTION
# ---------------------------------------------------------------------------

def tournament_select(
    population: list[Individual],
    k:          int = 3,
    use_pareto: bool = True,
) -> Individual:
    """
    Selezione torneo: estrae k individui casuali e restituisce il migliore.
    Con use_pareto=True preferisce rango Pareto basso + crowding alto.
    """
    contestants = random.sample(population, k)
    if use_pareto:
        return min(contestants, key=lambda x: (x.fitness.rank, -x.fitness.crowd))
    return max(contestants, key=lambda x: x.fitness.scalar)


# ---------------------------------------------------------------------------
# CROSSOVER
# ---------------------------------------------------------------------------

def order_crossover(parent1: Individual, parent2: Individual) -> tuple[Individual, Individual]:
    """
    Order Crossover (OX) adattato a tour a lunghezza variabile.
    Preserva l'ordine relativo dei PoI condivisi tra i genitori.
    """
    g1, g2 = parent1.genes, parent2.genes
    if len(g1) < 2 or len(g2) < 2:
        return parent1.clone(), parent2.clone()

    def ox(donor: list[PoI], receiver: list[PoI]) -> list[PoI]:
        if len(donor) < 2:
            return list(donor)
        cut1 = random.randint(0, len(donor) - 1)
        cut2 = random.randint(cut1, len(donor) - 1)
        segment  = donor[cut1:cut2+1]
        seg_ids  = {p.id for p in segment}
        # Riempi con i PoI del receiver non già nel segmento
        filling  = [p for p in receiver if p.id not in seg_ids]
        child    = filling[:cut1] + segment + filling[cut1:]
        return child

    return (
        Individual(genes=ox(g1, g2)),
        Individual(genes=ox(g2, g1)),
    )


def poi_aware_crossover(
    parent1:    Individual,
    parent2:    Individual,
    categories: list[str] | None = None,
) -> tuple[Individual, Individual]:
    """
    PoI-aware Crossover: scambia interi sottoinsiemi per categoria.
    Utile per preservare "nicchie tematiche" (es. tour musei vs tour gastronomico).
    """
    from core.models import PoICategory

    if categories is None:
        categories = [c.value for c in PoICategory]

    def by_category(genes: list[PoI]) -> dict[str, list[PoI]]:
        d: dict[str, list[PoI]] = {c: [] for c in categories}
        for p in genes:
            d[p.category.value].append(p)
        return d

    cat1 = by_category(parent1.genes)
    cat2 = by_category(parent2.genes)

    child1_genes, child2_genes = [], []

    for cat in categories:
        # Con probabilità 0.5 scambia le categorie tra i figli
        if random.random() < 0.5:
            child1_genes.extend(cat1[cat])
            child2_genes.extend(cat2[cat])
        else:
            child1_genes.extend(cat2[cat])
            child2_genes.extend(cat1[cat])

    # Rimuovi duplicati mantenendo l'ordine
    def deduplicate(genes: list[PoI]) -> list[PoI]:
        seen, result = set(), []
        for p in genes:
            if p.id not in seen:
                seen.add(p.id)
                result.append(p)
        return result

    return (
        Individual(genes=deduplicate(child1_genes)),
        Individual(genes=deduplicate(child2_genes)),
    )


# ---------------------------------------------------------------------------
# MUTATION
# ---------------------------------------------------------------------------

def mutate(
    individual: Individual,
    poi_pool:   list[PoI],
    mut_prob:   float = 0.15,
) -> Individual:
    """
    Seleziona casualmente uno degli operatori di mutazione.
    Applicato con probabilità mut_prob.
    """
    if random.random() > mut_prob or not individual.genes:
        return individual

    operator = random.choice([
        swap_mutation,
        insert_mutation,
        reverse_segment_mutation,
        lambda ind, pool: add_remove_mutation(ind, pool),
    ])
    result = operator(individual.clone(), poi_pool)
    result.invalidate_cache()
    return result


def swap_mutation(individual: Individual, _pool: list[PoI]) -> Individual:
    """Scambia due PoI scelti casualmente nella sequenza."""
    g = individual.genes
    if len(g) < 2:
        return individual
    i, j = random.sample(range(len(g)), 2)
    g[i], g[j] = g[j], g[i]
    return individual


def insert_mutation(individual: Individual, _pool: list[PoI]) -> Individual:
    """Rimuove un PoI da una posizione e lo inserisce altrove."""
    g = individual.genes
    if len(g) < 2:
        return individual
    i = random.randrange(len(g))
    poi = g.pop(i)
    j = random.randint(0, len(g))
    g.insert(j, poi)
    return individual


def reverse_segment_mutation(individual: Individual, _pool: list[PoI]) -> Individual:
    """Inverte un sottosegmento casuale del tour (elimina incroci geografici)."""
    g = individual.genes
    if len(g) < 3:
        return individual
    i = random.randint(0, len(g) - 2)
    j = random.randint(i + 1, len(g) - 1)
    g[i:j+1] = reversed(g[i:j+1])
    return individual


def add_remove_mutation(individual: Individual, pool: list[PoI]) -> Individual:
    """
    Con prob 0.70: aggiunge un PoI casuale non ancora nel tour (esplora).
    Con prob 0.30: rimuove il PoI con il peggior score/durata (semplifica).
    Il bias verso l'aggiunta contrasta la tendenza del GA a produrre
    tour sempre più corti dopo molte generazioni di mutazione.
    """
    g        = individual.genes
    visited  = {p.id for p in g}

    if random.random() < 0.70:   # era 0.50: bias verso aggiunta
        candidates = [p for p in pool if p.id not in visited]
        if candidates:
            new_poi = random.choice(candidates)
            pos     = random.randint(0, len(g))
            g.insert(pos, new_poi)
    else:
        if g:
            worst = min(g, key=lambda p: p.score / (p.visit_duration + 1e-9))
            g.remove(worst)

    return individual