"""Waveform-backed manual cue editing helpers. The previous manual editor only exposed numeric inputs. This module adds a visual, audio-derived cue editor: it renders waveform overviews for the two tracks in a transition, overlays selected cue positions and alternative cue candidates, and returns stable cue-choice strings that can be applied back to TransitionPlan objects. The UI remains deliberately simple because Gradio event/click APIs vary across versions. The backend is still real: the waveform image is computed from the actual audio files, candidate lists are built from analysis cue objects, and manual edits become explicit cue overrides that can later be exported as training examples. """ from __future__ import annotations from dataclasses import dataclass from pathlib import Path from typing import Any, Iterable, Mapping import hashlib import math import tempfile import librosa import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt import numpy as np @dataclass(frozen=True) class CueChoice: role: str time: float label: str confidence: float source: str = "" @property def value(self) -> str: safe_label = self.label.replace("|", "/") safe_source = self.source.replace("|", "/") return f"{self.role}|{self.time:.3f}|{self.confidence:.3f}|{safe_label}|{safe_source}" @property def display(self) -> str: source = f" · {self.source}" if self.source else "" return f"{self.role} @ {self.time:.2f}s · {self.confidence:.0%} · {self.label}{source}" def parse_cue_choice(value: str | None) -> CueChoice | None: if not value: return None parts = str(value).split("|", 4) if len(parts) < 4: return None role, time_s, confidence, label = parts[:4] source = parts[4] if len(parts) > 4 else "" try: return CueChoice(role=role, time=float(time_s), confidence=float(confidence), label=label, source=source) except ValueError: return None def _cue_source(cue: Mapping[str, Any]) -> str: ev = cue.get("evidence", {}) return str(ev.get("source", "")) if isinstance(ev, Mapping) else "" def cue_choices(track: Any, role: str, *, limit: int = 12) -> list[tuple[str, str]]: """Return Gradio-compatible `(label, value)` cue choices for a role.""" aliases = { "a_out": {"mix_out", "loopable", "drop"}, "b_in": {"mix_in", "loopable"}, "b_drop": {"first_drop", "drop"}, }.get(role, {role}) cues = [c for c in getattr(track, "cue_points", []) if str(c.get("kind", c.get("type", ""))) in aliases] cues.sort(key=lambda c: (-float(c.get("confidence", 0.0) or 0.0), float(c.get("time", 0.0) or 0.0))) out: list[tuple[str, str]] = [] for cue in cues[:limit]: choice = CueChoice( role=role, time=float(cue.get("time", 0.0) or 0.0), label=str(cue.get("label", cue.get("kind", "cue"))), confidence=float(cue.get("confidence", 0.0) or 0.0), source=_cue_source(cue), ) out.append((choice.display, choice.value)) return out def default_choice(track: Any, role: str, time_s: float) -> str | None: choices = cue_choices(track, role) if not choices: return None parsed = [(label, value, parse_cue_choice(value)) for label, value in choices] parsed = [(label, value, cue) for label, value, cue in parsed if cue is not None] if not parsed: return None return min(parsed, key=lambda item: abs(item[2].time - float(time_s)))[1] def _load_preview(path: str, *, max_duration: float = 300.0, sr: int = 12000) -> tuple[np.ndarray, int]: try: y, got_sr = librosa.load(path, sr=sr, mono=True, duration=max_duration) except Exception: # A missing/corrupt file should not kill the editor. Return a visible flatline. got_sr = sr y = np.zeros(sr, dtype=np.float32) if y.size == 0: y = np.zeros(sr, dtype=np.float32) y = np.asarray(y, dtype=np.float32) peak = float(np.max(np.abs(y))) if y.size else 0.0 if peak > 0: y = y / peak return y, got_sr def _amplitude_envelope(y: np.ndarray, sr: int, *, bins: int = 1800) -> tuple[np.ndarray, np.ndarray]: n = len(y) if n == 0: return np.array([0.0]), np.array([0.0]) bins = max(64, min(bins, n)) edges = np.linspace(0, n, bins + 1, dtype=int) env = np.zeros(bins, dtype=np.float32) for i in range(bins): seg = y[edges[i]:edges[i + 1]] env[i] = float(np.max(np.abs(seg))) if len(seg) else 0.0 times = np.linspace(0.0, n / sr, bins) return times, env def _draw_track(ax: Any, track: Any, *, selected: dict[str, float], title: str) -> None: y, sr = _load_preview(getattr(track, "path", ""), max_duration=float(getattr(track, "duration", 300.0) or 300.0)) times, env = _amplitude_envelope(y, sr) ax.fill_between(times, -env, env, alpha=0.35, linewidth=0) ax.plot(times, env, linewidth=0.35) ax.plot(times, -env, linewidth=0.35) duration = float(getattr(track, "duration", times[-1] if len(times) else 0.0) or 0.0) ax.set_xlim(0, max(1.0, min(duration, times[-1] if len(times) else duration))) ax.set_ylim(-1.05, 1.05) ax.set_yticks([]) ax.set_title(title, loc="left", fontsize=10) ax.set_xlabel("seconds") # Segment spans give the user context beyond the raw waveform. for seg in getattr(track, "segments", [])[:40]: if not isinstance(seg, Mapping): continue start = float(seg.get("start", 0.0) or 0.0) end = float(seg.get("end", start) or start) label = str(seg.get("label", "section")) if end <= start: continue ax.axvspan(start, end, alpha=0.04) if end - start > 5: ax.text(start + 0.15, 0.82, label, fontsize=7, alpha=0.65) cue_palette = { "mix_in": (0.2, 0.7, 0.2), "mix_out": (0.8, 0.25, 0.2), "first_drop": (0.55, 0.2, 0.8), "drop": (0.55, 0.2, 0.8), "loopable": (0.2, 0.45, 0.85), } for cue in getattr(track, "cue_points", [])[:60]: kind = str(cue.get("kind", cue.get("type", "cue"))) t = float(cue.get("time", 0.0) or 0.0) if t < 0 or t > duration: continue conf = float(cue.get("confidence", 0.0) or 0.0) color = cue_palette.get(kind, (0.3, 0.3, 0.3)) ax.axvline(t, color=color, alpha=max(0.12, min(0.55, conf * 0.55)), linewidth=0.8) for name, t in selected.items(): ax.axvline(float(t), color="black", linewidth=2.0, alpha=0.95) ax.text(float(t), -0.92, name, rotation=90, va="bottom", ha="right", fontsize=8, fontweight="bold") def render_transition_cue_editor(track_a: Any, track_b: Any, plan: Any, *, output_dir: str | Path | None = None) -> tuple[str, str]: """Render a two-track waveform/cue overview and return `(png_path, markdown)`.""" output_dir = Path(output_dir or tempfile.gettempdir()) output_dir.mkdir(parents=True, exist_ok=True) fingerprint = hashlib.sha1( f"{getattr(track_a, 'path', '')}|{getattr(track_b, 'path', '')}|{getattr(plan, 'mix_out_point', 0)}|{getattr(plan, 'mix_in_point', 0)}|{getattr(plan, 'duration_seconds', 0)}|{getattr(plan, 'transition_type', '')}".encode() ).hexdigest()[:12] out = output_dir / f"ai-dj-cue-editor-{fingerprint}.png" selected = getattr(plan, "selected_cues", {}) or {} a_out = float(selected.get("a_out", {}).get("time", getattr(plan, "mix_out_point", 0.0))) b_in = float(selected.get("b_in", {}).get("time", getattr(plan, "mix_in_point", 0.0))) b_drop = float(selected.get("b_drop", {}).get("time", b_in + getattr(plan, "duration_seconds", 0.0))) fig, axes = plt.subplots(2, 1, figsize=(15, 5.2), constrained_layout=True) _draw_track(axes[0], track_a, selected={"A OUT": a_out}, title=f"A: {getattr(track_a, 'filename', 'track A')}") _draw_track(axes[1], track_b, selected={"B IN": b_in, "B DROP": b_drop}, title=f"B: {getattr(track_b, 'filename', 'track B')}") fig.suptitle(f"Transition cue editor · {getattr(plan, 'transition_type', 'transition')} · {getattr(plan, 'duration_beats', '?')} beats", fontsize=12) fig.savefig(out, dpi=150) plt.close(fig) summary = [ "### Waveform cue editor", "The black markers are the currently selected transition anchors. Thin colored lines are ranked cue candidates from analysis.", f"- A mix-out: **{a_out:.2f}s**", f"- B mix-in: **{b_in:.2f}s**", f"- B drop: **{b_drop:.2f}s**", f"- Transition type: `{getattr(plan, 'transition_type', 'unknown')}`", f"- Duration: **{float(getattr(plan, 'duration_seconds', 0.0)):.2f}s** / **{int(getattr(plan, 'duration_beats', 0))} beats**", ] return str(out), "\n".join(summary) def choices_for_transition(track_a: Any, track_b: Any, plan: Any) -> dict[str, Any]: """Return choice lists and defaults for the UI/backend tests.""" return { "a_choices": cue_choices(track_a, "a_out"), "b_in_choices": cue_choices(track_b, "b_in"), "b_drop_choices": cue_choices(track_b, "b_drop"), "a_default": default_choice(track_a, "a_out", float(getattr(plan, "mix_out_point", 0.0) or 0.0)), "b_in_default": default_choice(track_b, "b_in", float(getattr(plan, "mix_in_point", 0.0) or 0.0)), "b_drop_default": default_choice(track_b, "b_drop", float(getattr(plan, "mix_in_point", 0.0) + getattr(plan, "duration_seconds", 0.0))), } def apply_choices_to_plan(plan: Any, *, a_choice: str | None, b_in_choice: str | None, b_drop_choice: str | None, transition_type: str | None = None) -> tuple[float, float, float, dict[str, Any]]: """Apply cue-choice strings to a TransitionPlan-like object. Returns `(mix_out, mix_in, duration_seconds, selected_cues)` so callers can update additional derived fields such as beat count. """ a = parse_cue_choice(a_choice) b = parse_cue_choice(b_in_choice) d = parse_cue_choice(b_drop_choice) mix_out = float(a.time if a else getattr(plan, "mix_out_point", 0.0)) mix_in = float(b.time if b else getattr(plan, "mix_in_point", 0.0)) drop = float(d.time if d else mix_in + float(getattr(plan, "duration_seconds", 0.0))) duration = max(0.25, drop - mix_in) if transition_type: setattr(plan, "transition_type", transition_type) selected = { "a_out": {"kind": "mix_out", "label": a.label if a else "manual waveform value", "time": round(mix_out, 3), "confidence": a.confidence if a else 1.0, "evidence": {"source": a.source if a else "waveform_editor"}}, "b_in": {"kind": "mix_in", "label": b.label if b else "manual waveform value", "time": round(mix_in, 3), "confidence": b.confidence if b else 1.0, "evidence": {"source": b.source if b else "waveform_editor"}}, "b_drop": {"kind": "drop", "label": d.label if d else "manual waveform value", "time": round(drop, 3), "confidence": d.confidence if d else 1.0, "evidence": {"source": d.source if d else "waveform_editor"}}, } return mix_out, mix_in, duration, selected