Spaces:
Sleeping
Sleeping
File size: 5,720 Bytes
"""Global set-order optimization over transition edges."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Callable, Mapping
from transition_optimizer import choose_best_transition_edge
@dataclass(frozen=True)
class PairScore:
    """Immutable score record for one directed transition ``src -> dst``.

    ``src`` and ``dst`` are integer track indices; the remaining fields are
    float score components. ``to_dict`` exposes the record as a plain dict
    with the float components rounded for reporting.
    """

    src: int  # index of the track the transition leaves
    dst: int  # index of the track the transition enters
    total: float  # combined score for the pair
    compatibility: float
    edge_score: float
    energy_arc: float
    cue_confidence: float
    musical_quality: float

    def to_dict(self) -> dict[str, float | int]:
        """Return the record as a dict, rounding every float field to 3 places.

        ``src`` and ``dst`` are copied through unrounded. Keys appear in field
        declaration order.
        """
        payload: dict[str, float | int] = {"src": self.src, "dst": self.dst}
        rounded_fields = (
            "total",
            "compatibility",
            "edge_score",
            "energy_arc",
            "cue_confidence",
            "musical_quality",
        )
        for field_name in rounded_fields:
            payload[field_name] = round(getattr(self, field_name), 3)
        return payload
def _norm_energy(values: list[float]) -> list[float]:
lo = min(values) if values else 0.0
hi = max(values) if values else 1.0
if hi <= lo:
return [0.5 for _ in values]
return [(v - lo) / (hi - lo) for v in values]
def _target_energy(position: float) -> float:
# Warm-up -> peak -> controlled release.
if position <= 0.65:
return 0.25 + 0.75 * (position / 0.65)
return 1.0 - 0.45 * ((position - 0.65) / 0.35)
def precompute_pair_scores(tracks: list[Any], compute_compatibility: Callable[[Any, Any], Mapping[str, Any]]) -> dict[tuple[int, int], PairScore]:
    """Score every ordered pair of distinct tracks as a candidate transition.

    Args:
        tracks: Track objects. Each track's ``avg_energy`` attribute is read
            (missing or falsy values count as 0.0) and min-max normalized
            across the whole list.
        compute_compatibility: Called as ``compute_compatibility(a, b)`` for
            each ordered pair; its result's ``"overall"`` entry (default 0.0)
            is used as the compatibility score.

    Returns:
        A dict keyed by ``(src_index, dst_index)`` holding one ``PairScore``
        per ordered pair with ``src_index != dst_index``.
    """
    levels = _norm_energy([float(getattr(track, "avg_energy", 0.0) or 0.0) for track in tracks])
    table: dict[tuple[int, int], PairScore] = {}

    for src, outgoing in enumerate(tracks):
        for dst, incoming in enumerate(tracks):
            if src == dst:
                continue

            compat = compute_compatibility(outgoing, incoming)
            try:
                edge, _ = choose_best_transition_edge(outgoing, incoming, compat)
                edge_score = float(edge.score)
                breakdown = edge.score_breakdown
                cue_conf = float(breakdown.get("cue_confidence", 0.0))
                quality_fallback = breakdown.get("musical_quality", 0.0)
                musical_quality = float(getattr(edge, "musical_quality_score", quality_fallback) or 0.0)
            except Exception:
                # Best-effort fallback: any failure while selecting or reading
                # the edge degrades to half the overall compatibility.
                edge_score = float(compat.get("overall", 0.0)) * 0.5
                cue_conf = 0.0
                musical_quality = 0.0

            overall = float(compat.get("overall", 0.0))

            # Reward a rise in normalized energy from src to dst and penalize
            # a drop; the delta is clamped to [-1, 1], so the bonus lies in
            # [0.15, 0.85].
            energy_delta = max(-1.0, min(1.0, levels[dst] - levels[src]))
            direction_bonus = 0.5 + 0.35 * energy_delta

            total = (
                0.34 * overall
                + 0.30 * edge_score
                + 0.24 * musical_quality
                + 0.12 * direction_bonus
            )
            # The direction bonus is what gets stored in ``energy_arc``.
            table[(src, dst)] = PairScore(
                src=src,
                dst=dst,
                total=total,
                compatibility=overall,
                edge_score=edge_score,
                energy_arc=direction_bonus,
                cue_confidence=cue_conf,
                musical_quality=musical_quality,
            )

    return table
def optimize_set_order(
    tracks: list[Any],
    compute_compatibility: Callable[[Any, Any], Mapping[str, Any]],
    *,
    allow_repeat: bool = False,
    beam_width: int = 64,
) -> tuple[list[int], dict[str, Any]]:
    """Beam-search for a high-scoring Hamiltonian path.

    When allow_repeat=True, the optimizer emits an extended set path that still
    avoids immediate duplicates and adds a coverage bonus until every track has
    appeared at least once.

    Args:
        tracks: Track objects; each track's ``avg_energy`` attribute is read
            (missing or falsy values count as 0.0).
        compute_compatibility: Pairwise compatibility callback, forwarded to
            ``precompute_pair_scores``.
        allow_repeat: Permit revisiting tracks; the path is then extended to
            ``n + n // 2`` entries.
        beam_width: Maximum number of partial paths kept after each step.
            Values below 1 are treated as 1 (greedy search).

    Returns:
        ``(order, info)`` where ``order`` is a list of track indices and
        ``info`` is a metadata dict. For zero or one track, ``info`` contains
        only ``"method"`` and ``"pair_scores"``. Otherwise it also reports the
        score, the effective beam width, ``allow_repeat``, the target length,
        the number of unique tracks, and the per-position energy targets.
    """
    n = len(tracks)
    if n <= 1:
        return list(range(n)), {"method": "trivial", "pair_scores": []}

    # A width of 0 would prune every candidate, leaving nothing for the final
    # max() to choose from (ValueError). A negative width would slice off the
    # tail instead of bounding the beam. Clamp to a usable minimum.
    beam_width = max(1, beam_width)

    pair_scores = precompute_pair_scores(tracks, compute_compatibility)
    energies = _norm_energy([float(getattr(t, "avg_energy", 0.0) or 0.0) for t in tracks])

    # Seed one beam per possible opening track, favouring openers whose
    # normalized energy is close to the start-of-set target.
    start_target = _target_energy(0.0)
    beams: list[tuple[list[int], float, list[PairScore]]] = [
        ([start], 0.12 * (1.0 - abs(energies[start] - start_target)), [])
        for start in range(n)
    ]

    # n >= 2 here, so the repeat extension is always at least one extra slot.
    target_length = n + max(1, n // 2) if allow_repeat else n
    last_index = max(target_length - 1, 1)

    for depth in range(1, target_length):
        target = _target_energy(depth / last_index)
        next_beams: list[tuple[list[int], float, list[PairScore]]] = []
        for order, score, edges in beams:
            used = set(order)
            current = order[-1]
            for cand in range(n):
                if cand == current:
                    # Never play the same track twice in a row.
                    continue
                is_repeat = cand in used
                if is_repeat and not allow_repeat:
                    continue
                pair = pair_scores.get((current, cand))
                if pair is None:
                    continue
                # Closeness of the candidate's energy to this position's target.
                arc = 1.0 - abs(energies[cand] - target)
                repeated_penalty = -0.25 if is_repeat else 0.0
                coverage_bonus = 0.0 if is_repeat else 0.18
                step = pair.total + 0.18 * arc + coverage_bonus + repeated_penalty
                next_beams.append((order + [cand], score + step, edges + [pair]))
        if not next_beams:
            # No legal extension exists; keep the beams from the previous depth.
            break
        next_beams.sort(key=lambda item: item[1], reverse=True)
        beams = next_beams[:beam_width]

    # Prefer the widest track coverage, then the longest path, then the score.
    best_order, best_score, best_edges = max(
        beams, key=lambda item: (len(set(item[0])), len(item[0]), item[1])
    )
    return best_order, {
        "method": "beam_search_transition_edges" + ("_with_repeats" if allow_repeat else ""),
        "score": round(best_score, 3),
        "beam_width": beam_width,
        "allow_repeat": allow_repeat,
        "target_length": target_length,
        "unique_tracks": len(set(best_order)),
        "pair_scores": [edge.to_dict() for edge in best_edges],
        "energy_targets": [round(_target_energy(i / last_index), 3) for i in range(target_length)],
    }
|