Spaces:
Build error
Build error
| from __future__ import annotations | |
| import math | |
| from typing import Any, Dict, List | |
| from itertools import islice | |
| from collections import deque | |
| import numpy as np | |
| from numpy.typing import NDArray | |
# Tunables
SR = 16_000  # working sample rate in Hz; feed() resamples any other rate to this
FRAME_MS = 20  # length of the RMS analysis frame, in milliseconds
HOP_MS = 50  # amount of audio consumed per output entry, in milliseconds
SWAY_MASTER = 1.5  # multiplier applied to the loudness gain in feed()
SENS_DB_OFFSET = +4.0  # dB added to the measured level before loudness mapping
VAD_DB_ON = -35.0  # a hop at or above this level (dBFS) counts toward switching VAD on
VAD_DB_OFF = -45.0  # a hop at or below this level (dBFS) counts toward switching VAD off
VAD_ATTACK_MS = 40  # converted to a hop count in ATTACK_FR below
VAD_RELEASE_MS = 250  # converted to a hop count in RELEASE_FR below
ENV_FOLLOW_GAIN = 0.65  # fraction of the gap to the envelope target closed on each hop
# Oscillator settings: SWAY_F_* multiply time in seconds inside sin(2*pi*f*t + phase);
# SWAY_A_*_DEG are passed through math.radians(); SWAY_A_*_MM scale the *_mm outputs.
SWAY_F_PITCH = 2.2
SWAY_A_PITCH_DEG = 4.5
SWAY_F_YAW = 0.6
SWAY_A_YAW_DEG = 7.5
SWAY_F_ROLL = 1.3
SWAY_A_ROLL_DEG = 2.25
SWAY_F_X = 0.35
SWAY_A_X_MM = 4.5
SWAY_F_Y = 0.45
SWAY_A_Y_MM = 3.75
SWAY_F_Z = 0.25
SWAY_A_Z_MM = 2.25
SWAY_DB_LOW = -46.0  # offset-adjusted level that _loudness_gain() maps to 0
SWAY_DB_HIGH = -18.0  # offset-adjusted level that _loudness_gain() maps to 1
LOUDNESS_GAMMA = 0.9  # exponent applied to the normalized loudness
SWAY_ATTACK_MS = 50  # converted to a hop count in SWAY_ATTACK_FR below
SWAY_RELEASE_MS = 250  # converted to a hop count in SWAY_RELEASE_FR below
# Derived
FRAME = int(SR * FRAME_MS / 1000)  # samples per analysis frame (320 with the values above)
HOP = int(SR * HOP_MS / 1000)  # samples per hop (800 with the values above)
# Durations expressed as whole hops: int() truncates and max() enforces at least one hop.
ATTACK_FR = max(1, int(VAD_ATTACK_MS / HOP_MS))  # 40 / 50 truncates to 0, clamped to 1
RELEASE_FR = max(1, int(VAD_RELEASE_MS / HOP_MS))  # 5 with the values above
SWAY_ATTACK_FR = max(1, int(SWAY_ATTACK_MS / HOP_MS))  # 1 with the values above
SWAY_RELEASE_FR = max(1, int(SWAY_RELEASE_MS / HOP_MS))  # 5 with the values above
| def _rms_dbfs(x: NDArray[np.float32]) -> float: | |
| """Root-mean-square in dBFS for float32 mono array in [-1,1].""" | |
| # numerically stable rms (avoid overflow) | |
| x = x.astype(np.float32, copy=False) | |
| rms = np.sqrt(np.mean(x * x, dtype=np.float32) + 1e-12, dtype=np.float32) | |
| return float(20.0 * math.log10(float(rms) + 1e-12)) | |
def _loudness_gain(db: float, offset: float = SENS_DB_OFFSET) -> float:
    """Map a level in dB onto a gamma-shaped gain in [0, 1].

    ``db + offset`` is rescaled linearly so that SWAY_DB_LOW becomes 0 and
    SWAY_DB_HIGH becomes 1, clamped to that interval, and then raised to
    LOUDNESS_GAMMA (skipped when the exponent is exactly 1).
    """
    span = SWAY_DB_HIGH - SWAY_DB_LOW
    ratio = (db + offset - SWAY_DB_LOW) / span
    # Comparison-based clamp; an out-of-order value such as NaN passes through untouched.
    clamped = 0.0 if ratio < 0.0 else (1.0 if ratio > 1.0 else ratio)
    if LOUDNESS_GAMMA == 1.0:
        return clamped
    return clamped**LOUDNESS_GAMMA
