Spaces:
Sleeping
Sleeping
| """Waveform-backed manual cue editing helpers. | |
| The previous manual editor only exposed numeric inputs. This module adds a | |
| visual, audio-derived cue editor: it renders waveform overviews for the two | |
| tracks in a transition, overlays selected cue positions and alternative cue | |
| candidates, and returns stable cue-choice strings that can be applied back to | |
| TransitionPlan objects. | |
| The UI remains deliberately simple because Gradio event/click APIs vary across | |
| versions. The backend is still real: the waveform image is computed from the | |
| actual audio files, candidate lists are built from analysis cue objects, and | |
| manual edits become explicit cue overrides that can later be exported as | |
| training examples. | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any, Iterable, Mapping | |
| import hashlib | |
| import math | |
| import tempfile | |
| import librosa | |
| import matplotlib | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
@dataclass
class CueChoice:
    """One selectable cue candidate for a transition role.

    Other code in this module constructs instances with keyword arguments and
    reads ``value`` / ``display`` as plain attributes, so the class must be a
    dataclass and both accessors must be properties.
    """

    role: str  # transition role the cue is offered for, e.g. "a_out"
    time: float  # cue position; rendered with an "s" (seconds) suffix
    label: str
    confidence: float  # rendered as a percentage, so presumably a 0..1 fraction
    source: str = ""

    @property
    def value(self) -> str:
        """Return the pipe-delimited form ``role|time|confidence|label|source``.

        Pipes inside ``label`` and ``source`` are replaced with ``/`` so the
        string can be split on ``|`` again without ambiguity.
        """
        safe_label = self.label.replace("|", "/")
        safe_source = self.source.replace("|", "/")
        return f"{self.role}|{self.time:.3f}|{self.confidence:.3f}|{safe_label}|{safe_source}"

    @property
    def display(self) -> str:
        """Return the human-readable label; the source is appended only when set."""
        source = f" · {self.source}" if self.source else ""
        return f"{self.role} @ {self.time:.2f}s · {self.confidence:.0%} · {self.label}{source}"
def parse_cue_choice(value: str | None) -> CueChoice | None:
    """Decode a cue-choice string back into a ``CueChoice``.

    The expected layout is ``role|time|confidence|label[|source]``. ``None`` is
    returned for empty input, for fewer than four fields, and when the time or
    confidence field is not a valid float.
    """
    if not value:
        return None

    # At most four splits: anything after the fourth pipe belongs to the source.
    fields = str(value).split("|", 4)
    if len(fields) < 4:
        return None

    role, raw_time, raw_confidence, label, *remainder = fields
    source = remainder[0] if remainder else ""
    try:
        return CueChoice(
            role=role,
            time=float(raw_time),
            confidence=float(raw_confidence),
            label=label,
            source=source,
        )
    except ValueError:
        return None
def _cue_source(cue: Mapping[str, Any]) -> str:
    """Return the cue's ``evidence["source"]`` as a string, or "" when unavailable."""
    evidence = cue.get("evidence", {})
    if not isinstance(evidence, Mapping):
        # Evidence that is not a mapping carries no usable source.
        return ""
    return str(evidence.get("source", ""))
def cue_choices(track: Any, role: str, *, limit: int = 12) -> list[tuple[str, str]]:
    """Return Gradio-compatible `(label, value)` cue choices for a role.

    Cues are read from ``track.cue_points`` and kept when their ``kind`` (or
    ``type``) matches one of the kinds accepted for ``role``. Unknown roles
    match only cues whose kind equals the role itself. Results are ordered by
    descending confidence, then ascending time, and cut off at ``limit``.
    """
    kinds_by_role = {
        "a_out": {"mix_out", "loopable", "drop"},
        "b_in": {"mix_in", "loopable"},
        "b_drop": {"first_drop", "drop"},
    }
    accepted_kinds = kinds_by_role.get(role, {role})

    def _number(cue: Any, key: str) -> float:
        # Missing, None and zero values all collapse to 0.0.
        return float(cue.get(key, 0.0) or 0.0)

    def _rank(cue: Any) -> tuple[float, float]:
        return (-_number(cue, "confidence"), _number(cue, "time"))

    matching = [
        cue
        for cue in getattr(track, "cue_points", [])
        if str(cue.get("kind", cue.get("type", ""))) in accepted_kinds
    ]
    ranked = sorted(matching, key=_rank)

    result: list[tuple[str, str]] = []
    for cue in ranked[:limit]:
        entry = CueChoice(
            role=role,
            time=_number(cue, "time"),
            label=str(cue.get("label", cue.get("kind", "cue"))),
            confidence=_number(cue, "confidence"),
            source=_cue_source(cue),
        )
        result.append((entry.display, entry.value))
    return result
