#!/usr/bin/env python3
"""Crossfade engine for Synesthesia runtime.

Pure-function audio utilities: equal-power and linear crossfade, loop-tail
failsafe extension, and lightweight audio fingerprint extraction.

All functions operate on numpy arrays — no external audio library
dependencies.
"""
from __future__ import annotations

import enum
import time
from typing import Any

import numpy as np


# ---------------------------------------------------------------------------
# Crossfade mode enum
# ---------------------------------------------------------------------------

class CrossfadeMode(enum.Enum):
    """Gain-curve selection for :func:`crossfade`."""

    LINEAR = "linear"
    EQUAL_POWER = "equal_power"


# ---------------------------------------------------------------------------
# Crossfade functions
# ---------------------------------------------------------------------------

def _crossfade_impl(
    clip_a: np.ndarray,
    clip_b: np.ndarray,
    crossfade_samples: int,
    equal_power: bool,
) -> np.ndarray:
    """Shared implementation for both crossfade flavours.

    Blends the tail of *clip_a* into the head of *clip_b* over
    *crossfade_samples* frames and concatenates the untouched remainders.
    """
    # Clamp to what both clips can actually supply.
    crossfade_samples = min(crossfade_samples, len(clip_a), len(clip_b))
    if crossfade_samples <= 0:
        # Nothing to blend — butt-join the clips.
        return np.concatenate([clip_a, clip_b])

    t = np.linspace(0.0, 1.0, crossfade_samples, dtype=np.float32)
    if clip_a.ndim == 2:
        # Column vector so the ramp broadcasts across channels.
        t = t[:, np.newaxis]

    if equal_power:
        # sqrt() curves keep total acoustic power perceptually constant.
        fade_out = np.sqrt(1.0 - t)
        fade_in = np.sqrt(t)
    else:
        fade_out = 1.0 - t
        fade_in = t

    blended = clip_a[-crossfade_samples:] * fade_out + clip_b[:crossfade_samples] * fade_in
    return np.concatenate([
        clip_a[:-crossfade_samples],
        blended,
        clip_b[crossfade_samples:],
    ])


def equal_power_crossfade(
    clip_a: np.ndarray,
    clip_b: np.ndarray,
    crossfade_samples: int,
) -> np.ndarray:
    """Equal-power crossfade between two audio clips.

    Uses ``sqrt()`` gain curves so that the total acoustic power stays
    perceptually constant through the transition::

        gain_a(t) = sqrt(1 - t/N)
        gain_b(t) = sqrt(t/N)

    Parameters
    ----------
    clip_a : np.ndarray
        First audio clip. At least *crossfade_samples* frames long.
    clip_b : np.ndarray
        Second audio clip. At least *crossfade_samples* frames long.
    crossfade_samples : int
        Number of samples in the crossfade region.

    Returns
    -------
    np.ndarray
        A contiguous output array:
        ``clip_a[:-crossfade] | blended_region | clip_b[crossfade:]``
    """
    return _crossfade_impl(clip_a, clip_b, crossfade_samples, equal_power=True)


def linear_crossfade(
    clip_a: np.ndarray,
    clip_b: np.ndarray,
    crossfade_samples: int,
) -> np.ndarray:
    """Linear crossfade between two audio clips.

    Parameters are identical to :func:`equal_power_crossfade`.
    """
    return _crossfade_impl(clip_a, clip_b, crossfade_samples, equal_power=False)


def crossfade(
    clip_a: np.ndarray,
    clip_b: np.ndarray,
    crossfade_samples: int,
    mode: CrossfadeMode = CrossfadeMode.EQUAL_POWER,
) -> np.ndarray:
    """Dispatch to the appropriate crossfade function based on *mode*."""
    if mode is CrossfadeMode.EQUAL_POWER:
        return equal_power_crossfade(clip_a, clip_b, crossfade_samples)
    return linear_crossfade(clip_a, clip_b, crossfade_samples)


# ---------------------------------------------------------------------------
# Failsafe: loop-tail extension
# ---------------------------------------------------------------------------

