Spaces:
Sleeping
Sleeping
| """Global set-order optimization over transition edges.""" | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from typing import Any, Callable, Mapping | |
| from transition_optimizer import choose_best_transition_edge | |
@dataclass
class PairScore:
    """Score components for one directed transition ``src -> dst``.

    Instances are created positionally (in field order) by
    ``precompute_pair_scores``, so the ``@dataclass``-generated
    ``__init__`` is required and the field order below must not change.
    """

    # Index of the outgoing track in the caller's track list.
    src: int
    # Index of the incoming track in the caller's track list.
    dst: int
    # Weighted blend of the component scores below.
    total: float
    # The "overall" value taken from the compatibility mapping.
    compatibility: float
    # Score of the chosen transition edge (or the fallback estimate).
    edge_score: float
    # Energy-direction bonus for moving from ``src`` to ``dst``.
    energy_arc: float
    # Cue confidence taken from the edge's score breakdown.
    cue_confidence: float
    # Musical quality of the chosen transition edge.
    musical_quality: float

    def to_dict(self) -> dict[str, float | int]:
        """Return a plain dict view with every float rounded to 3 decimals."""
        return {
            "src": self.src,
            "dst": self.dst,
            "total": round(self.total, 3),
            "compatibility": round(self.compatibility, 3),
            "edge_score": round(self.edge_score, 3),
            "energy_arc": round(self.energy_arc, 3),
            "cue_confidence": round(self.cue_confidence, 3),
            "musical_quality": round(self.musical_quality, 3),
        }
