ai-techno-dj / timeline_editor.py
Rik Hoffbauer
Implement draggable timeline editor and enhanced objective transition diagnostics
958631f
"""Interactive cue-timeline payloads for manual transition editing.
The first cue editor rendered a static waveform image plus dropdowns. This
module adds a DAW-like review artifact: a self-contained HTML timeline with
waveform envelopes, candidate markers, draggable selected anchors, keyboard
nudge controls, and an explicit JSON payload that can be pasted/applied back to
Gradio.
It intentionally stays dependency-light and file-local. The browser-side editor
is not a full DAW, but it is a real interactive timeline rather than a static
plot: users can drag A-out, B-in and B-drop markers against the actual audio
peak envelope and then apply the resulting payload to the TransitionPlan.
"""
from __future__ import annotations
from dataclasses import dataclass, asdict
from html import escape
from pathlib import Path
from typing import Any, Mapping
import base64
import hashlib
import json
import math
import tempfile
import librosa
import numpy as np
from cue_editor import parse_cue_choice
@dataclass(frozen=True)
class TimelineMarker:
    """A single selected anchor shown on the two-track timeline.

    The dataclass is frozen, so instances cannot be mutated after creation.
    """

    # Marker identifier; this module emits "a_out", "b_in" and "b_drop".
    id: str
    # Text rendered on the marker's badge in the browser editor.
    label: str
    # Key of the track lane the marker sits on ("A" or "B" in this module).
    track: str
    # Role of the anchor; this module emits "mix_out", "mix_in" and "drop".
    role: str
    # Position on the track; the browser editor displays it with an "s" suffix.
    time: float
    # Confidence attached to the underlying cue; defaults to full confidence.
    confidence: float = 1.0
    # When true the browser editor ignores drag and keyboard nudges for it.
    locked: bool = False
