ai-techno-dj / set_optimizer.py
Rik Hoffbauer
Implement musical candidate ranking and feedback-driven learning
f02c67e
"""Global set-order optimization over transition edges."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Callable, Mapping
from transition_optimizer import choose_best_transition_edge
@dataclass(frozen=True)
class PairScore:
src: int
dst: int
total: float
compatibility: float
edge_score: float
energy_arc: float
cue_confidence: float
musical_quality: float
def to_dict(self) -> dict[str, float | int]:
return {
"src": self.src,
"dst": self.dst,
"total": round(self.total, 3),
"compatibility": round(self.compatibility, 3),
"edge_score": round(self.edge_score, 3),
"energy_arc": round(self.energy_arc, 3),
"cue_confidence": round(self.cue_confidence, 3),
"musical_quality": round(self.musical_quality, 3),
}
def _norm_energy(values: list[float]) -> list[float]:
lo = min(values) if values else 0.0
hi = max(values) if values else 1.0
if hi <= lo:
return [0.5 for _ in values]
return [(v - lo) / (hi - lo) for v in values]
def _target_energy(position: float) -> float:
# Warm-up -> peak -> controlled release.
if position <= 0.65:
return 0.25 + 0.75 * (position / 0.65)
return 1.0 - 0.45 * ((position - 0.65) / 0.35)
def precompute_pair_scores(tracks: list[Any], compute_compatibility: Callable[[Any, Any], Mapping[str, Any]]) -> dict[tuple[int, int], PairScore]:
n = len(tracks)
energies = _norm_energy([float(getattr(t, "avg_energy", 0.0) or 0.0) for t in tracks])
scores: dict[tuple[int, int], PairScore] = {}
for i in range(n):
for j in range(n):
if i == j:
continue
compat = compute_compatibility(tracks[i], tracks[j])
try:
edge, _ = choose_best_transition_edge(tracks[i], tracks[j], compat)
edge_score = float(edge.score)
cue_conf = float(edge.score_breakdown.get("cue_confidence", 0.0))
musical_quality = float(getattr(edge, "musical_quality_score", edge.score_breakdown.get("musical_quality", 0.0)) or 0.0)
except Exception:
edge_score = float(compat.get("overall", 0.0)) * 0.5
cue_conf = 0.0
musical_quality = 0.0
# Prefer gentle upward movement early and avoid huge drops unless late.
direction_bonus = 0.5 + 0.35 * max(-1.0, min(1.0, energies[j] - energies[i]))
total = (
0.34 * float(compat.get("overall", 0.0))
+ 0.30 * edge_score
+ 0.24 * musical_quality
+ 0.12 * direction_bonus
)
scores[(i, j)] = PairScore(i, j, total, float(compat.get("overall", 0.0)), edge_score, direction_bonus, cue_conf, musical_quality)
return scores
def optimize_set_order(
tracks: list[Any],
compute_compatibility: Callable[[Any, Any], Mapping[str, Any]],
*,
allow_repeat: bool = False,
beam_width: int = 64,
) -> tuple[list[int], dict[str, Any]]:
"""Beam-search for a high-scoring Hamiltonian path.
When allow_repeat=True, the optimizer emits an extended set path that still
avoids immediate duplicates and adds a coverage bonus until every track has
appeared at least once.
"""
n = len(tracks)
if n <= 1:
return list(range(n)), {"method": "trivial", "pair_scores": []}
pair_scores = precompute_pair_scores(tracks, compute_compatibility)
energies = _norm_energy([float(getattr(t, "avg_energy", 0.0) or 0.0) for t in tracks])
beams: list[tuple[list[int], float, list[PairScore]]] = []
for start in range(n):
start_target = _target_energy(0.0)
start_bonus = 1.0 - abs(energies[start] - start_target)
beams.append(([start], 0.12 * start_bonus, []))
target_length = n if not allow_repeat else n + max(1, min(n, n // 2 or 1))
for depth in range(1, target_length):
pos = depth / max(target_length - 1, 1)
target = _target_energy(pos)
next_beams: list[tuple[list[int], float, list[PairScore]]] = []
for order, score, edges in beams:
used = set(order)
current = order[-1]
for cand in range(n):
if cand in used and not allow_repeat:
continue
if cand == current:
continue
pair = pair_scores.get((current, cand))
if pair is None:
continue
arc = 1.0 - abs(energies[cand] - target)
repeated_penalty = -0.25 if cand in used else 0.0
coverage_bonus = 0.18 if cand not in used else 0.0
step = pair.total + 0.18 * arc + coverage_bonus + repeated_penalty
next_beams.append((order + [cand], score + step, edges + [pair]))
if not next_beams:
break
next_beams.sort(key=lambda item: item[1], reverse=True)
beams = next_beams[:beam_width]
best_order, best_score, best_edges = max(beams, key=lambda item: (len(set(item[0])), len(item[0]), item[1]))
return best_order, {
"method": "beam_search_transition_edges" + ("_with_repeats" if allow_repeat else ""),
"score": round(best_score, 3),
"beam_width": beam_width,
"allow_repeat": allow_repeat,
"target_length": target_length,
"unique_tracks": len(set(best_order)),
"pair_scores": [edge.to_dict() for edge in best_edges],
"energy_targets": [round(_target_energy(i / max(target_length - 1, 1)), 3) for i in range(target_length)],
}