"""
Structural analysis and DJ transition planning — complete rewrite.

The core model is intentionally limited and uncertainty-aware:

- Structure detection proposes cue candidates; it does not claim certainty.
- A transition is scored as an edge from an outgoing cue on A to an incoming
  cue and drop cue on B.
- The renderer still consumes one selected plan, but the plan records cue
  evidence, score breakdown, assumptions, and alternatives for audition.

This is one useful electronic-music transition archetype, not a universal
model of how every professional DJ mixes.
"""

import logging
from dataclasses import dataclass, asdict

import numpy as np

try:
    import librosa
except ImportError:  # pragma: no cover - depends on the environment
    # Nothing visible in this module calls librosa, so a missing install
    # should not prevent the numpy-only helpers below from being imported.
    librosa = None

logger = logging.getLogger("dj_engine")


@dataclass
class StructuralSection:
    """A section of a track with a musical function."""

    start: float  # seconds
    end: float  # seconds
    label: str  # intro / buildup / drop / breakdown / outro ("full" = unsplit track)
    # Mean of the min-max-normalised bar energy (0..1) over the section.
    # The "full" fallback stores the raw mean of the rms input instead.
    energy: float
    # Normalised energy of the section's last bar minus its first bar:
    # positive = energy rising, negative = falling.
    energy_slope: float


def _whole_track_section(duration, rms):
    """Return the single "full" section used when a track cannot be split."""
    energy = float(np.mean(rms)) if len(rms) else 0.0
    return [StructuralSection(0.0, duration, "full", energy, 0.0)]


def _label_bars(bar_norm, high_thresh=0.55, low_thresh=0.30):
    """Assign a structural label to every bar of a normalised energy curve.

    Args:
        bar_norm: per-bar energy, min-max normalised to 0..1.
        high_thresh: bars at or above this level are labelled 'drop'.
        low_thresh: bars at or below this level count as low energy.

    Returns:
        A list with one of 'intro', 'buildup', 'drop', 'breakdown', 'outro'
        per bar.
    """
    n_bars = len(bar_norm)
    labels = []
    for i, e in enumerate(bar_norm):
        # Peak energy of this bar and up to three following bars.
        lookahead = min(4, n_bars - i)
        future_max = max(bar_norm[i:i + lookahead])
        # Energy change against the bar four positions back (clamped to 0).
        slope_4bar = bar_norm[i] - bar_norm[max(0, i - 4)] if i >= 2 else 0.0

        if e >= high_thresh:
            labels.append('drop')
        elif (e > low_thresh and future_max >= high_thresh
                and (i + lookahead) < n_bars):
            # Moderate energy with a high-energy bar just ahead → buildup.
            # The look-ahead rule is not applied to the final four bars.
            labels.append('buildup')
        elif slope_4bar > 0.1 and e > low_thresh:
            labels.append('buildup')
        elif slope_4bar < -0.15:
            labels.append('breakdown')
        elif e <= low_thresh:
            labels.append('low')
        else:
            labels.append('mid')

    drop_bars = [i for i, lab in enumerate(labels) if lab == 'drop']
    first_high = drop_bars[0] if drop_bars else n_bars // 2
    last_high = drop_bars[-1] if drop_bars else n_bars // 2

    # Intro: leading bars up to the first buildup/drop.
    for i in range(first_high):
        if labels[i] in ('drop', 'buildup'):
            break
        labels[i] = 'intro'

    # Outro: every non-drop bar after the last drop.
    for i in range(last_high + 1, n_bars):
        if labels[i] != 'drop':
            labels[i] = 'outro'

    # Resolve the remaining placeholder labels from their position relative
    # to the first and last drop.
    for i, lab in enumerate(labels):
        if lab not in ('low', 'mid'):
            continue
        after_first_drop = bool(drop_bars) and drop_bars[0] < i
        before_last_drop = bool(drop_bars) and i < drop_bars[-1]
        if after_first_drop and before_last_drop:
            labels[i] = 'breakdown'
        elif after_first_drop:
            labels[i] = 'outro'
        else:
            labels[i] = 'intro'
    return labels