def loop_tail_extend(
    clip: np.ndarray,
    extension_samples: int,
    fade_samples: int = 2048,
    sample_rate: int = 48000,
) -> np.ndarray:
    """Extend a clip by looping its tail audio.

    **FAILSAFE**: called when the next clip is not ready at the crossfade
    deadline. The tail of *clip* is repeated to fill *extension_samples*
    with a short crossfade at each loop seam to avoid clicks. Playback
    must **never** output silence — this fills the gap.

    Parameters
    ----------
    clip : np.ndarray
        Source audio clip (full clip, not just the tail).
    extension_samples : int
        Number of extra samples to generate.
    fade_samples : int
        Short fade applied at each loop-seam to prevent clicks.
    sample_rate : int
        Sample rate in Hz, used to size the 5-second loop-source window
        (was previously hard-coded to 48000).

    Returns
    -------
    np.ndarray
        The original *clip* with the looped tail appended; exactly
        ``len(clip) + extension_samples`` frames long.
    """
    if extension_samples <= 0:
        return clip

    if len(clip) == 0:
        # No material to loop. A failsafe must still produce output, and
        # dividing by an empty loop source would raise — emit silence.
        shape = (extension_samples,) if clip.ndim == 1 else (extension_samples, clip.shape[1])
        return np.zeros(shape, dtype=np.float32)

    # Use the last 5 seconds (or entire clip if shorter) as the loop source.
    loop_source_len = min(len(clip), sample_rate * 5)
    loop_source = clip[-loop_source_len:]
    if np.issubdtype(loop_source.dtype, np.floating):
        loop_source = loop_source.copy()
    else:
        # BUG FIX: in-place `*=` with a float ramp on integer PCM either
        # raises or truncates — promote to float32 first.
        loop_source = loop_source.astype(np.float32)

    # Apply fade-in/fade-out to the loop source for seamless looping.
    fade = min(fade_samples, len(loop_source) // 4)
    if fade > 0:
        ramp_up = np.linspace(0.0, 1.0, fade, dtype=np.float32)
        ramp_down = np.linspace(1.0, 0.0, fade, dtype=np.float32)
        if loop_source.ndim == 2:
            ramp_up = ramp_up[:, np.newaxis]
            ramp_down = ramp_down[:, np.newaxis]
        loop_source[:fade] *= ramp_up
        loop_source[-fade:] *= ramp_down

    junction_fade = min(fade, len(clip), extension_samples)

    # BUG FIX: the junction crossfade overlaps `junction_fade` samples of the
    # original clip, so that many EXTRA samples must be generated — otherwise
    # the output under-fills the gap by up to `fade_samples` frames.
    needed = extension_samples + junction_fade
    repeats = (needed // len(loop_source)) + 2
    tile_shape = (repeats,) if loop_source.ndim == 1 else (repeats, 1)
    extension = np.tile(loop_source, tile_shape)[:needed]

    # Crossfade the junction between the original clip tail and the extension.
    if junction_fade > 0:
        t = np.linspace(0.0, 1.0, junction_fade, dtype=np.float32)
        if clip.ndim == 2:
            t = t[:, np.newaxis]
        blended = clip[-junction_fade:] * (1.0 - t) + extension[:junction_fade] * t
        return np.concatenate([clip[:-junction_fade], blended, extension[junction_fade:]])
    return np.concatenate([clip, extension])


# ---------------------------------------------------------------------------
# Audio fingerprint extraction (≤ 20 ms target)
# ---------------------------------------------------------------------------

def _mel_filterbank(sample_rate: int, n_fft: int, n_mels: int = 40) -> np.ndarray:
    """Build a mel-scale triangular filterbank matrix (numpy-only, no librosa).

    Returns
    -------
    np.ndarray
        Shape ``(n_mels, n_fft // 2 + 1)`` — applies to an rFFT magnitude
        spectrum via matrix multiplication.
    """
    def _hz_to_mel(f: float) -> float:
        return 2595.0 * np.log10(1.0 + f / 700.0)

    def _mel_to_hz(m: float) -> float:
        return 700.0 * (10.0 ** (m / 2595.0) - 1.0)

    # n_mels triangles need n_mels + 2 edge points, equally spaced in mel.
    mel_low = _hz_to_mel(0.0)
    mel_high = _hz_to_mel(sample_rate / 2.0)
    mel_points = np.linspace(mel_low, mel_high, n_mels + 2)
    hz_points = np.array([_mel_to_hz(m) for m in mel_points])
    bin_points = np.floor((n_fft + 1) * hz_points / sample_rate).astype(int)

    filters = np.zeros((n_mels, n_fft // 2 + 1), dtype=np.float32)
    for i in range(n_mels):
        lo, mid, hi = bin_points[i], bin_points[i + 1], bin_points[i + 2]
        if mid > lo:
            filters[i, lo:mid] = (np.arange(lo, mid) - lo) / (mid - lo)
        if hi > mid:
            filters[i, mid:hi] = (hi - np.arange(mid, hi)) / (hi - mid)
    return filters


# Pre-computed filterbanks, keyed by (sample_rate, n_fft, n_mels);
# lazily populated so repeated fingerprint calls skip the rebuild.
_FILTERBANK_CACHE: dict[tuple[int, int, int], np.ndarray] = {}


def _get_filterbank(sample_rate: int, n_fft: int = 2048, n_mels: int = 40) -> np.ndarray:
    """Return a cached mel filterbank, building it on first use."""
    key = (sample_rate, n_fft, n_mels)
    if key not in _FILTERBANK_CACHE:
        _FILTERBANK_CACHE[key] = _mel_filterbank(sample_rate, n_fft, n_mels)
    return _FILTERBANK_CACHE[key]


def extract_audio_fingerprint(
    audio: np.ndarray,
    sample_rate: int,
    tail_seconds: float = 2.0,
    n_mels: int = 40,
) -> dict[str, Any]:
    """Extract lightweight spectral features from the tail of an audio clip.

    Target runtime: **≤ 20 ms**. Uses only numpy — no librosa.

    Parameters
    ----------
    audio : np.ndarray
        Full audio clip (mono or first-channel of stereo).
    sample_rate : int
        Sample rate in Hz.
    tail_seconds : float
        How many seconds from the end of the clip to analyse.
        Non-positive values analyse the whole clip.
    n_mels : int
        Number of mel bands.

    Returns
    -------
    dict
        ``mel_spectrogram_mean``, ``spectral_centroid``, ``tempo_envelope``,
        plus ``extraction_time_ms`` for runtime monitoring.
    """
    _start = time.monotonic()

    # Take the tail and ensure mono.
    # BUG FIX (made explicit): `audio[-tail_len:]` with tail_len == 0 is the
    # `[-0:]` pitfall and silently selects the ENTIRE clip; treat a
    # non-positive tail as "whole clip" deliberately.
    tail_len = int(sample_rate * tail_seconds)
    tail = audio[-tail_len:] if tail_len > 0 else audio
    if tail.ndim == 2:
        tail = tail[:, 0]
    tail = tail.astype(np.float32)

    n_fft = 2048
    hop = n_fft // 2

    # ---- Mel spectrogram mean ----
    fb = _get_filterbank(sample_rate, n_fft, n_mels)

    # Compute magnitude spectrogram (single averaged frame for speed).
    num_frames = max(1, (len(tail) - n_fft) // hop + 1)
    spec_sum = np.zeros(n_fft // 2 + 1, dtype=np.float64)
    for i in range(num_frames):
        frame = tail[i * hop: i * hop + n_fft]
        if len(frame) < n_fft:
            # Zero-pad the final (or only) short frame.
            frame = np.pad(frame, (0, n_fft - len(frame)))
        spec_sum += np.abs(np.fft.rfft(frame))
    spec_mean = (spec_sum / num_frames).astype(np.float32)
    mel_mean = (fb @ spec_mean).tolist()

    # ---- Spectral centroid (magnitude-weighted mean frequency) ----
    freqs = np.fft.rfftfreq(n_fft, 1.0 / sample_rate)
    total_mag = np.sum(spec_mean) + 1e-8  # epsilon guards silent input
    centroid = float(np.sum(freqs * spec_mean) / total_mag)

    # ---- Tempo envelope (RMS slope over 50 ms windows) ----
    window_size = int(0.05 * sample_rate)
    if window_size > 0 and len(tail) > window_size:
        num_windows = len(tail) // window_size
        rms_values = np.array([
            np.sqrt(np.mean(tail[i * window_size: (i + 1) * window_size] ** 2))
            for i in range(num_windows)
        ])
        if len(rms_values) >= 2:
            # Linear-regression slope of the RMS envelope.
            x = np.arange(len(rms_values), dtype=np.float32)
            slope = float(np.polyfit(x, rms_values, 1)[0])
        else:
            slope = 0.0
    else:
        slope = 0.0

    elapsed_ms = (time.monotonic() - _start) * 1000.0
    return {
        "mel_spectrogram_mean": mel_mean,
        "spectral_centroid": centroid,
        "tempo_envelope": slope,
        "extraction_time_ms": round(elapsed_ms, 2),
    }


class CrossfadeEngine:
    """Stateful engine shell; tracks which clip slot is currently active."""

    def __init__(self):
        # Index of the currently-playing clip slot.
        self.active_index = 0


class CrossfadeState:
    """String constants describing the engine's transition state."""

    IDLE = "idle"
    FADING = "fading"