ai-techno-dj / app_callbacks.py
Rik Hoffbauer
Refactor app.py into logical sub-modules
a2d9995
from __future__ import annotations
import json
import logging
import os
import tempfile
import gradio as gr
import numpy as np
import soundfile as sf
import app_planning
from app_audio import load_audio_segment, time_stretch_audio
from app_benchmarks import format_benchmarks, run_benchmarks
from app_planning import build_set_order, compute_compatibility, LAST_SET_ORDER_METADATA, plan_transition, update_transition_tempo_policy
from app_state import app_state
from app_track_analysis import analyze_track
from transitions import TRANSITION_TYPES
logger = logging.getLogger("dj_engine")
def analyze_uploaded_tracks(files, progress=gr.Progress()):
"""Analyze all uploaded audio files."""
if not files:
return "⚠️ No files uploaded", "", ""
app_state.tracks = []
app_state.analyses = []
analysis_reports = []
for i, f in enumerate(files):
filepath = f if isinstance(f, str) else f.name
progress((i / len(files)) * 0.9, desc=f"Analyzing {os.path.basename(filepath)}...")
try:
analysis = analyze_track(filepath, progress_cb=lambda p, m: progress(
(i + p) / len(files) * 0.9, desc=m
))
app_state.analyses.append(analysis)
top_cues = ", ".join(
f"{c.get('kind', c.get('type', 'cue'))}@{float(c.get('time', 0.0)):.1f}s/{float(c.get('confidence', 0.0)):.0%}"
for c in analysis.cue_points[:4]
) or "none"
report = (
f"### 🎵 {analysis.filename}\n"
f"- **BPM:** {analysis.bpm} (confidence: {analysis.bpm_confidence:.0%})\n"
f"- **Key:** {analysis.key} {analysis.scale} ({analysis.camelot})\n"
f"- **Duration:** {analysis.duration:.1f}s\n"
f"- **Energy:** {analysis.avg_energy:.4f} ({analysis.loudness_db:.1f} dB)\n"
f"- **Brightness:** {analysis.spectral_centroid_mean:.0f} Hz\n"
f"- **Segments:** {len(analysis.segments)} "
f"({', '.join(s['label'] for s in analysis.segments[:5])}{'...' if len(analysis.segments) > 5 else ''})\n"
f"- **Downbeat phase:** {analysis.downbeat_phase} (confidence: {analysis.downbeat_confidence:.0%})\n"
f"- **Cue Points:** {len(analysis.cue_points)} ranked candidates; top: {top_cues}\n"
)
analysis_reports.append(report)
except Exception as e:
analysis_reports.append(f"### ❌ {os.path.basename(filepath)}\nError: {str(e)}\n")
logger.error(f"Analysis failed for {filepath}: {e}", exc_info=True)
progress(0.95, desc="Computing compatibility matrix...")
# Compute compatibility matrix
n = len(app_state.analyses)
compat_text = ""
if n >= 2:
compat_text = "### 🔗 Compatibility Matrix\n\n"
compat_text += "| | " + " | ".join(a.filename[:15] for a in app_state.analyses) + " |\n"
compat_text += "|" + "---|" * (n + 1) + "\n"
for i in range(n):
row = f"| **{app_state.analyses[i].filename[:15]}** |"
for j in range(n):
if i == j:
row += " — |"
else:
compat = compute_compatibility(app_state.analyses[i], app_state.analyses[j])
score = compat["overall"]
emoji = "🟢" if score >= 0.7 else "🟡" if score >= 0.5 else "🔴"
row += f" {emoji} {score:.2f} |"
compat_text += row + "\n"
# Run benchmarks on analysis
benchmarks = run_benchmarks(app_state.analyses)
app_state.benchmarks = benchmarks
benchmark_text = format_benchmarks(benchmarks)
full_report = "\n".join(analysis_reports)
return full_report, compat_text, benchmark_text
def generate_set_plan(allow_repeat, progress=gr.Progress()):
"""Generate the optimal set order and transition plans."""
if len(app_state.analyses) < 2:
return "⚠️ Need at least 2 analyzed tracks to build a set"
progress(0.1, desc="Computing optimal track order...")
order = build_set_order(app_state.analyses, allow_repeat=allow_repeat)
app_state.set_order = order
app_state.set_order_metadata = dict(LAST_SET_ORDER_METADATA)
progress(0.3, desc="Planning transitions...")
transitions = []
set_len = len(order)
prev_type = None
for pos in range(len(order) - 1):
idx_a = order[pos]
idx_b = order[pos + 1]
compat = compute_compatibility(app_state.analyses[idx_a], app_state.analyses[idx_b])
position_frac = pos / max(set_len - 1, 1)
plan = plan_transition(
app_state.analyses[idx_a], app_state.analyses[idx_b], compat,
position_in_set=position_frac,
set_length=set_len,
prev_transition_type=prev_type,
)
plan.track_a_idx = idx_a
plan.track_b_idx = idx_b
transitions.append(plan)
prev_type = plan.transition_type
app_state.transitions = transitions
# Format the plan
plan_text = "# 🎧 DJ Set Plan\n\n"
method = app_state.set_order_metadata.get("method", "unknown")
if app_state.set_order_metadata:
plan_text += f"_Set optimizer:_ `{method}`; score={app_state.set_order_metadata.get('score', 'n/a')}\n\n"
plan_text += "## Track Order\n"
for pos, idx in enumerate(order):
t = app_state.analyses[idx]
target = None
if app_state.set_order_metadata.get('energy_targets'):
target = app_state.set_order_metadata['energy_targets'][pos]
target_txt = f", target_energy={target:.2f}" if isinstance(target, (int, float)) else ""
plan_text += f"{pos+1}. **{t.filename}** — {t.bpm} BPM, {t.camelot}, energy={t.avg_energy:.3f}{target_txt}\n"
if app_state.set_order_metadata.get('pair_scores'):
plan_text += "\n## Global optimizer pair scores\n"
for item in app_state.set_order_metadata['pair_scores']:
a = app_state.analyses[item['src']].filename
b = app_state.analyses[item['dst']].filename
plan_text += f"- **{a}{b}:** total={item['total']:.2f}, compat={item['compatibility']:.2f}, edge={item['edge_score']:.2f}, cue={item['cue_confidence']:.2f}\n"
plan_text += "\n## Transitions\n\n"
for i, trans in enumerate(transitions):
t_a = app_state.analyses[trans.track_a_idx]
t_b = app_state.analyses[trans.track_b_idx]
plan_text += f"### Transition {i+1}: {t_a.filename}{t_b.filename}\n"
plan_text += f"- **Type:** `{trans.transition_type}` — {TRANSITION_TYPES[trans.transition_type]}\n"
plan_text += f"- **Mix out:** {trans.mix_out_point:.1f}s into track A\n"
plan_text += f"- **Mix in:** {trans.mix_in_point:.1f}s into track B\n"
plan_text += f"- **Duration:** {trans.duration_beats} beats ({trans.duration_seconds:.1f}s)\n"
tempo_reason = (getattr(trans, "tempo_policy", {}) or {}).get("reason", "pitch-preserving if enabled")
plan_text += f"- **Tempo policy:** ×{trans.bpm_adjustment:.3f}{tempo_reason}\n"
plan_text += f"- **Stems needed:** {'Yes (demucs)' if trans.needs_stems else 'No'}\n"
plan_text += f"- **Compatibility:** {trans.compatibility_score:.2f}; planner edge score: {trans.score_breakdown.get('overall', trans.compatibility_score):.2f}\n"
if trans.selected_cues:
a_cue = trans.selected_cues.get('a_out', {})
b_in = trans.selected_cues.get('b_in', {})
b_drop = trans.selected_cues.get('b_drop', {})
plan_text += f"- **Selected cues:** A out {a_cue.get('time', trans.mix_out_point):.1f}s ({a_cue.get('confidence', 0):.0%}), "
plan_text += f"B in {b_in.get('time', trans.mix_in_point):.1f}s ({b_in.get('confidence', 0):.0%}), "
plan_text += f"B drop {b_drop.get('time', trans.mix_in_point + trans.duration_seconds):.1f}s ({b_drop.get('confidence', 0):.0%})\n"
if trans.score_breakdown:
plan_text += "- **Score breakdown:** " + ", ".join(f"{k}={v:.2f}" for k, v in trans.score_breakdown.items() if isinstance(v, (int, float))) + "\n"
if trans.assumptions:
plan_text += "- **Assumptions / risks:** " + "; ".join(trans.assumptions) + "\n"
if trans.alternatives:
plan_text += f"- **Alternatives retained:** {len(trans.alternatives)} candidate cue edges for audition/future rendering\n"
plan_text += f"- **Reasoning:** {trans.reason}\n\n"
return plan_text
def apply_manual_transition_edit(transition_idx, mix_out_point, mix_in_point, duration_beats, transition_type):
"""Apply manual cue/timing/type edits to a planned transition."""
if not app_state.transitions:
return "⚠️ Generate a set plan first"
idx = int(transition_idx) - 1
if idx < 0 or idx >= len(app_state.transitions):
return f"⚠️ Invalid transition index. Choose 1-{len(app_state.transitions)}"
trans = app_state.transitions[idx]
track_a = app_state.analyses[trans.track_a_idx]
track_b = app_state.analyses[trans.track_b_idx]
if transition_type in TRANSITION_TYPES:
trans.transition_type = transition_type
trans.mix_out_point = round(max(0.0, min(float(mix_out_point), track_a.duration)), 2)
trans.mix_in_point = round(max(0.0, min(float(mix_in_point), track_b.duration)), 2)
beats = max(1, int(duration_beats))
trans.duration_beats = beats
trans.duration_seconds = round(beats * 60.0 / max(track_b.bpm, 60), 2)
trans.needs_stems = trans.transition_type in ("bass_swap", "acapella_over_instrumental", "drums_first", "double_drop")
update_transition_tempo_policy(trans, track_a, track_b)
trans.selected_cues = {
"a_out": {"kind": "mix_out", "label": "manual override", "time": trans.mix_out_point, "confidence": 1.0},
"b_in": {"kind": "mix_in", "label": "manual override", "time": trans.mix_in_point, "confidence": 1.0},
"b_drop": {"kind": "drop", "label": "manual implied from duration", "time": round(trans.mix_in_point + trans.duration_seconds, 2), "confidence": 0.8},
}
trans.cue_confidence = 1.0
trans.score_breakdown = {**dict(trans.score_breakdown), "manual_override": 1.0, "cue_confidence": 1.0}
assumptions = [a for a in trans.assumptions if "manual" not in a.lower()]
assumptions.append("manual cue/timing override applied; preview this candidate before full render")
trans.assumptions = assumptions
try:
from cue_learning import append_training_example
for cue in trans.selected_cues.values():
append_training_example("data/manual-cue-edits.jsonl", cue, duration=track_b.duration, label=1, source="numeric_manual_editor")
except Exception as exc:
logger.warning(f"Could not append manual cue training examples: {exc}")
return (
f"✅ Updated transition {idx+1}\n\n"
f"- Type: `{trans.transition_type}`\n"
f"- A mix-out: {trans.mix_out_point:.2f}s\n"
f"- B mix-in: {trans.mix_in_point:.2f}s\n"
f"- Duration: {trans.duration_beats} beats / {trans.duration_seconds:.2f}s\n"
f"- B implied drop: {trans.selected_cues['b_drop']['time']:.2f}s"
)
def load_waveform_cue_editor(transition_idx):
"""Load waveform image and cue choices for the selected transition."""
if not app_state.transitions:
return None, "⚠️ Generate a set plan first", gr.update(choices=[]), gr.update(choices=[]), gr.update(choices=[])
idx = int(transition_idx) - 1
if idx < 0 or idx >= len(app_state.transitions):
return None, f"⚠️ Invalid transition index. Choose 1-{len(app_state.transitions)}", gr.update(choices=[]), gr.update(choices=[]), gr.update(choices=[])
trans = app_state.transitions[idx]
track_a = app_state.analyses[trans.track_a_idx]
track_b = app_state.analyses[trans.track_b_idx]
from cue_editor import render_transition_cue_editor, choices_for_transition
image_path, summary = render_transition_cue_editor(track_a, track_b, trans)
choices = choices_for_transition(track_a, track_b, trans)
return (
image_path,
summary,
gr.update(choices=choices["a_choices"], value=choices["a_default"]),
gr.update(choices=choices["b_in_choices"], value=choices["b_in_default"]),
gr.update(choices=choices["b_drop_choices"], value=choices["b_drop_default"]),
)
def apply_waveform_cue_choices(transition_idx, a_choice, b_in_choice, b_drop_choice, transition_type):
"""Apply cue choices from the waveform editor."""
if not app_state.transitions:
return "⚠️ Generate a set plan first"
idx = int(transition_idx) - 1
if idx < 0 or idx >= len(app_state.transitions):
return f"⚠️ Invalid transition index. Choose 1-{len(app_state.transitions)}"
trans = app_state.transitions[idx]
track_a = app_state.analyses[trans.track_a_idx]
track_b = app_state.analyses[trans.track_b_idx]
from cue_editor import apply_choices_to_plan
mix_out, mix_in, duration, selected = apply_choices_to_plan(
trans,
a_choice=a_choice,
b_in_choice=b_in_choice,
b_drop_choice=b_drop_choice,
transition_type=transition_type if transition_type in TRANSITION_TYPES else None,
)
trans.mix_out_point = round(max(0.0, min(mix_out, track_a.duration)), 3)
trans.mix_in_point = round(max(0.0, min(mix_in, track_b.duration)), 3)
trans.duration_seconds = round(max(0.25, duration), 3)
trans.duration_beats = max(1, round(trans.duration_seconds * max(track_b.bpm, 60.0) / 60.0))
trans.needs_stems = trans.transition_type in ("bass_swap", "acapella_over_instrumental", "drums_first", "double_drop")
update_transition_tempo_policy(trans, track_a, track_b)
trans.selected_cues = selected
confs = [float(c.get("confidence", 0.0) or 0.0) for c in selected.values()]
trans.cue_confidence = round(sum(confs) / len(confs), 3) if confs else 1.0
trans.score_breakdown = {**dict(trans.score_breakdown), "waveform_editor_override": 1.0, "cue_confidence": trans.cue_confidence}
assumptions = [a for a in trans.assumptions if "waveform" not in a.lower()]
assumptions.append("waveform cue editor override applied; audition before full render")
trans.assumptions = assumptions
# Persist positive cue examples as manual supervision. The user can later
# train a cue model from this file or merge it with listening ratings.
try:
from cue_learning import append_training_example
for cue in selected.values():
append_training_example("data/manual-cue-edits.jsonl", cue, duration=track_b.duration, label=1, source="waveform_editor")
except Exception as exc:
logger.warning(f"Could not append manual cue training examples: {exc}")
return (
f"✅ Applied waveform cue edit to transition {idx+1}\n\n"
f"- Type: `{trans.transition_type}`\n"
f"- A mix-out: {trans.mix_out_point:.2f}s\n"
f"- B mix-in: {trans.mix_in_point:.2f}s\n"
f"- B drop: {trans.selected_cues['b_drop']['time']:.2f}s\n"
f"- Duration: {trans.duration_beats} beats / {trans.duration_seconds:.2f}s\n"
f"- Cue confidence: {trans.cue_confidence:.0%}"
)
def load_interactive_timeline_editor(transition_idx):
"""Render the draggable HTML timeline editor and editable JSON payload."""
if not app_state.transitions:
return "", "", "⚠️ Generate a set plan first"
idx = int(transition_idx) - 1
if idx < 0 or idx >= len(app_state.transitions):
return "", "", f"⚠️ Invalid transition index. Choose 1-{len(app_state.transitions)}"
trans = app_state.transitions[idx]
track_a = app_state.analyses[trans.track_a_idx]
track_b = app_state.analyses[trans.track_b_idx]
from timeline_editor import render_timeline_editor_file
html_path, html, payload_json = render_timeline_editor_file(
transition_index=idx + 1,
track_a=track_a,
track_b=track_b,
plan=trans,
)
summary = (
"✅ Interactive timeline editor generated\n\n"
f"- File: `{html_path}`\n"
"- Drag markers in the embedded editor, copy the JSON, then paste/apply it below.\n"
"- Double-click a candidate marker to snap the nearest selected cue.\n"
"- Use Shift while dragging or nudging for finer 0.01s movement."
)
return html, payload_json, summary
def apply_interactive_timeline_json(transition_idx, payload_json):
"""Apply the JSON payload produced by the draggable timeline editor."""
if not app_state.transitions:
return "⚠️ Generate a set plan first"
idx = int(transition_idx) - 1
if idx < 0 or idx >= len(app_state.transitions):
return f"⚠️ Invalid transition index. Choose 1-{len(app_state.transitions)}"
trans = app_state.transitions[idx]
track_a = app_state.analyses[trans.track_a_idx]
track_b = app_state.analyses[trans.track_b_idx]
try:
from timeline_editor import apply_timeline_json_to_plan
summary = apply_timeline_json_to_plan(trans, track_a, track_b, payload_json or "")
update_transition_tempo_policy(trans, track_a, track_b)
except Exception as exc:
return f"⚠️ Could not apply timeline JSON: {exc}"
try:
from cue_learning import append_training_example
for cue in trans.selected_cues.values():
append_training_example("data/manual-cue-edits.jsonl", cue, duration=track_b.duration, label=1, source="interactive_timeline")
except Exception as exc:
logger.warning(f"Could not append interactive timeline training examples: {exc}")
return (
f"✅ Applied interactive timeline edit to transition {idx+1}\n\n"
f"- Type: `{summary['transition_type']}`\n"
f"- A mix-out: {summary['a_out']:.2f}s\n"
f"- B mix-in: {summary['b_in']:.2f}s\n"
f"- B drop: {summary['b_drop']:.2f}s\n"
f"- Duration: {summary['duration_beats']} beats / {summary['duration_seconds']:.2f}s\n"
"- Source: interactive timeline JSON"
)