Spaces:
Running
Running
| """ | |
| Structural analysis and DJ transition planning — complete rewrite. | |
| The core model is intentionally limited and uncertainty-aware: | |
| - Structure detection proposes cue candidates; it does not claim certainty. | |
| - A transition is scored as an edge from an outgoing cue on A to an incoming | |
| cue and drop cue on B. | |
| - The renderer still consumes one selected plan, but the plan records cue | |
| evidence, score breakdown, assumptions, and alternatives for audition. | |
| This is one useful electronic-music transition archetype, not a universal | |
| model of how every professional DJ mixes. | |
| """ | |
| import numpy as np | |
| import librosa | |
| import logging | |
| from dataclasses import dataclass, asdict | |
| logger = logging.getLogger("dj_engine") | |
@dataclass
class StructuralSection:
    """A contiguous section of a track with a musical function.

    Declared as a dataclass so it can be built positionally or by keyword
    (both forms are used by ``analyze_structure``) and converted with
    ``dataclasses.asdict``.
    """

    start: float  # section start, in seconds
    end: float  # section end, in seconds
    label: str  # intro / buildup / drop / breakdown / outro ("full" = whole-track fallback)
    energy: float  # mean energy of the section
    energy_slope: float  # positive = energy rising, negative = falling
def analyze_structure(y, sr, beat_times, rms) -> list:
    """Detect structural sections of a track from its energy contour.

    Approach:
      1. Average the RMS curve into bar-sized bins (a bar is 4 beats).
      2. Min-max normalise the bar energies to the track's own range.
      3. Label each bar from its level and its 4-bar energy change.
      4. Force intro/outro labels at the track boundaries and resolve the
         remaining undecided bars from their position relative to the drops.
      5. Merge equal-label runs into sections and absorb very short sections
         into the preceding one.

    Args:
        y: Audio samples; only ``len(y)`` is used (to derive the duration).
        sr: Sample rate of ``y`` in Hz.
        beat_times: Beat positions in seconds (list or NumPy array), or
            ``None``/short, in which case a 128 BPM bar length is assumed.
        rms: Frame-wise energy values, indexed along the first axis.
            Assumed to be computed with a hop of 512 samples - TODO confirm
            against the caller.

    Returns:
        A list of ``StructuralSection``. When the input is too short or has
        no energy variation, a single section labelled ``"full"`` is returned.
    """
    hop = 512  # hop length (samples) assumed for the rms frames
    duration = len(y) / sr

    def whole_track(energy):
        # Fallback used whenever no meaningful segmentation is possible.
        return [StructuralSection(0, duration, "full", energy, 0)]

    if len(rms) < 20:
        return whole_track(float(np.mean(rms)))

    # Step 1: bar-level energy. Each "bar" is 4 beats.
    default_bar_dur = 4 * 60.0 / 128  # default 128 BPM
    bar_dur = default_bar_dur
    # `beat_times` may be a NumPy array, whose truth value is ambiguous, so
    # test for None and length explicitly instead of relying on truthiness.
    if beat_times is not None and len(beat_times) > 8:
        avg_beat = float(np.median(np.diff(beat_times[:20])))
        if np.isfinite(avg_beat) and avg_beat > 0:
            bar_dur = avg_beat * 4
    # Never allow a zero-frame bar: it would divide by zero just below.
    bar_samples = max(1, int(bar_dur * sr / hop))
    n_bars = max(1, len(rms) // bar_samples)

    if n_bars < 4:
        return whole_track(float(np.mean(rms)))

    # Average energy per bar (trailing frames that do not fill a bar are ignored).
    bar_energy = np.array([
        np.mean(rms[k * bar_samples:(k + 1) * bar_samples])
        for k in range(n_bars)
    ])

    # Step 2: normalise to the track's own energy range.
    e_max = np.max(bar_energy)
    e_min = np.min(bar_energy)
    if e_max <= e_min:
        return whole_track(float(np.mean(rms)))
    bar_norm = (bar_energy - e_min) / (e_max - e_min)

    # Thresholds on the normalised [0, 1] energy (not percentiles):
    # a bar at or above 0.55 is high energy, at or below 0.30 is low energy.
    high_thresh = 0.55
    low_thresh = 0.30

    # Step 3: label each bar.
    # A buildup is a moderate-energy bar that is either followed by a
    # high-energy bar within the next 4 bars, or sits on a rising 4-bar slope.
    bar_labels = []
    for i in range(n_bars):
        e = bar_norm[i]
        lookahead = min(4, n_bars - i)
        future_max = np.max(bar_norm[i:i + lookahead])
        # Energy change relative to (up to) 4 bars earlier; the first two
        # bars are treated as flat.
        slope_4bar = bar_norm[i] - bar_norm[max(0, i - 4)] if i >= 2 else 0

        if e >= high_thresh:
            bar_labels.append('drop')
        elif e > low_thresh and future_max >= high_thresh and (i + lookahead) < n_bars:
            # Energy is moderate but about to become high: buildup.
            bar_labels.append('buildup')
        elif slope_4bar > 0.1 and e > low_thresh:
            bar_labels.append('buildup')
        elif slope_4bar < -0.15 and e < high_thresh:
            bar_labels.append('breakdown')
        elif e <= low_thresh:
            bar_labels.append('low')
        else:
            bar_labels.append('mid')

    # Step 4: force intro/outro at the track boundaries.
    # Drop positions never change after this point, so compute them once.
    drop_idx = [k for k, lab in enumerate(bar_labels) if lab == 'drop']
    first_high = drop_idx[0] if drop_idx else n_bars // 2
    last_high = drop_idx[-1] if drop_idx else n_bars // 2

    # Intro: leading bars up to the first drop, stopping at the first
    # drop/buildup bar.
    for i in range(min(first_high, n_bars)):
        if bar_labels[i] in ('drop', 'buildup'):
            break
        bar_labels[i] = 'intro'

    # Outro: every non-drop bar after the last high-energy bar.
    for i in range(max(last_high + 1, 0), n_bars):
        if bar_labels[i] != 'drop':
            bar_labels[i] = 'outro'

    # Resolve the remaining 'low'/'mid' bars from their position relative to
    # the drops: between drops -> breakdown, after -> outro, before -> intro.
    for i, lab in enumerate(bar_labels):
        if lab not in ('low', 'mid'):
            continue
        before_has_drop = bool(drop_idx) and drop_idx[0] < i
        after_has_drop = bool(drop_idx) and drop_idx[-1] > i
        if before_has_drop and after_has_drop:
            bar_labels[i] = 'breakdown'
        elif before_has_drop:
            bar_labels[i] = 'outro'
        else:
            bar_labels[i] = 'intro'

    # Step 5: consolidate runs of equal labels into sections.
    sections = []
    i = 0
    while i < n_bars:
        label = bar_labels[i]
        j = i
        while j < n_bars and bar_labels[j] == label:
            j += 1
        start_t = i * bar_dur
        end_t = min(j * bar_dur, duration)
        seg_e = float(np.mean(bar_norm[i:j]))
        # Slope = normalised energy of the run's last bar minus its first bar.
        seg_slope = float(bar_norm[j - 1] - bar_norm[i])
        sections.append(StructuralSection(
            start=round(start_t, 2),
            end=round(end_t, 2),
            label=label,
            energy=round(seg_e, 4),
            energy_slope=round(seg_slope, 4),
        ))
        i = j

    # Merge tiny sections (< 2 bars) into the preceding section. The very
    # first section is kept as-is because it has no predecessor.
    merged = []
    for sec in sections:
        if merged and (sec.end - sec.start) < bar_dur * 2:
            prev = merged[-1]
            merged[-1] = StructuralSection(
                prev.start, sec.end, prev.label,
                (prev.energy + sec.energy) / 2, prev.energy_slope)
        else:
            merged.append(sec)

    return merged if merged else whole_track(0.5)
def find_mix_points(analysis) -> dict:
    """Find the mix-in and mix-out points for a track.

    Args:
        analysis: Object exposing ``segments`` (dicts or objects with
            ``label``/``start``/``end``), ``duration`` (seconds), ``bpm`` and
            ``downbeats`` (sequence or NumPy array of times; may be empty or
            ``None``).

    Returns:
        dict with (all values rounded to 2 decimals):
            mix_in_time: where to START bringing this track in on the
                audience channel. This is the start of the last buildup
                before the first drop; without one, 16 bars before the drop.
            first_drop: when the first drop hits (the climactic moment).
            mix_out_time: where to START blending out of this track.
            last_drop_end: when the last drop ends.
    """
    segments = analysis.segments
    duration = analysis.duration

    def get(seg, key):
        # Segments may be plain dicts or attribute-style objects.
        return seg[key] if isinstance(seg, dict) else getattr(seg, key)

    def with_label(label):
        return [s for s in segments if get(s, 'label') == label]

    drops = with_label('drop')
    breakdowns = with_label('breakdown')
    buildups = with_label('buildup')

    # First drop: a detected drop, else the end of the first buildup, else a
    # fixed fraction of the track.
    if drops:
        first_drop = get(drops[0], 'start')
    elif buildups:
        first_drop = get(buildups[0], 'end')
    else:
        first_drop = duration * 0.3

    # Mix-in: prefer the buildup right BEFORE the first drop (the energy ramp
    # into it). The filter uses the same `first_drop` that is returned, so the
    # mix-in can never land after the reported first drop.
    pre_drop_buildups = [s for s in buildups if get(s, 'start') < first_drop]
    if pre_drop_buildups:
        mix_in_time = get(pre_drop_buildups[-1], 'start')
    else:
        # No buildup detected: start 16 bars before the drop (BPM floored at 60).
        bars_16 = 16 * 4 * 60.0 / max(analysis.bpm, 60)
        mix_in_time = max(0, first_drop - bars_16)

    # Mix-out: start of this track's LAST drop. This is counterintuitive: the
    # DJ starts blending in B while A is still at full energy. B enters
    # quietly underneath A's drop, then as A hits its breakdown B's buildup
    # fills the gap, and B's drop takes over.
    if drops:
        mix_out_time = get(drops[-1], 'start')
    elif breakdowns:
        mix_out_time = get(breakdowns[-1], 'start')
    else:
        mix_out_time = duration * 0.55

    last_drop_end = get(drops[-1], 'end') if drops else duration * 0.7

    # Snap every cue to the nearest downbeat. `downbeats` may be a NumPy
    # array, so check None/length explicitly rather than its truth value.
    downbeats = analysis.downbeats
    if downbeats is not None and len(downbeats) > 0:
        db = np.asarray(downbeats, dtype=float)

        def snap(t):
            return float(db[np.argmin(np.abs(db - t))])

        mix_in_time = snap(mix_in_time)
        first_drop = snap(first_drop)
        mix_out_time = snap(mix_out_time)
        last_drop_end = snap(last_drop_end)

    return {
        'mix_in_time': round(mix_in_time, 2),
        'first_drop': round(first_drop, 2),
        'mix_out_time': round(mix_out_time, 2),
        'last_drop_end': round(last_drop_end, 2),
    }
def plan_structural_transition(track_a, track_b, compat,
                               position_in_set=0.5, prev_type=None):
    """Plan one scored transition edge for the current renderer.

    The preferred archetype is still: A carries energy while B enters, then
    B's drop becomes the takeover point. The returned plan keeps cue
    confidence, score components, assumptions, and alternatives so the result
    is auditable instead of pretending the structural estimate is certain.

    Args:
        track_a: Outgoing track analysis (reads ``segments``, ``bpm``,
            ``camelot`` and whatever ``find_mix_points`` needs).
        track_b: Incoming track analysis (same attributes as ``track_a``).
        compat: Mapping with at least ``"overall"`` and ``"key_description"``.
        position_in_set: Accepted for interface compatibility; not used by
            this function.
        prev_type: Previous transition type, forwarded to the edge optimizer.

    Returns:
        An ``app_models.TransitionPlan`` describing the selected edge.
    """
    from transitions import TRANSITION_TYPES
    from transition_optimizer import choose_best_transition_edge
    from tempo_policy import decide_bpm_adjustment
    from app_models import TransitionPlan

    # Heuristic mix points are only fallbacks for cues missing a 'time'.
    mp_a = find_mix_points(track_a)
    mp_b = find_mix_points(track_b)

    best_edge, alternatives = choose_best_transition_edge(
        track_a, track_b, compat, prev_transition_type=prev_type)

    a_out = float(best_edge.a_out.get('time', mp_a['mix_out_time']))
    b_in = float(best_edge.b_in.get('time', mp_b['mix_in_time']))
    b_drop = float(best_edge.b_drop.get('time', mp_b['first_drop']))
    transition_duration = float(best_edge.duration_seconds)
    duration_beats = int(best_edge.duration_beats)

    # Keep B's entry exactly one transition length before B's drop.
    if abs((b_drop - b_in) - transition_duration) > 0.01:
        b_in = max(0.0, b_drop - transition_duration)

    bpm = max(track_b.bpm, 60)
    bar_sec = 4 * 60.0 / bpm

    # Transition type is part of the ranked candidate, so cue timing and
    # technique fit are compared together rather than picking a recipe after
    # the best cue edge has already been chosen.
    transition_type = getattr(best_edge, "transition_type", "eq_crossfade")

    # Sharp techniques and short dramatic effects are capped at 8 bars.
    short_types = ("bass_swap", "double_drop",
                   "noise_riser_cut", "beat_repeat_stutter", "spinback")

    # Override durations for specific types: some should be shorter.
    if transition_type == "slam":
        duration_beats = 4
        transition_duration = bar_sec  # 1 bar
        # Slam happens right at B's drop, so A plays right up to the cut.
        a_out = max(a_out, mp_a['mix_out_time'])
        b_in = max(0, b_drop - transition_duration)
    elif transition_type in short_types:
        duration_beats = min(duration_beats, 32)
        transition_duration = min(transition_duration, 8 * bar_sec)
        # Start B closer to the drop.
        b_in = max(b_in, b_drop - transition_duration)

    tempo_decision = decide_bpm_adjustment(track_a, track_b, transition_type)
    bpm_adj = tempo_decision.ratio

    needs_stems = transition_type in ("bass_swap", "acapella_over_instrumental",
                                      "drums_first", "double_drop")

    def seg_str(segs):
        # Compact "intro → buildup → drop" summary of the first six sections.
        return " → ".join(
            f"{s['label'] if isinstance(s, dict) else s.label}"
            for s in segs[:6]
        )

    selected_cues = {
        "a_out": dict(best_edge.a_out),
        "b_in": {**dict(best_edge.b_in), "time": round(b_in, 2)},
        "b_drop": dict(best_edge.b_drop),
    }
    # Record when the executed entry point differs from the scored cue.
    if abs(float(best_edge.b_in.get('time', b_in)) - b_in) > 0.01:
        selected_cues["b_in"]["execution_adjustment"] = "transition type shortened/re-anchored cue"

    reason = (
        f"**{transition_type}**: {TRANSITION_TYPES.get(transition_type, transition_type)}\n\n"
        f"Track A: {seg_str(track_a.segments)}\n"
        f" → Mix out at {a_out:.1f}s via cue '{best_edge.a_out.get('label', 'unknown')}' "
        f"(confidence {float(best_edge.a_out.get('confidence', 0.0)):.0%})\n"
        f"Track B: {seg_str(track_b.segments)}\n"
        f" → Mix in at {b_in:.1f}s via cue '{best_edge.b_in.get('label', 'unknown')}', "
        f"drop at {b_drop:.1f}s via cue '{best_edge.b_drop.get('label', 'unknown')}'\n"
        f" → {duration_beats} beats ({transition_duration:.1f}s) selected from scored cue edge\n\n"
        f"BPM: {track_a.bpm}→{track_b.bpm}; tempo policy: ×{bpm_adj:.3f} — {tempo_decision.reason}\n"
        f"Key: {track_a.camelot}→{track_b.camelot} ({compat['key_description']})\n"
        f"Cue edge score: {best_edge.score:.2f}; musical quality={best_edge.musical_quality_score:.2f}; "
        f"breakdown={best_edge.score_breakdown}"
    )

    return TransitionPlan(
        # Indices are placeholders (-1); presumably filled in by the caller - TODO confirm.
        track_a_idx=-1, track_b_idx=-1,
        transition_type=transition_type,
        mix_out_point=round(a_out, 2),
        mix_in_point=round(b_in, 2),
        duration_beats=duration_beats,
        duration_seconds=round(transition_duration, 2),
        bpm_adjustment=round(bpm_adj, 4),
        needs_stems=needs_stems,
        compatibility_score=compat["overall"],
        reason=reason,
        cue_confidence=best_edge.score_breakdown.get("cue_confidence", 0.0),
        score_breakdown={
            "overall": best_edge.score,
            **best_edge.score_breakdown,
            "bpm_adjustment": bpm_adj,
            "bpm_adjustment_enabled": tempo_decision.enabled,
        },
        selected_cues=selected_cues,
        alternatives=[edge.to_dict() for edge in alternatives],
        assumptions=[*best_edge.assumptions, tempo_decision.reason],
        tempo_policy=tempo_decision.to_dict(),
    )