ai-techno-dj / app_render_callbacks.py
Rik Hoffbauer
Implement musical candidate ranking and feedback-driven learning
f02c67e
from __future__ import annotations
import json
import tempfile
import gradio as gr
import numpy as np
import soundfile as sf
from app_state import app_state
def render_full_set(max_iter, progress=gr.Progress()):
"""Render the DJ set through the AutomationIR full-set renderer."""
if not app_state.transitions:
return None, "⚠️ Generate a set plan first"
import app as core
progress(0.05, desc="Compiling full-set AutomationIR...")
from render_engine import render_full_set as render_full_set_shared
from stem_provider import StemProvider
stem_provider = StemProvider()
set_audio, set_info, ir = render_full_set_shared(
app_state.analyses,
app_state.set_order,
app_state.transitions,
load_audio_segment=core.load_audio_segment,
time_stretch_audio=core.time_stretch_audio,
stem_resolver=stem_provider.resolver(),
sr=44100,
)
app_state.last_stem_diagnostics = dict(stem_provider.diagnostics)
app_state.rendered_set = set_audio
progress(0.82, desc="Running diagnostics...")
benchmarks = core.run_benchmarks(app_state.analyses, app_state.transitions, set_audio)
benchmark_text = core.format_benchmarks(benchmarks)
progress(0.95, desc="Saving audio...")
tmp = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
audio_for_save = set_audio.T if set_audio.ndim == 2 else set_audio
if audio_for_save.ndim == 2 and audio_for_save.shape[1] > audio_for_save.shape[0]:
audio_for_save = audio_for_save.T
audio_for_save = np.clip(audio_for_save, -1.0, 1.0)
sf.write(tmp.name, audio_for_save.astype(np.float32), 44100)
summary = "# ✅ DJ Set Rendered via AutomationIR\n\n"
summary += f"- **Total duration:** {set_info['total_duration']:.1f}s ({set_info['total_duration']/60:.1f} min)\n"
summary += f"- **Tracks:** {len(set_info['tracks'])}\n"
summary += f"- **Transitions:** {len(set_info.get('transitions', []))}\n"
summary += f"- **AutomationIR:** {set_info['automation_ir']['clips']} clips, {set_info['automation_ir']['lanes']} lanes\n"
summary += f"- **Stem lane method:** `{set_info['automation_ir'].get('component_lane_method', 'n/a')}`\n\n"
summary += "## Tracklist\n"
for i, t in enumerate(set_info["tracks"]):
summary += f"{i+1}. **{t['filename']}** — tl={t['tl_start']:.1f}s\n"
if set_info.get("transitions"):
summary += "\n## Transitions Applied\n"
for i, t in enumerate(set_info["transitions"], start=1):
score = t.get("score_breakdown", {}).get("overall", "n/a")
summary += f"- **{i}. `{t['type']}`**: {t['track_a']}{t['track_b']} / score={score}\n"
summary += "\n## AutomationIR preview\n"
summary += "```json\n" + json.dumps(ir.to_dict(), indent=2)[:8000] + "\n```\n"
summary += f"\n{benchmark_text}"
return tmp.name, summary
def render_single_transition(transition_idx, candidate_rank=0, progress=gr.Progress()):
"""Preview a selected or alternative transition candidate via AutomationIR."""
if not app_state.transitions:
return None, "⚠️ Generate a set plan first"
import app as core
idx = int(transition_idx) - 1
if idx < 0 or idx >= len(app_state.transitions):
return None, f"⚠️ Invalid transition index. Choose 1-{len(app_state.transitions)}"
rank = max(0, int(candidate_rank or 0))
trans = app_state.transitions[idx]
track_a = app_state.analyses[trans.track_a_idx]
track_b = app_state.analyses[trans.track_b_idx]
progress(0.15, desc="Compiling automation IR...")
from render_engine import render_transition_preview
from stem_provider import StemProvider
stem_provider = StemProvider()
audio, ir, candidate = render_transition_preview(
trans,
track_a,
track_b,
candidate_rank=rank,
load_audio_segment=core.load_audio_segment,
time_stretch_audio=core.time_stretch_audio,
stem_resolver=stem_provider.resolver(),
sr=44100,
)
app_state.last_stem_diagnostics = dict(stem_provider.diagnostics)
progress(0.85, desc="Saving preview...")
tmp = tempfile.NamedTemporaryFile(suffix='.wav', delete=False)
sf.write(tmp.name, audio.T, 44100)
cue_source = "selected plan" if rank == 0 or not candidate else f"alternative #{rank}"
edge_score = None
if candidate:
edge_score = candidate.get('score')
else:
edge_score = trans.score_breakdown.get('overall')
from transition_diagnostics import diagnose_transition_audio, format_transition_diagnostics
diag = ir.metadata.get("rendered_diagnostics") or diagnose_transition_audio(audio, sr=44100, anchor_seconds=ir.anchor_seconds)
rendered_scores = ir.metadata.get("rendered_candidate_scores", {})
stem_diag = json.dumps(app_state.last_stem_diagnostics, indent=2)[:2500] if app_state.last_stem_diagnostics else "{}"
info = (
f"**Transition {idx+1}:** {track_a.filename}{track_b.filename}\n"
f"**Candidate:** {cue_source}\n"
f"**Type:** `{getattr(ir, 'transition_type', trans.transition_type)}`\n"
f"**Automation IR:** {len(ir.clips)} clips, {len(ir.lanes)} lanes, anchor={ir.anchor_seconds:.2f}s\n"
f"**Cue timing:** A out {ir.metadata['mix_out_point']:.2f}s, "
f"B in {ir.metadata['mix_in_point']:.2f}s, B drop {ir.metadata['b_drop']:.2f}s\n"
f"**Duration:** {ir.metadata['duration_seconds']:.2f}s; score={edge_score if edge_score is not None else 'n/a'}\n"
f"**Tempo policy:** ×{float(ir.metadata.get('bpm_adjustment', 1.0) or 1.0):.3f}; "
f"{(getattr(trans, 'tempo_policy', {}) or {}).get('reason', 'source tempo unless explicitly forced')}\n"
f"**Preview file duration:** {audio.shape[-1] / 44100:.1f}s\n\n"
f"**Rendered candidate scores:** `{json.dumps(rendered_scores)}`\n\n"
f"{format_transition_diagnostics(diag)}\n\n"
f"### Stem provider diagnostics\n```json\n{stem_diag}\n```\n\n"
f"```json\n{json.dumps(ir.to_dict(), indent=2)[:6000]}\n```"
)
return tmp.name, info