@dataclass(frozen=True)
class TimelinePayload:
    """Everything the browser-side timeline needs for one transition.

    The payload is serialised to JSON (see ``to_dict``) and embedded in the
    generated HTML.
    """

    # Payload schema version.
    version: int
    # Index of the transition this payload describes.
    transition_index: int
    # Name of the transition technique, e.g. "eq_crossfade".
    transition_type: str
    # Transition length expressed in beats.
    duration_beats: int
    # Tempo used to convert between seconds and beats.
    bpm_reference: float
    # The selected, draggable anchors.
    markers: list[TimelineMarker]
    # Per-track display data keyed by track id.
    tracks: dict[str, dict[str, Any]]

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-``dict`` copy of the payload, markers included."""
        serialised = asdict(self)
        serialised["markers"] = list(map(asdict, self.markers))
        return serialised
def _load_mono(path: str, *, max_duration: float, sr: int = 8000) -> tuple[np.ndarray, int]:
try:
y, got_sr = librosa.load(path, sr=sr, mono=True, duration=max_duration)
except Exception:
got_sr = sr
y = np.zeros(sr, dtype=np.float32)
if y.size == 0:
y = np.zeros(sr, dtype=np.float32)
peak = float(np.max(np.abs(y))) if y.size else 0.0
if peak > 0:
y = y / peak
return np.asarray(y, dtype=np.float32), got_sr
def _peak_envelope(path: str, *, duration: float, bins: int = 900) -> list[float]:
    """Summarise the audio at ``path`` as per-bin absolute peaks.

    At least one second of audio is requested from ``_load_mono``. The number
    of bins is capped at the sample count but never drops below 64; bins that
    end up covering no samples report 0.0. Values are rounded to 4 decimals.
    """
    samples, _rate = _load_mono(path, max_duration=max(1.0, duration))
    if samples.size == 0:
        return [0.0] * bins

    bin_count = max(64, min(bins, len(samples)))
    boundaries = np.linspace(0, len(samples), bin_count + 1, dtype=int)
    magnitudes = np.abs(samples)
    return [
        round(float(magnitudes[start:stop].max()) if stop > start else 0.0, 4)
        for start, stop in zip(boundaries[:-1], boundaries[1:])
    ]
def _candidate_markers(track: Any, *, track_id: str, limit: int = 80) -> list[dict[str, Any]]:
cues = list(getattr(track, "cue_points", []) or [])
cues.sort(key=lambda c: (-float(c.get("confidence", 0.0) or 0.0), float(c.get("time", 0.0) or 0.0)))
out: list[dict[str, Any]] = []
for cue in cues[:limit]:
try:
t = float(cue.get("time", 0.0) or 0.0)
except Exception:
continue
if t < 0:
continue
out.append({
"track": track_id,
"time": round(t, 3),
"kind": str(cue.get("kind", cue.get("type", "cue"))),
"label": str(cue.get("label", cue.get("kind", "cue"))),
"confidence": round(float(cue.get("confidence", 0.0) or 0.0), 3),
})
return out
def _track_section(track: Any, *, track_id: str, fallback_name: str) -> dict[str, Any]:
    """Build the per-track display dict (metadata, envelope, candidates)."""
    return {
        "filename": getattr(track, "filename", fallback_name),
        "duration": round(float(getattr(track, "duration", 0.0) or 0.0), 3),
        "bpm": round(float(getattr(track, "bpm", 0.0) or 0.0), 3),
        # A missing or zero duration falls back to analysing up to 300 s.
        "envelope": _peak_envelope(
            getattr(track, "path", ""),
            duration=float(getattr(track, "duration", 300.0) or 300.0),
        ),
        "candidates": _candidate_markers(track, track_id=track_id),
    }


def build_timeline_payload(
    *,
    transition_index: int,
    track_a: Any,
    track_b: Any,
    plan: Any,
    transition_type: str | None = None,
) -> TimelinePayload:
    """Assemble the timeline payload for one transition.

    Marker times come from ``plan.selected_cues`` when present and otherwise
    fall back to ``plan.mix_out_point`` / ``plan.mix_in_point``; the B-drop
    marker falls back to B-in plus ``plan.duration_seconds``.

    Args:
        transition_index: Index stored in the payload (coerced to ``int``).
        track_a: Outgoing track; read via ``getattr`` for ``filename``,
            ``duration``, ``bpm``, ``path`` and ``cue_points``.
        track_b: Incoming track, read the same way.
        plan: TransitionPlan-like object supplying cues and defaults.
        transition_type: Overrides ``plan.transition_type`` when truthy.

    Returns:
        A ``TimelinePayload`` with three markers (``a_out``, ``b_in``,
        ``b_drop``) and display data for tracks ``"A"`` and ``"B"``.
    """
    selected = getattr(plan, "selected_cues", {}) or {}

    def cue(cue_id: str) -> Any:
        # A cue stored as None is treated the same as a missing cue.
        return selected.get(cue_id) or {}

    def cue_time(cue_id: str, fallback: Any) -> float:
        return float(cue(cue_id).get("time", fallback) or 0.0)

    def cue_confidence(cue_id: str) -> float:
        # Only a missing/None confidence defaults to 1.0; an explicit 0.0 is
        # a real value and must not be promoted to full confidence.
        value = cue(cue_id).get("confidence")
        return 1.0 if value is None else float(value)

    a_out = cue_time("a_out", getattr(plan, "mix_out_point", 0.0))
    b_in = cue_time("b_in", getattr(plan, "mix_in_point", 0.0))
    plan_seconds = float(getattr(plan, "duration_seconds", 0.0) or 0.0)
    b_drop = cue_time("b_drop", b_in + plan_seconds)

    typ = transition_type or getattr(plan, "transition_type", "eq_crossfade")
    bpm = float(getattr(track_b, "bpm", getattr(plan, "bpm", 128.0)) or 128.0)

    markers = [
        TimelineMarker("a_out", "A OUT", "A", "mix_out", round(a_out, 3), cue_confidence("a_out")),
        TimelineMarker("b_in", "B IN", "B", "mix_in", round(b_in, 3), cue_confidence("b_in")),
        TimelineMarker("b_drop", "B DROP", "B", "drop", round(b_drop, 3), cue_confidence("b_drop")),
    ]
    return TimelinePayload(
        version=1,
        transition_index=int(transition_index),
        transition_type=typ,
        duration_beats=int(getattr(plan, "duration_beats", 64) or 64),
        bpm_reference=round(bpm, 3),
        markers=markers,
        tracks={
            "A": _track_section(track_a, track_id="A", fallback_name="track A"),
            "B": _track_section(track_b, track_id="B", fallback_name="track B"),
        },
    )
def _payload_json(payload: TimelinePayload | Mapping[str, Any]) -> str:
    """Serialise ``payload`` as compact JSON, keeping non-ASCII text as-is.

    A ``TimelinePayload`` is converted with ``to_dict``; any other mapping is
    shallow-copied into a ``dict`` first.
    """
    if isinstance(payload, TimelinePayload):
        data = payload.to_dict()
    else:
        data = dict(payload)
    return json.dumps(data, separators=(",", ":"), ensure_ascii=False)
def render_interactive_timeline_html(payload: TimelinePayload | Mapping[str, Any]) -> str:
    """Return self-contained HTML for a draggable two-track cue timeline.

    The payload is serialised to JSON, UTF-8 encoded and embedded base64 in a
    ``data-payload`` attribute. The inline script decodes it, draws both
    waveform envelopes, candidate lines and the selected markers into an SVG,
    and mirrors the edited payload into a textarea as JSON.

    Browser-side behaviour:
        * The base64 payload is decoded as UTF-8 bytes (``atob`` alone yields
          one character per byte and would garble non-ASCII filenames).
        * Pressing a marker selects it and starts a drag; releasing the
          pointer ends the drag but keeps the marker selected, so the arrow
          keys can nudge it afterwards.
        * Double-clicking a candidate snaps the nearest marker on that track.

    Args:
        payload: A ``TimelinePayload`` or an equivalent mapping.

    Returns:
        The HTML fragment (a ``<div>`` followed by a ``<script>``), stripped
        of leading and trailing whitespace.
    """
    data = payload.to_dict() if isinstance(payload, TimelinePayload) else dict(payload)
    raw = _payload_json(data)
    encoded = base64.b64encode(raw.encode("utf-8")).decode("ascii")
    title = escape(f"Transition {data.get('transition_index', '?')} · {data.get('transition_type', 'transition')}")
    # The script intentionally avoids external dependencies so the artifact can
    # be saved or reviewed outside Gradio.
    return f"""