def analyze_structure(y, sr, beat_times, rms) -> list:
    """Detect structural sections based on the energy contour.

    Uses a simple, robust approach:

    1. Smooth the energy curve to bar-level resolution.
    2. Classify each bar by its normalised energy level (high/mid/low).
    3. Detect energy *transitions* (rising/falling) between bars.
    4. Combine into sections: intro, buildup, drop, breakdown, outro.

    Args:
        y: audio samples; only ``len(y)`` is used, to derive the duration.
        sr: sample rate of ``y`` in Hz.
        beat_times: beat positions in seconds (list or array), or None.
            More than 8 beats are needed to estimate the bar length;
            otherwise 128 BPM is assumed.
        rms: one energy value per frame. The code uses a hop of 512 samples
            to convert frames to time — assumes rms was computed with that
            hop; TODO confirm against the caller.

    Returns:
        A list of StructuralSection. A single "full" section is returned
        when there are fewer than 20 frames, fewer than 4 bars, or the bar
        energy is flat.
    """
    hop = 512
    duration = len(y) / sr
    rms = np.asarray(rms, dtype=float)

    if len(rms) < 20:
        return _whole_track_section(duration, rms)

    # Step 1: bar length (4 beats). Explicit None/len checks: a bare
    # truthiness test on a numpy array raises ValueError.
    bar_dur = 4 * 60.0 / 128  # default 128 BPM
    if beat_times is not None and len(beat_times) > 8:
        beat_period = float(np.median(np.diff(
            np.asarray(beat_times[:20], dtype=float))))
        if np.isfinite(beat_period) and beat_period > 0:
            bar_dur = beat_period * 4

    bar_frames = max(1, int(bar_dur * sr / hop))
    n_bars = len(rms) // bar_frames
    if n_bars < 4:
        return _whole_track_section(duration, rms)

    # The energy windows are a whole number of frames long, so report times
    # on that same grid; otherwise timestamps drift away from the frames the
    # labels were derived from by up to one hop per bar.
    bar_dur = bar_frames * hop / sr

    bar_energy = np.array([
        np.mean(rms[i * bar_frames:(i + 1) * bar_frames])
        for i in range(n_bars)
    ])

    # Step 2: normalise to the track's own energy range.
    e_min = float(np.min(bar_energy))
    e_max = float(np.max(bar_energy))
    if not e_max > e_min:
        return _whole_track_section(duration, rms)
    bar_norm = (bar_energy - e_min) / (e_max - e_min)

    # Steps 3-4: per-bar labels. High = normalised energy >= 0.55,
    # low = normalised energy <= 0.30.
    bar_labels = _label_bars(bar_norm, high_thresh=0.55, low_thresh=0.30)

    # Step 5: consolidate runs of equal labels into (label, first, end) spans.
    runs = []
    run_start = 0
    for i in range(1, n_bars + 1):
        if i == n_bars or bar_labels[i] != bar_labels[run_start]:
            runs.append((bar_labels[run_start], run_start, i))
            run_start = i

    # Absorb runs shorter than 2 bars into the preceding section. Length is
    # counted in bars, not in rounded seconds, so rounding cannot make an
    # exact 2-bar section look too short. The absorbing section keeps its
    # own label and slope.
    merged = []
    for label, first, end in runs:
        if merged and (end - first) < 2:
            merged[-1]['end'] = end
        else:
            merged.append({
                'label': label,
                'first': first,
                'end': end,
                'slope': float(bar_norm[end - 1] - bar_norm[first]),
            })

    return [
        StructuralSection(
            start=round(span['first'] * bar_dur, 2),
            end=round(min(span['end'] * bar_dur, duration), 2),
            label=span['label'],
            # Mean over the final extent, so absorbed bars are weighted by
            # their actual length.
            energy=round(float(np.mean(bar_norm[span['first']:span['end']])), 4),
            energy_slope=round(span['slope'], 4),
        )
        for span in merged
    ]


