Spaces:
Sleeping
Sleeping
Rik Hoffbauer
Implement draggable timeline editor and enhanced objective transition diagnostics
958631f | """Interactive cue-timeline payloads for manual transition editing. | |
| The first cue editor rendered a static waveform image plus dropdowns. This | |
| module adds a DAW-like review artifact: a self-contained HTML timeline with | |
| waveform envelopes, candidate markers, draggable selected anchors, keyboard | |
| nudge controls, and an explicit JSON payload that can be pasted/applied back to | |
| Gradio. | |
| It intentionally stays dependency-light and file-local. The browser-side editor | |
| is not a full DAW, but it is a real interactive timeline rather than a static | |
| plot: users can drag A-out, B-in and B-drop markers against the actual audio | |
| peak envelope and then apply the resulting payload to the TransitionPlan. | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass, asdict | |
| from html import escape | |
| from pathlib import Path | |
| from typing import Any, Mapping | |
| import base64 | |
| import hashlib | |
| import json | |
| import math | |
| import tempfile | |
| import librosa | |
| import numpy as np | |
| from cue_editor import parse_cue_choice | |
@dataclass
class TimelineMarker:
    """One draggable anchor on the cue timeline.

    Declared as a dataclass so that instances can be built positionally and
    serialised with ``dataclasses.asdict``; a plain class with bare
    annotations supports neither.
    """

    id: str  # stable key used to look the marker up in the payload
    label: str  # text drawn on the marker handle
    track: str  # key of the track the marker belongs to
    role: str
    time: float  # position on the track, rendered with an "s" suffix
    confidence: float = 1.0
    locked: bool = False  # the browser editor refuses to move locked markers
@dataclass
class TimelinePayload:
    """Everything the browser-side timeline needs for one transition.

    Declared as a dataclass so it can be constructed with keyword arguments
    and converted with ``dataclasses.asdict``.
    """

    version: int
    transition_index: int
    transition_type: str
    duration_beats: int
    bpm_reference: float
    markers: list[TimelineMarker]
    tracks: dict[str, dict[str, Any]]  # per-track display data, keyed by track id

    def to_dict(self) -> dict[str, Any]:
        """Return a deep, JSON-serialisable copy of the payload.

        ``asdict`` already recurses into lists, dicts and nested dataclasses,
        so the markers need no separate conversion pass.
        """
        return asdict(self)
def _load_mono(path: str, *, max_duration: float, sr: int = 8000) -> tuple[np.ndarray, int]:
    """Load up to ``max_duration`` seconds of ``path`` as peak-normalised mono.

    Loading is best-effort: if the file cannot be decoded for any reason, or
    decodes to nothing, ``sr`` zero-valued samples are returned instead so
    that callers always receive an array to work with.

    Returns:
        ``(samples, sample_rate)`` where ``samples`` is a float32 array.
    """
    samples = None
    rate = sr
    try:
        samples, rate = librosa.load(path, sr=sr, mono=True, duration=max_duration)
    except Exception:
        # Deliberate catch-all: any decode failure degrades to silence.
        samples = None
        rate = sr

    if samples is None or samples.size == 0:
        samples = np.zeros(sr, dtype=np.float32)

    loudest = float(np.max(np.abs(samples))) if samples.size else 0.0
    if loudest > 0:
        # Scale so the largest absolute sample becomes 1.0.
        samples = samples / loudest
    return np.asarray(samples, dtype=np.float32), rate
def _peak_envelope(path: str, *, duration: float, bins: int = 900) -> list[float]:
    """Summarise the audio at ``path`` as a list of per-bin absolute peaks.

    At least one second of audio is requested from ``_load_mono``. The
    samples are cut into equal-width bins; the bin count is capped at the
    number of samples but never drops below 64. Each value is rounded to
    four decimals, and a bin that ends up with no samples contributes 0.0.
    """
    samples, _ = _load_mono(path, max_duration=max(1.0, duration))
    total = len(samples)
    if samples.size == 0:
        return [0.0] * bins

    bins = max(64, min(bins, total))
    edges = np.linspace(0, total, bins + 1, dtype=int)
    magnitude = np.abs(samples)
    return [
        round(float(np.max(magnitude[lo:hi])) if hi > lo else 0.0, 4)
        for lo, hi in zip(edges[:-1], edges[1:])
    ]
def _candidate_markers(track: Any, *, track_id: str, limit: int = 80) -> list[dict[str, Any]]:
    """Return the best cue candidates of ``track`` for display on the timeline.

    Cues are read from ``track.cue_points`` (missing or empty means no
    candidates). A cue is skipped when it has no ``.get`` method or when its
    ``time`` or ``confidence`` is not a finite number; these values are
    parsed *before* ranking so one malformed cue cannot abort the whole
    listing, and so that no ``NaN``/``Infinity`` reaches the JSON payload.

    Args:
        track: Object exposing an optional ``cue_points`` iterable.
        track_id: Value stored under ``"track"`` in every returned entry.
        limit: Number of top-ranked cues considered.

    Returns:
        Dicts with ``track``, ``time``, ``kind``, ``label`` and
        ``confidence``, ordered by descending confidence, then ascending time.
    """
    ranked: list[tuple[float, float, Any]] = []
    for cue in getattr(track, "cue_points", []) or []:
        try:
            t = float(cue.get("time", 0.0) or 0.0)
            confidence = float(cue.get("confidence", 0.0) or 0.0)
        except (AttributeError, TypeError, ValueError):
            continue
        if not (math.isfinite(t) and math.isfinite(confidence)):
            continue
        ranked.append((confidence, t, cue))

    # list.sort is stable, so ties keep their original relative order.
    ranked.sort(key=lambda item: (-item[0], item[1]))

    out: list[dict[str, Any]] = []
    # Negative times are filtered after the cut, so they still count
    # towards ``limit``.
    for confidence, t, cue in ranked[:limit]:
        if t < 0:
            continue
        out.append({
            "track": track_id,
            "time": round(t, 3),
            "kind": str(cue.get("kind", cue.get("type", "cue"))),
            "label": str(cue.get("label", cue.get("kind", "cue"))),
            "confidence": round(confidence, 3),
        })
    return out
def build_timeline_payload(
    *,
    transition_index: int,
    track_a: Any,
    track_b: Any,
    plan: Any,
    transition_type: str | None = None,
) -> TimelinePayload:
    """Assemble the timeline payload for one transition.

    Marker times come from ``plan.selected_cues`` when present and otherwise
    fall back to the plan: ``mix_out_point`` for A OUT, ``mix_in_point`` for
    B IN, and B IN plus ``duration_seconds`` for B DROP. Missing or falsy
    values collapse to 0.0 for times and 1.0 for confidences. The reference
    BPM is taken from track B, then the plan, then 128.

    Args:
        transition_index: Position of the transition; coerced with ``int``.
        track_a: Outgoing track (``filename``, ``duration``, ``bpm``,
            ``path`` and ``cue_points`` are read when available).
        track_b: Incoming track, read the same way.
        plan: Transition plan supplying cues, type and length.
        transition_type: Overrides ``plan.transition_type`` when truthy.
    """
    chosen = getattr(plan, "selected_cues", {}) or {}
    cue_a_out = chosen.get("a_out", {})
    cue_b_in = chosen.get("b_in", {})
    cue_b_drop = chosen.get("b_drop", {})

    def cue_confidence(cue: Any) -> float:
        # A missing or falsy confidence counts as full confidence.
        return float(cue.get("confidence", 1.0) or 1.0)

    def describe(track: Any, track_id: str, fallback_name: str) -> dict[str, Any]:
        # The displayed duration defaults to 0 while the envelope is
        # computed over a 300 s default window when the duration is unknown.
        shown_duration = float(getattr(track, "duration", 0.0) or 0.0)
        envelope_duration = float(getattr(track, "duration", 300.0) or 300.0)
        return {
            "filename": getattr(track, "filename", fallback_name),
            "duration": round(shown_duration, 3),
            "bpm": round(float(getattr(track, "bpm", 0.0) or 0.0), 3),
            "envelope": _peak_envelope(getattr(track, "path", ""), duration=envelope_duration),
            "candidates": _candidate_markers(track, track_id=track_id),
        }

    a_out = float(cue_a_out.get("time", getattr(plan, "mix_out_point", 0.0)) or 0.0)
    b_in = float(cue_b_in.get("time", getattr(plan, "mix_in_point", 0.0)) or 0.0)
    b_drop = float(cue_b_drop.get("time", b_in + getattr(plan, "duration_seconds", 0.0)) or 0.0)

    typ = transition_type or getattr(plan, "transition_type", "eq_crossfade")
    bpm = float(getattr(track_b, "bpm", getattr(plan, "bpm", 128.0)) or 128.0)

    markers = [
        TimelineMarker("a_out", "A OUT", "A", "mix_out", round(a_out, 3), cue_confidence(cue_a_out)),
        TimelineMarker("b_in", "B IN", "B", "mix_in", round(b_in, 3), cue_confidence(cue_b_in)),
        TimelineMarker("b_drop", "B DROP", "B", "drop", round(b_drop, 3), cue_confidence(cue_b_drop)),
    ]
    beats = int(getattr(plan, "duration_beats", 64) or 64)
    track_info = {
        "A": describe(track_a, "A", "track A"),
        "B": describe(track_b, "B", "track B"),
    }
    return TimelinePayload(
        version=1,
        transition_index=int(transition_index),
        transition_type=typ,
        duration_beats=beats,
        bpm_reference=round(bpm, 3),
        markers=markers,
        tracks=track_info,
    )
def _payload_json(payload: TimelinePayload | Mapping[str, Any]) -> str:
    """Serialise a payload (dataclass or plain mapping) as compact JSON.

    Non-ASCII characters are written as-is rather than ``\\u``-escaped, and
    no whitespace is emitted between tokens.
    """
    if isinstance(payload, TimelinePayload):
        plain = payload.to_dict()
    else:
        plain = dict(payload)
    return json.dumps(plain, ensure_ascii=False, separators=(",", ":"))
def render_interactive_timeline_html(payload: TimelinePayload | Mapping[str, Any]) -> str:
    """Return self-contained HTML for a draggable two-track cue timeline.

    The payload is serialised to compact JSON, UTF-8 encoded and embedded
    base64-encoded in a ``data-payload`` attribute. An inline script decodes
    it, draws both waveform envelopes, the candidate cue lines and the
    draggable markers into an SVG, and mirrors the edited payload into a
    textarea as pretty-printed JSON.

    Browser-side behaviour worth knowing:

    * The base64 bytes are decoded as UTF-8 before ``JSON.parse``; ``atob``
      alone would turn non-ASCII text such as filenames into mojibake.
    * ``esc()`` replaces ``&``, ``<`` and ``>`` with HTML entities before
      payload text is inserted as markup.
    * The selected marker stays selected after a drag ends, so the arrow
      keys can nudge it; a separate ``dragging`` flag gates pointer moves.

    Args:
        payload: A ``TimelinePayload`` or an equivalent mapping.

    Returns:
        The HTML fragment (container ``<div>`` followed by ``<script>``),
        stripped of surrounding whitespace.
    """
    data = payload.to_dict() if isinstance(payload, TimelinePayload) else dict(payload)
    raw = _payload_json(data)
    encoded = base64.b64encode(raw.encode("utf-8")).decode("ascii")
    title = escape(f"Transition {data.get('transition_index', '?')} · {data.get('transition_type', 'transition')}")
    # The script intentionally avoids external dependencies so the artifact can
    # be saved or reviewed outside Gradio. Literal braces are doubled because
    # the template is an f-string.
    return f"""
