Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import logging | |
| import os | |
| import tempfile | |
| import gradio as gr | |
| import numpy as np | |
| import soundfile as sf | |
| import app_planning | |
| from app_audio import load_audio_segment, time_stretch_audio | |
| from app_benchmarks import format_benchmarks, run_benchmarks | |
| from app_planning import build_set_order, compute_compatibility, LAST_SET_ORDER_METADATA, plan_transition, update_transition_tempo_policy | |
| from app_state import app_state | |
| from app_track_analysis import analyze_track | |
| from transitions import TRANSITION_TYPES | |
| logger = logging.getLogger("dj_engine") | |
def analyze_uploaded_tracks(files, progress=gr.Progress()):
    """Analyze all uploaded audio files and build the Markdown reports.

    Clears ``app_state.tracks`` and ``app_state.analyses`` (plus any
    previously planned transitions), runs ``analyze_track`` on every upload,
    then builds a pairwise compatibility table and runs the benchmarks.

    Args:
        files: Uploaded items; each is either a path string or an object
            whose ``.name`` attribute holds the path.
        progress: Gradio progress tracker, called with a fraction and a
            ``desc`` label.

    Returns:
        Tuple ``(analysis_report, compatibility_table, benchmark_report)`` of
        Markdown strings. With no uploads, a warning and two empty strings.
        A file whose analysis raises gets an error section in the report and
        is skipped; the remaining files are still processed.
    """
    if not files:
        return "⚠️ No files uploaded", "", ""

    app_state.tracks = []
    app_state.analyses = []
    # Planned transitions store indices into the analyses list that was just
    # discarded; drop them so the editors ask for a fresh plan instead of
    # addressing the wrong (or a missing) track.
    app_state.transitions = []

    total = len(files)
    analysis_reports = []
    for i, f in enumerate(files):
        filepath = f if isinstance(f, str) else f.name
        display_name = os.path.basename(filepath)
        progress((i / total) * 0.9, desc=f"Analyzing {display_name}...")
        try:
            analysis = analyze_track(
                filepath,
                # ``i=i`` binds the current index now, so the callback stays
                # correct even if it is invoked after the loop has advanced.
                progress_cb=lambda p, m, i=i: progress((i + p) / total * 0.9, desc=m),
            )
            app_state.analyses.append(analysis)
            top_cues = ", ".join(
                f"{c.get('kind', c.get('type', 'cue'))}@{float(c.get('time', 0.0)):.1f}s/{float(c.get('confidence', 0.0)):.0%}"
                for c in analysis.cue_points[:4]
            ) or "none"
            report = (
                f"### 🎵 {analysis.filename}\n"
                f"- **BPM:** {analysis.bpm} (confidence: {analysis.bpm_confidence:.0%})\n"
                f"- **Key:** {analysis.key} {analysis.scale} ({analysis.camelot})\n"
                f"- **Duration:** {analysis.duration:.1f}s\n"
                f"- **Energy:** {analysis.avg_energy:.4f} ({analysis.loudness_db:.1f} dB)\n"
                f"- **Brightness:** {analysis.spectral_centroid_mean:.0f} Hz\n"
                f"- **Segments:** {len(analysis.segments)} "
                f"({', '.join(s['label'] for s in analysis.segments[:5])}{'...' if len(analysis.segments) > 5 else ''})\n"
                f"- **Downbeat phase:** {analysis.downbeat_phase} (confidence: {analysis.downbeat_confidence:.0%})\n"
                f"- **Cue Points:** {len(analysis.cue_points)} ranked candidates; top: {top_cues}\n"
            )
            analysis_reports.append(report)
        except Exception as e:
            # Best effort per file: report the failure and keep going.
            analysis_reports.append(f"### ❌ {display_name}\nError: {str(e)}\n")
            logger.exception("Analysis failed for %s: %s", filepath, e)

    progress(0.95, desc="Computing compatibility matrix...")

    # Pairwise compatibility table; only meaningful with two or more tracks.
    analyses = app_state.analyses
    n = len(analyses)
    compat_text = ""
    if n >= 2:
        table = [
            "### 🔗 Compatibility Matrix\n\n",
            "| | " + " | ".join(a.filename[:15] for a in analyses) + " |\n",
            "|" + "---|" * (n + 1) + "\n",
        ]
        for i, track_i in enumerate(analyses):
            row = f"| **{track_i.filename[:15]}** |"
            for j, track_j in enumerate(analyses):
                if i == j:
                    row += " — |"
                    continue
                score = compute_compatibility(track_i, track_j)["overall"]
                emoji = "🟢" if score >= 0.7 else "🟡" if score >= 0.5 else "🔴"
                row += f" {emoji} {score:.2f} |"
            table.append(row + "\n")
        compat_text = "".join(table)

    # Run benchmarks on the successful analyses and keep them on app_state.
    benchmarks = run_benchmarks(analyses)
    app_state.benchmarks = benchmarks
    benchmark_text = format_benchmarks(benchmarks)

    full_report = "\n".join(analysis_reports)
    return full_report, compat_text, benchmark_text
