from __future__ import annotations import json import logging import os import tempfile import gradio as gr import numpy as np import soundfile as sf import app_planning from app_audio import load_audio_segment, time_stretch_audio from app_benchmarks import format_benchmarks, run_benchmarks from app_planning import build_set_order, compute_compatibility, LAST_SET_ORDER_METADATA, plan_transition, update_transition_tempo_policy from app_state import app_state from app_track_analysis import analyze_track from transitions import TRANSITION_TYPES logger = logging.getLogger("dj_engine") def analyze_uploaded_tracks(files, progress=gr.Progress()): """Analyze all uploaded audio files.""" if not files: return "⚠️ No files uploaded", "", "" app_state.tracks = [] app_state.analyses = [] analysis_reports = [] for i, f in enumerate(files): filepath = f if isinstance(f, str) else f.name progress((i / len(files)) * 0.9, desc=f"Analyzing {os.path.basename(filepath)}...") try: analysis = analyze_track(filepath, progress_cb=lambda p, m: progress( (i + p) / len(files) * 0.9, desc=m )) app_state.analyses.append(analysis) top_cues = ", ".join( f"{c.get('kind', c.get('type', 'cue'))}@{float(c.get('time', 0.0)):.1f}s/{float(c.get('confidence', 0.0)):.0%}" for c in analysis.cue_points[:4] ) or "none" report = ( f"### 🎵 {analysis.filename}\n" f"- **BPM:** {analysis.bpm} (confidence: {analysis.bpm_confidence:.0%})\n" f"- **Key:** {analysis.key} {analysis.scale} ({analysis.camelot})\n" f"- **Duration:** {analysis.duration:.1f}s\n" f"- **Energy:** {analysis.avg_energy:.4f} ({analysis.loudness_db:.1f} dB)\n" f"- **Brightness:** {analysis.spectral_centroid_mean:.0f} Hz\n" f"- **Segments:** {len(analysis.segments)} " f"({', '.join(s['label'] for s in analysis.segments[:5])}{'...' if len(analysis.segments) > 5 else ''})\n" f"- **Downbeat phase:** {analysis.downbeat_phase} (confidence: {analysis.downbeat_confidence:.0%})\n" f"- **Cue Points:** {len(analysis.cue_points)} ranked candidates; top: {top_cues}\n" ) analysis_reports.append(report) except Exception as e: analysis_reports.append(f"### ❌ {os.path.basename(filepath)}\nError: {str(e)}\n") logger.error(f"Analysis failed for {filepath}: {e}", exc_info=True) progress(0.95, desc="Computing compatibility matrix...") # Compute compatibility matrix n = len(app_state.analyses) compat_text = "" if n >= 2: compat_text = "### 🔗 Compatibility Matrix\n\n" compat_text += "| | " + " | ".join(a.filename[:15] for a in app_state.analyses) + " |\n" compat_text += "|" + "---|" * (n + 1) + "\n" for i in range(n): row = f"| **{app_state.analyses[i].filename[:15]}** |" for j in range(n): if i == j: row += " — |" else: compat = compute_compatibility(app_state.analyses[i], app_state.analyses[j]) score = compat["overall"] emoji = "🟢" if score >= 0.7 else "🟡" if score >= 0.5 else "🔴" row += f" {emoji} {score:.2f} |" compat_text += row + "\n" # Run benchmarks on analysis benchmarks = run_benchmarks(app_state.analyses) app_state.benchmarks = benchmarks benchmark_text = format_benchmarks(benchmarks) full_report = "\n".join(analysis_reports) return full_report, compat_text, benchmark_text def generate_set_plan(allow_repeat, progress=gr.Progress()): """Generate the optimal set order and transition plans.""" if len(app_state.analyses) < 2: return "⚠️ Need at least 2 analyzed tracks to build a set" progress(0.1, desc="Computing optimal track order...") order = build_set_order(app_state.analyses, allow_repeat=allow_repeat) app_state.set_order = order app_state.set_order_metadata = dict(LAST_SET_ORDER_METADATA) progress(0.3, desc="Planning transitions...") transitions = [] set_len = len(order) prev_type = None for pos in range(len(order) - 1): idx_a = order[pos] idx_b = order[pos + 1] compat = compute_compatibility(app_state.analyses[idx_a], app_state.analyses[idx_b]) position_frac = pos / max(set_len - 1, 1) plan = plan_transition( app_state.analyses[idx_a], app_state.analyses[idx_b], compat, position_in_set=position_frac, set_length=set_len, prev_transition_type=prev_type, ) plan.track_a_idx = idx_a plan.track_b_idx = idx_b transitions.append(plan) prev_type = plan.transition_type app_state.transitions = transitions # Format the plan plan_text = "# 🎧 DJ Set Plan\n\n" method = app_state.set_order_metadata.get("method", "unknown") if app_state.set_order_metadata: plan_text += f"_Set optimizer:_ `{method}`; score={app_state.set_order_metadata.get('score', 'n/a')}\n\n" plan_text += "## Track Order\n" for pos, idx in enumerate(order): t = app_state.analyses[idx] target = None if app_state.set_order_metadata.get('energy_targets'): target = app_state.set_order_metadata['energy_targets'][pos] target_txt = f", target_energy={target:.2f}" if isinstance(target, (int, float)) else "" plan_text += f"{pos+1}. **{t.filename}** — {t.bpm} BPM, {t.camelot}, energy={t.avg_energy:.3f}{target_txt}\n" if app_state.set_order_metadata.get('pair_scores'): plan_text += "\n## Global optimizer pair scores\n" for item in app_state.set_order_metadata['pair_scores']: a = app_state.analyses[item['src']].filename b = app_state.analyses[item['dst']].filename plan_text += f"- **{a} → {b}:** total={item['total']:.2f}, compat={item['compatibility']:.2f}, edge={item['edge_score']:.2f}, cue={item['cue_confidence']:.2f}\n" plan_text += "\n## Transitions\n\n" for i, trans in enumerate(transitions): t_a = app_state.analyses[trans.track_a_idx] t_b = app_state.analyses[trans.track_b_idx] plan_text += f"### Transition {i+1}: {t_a.filename} → {t_b.filename}\n" plan_text += f"- **Type:** `{trans.transition_type}` — {TRANSITION_TYPES[trans.transition_type]}\n" plan_text += f"- **Mix out:** {trans.mix_out_point:.1f}s into track A\n" plan_text += f"- **Mix in:** {trans.mix_in_point:.1f}s into track B\n" plan_text += f"- **Duration:** {trans.duration_beats} beats ({trans.duration_seconds:.1f}s)\n" tempo_reason = (getattr(trans, "tempo_policy", {}) or {}).get("reason", "pitch-preserving if enabled") plan_text += f"- **Tempo policy:** ×{trans.bpm_adjustment:.3f} — {tempo_reason}\n" plan_text += f"- **Stems needed:** {'Yes (demucs)' if trans.needs_stems else 'No'}\n" plan_text += f"- **Compatibility:** {trans.compatibility_score:.2f}; planner edge score: {trans.score_breakdown.get('overall', trans.compatibility_score):.2f}\n" if trans.selected_cues: a_cue = trans.selected_cues.get('a_out', {}) b_in = trans.selected_cues.get('b_in', {}) b_drop = trans.selected_cues.get('b_drop', {}) plan_text += f"- **Selected cues:** A out {a_cue.get('time', trans.mix_out_point):.1f}s ({a_cue.get('confidence', 0):.0%}), " plan_text += f"B in {b_in.get('time', trans.mix_in_point):.1f}s ({b_in.get('confidence', 0):.0%}), " plan_text += f"B drop {b_drop.get('time', trans.mix_in_point + trans.duration_seconds):.1f}s ({b_drop.get('confidence', 0):.0%})\n" if trans.score_breakdown: plan_text += "- **Score breakdown:** " + ", ".join(f"{k}={v:.2f}" for k, v in trans.score_breakdown.items() if isinstance(v, (int, float))) + "\n" if trans.assumptions: plan_text += "- **Assumptions / risks:** " + "; ".join(trans.assumptions) + "\n" if trans.alternatives: plan_text += f"- **Alternatives retained:** {len(trans.alternatives)} candidate cue edges for audition/future rendering\n" plan_text += f"- **Reasoning:** {trans.reason}\n\n" return plan_text def apply_manual_transition_edit(transition_idx, mix_out_point, mix_in_point, duration_beats, transition_type): """Apply manual cue/timing/type edits to a planned transition.""" if not app_state.transitions: return "⚠️ Generate a set plan first" idx = int(transition_idx) - 1 if idx < 0 or idx >= len(app_state.transitions): return f"⚠️ Invalid transition index. Choose 1-{len(app_state.transitions)}" trans = app_state.transitions[idx] track_a = app_state.analyses[trans.track_a_idx] track_b = app_state.analyses[trans.track_b_idx] if transition_type in TRANSITION_TYPES: trans.transition_type = transition_type trans.mix_out_point = round(max(0.0, min(float(mix_out_point), track_a.duration)), 2) trans.mix_in_point = round(max(0.0, min(float(mix_in_point), track_b.duration)), 2) beats = max(1, int(duration_beats)) trans.duration_beats = beats trans.duration_seconds = round(beats * 60.0 / max(track_b.bpm, 60), 2) trans.needs_stems = trans.transition_type in ("bass_swap", "acapella_over_instrumental", "drums_first", "double_drop") update_transition_tempo_policy(trans, track_a, track_b) trans.selected_cues = { "a_out": {"kind": "mix_out", "label": "manual override", "time": trans.mix_out_point, "confidence": 1.0}, "b_in": {"kind": "mix_in", "label": "manual override", "time": trans.mix_in_point, "confidence": 1.0}, "b_drop": {"kind": "drop", "label": "manual implied from duration", "time": round(trans.mix_in_point + trans.duration_seconds, 2), "confidence": 0.8}, } trans.cue_confidence = 1.0 trans.score_breakdown = {**dict(trans.score_breakdown), "manual_override": 1.0, "cue_confidence": 1.0} assumptions = [a for a in trans.assumptions if "manual" not in a.lower()] assumptions.append("manual cue/timing override applied; preview this candidate before full render") trans.assumptions = assumptions try: from cue_learning import append_training_example for cue in trans.selected_cues.values(): append_training_example("data/manual-cue-edits.jsonl", cue, duration=track_b.duration, label=1, source="numeric_manual_editor") except Exception as exc: logger.warning(f"Could not append manual cue training examples: {exc}") return ( f"✅ Updated transition {idx+1}\n\n" f"- Type: `{trans.transition_type}`\n" f"- A mix-out: {trans.mix_out_point:.2f}s\n" f"- B mix-in: {trans.mix_in_point:.2f}s\n" f"- Duration: {trans.duration_beats} beats / {trans.duration_seconds:.2f}s\n" f"- B implied drop: {trans.selected_cues['b_drop']['time']:.2f}s" ) def load_waveform_cue_editor(transition_idx): """Load waveform image and cue choices for the selected transition.""" if not app_state.transitions: return None, "⚠️ Generate a set plan first", gr.update(choices=[]), gr.update(choices=[]), gr.update(choices=[]) idx = int(transition_idx) - 1 if idx < 0 or idx >= len(app_state.transitions): return None, f"⚠️ Invalid transition index. Choose 1-{len(app_state.transitions)}", gr.update(choices=[]), gr.update(choices=[]), gr.update(choices=[]) trans = app_state.transitions[idx] track_a = app_state.analyses[trans.track_a_idx] track_b = app_state.analyses[trans.track_b_idx] from cue_editor import render_transition_cue_editor, choices_for_transition image_path, summary = render_transition_cue_editor(track_a, track_b, trans) choices = choices_for_transition(track_a, track_b, trans) return ( image_path, summary, gr.update(choices=choices["a_choices"], value=choices["a_default"]), gr.update(choices=choices["b_in_choices"], value=choices["b_in_default"]), gr.update(choices=choices["b_drop_choices"], value=choices["b_drop_default"]), ) def apply_waveform_cue_choices(transition_idx, a_choice, b_in_choice, b_drop_choice, transition_type): """Apply cue choices from the waveform editor.""" if not app_state.transitions: return "⚠️ Generate a set plan first" idx = int(transition_idx) - 1 if idx < 0 or idx >= len(app_state.transitions): return f"⚠️ Invalid transition index. Choose 1-{len(app_state.transitions)}" trans = app_state.transitions[idx] track_a = app_state.analyses[trans.track_a_idx] track_b = app_state.analyses[trans.track_b_idx] from cue_editor import apply_choices_to_plan mix_out, mix_in, duration, selected = apply_choices_to_plan( trans, a_choice=a_choice, b_in_choice=b_in_choice, b_drop_choice=b_drop_choice, transition_type=transition_type if transition_type in TRANSITION_TYPES else None, ) trans.mix_out_point = round(max(0.0, min(mix_out, track_a.duration)), 3) trans.mix_in_point = round(max(0.0, min(mix_in, track_b.duration)), 3) trans.duration_seconds = round(max(0.25, duration), 3) trans.duration_beats = max(1, round(trans.duration_seconds * max(track_b.bpm, 60.0) / 60.0)) trans.needs_stems = trans.transition_type in ("bass_swap", "acapella_over_instrumental", "drums_first", "double_drop") update_transition_tempo_policy(trans, track_a, track_b) trans.selected_cues = selected confs = [float(c.get("confidence", 0.0) or 0.0) for c in selected.values()] trans.cue_confidence = round(sum(confs) / len(confs), 3) if confs else 1.0 trans.score_breakdown = {**dict(trans.score_breakdown), "waveform_editor_override": 1.0, "cue_confidence": trans.cue_confidence} assumptions = [a for a in trans.assumptions if "waveform" not in a.lower()] assumptions.append("waveform cue editor override applied; audition before full render") trans.assumptions = assumptions # Persist positive cue examples as manual supervision. The user can later # train a cue model from this file or merge it with listening ratings. try: from cue_learning import append_training_example for cue in selected.values(): append_training_example("data/manual-cue-edits.jsonl", cue, duration=track_b.duration, label=1, source="waveform_editor") except Exception as exc: logger.warning(f"Could not append manual cue training examples: {exc}") return ( f"✅ Applied waveform cue edit to transition {idx+1}\n\n" f"- Type: `{trans.transition_type}`\n" f"- A mix-out: {trans.mix_out_point:.2f}s\n" f"- B mix-in: {trans.mix_in_point:.2f}s\n" f"- B drop: {trans.selected_cues['b_drop']['time']:.2f}s\n" f"- Duration: {trans.duration_beats} beats / {trans.duration_seconds:.2f}s\n" f"- Cue confidence: {trans.cue_confidence:.0%}" ) def load_interactive_timeline_editor(transition_idx): """Render the draggable HTML timeline editor and editable JSON payload.""" if not app_state.transitions: return "", "", "⚠️ Generate a set plan first" idx = int(transition_idx) - 1 if idx < 0 or idx >= len(app_state.transitions): return "", "", f"⚠️ Invalid transition index. Choose 1-{len(app_state.transitions)}" trans = app_state.transitions[idx] track_a = app_state.analyses[trans.track_a_idx] track_b = app_state.analyses[trans.track_b_idx] from timeline_editor import render_timeline_editor_file html_path, html, payload_json = render_timeline_editor_file( transition_index=idx + 1, track_a=track_a, track_b=track_b, plan=trans, ) summary = ( "✅ Interactive timeline editor generated\n\n" f"- File: `{html_path}`\n" "- Drag markers in the embedded editor, copy the JSON, then paste/apply it below.\n" "- Double-click a candidate marker to snap the nearest selected cue.\n" "- Use Shift while dragging or nudging for finer 0.01s movement." ) return html, payload_json, summary def apply_interactive_timeline_json(transition_idx, payload_json): """Apply the JSON payload produced by the draggable timeline editor.""" if not app_state.transitions: return "⚠️ Generate a set plan first" idx = int(transition_idx) - 1 if idx < 0 or idx >= len(app_state.transitions): return f"⚠️ Invalid transition index. Choose 1-{len(app_state.transitions)}" trans = app_state.transitions[idx] track_a = app_state.analyses[trans.track_a_idx] track_b = app_state.analyses[trans.track_b_idx] try: from timeline_editor import apply_timeline_json_to_plan summary = apply_timeline_json_to_plan(trans, track_a, track_b, payload_json or "") update_transition_tempo_policy(trans, track_a, track_b) except Exception as exc: return f"⚠️ Could not apply timeline JSON: {exc}" try: from cue_learning import append_training_example for cue in trans.selected_cues.values(): append_training_example("data/manual-cue-edits.jsonl", cue, duration=track_b.duration, label=1, source="interactive_timeline") except Exception as exc: logger.warning(f"Could not append interactive timeline training examples: {exc}") return ( f"✅ Applied interactive timeline edit to transition {idx+1}\n\n" f"- Type: `{summary['transition_type']}`\n" f"- A mix-out: {summary['a_out']:.2f}s\n" f"- B mix-in: {summary['b_in']:.2f}s\n" f"- B drop: {summary['b_drop']:.2f}s\n" f"- Duration: {summary['duration_beats']} beats / {summary['duration_seconds']:.2f}s\n" "- Source: interactive timeline JSON" )