def default_choice(track: Any, role: str, time_s: float) -> str | None:
    """Return the choice value whose cue time is closest to ``time_s``.

    Candidates come from ``cue_choices(track, role)``. ``None`` is returned
    when there are no candidates or none of them can be parsed back into a cue.
    Ties keep the earliest candidate in ranking order.
    """
    candidates = []
    for _label, value in cue_choices(track, role):
        cue = parse_cue_choice(value)
        if cue is not None:
            candidates.append((value, cue))
    if not candidates:
        return None

    target = float(time_s)

    def _distance(pair: Any) -> float:
        return abs(pair[1].time - target)

    best_value, _best_cue = min(candidates, key=_distance)
    return best_value
def _load_preview(path: str, *, max_duration: float = 300.0, sr: int = 12000) -> tuple[np.ndarray, int]:
    """Load a mono, peak-normalised float32 preview of an audio file.

    At most ``max_duration`` seconds are requested from ``librosa.load`` at
    sample rate ``sr``. Any loading failure, or an empty result, yields ``sr``
    zero samples instead of raising. Returns ``(samples, sample_rate)``.
    """
    try:
        samples, rate = librosa.load(path, sr=sr, mono=True, duration=max_duration)
    except Exception:
        # A missing/corrupt file should not kill the editor; show a flatline.
        samples, rate = np.zeros(sr, dtype=np.float32), sr

    if samples.size == 0:
        samples = np.zeros(sr, dtype=np.float32)
    samples = np.asarray(samples, dtype=np.float32)

    peak = 0.0
    if samples.size:
        peak = float(np.max(np.abs(samples)))
    # Pure silence has no peak to scale by, so it is returned unchanged.
    if peak > 0:
        samples = samples / peak
    return samples, rate
| def _amplitude_envelope(y: np.ndarray, sr: int, *, bins: int = 1800) -> tuple[np.ndarray, np.ndarray]: | |
| n = len(y) | |
| if n == 0: | |
| return np.array([0.0]), np.array([0.0]) | |
| bins = max(64, min(bins, n)) | |
| edges = np.linspace(0, n, bins + 1, dtype=int) | |
| env = np.zeros(bins, dtype=np.float32) | |
| for i in range(bins): | |
| seg = y[edges[i]:edges[i + 1]] | |
| env[i] = float(np.max(np.abs(seg))) if len(seg) else 0.0 | |
| times = np.linspace(0.0, n / sr, bins) | |
| return times, env | |
def _draw_track(ax: Any, track: Any, *, selected: dict[str, float], title: str) -> None:
    """Draw one track's waveform envelope, section spans and cue markers on ``ax``.

    Args:
        ax: Matplotlib axes-like object that receives all drawing calls.
        track: Object read via ``getattr`` for ``path``, ``duration``,
            ``segments`` and ``cue_points``; every attribute is optional.
        selected: Marker name -> time. Each entry is drawn as a thick black
            vertical line with a rotated label.
        title: Left-aligned axes title.
    """
    y, sr = _load_preview(getattr(track, "path", ""), max_duration=float(getattr(track, "duration", 300.0) or 300.0))
    times, env = _amplitude_envelope(y, sr)
    ax.fill_between(times, -env, env, alpha=0.35, linewidth=0)
    ax.plot(times, env, linewidth=0.35)
    ax.plot(times, -env, linewidth=0.35)

    has_samples = len(times) > 0
    waveform_end = float(times[-1]) if has_samples else 0.0
    # A missing, None or zero duration falls back to the loaded preview length.
    # Without this the axis collapsed to one second and every candidate cue was
    # filtered out, even though the preview load above already tolerates it.
    duration = float(getattr(track, "duration", 0.0) or 0.0) or waveform_end
    visible_end = min(duration, waveform_end) if has_samples else duration
    ax.set_xlim(0, max(1.0, visible_end))
    ax.set_ylim(-1.05, 1.05)
    ax.set_yticks([])
    ax.set_title(title, loc="left", fontsize=10)
    ax.set_xlabel("seconds")

    # Segment spans give the user context beyond the raw waveform.
    # `or []` tolerates tracks whose attribute is present but None.
    for seg in (getattr(track, "segments", None) or [])[:40]:
        if not isinstance(seg, Mapping):
            continue
        start = float(seg.get("start", 0.0) or 0.0)
        end = float(seg.get("end", start) or start)
        label = str(seg.get("label", "section"))
        if end <= start:
            continue
        ax.axvspan(start, end, alpha=0.04)
        # Only label spans wide enough (> 5 s) for the text to be readable.
        if end - start > 5:
            ax.text(start + 0.15, 0.82, label, fontsize=7, alpha=0.65)

    cue_palette = {
        "mix_in": (0.2, 0.7, 0.2),
        "mix_out": (0.8, 0.25, 0.2),
        "first_drop": (0.55, 0.2, 0.8),
        "drop": (0.55, 0.2, 0.8),
        "loopable": (0.2, 0.45, 0.85),
    }
    for cue in (getattr(track, "cue_points", None) or [])[:60]:
        # Same guard as for segments: skip malformed entries instead of crashing.
        if not isinstance(cue, Mapping):
            continue
        kind = str(cue.get("kind", cue.get("type", "cue")))
        t = float(cue.get("time", 0.0) or 0.0)
        if t < 0 or t > duration:
            continue
        conf = float(cue.get("confidence", 0.0) or 0.0)
        color = cue_palette.get(kind, (0.3, 0.3, 0.3))
        # Opacity scales with confidence but stays within 0.12 .. 0.55.
        ax.axvline(t, color=color, alpha=max(0.12, min(0.55, conf * 0.55)), linewidth=0.8)

    for name, t in selected.items():
        ax.axvline(float(t), color="black", linewidth=2.0, alpha=0.95)
        ax.text(float(t), -0.92, name, rotation=90, va="bottom", ha="right", fontsize=8, fontweight="bold")
