# Synesthesia/runtime/crossfade_engine.py
#!/usr/bin/env python3
"""
Crossfade engine for Synesthesia runtime.
Pure-function audio utilities: equal-power and linear crossfade,
loop-tail failsafe extension, and lightweight audio fingerprint extraction.
All functions operate on numpy arrays — no external audio library dependencies.
"""
from __future__ import annotations
import enum
import time
from typing import Any
import numpy as np
# ---------------------------------------------------------------------------
# Crossfade mode enum
# ---------------------------------------------------------------------------
class CrossfadeMode(enum.Enum):
    """Gain-curve selection used by :func:`crossfade` to pick a blend function."""
    LINEAR = "linear"
    EQUAL_POWER = "equal_power"
# ---------------------------------------------------------------------------
# Crossfade functions
# ---------------------------------------------------------------------------
def equal_power_crossfade(
    clip_a: np.ndarray,
    clip_b: np.ndarray,
    crossfade_samples: int,
) -> np.ndarray:
    """Blend the tail of *clip_a* into the head of *clip_b* at equal power.

    The ``sqrt()`` gain curves keep the summed acoustic power perceptually
    constant across the transition::

        gain_a(t) = sqrt(1 - t/N)
        gain_b(t) = sqrt(t/N)

    Parameters
    ----------
    clip_a : np.ndarray
        Outgoing clip; mono ``(frames,)`` or multi-channel ``(frames, ch)``.
    clip_b : np.ndarray
        Incoming clip, same layout as *clip_a*.
    crossfade_samples : int
        Requested overlap length; clamped to the shorter clip. A
        non-positive result degrades to plain concatenation.

    Returns
    -------
    np.ndarray
        ``clip_a[:-n] | blended n-sample region | clip_b[n:]``.
    """
    n = min(crossfade_samples, len(clip_a), len(clip_b))
    if n <= 0:
        # Nothing to overlap — butt-join the clips.
        return np.concatenate([clip_a, clip_b])
    ramp = np.linspace(0.0, 1.0, n, dtype=np.float32)
    if clip_a.ndim == 2:
        # Column vector so the ramp broadcasts across channels.
        ramp = ramp[:, np.newaxis]
    mixed = clip_a[-n:] * np.sqrt(1.0 - ramp) + clip_b[:n] * np.sqrt(ramp)
    return np.concatenate([clip_a[:-n], mixed, clip_b[n:]])
def linear_crossfade(
    clip_a: np.ndarray,
    clip_b: np.ndarray,
    crossfade_samples: int,
) -> np.ndarray:
    """Blend the tail of *clip_a* into the head of *clip_b* with linear gains.

    Signature and return layout are identical to
    :func:`equal_power_crossfade`; only the gain curve differs
    (``1 - t`` / ``t`` instead of square-root curves).
    """
    n = min(crossfade_samples, len(clip_a), len(clip_b))
    if n <= 0:
        # Degenerate overlap — concatenate untouched clips.
        return np.concatenate([clip_a, clip_b])
    ramp = np.linspace(0.0, 1.0, n, dtype=np.float32)
    if clip_a.ndim == 2:
        # Broadcast the ramp over the channel axis.
        ramp = ramp[:, np.newaxis]
    mixed = clip_a[-n:] * (1.0 - ramp) + clip_b[:n] * ramp
    return np.concatenate([clip_a[:-n], mixed, clip_b[n:]])
def crossfade(
    clip_a: np.ndarray,
    clip_b: np.ndarray,
    crossfade_samples: int,
    mode: CrossfadeMode = CrossfadeMode.EQUAL_POWER,
) -> np.ndarray:
    """Blend *clip_a* into *clip_b* using the curve selected by *mode*."""
    blend = (
        equal_power_crossfade
        if mode is CrossfadeMode.EQUAL_POWER
        else linear_crossfade
    )
    return blend(clip_a, clip_b, crossfade_samples)