<div class="ai-dj-timeline" data-payload="{encoded}" style="font-family: ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, sans-serif; border:1px solid #2a2a2a; border-radius:14px; padding:14px; background:#101114; color:#eaeaf0;">
  <div style="display:flex; align-items:center; justify-content:space-between; gap:12px; margin-bottom:10px;">
    <div>
      <div style="font-size:16px; font-weight:700;">{title}</div>
      <div style="font-size:12px; color:#a6a6b0;">Drag markers. Hold Shift for 0.01s nudges, arrow keys nudge selected marker, double-click a candidate to snap nearest marker.</div>
    </div>
    <button class="copyPayload" style="border:1px solid #555; border-radius:10px; padding:7px 10px; background:#1f2430; color:#fff; cursor:pointer;">Copy JSON</button>
  </div>
  <svg class="timelineSvg" width="100%" viewBox="0 0 1200 330" style="display:block; border-radius:12px; background:#151821; touch-action:none;"></svg>
  <textarea class="payloadOut" spellcheck="false" style="width:100%; height:150px; margin-top:10px; box-sizing:border-box; border-radius:10px; border:1px solid #333; background:#0b0d12; color:#d6e3ff; padding:10px; font:12px ui-monospace,SFMono-Regular,Menlo,monospace;"></textarea>