def render_transition_cue_editor(track_a: Any, track_b: Any, plan: Any, *, output_dir: str | Path | None = None) -> tuple[str, str]:
    """Render a two-track waveform/cue overview and return `(png_path, markdown)`.

    Args:
        track_a: Outgoing track, drawn in the upper panel with the "A OUT" marker.
        track_b: Incoming track, drawn in the lower panel with "B IN" / "B DROP".
        plan: Object read via ``getattr`` for ``selected_cues``,
            ``mix_out_point``, ``mix_in_point``, ``duration_seconds``,
            ``duration_beats`` and ``transition_type``.
        output_dir: Directory for the PNG; defaults to the system temp
            directory and is created if missing.

    Returns:
        The PNG path as a string and a markdown summary of the anchors.
    """
    output_dir = Path(output_dir or tempfile.gettempdir())
    output_dir.mkdir(parents=True, exist_ok=True)
    # The file name is derived from the track paths and the plan's key fields.
    fingerprint = hashlib.sha1(
        f"{getattr(track_a, 'path', '')}|{getattr(track_b, 'path', '')}|{getattr(plan, 'mix_out_point', 0)}|{getattr(plan, 'mix_in_point', 0)}|{getattr(plan, 'duration_seconds', 0)}|{getattr(plan, 'transition_type', '')}".encode()
    ).hexdigest()[:12]
    out = output_dir / f"ai-dj-cue-editor-{fingerprint}.png"

    selected = getattr(plan, "selected_cues", {}) or {}

    def _anchor(role: str, fallback: float) -> float:
        # Use the stored cue time when present; a missing or malformed entry
        # (or a None time) falls back to the plan-level value.
        entry = selected.get(role)
        stored = entry.get("time") if isinstance(entry, Mapping) else None
        return float(fallback if stored is None else stored)

    # None-valued plan fields are treated as 0.0 instead of raising.
    duration_seconds = float(getattr(plan, "duration_seconds", 0.0) or 0.0)
    a_out = _anchor("a_out", getattr(plan, "mix_out_point", 0.0) or 0.0)
    b_in = _anchor("b_in", getattr(plan, "mix_in_point", 0.0) or 0.0)
    b_drop = _anchor("b_drop", b_in + duration_seconds)

    fig, axes = plt.subplots(2, 1, figsize=(15, 5.2), constrained_layout=True)
    try:
        _draw_track(axes[0], track_a, selected={"A OUT": a_out}, title=f"A: {getattr(track_a, 'filename', 'track A')}")
        _draw_track(axes[1], track_b, selected={"B IN": b_in, "B DROP": b_drop}, title=f"B: {getattr(track_b, 'filename', 'track B')}")
        fig.suptitle(f"Transition cue editor · {getattr(plan, 'transition_type', 'transition')} · {getattr(plan, 'duration_beats', '?')} beats", fontsize=12)
        fig.savefig(out, dpi=150)
    finally:
        # pyplot keeps every open figure referenced; close it even when
        # drawing or saving fails so repeated errors do not accumulate figures.
        plt.close(fig)

    summary = [
        "### Waveform cue editor",
        "The black markers are the currently selected transition anchors. Thin colored lines are ranked cue candidates from analysis.",
        f"- A mix-out: **{a_out:.2f}s**",
        f"- B mix-in: **{b_in:.2f}s**",
        f"- B drop: **{b_drop:.2f}s**",
        f"- Transition type: `{getattr(plan, 'transition_type', 'unknown')}`",
        f"- Duration: **{duration_seconds:.2f}s** / **{int(getattr(plan, 'duration_beats', 0) or 0)} beats**",
    ]
    return str(out), "\n".join(summary)
