File size: 5,720 Bytes
d1b6204
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f02c67e
d1b6204
 
 
 
 
 
 
 
 
 
f02c67e
d1b6204
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f02c67e
d1b6204
 
 
f02c67e
d1b6204
 
f02c67e
 
 
 
 
 
 
d1b6204
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""Global set-order optimization over transition edges."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Mapping

from transition_optimizer import choose_best_transition_edge


@dataclass(frozen=True)
class PairScore:
    """Immutable score record for one directed transition ``src -> dst``."""

    # Indices of the outgoing and incoming track in the caller's track list.
    src: int
    dst: int
    # Combined score for the pair, followed by the individual components
    # that are stored alongside it.
    total: float
    compatibility: float
    edge_score: float
    energy_arc: float
    cue_confidence: float
    musical_quality: float

    def to_dict(self) -> dict[str, float | int]:
        """Return a plain dict: indices verbatim, scores rounded to 3 decimals."""
        payload: dict[str, float | int] = {"src": self.src, "dst": self.dst}
        rounded_fields = (
            "total",
            "compatibility",
            "edge_score",
            "energy_arc",
            "cue_confidence",
            "musical_quality",
        )
        for field_name in rounded_fields:
            payload[field_name] = round(getattr(self, field_name), 3)
        return payload


def _norm_energy(values: list[float]) -> list[float]:
    lo = min(values) if values else 0.0
    hi = max(values) if values else 1.0
    if hi <= lo:
        return [0.5 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]


def _target_energy(position: float) -> float:
    # Warm-up -> peak -> controlled release.
    if position <= 0.65:
        return 0.25 + 0.75 * (position / 0.65)
    return 1.0 - 0.45 * ((position - 0.65) / 0.35)


def precompute_pair_scores(tracks: list[Any], compute_compatibility: Callable[[Any, Any], Mapping[str, Any]]) -> dict[tuple[int, int], PairScore]:
    """Score every ordered pair of distinct tracks.

    For each ``(src, dst)`` index pair the compatibility mapping is obtained
    from ``compute_compatibility`` and a transition edge is requested from
    ``choose_best_transition_edge``. If anything in the edge lookup raises,
    the edge score falls back to half of the ``"overall"`` compatibility and
    cue confidence / musical quality are set to zero.

    Returns a dict keyed by ``(src_index, dst_index)``.
    """
    levels = _norm_energy([float(getattr(track, "avg_energy", 0.0) or 0.0) for track in tracks])
    table: dict[tuple[int, int], PairScore] = {}

    for src_idx, src_track in enumerate(tracks):
        for dst_idx, dst_track in enumerate(tracks):
            if src_idx == dst_idx:
                continue

            compat = compute_compatibility(src_track, dst_track)
            try:
                best_edge, _ = choose_best_transition_edge(src_track, dst_track, compat)
                edge_score = float(best_edge.score)
                breakdown = best_edge.score_breakdown
                cue_confidence = float(breakdown.get("cue_confidence", 0.0))
                breakdown_quality = breakdown.get("musical_quality", 0.0)
                quality = float(getattr(best_edge, "musical_quality_score", breakdown_quality) or 0.0)
            except Exception:
                # Best-effort fallback: no usable edge, so derive a reduced
                # edge score from the overall compatibility alone.
                edge_score = float(compat.get("overall", 0.0)) * 0.5
                cue_confidence = 0.0
                quality = 0.0

            overall = float(compat.get("overall", 0.0))

            # Rising energy scores above 0.5, falling energy below it; the
            # normalized difference is clamped to [-1, 1].
            energy_delta = levels[dst_idx] - levels[src_idx]
            direction_bonus = 0.5 + 0.35 * max(-1.0, min(1.0, energy_delta))

            total = (
                0.34 * overall
                + 0.30 * edge_score
                + 0.24 * quality
                + 0.12 * direction_bonus
            )
            table[(src_idx, dst_idx)] = PairScore(
                src=src_idx,
                dst=dst_idx,
                total=total,
                compatibility=overall,
                edge_score=edge_score,
                energy_arc=direction_bonus,
                cue_confidence=cue_confidence,
                musical_quality=quality,
            )
    return table