def generate_set_plan(allow_repeat, progress=gr.Progress()):
    """Generate the set order and transition plans, returned as Markdown.

    Stores the order, a copy of the optimizer metadata, and the planned
    transitions on ``app_state`` (``set_order``, ``set_order_metadata``,
    ``transitions``).

    Args:
        allow_repeat: Forwarded to ``build_set_order`` as ``allow_repeat``.
        progress: Gradio progress tracker.

    Returns:
        Markdown text describing the track order, optional optimizer pair
        scores, and every transition; or a warning string when fewer than
        two tracks have been analyzed.
    """
    analyses = app_state.analyses
    if len(analyses) < 2:
        return "⚠️ Need at least 2 analyzed tracks to build a set"

    progress(0.1, desc="Computing optimal track order...")
    order = build_set_order(analyses, allow_repeat=allow_repeat)
    app_state.set_order = order
    # Read the metadata through the module attribute: a name bound with
    # ``from app_planning import ...`` would keep pointing at the old object
    # if the planner rebinds its global instead of mutating it in place.
    metadata = dict(app_planning.LAST_SET_ORDER_METADATA)
    app_state.set_order_metadata = metadata

    progress(0.3, desc="Planning transitions...")
    transitions = []
    set_len = len(order)
    prev_type = None
    for pos, (idx_a, idx_b) in enumerate(zip(order, order[1:])):
        track_a = analyses[idx_a]
        track_b = analyses[idx_b]
        compat = compute_compatibility(track_a, track_b)
        plan = plan_transition(
            track_a, track_b, compat,
            position_in_set=pos / max(set_len - 1, 1),
            set_length=set_len,
            prev_transition_type=prev_type,
        )
        plan.track_a_idx = idx_a
        plan.track_b_idx = idx_b
        transitions.append(plan)
        prev_type = plan.transition_type
    app_state.transitions = transitions

    # Collect the Markdown in pieces and join once at the end.
    parts = ["# 🎧 DJ Set Plan\n\n"]
    method = metadata.get("method", "unknown")
    if metadata:
        parts.append(f"_Set optimizer:_ `{method}`; score={metadata.get('score', 'n/a')}\n\n")

    parts.append("## Track Order\n")
    energy_targets = metadata.get('energy_targets') or []
    for pos, idx in enumerate(order):
        t = analyses[idx]
        # Guard the lookup so a target list shorter than the order does not
        # abort the whole report.
        target = energy_targets[pos] if pos < len(energy_targets) else None
        target_txt = f", target_energy={target:.2f}" if isinstance(target, (int, float)) else ""
        parts.append(f"{pos+1}. **{t.filename}** — {t.bpm} BPM, {t.camelot}, energy={t.avg_energy:.3f}{target_txt}\n")

    pair_scores = metadata.get('pair_scores')
    if pair_scores:
        parts.append("\n## Global optimizer pair scores\n")
        for item in pair_scores:
            a = analyses[item['src']].filename
            b = analyses[item['dst']].filename
            parts.append(f"- **{a} → {b}:** total={item['total']:.2f}, compat={item['compatibility']:.2f}, edge={item['edge_score']:.2f}, cue={item['cue_confidence']:.2f}\n")

    parts.append("\n## Transitions\n\n")
    for i, trans in enumerate(transitions):
        t_a = analyses[trans.track_a_idx]
        t_b = analyses[trans.track_b_idx]
        parts.append(f"### Transition {i+1}: {t_a.filename} → {t_b.filename}\n")
        parts.append(f"- **Type:** `{trans.transition_type}` — {TRANSITION_TYPES[trans.transition_type]}\n")
        parts.append(f"- **Mix out:** {trans.mix_out_point:.1f}s into track A\n")
        parts.append(f"- **Mix in:** {trans.mix_in_point:.1f}s into track B\n")
        parts.append(f"- **Duration:** {trans.duration_beats} beats ({trans.duration_seconds:.1f}s)\n")
        # ``tempo_policy`` may be absent or empty on a plan; fall back to the
        # generic wording in that case.
        tempo_reason = (getattr(trans, "tempo_policy", {}) or {}).get("reason", "pitch-preserving if enabled")
        parts.append(f"- **Tempo policy:** ×{trans.bpm_adjustment:.3f} — {tempo_reason}\n")
        parts.append(f"- **Stems needed:** {'Yes (demucs)' if trans.needs_stems else 'No'}\n")
        parts.append(f"- **Compatibility:** {trans.compatibility_score:.2f}; planner edge score: {trans.score_breakdown.get('overall', trans.compatibility_score):.2f}\n")
        if trans.selected_cues:
            a_cue = trans.selected_cues.get('a_out', {})
            b_in = trans.selected_cues.get('b_in', {})
            b_drop = trans.selected_cues.get('b_drop', {})
            parts.append(f"- **Selected cues:** A out {a_cue.get('time', trans.mix_out_point):.1f}s ({a_cue.get('confidence', 0):.0%}), ")
            parts.append(f"B in {b_in.get('time', trans.mix_in_point):.1f}s ({b_in.get('confidence', 0):.0%}), ")
            parts.append(f"B drop {b_drop.get('time', trans.mix_in_point + trans.duration_seconds):.1f}s ({b_drop.get('confidence', 0):.0%})\n")
        if trans.score_breakdown:
            parts.append("- **Score breakdown:** " + ", ".join(f"{k}={v:.2f}" for k, v in trans.score_breakdown.items() if isinstance(v, (int, float))) + "\n")
        if trans.assumptions:
            parts.append("- **Assumptions / risks:** " + "; ".join(trans.assumptions) + "\n")
        if trans.alternatives:
            parts.append(f"- **Alternatives retained:** {len(trans.alternatives)} candidate cue edges for audition/future rendering\n")
        parts.append(f"- **Reasoning:** {trans.reason}\n\n")

    return "".join(parts)