</div>
<script>
(() => {{
  const root = document.currentScript.previousElementSibling;
  const svg = root.querySelector('.timelineSvg');
  const out = root.querySelector('.payloadOut');
  const btn = root.querySelector('.copyPayload');
  // atob() returns one character per byte; decode those bytes as UTF-8 so
  // non-ASCII text in the payload survives the round trip.
  const payloadBytes = Uint8Array.from(atob(root.dataset.payload), ch => ch.charCodeAt(0));
  const data = JSON.parse(new TextDecoder('utf-8').decode(payloadBytes));
  const W = 1200, H = 330, left = 78, right = 24, trackH = 104;
  const tracks = {{ A: {{ y: 72, color: '#77d7ff' }}, B: {{ y: 214, color: '#a98bff' }} }};
  // active = id of the selected marker; dragging = a pointer drag is in progress.
  let active = null;
  let dragging = false;
  const byId = Object.fromEntries(data.markers.map(m => [m.id, m]));
  function esc(s) {{ return String(s).replace(/[&<>]/g, c => ({{'&':'&amp;','<':'&lt;','>':'&gt;'}}[c])); }}
  function xFor(track, t) {{ const d = Math.max(1, data.tracks[track].duration || 1); return left + Math.max(0, Math.min(1, t / d)) * (W-left-right); }}
  function tFor(track, x) {{ const d = Math.max(1, data.tracks[track].duration || 1); return Math.max(0, Math.min(d, ((x-left)/(W-left-right))*d)); }}
  function markerPayload() {{
    const clean = JSON.parse(JSON.stringify(data));
    clean.markers.forEach(m => {{ m.time = Math.round(Number(m.time || 0) * 1000) / 1000; }});
    const a = clean.markers.find(m => m.id === 'a_out');
    const bi = clean.markers.find(m => m.id === 'b_in');
    const bd = clean.markers.find(m => m.id === 'b_drop');
    if (bi && bd) {{
      const sec = Math.max(0.25, bd.time - bi.time);
      clean.duration_seconds = Math.round(sec * 1000) / 1000;
      clean.duration_beats = Math.max(1, Math.round(sec / (60 / Math.max(60, Number(clean.bpm_reference || 128)))));
    }}
    return clean;
  }}
  function updateOut() {{ out.value = JSON.stringify(markerPayload(), null, 2); }}
  function drawEnvelope(track, cfg) {{
    const info = data.tracks[track];
    const env = info.envelope || [];
    const mid = cfg.y, amp = 42;
    let d = `M ${{left}} ${{mid}}`;
    env.forEach((v, i) => {{ const x = left + i / Math.max(1, env.length-1) * (W-left-right); d += ` L ${{x.toFixed(2)}} ${{(mid - v*amp).toFixed(2)}}`; }});
    for (let i=env.length-1; i>=0; --i) {{ const v=env[i]; const x = left + i / Math.max(1, env.length-1) * (W-left-right); d += ` L ${{x.toFixed(2)}} ${{(mid + v*amp).toFixed(2)}}`; }}
    d += ' Z';
    svg.insertAdjacentHTML('beforeend', `<text x="16" y="${{mid-48}}" fill="#dfe5ff" font-size="14" font-weight="700">${{track}}</text><text x="78" y="${{mid-48}}" fill="#a6a6b0" font-size="12">${{esc(info.filename)}} · ${{(info.duration||0).toFixed(1)}}s</text><line x1="${{left}}" y1="${{mid}}" x2="${{W-right}}" y2="${{mid}}" stroke="#333846"/><path d="${{d}}" fill="${{cfg.color}}" opacity="0.24"/><path d="${{d}}" fill="none" stroke="${{cfg.color}}" opacity="0.55" stroke-width="1"/>`);
    (info.candidates || []).forEach(c => {{
      const x = xFor(track, c.time); const alpha = Math.max(0.16, Math.min(0.62, Number(c.confidence || 0) * 0.66));
      svg.insertAdjacentHTML('beforeend', `<line class="candidate" data-track="${{track}}" data-time="${{c.time}}" x1="${{x}}" y1="${{mid-48}}" x2="${{x}}" y2="${{mid+48}}" stroke="#ffffff" opacity="${{alpha}}" stroke-width="1"><title>${{esc(c.kind)}} · ${{Number(c.time).toFixed(2)}}s · ${{Math.round(Number(c.confidence||0)*100)}}%</title></line>`);
    }});
  }}
  function drawMarkers() {{
    svg.querySelectorAll('.marker').forEach(e => e.remove());
    data.markers.forEach(m => {{
      const cfg = tracks[m.track]; if (!cfg) return;
      const x = xFor(m.track, m.time), y = cfg.y;
      const activeCls = active === m.id ? ' stroke="#fff" stroke-width="3"' : ' stroke="#000" stroke-width="1"';
      svg.insertAdjacentHTML('beforeend', `<g class="marker" data-id="${{m.id}}" style="cursor:ew-resize"><line x1="${{x}}" y1="${{y-58}}" x2="${{x}}" y2="${{y+58}}" stroke="#ffdd55" stroke-width="3"/><rect x="${{x-34}}" y="${{y-66}}" width="68" height="22" rx="7" fill="#ffdd55"${{activeCls}}/><text x="${{x}}" y="${{y-51}}" text-anchor="middle" fill="#161616" font-size="11" font-weight="800">${{esc(m.label)}}</text><text x="${{x}}" y="${{y+72}}" text-anchor="middle" fill="#ffed99" font-size="11">${{Number(m.time).toFixed(2)}}s</text></g>`);
    }});
  }}
  function render() {{ svg.innerHTML = ''; drawEnvelope('A', tracks.A); drawEnvelope('B', tracks.B); drawMarkers(); updateOut(); }}
  function eventX(ev) {{ const r = svg.getBoundingClientRect(); return (ev.clientX - r.left) * (W / r.width); }}
  // Focus the root on press so the keydown handler below receives arrow keys.
  svg.addEventListener('pointerdown', ev => {{ const g = ev.target.closest('.marker'); if (!g) return; active = g.dataset.id; dragging = true; svg.setPointerCapture(ev.pointerId); root.focus({{ preventScroll: true }}); render(); ev.preventDefault(); }});
  svg.addEventListener('pointermove', ev => {{ if (!dragging || !active) return; const m = byId[active]; if (!m || m.locked) return; const step = ev.shiftKey ? 0.01 : 0.05; m.time = Math.round(tFor(m.track, eventX(ev)) / step) * step; render(); }});
  // Releasing the pointer ends the drag but keeps the marker selected.
  const endDrag = () => {{ if (!dragging) return; dragging = false; render(); }};
  svg.addEventListener('pointerup', endDrag);
  svg.addEventListener('pointercancel', endDrag);
  svg.addEventListener('dblclick', ev => {{
    const c = ev.target.closest('.candidate'); if (!c) return;
    const track = c.dataset.track; const t = Number(c.dataset.time);
    const candidates = data.markers.filter(m => m.track === track);
    if (!candidates.length) return;
    let best = candidates[0], bestD = Infinity;
    candidates.forEach(m => {{ const d=Math.abs((m.time||0)-t); if (d < bestD) {{ best=m; bestD=d; }} }});
    best.time = t; active = best.id; render();
  }});
  root.tabIndex = 0;
  root.addEventListener('keydown', ev => {{
    if (!active || !['ArrowLeft','ArrowRight'].includes(ev.key)) return;
    const m = byId[active]; if (!m || m.locked) return;
    const step = ev.shiftKey ? 0.01 : 0.10;
    m.time = Math.max(0, Math.min(data.tracks[m.track].duration || 9999, m.time + (ev.key === 'ArrowRight' ? step : -step)));
    render(); ev.preventDefault();
  }});
  btn.addEventListener('click', async () => {{ updateOut(); try {{ await navigator.clipboard.writeText(out.value); btn.textContent='Copied'; setTimeout(()=>btn.textContent='Copy JSON',900); }} catch(e) {{ out.focus(); out.select(); }} }});
  render();
}})();
</script>
""".strip()
def render_timeline_editor_file(
    *,
    transition_index: int,
    track_a: Any,
    track_b: Any,
    plan: Any,
    output_dir: str | Path | None = None,
) -> tuple[str, str, str]:
    """Write the timeline editor to disk and return `(path, html, json)`.

    `html` is the fragment that was written to `path`; `json` is the
    indented payload. When `output_dir` is falsy the system temp directory
    is used, and the directory is created if it does not exist. The file name
    embeds the transition index and the first 12 hex digits of the SHA-1 of
    the indented JSON.
    """
    payload = build_timeline_payload(
        transition_index=transition_index,
        track_a=track_a,
        track_b=track_b,
        plan=plan,
    )
    markup = render_interactive_timeline_html(payload)
    pretty_json = json.dumps(payload.to_dict(), ensure_ascii=False, indent=2)

    target_dir = Path(output_dir or tempfile.gettempdir())
    target_dir.mkdir(parents=True, exist_ok=True)

    fingerprint = hashlib.sha1(pretty_json.encode("utf-8")).hexdigest()[:12]
    file_name = f"ai-dj-transition-{int(transition_index)}-timeline-{fingerprint}.html"
    destination = target_dir / file_name
    destination.write_text(markup, encoding="utf-8")

    return str(destination), markup, pretty_json
def _marker_map(data: Mapping[str, Any]) -> dict[str, Mapping[str, Any]]:
markers = data.get("markers", [])
if not isinstance(markers, list):
return {}
return {str(m.get("id", "")): m for m in markers if isinstance(m, Mapping)}
def apply_timeline_json_to_plan(plan: Any, track_a: Any, track_b: Any, payload_json: str) -> dict[str, Any]:
    """Apply a timeline payload to a TransitionPlan-like object.

    The payload is fully validated before ``plan`` is touched, so a rejected
    payload leaves the plan unchanged. Marker times are clamped to
    ``[0, track.duration]`` and rounded to 3 decimals; the transition length
    is the distance between B IN and B DROP, converted to beats with
    ``bpm_reference`` (floored at 60 BPM).

    Args:
        plan: Object whose transition attributes are overwritten.
        track_a: Outgoing track; only ``duration`` is read.
        track_b: Incoming track; ``duration`` and ``bpm`` are read.
        payload_json: JSON text produced by the timeline editor.

    Returns:
        A summary dict with ``transition_type``, ``a_out``, ``b_in``,
        ``b_drop``, ``duration_seconds`` and ``duration_beats``.

    Raises:
        ValueError: If the text is not JSON, the root is not an object, the
            version is unsupported, a required marker is missing, a numeric
            field is non-numeric or non-finite, B DROP is not after B IN, or
            the resulting length rounds to zero beats.
    """
    try:
        data = json.loads(payload_json)
    except (TypeError, json.JSONDecodeError) as exc:
        raise ValueError(f"invalid timeline JSON: {exc}") from exc
    # json.loads happily returns lists, numbers, strings...; only objects work.
    if not isinstance(data, dict):
        raise ValueError("timeline JSON must be an object")

    try:
        version = int(data.get("version", 0) or 0)
    except (TypeError, ValueError, OverflowError) as exc:
        raise ValueError("timeline JSON has a non-numeric version") from exc
    if version < 1:
        raise ValueError("timeline JSON missing supported version")

    markers = _marker_map(data)
    required = ["a_out", "b_in", "b_drop"]
    missing = [m for m in required if m not in markers]
    if missing:
        raise ValueError(f"timeline JSON missing marker(s): {', '.join(missing)}")

    def clamp(marker_id: str, duration: float) -> float:
        marker = markers[marker_id]
        try:
            t = float(marker.get("time", 0.0) or 0.0)
        except (TypeError, ValueError) as exc:
            raise ValueError(f"marker {marker_id} has non-numeric time") from exc
        # json.loads accepts NaN/Infinity literals, so check explicitly.
        if not math.isfinite(t):
            raise ValueError(f"marker {marker_id} has non-finite time")
        return round(max(0.0, min(t, float(duration or 0.0))), 3)

    a_out = clamp("a_out", float(getattr(track_a, "duration", 0.0) or 0.0))
    b_in = clamp("b_in", float(getattr(track_b, "duration", 0.0) or 0.0))
    b_drop = clamp("b_drop", float(getattr(track_b, "duration", 0.0) or 0.0))
    if b_drop <= b_in:
        raise ValueError("B DROP must be after B IN")

    # A null/empty type in the payload falls back to the plan's current type
    # instead of being stored as the literal string "None".
    raw_type = data.get("transition_type")
    if raw_type is None or raw_type == "":
        raw_type = getattr(plan, "transition_type", "eq_crossfade")
    typ = str(raw_type)

    try:
        bpm = float(data.get("bpm_reference", getattr(track_b, "bpm", 128.0)) or 128.0)
    except (TypeError, ValueError) as exc:
        raise ValueError("timeline JSON has a non-numeric bpm_reference") from exc
    # An infinite tempo would make the beat length zero and divide by zero.
    if not math.isfinite(bpm):
        raise ValueError("timeline JSON has a non-finite bpm_reference")
    bpm = max(60.0, bpm)

    seconds = round(b_drop - b_in, 3)
    beats = int(round(seconds / (60.0 / bpm)))
    if beats <= 0:
        raise ValueError("duration must be positive")

    plan.transition_type = typ
    plan.mix_out_point = a_out
    plan.mix_in_point = b_in
    plan.duration_seconds = seconds
    plan.duration_beats = beats
    plan.needs_stems = typ in {"bass_swap", "acapella_over_instrumental", "drums_first", "double_drop"}
    plan.selected_cues = {
        "a_out": {"kind": "mix_out", "label": "interactive timeline override", "time": a_out, "confidence": 1.0, "evidence": {"source": "interactive_timeline"}},
        "b_in": {"kind": "mix_in", "label": "interactive timeline override", "time": b_in, "confidence": 1.0, "evidence": {"source": "interactive_timeline"}},
        "b_drop": {"kind": "drop", "label": "interactive timeline override", "time": b_drop, "confidence": 1.0, "evidence": {"source": "interactive_timeline"}},
    }
    plan.cue_confidence = 1.0
    if hasattr(plan, "score_breakdown"):
        plan.score_breakdown = {**dict(getattr(plan, "score_breakdown", {}) or {}), "interactive_timeline_override": 1.0, "cue_confidence": 1.0}

    # Drop earlier timeline notes so repeated applies do not accumulate them.
    assumptions = list(getattr(plan, "assumptions", []) or [])
    assumptions = [a for a in assumptions if "timeline" not in str(a).lower()]
    assumptions.append("interactive timeline override applied; preview before full-set render")
    plan.assumptions = assumptions

    return {
        "transition_type": typ,
        "a_out": a_out,
        "b_in": b_in,
        "b_drop": b_drop,
        "duration_seconds": seconds,
        "duration_beats": beats,
    }