def optimize_set_order(
    tracks: list[Any],
    compute_compatibility: Callable[[Any, Any], Mapping[str, Any]],
    *,
    allow_repeat: bool = False,
    beam_width: int = 64,
) -> tuple[list[int], dict[str, Any]]:
    """Beam-search for a high-scoring Hamiltonian path.

    When allow_repeat=True, the optimizer emits an extended set path that still
    avoids immediate duplicates and adds a coverage bonus until every track has
    appeared at least once.

    Args:
        tracks: Items to order; each item's ``avg_energy`` attribute (missing
            or falsy counts as 0.0) drives the energy-arc terms.
        compute_compatibility: Callable returning the compatibility mapping
            for an ordered pair of tracks.
        allow_repeat: If true, tracks may reappear (never back-to-back) and
            the path is extended to ``n + max(1, n // 2)`` entries.
        beam_width: Number of partial paths kept after each step. Values
            below 1 are treated as 1 (greedy search) instead of emptying the
            beam.

    Returns:
        ``(order, info)`` where ``order`` is a list of indices into
        ``tracks`` and ``info`` is a metadata dict (method, rounded score,
        effective beam width, per-edge score dicts, energy targets).
        For zero or one track the order is trivial and ``info`` only holds
        ``method`` and an empty ``pair_scores`` list.
    """
    n = len(tracks)
    if n <= 1:
        return list(range(n)), {"method": "trivial", "pair_scores": []}

    # A width of 0 would slice the beam down to nothing (making the final
    # max() raise ValueError); a negative width would only trim the tail and
    # leave the beam effectively unbounded. Clamp to a usable minimum.
    beam_width = max(1, beam_width)

    pair_scores = precompute_pair_scores(tracks, compute_compatibility)
    energies = _norm_energy([float(getattr(t, "avg_energy", 0.0) or 0.0) for t in tracks])

    target_length = n if not allow_repeat else n + max(1, n // 2)
    last_step = max(target_length - 1, 1)
    # Target energy for every slot of the set, computed once and reused for
    # both the search and the returned metadata.
    targets = [_target_energy(step / last_step) for step in range(target_length)]

    # Seed one beam per possible opening track, rewarding openers whose
    # energy is close to the target for the first slot.
    beams: list[tuple[list[int], float, list[PairScore]]] = [
        ([start], 0.12 * (1.0 - abs(energies[start] - targets[0])), [])
        for start in range(n)
    ]

    for depth in range(1, target_length):
        target = targets[depth]
        next_beams: list[tuple[list[int], float, list[PairScore]]] = []
        for order, score, edges in beams:
            used = set(order)
            current = order[-1]
            for cand in range(n):
                already_played = cand in used
                # Never play the same track twice in a row; only revisit
                # earlier tracks when repeats are allowed.
                if cand == current or (already_played and not allow_repeat):
                    continue
                pair = pair_scores.get((current, cand))
                if pair is None:
                    continue
                arc = 1.0 - abs(energies[cand] - target)
                # Coverage bonus for a fresh track, penalty for a repeat.
                novelty = -0.25 if already_played else 0.18
                step = pair.total + 0.18 * arc + novelty
                next_beams.append((order + [cand], score + step, edges + [pair]))
        if not next_beams:
            break
        next_beams.sort(key=lambda item: item[1], reverse=True)
        beams = next_beams[:beam_width]

    # Prefer paths covering the most distinct tracks, then the longest path,
    # then the highest accumulated score.
    best_order, best_score, best_edges = max(
        beams, key=lambda item: (len(set(item[0])), len(item[0]), item[1])
    )
    return best_order, {
        "method": "beam_search_transition_edges" + ("_with_repeats" if allow_repeat else ""),
        "score": round(best_score, 3),
        "beam_width": beam_width,
        "allow_repeat": allow_repeat,
        "target_length": target_length,
        "unique_tracks": len(set(best_order)),
        "pair_scores": [edge.to_dict() for edge in best_edges],
        "energy_targets": [round(value, 3) for value in targets],
    }