def apply_manual_transition_edit(transition_idx, mix_out_point, mix_in_point, duration_beats, transition_type):
    """Apply manual cue/timing/type edits to a planned transition.

    Args:
        transition_idx: 1-based index into ``app_state.transitions``.
        mix_out_point: Requested mix-out time in track A, in seconds; clamped
            to ``[0, track_a.duration]``.
        mix_in_point: Requested mix-in time in track B, in seconds; clamped
            to ``[0, track_b.duration]``.
        duration_beats: Transition length in beats; at least 1 is used.
        transition_type: New type; ignored unless it is a key of
            ``TRANSITION_TYPES``.

    Returns:
        A Markdown status string: a confirmation summary on success, or a
        warning when no plan exists, the index is out of range, or a numeric
        field cannot be parsed.
    """
    if not app_state.transitions:
        return "⚠️ Generate a set plan first"
    try:
        idx = int(transition_idx) - 1
    except (TypeError, ValueError, OverflowError):
        idx = -1  # unparsable index is reported like an out-of-range one
    if idx < 0 or idx >= len(app_state.transitions):
        return f"⚠️ Invalid transition index. Choose 1-{len(app_state.transitions)}"

    # Parse every numeric field before touching the plan so a blank or
    # malformed input cannot leave the transition half-edited.
    try:
        requested_out = float(mix_out_point)
        requested_in = float(mix_in_point)
        beats = max(1, int(duration_beats))
    except (TypeError, ValueError, OverflowError):
        return "⚠️ Mix-out, mix-in and duration must be numbers"

    trans = app_state.transitions[idx]
    track_a = app_state.analyses[trans.track_a_idx]
    track_b = app_state.analyses[trans.track_b_idx]

    if transition_type in TRANSITION_TYPES:
        trans.transition_type = transition_type
    trans.mix_out_point = round(max(0.0, min(requested_out, track_a.duration)), 2)
    trans.mix_in_point = round(max(0.0, min(requested_in, track_b.duration)), 2)
    trans.duration_beats = beats
    # Beats are converted at track B's BPM, floored at 60.
    trans.duration_seconds = round(beats * 60.0 / max(track_b.bpm, 60), 2)
    trans.needs_stems = trans.transition_type in ("bass_swap", "acapella_over_instrumental", "drums_first", "double_drop")
    update_transition_tempo_policy(trans, track_a, track_b)

    trans.selected_cues = {
        "a_out": {"kind": "mix_out", "label": "manual override", "time": trans.mix_out_point, "confidence": 1.0},
        "b_in": {"kind": "mix_in", "label": "manual override", "time": trans.mix_in_point, "confidence": 1.0},
        "b_drop": {"kind": "drop", "label": "manual implied from duration", "time": round(trans.mix_in_point + trans.duration_seconds, 2), "confidence": 0.8},
    }
    trans.cue_confidence = 1.0
    trans.score_breakdown = {**dict(trans.score_breakdown), "manual_override": 1.0, "cue_confidence": 1.0}

    # Replace any earlier "manual" note rather than stacking duplicates.
    assumptions = [a for a in trans.assumptions if "manual" not in a.lower()]
    assumptions.append("manual cue/timing override applied; preview this candidate before full render")
    trans.assumptions = assumptions

    # Best-effort logging of the edit as positive training examples; a
    # failure here must not undo the edit.
    try:
        from cue_learning import append_training_example
        for name, cue in trans.selected_cues.items():
            # The "a_out" time is measured in track A, so it must be paired
            # with track A's duration, not track B's.
            cue_track = track_a if name.startswith("a_") else track_b
            append_training_example("data/manual-cue-edits.jsonl", cue, duration=cue_track.duration, label=1, source="numeric_manual_editor")
    except Exception as exc:
        logger.warning("Could not append manual cue training examples: %s", exc)

    return (
        f"✅ Updated transition {idx+1}\n\n"
        f"- Type: `{trans.transition_type}`\n"
        f"- A mix-out: {trans.mix_out_point:.2f}s\n"
        f"- B mix-in: {trans.mix_in_point:.2f}s\n"
        f"- Duration: {trans.duration_beats} beats / {trans.duration_seconds:.2f}s\n"
        f"- B implied drop: {trans.selected_cues['b_drop']['time']:.2f}s"
    )