<div class="ai-dj-timeline" data-payload="{encoded}" style="font-family: ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, sans-serif; border:1px solid #2a2a2a; border-radius:14px; padding:14px; background:#101114; color:#eaeaf0;">
<div style="display:flex; align-items:center; justify-content:space-between; gap:12px; margin-bottom:10px;">
<div>
<div style="font-size:16px; font-weight:700;">{title}</div>
<div style="font-size:12px; color:#a6a6b0;">Drag markers. Hold Shift for 0.01s nudges, arrow keys nudge selected marker, double-click a candidate to snap nearest marker.</div>
</div>
<button class="copyPayload" style="border:1px solid #555; border-radius:10px; padding:7px 10px; background:#1f2430; color:#fff; cursor:pointer;">Copy JSON</button>
</div>
<svg class="timelineSvg" width="100%" viewBox="0 0 1200 330" style="display:block; border-radius:12px; background:#151821; touch-action:none;"></svg>
<textarea class="payloadOut" spellcheck="false" style="width:100%; height:150px; margin-top:10px; box-sizing:border-box; border-radius:10px; border:1px solid #333; background:#0b0d12; color:#d6e3ff; padding:10px; font:12px ui-monospace,SFMono-Regular,Menlo,monospace;"></textarea>
</div>
<script>
(() => {{
  const root = document.currentScript.previousElementSibling;
  const svg = root.querySelector('.timelineSvg');
  const out = root.querySelector('.payloadOut');
  const btn = root.querySelector('.copyPayload');
  // atob() yields one character per byte; decode those bytes as UTF-8.
  const data = JSON.parse(new TextDecoder().decode(Uint8Array.from(atob(root.dataset.payload), ch => ch.charCodeAt(0))));
  const W = 1200, H = 330, left = 78, right = 24, trackH = 104;
  const tracks = {{ A: {{ y: 72, color: '#77d7ff' }}, B: {{ y: 214, color: '#a98bff' }} }};
  // active = selected marker id (kept after a drag); dragging = pointer is down on it.
  let active = null;
  let dragging = false;
  const byId = Object.fromEntries(data.markers.map(m => [m.id, m]));
  function esc(s) {{ return String(s).replace(/[&<>]/g, c => ({{'&':'&amp;','<':'&lt;','>':'&gt;'}}[c])); }}
  function xFor(track, t) {{ const d = Math.max(1, data.tracks[track].duration || 1); return left + Math.max(0, Math.min(1, t / d)) * (W-left-right); }}
  function tFor(track, x) {{ const d = Math.max(1, data.tracks[track].duration || 1); return Math.max(0, Math.min(d, ((x-left)/(W-left-right))*d)); }}
  function markerPayload() {{
    const clean = JSON.parse(JSON.stringify(data));
    clean.markers.forEach(m => {{ m.time = Math.round(Number(m.time || 0) * 1000) / 1000; }});
    const a = clean.markers.find(m => m.id === 'a_out');
    const bi = clean.markers.find(m => m.id === 'b_in');
    const bd = clean.markers.find(m => m.id === 'b_drop');
    if (bi && bd) {{
      const sec = Math.max(0.25, bd.time - bi.time);
      clean.duration_seconds = Math.round(sec * 1000) / 1000;
      clean.duration_beats = Math.max(1, Math.round(sec / (60 / Math.max(60, Number(clean.bpm_reference || 128)))));
    }}
    return clean;
  }}
  function updateOut() {{ out.value = JSON.stringify(markerPayload(), null, 2); }}
  function drawEnvelope(track, cfg) {{
    const info = data.tracks[track];
    const env = info.envelope || [];
    const mid = cfg.y, amp = 42;
    let d = `M ${{left}} ${{mid}}`;
    env.forEach((v, i) => {{ const x = left + i / Math.max(1, env.length-1) * (W-left-right); d += ` L ${{x.toFixed(2)}} ${{(mid - v*amp).toFixed(2)}}`; }});
    for (let i=env.length-1; i>=0; --i) {{ const v=env[i]; const x = left + i / Math.max(1, env.length-1) * (W-left-right); d += ` L ${{x.toFixed(2)}} ${{(mid + v*amp).toFixed(2)}}`; }}
    d += ' Z';
    svg.insertAdjacentHTML('beforeend', `<text x="16" y="${{mid-48}}" fill="#dfe5ff" font-size="14" font-weight="700">${{track}}</text><text x="78" y="${{mid-48}}" fill="#a6a6b0" font-size="12">${{esc(info.filename)}} · ${{(info.duration||0).toFixed(1)}}s</text><line x1="${{left}}" y1="${{mid}}" x2="${{W-right}}" y2="${{mid}}" stroke="#333846"/><path d="${{d}}" fill="${{cfg.color}}" opacity="0.24"/><path d="${{d}}" fill="none" stroke="${{cfg.color}}" opacity="0.55" stroke-width="1"/>`);
    (info.candidates || []).forEach(c => {{
      const x = xFor(track, c.time); const alpha = Math.max(0.16, Math.min(0.62, Number(c.confidence || 0) * 0.66));
      svg.insertAdjacentHTML('beforeend', `<line class="candidate" data-track="${{track}}" data-time="${{c.time}}" x1="${{x}}" y1="${{mid-48}}" x2="${{x}}" y2="${{mid+48}}" stroke="#ffffff" opacity="${{alpha}}" stroke-width="1"><title>${{esc(c.kind)}} · ${{Number(c.time).toFixed(2)}}s · ${{Math.round(Number(c.confidence||0)*100)}}%</title></line>`);
    }});
  }}
  function drawMarkers() {{
    svg.querySelectorAll('.marker').forEach(e => e.remove());
    data.markers.forEach(m => {{
      const cfg = tracks[m.track]; if (!cfg) return;
      const x = xFor(m.track, m.time), y = cfg.y;
      const activeCls = active === m.id ? ' stroke="#fff" stroke-width="3"' : ' stroke="#000" stroke-width="1"';
      svg.insertAdjacentHTML('beforeend', `<g class="marker" data-id="${{m.id}}" style="cursor:ew-resize"><line x1="${{x}}" y1="${{y-58}}" x2="${{x}}" y2="${{y+58}}" stroke="#ffdd55" stroke-width="3"/><rect x="${{x-34}}" y="${{y-66}}" width="68" height="22" rx="7" fill="#ffdd55"${{activeCls}}/><text x="${{x}}" y="${{y-51}}" text-anchor="middle" fill="#161616" font-size="11" font-weight="800">${{esc(m.label)}}</text><text x="${{x}}" y="${{y+72}}" text-anchor="middle" fill="#ffed99" font-size="11">${{Number(m.time).toFixed(2)}}s</text></g>`);
    }});
  }}
  function render() {{ svg.innerHTML = ''; drawEnvelope('A', tracks.A); drawEnvelope('B', tracks.B); drawMarkers(); updateOut(); }}
  function eventX(ev) {{ const r = svg.getBoundingClientRect(); return (ev.clientX - r.left) * (W / r.width); }}
  function endDrag() {{ dragging = false; render(); }}
  svg.addEventListener('pointerdown', ev => {{
    const g = ev.target.closest('.marker'); if (!g) return;
    active = g.dataset.id; dragging = true;
    svg.setPointerCapture(ev.pointerId);
    // Key events are handled on root, so it must hold focus for nudging.
    root.focus({{ preventScroll: true }});
    render(); ev.preventDefault();
  }});
  svg.addEventListener('pointermove', ev => {{ if (!dragging || !active) return; const m = byId[active]; if (!m || m.locked) return; const step = ev.shiftKey ? 0.01 : 0.05; m.time = Math.round(tFor(m.track, eventX(ev)) / step) * step; render(); }});
  svg.addEventListener('pointerup', endDrag);
  svg.addEventListener('pointercancel', endDrag);
  svg.addEventListener('dblclick', ev => {{
    const c = ev.target.closest('.candidate'); if (!c) return;
    const track = c.dataset.track; const t = Number(c.dataset.time);
    const candidates = data.markers.filter(m => m.track === track);
    if (!candidates.length) return;
    let best = candidates[0], bestD = Infinity;
    candidates.forEach(m => {{ const d=Math.abs((m.time||0)-t); if (d < bestD) {{ best=m; bestD=d; }} }});
    best.time = t; active = best.id; render();
  }});
  root.tabIndex = 0;
  root.addEventListener('keydown', ev => {{
    if (!active || !['ArrowLeft','ArrowRight'].includes(ev.key)) return;
    const m = byId[active]; if (!m || m.locked) return;
    const step = ev.shiftKey ? 0.01 : 0.10;
    m.time = Math.max(0, Math.min(data.tracks[m.track].duration || 9999, m.time + (ev.key === 'ArrowRight' ? step : -step)));
    render(); ev.preventDefault();
  }});
  btn.addEventListener('click', async () => {{ updateOut(); try {{ await navigator.clipboard.writeText(out.value); btn.textContent='Copied'; setTimeout(()=>btn.textContent='Copy JSON',900); }} catch(e) {{ out.focus(); out.select(); }} }});
  render();
}})();
</script>
""".strip()
def render_timeline_editor_file(
    *,
    transition_index: int,
    track_a: Any,
    track_b: Any,
    plan: Any,
    output_dir: str | Path | None = None,
) -> tuple[str, str, str]:
    """Write the timeline editor to disk and return `(path, html, json)`.

    `html` is the fragment that was written, `json` is the payload
    pretty-printed with a two-space indent, and `path` is where the file
    landed. The file goes into `output_dir` (created if needed), or the
    system temp directory when `output_dir` is falsy. Its name embeds the
    transition index and the first 12 hex digits of the SHA-1 of the JSON
    text, so identical payloads map to the same file.
    """
    payload = build_timeline_payload(
        transition_index=transition_index,
        track_a=track_a,
        track_b=track_b,
        plan=plan,
    )
    markup = render_interactive_timeline_html(payload)
    pretty = json.dumps(payload.to_dict(), ensure_ascii=False, indent=2)

    target_dir = Path(output_dir or tempfile.gettempdir())
    target_dir.mkdir(parents=True, exist_ok=True)

    fingerprint = hashlib.sha1(pretty.encode("utf-8")).hexdigest()[:12]
    target = target_dir / f"ai-dj-transition-{int(transition_index)}-timeline-{fingerprint}.html"
    target.write_text(markup, encoding="utf-8")
    return str(target), markup, pretty
def _marker_map(data: Mapping[str, Any]) -> dict[str, Mapping[str, Any]]:
    """Index the payload's ``markers`` list by stringified marker ``id``.

    Returns an empty dict when ``markers`` is not a list. Entries that are
    not mappings are ignored; an entry without an ``id`` is stored under "".
    """
    markers = data.get("markers", [])
    if not isinstance(markers, list):
        return {}
    return {str(m.get("id", "")): m for m in markers if isinstance(m, Mapping)}


def apply_timeline_json_to_plan(plan: Any, track_a: Any, track_b: Any, payload_json: str) -> dict[str, Any]:
    """Apply a timeline payload to a TransitionPlan-like object.

    Everything is validated before ``plan`` is touched, so a rejected
    payload leaves the plan unchanged.

    Args:
        plan: Object whose transition attributes are overwritten.
        track_a: Outgoing track; its ``duration`` caps the A OUT marker.
        track_b: Incoming track; its ``duration`` caps both B markers and
            its ``bpm`` is the fallback for ``bpm_reference``.
        payload_json: JSON text as produced by the timeline editor.

    Returns:
        A summary dict (``transition_type``, ``a_out``, ``b_in``, ``b_drop``,
        ``duration_seconds``, ``duration_beats``) for tests and UI messages.

    Raises:
        ValueError: On any malformed payload -- unparsable or non-object
            JSON, unsupported version, missing markers, non-numeric or
            non-finite times or BPM, B DROP not after B IN, or a length
            that rounds to zero beats -- so bad edits fail loudly.
    """
    try:
        data = json.loads(payload_json)
    except (TypeError, json.JSONDecodeError) as exc:
        # TypeError covers non-string input such as None.
        raise ValueError(f"invalid timeline JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise ValueError("invalid timeline JSON: expected an object")

    try:
        version = int(data.get("version", 0) or 0)
    except (TypeError, ValueError, OverflowError):
        version = 0
    if version < 1:
        raise ValueError("timeline JSON missing supported version")

    markers = _marker_map(data)
    required = ["a_out", "b_in", "b_drop"]
    missing = [m for m in required if m not in markers]
    if missing:
        raise ValueError(f"timeline JSON missing marker(s): {', '.join(missing)}")

    def clamp(marker_id: str, duration: float) -> float:
        try:
            t = float(markers[marker_id].get("time", 0.0) or 0.0)
        except (TypeError, ValueError) as exc:
            raise ValueError(f"marker {marker_id} has non-numeric time") from exc
        if not math.isfinite(t):
            raise ValueError(f"marker {marker_id} has non-finite time")
        # A non-positive duration means "unknown": applying it as an upper
        # bound would collapse every marker to 0 and make the ordering
        # check below fail with a misleading message.
        upper = duration if duration > 0 else math.inf
        return round(max(0.0, min(t, upper)), 3)

    a_out = clamp("a_out", float(getattr(track_a, "duration", 0.0) or 0.0))
    b_in = clamp("b_in", float(getattr(track_b, "duration", 0.0) or 0.0))
    b_drop = clamp("b_drop", float(getattr(track_b, "duration", 0.0) or 0.0))
    if b_drop <= b_in:
        raise ValueError("B DROP must be after B IN")

    # ``or`` keeps a null/empty transition_type from becoming the text "None".
    typ = str(data.get("transition_type") or getattr(plan, "transition_type", "eq_crossfade"))

    try:
        bpm = float(data.get("bpm_reference", getattr(track_b, "bpm", 128.0)) or 128.0)
    except (TypeError, ValueError) as exc:
        raise ValueError("bpm_reference must be a number") from exc
    if not math.isfinite(bpm):
        # An infinite tempo would otherwise divide by zero below.
        raise ValueError("bpm_reference must be finite")
    bpm = max(60.0, bpm)

    seconds = round(b_drop - b_in, 3)
    beats = int(round(seconds / (60.0 / bpm)))
    if beats <= 0:
        raise ValueError("duration must be positive")

    def override(kind: str, t: float) -> dict[str, Any]:
        # Manual edits are recorded with full confidence and tagged with
        # their source.
        return {
            "kind": kind,
            "label": "interactive timeline override",
            "time": t,
            "confidence": 1.0,
            "evidence": {"source": "interactive_timeline"},
        }

    plan.transition_type = typ
    plan.mix_out_point = a_out
    plan.mix_in_point = b_in
    plan.duration_seconds = seconds
    plan.duration_beats = beats
    plan.needs_stems = typ in {"bass_swap", "acapella_over_instrumental", "drums_first", "double_drop"}
    plan.selected_cues = {
        "a_out": override("mix_out", a_out),
        "b_in": override("mix_in", b_in),
        "b_drop": override("drop", b_drop),
    }
    plan.cue_confidence = 1.0
    if hasattr(plan, "score_breakdown"):
        plan.score_breakdown = {
            **dict(getattr(plan, "score_breakdown", {}) or {}),
            "interactive_timeline_override": 1.0,
            "cue_confidence": 1.0,
        }

    # Drop earlier timeline notes so repeated applications do not pile up.
    assumptions = [
        a for a in list(getattr(plan, "assumptions", []) or [])
        if "timeline" not in str(a).lower()
    ]
    assumptions.append("interactive timeline override applied; preview before full-set render")
    plan.assumptions = assumptions

    return {
        "transition_type": typ,
        "a_out": a_out,
        "b_in": b_in,
        "b_drop": b_drop,
        "duration_seconds": seconds,
        "duration_beats": beats,
    }