| def _norm_energy(values: list[float]) -> list[float]: | |
| lo = min(values) if values else 0.0 | |
| hi = max(values) if values else 1.0 | |
| if hi <= lo: | |
| return [0.5 for _ in values] | |
| return [(v - lo) / (hi - lo) for v in values] | |
| def _target_energy(position: float) -> float: | |
| # Warm-up -> peak -> controlled release. | |
| if position <= 0.65: | |
| return 0.25 + 0.75 * (position / 0.65) | |
| return 1.0 - 0.45 * ((position - 0.65) / 0.35) | |
def precompute_pair_scores(
    tracks: list[Any],
    compute_compatibility: Callable[[Any, Any], Mapping[str, Any]],
) -> dict[tuple[int, int], PairScore]:
    """Score every ordered pair of distinct tracks.

    Args:
        tracks: Track objects. Only ``avg_energy`` is read here (via
            ``getattr``; missing or falsy counts as 0.0); the objects are
            otherwise handed to ``compute_compatibility`` and
            ``choose_best_transition_edge`` unchanged.
        compute_compatibility: Called as ``compute_compatibility(a, b)`` for
            each ordered pair. The ``"overall"`` entry of the returned
            mapping is used; a missing or ``None`` entry counts as 0.0.

    Returns:
        A dict mapping ``(i, j)`` (with ``i != j``) to its ``PairScore``.
        Empty when fewer than two tracks are given.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    n = len(tracks)
    if n < 2:
        # No ordered pairs exist, so there is nothing to score.
        return {}

    logger = logging.getLogger(__name__)
    energies = _norm_energy([float(getattr(t, "avg_energy", 0.0) or 0.0) for t in tracks])
    scores: dict[tuple[int, int], PairScore] = {}
    for i in range(n):
        for j in range(n):
            if i == j:
                continue
            compat = compute_compatibility(tracks[i], tracks[j])
            # Read once; ``or 0.0`` tolerates an explicit None value.
            overall = float(compat.get("overall", 0.0) or 0.0)
            try:
                edge, _ = choose_best_transition_edge(tracks[i], tracks[j], compat)
                edge_score = float(edge.score)
                cue_conf = float(edge.score_breakdown.get("cue_confidence", 0.0))
                musical_quality = float(
                    getattr(
                        edge,
                        "musical_quality_score",
                        edge.score_breakdown.get("musical_quality", 0.0),
                    )
                    or 0.0
                )
            except Exception:
                # Best-effort fallback: keep scoring the pair from compatibility
                # alone, but leave a trace so real failures are not invisible.
                logger.debug(
                    "Transition edge selection failed for pair (%d, %d); "
                    "falling back to compatibility-only score",
                    i,
                    j,
                    exc_info=True,
                )
                edge_score = overall * 0.5
                cue_conf = 0.0
                musical_quality = 0.0
            # Reward rising energy and penalise drops: the bonus is 0.5 for no
            # change and moves by 0.35 per unit of (clamped) energy difference.
            # It does not depend on where in the set the pair ends up.
            direction_bonus = 0.5 + 0.35 * max(-1.0, min(1.0, energies[j] - energies[i]))
            total = (
                0.34 * overall
                + 0.30 * edge_score
                + 0.24 * musical_quality
                + 0.12 * direction_bonus
            )
            # Positional order follows the PairScore field order; the
            # direction bonus is stored in the ``energy_arc`` slot.
            scores[(i, j)] = PairScore(i, j, total, overall, edge_score, direction_bonus, cue_conf, musical_quality)
    return scores
def optimize_set_order(
    tracks: list[Any],
    compute_compatibility: Callable[[Any, Any], Mapping[str, Any]],
    *,
    allow_repeat: bool = False,
    beam_width: int = 64,
) -> tuple[list[int], dict[str, Any]]:
    """Beam-search for a high-scoring Hamiltonian path.

    When allow_repeat=True, the optimizer emits an extended set path that still
    avoids immediate duplicates and adds a coverage bonus until every track has
    appeared at least once.

    Args:
        tracks: Track objects; ``avg_energy`` is read via ``getattr`` (missing
            or falsy counts as 0.0).
        compute_compatibility: Forwarded to ``precompute_pair_scores``.
        allow_repeat: Allow a track to appear more than once; the target path
            length then exceeds ``len(tracks)``.
        beam_width: Number of partial paths kept after each step. Values below
            1 are treated as 1.

    Returns:
        ``(order, info)`` where ``order`` is a list of indices into ``tracks``
        and ``info`` describes the search (method, score, effective beam
        width, per-edge scores and the energy targets per position). With zero
        or one track the order is trivial and ``info`` only carries
        ``"method"`` and an empty ``"pair_scores"`` list.
    """
    n = len(tracks)
    if n <= 1:
        return list(range(n)), {"method": "trivial", "pair_scores": []}

    # A width below 1 would prune every beam (``[:0]``) or slice from the wrong
    # end (negative), leaving nothing for the final max(); clamp instead.
    beam_width = max(1, beam_width)

    pair_scores = precompute_pair_scores(tracks, compute_compatibility)
    energies = _norm_energy([float(getattr(t, "avg_energy", 0.0) or 0.0) for t in tracks])

    # Seed one beam per possible opening track, favouring tracks whose energy
    # is close to the curve's starting level.
    start_target = _target_energy(0.0)
    beams: list[tuple[list[int], float, list[PairScore]]] = [
        ([start], 0.12 * (1.0 - abs(energies[start] - start_target)), [])
        for start in range(n)
    ]

    target_length = n if not allow_repeat else n + max(1, min(n, n // 2 or 1))
    last_index = max(target_length - 1, 1)
    for depth in range(1, target_length):
        target = _target_energy(depth / last_index)
        next_beams: list[tuple[list[int], float, list[PairScore]]] = []
        for order, score, edges in beams:
            used = set(order)
            current = order[-1]
            for cand in range(n):
                if cand == current:
                    # Never play the same track twice in a row.
                    continue
                revisit = cand in used
                if revisit and not allow_repeat:
                    continue
                pair = pair_scores.get((current, cand))
                if pair is None:
                    continue
                # How closely the candidate matches the energy curve here.
                arc = 1.0 - abs(energies[cand] - target)
                # Unseen tracks earn the coverage bonus; revisits pay a penalty.
                novelty = -0.25 if revisit else 0.18
                step = pair.total + 0.18 * arc + novelty
                next_beams.append((order + [cand], score + step, edges + [pair]))
        if not next_beams:
            break
        next_beams.sort(key=lambda item: item[1], reverse=True)
        beams = next_beams[:beam_width]

    # Prefer paths covering the most distinct tracks, then the longest, then
    # the highest cumulative score.
    best_order, best_score, best_edges = max(
        beams,
        key=lambda item: (len(set(item[0])), len(item[0]), item[1]),
    )
    return best_order, {
        "method": "beam_search_transition_edges" + ("_with_repeats" if allow_repeat else ""),
        "score": round(best_score, 3),
        "beam_width": beam_width,
        "allow_repeat": allow_repeat,
        "target_length": target_length,
        "unique_tracks": len(set(best_order)),
        "pair_scores": [edge.to_dict() for edge in best_edges],
        "energy_targets": [round(_target_energy(i / last_index), 3) for i in range(target_length)],
    }