"""Global set-order optimization over transition edges.""" from __future__ import annotations from dataclasses import dataclass from typing import Any, Callable, Mapping from transition_optimizer import choose_best_transition_edge @dataclass(frozen=True) class PairScore: src: int dst: int total: float compatibility: float edge_score: float energy_arc: float cue_confidence: float musical_quality: float def to_dict(self) -> dict[str, float | int]: return { "src": self.src, "dst": self.dst, "total": round(self.total, 3), "compatibility": round(self.compatibility, 3), "edge_score": round(self.edge_score, 3), "energy_arc": round(self.energy_arc, 3), "cue_confidence": round(self.cue_confidence, 3), "musical_quality": round(self.musical_quality, 3), } def _norm_energy(values: list[float]) -> list[float]: lo = min(values) if values else 0.0 hi = max(values) if values else 1.0 if hi <= lo: return [0.5 for _ in values] return [(v - lo) / (hi - lo) for v in values] def _target_energy(position: float) -> float: # Warm-up -> peak -> controlled release. if position <= 0.65: return 0.25 + 0.75 * (position / 0.65) return 1.0 - 0.45 * ((position - 0.65) / 0.35) def precompute_pair_scores(tracks: list[Any], compute_compatibility: Callable[[Any, Any], Mapping[str, Any]]) -> dict[tuple[int, int], PairScore]: n = len(tracks) energies = _norm_energy([float(getattr(t, "avg_energy", 0.0) or 0.0) for t in tracks]) scores: dict[tuple[int, int], PairScore] = {} for i in range(n): for j in range(n): if i == j: continue compat = compute_compatibility(tracks[i], tracks[j]) try: edge, _ = choose_best_transition_edge(tracks[i], tracks[j], compat) edge_score = float(edge.score) cue_conf = float(edge.score_breakdown.get("cue_confidence", 0.0)) musical_quality = float(getattr(edge, "musical_quality_score", edge.score_breakdown.get("musical_quality", 0.0)) or 0.0) except Exception: edge_score = float(compat.get("overall", 0.0)) * 0.5 cue_conf = 0.0 musical_quality = 0.0 # Prefer gentle upward movement early and avoid huge drops unless late. direction_bonus = 0.5 + 0.35 * max(-1.0, min(1.0, energies[j] - energies[i])) total = ( 0.34 * float(compat.get("overall", 0.0)) + 0.30 * edge_score + 0.24 * musical_quality + 0.12 * direction_bonus ) scores[(i, j)] = PairScore(i, j, total, float(compat.get("overall", 0.0)), edge_score, direction_bonus, cue_conf, musical_quality) return scores def optimize_set_order( tracks: list[Any], compute_compatibility: Callable[[Any, Any], Mapping[str, Any]], *, allow_repeat: bool = False, beam_width: int = 64, ) -> tuple[list[int], dict[str, Any]]: """Beam-search for a high-scoring Hamiltonian path. When allow_repeat=True, the optimizer emits an extended set path that still avoids immediate duplicates and adds a coverage bonus until every track has appeared at least once. """ n = len(tracks) if n <= 1: return list(range(n)), {"method": "trivial", "pair_scores": []} pair_scores = precompute_pair_scores(tracks, compute_compatibility) energies = _norm_energy([float(getattr(t, "avg_energy", 0.0) or 0.0) for t in tracks]) beams: list[tuple[list[int], float, list[PairScore]]] = [] for start in range(n): start_target = _target_energy(0.0) start_bonus = 1.0 - abs(energies[start] - start_target) beams.append(([start], 0.12 * start_bonus, [])) target_length = n if not allow_repeat else n + max(1, min(n, n // 2 or 1)) for depth in range(1, target_length): pos = depth / max(target_length - 1, 1) target = _target_energy(pos) next_beams: list[tuple[list[int], float, list[PairScore]]] = [] for order, score, edges in beams: used = set(order) current = order[-1] for cand in range(n): if cand in used and not allow_repeat: continue if cand == current: continue pair = pair_scores.get((current, cand)) if pair is None: continue arc = 1.0 - abs(energies[cand] - target) repeated_penalty = -0.25 if cand in used else 0.0 coverage_bonus = 0.18 if cand not in used else 0.0 step = pair.total + 0.18 * arc + coverage_bonus + repeated_penalty next_beams.append((order + [cand], score + step, edges + [pair])) if not next_beams: break next_beams.sort(key=lambda item: item[1], reverse=True) beams = next_beams[:beam_width] best_order, best_score, best_edges = max(beams, key=lambda item: (len(set(item[0])), len(item[0]), item[1])) return best_order, { "method": "beam_search_transition_edges" + ("_with_repeats" if allow_repeat else ""), "score": round(best_score, 3), "beam_width": beam_width, "allow_repeat": allow_repeat, "target_length": target_length, "unique_tracks": len(set(best_order)), "pair_scores": [edge.to_dict() for edge in best_edges], "energy_targets": [round(_target_energy(i / max(target_length - 1, 1)), 3) for i in range(target_length)], }