def load_waveform_cue_editor(transition_idx):
    """Build the waveform image and cue dropdown updates for one transition.

    Args:
        transition_idx: 1-based index into ``app_state.transitions``.

    Returns:
        A 5-tuple ``(image_path, summary, a_update, b_in_update,
        b_drop_update)``. When no plan exists or the index is out of range,
        the image is ``None``, the summary is a warning, and the three
        updates carry empty choice lists.
    """
    planned = app_state.transitions

    def _failure(message):
        # No image, the warning text, and three dropdowns with no choices.
        return (None, message, *(gr.update(choices=[]) for _ in range(3)))

    if not planned:
        return _failure("⚠️ Generate a set plan first")

    position = int(transition_idx) - 1
    if not 0 <= position < len(planned):
        return _failure(f"⚠️ Invalid transition index. Choose 1-{len(planned)}")

    plan = planned[position]
    outgoing = app_state.analyses[plan.track_a_idx]
    incoming = app_state.analyses[plan.track_b_idx]

    from cue_editor import render_transition_cue_editor, choices_for_transition

    image_path, summary = render_transition_cue_editor(outgoing, incoming, plan)
    options = choices_for_transition(outgoing, incoming, plan)

    a_update = gr.update(choices=options["a_choices"], value=options["a_default"])
    b_in_update = gr.update(choices=options["b_in_choices"], value=options["b_in_default"])
    b_drop_update = gr.update(choices=options["b_drop_choices"], value=options["b_drop_default"])
    return image_path, summary, a_update, b_in_update, b_drop_update