# ---------------------------------------------------------------------------
# Failsafe: loop-tail extension
# ---------------------------------------------------------------------------
def loop_tail_extend(
    clip: np.ndarray,
    extension_samples: int,
    fade_samples: int = 2048,
) -> np.ndarray:
    """Extend a clip by looping its tail audio.

    **FAILSAFE**: called when the next clip is not ready at the crossfade
    deadline. The tail of *clip* is repeated to fill *extension_samples*
    with a short fade at each loop seam to avoid clicks.
    Playback must **never** output silence — this fills the gap.

    Parameters
    ----------
    clip : np.ndarray
        Source audio clip (full clip, not just the tail); mono or
        ``(frames, ch)``. Integer PCM is promoted to float32 for the
        generated portion.
    extension_samples : int
        Number of extra samples to generate. The returned array is exactly
        ``len(clip) + extension_samples`` frames long.
    fade_samples : int
        Short fade applied at each loop seam to prevent clicks.

    Returns
    -------
    np.ndarray
        The original *clip* with the looped tail appended.
    """
    if extension_samples <= 0:
        return clip
    # Use the last 5 seconds (or entire clip if shorter) as the loop source.
    # NOTE(review): 48000 is a hard-coded sample rate assumption — confirm
    # against the runtime's actual rate.
    loop_source_len = min(len(clip), 48000 * 5)
    loop_source = clip[-loop_source_len:].copy()
    # BUG FIX: the in-place ramps below require a float dtype; integer PCM
    # would raise a numpy casting error. Promote the working copy once.
    if not np.issubdtype(loop_source.dtype, np.floating):
        loop_source = loop_source.astype(np.float32)
    # Apply fade-in/fade-out to the loop source so tiled copies meet at
    # (near-)zero amplitude — no click at the seams.
    fade = min(fade_samples, len(loop_source) // 4)
    if fade > 0:
        ramp_up = np.linspace(0.0, 1.0, fade, dtype=np.float32)
        ramp_down = np.linspace(1.0, 0.0, fade, dtype=np.float32)
        if loop_source.ndim == 2:
            ramp_up = ramp_up[:, np.newaxis]
            ramp_down = ramp_down[:, np.newaxis]
        loop_source[:fade] *= ramp_up
        loop_source[-fade:] *= ramp_down
    junction_fade = min(fade, len(clip), extension_samples)
    # BUG FIX: the junction blend below consumes `junction_fade` samples of
    # extension material, so generate that much extra; otherwise the output
    # came up `junction_fade` samples short of the promised length.
    needed = extension_samples + junction_fade
    # Tile the source until we have enough extension material.
    repeats = (needed // len(loop_source)) + 2
    tiled = np.tile(loop_source, (repeats,) if loop_source.ndim == 1 else (repeats, 1))
    extension = tiled[:needed]
    # Crossfade the junction between the original clip tail and the extension.
    if junction_fade > 0:
        t = np.linspace(0.0, 1.0, junction_fade, dtype=np.float32)
        if clip.ndim == 2:
            t = t[:, np.newaxis]
        blended = clip[-junction_fade:] * (1.0 - t) + extension[:junction_fade] * t
        result = np.concatenate([clip[:-junction_fade], blended, extension[junction_fade:]])
    else:
        result = np.concatenate([clip, extension])
    return result
# ---------------------------------------------------------------------------
# Audio fingerprint extraction (≤ 20 ms target)
# ---------------------------------------------------------------------------
def _mel_filterbank(sample_rate: int, n_fft: int, n_mels: int = 40) -> np.ndarray:
"""Build a mel-scale filterbank matrix (numpy-only, no librosa)."""
def _hz_to_mel(f: float) -> float:
return 2595.0 * np.log10(1.0 + f / 700.0)
def _mel_to_hz(m: float) -> float:
return 700.0 * (10.0 ** (m / 2595.0) - 1.0)
mel_low = _hz_to_mel(0.0)
mel_high = _hz_to_mel(sample_rate / 2.0)
mel_points = np.linspace(mel_low, mel_high, n_mels + 2)
hz_points = np.array([_mel_to_hz(m) for m in mel_points])
bin_points = np.floor((n_fft + 1) * hz_points / sample_rate).astype(int)
filters = np.zeros((n_mels, n_fft // 2 + 1), dtype=np.float32)
for i in range(n_mels):
lo, mid, hi = bin_points[i], bin_points[i + 1], bin_points[i + 2]
if mid > lo:
filters[i, lo:mid] = (np.arange(lo, mid) - lo) / (mid - lo)
if hi > mid:
filters[i, mid:hi] = (hi - np.arange(mid, hi)) / (hi - mid)
return filters
# Filterbank matrices keyed by (sample_rate, n_fft, n_mels), built on first
# use and reused thereafter.
_FILTERBANK_CACHE: dict[tuple[int, int, int], np.ndarray] = {}
def _get_filterbank(sample_rate: int, n_fft: int = 2048, n_mels: int = 40) -> np.ndarray:
    """Return the (cached) mel filterbank for this parameter combination."""
    key = (sample_rate, n_fft, n_mels)
    try:
        return _FILTERBANK_CACHE[key]
    except KeyError:
        fb = _mel_filterbank(sample_rate, n_fft, n_mels)
        _FILTERBANK_CACHE[key] = fb
        return fb
def _mean_magnitude_spectrum(tail: np.ndarray, n_fft: int, hop: int) -> np.ndarray:
    """Mean magnitude spectrum over hop-strided frames of *tail* (float32)."""
    if len(tail) < n_fft:
        # Too short for even one full frame: zero-pad to a single frame.
        frames = np.pad(tail, (0, n_fft - len(tail)))[np.newaxis, :]
    else:
        # Zero-copy frame view, then one batched FFT instead of a Python
        # loop of per-frame FFTs — keeps the ≤20 ms target reachable.
        frames = np.lib.stride_tricks.sliding_window_view(tail, n_fft)[::hop]
    mags = np.abs(np.fft.rfft(frames, axis=1))
    return (mags.sum(axis=0) / len(frames)).astype(np.float32)
def _rms_envelope_slope(tail: np.ndarray, sample_rate: int) -> float:
    """Linear-regression slope of 50 ms RMS windows (crude tempo/level trend)."""
    window = int(0.05 * sample_rate)  # 50 ms windows
    if window <= 0 or len(tail) <= window:
        return 0.0
    num_windows = len(tail) // window
    blocks = tail[:num_windows * window].reshape(num_windows, window)
    rms_values = np.sqrt(np.mean(blocks ** 2, axis=1))
    if len(rms_values) < 2:
        return 0.0
    x = np.arange(len(rms_values), dtype=np.float32)
    return float(np.polyfit(x, rms_values, 1)[0])
def extract_audio_fingerprint(
    audio: np.ndarray,
    sample_rate: int,
    tail_seconds: float = 2.0,
    n_mels: int = 40,
) -> dict[str, Any]:
    """Extract lightweight spectral features from the tail of an audio clip.

    Target runtime: **≤ 20 ms**. Uses only numpy — no librosa.

    Parameters
    ----------
    audio : np.ndarray
        Full audio clip (mono, or stereo of which only channel 0 is used).
    sample_rate : int
        Sample rate in Hz.
    tail_seconds : float
        How many seconds from the end of the clip to analyse.
    n_mels : int
        Number of mel bands.

    Returns
    -------
    dict
        ``mel_spectrogram_mean``, ``spectral_centroid``, ``tempo_envelope``,
        plus ``extraction_time_ms`` for runtime monitoring.
    """
    _start = time.monotonic()
    # Take the tail and ensure mono float32.
    tail = audio[-int(sample_rate * tail_seconds):]
    if tail.ndim == 2:
        tail = tail[:, 0]
    tail = tail.astype(np.float32)
    n_fft = 2048
    hop = n_fft // 2
    # ---- Mel spectrogram mean ----
    fb = _get_filterbank(sample_rate, n_fft, n_mels)
    spec_mean = _mean_magnitude_spectrum(tail, n_fft, hop)
    mel_mean = (fb @ spec_mean).tolist()
    # ---- Spectral centroid (magnitude-weighted mean frequency) ----
    freqs = np.fft.rfftfreq(n_fft, 1.0 / sample_rate)
    centroid = float(np.sum(freqs * spec_mean) / (np.sum(spec_mean) + 1e-8))
    # ---- Tempo envelope (RMS slope) ----
    slope = _rms_envelope_slope(tail, sample_rate)
    elapsed_ms = (time.monotonic() - _start) * 1000.0
    return {
        "mel_spectrogram_mean": mel_mean,
        "spectral_centroid": centroid,
        "tempo_envelope": slope,
        "extraction_time_ms": round(elapsed_ms, 2),
    }
class CrossfadeEngine:
    """Stateful engine shell; only clip-index tracking is visible here.

    NOTE(review): no methods are defined in this chunk — presumably the
    engine drives the module's crossfade/failsafe functions; confirm
    against the rest of the file or its callers.
    """
    def __init__(self):
        # Index of the currently active clip — assumed to be a playlist
        # position; TODO confirm with the playback scheduler.
        self.active_index = 0
class CrossfadeState:
    """Plain string constants naming playback states (not an ``enum.Enum``).

    NOTE(review): values are compared as raw strings by any caller;
    consider ``enum.StrEnum`` only after checking call sites.
    """
    IDLE = "idle"
    FADING = "fading"