ai-techno-dj / structure.py
Rik Hoffbauer
Implement musical candidate ranking and feedback-driven learning
f02c67e
"""
Structural analysis and DJ transition planning — complete rewrite.
The core model is intentionally limited and uncertainty-aware:
- Structure detection proposes cue candidates; it does not claim certainty.
- A transition is scored as an edge from an outgoing cue on A to an incoming
cue and drop cue on B.
- The renderer still consumes one selected plan, but the plan records cue
evidence, score breakdown, assumptions, and alternatives for audition.
This is one useful electronic-music transition archetype, not a universal
model of how every professional DJ mixes.
"""
import numpy as np
import librosa
import logging
from dataclasses import dataclass, asdict
logger = logging.getLogger("dj_engine")
@dataclass
class StructuralSection:
    """One contiguous stretch of a track that serves a single musical role.

    Attributes:
        start: Section start, in seconds.
        end: Section end, in seconds.
        label: Musical function of the section: intro / buildup / drop /
            breakdown / outro.
        energy: Mean RMS energy of the section.
        energy_slope: Energy trend across the section; positive means the
            energy is rising, negative means it is falling.
    """

    start: float
    end: float
    label: str
    energy: float
    energy_slope: float
def analyze_structure(y, sr, beat_times, rms) -> list:
    """Detect structural sections of a track from its energy contour.

    Approach:
      1. Average the RMS curve into bars of 4 beats.
      2. Min-max normalise the bar energies and label every bar from its
         level, a 4-bar look-ahead and a 4-bar look-back slope.
      3. Force intro/outro labels at the track boundaries and resolve any
         remaining low/mid bars from their position relative to the drops.
      4. Consolidate runs of equal labels into sections and absorb sections
         shorter than two bars into their predecessor.

    Args:
        y: Audio samples; only ``len(y)`` is used, to derive the duration.
        sr: Sample rate of ``y`` in Hz.
        beat_times: Beat positions in seconds (list or NumPy array). When it
            is None or has 8 or fewer entries, a 128 BPM bar length is used.
        rms: Frame-wise energy curve, indexed and sliced as a 1-D sequence.
            NOTE(review): the frame hop is hard-coded to 512 samples here -
            confirm the caller computes ``rms`` with that hop length.

    Returns:
        A list of StructuralSection. When the track is too short or its
        energy is flat, a single section labelled "full" is returned whose
        energy is the raw mean of ``rms``; otherwise section energies are
        means of the min-max normalised bar energies.
    """
    hop = 512
    duration = len(y) / sr

    def whole_track():
        # Fallback used whenever no meaningful segmentation is possible.
        mean_energy = float(np.mean(rms)) if len(rms) else 0.0
        return [StructuralSection(0, duration, "full", mean_energy, 0)]

    if len(rms) < 20:
        return whole_track()

    # Step 1: bar-level energy. Each "bar" is 4 beats.
    bar_dur = 4 * 60.0 / 128  # default 128 BPM
    # `beat_times` may be a NumPy array, whose truth value is ambiguous, so
    # test it explicitly against None instead of relying on truthiness.
    if beat_times is not None and len(beat_times) > 8:
        avg_beat = float(np.median(np.diff(beat_times[:20])))
        if np.isfinite(avg_beat) and avg_beat > 0:
            bar_dur = avg_beat * 4

    # Keep the fractional frames-per-bar: truncating it to an int makes the
    # frame grid run ahead of the `i * bar_dur` timestamps reported below,
    # and the error accumulates with every bar.
    frames_per_bar = bar_dur * sr / hop
    if frames_per_bar < 1:
        return whole_track()
    n_bars = int(len(rms) / frames_per_bar)
    if n_bars < 4:
        return whole_track()

    # Frame index of every bar boundary (round half up so bars never collapse).
    edges = np.floor(np.arange(n_bars + 1) * frames_per_bar + 0.5).astype(int)
    edges = np.minimum(edges, len(rms))
    bar_energy = np.array(
        [float(np.mean(rms[lo:hi])) for lo, hi in zip(edges[:-1], edges[1:])]
    )

    # Step 2: normalise to the track's own energy range.
    e_max = np.max(bar_energy)
    e_min = np.min(bar_energy)
    if e_max <= e_min:
        return whole_track()
    bar_norm = (bar_energy - e_min) / (e_max - e_min)

    # Thresholds on the min-max normalised energy (0..1), not percentiles.
    high_thresh = 0.55
    low_thresh = 0.30

    # Step 3: label each bar.
    # A buildup is a moderate-energy bar that either precedes a high-energy
    # bar within the next 4 bars or sits on a rising 4-bar slope.
    bar_labels = []
    for i in range(n_bars):
        e = bar_norm[i]
        lookahead = min(4, n_bars - i)
        future_max = float(np.max(bar_norm[i:i + lookahead]))
        # Energy change relative to the bar 4 bars back (clamped to bar 0);
        # the first two bars have too little history and count as flat.
        slope_4bar = bar_norm[i] - bar_norm[max(0, i - 4)] if i >= 2 else 0
        # The look-ahead rule is disabled inside the final 4 bars.
        approaching_drop = future_max >= high_thresh and (i + lookahead) < n_bars

        if e >= high_thresh:
            bar_labels.append('drop')
        elif e > low_thresh and (approaching_drop or slope_4bar > 0.1):
            bar_labels.append('buildup')
        elif slope_4bar < -0.15 and e < high_thresh:
            bar_labels.append('breakdown')
        elif e <= low_thresh:
            bar_labels.append('low')
        else:
            bar_labels.append('mid')

    # Step 4: force intro/outro at the track boundaries.
    # Intro: every leading bar up to the first drop or buildup.
    first_high = next(
        (i for i, lab in enumerate(bar_labels) if lab == 'drop'), n_bars // 2)
    for i in range(min(first_high, n_bars)):
        if bar_labels[i] in ('drop', 'buildup'):
            break
        bar_labels[i] = 'intro'

    # Outro: every bar after the last drop.
    last_high = next(
        (n_bars - 1 - i for i, lab in enumerate(reversed(bar_labels)) if lab == 'drop'),
        n_bars // 2)
    for i in range(max(last_high + 1, 0), n_bars):
        if bar_labels[i] != 'drop':
            bar_labels[i] = 'outro'

    # Resolve remaining 'low'/'mid' bars from where they sit relative to the
    # drops. Drop labels never change here, so their indices are found once.
    drop_bars = [i for i, lab in enumerate(bar_labels) if lab == 'drop']
    for i, lab in enumerate(bar_labels):
        if lab not in ('low', 'mid'):
            continue
        before_has_drop = bool(drop_bars) and drop_bars[0] < i
        after_has_drop = bool(drop_bars) and drop_bars[-1] > i
        if before_has_drop and after_has_drop:
            bar_labels[i] = 'breakdown'
        elif before_has_drop:
            bar_labels[i] = 'outro'
        else:
            bar_labels[i] = 'intro'

    # Step 5: consolidate runs of identical labels into sections.
    sections = []
    i = 0
    while i < n_bars:
        label = bar_labels[i]
        j = i
        while j < n_bars and bar_labels[j] == label:
            j += 1
        sections.append(StructuralSection(
            start=round(i * bar_dur, 2),
            end=round(min(j * bar_dur, duration), 2),
            label=label,
            energy=round(float(np.mean(bar_norm[i:j])), 4),
            energy_slope=round(float(bar_norm[j - 1] - bar_norm[i]), 4),
        ))
        i = j

    # Merge tiny sections (< 2 bars) into the preceding section; the first
    # section is always kept because it has no predecessor.
    merged = []
    for sec in sections:
        if merged and (sec.end - sec.start) < bar_dur * 2:
            prev = merged[-1]
            merged[-1] = StructuralSection(
                prev.start, sec.end, prev.label,
                (prev.energy + sec.energy) / 2, prev.energy_slope)
        else:
            merged.append(sec)
    return merged if merged else [StructuralSection(0, duration, "full", 0.5, 0)]
def find_mix_points(analysis) -> dict:
    """Find the mix-in and mix-out reference points for a track.

    Args:
        analysis: Object exposing ``segments`` (dicts or objects with
            ``label``/``start``/``end``), ``duration`` (seconds), ``bpm`` and
            optionally ``downbeats`` (list or NumPy array of seconds).

    Returns:
        dict with, all in seconds and rounded to 2 decimals:
            mix_in_time: where to START bringing this track in on the
                audience channel. This is the start of the last buildup
                before the first drop (NOT bar 1 — the intro plays in
                headphones only); without such a buildup, 16 bars before
                the first drop.
            first_drop: when the first drop hits (the climactic moment).
            mix_out_time: where to START blending this track out: the start
                of its last drop, else its last breakdown.
            last_drop_end: when the last drop ends.
        When no matching segment exists, fixed fractions of the duration are
        used (0.3 / 0.55 / 0.7). When downbeats are available, every value
        is snapped to the nearest downbeat.
    """
    segments = analysis.segments if analysis.segments is not None else []
    duration = analysis.duration

    def get(seg, key):
        # Segments may be plain dicts or attribute-style objects.
        return seg[key] if isinstance(seg, dict) else getattr(seg, key)

    def with_label(label):
        return [s for s in segments if get(s, 'label') == label]

    drops = with_label('drop')
    breakdowns = with_label('breakdown')
    buildups = with_label('buildup')

    # Mix-in: where the audience first hears this track.
    # Prefer the buildup BEFORE the first drop (the energy ramp into it);
    # otherwise fall back to 16 bars ahead of the drop (BPM floored at 60).
    first_drop_time = get(drops[0], 'start') if drops else duration * 0.3
    pre_drop_buildups = [s for s in buildups if get(s, 'start') < first_drop_time]
    if pre_drop_buildups:
        mix_in_time = get(pre_drop_buildups[-1], 'start')
    else:
        bars_16 = 16 * 4 * 60.0 / max(analysis.bpm, 60)
        mix_in_time = max(0.0, first_drop_time - bars_16)

    # First drop: a detected drop, else the end of the first buildup.
    if drops:
        first_drop = get(drops[0], 'start')
    elif buildups:
        first_drop = get(buildups[0], 'end')
    else:
        first_drop = duration * 0.3

    # Mix-out: start of A's LAST DROP. This is counterintuitive — the DJ
    # starts blending in B while A is still at full energy. B enters
    # quietly underneath A's drop, then as A hits its breakdown (energy
    # drops naturally), B's buildup fills the gap. When B drops, it takes
    # over. The audience never hears a dip because A's drop covers B's
    # quiet buildup, and B's drop covers A's breakdown.
    if drops:
        mix_out_time = get(drops[-1], 'start')
    elif breakdowns:
        mix_out_time = get(breakdowns[-1], 'start')
    else:
        mix_out_time = duration * 0.55

    last_drop_end = get(drops[-1], 'end') if drops else duration * 0.7

    # Snap to the nearest downbeat. Downbeats may be a NumPy array, whose
    # truth value is ambiguous, so check None/length explicitly.
    downbeats = getattr(analysis, 'downbeats', None)
    if downbeats is not None and len(downbeats) > 0:
        grid = np.asarray(downbeats, dtype=float)

        def snap(t):
            return float(grid[np.argmin(np.abs(grid - t))])

        mix_in_time = snap(mix_in_time)
        first_drop = snap(first_drop)
        mix_out_time = snap(mix_out_time)
        last_drop_end = snap(last_drop_end)

    return {
        'mix_in_time': round(mix_in_time, 2),
        'first_drop': round(first_drop, 2),
        'mix_out_time': round(mix_out_time, 2),
        'last_drop_end': round(last_drop_end, 2),
    }
def plan_structural_transition(track_a, track_b, compat,
                               position_in_set=0.5, prev_type=None):
    """Plan one scored transition edge for the current renderer.

    The preferred archetype is still: A carries energy while B enters, then B's
    drop becomes the takeover point. Unlike the original version, this function
    keeps cue confidence, score components, assumptions, and alternatives so the
    result is auditable instead of pretending the structural estimate is certain.

    Args:
        track_a: Outgoing track analysis (reads ``segments``, ``bpm``,
            ``camelot`` and whatever ``find_mix_points`` needs).
        track_b: Incoming track analysis (same attributes as ``track_a``).
        compat: Compatibility mapping; ``"overall"`` and
            ``"key_description"`` are read here.
        position_in_set: Accepted for interface compatibility; not used by
            this function.
        prev_type: Previous transition type, forwarded to the edge optimizer
            as ``prev_transition_type``.

    Returns:
        A ``TransitionPlan`` with ``track_a_idx``/``track_b_idx`` set to -1
        (presumably filled in by the caller — confirm).
    """
    # Project imports stay function-scoped, as in the original layout.
    from transitions import TRANSITION_TYPES
    from transition_optimizer import choose_best_transition_edge
    from tempo_policy import decide_bpm_adjustment
    from app_models import TransitionPlan

    # Heuristic mix points are only fallbacks for cues lacking a 'time'.
    mp_a = find_mix_points(track_a)
    mp_b = find_mix_points(track_b)

    best_edge, alternatives = choose_best_transition_edge(
        track_a, track_b, compat, prev_transition_type=prev_type)
    a_out = float(best_edge.a_out.get('time', mp_a['mix_out_time']))
    b_in = float(best_edge.b_in.get('time', mp_b['mix_in_time']))
    b_drop = float(best_edge.b_drop.get('time', mp_b['first_drop']))
    transition_duration = float(best_edge.duration_seconds)
    duration_beats = int(best_edge.duration_beats)

    # Re-anchor B's entry so the transition ends exactly on B's drop.
    if abs((b_drop - b_in) - transition_duration) > 0.01:
        b_in = max(0.0, b_drop - transition_duration)

    bpm = max(track_b.bpm, 60)
    bar_sec = 4 * 60.0 / bpm

    # Transition type is part of the ranked candidate now. This lets the planner
    # compare cue timing and technique fit together instead of picking a recipe
    # after the best cue edge has already been chosen.
    transition_type = getattr(best_edge, "transition_type", "eq_crossfade")

    # Sharp techniques and short dramatic effects are capped at 8 bars.
    short_types = ("bass_swap", "double_drop",
                   "noise_riser_cut", "beat_repeat_stutter", "spinback")

    # Override durations for specific types — some should be shorter.
    if transition_type == "slam":
        duration_beats = 4
        transition_duration = bar_sec  # 1 bar
        # Slam happens right at B's drop — so A plays right up to the cut.
        a_out = max(a_out, mp_a['mix_out_time'])
        b_in = max(0.0, b_drop - transition_duration)
    elif transition_type in short_types:
        duration_beats = min(duration_beats, 32)
        transition_duration = min(transition_duration, 8 * bar_sec)
        # Start B closer to the drop.
        b_in = max(b_in, b_drop - transition_duration)

    tempo_decision = decide_bpm_adjustment(track_a, track_b, transition_type)
    bpm_adj = tempo_decision.ratio
    needs_stems = transition_type in ("bass_swap", "acapella_over_instrumental",
                                      "drums_first", "double_drop")

    def seg_str(segs):
        # First six section labels joined by arrows, for the readable report.
        return " → ".join(
            f"{s['label'] if isinstance(s, dict) else s.label}"
            for s in segs[:6]
        )

    selected_cues = {
        "a_out": dict(best_edge.a_out),
        "b_in": {**dict(best_edge.b_in), "time": round(b_in, 2)},
        "b_drop": dict(best_edge.b_drop),
    }
    # Record when the executed entry point differs from the scored cue.
    if abs(float(best_edge.b_in.get('time', b_in)) - b_in) > 0.01:
        selected_cues["b_in"]["execution_adjustment"] = "transition type shortened/re-anchored cue"

    reason = (
        f"**{transition_type}**: {TRANSITION_TYPES.get(transition_type, transition_type)}\n\n"
        f"Track A: {seg_str(track_a.segments)}\n"
        f" → Mix out at {a_out:.1f}s via cue '{best_edge.a_out.get('label', 'unknown')}' "
        f"(confidence {float(best_edge.a_out.get('confidence', 0.0)):.0%})\n"
        f"Track B: {seg_str(track_b.segments)}\n"
        f" → Mix in at {b_in:.1f}s via cue '{best_edge.b_in.get('label', 'unknown')}', "
        f"drop at {b_drop:.1f}s via cue '{best_edge.b_drop.get('label', 'unknown')}'\n"
        f" → {duration_beats} beats ({transition_duration:.1f}s) selected from scored cue edge\n\n"
        f"BPM: {track_a.bpm}→{track_b.bpm}; tempo policy: ×{bpm_adj:.3f} — {tempo_decision.reason}\n"
        f"Key: {track_a.camelot}→{track_b.camelot} ({compat['key_description']})\n"
        f"Cue edge score: {best_edge.score:.2f}; musical quality={best_edge.musical_quality_score:.2f}; "
        f"breakdown={best_edge.score_breakdown}"
    )

    return TransitionPlan(
        track_a_idx=-1, track_b_idx=-1,
        transition_type=transition_type,
        mix_out_point=round(a_out, 2),
        mix_in_point=round(b_in, 2),
        duration_beats=duration_beats,
        duration_seconds=round(transition_duration, 2),
        bpm_adjustment=round(bpm_adj, 4),
        needs_stems=needs_stems,
        compatibility_score=compat["overall"],
        reason=reason,
        cue_confidence=best_edge.score_breakdown.get("cue_confidence", 0.0),
        score_breakdown={
            "overall": best_edge.score,
            **best_edge.score_breakdown,
            "bpm_adjustment": bpm_adj,
            "bpm_adjustment_enabled": tempo_decision.enabled,
        },
        selected_cues=selected_cues,
        alternatives=[edge.to_dict() for edge in alternatives],
        assumptions=[*best_edge.assumptions, tempo_decision.reason],
        tempo_policy=tempo_decision.to_dict(),
    )