| def _to_float32_mono(x: NDArray[Any]) -> NDArray[np.float32]: | |
| """Convert arbitrary PCM array to float32 mono in [-1,1]. | |
| Accepts shapes: (N,), (1,N), (N,1), (C,N), (N,C). | |
| """ | |
| a = np.asarray(x) | |
| if a.ndim == 0: | |
| return np.zeros(0, dtype=np.float32) | |
| # If 2D, decide which axis is channels (prefer small first dim) | |
| if a.ndim == 2: | |
| # e.g., (channels, samples) if channels is small (<=8) | |
| if a.shape[0] <= 8 and a.shape[0] <= a.shape[1]: | |
| a = np.mean(a, axis=0) | |
| else: | |
| a = np.mean(a, axis=1) | |
| elif a.ndim > 2: | |
| a = np.mean(a.reshape(a.shape[0], -1), axis=0) | |
| # Now 1D, cast/scale | |
| if np.issubdtype(a.dtype, np.floating): | |
| return a.astype(np.float32, copy=False) | |
| # integer PCM | |
| info = np.iinfo(a.dtype) | |
| scale = float(max(-info.min, info.max)) | |
| return a.astype(np.float32) / (scale if scale != 0.0 else 1.0) | |
| def _resample_linear(x: NDArray[np.float32], sr_in: int, sr_out: int) -> NDArray[np.float32]: | |
| """Lightweight linear resampler for short buffers.""" | |
| if sr_in == sr_out or x.size == 0: | |
| return x | |
| # guard tiny sizes | |
| n_out = int(round(x.size * sr_out / sr_in)) | |
| if n_out <= 1: | |
| return np.zeros(0, dtype=np.float32) | |
| t_in = np.linspace(0.0, 1.0, num=x.size, dtype=np.float32, endpoint=True) | |
| t_out = np.linspace(0.0, 1.0, num=n_out, dtype=np.float32, endpoint=True) | |
| return np.interp(t_out, t_in, x).astype(np.float32, copy=False) | |
class SwayRollRT:
    """Feed audio chunks and get per-hop sway outputs.

    For every complete hop of ``HOP`` samples, :meth:`feed` measures the level
    of the most recent ``FRAME`` samples, updates a hysteresis voice-activity
    flag and a smoothed envelope, and evaluates six sine oscillators
    (pitch, yaw, roll, x, y, z) scaled by loudness and envelope.

    Usage:
        rt = SwayRollRT()
        rt.feed(pcm_int16_or_float, sr) -> List[dict]
    """

    def __init__(self, rng_seed: int = 7):
        """Initialize state.

        Args:
            rng_seed: seed of the generator that draws the six oscillator
                start phases; the same seed gives the same phases.
        """
        self._seed = int(rng_seed)
        self.samples: deque[float] = deque(maxlen=10 * SR)  # sliding window for VAD/env
        # Samples received but not yet forming a complete hop.
        self.carry: NDArray[np.float32] = np.zeros(0, dtype=np.float32)

        # Voice-activity detector: current flag plus the two hysteresis counters.
        self.vad_on = False
        self.vad_above = 0
        self.vad_below = 0

        # Sway envelope and its attack/release ramp counters (in hops).
        self.sway_env = 0.0
        self.sway_up = 0
        self.sway_down = 0

        # Start phases in [0, 2*pi); the draw order is part of the seeded behavior.
        rng = np.random.default_rng(self._seed)
        self.phase_pitch = float(rng.random() * 2 * math.pi)
        self.phase_yaw = float(rng.random() * 2 * math.pi)
        self.phase_roll = float(rng.random() * 2 * math.pi)
        self.phase_x = float(rng.random() * 2 * math.pi)
        self.phase_y = float(rng.random() * 2 * math.pi)
        self.phase_z = float(rng.random() * 2 * math.pi)

        self.t = 0.0  # oscillator time in seconds; advances by one hop per processed hop

    def reset(self) -> None:
        """Reset state (VAD/env/buffers/time) but keep initial phases/seed."""
        self.samples.clear()
        self.carry = np.zeros(0, dtype=np.float32)
        self.vad_on = False
        self.vad_above = 0
        self.vad_below = 0
        self.sway_env = 0.0
        self.sway_up = 0
        self.sway_down = 0
        self.t = 0.0

    def _update_vad(self, db: float) -> None:
        """Advance the hysteresis VAD by one hop measured at *db* dBFS.

        Levels strictly between VAD_DB_OFF and VAD_DB_ON change neither counter.
        """
        if db >= VAD_DB_ON:
            self.vad_above += 1
            self.vad_below = 0
            if not self.vad_on and self.vad_above >= ATTACK_FR:
                self.vad_on = True
        elif db <= VAD_DB_OFF:
            self.vad_below += 1
            self.vad_above = 0
            if self.vad_on and self.vad_below >= RELEASE_FR:
                self.vad_on = False

    def _update_envelope(self) -> float:
        """Advance the sway envelope by one hop and return its new value in [0, 1]."""
        if self.vad_on:
            # Ramp the target up over SWAY_ATTACK_FR hops.
            self.sway_up = min(SWAY_ATTACK_FR, self.sway_up + 1)
            self.sway_down = 0
            target = self.sway_up / SWAY_ATTACK_FR
        else:
            # Ramp the target down over SWAY_RELEASE_FR hops.
            self.sway_down = min(SWAY_RELEASE_FR, self.sway_down + 1)
            self.sway_up = 0
            target = 1.0 - (self.sway_down / SWAY_RELEASE_FR)

        # One-pole follower toward the target, then clamp.
        self.sway_env += ENV_FOLLOW_GAIN * (target - self.sway_env)
        if self.sway_env < 0.0:
            self.sway_env = 0.0
        elif self.sway_env > 1.0:
            self.sway_env = 1.0
        return self.sway_env

    def _osc(self, amplitude: float, freq: float, phase: float, loud: float, env: float) -> float:
        """Evaluate one sway oscillator at the current time ``self.t``."""
        return amplitude * loud * env * math.sin(2 * math.pi * freq * self.t + phase)

    def feed(self, pcm: NDArray[Any], sr: int | None) -> List[Dict[str, float]]:
        """Stream in PCM chunk. Returns a list of sway dicts, one per hop (HOP_MS).

        Args:
            pcm: np.ndarray, shape (N,) or (C,N)/(N,C); int or float.
            sr: sample rate of `pcm` (None -> assume SR).

        Returns:
            One dict per processed hop with keys ``pitch_rad``, ``yaw_rad``,
            ``roll_rad``, ``pitch_deg``, ``yaw_deg``, ``roll_deg``, ``x_mm``,
            ``y_mm`` and ``z_mm``. The list is empty when the chunk plus the
            carried-over samples do not yet fill a hop.
        """
        sr_in = SR if sr is None else int(sr)
        x = _to_float32_mono(pcm)
        if x.size == 0:
            return []
        if sr_in != SR:
            x = _resample_linear(x, sr_in, SR)
            if x.size == 0:
                return []

        # append to carry and consume fixed HOP chunks
        self.carry = np.concatenate([self.carry, x]) if self.carry.size else x

        hop_seconds = HOP_MS / 1000.0
        out: List[Dict[str, float]] = []
        while self.carry.size >= HOP:
            hop = self.carry[:HOP]
            self.carry = self.carry[HOP:]

            # keep sliding window for VAD/env computation
            self.samples.extend(hop.tolist())
            self.t += hop_seconds
            if len(self.samples) < FRAME:
                continue

            # Read the newest FRAME samples from the back of the deque (O(FRAME)
            # instead of skipping through the whole window from the front),
            # then flip them back into chronological order.
            newest_first = np.fromiter(
                islice(reversed(self.samples), FRAME),
                dtype=np.float32,
                count=FRAME,
            )
            frame = np.ascontiguousarray(newest_first[::-1])
            db = _rms_dbfs(frame)

            # VAD with hysteresis + attack/release, then the smoothed envelope
            self._update_vad(db)
            env = self._update_envelope()
            loud = _loudness_gain(db) * SWAY_MASTER

            # oscillators
            pitch = self._osc(math.radians(SWAY_A_PITCH_DEG), SWAY_F_PITCH, self.phase_pitch, loud, env)
            yaw = self._osc(math.radians(SWAY_A_YAW_DEG), SWAY_F_YAW, self.phase_yaw, loud, env)
            roll = self._osc(math.radians(SWAY_A_ROLL_DEG), SWAY_F_ROLL, self.phase_roll, loud, env)
            x_mm = self._osc(SWAY_A_X_MM, SWAY_F_X, self.phase_x, loud, env)
            y_mm = self._osc(SWAY_A_Y_MM, SWAY_F_Y, self.phase_y, loud, env)
            z_mm = self._osc(SWAY_A_Z_MM, SWAY_F_Z, self.phase_z, loud, env)

            out.append(
                {
                    "pitch_rad": pitch,
                    "yaw_rad": yaw,
                    "roll_rad": roll,
                    "pitch_deg": math.degrees(pitch),
                    "yaw_deg": math.degrees(yaw),
                    "roll_deg": math.degrees(roll),
                    "x_mm": x_mm,
                    "y_mm": y_mm,
                    "z_mm": z_mm,
                },
            )

        # The leftover may still be the caller's own array or a view into it
        # (no-copy cast and slicing). Detach it so a caller that reuses its
        # audio buffer cannot overwrite samples we have not processed yet.
        self.carry = self.carry.copy()
        return out