def find_mix_points(analysis) -> dict:
    """Find the mix-in and mix-out points for a track.

    Args:
        analysis: object exposing ``segments`` (dicts or objects with
            ``label``/``start``/``end``), ``duration`` (seconds), ``bpm`` and
            ``downbeats`` (sequence of seconds, may be empty or None).

    Returns:
        dict with (all values in seconds, rounded to 2 decimals):
            mix_in_time: where to START bringing this track in on the
                audience channel. This is the start of the buildup leading
                into the first drop (NOT bar 1 — the intro plays in
                headphones only); without such a buildup, 16 bars before
                the first drop.
            first_drop: when the first drop hits (the climactic moment).
            mix_out_time: where to START blending this track out — the start
                of the LAST drop (falls back to the last breakdown, then to
                55% of the duration).
            last_drop_end: when the last drop ends.
    """
    segments = analysis.segments
    if segments is None:
        segments = []
    duration = analysis.duration

    def get(seg, key):
        return seg[key] if isinstance(seg, dict) else getattr(seg, key)

    def with_label(label):
        return [s for s in segments if get(s, 'label') == label]

    drops = with_label('drop')
    breakdowns = with_label('breakdown')
    buildups = with_label('buildup')

    # First drop. Computed once and reused for the mix-in point so the two
    # can never disagree about where the drop is.
    if drops:
        first_drop = get(drops[0], 'start')
    elif buildups:
        first_drop = get(buildups[0], 'end')
    else:
        first_drop = duration * 0.3

    # Mix-in: where the audience first hears this track.
    # Prefer the buildup right BEFORE the first drop (the energy ramp into
    # the drop); otherwise start 16 bars before the drop.
    pre_drop_buildups = [s for s in buildups if get(s, 'start') < first_drop]
    if pre_drop_buildups:
        mix_in_time = get(pre_drop_buildups[-1], 'start')
    else:
        bars_16 = 16 * 4 * 60.0 / max(analysis.bpm, 60)
        mix_in_time = max(0.0, first_drop - bars_16)

    # Mix-out: start of A's LAST DROP. This is counterintuitive — the DJ
    # starts blending in B while A is still at full energy. B enters
    # quietly underneath A's drop, then as A hits its breakdown (energy
    # drops naturally), B's buildup fills the gap. When B drops, it takes
    # over. The audience never hears a dip because A's drop covers B's
    # quiet buildup, and B's drop covers A's breakdown.
    if drops:
        mix_out_time = get(drops[-1], 'start')
    elif breakdowns:
        mix_out_time = get(breakdowns[-1], 'start')
    else:
        mix_out_time = duration * 0.55

    last_drop_end = get(drops[-1], 'end') if drops else duration * 0.7

    points = {
        'mix_in_time': mix_in_time,
        'first_drop': first_drop,
        'mix_out_time': mix_out_time,
        'last_drop_end': last_drop_end,
    }

    # Snap to the nearest downbeat. Explicit None/len checks: a bare
    # truthiness test on a numpy array raises ValueError.
    downbeats = analysis.downbeats
    if downbeats is not None and len(downbeats) > 0:
        db = np.asarray(downbeats, dtype=float)
        points = {
            key: float(db[np.argmin(np.abs(db - value))])
            for key, value in points.items()
        }

    return {key: round(value, 2) for key, value in points.items()}