def apply_waveform_cue_choices(transition_idx, a_choice, b_in_choice, b_drop_choice, transition_type):
    """Apply cue choices from the waveform editor to a planned transition.

    Args:
        transition_idx: 1-based index into ``app_state.transitions``.
        a_choice: Selected track-A cue choice, forwarded to
            ``apply_choices_to_plan``.
        b_in_choice: Selected track-B mix-in choice, forwarded likewise.
        b_drop_choice: Selected track-B drop choice, forwarded likewise.
        transition_type: Forwarded only when it is a key of
            ``TRANSITION_TYPES``; otherwise ``None`` is passed.

    Returns:
        A Markdown status string: a confirmation summary, or a warning when
        no plan exists or the index is out of range.
    """
    if not app_state.transitions:
        return "⚠️ Generate a set plan first"
    idx = int(transition_idx) - 1
    if idx < 0 or idx >= len(app_state.transitions):
        return f"⚠️ Invalid transition index. Choose 1-{len(app_state.transitions)}"

    trans = app_state.transitions[idx]
    track_a = app_state.analyses[trans.track_a_idx]
    track_b = app_state.analyses[trans.track_b_idx]

    from cue_editor import apply_choices_to_plan
    mix_out, mix_in, duration, selected = apply_choices_to_plan(
        trans,
        a_choice=a_choice,
        b_in_choice=b_in_choice,
        b_drop_choice=b_drop_choice,
        transition_type=transition_type if transition_type in TRANSITION_TYPES else None,
    )

    # Clamp the cue times to each track and enforce a minimum 0.25 s overlap.
    trans.mix_out_point = round(max(0.0, min(mix_out, track_a.duration)), 3)
    trans.mix_in_point = round(max(0.0, min(mix_in, track_b.duration)), 3)
    trans.duration_seconds = round(max(0.25, duration), 3)
    # Seconds are converted to beats at track B's BPM, floored at 60.
    trans.duration_beats = max(1, round(trans.duration_seconds * max(track_b.bpm, 60.0) / 60.0))
    trans.needs_stems = trans.transition_type in ("bass_swap", "acapella_over_instrumental", "drums_first", "double_drop")
    update_transition_tempo_policy(trans, track_a, track_b)

    trans.selected_cues = selected
    confs = [float(c.get("confidence", 0.0) or 0.0) for c in selected.values()]
    trans.cue_confidence = round(sum(confs) / len(confs), 3) if confs else 1.0
    trans.score_breakdown = {**dict(trans.score_breakdown), "waveform_editor_override": 1.0, "cue_confidence": trans.cue_confidence}

    # Replace any earlier "waveform" note rather than stacking duplicates.
    assumptions = [a for a in trans.assumptions if "waveform" not in a.lower()]
    assumptions.append("waveform cue editor override applied; audition before full render")
    trans.assumptions = assumptions

    # Persist positive cue examples as manual supervision. The user can later
    # train a cue model from this file or merge it with listening ratings.
    # Best effort: a logging failure must not undo the edit.
    try:
        from cue_learning import append_training_example
        for name, cue in selected.items():
            # Cues keyed "a_*" (e.g. "a_out") are timed in track A, so pair
            # them with track A's duration rather than track B's.
            cue_track = track_a if str(name).startswith("a_") else track_b
            append_training_example("data/manual-cue-edits.jsonl", cue, duration=cue_track.duration, label=1, source="waveform_editor")
    except Exception as exc:
        logger.warning("Could not append manual cue training examples: %s", exc)

    return (
        f"✅ Applied waveform cue edit to transition {idx+1}\n\n"
        f"- Type: `{trans.transition_type}`\n"
        f"- A mix-out: {trans.mix_out_point:.2f}s\n"
        f"- B mix-in: {trans.mix_in_point:.2f}s\n"
        f"- B drop: {trans.selected_cues['b_drop']['time']:.2f}s\n"
        f"- Duration: {trans.duration_beats} beats / {trans.duration_seconds:.2f}s\n"
        f"- Cue confidence: {trans.cue_confidence:.0%}"
    )