def choices_for_transition(track_a: Any, track_b: Any, plan: Any) -> dict[str, Any]:
    """Return choice lists and defaults for the UI/backend tests.

    The result holds the candidate lists for the three roles (``a_choices``,
    ``b_in_choices``, ``b_drop_choices``) plus, for each role, the candidate
    closest to the plan's current value (``*_default``, possibly ``None``).
    The drop default targets ``mix_in_point + duration_seconds``.
    """
    mix_out = float(getattr(plan, "mix_out_point", 0.0) or 0.0)
    mix_in = float(getattr(plan, "mix_in_point", 0.0) or 0.0)
    # Guarded like the two values above: a None field used to raise TypeError
    # when the drop target was computed by adding the raw attributes.
    duration = float(getattr(plan, "duration_seconds", 0.0) or 0.0)
    return {
        "a_choices": cue_choices(track_a, "a_out"),
        "b_in_choices": cue_choices(track_b, "b_in"),
        "b_drop_choices": cue_choices(track_b, "b_drop"),
        "a_default": default_choice(track_a, "a_out", mix_out),
        "b_in_default": default_choice(track_b, "b_in", mix_in),
        "b_drop_default": default_choice(track_b, "b_drop", mix_in + duration),
    }
def apply_choices_to_plan(plan: Any, *, a_choice: str | None, b_in_choice: str | None, b_drop_choice: str | None, transition_type: str | None = None) -> tuple[float, float, float, dict[str, Any]]:
    """Resolve cue-choice strings against a TransitionPlan-like object.

    Each choice string is parsed; an unparseable or empty choice falls back to
    the plan's current value (``mix_out_point``, ``mix_in_point``, and
    ``mix_in + duration_seconds`` for the drop). The only attribute written to
    ``plan`` here is ``transition_type``, and only when one is given.

    Returns `(mix_out, mix_in, duration_seconds, selected_cues)` so callers can
    update additional derived fields such as beat count. The duration is the
    drop-to-mix-in distance, floored at 0.25.
    """
    out_cue = parse_cue_choice(a_choice)
    in_cue = parse_cue_choice(b_in_choice)
    drop_cue = parse_cue_choice(b_drop_choice)

    if out_cue:
        mix_out = float(out_cue.time)
    else:
        mix_out = float(getattr(plan, "mix_out_point", 0.0))

    if in_cue:
        mix_in = float(in_cue.time)
    else:
        mix_in = float(getattr(plan, "mix_in_point", 0.0))

    if drop_cue:
        drop = float(drop_cue.time)
    else:
        drop = float(mix_in + float(getattr(plan, "duration_seconds", 0.0)))

    duration = max(0.25, drop - mix_in)

    if transition_type:
        setattr(plan, "transition_type", transition_type)

    def _entry(kind: str, cue: Any, moment: float) -> dict[str, Any]:
        # Without a parsed cue the entry is marked as a manual editor value.
        if cue:
            label, confidence, source = cue.label, cue.confidence, cue.source
        else:
            label, confidence, source = "manual waveform value", 1.0, "waveform_editor"
        return {
            "kind": kind,
            "label": label,
            "time": round(moment, 3),
            "confidence": confidence,
            "evidence": {"source": source},
        }

    selected = {
        "a_out": _entry("mix_out", out_cue, mix_out),
        "b_in": _entry("mix_in", in_cue, mix_in),
        "b_drop": _entry("drop", drop_cue, drop),
    }
    return mix_out, mix_in, duration, selected