def plan_structural_transition(track_a, track_b, compat, position_in_set=0.5, prev_type=None):
    """Plan one scored transition edge for the current renderer.

    The preferred archetype is still: A carries energy while B enters, then
    B's drop becomes the takeover point. Unlike the original version, this
    function keeps cue confidence, score components, assumptions, and
    alternatives so the result is auditable instead of pretending the
    structural estimate is certain.

    Args:
        track_a: analysis of the outgoing track (needs what find_mix_points
            reads, plus ``bpm`` and ``camelot``).
        track_b: analysis of the incoming track (same attributes).
        compat: mapping with at least ``'overall'`` and ``'key_description'``.
        position_in_set: accepted for interface compatibility; not used by
            this function.
        prev_type: previous transition type, forwarded to the edge optimizer
            as ``prev_transition_type``.

    Returns:
        A TransitionPlan with ``track_a_idx``/``track_b_idx`` set to -1.
    """
    # Project imports are deferred to call time, as in the original —
    # presumably to avoid import cycles; verify before hoisting them.
    from transitions import TRANSITION_TYPES
    from transition_optimizer import choose_best_transition_edge
    from tempo_policy import decide_bpm_adjustment
    from app_models import TransitionPlan

    mp_a = find_mix_points(track_a)
    mp_b = find_mix_points(track_b)

    best_edge, alternatives = choose_best_transition_edge(
        track_a, track_b, compat, prev_transition_type=prev_type)

    # Cue times come from the scored edge; the heuristic mix points are only
    # a fallback for cues that carry no 'time'.
    a_out = float(best_edge.a_out.get('time', mp_a['mix_out_time']))
    b_in = float(best_edge.b_in.get('time', mp_b['mix_in_time']))
    b_drop = float(best_edge.b_drop.get('time', mp_b['first_drop']))
    transition_duration = float(best_edge.duration_seconds)
    duration_beats = int(best_edge.duration_beats)

    # Re-anchor B's entry so the transition ends exactly on B's drop.
    if abs((b_drop - b_in) - transition_duration) > 0.01:
        b_in = max(0.0, b_drop - transition_duration)

    bar_sec = 4 * 60.0 / max(track_b.bpm, 60)

    # Transition type is part of the ranked candidate now. This lets the planner
    # compare cue timing and technique fit together instead of picking a recipe
    # after the best cue edge has already been chosen.
    transition_type = getattr(best_edge, "transition_type", "eq_crossfade")

    # Override durations for specific types — some should be shorter.
    if transition_type == "slam":
        duration_beats = 4
        transition_duration = bar_sec  # 1 bar
        # Slam happens right at B's drop — so A plays right up to the cut.
        a_out = max(a_out, mp_a['mix_out_time'])
        b_in = max(0.0, b_drop - transition_duration)
    elif transition_type in ("bass_swap", "double_drop", "noise_riser_cut",
                             "beat_repeat_stutter", "spinback"):
        # Sharp techniques and short dramatic effects: at most 8 bars.
        duration_beats = min(duration_beats, 32)
        transition_duration = min(transition_duration, 8 * bar_sec)
        # Start B closer to the drop.
        b_in = max(b_in, b_drop - transition_duration)

    tempo_decision = decide_bpm_adjustment(track_a, track_b, transition_type)
    bpm_adj = tempo_decision.ratio

    needs_stems = transition_type in (
        "bass_swap", "acapella_over_instrumental", "drums_first", "double_drop")

    def seg_str(segs):
        return " → ".join(
            f"{s['label'] if isinstance(s, dict) else s.label}"
            for s in segs[:6]
        )

    adjustment_note = "transition type shortened/re-anchored cue"
    selected_cues = {
        "a_out": dict(best_edge.a_out),
        "b_in": {**dict(best_edge.b_in), "time": round(b_in, 2)},
        "b_drop": dict(best_edge.b_drop),
    }
    # Keep the audit trail in step with the executed plan: the slam override
    # can move a_out, so record the executed time just like b_in.
    if abs(float(best_edge.a_out.get('time', a_out)) - a_out) > 0.01:
        selected_cues["a_out"]["time"] = round(a_out, 2)
        selected_cues["a_out"]["execution_adjustment"] = adjustment_note
    if abs(float(best_edge.b_in.get('time', b_in)) - b_in) > 0.01:
        selected_cues["b_in"]["execution_adjustment"] = adjustment_note

    reason = (
        f"**{transition_type}**: {TRANSITION_TYPES.get(transition_type, transition_type)}\n\n"
        f"Track A: {seg_str(track_a.segments)}\n"
        f" → Mix out at {a_out:.1f}s via cue '{best_edge.a_out.get('label', 'unknown')}' "
        f"(confidence {float(best_edge.a_out.get('confidence', 0.0)):.0%})\n"
        f"Track B: {seg_str(track_b.segments)}\n"
        f" → Mix in at {b_in:.1f}s via cue '{best_edge.b_in.get('label', 'unknown')}', "
        f"drop at {b_drop:.1f}s via cue '{best_edge.b_drop.get('label', 'unknown')}'\n"
        f" → {duration_beats} beats ({transition_duration:.1f}s) selected from scored cue edge\n\n"
        f"BPM: {track_a.bpm}→{track_b.bpm}; tempo policy: ×{bpm_adj:.3f} — {tempo_decision.reason}\n"
        f"Key: {track_a.camelot}→{track_b.camelot} ({compat['key_description']})\n"
        f"Cue edge score: {best_edge.score:.2f}; musical quality={best_edge.musical_quality_score:.2f}; "
        f"breakdown={best_edge.score_breakdown}"
    )

    return TransitionPlan(
        track_a_idx=-1,
        track_b_idx=-1,
        transition_type=transition_type,
        mix_out_point=round(a_out, 2),
        mix_in_point=round(b_in, 2),
        duration_beats=duration_beats,
        duration_seconds=round(transition_duration, 2),
        bpm_adjustment=round(bpm_adj, 4),
        needs_stems=needs_stems,
        compatibility_score=compat["overall"],
        reason=reason,
        cue_confidence=best_edge.score_breakdown.get("cue_confidence", 0.0),
        score_breakdown={
            "overall": best_edge.score,
            **best_edge.score_breakdown,
            "bpm_adjustment": bpm_adj,
            "bpm_adjustment_enabled": tempo_decision.enabled,
        },
        selected_cues=selected_cues,
        alternatives=[edge.to_dict() for edge in alternatives],
        assumptions=[*best_edge.assumptions, tempo_decision.reason],
        tempo_policy=tempo_decision.to_dict(),
    )