def load_interactive_timeline_editor(transition_idx):
    """Produce the draggable timeline editor HTML and its JSON payload.

    Args:
        transition_idx: 1-based index into ``app_state.transitions``.

    Returns:
        A 3-tuple ``(html, payload_json, summary)``. When no plan exists or
        the index is out of range, the first two items are empty strings and
        the summary is a warning.
    """
    planned = app_state.transitions
    if not planned:
        return "", "", "⚠️ Generate a set plan first"

    position = int(transition_idx) - 1
    if not 0 <= position < len(planned):
        return "", "", f"⚠️ Invalid transition index. Choose 1-{len(planned)}"

    plan = planned[position]
    outgoing = app_state.analyses[plan.track_a_idx]
    incoming = app_state.analyses[plan.track_b_idx]

    from timeline_editor import render_timeline_editor_file

    html_path, html, payload_json = render_timeline_editor_file(
        transition_index=position + 1,
        track_a=outgoing,
        track_b=incoming,
        plan=plan,
    )

    # Usage notes shown next to the embedded editor.
    summary_lines = [
        "✅ Interactive timeline editor generated\n\n",
        f"- File: `{html_path}`\n",
        "- Drag markers in the embedded editor, copy the JSON, then paste/apply it below.\n",
        "- Double-click a candidate marker to snap the nearest selected cue.\n",
        "- Use Shift while dragging or nudging for finer 0.01s movement.",
    ]
    return html, payload_json, "".join(summary_lines)
def apply_interactive_timeline_json(transition_idx, payload_json):
    """Apply the JSON payload produced by the draggable timeline editor.

    Args:
        transition_idx: 1-based index into ``app_state.transitions``.
        payload_json: JSON text copied from the editor; ``None`` or an empty
            value is passed on as an empty string.

    Returns:
        A Markdown status string: a confirmation summary, or a warning when
        no plan exists, the index is out of range, or the payload could not
        be applied.
    """
    if not app_state.transitions:
        return "⚠️ Generate a set plan first"
    idx = int(transition_idx) - 1
    if idx < 0 or idx >= len(app_state.transitions):
        return f"⚠️ Invalid transition index. Choose 1-{len(app_state.transitions)}"

    trans = app_state.transitions[idx]
    track_a = app_state.analyses[trans.track_a_idx]
    track_b = app_state.analyses[trans.track_b_idx]

    # Any failure while applying the payload is reported to the user instead
    # of being raised.
    try:
        from timeline_editor import apply_timeline_json_to_plan
        summary = apply_timeline_json_to_plan(trans, track_a, track_b, payload_json or "")
        update_transition_tempo_policy(trans, track_a, track_b)
    except Exception as exc:
        return f"⚠️ Could not apply timeline JSON: {exc}"

    # Best-effort logging of the edit as positive training examples; a
    # failure here must not undo the edit.
    try:
        from cue_learning import append_training_example
        for name, cue in trans.selected_cues.items():
            # Cues keyed "a_*" (e.g. "a_out") are timed in track A, so pair
            # them with track A's duration rather than track B's.
            cue_track = track_a if str(name).startswith("a_") else track_b
            append_training_example("data/manual-cue-edits.jsonl", cue, duration=cue_track.duration, label=1, source="interactive_timeline")
    except Exception as exc:
        logger.warning("Could not append interactive timeline training examples: %s", exc)

    return (
        f"✅ Applied interactive timeline edit to transition {idx+1}\n\n"
        f"- Type: `{summary['transition_type']}`\n"
        f"- A mix-out: {summary['a_out']:.2f}s\n"
        f"- B mix-in: {summary['b_in']:.2f}s\n"
        f"- B drop: {summary['b_drop']:.2f}s\n"
        f"- Duration: {summary['duration_beats']} beats / {summary['duration_seconds']:.2f}s\n"
        "- Source: interactive timeline JSON"
    )